canyin-project/ybcy/ws.php

174 lines
7.1 KiB
PHP
Raw Normal View History

2024-11-01 16:07:54 +08:00
<?php
class Chat{
private $server = null;//单例存放websocket_server对象
private $passWord;
private $redisPort;
public function __construct(){
$info=file_get_contents('./web/cacheSet.log');
$port=json_decode($info,true)['swoole'];
//$redisPassWd=json_decode($info,true)['redis'];
$ssl_certificate=json_decode($info,true)['ssl_certificate'];
$ssl_certificate_key=json_decode($info,true)['ssl_certificate_key'];
$this->passWord=json_decode($info,true)['redis'];
$this->redisPort=json_decode($info,true)['redispx'];
if(!$port){
return;
}
//实例化swoole_websocket_server并存储在我们Chat类中的属性上达到单例的设计
$this->server =new swoole_websocket_server("0.0.0.0","$port",SWOOLE_BASE, SWOOLE_SOCK_TCP | SWOOLE_SSL );
$this->server->on('WorkerStart',[$this,'onWokerStart']);
//监听连接事件
$this->server->on('open', [$this, 'onOpen']);
//监听接收消息事件
$this->server->on('message', [$this, 'onMessage']);
//监听关闭事件
$this->server->on('close', [$this, 'onClose']);
//设置允许访问静态文件
$this->server->set([
//'heartbeat_idle_time'=>600,//检查最近一次发送数据的时间和当前时间的差 大于指定的值就强制关闭
//建议 heartbeat_idle_time 为 heartbeat_check_interval 的两倍多一点。
// 'heartbeat_check_interval'=>60,//每60秒检测一次
'daemonize'=>1,//1为开启守护进程后台挂机自动运行false为不开机守护进程
//'task_worker_num'=>4,
// 'woker_num'=>4,
// 'log_file' => '/www/wwwroot/app.b-ke.cn/addons/yb_wm/swoole.log',//日志记录位置
'ssl_cert_file'=>$ssl_certificate,//https证书位置
'ssl_key_file'=>$ssl_certificate_key,//https证书位置
]);
swoole_set_process_name("openVoice");
//开启服务
$this->server->start();
}
public function connectRedis($redisPassWd,$port){
$redis=new Redis();
$redis->connect('127.0.0.1',$port);
if($redisPassWd){
$redis->auth($redisPassWd);
}
return $redis;
}
public function onWokerStart($server,$woker_id){
$redisPassWd=$this->passWord;
$redis=$this->connectRedis($redisPassWd,$this->redisPort);
if($woker_id==0){
$redis->set("userData", []);
$server->tick(1500, function ($id)use($server) {
try {
//$info=file_get_contents('./web/cacheSet.log');
$redisPassWd=$this->passWord;
//var_dump($redisPassWd);
$redis=$this->connectRedis($redisPassWd,$this->redisPort);
$data=$redis->lpop('myqueue');
foreach ($server->connections as $fd){
if($server->isEstablished($fd)){
$storeId=$redis->get("storeIdByFd".$fd);
$fastData=$redis->get('fastBy'.$storeId);
$queuingData=$redis->get('queuing'.$storeId);
// $server->push($fd,json_encode(['storeId'=>$storeId,'data'=>$fastData]));
if($fastData){
$newFastData=json_decode($fastData,true);
$newFastData['day']=date("Y-m-d");
$server->push($fd,json_encode($newFastData));
}
if($queuingData){
$newQueuingData=json_decode($queuingData,true);
$newQueuingData['day']=date("Y-m-d");
$server->push($fd,json_encode($newQueuingData));
}
// 将消息内容推送到客户端
if($data){
$server->push($fd,$data);
}
}
}
// if($str){
// foreach($str as $v){
// $server->push($v, $data);
// }
// }
//
// if(!$data){
// $data=json_encode(['code' =>'2', 'msg' =>'等待接收']);
// }
// $str = json_decode($redis->get("userData"), true);
// if($str){
// foreach($str as $v){
// $server->push($v, $data);
// }
// }
}catch (\Exception $exception){
// var_dump($exception);
}
//echo date('Y-m-d H:i:s',time()) . "\r\n";
});
}
}
/**
* 连接成功回调函数
* @param $server
* @param $request
*/
public function onOpen($server, $request){
// $server->push($request->fd,json_encode($request));
// $str=json_decode($redis->get("userData"), true);
// if($str == "") $str = [];
// // var_dump($request->fd);
// if(!in_array($request->fd, $str)){
// array_push($str, $request->fd); //压入数组
// $str = json_encode($str);
// $redis->set("userData", $str);
// }
//$server->push($request->fd,json_encode(['code' =>1, 'msg' =>$request->fd.'已经连接上了']));
}
/**
* 接收到信息的回调函数
* @param $server
* @param $frame
*/
public function onMessage($server, $frame){
$msg = json_decode($frame->data,true);
if($msg['type']=='ping'){
$server->push($frame->fd,json_encode(['code' =>1,'msg'=>'ping']));
}
if($msg['type']=='storeId'){
$redisPassWd=$this->passWord;
$redis=$this->connectRedis($redisPassWd,$this->redisPort);
$redis->set("storeIdByFd".$frame->fd, $msg['data']);
}
// if($msg=="ping"){
// var_dump('我来自'.$frame->fd);
// $server->push($frame->fd,json_encode(['code' =>1,'msg'=>'ping']));
// }
//var_dump($frame->fd.$msg);
//echo '来了,说:' . $frame->data . PHP_EOL;//打印到我们终端
// foreach ($server->connections as $fd) {//遍历TCP连接迭代器拿到每个在线的客户端id
// //将客户端发来的消息,推送给所有用户,也可以叫广播给所有在线客户端
// $msg = $frame->data;
// $server->push($fd, json_encode(['no' => $frame->fd, 'msg' => $msg]));
// }
}
/**
* 断开连接回调函数
* @param $server
* @param $fd
*/
public function onClose($server, $fd){
$redisPassWd=$this->passWord;
$redis=$this->connectRedis($redisPassWd,$this->redisPort);
$redis->del("storeIdByFd".$fd);
// echo $fd . '走了' . PHP_EOL;//打印到我们终端
}
}
$obj = new Chat();