<?php

/**
 * TLS WebSocket push server built on Swoole and backed by Redis.
 *
 * Constructing the class reads ./web/cacheSet.log (a JSON document), and when a
 * listening port is configured it creates the server, registers the event
 * handlers and starts it from inside the constructor.
 *
 * Redis keys used:
 *  - "myqueue"            list; one entry is popped per tick and broadcast
 *  - "storeIdByFd" . $fd  store id registered by a client connection
 *  - "fastBy" . $storeId  JSON snapshot pushed to clients of that store
 *  - "queuing" . $storeId JSON snapshot pushed to clients of that store
 *  - "userData"           reset to an empty JSON list when worker 0 starts
 */
class Chat
{
    /** @var swoole_websocket_server|null Server instance; stays null when no port is configured. */
    private $server = null;

    /** @var mixed Redis password taken from the "redis" config entry (a falsy value skips AUTH). */
    private $passWord;

    /** @var mixed Redis port taken from the "redispx" config entry. */
    private $redisPort;

    /**
     * Load the configuration and, if a port is configured, start the server.
     *
     * Returns early (leaving $server null) when the "swoole" port entry is
     * missing, non-numeric or not positive.
     */
    public function __construct()
    {
        // Decode the config once instead of once per key.
        $config = $this->loadConfig('./web/cacheSet.log');

        $this->passWord = isset($config['redis']) ? $config['redis'] : null;
        $this->redisPort = isset($config['redispx']) ? $config['redispx'] : null;

        $port = (isset($config['swoole']) && is_numeric($config['swoole'])) ? (int) $config['swoole'] : 0;
        if ($port <= 0) {
            return;
        }

        $sslCertificate = isset($config['ssl_certificate']) ? $config['ssl_certificate'] : null;
        $sslCertificateKey = isset($config['ssl_certificate_key']) ? $config['ssl_certificate_key'] : null;

        // Keep the server on the instance so every handler works with the same object.
        $this->server = new swoole_websocket_server("0.0.0.0", $port, SWOOLE_BASE, SWOOLE_SOCK_TCP | SWOOLE_SSL);

        $this->server->on('WorkerStart', [$this, 'onWokerStart']);
        // Connection opened.
        $this->server->on('open', [$this, 'onOpen']);
        // Message received.
        $this->server->on('message', [$this, 'onMessage']);
        // Connection closed.
        $this->server->on('close', [$this, 'onClose']);

        $this->server->set([
            // Optional heartbeat settings, currently disabled:
            // 'heartbeat_idle_time' => 600,     // force-close connections idle for longer than this
            // 'heartbeat_check_interval' => 60, // check every 60 seconds; idle time should be a bit
            //                                   // more than twice the check interval
            'daemonize' => 1, // 1 runs the server as a background daemon
            'ssl_cert_file' => $sslCertificate, // TLS certificate path
            'ssl_key_file' => $sslCertificateKey, // TLS private key path
        ]);

        swoole_set_process_name("openVoice");

        // Start serving.
        $this->server->start();
    }

    /**
     * Open a new Redis connection to 127.0.0.1 and authenticate when a password is given.
     *
     * @param mixed $redisPassWd Password; a falsy value skips AUTH.
     * @param mixed $port        Redis port.
     *
     * @return Redis Connected (and, if requested, authenticated) client.
     *
     * @throws RedisException When connecting or authenticating fails.
     */
    public function connectRedis($redisPassWd, $port)
    {
        $redis = new Redis();

        // Fail here rather than on the first command issued by the caller.
        if (!$redis->connect('127.0.0.1', $port)) {
            throw new RedisException('Unable to connect to Redis at 127.0.0.1:' . $port);
        }

        if ($redisPassWd && !$redis->auth($redisPassWd)) {
            throw new RedisException('Redis authentication failed');
        }

        return $redis;
    }

    /**
     * WorkerStart handler: worker 0 resets "userData" and installs the push timer.
     *
     * Other workers do nothing here, so only one timer drains the queue.
     *
     * @param mixed $server    Server instance supplied by Swoole.
     * @param mixed $woker_id  Id of the worker being started.
     */
    public function onWokerStart($server, $woker_id)
    {
        if ((int) $woker_id !== 0) {
            return;
        }

        try {
            $redis = $this->connectRedis($this->passWord, $this->redisPort);
            // Redis values are strings, so store the empty list in its JSON form.
            $redis->set("userData", json_encode([]));
        } catch (\RedisException $e) {
            // A Redis outage at start-up must not prevent the timer from being installed.
            $this->logError('resetting userData', $e);
        }

        // Every 1500 ms: pop one queued message and refresh every client.
        $server->tick(1500, function ($id) use ($server) {
            try {
                $this->pushPendingData($server);
            } catch (\Exception $e) {
                $this->logError('push tick', $e);
            } catch (\Throwable $e) {
                // Engine errors (PHP 7+); this clause never matches on older runtimes.
                $this->logError('push tick', $e);
            }
        });
    }

    /**
     * Open handler. Intentionally empty: clients register themselves by sending
     * a "storeId" message, which onMessage() handles.
     *
     * @param mixed $server  Server instance supplied by Swoole.
     * @param mixed $request Handshake request.
     */
    public function onOpen($server, $request)
    {
    }

    /**
     * Message handler for JSON frames of the form {"type": ..., "data": ...}.
     *
     *  - type "ping":    replies with {"code":1,"msg":"ping"}
     *  - type "storeId": stores "data" under "storeIdByFd" . fd
     *
     * Frames that are not a JSON object with a "type" entry are ignored.
     *
     * @param mixed $server Server instance supplied by Swoole.
     * @param mixed $frame  Received frame; its fd and data properties are read.
     */
    public function onMessage($server, $frame)
    {
        $msg = json_decode($frame->data, true);

        // Client input is untrusted: anything but an object with a type is dropped.
        if (!is_array($msg) || !isset($msg['type'])) {
            return;
        }

        // Strict comparison so that e.g. {"type":0} does not match a type name.
        if ($msg['type'] === 'ping') {
            $server->push($frame->fd, json_encode(['code' => 1, 'msg' => 'ping']));
            return;
        }

        if ($msg['type'] === 'storeId') {
            // Only scalar ids can be stored as a Redis string value.
            if (!isset($msg['data']) || !is_scalar($msg['data'])) {
                return;
            }

            try {
                $redis = $this->connectRedis($this->passWord, $this->redisPort);
                $redis->set("storeIdByFd" . $frame->fd, (string) $msg['data']);
            } catch (\RedisException $e) {
                $this->logError('registering store id for fd ' . $frame->fd, $e);
            }
        }
    }

    /**
     * Close handler: forget the store id registered for the closed connection.
     *
     * @param mixed $server Server instance supplied by Swoole.
     * @param mixed $fd     File descriptor of the closed connection.
     */
    public function onClose($server, $fd)
    {
        try {
            $redis = $this->connectRedis($this->passWord, $this->redisPort);
            $redis->del("storeIdByFd" . $fd);
        } catch (\RedisException $e) {
            $this->logError('removing store id for fd ' . $fd, $e);
        }
    }

    /**
     * Read and decode the JSON configuration file.
     *
     * @param string $path Path of the configuration file.
     *
     * @return array Decoded configuration, or an empty array when the file is
     *               missing, unreadable or does not contain a JSON object.
     */
    private function loadConfig($path)
    {
        if (!is_file($path) || !is_readable($path)) {
            error_log('[Chat] configuration file not readable: ' . $path);
            return [];
        }

        $raw = file_get_contents($path);
        if ($raw === false) {
            error_log('[Chat] configuration file could not be read: ' . $path);
            return [];
        }

        $decoded = json_decode($raw, true);

        return is_array($decoded) ? $decoded : [];
    }

    /**
     * One timer tick: pop a queued message and push data to every established client.
     *
     * Each client receives the "fastBy" and "queuing" snapshots of its store
     * (when present) followed by the popped queue message (when there is one).
     *
     * @param mixed $server Server instance supplied by Swoole.
     */
    private function pushPendingData($server)
    {
        // A fresh connection is opened for each tick rather than shared between callbacks.
        $redis = $this->connectRedis($this->passWord, $this->redisPort);

        // NOTE(review): the message is popped even when no client is connected,
        // in which case it is discarded — confirm that this is intended.
        $data = $redis->lpop('myqueue');

        foreach ($server->connections as $fd) {
            if (!$server->isEstablished($fd)) {
                continue;
            }

            $storeId = $redis->get("storeIdByFd" . $fd);

            // Connections that never sent a store id have no snapshots to look up.
            if ($storeId !== false) {
                $this->pushSnapshot($server, $fd, $redis->get('fastBy' . $storeId));
                $this->pushSnapshot($server, $fd, $redis->get('queuing' . $storeId));
            }

            // Broadcast the queued message to the client.
            if ($data) {
                $server->push($fd, $data);
            }
        }
    }

    /**
     * Push one cached JSON snapshot to a client, stamped with today's date.
     *
     * @param mixed $server Server instance supplied by Swoole.
     * @param mixed $fd     Target connection.
     * @param mixed $json   Raw value read from Redis; falsy values are skipped.
     */
    private function pushSnapshot($server, $fd, $json)
    {
        if (!$json) {
            return;
        }

        $payload = json_decode($json, true);

        // A "day" entry can only be added to a decoded object or list.
        if (!is_array($payload)) {
            return;
        }

        $payload['day'] = date("Y-m-d");
        $server->push($fd, json_encode($payload));
    }

    /**
     * Write a failure to the PHP error log.
     *
     * @param string                $context What was being attempted.
     * @param \Exception|\Throwable $error   The failure that was caught.
     */
    private function logError($context, $error)
    {
        error_log('[Chat] ' . $context . ' failed: ' . $error->getMessage());
    }
}

// Script entry point: constructing Chat loads the configuration and, when a
// listening port is configured, starts the server from inside the constructor.
$obj = new Chat();