Swoole入门到实战打造高性能赛事直播平台

PHP7.2源码编译

安装步骤
1. 解压
2. ./configure --prefix=/usr/local/php
3. make && make install

编辑~/.bash_profile 增加
alias php7=/usr/local/php/bin/php
运行 source ~/.bash_profile

查看php.ini默认路径
php --ini # ./configure –with-config-file-path=/usr/local/php/etc (编译时指定)

客户端和服务端

tcp服务端

<?php

// 构建server对象
$serv = new swoole_server('127.0.0.1', 9501);

// 设置运行参数
$serv->set(array(
    'worker_num' => 4,
//    'daemonize' => true,
    'backlog' => 128,
));

// 注册事件回调

/**
 * $fd: 客户端唯一标识
 * $reactor_id: 处理线程id
 */
$serv->on('connect', function ($serv, $fd, $reactor_id) {
        echo "client connect: {$fd}";
});

$serv->on('receive', function ($serv, $fd, $reactor_id, $data) {
        $serv->send($fd, 'Swoole: '.$data);
});

$serv->on('close', function ($serv, $fd) {
        echo "Client: Close.\n";
});

// 启动
$serv->start();

tcp客户端

<?php

// 构建客户端对象
$client = new swoole_client(SWOOLE_SOCK_TCP);

if (!$client->connect('127.0.0.1', 9501)) {
        exit('链接失败');
}

fwrite(STDOUT, "请输入\n");

while ($msg = trim(fgets(STDIN))) {
        $client->send($msg);
        $result = $client->recv();
        echo $result."\n";
}

http服务端

<?php

// 构建http_server对象

$http = new swoole_http_server('127.0.0.1', 9501);

// 设置静态资源目录
$http->set(array(
        'enable_static_handler' => true,
        'document_root' => '/path/to/document_root'
));


$http->on('request', function ($request, $response) {
        $response->end("收到浏览器信息:".json_encode($request->get));
});

$http->start();

websocket服务端

<?php
class Ws
{
    public function __construct() {
        $this->server = new swoole_websocket_server("0.0.0.0", 9501);
        $server->on('open', array($this, 'open'));
        $server->on('message', array($this, 'message'));
        $server->on('close', array($this, 'close'));
        // 注册任务处理事件
        $server->on('task', array($this, 'task'));
        $server->on('finih', array($this, 'finih'));
        $server->set(array(
            'task_worker_num' => 4
        ));
        $this->server->start();
    }

    public function open(swoole_websocket_server $server, $request) {
        echo "server: handshake success with fd{$request->fd}\n";
    }

    public function message(swoole_websocket_server $server, $frame) {
        // 投递任务处理
        $this->server->task(array('name' => 'zhorz', 'td' => $frame->td));
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
        $server->push($frame->fd, "this is server");
    }

    public function close($ser, $fd) {
        echo "client {$fd} closed\n";
    }

    public function task(swoole_server $serv, int $task_id, int $src_worker_id, mixed $data) {
        // 处理耗时任务
        // $task_id和$src_worker_id组合起来全局唯一
    }

    public function finish(swoole_server $serv, int $task_id, string $data) {
        // 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程
    }
}

websocket客户端

// 协议标识
ws://example.com:80/some/path

// js例子
var ws = new WebSocket("wss://echo.websocket.org");

ws.onopen = function(evt) { 
  console.log("Connection open ..."); 
  ws.send("Hello WebSockets!");
};

ws.onmessage = function(evt) {
  console.log( "Received Message: " + evt.data);
  ws.close();
};

ws.onclose = function(evt) {
  console.log("Connection closed.");
};  

异步非阻塞IO场景

swoole毫秒定时器

int swoole_timer_tick(int $ms, callable $callback, mixed $user_param);
int swoole_timer_after(int $after_time_ms, mixed $callback_function, mixed $user_param);
bool swoole_timer_clear(int $timer_id)

异步文件IO读取文件

// 异步读取文件内容(不能用于大文件的读取, 最大可读取4M的文件)
swoole_async_readfile(__DIR__."/server.php", function($filename, $content) {
     echo "$filename: $content"; 
});

// 异步写文件(最大可写入4M)
swoole_async_writefile('test.log', $file_content, function($filename) {
    echo "wirte ok.\n";
}, $flags = 0);

// 异步读文件(支持大文件读取, 每次只读$size个字节,不会占用太多内存, 在读完后会自动回调$callback函数)
bool swoole_async_read(string $filename, mixed $callback, int $size = 8192, int $offset = 0);
// 回调函数接受2个参数:bool callback(string $filename, string $content);
// ($filename,文件名称; $content,读取到的分段内容,如果内容为空,表明文件已读完)
bool callback(string $filename, string $content);

// 异步写文件, 通过传入的offset参数来确定写入的位置
bool swoole_async_write(string $filename, string $content, int $offset = -1, mixed $callback = NULL);

异步mysql

$db = new swoole_mysql;
$server = array(
    'host' => '192.168.56.102',
    'port' => 3306,
    'user' => 'test',
    'password' => 'test',
    'database' => 'test',
    'charset' => 'utf8', //指定字符集
    'timeout' => 2,  // 可选:连接超时时间(非查询超时时间),默认为SW_MYSQL_CONNECT_TIMEOUT(1.0)
);

$db->connect($server, function ($db, $r) {
    if ($r === false) {
        var_dump($db->connect_errno, $db->connect_error);
        die;
    }
    $sql = 'show tables';
    $db->query($sql, function(swoole_mysql $db, $r) {
        if ($r === false)
        {
            var_dump($db->error, $db->errno);
        }
        elseif ($r === true )
        {
            var_dump($db->affected_rows, $db->insert_id);
        }
        var_dump($r);
        $db->close();
    });
});

异步redis

// 安装redis
// 安装hiredis库
// 重新编译swoole支持redis --enable-anysc-redis
// ./configure -> make clean -> make && make install -> 查看是否成功: php --ri swoole (查看扩展的版本信息)

php命令行查看信息:

  --rf <name>      Show information about function <name>.
  --rc <name>      Show information about class <name>.
  --re <name>      Show information about extension <name>.
  --rz <name>      Show information about Zend extension <name>.
  --ri <name>      Show configuration for extension <name>.

swoole process

linux下检测进程是否存在

使用"kill -0 pid",kill -0不会向进程发送任何信号,但是会进行错误检查。如果进程存在,命令返回0,如果不存在返回1。
对于普通用户来说只能用于检查自己的进程,因为向其它用户的进程发送信号会因为没有权限而出错,返回值也是1

wait如何处理多个子进程

https://blog.csdn.net/sl1248/article/details/50936823
调用一次wait只能回收一个子进程
1. 父进程while循环
2. 父进程注册SIGCHLD信号处理

swoole_table内存共享(解决多进程/多线程数据共享和同步加锁问)

$table = new swoole_table(1024); // $size行数
$table->column('data',Table::TYPE_STRING, 1); // 增加一列
$table->create();// 申请内存创建表
$table->set('zhorz', array('data' => 'A')); // 设置行数据
$table->get('zhorz', 'data'); // 获取某行某字段值
$table->del('zhorz');// 删除行数据

swoole协程Swoole\Coroutine*

// 创建协程
go(function () {
    co::sleep(0.5);
    echo "hello";
});
go("test");
go([$object, "method"]);
// 通道操作
$c = new chan(1);
$c->push($data);
$c->pop();
// 协程客户端
$redis = new Co\Redis;
$mysql = new Co\MySQL;
$http = new Co\Http\Client;
$tcp = new Co\Client;
$http2 = new Co\Http2\Client;
// 协程的让出和恢复
use Swoole\Coroutine as co;
$id = go(function(){
    $id = co::getUid();
    echo "start coro $id\n";
    co::suspend();
    echo "resume coro $id @1\n";
    co::suspend();
    echo "resume coro $id @2\n";
});
echo "start to resume $id @1\n";
co::resume($id);
echo "start to resume $id @2\n";
co::resume($id);
echo "main\n";

协程之channel

// 协程间同步
use Swoole\Coroutine as co;
$chan = new co\Channel(1);
co::create(function () use ($chan) {
    for($i = 0; $i < 100000; $i++) {
        co::sleep(1.0);
        $chan->push(['rand' => rand(1000, 9999), 'index' => $i]);
        echo "$i\n";
    }
});
co::create(function () use ($chan) {
    while(1) {
        $data = $chan->pop();
        var_dump($data);
    }
});
swoole_event::wait();

协程之select

$c1 = new chan();
$c2 = new chan();
function fibonacci($c1, $c2)
{
    go(function () use ($c1, $c2) {
        $a = 0;
        $b = 1;
        while(1) {
            $read_list = [$c2];
            $write_list = [$c1];
            $result = chan::select($read_list, $write_list, 2);
            // 当$read或$write数组中有部分channel对象处于可读或可写状态,select会立即返回
            if ($write_list) {
                $t = $a + $b;
                $a = $b;
                $b = $t;
                $c1->push($a);
            }
            if ($read_list) {
                $ret = $c2->pop();
                if ($ret === 1) {
                    return 1;
                }
            }
        }
    });
}
$num = 10;
go(function () use ($c1, $c2, $num) {
    for ($i = 0; $i < $num; $i ++) {
        $ret = $c1->pop();
        echo "fibonacci @$i $ret\n";
    }
    $c2->push(1);
});    
fibonacci($c1, $c2);

协程并发请求的列子

$serv = new \swoole_http_server("127.0.0.1", 9503, SWOOLE_BASE);

$serv->on('request', function ($req, $resp) {
    $chan = new chan(2);
    go(function () use ($chan) {
        $cli = new Swoole\Coroutine\Http\Client('www.qq.com', 80);
            $cli->set(['timeout' => 10]);
            $cli->setHeaders([
            'Host' => "www.qq.com",
            "User-Agent" => 'Chrome/49.0.2587.3',
            'Accept' => 'text/html,application/xhtml+xml,application/xml',
            'Accept-Encoding' => 'gzip',
        ]);
        $ret = $cli->get('/');
        $chan->push(['www.qq.com' => $cli->body]);
    });

    go(function () use ($chan) {
        $cli = new Swoole\Coroutine\Http\Client('www.163.com', 80);
        $cli->set(['timeout' => 10]);
        $cli->setHeaders([
            'Host' => "www.163.com",
            "User-Agent" => 'Chrome/49.0.2587.3',
            'Accept' => 'text/html,application/xhtml+xml,application/xml',
            'Accept-Encoding' => 'gzip',
        ]);
        $ret = $cli->get('/');
        $chan->push(['www.163.com' => $cli->body]);
    });

    $result = [];
    for ($i = 0; $i < 2; $i++)
    {
        $result += $chan->pop();
    }
    $resp->end(json_encode($result));
});
$serv->start();

系统服务监控和优化

# 调用linux命令监听
netstat -anp 2>/dev/null | grep 8811 | grep LISTEN | wc -l

# 搜索文件关键词
find . -type f -name '*.php' | xargs grep 'echo 111'

# 平滑重启服务
swoole_set_process_name -> 设置进程名称
pidof 进程名称 -> 获取进程id
kill -USR1 `pidof 进程名称` -> 平滑重启??

# nginx负载均衡说明
http://tengine.taobao.org/nginx_docs/cn/docs/http/ngx_http_upstream_module.html

标签: none