Skip to main content

Swoole安装及应用(二)

一、Task任务应用(异步非阻塞)

1.简介

假如要发10000封邮件,for循环10000遍,用户直接就疯了,什么破网站,垃圾!
但实际上,我们很可能有超过1万的邮件。怎么处理这个延迟的问题?
答案就是用异步,方法有很多,异步队列、异步Task任务,那么这里的使用场景就是执行耗时的操作,发送邮件,广播等实现。

2.使用

设置任务投递,以及onTask任务处理
设置任务完成接收,onFinish方法
设置task_worker_num(设置异步任务的工作进程数量)

3.代码撸起

面向过程写法:

<?php
$server = new swoole_websocket_server("0.0.0.0", 9501);
$server->set(array('task_worker_num' => 4));
$server->on('open', function (swoole_websocket_server $server, $request) {
    echo "Connection:{$request->fd} has Ok\n";
});

$server->on('message', function (swoole_websocket_server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    $data = [
    	'task' => 1,
    	'fd'   => $frame->fd
    ];
    $server->task($data);
    $server->push($frame->fd, "this is server");
});

$server->on('task',function($ser, $task_id, $from_id, $data){
	sleep(10);
	print_r($data);
	return 'finsh task ok';
});

//处理异步任务的结果
$server->on('finish', function ($ser, $task_id, $data) {
    echo "AsyncTask[$task_id] Finish: $data".PHP_EOL;
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

 

面向对象写法:

class Ws {

    CONST HOST = "0.0.0.0";
    CONST PORT = 9501;

    public $ws = null;
    public function __construct() {
        $this->ws = new swoole_websocket_server("0.0.0.0", 9501);

        $this->ws->set(
            [
                'worker_num' => 2,
                'task_worker_num' => 2,
            ]
        );
        $this->ws->on("open", [$this, 'onOpen']);
        $this->ws->on("message", [$this, 'onMessage']);
        $this->ws->on("task", [$this, 'onTask']);
        $this->ws->on("finish", [$this, 'onFinish']);
        $this->ws->on("close", [$this, 'onClose']);

        $this->ws->start();
    }

    /**
     * 监听ws连接事件
     * @param $ws
     * @param $request
     */
    public function onOpen($ws, $request) {
        echo "Connection:{$request->fd} has Ok\n";
    }

    /**
     * 监听ws消息事件
     * @param $ws
     * @param $frame
     */
    public function onMessage($ws, $frame) {
        echo "ser-push-message:{$frame->data}\n";
        // todo 10s
        $data = [
            'task' => 1,
            'fd' => $frame->fd,
        ];
        $ws->task($data);
        $ws->push($frame->fd, "server-push:".date("Y-m-d H:i:s"));
    }

    /**
     * @param $serv
     * @param $taskId
     * @param $workerId
     * @param $data
     */
    public function onTask($serv, $taskId, $workerId, $data) {
        print_r($data);
        // 耗时场景 10s
        sleep(10);
        return "on task finish"; // 告诉worker
    }

    /**
     * @param $serv
     * @param $taskId
     * @param $data
     */
    public function onFinish($serv, $taskId, $data) {
        echo "taskId:{$taskId}\n";
        echo "finish-data-sucess:{$data}\n";
    }


    /**/
    public function onClose($ws, $fd) {
        echo "clientid:{$fd}\n";
    }
}

$obj = new Ws();

4.简单总结

就是增加2个方法,一个是onTask,一个是onFinsh,一旦onTask异步处理完成,会马上调用onFinsh方法。

 

二、毫秒级定时器

1.相关函数

使用方法:
int swoole_timer_tick(int $msec, callable $callback):
设置一个间隔时钟定时器,与after定时器不同的是tick定时器会持续触发,直到调用swoole_timer_clear清除。
int swoole_timer_after(int $after_time_ms, mixed $callback_function):
在指定的时间后执行函数,需要1.7.7或更高版本,仅仅执行一次
bool swoole_timer_clear(int $timer_id):
使用定时器ID来删除定时器。

2.代码撸起

class Ws {

    CONST HOST = "0.0.0.0";
    CONST PORT = 8812;

    public $ws = null;
    public function __construct() {
        $this->ws = new swoole_websocket_server("0.0.0.0", 9501);

        $this->ws->set(
            [
                'worker_num' => 2,
                'task_worker_num' => 2,
            ]
        );
        $this->ws->on("open", [$this, 'onOpen']);
        $this->ws->on("message", [$this, 'onMessage']);
        $this->ws->on("task", [$this, 'onTask']);
        $this->ws->on("finish", [$this, 'onFinish']);
        $this->ws->on("close", [$this, 'onClose']);

        $this->ws->start();
    }

    /**
     * 监听ws连接事件
     * @param $ws
     * @param $request
     */
    public function onOpen($ws, $request) {
        var_dump($request->fd);
        if($request->fd == 1) {
            // 每2秒执行
            swoole_timer_tick(2000, function($timer_id){
                echo "2s: timerId:{$timer_id}\n";
            });
        }
    }

    /**
     * 监听ws消息事件
     * @param $ws
     * @param $frame
     */
    public function onMessage($ws, $frame) {
        echo "ser-push-message:{$frame->data}\n";
        // todo 10s
        $data = [
            'task' => 1,
            'fd' => $frame->fd,
        ];
        //$ws->task($data);

        swoole_timer_after(5000, function() use($ws, $frame) {
            echo "5s-after\n";
            $ws->push($frame->fd, "server-time-after:");
        });
        $ws->push($frame->fd, "server-push:".date("Y-m-d H:i:s"));
    }

    /**
     * @param $serv
     * @param $taskId
     * @param $workerId
     * @param $data
     */
    public function onTask($serv, $taskId, $workerId, $data) {
        print_r($data);
        // 耗时场景 10s
        sleep(10);
        return "on task finish"; // 告诉worker
    }

    /**
     * @param $serv
     * @param $taskId
     * @param $data
     */
    public function onFinish($serv, $taskId, $data) {
        echo "taskId:{$taskId}\n";
        echo "finish-data-sucess:{$data}\n";
    }

    /**
     * close
     * @param $ws
     * @param $fd
     */
    public function onClose($ws, $fd) {
        echo "clientid:{$fd}\n";
    }
}

$obj = new Ws();

3.总结

代码中使用两个定时器,当客户端连接后,第一个定时器会监听用户连接,当用户连接,会每隔2s触发,而第二个定时器会监听消息,当收到消息,会在5s之后触发一次。

 

三、异步文件系统IO

1.简介

Swoole异步文件读写基于线程池同步IO模拟实现,文件读写请求投递到任务队列,然后由AIO线程读写文件,完成后通知主线程。

可使用swoole_async_set函数设置AIO线程数量,提高处理能力。请注意底层会在每个工作进程中分别创建AIO线程,因此假设设置了worker_num = 10thread_num = 10,将会启动100个线程。

swoole_async_set([
    'thread_num' => 16,
]);

2.相关函数

异步读取文件:
	swoole_async_readfile()      适合小文件
	swoole_async_read()	     适合大文件
异步写入文件:
	swoole_async_writefile()     写入小文件
	swoole_async_write()         写入大文件
Ps:文件大小以4M区分。

3.代码撸起

实例一:
<?php
//读取文件
$result = swoole_async_readfile(__DIR__."/index.html",function($filename,$fileContent){
	echo "filename:".$filename.PHP_EOL;
	echo "content:".$fileContent;
});
var_dump($result);
echo "start";

实例二:尾部重写
<?php
//文件写入
$data = date("Ymd H:i:s");
$result = swoole_async_writefile(__DIR__.'/test.php',$data,function($filename,$fileContent){
	echo "success";
},FILE_APPEND);
echo "start";

实例三:尾部不重写
<?php
//文件写入
$data = date("Ymd H:i:s");
$result = swoole_async_writefile(__DIR__.'/test.php',$data,function($filename,$fileContent){
	echo "success";
},FILE_APPEND);
echo "start";

注意:
1.文件不存在会warning,否则会返回错误。
2.因为是异步所以会影响代码读取顺序,先同步,在异步

 

四、异步mysql客户端

1.简介

首先说同步,同步就是一个查询完了之后才能继续下一个查询,但是并发大了,会阻塞导致超时。

异步的数据库查询功能,执行SQL语句之后不必等待数据库返回结果。继续去执行其他的代码,当数据库返回结果是再对数据进行处理,如渲染页面,并将HTML页面发送给客户端。这样应用程序完全不需要阻塞等待。这种方式运行效率非常高

2.简单代码

$db = new swoole_mysql();
$server = array(
    'host' => '192.168.56.102',
    'port' => 3306,
    'user' => 'test',
    'password' => 'test',
    'database' => 'test',
    'charset' => 'utf8', //指定字符集
    'timeout' => 2,  // 可选:连接超时时间(非查询超时时间),默认为SW_MYSQL_CONNECT_TIMEOUT(1.0)
);

$db->connect($server, function ($db, $r) {
    if ($r === false) {
        var_dump($db->connect_errno, $db->connect_error);
        die;
    }
    $sql = 'show tables';
    $db->query($sql, function(swoole_mysql $db, $r) {
        if ($r === false)
        {
            var_dump($db->error, $db->errno);
        }
        elseif ($r === true )
        {
            var_dump($db->affected_rows, $db->insert_id);
        }
        var_dump($r);
        $db->close();
    });
});

五、异步redis应用

1.安装

1.1.redis服务

本机需要安装redis服务

wget http://download.redis.io/releases/redis-2.8.3.tar.gz
tar xzf redis-2.8.3.tar.gz
cd redis-2.8.3
make && make install
mkdir /usr/redis
cp redis-server  /usr/redis
cp redis-benchmark /usr/redis
cp redis-cli  /usr/redis
cp redis.conf  /usr/redis
cd /usr/redis
redis-server   redis.conf

1.2.hiredis库

文件包地址:https://github.com/redis/hiredis/releases
安装:
wget tar zxvf antirez-hiredis-v0.10.1-0-g3cc6a7f.zip
cd antirez-hiredis-3cc6a7f
make
make install
ldconfig

1.3编译swoole需要加入 -enable-async-redis

安装:
./configure --with-php-config=/usr/local/php7/bin/php-config --enable-async-redis
make && make install 
注意:
4.2.x版本可能会显示redis-client而没有async字样, 实际上是一样的
4.2.6版本及以后不再需要手动安装和启用, 而是swoole自带

查看是否支持:

php7 --ri swoole

1.4代码撸起

<?php
$redisClient = new swoole_redis();
$redisClient->connect('127.0.0.1',6379,function($redisClient,$result){
	echo "connect ok";
	var_dump($result);
	$redisClient->set("qk",time(),function($redisClient,$result){
	var_dump("set ok");
});

$redisClient->get("qk",function($redisClient,$result){
	var_dump($result);
});
$redisClient->close();
});

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

− 2 = 7