发布于 4年前

php 非阻塞处理多进程请求

<?php
$pid = pcntl_fork();
if($pid==-1){
    throw new RuntimeException("fork error");
    exit(0);
}else if($pid==0){
    run();
}else{

    exit("exit");
}

function run(){

    $curChildPro = 0;
    $maxChildPro = 5;  // 同一时刻最多 5 个进程
    $index  = 0;
    $ppid = posix_getpid();
    $childs=[];

    $readFds = [];
    $writeFds= [];
    $exceptions = [];
    $context_option['socket']['backlog'] = 1024;
    $context = stream_context_create($context_option);
    stream_context_set_option($context, 'socket', 'so_reuseport', 1);
    $socket = stream_socket_server("tcp://0.0.0.0:8081", $errno, $errstr);
    stream_set_blocking($socket,0);
    $readFds[]=$socket;
    while (true) {
        while (count($childs)>=$maxChildPro){
            foreach($childs as $key => $pid) {
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                // If the process has already exited
                if($res == -1 || $res > 0)
                    unset($childs[$key]);
            }
            sleep(1);
        }

        $index ++;
        $pid =  pcntl_fork();//在此处代码会裂开两部分,一个父进程,一个子进程,可以共享$index变量
        if ($pid == -1) {

        }elseif ($pid > 0) {
            $childs[]=$pid;
            $curChildPro++;
            //父进程会得到子进程号$pid,所以这里是父进程执行的逻辑
            //echo "-------- current process\e[1;31m" . $curChildPro . "\e[0m--------.\r\n";
            //echo "\e[1;31m我是父进程{$index},我的进程id是{$ppid}.我的子进程id{$pid}\e[0m".PHP_EOL;
            cli_set_process_title("我是父进程{$index},我的进程id是{$ppid}.我的子进程id{$pid}");

        }elseif($pid==0){
            $cpid = posix_getpid();

            //echo "我是{$ppid}的子进程,我的进程{$index}id是{$cpid}.".PHP_EOL;
            cli_set_process_title("我是{$ppid}的子进程,我的进程id是{$cpid}.");
            while (1) {
                $reads = $readFds;
                $writes = $writeFds;
                $ret = @stream_select($reads, $writes, $exceptions, null);
                if (!$ret){
                    continue;
                }
                if (!empty($reads)) {
                    foreach ($reads as $fd) {
                        if ($fd == $socket) {
                            set_error_handler(function () {
                            });
                            $connect = stream_socket_accept($socket, 0, $remoteAddr);
                            stream_set_blocking($connect, 0);
                            restore_error_handler();
                            $readFds[] = $connect;
                            $writeFds[] = $connect;
                        } else {
                            $data = fread($fd, 1024);
                            if ($data == '') {
                                $ret = array_search($fd, $readFds);
                                if ($ret !== false) {
                                    unset($readFds[$ret]);
                                    fclose($fd);
                                }
                            } else {
                               //request data
                            }
                        }
                    }
                }
                if(!empty($writes)){
                    foreach ($writes as $fd) {
                        if ($fd != $socket) {
                            set_error_handler(function () {
                            });
                            $res = "HTTP/1.1 200 ok\r\nAccept-Ranges: bytes\r\ncontent-type: text/html; charset=utf-8\r\n\r\nThe local time is " . date('Y-m-d H:i:s') . "\r\n";
                            fwrite($fd, $res);
                            restore_error_handler();
                            $wet = array_search($fd, $writeFds);
                            if ($wet !== false) {
                                unset($writeFds[$wet]);
                            }
                            fclose($fd);
                           exit();
                        }

                    }
                }

            }

        }
    }
}
©2020 edoou.com   京ICP备16001874号-3