Ruby 尝试使用 Ruby 3 调度器

dsh0416 · 2020年08月18日 · 最后由 Qwaz1314 回复于 2020年11月07日 · 4120 次阅读
本帖已被设为精华帖!

一次失败的提案

在准备 RubyConf China 2020 的时候,我仔细检查了 Fiber 调度器 提出的补丁。当我看调度器的样例代码的时候,我发现其调用的是 Ruby 中的 IO.select API。IO.select API 在 Ruby 内部有多种实现,它可能调用 poll、大尺寸 select、POSIX 兼容的 select 取决于不同的操作系统。于是我想用一些更快的 syscall 来实现,比如 epoll kqueueIOCP

我做了一个相关的提案但是被拒绝了。主要问题是 Ruby 的 IO.select API 是无状态的。如果没有含状态的注册,这些新 API 的性能甚至会不如 poll。在 Koichi Sasada 跑了 banchmark 证明了这一点后,提案被正式拒绝。在和 Samuel Williams 在 Twitter 上讨论后,它建议我从 Scheduler 的实现上来进行注入,因为 Scheduler 本身是有状态的。于是我开始写一个 gem 作为 Ruby 3 调度器接口的概念证明。

实现调度器

本文中的 Ruby 版本是:

ruby 2.8.0dev (2020-08-18T10:10:09Z master 172d44e809) [x86_64-linux]

基本的 Scheduler 例子来自于 Ruby 的单元测试。这是 Ruby 3 调度器的测试,而不是真正用于生产的,因此是使用 IO.select 进行 I/O 多路复用。因此我们可以基于此,开发一个性能更好的 Ruby 调度器。

我们需要做一些 C 开发来支持其它 syscall,因此第一件事是兼容原始的实现。

Fallback 到 Ruby IO.select

对于 select/poll API, 不需要预先创建文件描述符,也不需要在运行时注册文件描述符。所以唯一要做的就是处理调度器触发时的行为。

VALUE method_scheduler_wait(VALUE self) {
    // return IO.select(@readable.keys, @writable.keys, [], next_timeout)
    VALUE readable, writable, readable_keys, writable_keys, next_timeout;
    ID id_select = rb_intern("select");
    ID id_keys = rb_intern("keys");
    ID id_next_timeout = rb_intern("next_timeout");

    readable = rb_iv_get(self, "@readable");
    writable = rb_iv_get(self, "@writable");

    readable_keys = rb_funcall(readable, id_keys, 0);
    writable_keys = rb_funcall(writable, id_keys, 0);
    next_timeout = rb_funcall(self, id_next_timeout, 0);

    return rb_funcall(rb_cIO, id_select, 4, readable_keys, writable_keys, rb_ary_new(), next_timeout);
}

我们花了 10 行 C 干了原来 1 行 Ruby 就干好了的事。主要是这允许我们用 C 的宏定义来控制,从而使用其它 I/O 多路复用方法,例如 epoll and kqueue。我们需要实现 4 个 C 方法:

Scheduler.backend
scheduler = Scheduler.new

scheduler.register(io, interest)
scheduler.deregister(io)
scheduler.wait
#include <ruby.h>

VALUE Evt = Qnil;
VALUE Scheduler = Qnil;

void Init_evt_ext();
VALUE method_scheduler_init(VALUE self);
VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest);
VALUE method_scheduler_deregister(VALUE self, VALUE io);
VALUE method_scheduler_wait(VALUE self);
VALUE method_scheduler_backend();

void Init_evt_ext()
{
    Evt = rb_define_module("Evt");
    Scheduler = rb_define_class_under(Evt, "Scheduler", rb_cObject);
    rb_define_singleton_method(Scheduler, "backend", method_scheduler_backend, 0);
    rb_define_method(Scheduler, "init_selector", method_scheduler_init, 0);
    rb_define_method(Scheduler, "register", method_scheduler_register, 2);
    rb_define_method(Scheduler, "deregister", method_scheduler_deregister, 1);
    rb_define_method(Scheduler, "wait", method_scheduler_wait, 0);
}

Scheduler.backend 是专门给调试用的,剩下 4 个 API 会注入到调度器的 Scheduelr#run, Scheduelr#wait_readable, Scheduelr#wait_writable, Scheduelr#wait_any 中。

使用 epollkqueue

epoll 的三个核心 API 是 epoll_create epoll_ctl epoll_wait。很好理解,我们只要在调度器初始化的时候初始化 epoll fd,然后在注册 I/O 事件的时候调用 epoll_ctl,最后用 epoll_wait 替换掉 IO.select

#if defined(__linux__) // TODO: Do more checks for using epoll
#include <sys/epoll.h>
#define EPOLL_MAX_EVENTS 64

VALUE method_scheduler_init(VALUE self) {
    rb_iv_set(self, "@epfd", INT2NUM(epoll_create(1))); // Size of epoll is ignored after Linux 2.6.8.
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    struct epoll_event event;
    ID id_fileno = rb_intern("fileno");
    int epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_WRITABLE")));

    if (ruby_interest & readable) {
        event.events |= EPOLLIN;
    } else if (ruby_interest & writable) {
        event.events |= EPOLLOUT;
    }
    event.data.ptr = (void*) io;

    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
    return Qnil;
}

VALUE method_scheduler_deregister(VALUE self, VALUE io) {
    ID id_fileno = rb_intern("fileno");
    int epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // Require Linux 2.6.9 for NULL event.
    return Qnil;
}

VALUE method_scheduler_wait(VALUE self) {
    int n, epfd, i, event_flag, timeout;
    VALUE next_timeout, obj_io, readables, writables, result;
    ID id_next_timeout = rb_intern("next_timeout");
    ID id_push = rb_intern("push");

    epfd = NUM2INT(rb_iv_get(self, "@epfd"));
    next_timeout = rb_funcall(self, id_next_timeout, 0);
    readables = rb_ary_new();
    writables = rb_ary_new();

    if (next_timeout == Qnil) {
        timeout = -1;
    } else {
        timeout = NUM2INT(next_timeout);
    }

    struct epoll_event* events = (struct epoll_event*) xmalloc(sizeof(struct epoll_event) * EPOLL_MAX_EVENTS);

    n = epoll_wait(epfd, events, EPOLL_MAX_EVENTS, timeout);
    // TODO: Check if n >= 0

    for (i = 0; i < n; i++) {
        event_flag = events[i].events;
        if (event_flag & EPOLLIN) {
            obj_io = (VALUE) events[i].data.ptr;
            rb_funcall(readables, id_push, 1, obj_io);
        } else if (event_flag & EPOLLOUT) {
            obj_io = (VALUE) events[i].data.ptr;
            rb_funcall(writables, id_push, 1, obj_io);
        }
    }

    result = rb_ary_new2(2);
    rb_ary_store(result, 0, readables);
    rb_ary_store(result, 1, writables);

    xfree(events);
    return result;
}

VALUE method_scheduler_backend() {
    return rb_str_new_cstr("epoll");
}
#endif

kqueue 是类似的。唯一不同的是,BSD 的注册和等待用的是同一个 API,只是参数不同,所以有点难懂。

#if defined(__FreeBSD__) || defined(__NetBSD__) || defined(__APPLE__)
#include <sys/event.h>
#define KQUEUE_MAX_EVENTS 64

VALUE method_scheduler_init(VALUE self) {
    rb_iv_set(self, "@kq", INT2NUM(kqueue()));
    return Qnil;
}

VALUE method_scheduler_register(VALUE self, VALUE io, VALUE interest) {
    struct kevent event;
    u_short event_flags = 0;
    ID id_fileno = rb_intern("fileno");
    int kq = NUM2INT(rb_iv_get(self, "@kq"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    int ruby_interest = NUM2INT(interest);
    int readable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_READABLE")));
    int writable = NUM2INT(rb_const_get(rb_cIO, rb_intern("WAIT_WRITABLE")));

    if (ruby_interest & readable) {
        event_flags |= EVFILT_READ;
    } else if (ruby_interest & writable) {
        event_flags |= EVFILT_WRITE;
    }

    EV_SET(&event, fd, event_flags, EV_ADD|EV_ENABLE, 0, 0, (void*) io);
    kevent(kq, &event, 1, NULL, 0, NULL); // TODO: Check the return value
    return Qnil;
}

VALUE method_scheduler_deregister(VALUE self, VALUE io) {
    struct kevent event;
    ID id_fileno = rb_intern("fileno");
    int kq = NUM2INT(rb_iv_get(self, "@kq"));
    int fd = NUM2INT(rb_funcall(io, id_fileno, 0));
    EV_SET(&event, fd, 0, EV_DELETE, 0, 0, NULL);
    kevent(kq, &event, 1, NULL, 0, NULL); // TODO: Check the return value
    return Qnil;
}

VALUE method_scheduler_wait(VALUE self) {
    int n, kq, i;
    u_short event_flags = 0;

    struct kevent* events; // Event Triggered
    struct timespec timeout;
    VALUE next_timeout, obj_io, readables, writables, result;
    ID id_next_timeout = rb_intern("next_timeout");
    ID id_push = rb_intern("push");

    kq = NUM2INT(rb_iv_get(self, "@kq"));
    next_timeout = rb_funcall(self, id_next_timeout, 0);
    readables = rb_ary_new();
    writables = rb_ary_new();

   events = (struct kevent*) xmalloc(sizeof(struct kevent) * KQUEUE_MAX_EVENTS);

    if (next_timeout == Qnil || NUM2INT(next_timeout) == -1) {
        n = kevent(kq, NULL, 0, events, KQUEUE_MAX_EVENTS, NULL);
    } else {
        timeout.tv_sec = next_timeout / 1000;
        timeout.tv_nsec = next_timeout % 1000 * 1000 * 1000;
        n = kevent(kq, NULL, 0, events, KQUEUE_MAX_EVENTS, &timeout);
    }

    // TODO: Check if n >= 0
    for (i = 0; i < n; i++) {
        event_flags = events[i].filter;
        if (event_flags & EVFILT_READ) {
            obj_io = (VALUE) events[i].udata;
            rb_funcall(readables, id_push, 1, obj_io);
        } else if (event_flags & EVFILT_WRITE) {
            obj_io = (VALUE) events[i].udata;
            rb_funcall(writables, id_push, 1, obj_io);
        }
    }

    result = rb_ary_new2(2);
    rb_ary_store(result, 0, readables);
    rb_ary_store(result, 1, writables);

    xfree(events);
    return result;
}

VALUE method_scheduler_backend() {
    return rb_str_new_cstr("kqueue");
}
#endif

使用调度器的 HTTP 服务器例子

在实现好调度器后,我们要测试调度器的性能。因此我写了一个简单的 HTTP 服务器 benchmark

require 'evt'

puts "Using Backend: #{Evt::Scheduler.backend}"
Thread.current.scheduler = Evt::Scheduler.new

@server = Socket.new Socket::AF_INET, Socket::SOCK_STREAM
@server.bind Addrinfo.tcp '127.0.0.1', 3002
@server.listen Socket::SOMAXCONN
@scheduler = Thread.current.scheduler

def handle_socket(socket)
  line = socket.gets
  until line == "\r\n" || line.nil?
    line = socket.gets
  end
  socket.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
  socket.close
end

Fiber.new(blocking: false) do
  while true
    socket, addr = @server.accept
    Fiber.new(blocking: false) do
      handle_socket(socket)
    end.resume
  end
end.resume

@scheduler.run

比起原先阻塞的 I/O,使用 Ruby 3 非阻塞 I/O 后可以达到 3.33x 的性能,而使用 epoll 后可以达到 4.21x。服务器的例子很简单,所以当 JIT 启动时,不容易造成 ICache 不命中,因此性能进一步提升到了 4.54x。

Benchmark Result

测试是基于 Intel(R) Xeon(R) CPU E3-1220L V2 @ 2.30GHz CPU 的,而且程序是单线程的。如果有更好的 CPU,epollpoll 的差距会更大。欢迎尝试,相关 gem 代码已开源。

未来工作

未来工作主要是两部分。一个是提升现有 API 的稳定性,还有就是加入 io_uringIOCP 的支持。io_uring 倒是还好,但我是一点都不懂 Windows 开发。所以欢迎大家来提供意见和贡献。

源码

dsh0416/evt

Rei 将本帖设为了精华贴 08月18日 21:24

楼主谦虚了,我觉得懂 Windows 开发的一般也不懂 IOCP。

然后,我会告诉你,这种性能还是很低的,因为你只用了单线程,ruby thread 和 fiber 的关系是 thread 持有 fiber。

就算不用文中提到的 fiber,也是可以利用【面向状态】来实现让 fiber 内部 handler 处理的时候实现 schedule。

因此,fiber 跟并发还是扯不上关系,作者觉得并发了,是 set file descriptor flag non_block 了不让当前 thread 阻塞,好切入别的 subprocess。

魔术师喜欢用障眼法来让观众感到真实和快乐

amber 早就试过用这种模型,但是只是解决了编程风格问题,没有解决本质的 performance

作者需要认清本质问题【利用多核】,要想利用多核,Thread 是需要的,有了 Thread 再把 fiber 加入 cpu time 抢夺我觉得多此一举,要是这样还不如学 go 封装一下 thread,说到这里,表达的意思明确了,fiber 早期设计不是去解决 io block。

官方说明:

Fibers are primitives for implementing light weight cooperative concurrency in Ruby. Basically they are a means of creating code blocks that can be paused and resumed, much like threads. The main difference is that they are never preempted and that the scheduling must be done by the programmer and not the VM.

对于并发,Thread 是需要的,但是 GIL 的出现,让 Thread 捆成了无法 Parallel,这时候真正 3x3 表达的的,正是 Guild,我不知道为何要提出 Fiber 3x3,估计是对 fiber 的迷恋。

题主自己应该也发现,max performance 图也只有 8k qps,其实题主可能没意识到【这是单核的】,所以性能瓶颈其实不在 block nonblock,我建议博主试试一个 IO 往 另一个 IO write 1GB 的数据,你可以试试【阻塞 write】快,还是【非阻塞 write】快,你会得出更惊喜的答案。

extern "C"

jakit 回复

thread 持有 fiber 所以呢?

jakit 回复

非阻塞就是异步吧?

zzz6519003 回复

非阻塞 是 阻塞 socket,跟异步没有半毛钱关系,上面丁丁小哥已经说了,其实他的回答应该对你描述和理解的挺有回答意义

那么,异步指代的是什么?

现代分时操作系统,让每个 process 轮流使用内核,后面细化了 process 之后,就允许 thread 轮流使用内核,一般情况下,你开多个 thread,开发客户端的同学应该都知道,app 会有个主线程负责渲染界面,如果你在主线程写了 sleep 100s,那就完了,你整个界面卡住 100s 动不了,这时候为什么客户端会有异步的概念,就是开一个新 thread 去做另外的事情,让界面渲染的 函数能跑而不是自己这个函数占着 thread 接下来要干的事情

socket 阻塞,首先,读写本来就在你的 thread,不管是 main thread 部分的代码,还是 你自己的 thread 的代码,反正 read write 都会让这个线程做这个事情

如果 connect 了网络的一个 addr 如果你调用 read(100),另外一边长时间都没给你发 100byte 的 string,那么你的 thread 就会一直卡在那等待

这时候,有一种做法就是 multiplex,就是 select 这个 socket 有没有 readable 的 data,没有就不占用时间了,pass。

我要加个重点:

其实 ruby 的 IO.select() 已经封装了 iocp epoll 以及 kqueue

所以,这些已经很简单了,我只是觉得博主可能有点夸大 Evt,其实 IO.select(...) 就可以了

ruby 本质遇到的问题不是 IO performance 问题,作为 amber-kit 作者,我对 amber 开发感受到更多的是 ruby string 不支持 memcpy,而且 parsing decode encode 的效率较低,这些代码,你也会看到博主写的那个框架用了 c 来处理了这些部分的事情

其实我一直想表达的是 ruby 并没有优化 string perfomance,streaming 处理的性能还是比 c / c++ / go / java 差些

所以我一直在表达的意思是 过分强调 IO 优化,其实还不如优化一下基本类的算法性能

还有的就是 ruby 3x3 视频我都看完了,老外当时讲的 Guild 类,为什么不能跨线程写,只能跨线程读,应该也是有考量的,当时我看完了 Guild 设计挺失望的。

但是从业 go 之后,我明白了原来是 中国思维风格 趋向于喜欢把一些小玩意钻尖,就像2020年10月16日晚上 Apple 预订网站当时 App 都频频报错,估计是并发跟不上,你会不会蹦出这样的思维 “为什么 taobao 就能做高秒杀呢”,其实 博主想要的那种就有点偏向于细化上处理的做法,可能趋向于喜欢把多核取消多线程绑定,然后用户自己的 fiber 和 scheduler 能手动利用。

既然 把多核取消多线程绑定,然后用户自己的 fiber 和 scheduler 能手动利用,那 为何操作系统要发明 thread 和 process?

顺便自己回答下面提出的问题 “为什么 puma 使用多进程”,当然是我的个人观点,就是:简单

老外应该更喜欢遵循现有的东西做实现,但是中国式思维,尤其 花为 喜欢刻意强调一些具有破坏性的东西作为一些可能更有逼格或者能力提现吧

dsh0416 回复

事实上 Goroutine 的 Thread 并不是真正的 Thread 的封装并不能因为看起来暴露了一个类似 Thread 的接口,就认为这是 Thread

我只是打个比方,但是用 pthread + 调度器 包装 其实也是一种似乎可行的方案,毕竟还是得开多个 thread 去 handle 这些 proc,不信你开多个 gofunc 对照着 mac 的任务管理器里面该 go 进程用了多少个线程就会发现了

https://docs.google.com/document/d/1TTj4T2JO42uD5ID9e89oa0sLKhJYD0Y_kqxDv3I3XMw/edit

https://golang.org/src/runtime/proc.go

对于:

只要你熟悉一下 Goroutine 的实现就会发现,其也是如 Fiber 的内部协作式调度,再和整体的 Thread 一起做出来的封装,是多线程多协程切换的实现。

我确定它用的就是系统 thread

// Create a new m. It will start off with a call to fn, or else the scheduler.

// fn needs to be static and not a heap allocated closure.

// May run with m.p==nil, so write barriers are not allowed.

//

// id is optional pre-allocated m ID. Omit by passing -1.

//go:nowritebarrierrec

func newm(fn func(), _p_ *p, id int64) {

    mp := allocm(_p_, fn, id)

    mp.nextp.set(_p_)

    mp.sigmask = initSigmask

    if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {

        // We're on a locked M or a thread that may have been

        // started by C. The kernel state of this thread may

        // be strange (the user may have locked it for that

        // purpose). We don't want to clone that into another

        // thread. Instead, ask a known-good thread to create

        // the thread for us.

        //

        // This is disabled on Plan 9. See golang.org/issue/22227.

        //

        // TODO: This may be unnecessary on Windows, which

        // doesn't model thread creation off fork.

        lock(&newmHandoff.lock)

        if newmHandoff.haveTemplateThread == 0 {

            throw("on a locked thread with no template thread")

        }

        mp.schedlink = newmHandoff.newm

        newmHandoff.newm.set(mp)

        if newmHandoff.waiting {

            newmHandoff.waiting = false

            notewakeup(&newmHandoff.wake)

        }

        unlock(&newmHandoff.lock)

        return

    }

    newm1(mp)

}

另外,libdispatch 也是一个多线程管理的 lib,为什么它不做 fiber 而是直接管理系统分配的 thread?

puma 是多进程模型,但你有没有想过为啥 puma 一开始学 libdispatch 不写 pthread scheduler 而是使用 multi-process

你的回答的意图和想法是对的

我真正提出异议的是,我认为 多线程 / 进程 io 还是比单个 thread 单个进程,然后在单个进程线程来分时高效,因为你在单核利用到极致,也比不过多核心的使用。

而且更何况是在 业务端的 web 场景,大量的业务充斥着不一定非常高效的业务调用和某些确实需要等待的场景,你的调度器就会无法使用,所以你提出了你说的这些是针对 I/O boundry 场景,但是我个人认为没有谁会闲着上班不写业务代码去写 server io 的 rw 方法。

另外补充,java 的 netty / spring 好久之前就已经采用这种 selector worker handler 模型了,只是,或许你不喜欢用固定模型,你的想法或者憧憬的是那种用户 DIY 模式的 model 而不是那种固化的 SWH,就是什么时候调用随用户主义的

jakit 回复

Goroutine 的 newm 和 new 是不一样的。newm 是启用系统的 Thread,而 new 是 (newproc) 对于 Fiber (Continuation) 的封装。new (newproc) 会在一些情况下触发 newm。这就是「 Fiber 的内部协作式调度,再和整体的 Thread 一起做出来的封装」而不单纯是系统 Thread。

随便看个 https://golang.org/src/runtime/asm_amd64.s 252 行,就是内部 gosave,和 Fiber 实现完全一样。如果直接调用系统 Thread 自然就不用这东西了。

jakit 回复

Goroutine 用的是 M:N 这个线程模型,可搜到很多介绍比如这个 User-level threads....... with threads. - Paul Turner - Google

为啥不直接用线程做并发(1:1)?因为线程切换不够高效,可以参考下这个 Measuring context switching and memory overheads for Linux threads

一个线程只能利用一个 cpu,这个是没错的。但更重要的是,一个线程(scheduler)要跑多个 Goroutine。

dsh0416 回复

你说的是这个就是 各种 routine 常见 对 context 的上下文保存、切换

如果直接调用系统 Thread 自然就不用这东西了

对,没错,我们的理解都没错

我想表达的不是说命题有问题,我想说的是某个场景(以我最后阅读过的 ruby src 2.3 作为理解基准):

当你一个线程有多个 fiber,要多个 fiber 切来切去的时候,这时候还是用的一个核……

所以你很聪明的说 io 不涉及计算来绕开了这个问题

然后我想说的问题不是 io 的问题,而是计算的问题

就是讨论的东西不太一样,我只是纯粹日常发表个人看法~

jakit 回复

当你一个线程有多个 fiber,要多个 fiber 切来切去的时候,这时候还是用的一个核…… 所以你很聪明的说 io 不涉及计算来绕开了这个问题

Fiber 调度器改善的是:提高 IO 密集型应用的 CPU 利用率,这不叫 “绕开了这个问题” 而是,Fiber 调度器只为了解决 IO 的问题

然后我想说的问题不是 io 的问题,而是计算的问题

当 CPU 已经被充分利用时,使用调度器不仅没有意义,而且有害。 “计算的问题” 跟 Fiber 调度器一点关系都没有,计算密集型的场景也不应该使用 Fiber 调度器。

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册