Swoole源码分析之数据接收与发送

目录:

  1. 探索什么
  2. 启动流程
  3. process模式
  4. base模式
  5. 数据读写
  6. 验证脚本
  7. 总结

探索什么

事情的起因是这样的,进行Swoole应用开发的时候,发现各个Worker可以给任意一个客户端发送消息,即调用$serv->send($fd, $data), 而不用关心客户端第一次是哪个Worker接收的数据.

如果你细细一想,发现这是一件很神奇的事情,多个不同的进程居然可以给任意的fd(文件描述符,客户端的连接标识)发送数据.

因为不同的进程都有自己的文件描述符表,在当前进程进行send的时候,首先是根据fd去查自己进程的文件描述符表. 这里容易有一个误区,认为fd是一个数字,父子进程或者兄弟进程只要拿到这个fd数字就能发送数据,这个是不行的, 而必须要fd对应的表中纪录的指针指向"目标客户端"才行.

可以先看看这篇关于文件描述符的文章,讲的很清晰.

那么纪录什么情况下产生,什么情况下不产生呢?当A进程fork出B,C等子进程时,A当前打开的文件描符就会被复制给他的子进程, fork之后这些进程新产生的fd,只会在当前进程的fd表生成文件关联纪录,他的父子进程或兄弟进程并不会自动添加这条纪录.

知道以上情况后,我们就会明白那的确有点神奇.但是它是怎么实现的呢?有两种猜想.

一是父进程(Master)将所有的客户端文件描传递(通过sendmsg等)传递给子进程们(Workers),那么子进程就会在自己的fd表中添加纪录, 然后自然就可以接收与发送数据了.

二是父进程(Master)将所有客户端发来的数据转发给子进程(Worker),然后子进程收到数据进行处理.如果要发送数据给客户端, 那么先把数据转发给Master,然后再由Master发送给客户端.

事实证明Swoole使用的是第二种,对应的是process模式.(就是swoole_server构造函数的第三个参数)

Swoole模式总共有三种,Base模式(多个Worker进程之间无法通讯),Thread模式(由于PHP对多线程支持不好已经废弃),Process模式(上面说的那种). 具体可以参见Swoole的官方文档.

启动流程

在具体分析数据的接收与发送之前我们先分析一下Swoole的启动流程,在将Swoole启动流程之前,先介绍一下PHP的启动流程,因为Swoole是基于PHP的.

PHP的启动流程大致分为4个过程(忽略sapi,配置文件解析等),我们以php-fpm+nginx举例.

  • 首先启动php-fpm的时候会初始化所有的扩展,像PDO Swoole这些,我们称之为MINIT阶段.
  • 然后当HTTP请求到达php-fpm,会调用所有扩展的RINIT函数,接下来我们写的PHP代码就会被解析运行,我们称之为RINIT阶段.
  • 然后当我们的代码全部执行完毕,会调用所有扩展的RSHUTDOWN函数,执行一些重置或清理操作,我们称之为RSHUTDOWN阶段.
  • 然后我们关闭php-fpm,这时会调用所有扩展的MSHUTDOWN函数,做一些申请的内存释放等操作,我们称之为MSHUTDOWN阶段.

Swoole自然也要遵循上面的流程.

MINIT阶段

/* { { { PHP_MINIT_FUNCTION
 */
PHP_MINIT_FUNCTION(swoole)
{
    // 全局变量初始化
    ZEND_INIT_MODULE_GLOBALS(swoole, php_swoole_init_globals, NULL);
    // 配置项的注册
    REGISTER_INI_ENTRIES();

    ....

    // 将一些常量注册到PHP中
    REGISTER_STRINGL_CONSTANT("SWOOLE_VERSION", PHP_SWOOLE_VERSION, sizeof(PHP_SWOOLE_VERSION) - 1, CONST_CS | CONST_PERSISTENT);

    // 将一些类注册到PHP中
    SWOOLE_INIT_CLASS_ENTRY(swoole_server_ce, "swoole_server", "Swoole\\Server", swoole_server_methods);
    swoole_server_class_entry_ptr = zend_register_internal_class(&swoole_server_ce TSRMLS_CC);

    // Swoole全局变量初始化 各个模块常量函数类的注册
    swoole_init();

    ....

    swoole_buffer_init(module_number TSRMLS_CC);
    swoole_websocket_init(module_number TSRMLS_CC);

    ....

    return SUCCESS;
}
/* }}} */

RINIT阶段

PHP_RINIT_FUNCTION(swoole)
{
    //将Server标识为启动状态
    SwooleG.running = 1;

    ...

    return SUCCESS;
}

调用$serv->start()函数

PHP_METHOD(swoole_server, start)
{
    ...

    swServer *serv = swoole_get_object(zobject);
    // 根据配置注册所有的回调函数,如onConnect onReceive等
    php_swoole_register_callback(serv);

    ...
    // 更新对象的属性,比如当前进程ID Worker数量等
    php_swoole_server_before_start(serv, zobject TSRMLS_CC);

    // 最复杂的Server启动过程
    ret = swServer_start(serv);

    ...

    RETURN_TRUE;
}

swServer_start函数

int swServer_start(swServer *serv)
{
    // 工厂模式
    // Base模式, 多进程, 在worker进程中直接accept连接
    // Thread模式, 现在都不支持了
    // Process模式, 多进程, 在master进程中accept连接
    swFactory *factory = &serv->factory;

    ...

    // 主要检查是否一些必须设置的回调没设置 比如配置了task数量却没有设置task回调函数
    ret = swServer_start_check(serv);

    ...

    // 变成守护进程的操作
    if (serv->daemonize > 0)
    {
        ....
    }
    ...

    // 分配workers的空间
    serv->workers = SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
    ....

    // 我们在下面专门进行讨论, 这里究竟做了多少东西
    if (factory->start(factory) < 0)
    {
        return SW_ERR;
    }
    // 信号处理 比如热更新 平滑重启 计时器通知等实现,这里master进程接收的信号,某些也会通过kill函数发送给manager进程
    swServer_signal_init();

    // single模式, 就是swoole_server构造函数的第三个参数
    if (serv->factory_mode == SW_MODE_SINGLE)
    {
        ret = swReactorProcess_start(serv);
    }
    // base & process模式
    else
    {
        ret = swServer_start_proxy(serv);
    }
    ...

    return SW_OK;
}

我们这里对factory->start进行讨论

// factory->start是怎么进行赋值的

// 在swoole_server->__construct中
#ifdef __CYGWIN__
    // 如果是windows环境
    serv->factory_mode = SW_MODE_SINGLE;
#else
    // 如果是thread或者base模式
    if (serv_mode == SW_MODE_THREAD || serv_mode == SW_MODE_BASE)
    {
        serv_mode = SW_MODE_SINGLE;
        // base模式也需要抛出错误???
        swoole_php_fatal_error(E_WARNING, "PHP can not running at multi-threading. Reset mode to SWOOLE_MODE_BASE");
    }
    serv->factory_mode = serv_mode;
#endif

* 说明不是process模式的,都是会设置称single模式

swoole_server.start -> php_swoole_server_before_start -> swServer_create
// 如果是single模式
if (serv->factory_mode == SW_MODE_SINGLE)
{
    return swReactorProcess_create(serv);
}
else
{
    return swReactorThread_create(serv);
}

swReactorProcess_create -> swFactory_create -> "factory->start = swFactory_start"
int swFactory_start(swFactory *factory)
{
    return SW_OK;
}

* 说明thread base single等模式什么也不做

swReactorThread_create函数:
if (serv->factory_mode == SW_MODE_THREAD)
{
    if (serv->worker_num < 1)
    {
        swError("Fatal Error: serv->worker_num < 1");
        return SW_ERR;
    }
    ret = swFactoryThread_create(&(serv->factory), serv->worker_num);
}
else if (serv->factory_mode == SW_MODE_PROCESS)
{
    if (serv->worker_num < 1)
    {
        swError("Fatal Error: serv->worker_num < 1");
        return SW_ERR;
    }
    ret = swFactoryProcess_create(&(serv->factory), serv->worker_num);
}
else
{
    ret = swFactory_create(&(serv->factory));
}

* 现在不会有thread模式了,只有process模式调用的才是swFactoryProcess_create
  该函数执行"factory->start = swFactoryProcess_start"

通过以上分析,我们可以得出结论: 只有设置模式为SWOOLE_PROCESS时,才会在"factory->start"处创建管理进程Worker进程Task进程.

可以看出虽然base模式被设置成了single模式,但是并没有修改其worker_num参数,也就是说base模式也是可以有多个worker的.通过后面的分析, 加上实际的测试脚本,也印证了上面的结论.

然后我们分析swFactoryProcess_start(process模式专有),看看它是怎么创建进程组的.

process模式

我们接着上面详细分析manager进程,worker进程组,task进程组是怎么创建的.它们又都做了写什么.稍后我们在分析master进程中reactor线程组是怎么工作的.

static int swFactoryProcess_start(swFactory *factory)
{
    ...

    //必须先启动manager进程组,否则会带线程fork
    if (swManager_start(factory) < 0)
    {
        swWarn("swFactoryProcess_manager_start failed.");
        return SW_ERR;
    }
    //主进程需要设置为直写模式
    factory->finish = swFactory_finish;
    return SW_OK;
}

int swManager_start(swFactory *factory)
{
    ...

    // worker进程组初始化,创建workers与master进程通讯的管道
    for (i = 0; i < serv->worker_num; i++)
    {
        if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0)
        {
            return SW_ERR;
        }
        serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER);
        serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_WORKER);
        serv->workers[i].pipe_object = &object->pipes[i];
        swServer_store_pipe_fd(serv, serv->workers[i].pipe_object);
    }

    // task进程组初始化
    if (SwooleG.task_worker_num > 0)
    {
        ...

        // 设置task的事件循环为swProcessPool_worker_loop, 设置task任务回调
        if (swProcessPool_create(&SwooleGS->task_workers, SwooleG.task_worker_num, SwooleG.task_max_request, key, create_pipe) < 0)
        {
            swWarn("[Master] create task_workers failed.");
            return SW_ERR;
        }
        ...
    }

    // process进程组初始化,现在应该还在完善当中,官方回调中也并没有指出onUserWorkerStart等UserProcess的相关php回调接口.
    if (serv->user_worker_num > 0)
    {
        serv->user_workers = sw_calloc(serv->user_worker_num, sizeof(swWorker *));
        swUserWorker_node *user_worker;
        i = 0;
        LL_FOREACH(serv->user_worker_list, user_worker)
        {
            if (swWorker_create(user_worker->worker) < 0)
            {
                return SW_ERR;
            }
            serv->user_workers[i++] = user_worker->worker;
        }
    }

    pid = fork();
    switch (pid)
    {
    //创建manager进程
    case 0:

        ...

        // 创建并启动worker进程组
        for (i = 0; i < serv->worker_num; i++)
        {
            //close(worker_pipes[i].pipes[0]);
            pid = swManager_spawn_worker(factory, i);
            if (pid < 0)
            {
                swError("fork() failed.");
                return SW_ERR;
            }
            else
            {
                serv->workers[i].pid = pid;
            }
        }

        // 创建并启动task进程组
        if (SwooleG.task_worker_num > 0)
        {
            swProcessPool_start(&SwooleGS->task_workers);
        }

        // 创建并启动process进程组
        if (serv->user_worker_list)
        {
            swUserWorker_node *user_worker;
            LL_FOREACH(serv->user_worker_list, user_worker)
            {
                /**
                 * store the pipe object
                 */
                if (user_worker->worker->pipe_object)
                {
                    swServer_store_pipe_fd(serv, user_worker->worker->pipe_object);
                }
                swManager_spawn_user_worker(serv, user_worker->worker);
            }
        }

        //标识为管理进程
        SwooleG.process_type = SW_PROCESS_MANAGER;
        SwooleG.pid = getpid();

        if (serv->reload_async)
        {
            ret = swManager_loop_async(factory);
        }
        else
        {
            ret = swManager_loop_sync(factory);
        }
        exit(ret);
        break;

        //master process
    default:
        SwooleGS->manager_pid = pid;
        break;
    case -1:
        swError("fork() failed.");
        return SW_ERR;
    }
    return SW_OK;
}

我们重点关注worker的创建: swManager_spawn_worker

swManager_spawn_worker -> swWorker_loop

int swWorker_loop(swFactory *factory, int worker_id)
{
    ...

    // 创建reactor, 比如epoll
    if (swReactor_create(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0)
    {
        swError("[Worker] create worker_reactor failed.");
        return SW_ERR;
    }

    serv->workers[worker_id].status = SW_WORKER_IDLE;

    int pipe_worker = serv->workers[worker_id].pipe_worker;

    swSetNonBlock(pipe_worker);
    SwooleG.main_reactor->ptr = serv;

    // 管道文件描述符监听
    SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_worker, SW_FD_PIPE | SW_EVENT_READ);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swWorker_onPipeReceive);
    SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE | SW_FD_WRITE, swReactor_onWrite);

    swWorker_onStart(serv);

    ...

    //main loop, 如果是epoll就是调用swReactorEpoll_wait函数
    SwooleG.main_reactor->wait(SwooleG.main_reactor, NULL);
    //clear pipe buffer
    swWorker_clean();
    //worker shutdown
    swWorker_onStop(serv);
    return SW_OK;
}

通过上面的分析,我们可以看出

worker读master的数据是调用的swWorker_onPipeReceive函数
写数据给master会调用swReactor_onWrite函数

上面在启动Server的时候分为了base和process模式,process就是Master负责接收连接,然后通过管道传输数据.

/**
 * proxy模式
 * 在单独的n个线程中接受维持TCP连接
 */
static int swServer_start_proxy(swServer *serv)
{
    ...

    // 初始化一个reactor, 这个函数指定了具体使用poll epoll kqueue等
    ret = swReactor_create(main_reactor, SW_REACTOR_MINEVENTS);

    ...

    // 根据配置的reactor_num创建指定大小的线程池, 用于执行swReactorThread_loop_stream函数,进行事件循环
    ret = swReactorThread_start(serv, main_reactor);

    ...

    // 额外创建心跳检测线程,用于执行swHeartbeatThread_loop函数,把超时的连接主动断掉
    swHeartbeatThread_start(serv);

    ...

    main_reactor->id = serv->reactor_num;
    main_reactor->ptr = serv;
    // 用于接收客户连接
    main_reactor->setHandle(main_reactor, SW_FD_LISTEN, swServer_master_onAccept);

    ...

    // master线程也进行事件循环
    return main_reactor->wait(main_reactor, &tmo);
}

这里总结下来最多会创建1+reactor_num+1个线程

  • 心跳线程用于关闭长时间没有发送消息的客户连接
  • master线程用于接受客户端连接(单独的线程不必加锁)
  • 其他线程用于接收与发送数据 关闭连接等

线程池的事件循环

static int swReactorThread_loop_stream(swThreadParam *param)
{
    ...

    // 设置当前线程的事件回调
    reactor->onFinish = NULL;
    reactor->onTimeout = NULL;
    reactor->close = swReactorThread_close;

    reactor->setHandle(reactor, SW_FD_CLOSE, swReactorThread_onClose);
    // 与worker进程通讯时的回调
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_READ, swReactorThread_onPipeReceive);
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_WRITE, swReactorThread_onPipeWrite);

    //set protocol function point
    // 与客户连接进行通讯的回调,如读数据调用swReactorThread_onRead函数
    swReactorThread_set_protocol(serv, reactor);

    ...

    // 多进程模式下,初始化进程间通讯管道的连接参数
    if (serv->factory_mode == SW_MODE_PROCESS)
    {
        for (i = 0; i < serv->worker_num; i++)
        {
            if (i % serv->reactor_num == reactor_id)
            {
                pipe_fd = serv->workers[i].pipe_master;

                ...

                serv->connection_list[pipe_fd].from_id = reactor_id;
                serv->connection_list[pipe_fd].fd = pipe_fd;
                serv->connection_list[pipe_fd].object = sw_malloc(sizeof(swLock));

                ...

            }
        }
    }

    ...

    //主要的事件循环,用于读写监听
    reactor->wait(reactor, NULL);
    //shutdown
    reactor->free(reactor);
    pthread_exit(0);
    return SW_OK;
}

reactor->wait我们这里以epoll为例,分析事件循环都做些什么,具体的函数就是swReactorEpoll_wait.

这里多说一点,除心跳检测线程外(因为它不需要事件循环),其他线程都会创建一个swReactor对象,并拥有一个epoll_fd资源.

master线程负责接受客户端连接,然后把连接分配给一个工作线程,并让工作线程添加该连接的读写监听,那么该线程就负责该客户连接的所有读写事务了.

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    ...

    while (reactor->running > 0)
    {
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
            ...
        }
        else if (n == 0)
        {
            // 超时的回调
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            continue;
        }
        for (i = 0; i < n; i++)
        {
            event.fd = events[i].data.u64;
            event.from_id = reactor_id;
            event.type = events[i].data.u64 >> 32;
            event.socket = swReactor_get(reactor, event.fd);

            //read
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                // 这里要区分是worker进程还是客户端的事件(下同)
                // worker进程由于是管道通讯,所以设置的事件是"SW_FD_PIPE | SW_EVENT_READ"
                // 其对应的回调是swReactorThread_onPipeReceive函数
                // 而客户端连接由于是TCP连接,所以设置的是"SW_FD_TCP | SW_EVENT_READ"
                // 其对应的回调是swReactorThread_onRead函数
                // 这里的回调函数是通过查表得来的,因为管道的fd跟客户端的fd一定是不同的
                // swoole即把其fd作为key,回调函数作为value.这样数据不用加锁也不会串,因为他们在不同的信道当中.

                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                ...
            }
            //write
            if ((events[i].events & EPOLLOUT) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
                ret = handle(reactor, &event);
                ...
            }
            //error
            ....
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type);
                ret = handle(reactor, &event);
                ...
            }
        }

        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
    }
    return 0;
}

至此,master进程在process模式下的每个线程的具体工作也分析完毕了.

base模式

base模式相较process模式就要简单的多了,因为它少了多个reactor线程,也少了worker与reactor之间的通讯,而是由worker独立维护客户端连接, 从swoole提供的默认参数是process模式来看,大多数服务端的应用场景并不是base模式.

base模式调用的是swReactorProcess_start,我们就接着这个函数分析.

int swReactorProcess_start(swServer *serv)
{

    ...

    // 初始化event_workers字段
    if (swProcessPool_create(&SwooleGS->event_workers, serv->worker_num, serv->max_request, 0, 1) < 0)
    {
        return SW_ERR;
    }

    SwooleGS->event_workers.ptr = serv;
    // 设置worker的事件循环函数
    SwooleGS->event_workers.main_loop = swReactorProcess_loop;

    ...

    //
    if (SwooleG.task_worker_num > 0)
    {

        ...

        // 指定task的事件循环是swProcessPool_worker_loop函数
        // 这里主要是通过管道读取worker发过来的数据,然后调用pool->onTask把数据传输给用户设置的回调函数,消化这个task
        swTaskWorker_init(&SwooleGS->task_workers);
        // 运行task进程组
        swProcessPool_start(&SwooleGS->task_workers);

        int i;
        for (i = 0; i < SwooleGS->task_workers.worker_num; i++)
        {
            // 将task添加到manager管理的进程组中
            swProcessPool_add_worker(&SwooleGS->event_workers, &SwooleGS->task_workers.workers[i]);
        }
    }

    // 这里是通过swoole_server->addProcess添加的进程
    if (serv->user_worker_list)
    {
        swUserWorker_node *user_worker;
        LL_FOREACH(serv->user_worker_list, user_worker)
        {
            ...

            // fork进程并运行进程
            // 这里会调用php_swoole_process_start启动该process进程
            // 在该函数中,会读取swoole_process对象的callback属性,即用户设置的回调函数
            // 然后调用sw_call_user_function_ex执行用户设置的php函数

            swManager_spawn_user_worker(serv, user_worker->worker);
        }
        SwooleGS->event_workers.onWorkerNotFound = swManager_wait_user_worker;
    }

    ...

    // 信号量的初始化,同process模式
    swServer_signal_init();

    // worker进程组, 调用swReactorProcess_loop进程事件循环
    swProcessPool_start(&SwooleGS->event_workers);

    // manager进程进行循环,主要执行worker重启等操作
    swProcessPool_wait(&SwooleGS->event_workers);
    swProcessPool_shutdown(&SwooleGS->event_workers);

    return SW_OK;
}

worker进程的事件循环回调

static int swReactorProcess_loop(swProcessPool *pool, swWorker *worker)
{

    ...

    // 创建事件循环对象,像process模式的例子,就是创建的epoll多路复用
    if (swReactor_create(reactor, SW_REACTOR_MAXEVENTS) < 0)
    {
        swWarn("ReactorProcess create failed.");
        return SW_ERR;
    }

    swListenPort *ls;
    int fdtype;

    //listen the all tcp port
    LL_FOREACH(serv->listen_list, ls)
    {
        fdtype = swSocket_is_dgram(ls->type) ? SW_FD_UDP : SW_FD_LISTEN;

        ...

        // 监听TCP与UDP的端口
        reactor->add(reactor, ls->sock, fdtype);
    }
    SwooleG.main_reactor = reactor;

    // 设置一系列的监听回调,这里跟比较特别的就是前面讲的设置了accept回调

    //connect
    reactor->setHandle(reactor, SW_FD_LISTEN, swServer_master_onAccept);
    //close
    reactor->setHandle(reactor, SW_FD_CLOSE, swReactorProcess_onClose);
    //pipe
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_WRITE, swReactor_onWrite);
    reactor->setHandle(reactor, SW_FD_PIPE | SW_EVENT_READ, swReactorProcess_onPipeRead);

    ...

    // 设置客户端的读写回调
    swReactorThread_set_protocol(serv, reactor);

    ...

    // 每一个进程都有一个心跳检测的线程
    if (serv->heartbeat_check_interval > 0)
    {
        swHeartbeatThread_start(serv);
    }

    ...

    // 进程事件循环, 下面我们仍然以epoll举例
    struct timeval timeo;
    timeo.tv_sec = 1;
    timeo.tv_usec = 0;
    return reactor->wait(reactor, &timeo);
}

base模式跟process模式一样(epoll为例),最后都是调用的swReactorEpoll_wait函数

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    ...

    while (reactor->running > 0)
    {
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
           ...
        }
        else if (n == 0)
        {
            ...
        }
        for (i = 0; i < n; i++)
        {
            event.fd = events[i].data.u64;
            event.from_id = reactor_id;
            event.type = events[i].data.u64 >> 32;
            event.socket = swReactor_get(reactor, event.fd);

            // 下面我们以读操作为例
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLIN handle failed. fd=%d.", event.fd);
                }
            }

            ...
        }
        ...
    }
    return 0;
}

当客户端连接来的时候,epoll会把客户端fd标识为可读,即EPOLLIN标志.

我们观察代码可以发现一个问题, Swoole并没有用加锁的方式处理惊群, 关于惊群可以看看这篇文章, nginx中是所有加锁的方式来处理的惊群,对应其配置文件event模块的accept_mutex配置项.

Swoole这里采取让epoll_wait等待1秒钟,那么当一个客户连接到来时,每个worker都会收到read事件,但是只有一个worker 能接收这个客户端连接,这里是不是还有优化的空间呢?仍值得商榷.

数据读写

通过上面的分析(我们只分析了关于客户端的数据走向,并且是TCP协议的) 我们已经明确了swoole在启动的时候,做了哪些操作.下面我们仍然分别对base模式跟process模式进行讨论.

process模式
master线程接收client连接:
swReactorEpoll_wait -> swServer_master_onAccept -> accept系统调用

master线程将client连接分配给reactor线程:
swServer_master_onAccept -> "sub_reactor->add" -> epoll_ctl系统调用

reactor线程读取client数据:
swReactorEpoll_wait -> swReactorThread_onRead -> "port->onRead" -> swPort_onRead_raw -> swConnection_recv -> recv系统调用

reactor线程转发数据给worker:
swPort_onRead_raw -> "serv->factory.dispatch" -> swFactoryProcess_dispatch -> swReactorThread_send2worker -> write管道

worker接收reactor线程数据:
swReactorEpoll_wait -> swWorker_onPipeReceive -> read管道 -> swWorker_onTask -> "goto do_task" -> serv->onReceive
-> php_swoole_onReceive -> sw_call_user_function_ex(将数据传递给用户设置的函数)

worker发送数据给reactor线程:
swoole_server.send -> PHP_METHOD(swoole_server, send) -> swServer_tcp_send -> "factory->finish" -> swFactoryProcess_finish
-> swWorker_send2reactor -> swSocket_write_blocking -> write管道

reactor线程接收worker数据:
swReactorEpoll_wait -> swReactorThread_onPipeReceive -> read管道

reactor线程发送数据给client:
swReactorThread_send -> swConnection_send -> send系统调用

server被动关闭连接:(同reactor线程读取客户端数据,但是recv 0)
swPort_onRead_raw -> swReactorThread_onClose -> swReactor_close -> close系统调用

server主动关闭连接:
swoole_server.close(worker发起) -> PHP_METHOD(swoole_server, close) -> "factory->end" -> swFactoryProcess_end(type:SW_EVENT_CLOSE)
-> "factory->finish" -> swFactoryProcess_finish -> swWorker_send2reactor -> swWorker_send2reactor
-> swSocket_write_blocking -> write管道
swReactorThread_onPipeReceive(reactor接收) -> read管道 -> swReactorThread_send -> "goto close_fd" -> "reactor->close"
-> swReactorThread_close -> swReactor_close -> close系统调用

至此,process模式下怎么接收客户端连接,怎么关闭客户端连接,怎么接收客户端数据,怎么给客户端发送数据等都有了清晰的分析.

下面我们分析base模式的.

base模式

base模式是每个worker进程单独接收client连接,各个client不能在worker端进行共享.

worker接收连接:
swServer_master_onAccept -> accept系统调用

worker接收client数据:
swReactorEpoll_wait -> swReactorThread_onRead ->  "port->onRead" -> swPort_onRead_raw -> swConnection_recv -> recv系统调用
swPort_onRead_raw -> "serv->factory.dispatch" -> swFactory_dispatch -> swWorker_onTask -> "goto do_task" -> serv->onReceive
-> php_swoole_onReceive -> sw_call_user_function_ex(将数据传递给用户设置的函数)

worker发送数据给client:
swoole_server.send -> PHP_METHOD(swoole_server, send) -> swServer_tcp_send -> "factory->finish" -> swFactory_finish
-> swReactorThread_send -> swConnection_send -> send系统调用

server被动关闭连接:
swReactorEpoll_wait -> swReactorThread_onRead ->  "port->onRead" -> swPort_onRead_raw -> swConnection_recv -> recv系统调用
-> swPort_onRead_raw(收到数据长度为0) -> swReactorThread_onClose -> swReactor_close -> close系统调用

server主动关闭连接:
swoole_server.close(worker发起) -> PHP_METHOD(swoole_server, close) -> "factory->end" -> swFactory_end
-> swReactorThread_close -> swReactor_close -> close系统调用

base模式下的流程相较process模式要简单的多.

验证脚本

我们写验证脚本只是简单的看看当前的分析有没有明显的问题.

其测试脚本模版为:

<?php
$serv = new swoole_server("127.0.0.1", 9501, SWOOLE_BASE);
$serv->set(array(
    'worker_num' => 0,
    'task_worker_num' => 0,
    'daemonize' => 1
));
function my_onStart($serv){
    echo "onStart\n";
}
function my_onShutdown($serv){
    echo "onShutdown\n";
}
function my_onTimer($serv, $interval){
    echo "onTimer\n";
}
function my_onClose($serv, $fd, $from_id){
    echo "onClose\n";
}
function my_onWorkerStart($serv, $worker_id){
    echo "onWorkerStart\n";
}
function my_onFinish(swoole_server $serv, $task_id, $from_worker_id, $data){
    echo "onFinish\n";

function my_onWorkerStop($serv, $worker_id){
    echo "onStop\n";
}
function my_onConnect($serv, $fd, $from_id)
{
    echo "Client: fd=$fd is connect.\n";
}
function my_onReceive(swoole_server $serv, $fd, $from_id, $data){
    $serv->send($fd, $data);
//  echo "Client: fd=$fd pid: " . posix_getpid() . " send: $data";
//  $serv->task($fd . '|' . $data);
}
function my_onTask(swoole_server $serv, $task_id, $from_id, $data){
    list($fd, $recv) = explode('|', $data, 2);
    $serv->send(intval($fd), $recv);
    echo "Task: fd=$fd pid: " . posix_getpid() ." send: $recv";
}
function my_onWorkerError(swoole_server $serv, $worker_id, $worker_pid, $exit_code){
    echo "worker abnormal exit. WorkerId=$worker_id|Pid=$worker_pid|ExitCode=$exit_code\n";
}
$serv->on('Start', 'my_onStart');
$serv->on('Connect', 'my_onConnect');
$serv->on('Receive', 'my_onReceive');
$serv->on('Close', 'my_onClose');
$serv->on('Shutdown', 'my_onShutdown');
$serv->on('Timer', 'my_onTimer');
$serv->on('WorkerStart', 'my_onWorkerStart');
$serv->on('WorkerStop', 'my_onWorkerStop');
$serv->on('Task', 'my_onTask');
$serv->on('Finish', 'my_onFinish');
$serv->on('WorkerError', 'my_onWorkerError');
$serv->start();

我们通过修改配置选项和构造函数参数等手段并查看输出,来完成测试.

1.测试process模式下,Worker可以给任意的fd发送消息.

$serv = new swoole_server("127.0.0.1", 9501, SWOOLE_BASE);
$serv->set(array(
    'worker_num' => 2,
    'task_worker_num' => 0,
    'daemonize' => 1
));

function my_onReceive(swoole_server $serv, $fd, $from_id, $data){
    $serv->send(1, $data);
//  echo "Client: fd=$fd pid: " . posix_getpid() . " send: $data";
//  $serv->task($fd . '|' . $data);
}

我们首先启动server, 因为第1个连接的客户端分配的fd我先测试出来都是1,所有我就都给1发. 我使用两个telnet连接server,并都给server发送消息,结果server显示收到两个连接. 并且第一个telnet收到两个client发来的消息(包括自己发的).

总结

我们首先从一个问题引出探索的内容,然后分析了Swoole的启动流程,在每个环节都干了什么.

然后我们分别分析了Swoole内置的两种模式: base和process模式,并分析了各自的执行流程.

最后我们分析了客户端从连接到关闭连接,客户端给服务端发送消息,服务端给客户端发送消息这些操作具体是怎么执行的.

通过对Swoole源代码有目的性的分析,虽然并没有完整的分析其使用的数据结构等,但也明显加深了我们对Swoole的理解.