This post was updated 995 days ago and some of the ideas may be out of date.

“终于不用再花钱买聊天软件啦”
最近在试着使用一些swoole框架,于是突发奇想写个客服聊天的小Demo

本项目使用Hyperf框架、WebSocket和Swoole共享内存实现了如下功能。

  1. 若当前有客服在线并且空闲,直接进入聊天
  2. 一名客服同时只能和一个客户聊天
  3. 客户若是超时未回复消息,自动退出聊天
  4. 若当前有客服在线并且都在聊天中,则分配一个排队人数最少的客服并且进入排队
  5. 若是当前无客服在线,直接返回'当前暂无客服上班',不进入排队
  6. 客户池以及客服池存入多线程共享内存,并且使用共享资源锁保 SWOOLE_MUTEX 保证多线程数据安全

以下是一个基本的代码实现,朋友们可以根据需要进行调整。例如使用mysql增加客服客户身份校验,以及聊天消息留存,这些在这里就不做示范。

安装Hyperf框架和依赖组件:

composer create-project hyperf/hyperf-skeleton chat_demo
cd chat_demo
composer require hyperf/websocket-server

这里需要注意的是,不分国内composer源并未更新库(如阿里源),所以无法安装最新的Hyperf,在这里我更换为华为源,这样默认安装的Hyperf是3.0版本

composer config --global repo.packagist composer https://mirrors.huaweicloud.com/repository/php/

Hyperf 安装选项一路回车即可,若是需要关联数据库,可在对应的选项打y

修改配置文件 config/autoload/server.php 中的 servers,启用WebSocket服务器

'servers' => [
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 9502,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose']
            ]
        ],
    ],

在Hyperf的入口文件 bin/hyperf.php 中创建共享内存对象

use Swoole\Table;

// 创建共享内存实例
$table = new Table(2048);
$table->column('onlineStaff', Table::TYPE_STRING, 1024);
$table->column('customers', Table::TYPE_STRING, 1024);
$table->create();

// 将 Table 实例绑定到容器中,以便在 WebSocket 服务中使用
$container->set(Table::class, $table);

创建一个 WebSocketController 控制器来处理 WebSocket 连接和消息

<?php

namespace App\Controller;

use App\Service\ChatService;
use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Response;
use Swoole\WebSocket\Server;
use Swoole\WebSocket\Frame;

class WebSocketController implements OnMessageInterface, OnCloseInterface
{

    /**
     * 聊天服务
     * @Inject
     * @var ChatService
     */
    private ChatService $chatService;

    /**
     * 服务容器
     * @var ContainerInterface
     */
    protected ContainerInterface $container;

    /**
     * 日志类
     * @var LoggerInterface
     */
    protected LoggerInterface $logger;

    public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory)
    {
        $this->container = $container;
        $this->chatService = $container->get(ChatService::class);
        $this->logger = $loggerFactory->get('log', 'default');
    }

    /**
     * 消息推送监听
     *
     * @param Response|Server $server
     * @param Frame $frame
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function onMessage($server, $frame): void
    {
        $data = json_decode($frame->data, true);
        $action = $data['action'] ?? null;

        if ($action === 'staffOnline') {
            // 客服上线
            $this->handleStaffOnline($frame->fd, $data['payload']['name'] ?? '客服');
        } elseif ($action === 'customerConnect') {
            // 客户请求建立连接
            $this->handleCustomerConnect($frame->fd);
        } elseif ($action === 'message') {
            // 消息互发
            $message = $data['payload']['message'] ?? '';
            $this->handleMessage($frame->fd, $message);
        }
    }

    /**
     * 对象断开连接回调
     *
     * @param Response|Server $server
     * @param int $fd
     * @param int $reactorId
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        // 处理客服或者客户断开连接的逻辑
        $this->logger->info('断开连接触发:'. $fd. "|" . $reactorId);
        $customer = $this->chatService->getCustomer($fd);
        if ($customer) {
            $this->chatService->removeCustomer($fd);
        }

        $staff = $this->chatService->getStaff($fd);
        if ($staff) {
            $this->chatService->staffOffline($fd);
        }
    }

    /**
     * 客户连接逻辑处理
     *
     * @param int $fd
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    private function handleCustomerConnect(int $fd): void
    {
        // 在此可自行增加参数实现客户身份校验
        $this->chatService->addCustomer($fd);
        $availableData = $this->chatService->findAvailableStaff();
        $this->logger->info('匹配结果:'. json_encode($availableData));
        // selectedStaffFd 为null则说明无在线客服
        if ($availableData['selectedStaffFd'] === null) {
            $this->container->get(Sender::class)->push($fd, json_encode([
                'action' => 'message',
                'payload' => [
                    'message' => '当前暂无客服上班',
                ],
            ]));
            $this->chatService->removeCustomer($fd);
            return;
        }

        // minQueue 不为null则说明需要排队
        if ($availableData['minQueue'] !== null) {
            $this->chatService->setCustomerQueueUp($fd, $availableData['selectedStaffFd']);
            $this->container->get(Sender::class)->push($fd, json_encode([
                'action' => 'message',
                'payload' => [
                    'message' => '当前暂无空闲客服,已为您自动分配排队较少客服,请等待排队',
                    'queue' => $availableData['minQueue'] + 1
                ],
            ]));
            return;
        }

        $this->chatService->setStaffForCustomer($fd, $availableData['selectedStaffFd']);
        $this->chatService->resetCustomerTimer($fd, 60000); // 设置超时为1分钟(60000毫秒)

        $this->container->get(Sender::class)->push($fd, json_encode([
            'action' => 'message',
            'payload' => [
                'message' => '您已连接到客服',
            ],
        ]));
    }

    /**
     * 客服断开连接
     *
     * @param int $fd
     * @param string $name
     * @return void
     */
    private function handleStaffOnline(int $fd, string $name): void
    {
        // 若是数据库的话,可将name替换成客服登录token之类的,实现登录校验
        $this->chatService->staffOnline($fd, $name);
    }

    /**
     * 消息处理逻辑
     *
     * @param int $fd
     * @param string $message
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    private function handleMessage(int $fd, string $message): void
    {
        $customer = $this->chatService->getCustomer($fd);
        if ($customer && $customer['staff_fd']) {
            $staff = $this->chatService->getStaff($customer['staff_fd']);
            // 判断是否还在排队
            if ($staff['customer_fd'] != $fd) {
                $this->container->get(Sender::class)->push($fd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '当前正在排队中',
                    ],
                ]));
            } else {
                // 如果是客户发来的消息,则转发给客服
                $this->container->get(Sender::class)->push($customer['staff_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => $message,
                        'from' => 'customer',
                    ],
                ]));

                // 重置客户的超时计时器
                $this->chatService->resetCustomerTimer($fd, 60000);
            }
            // 在此可实现聊天记录留存
            // ...
        } else {
            $staff = $this->chatService->getStaff($fd);
            if ($staff && $staff['customer_fd']) {
                // 如果是客服发来的消息,则转发给客户
                $this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => $message,
                        'from' => 'staff',
                    ],
                ]));
            }
            // 在此可实现聊天记录留存
            // ...
        }
    }

}

创建一个 ChatService 类来处理聊天逻辑

<?php

namespace App\Service;

use Hyperf\Logger\LoggerFactory;
use Hyperf\WebSocketServer\Sender;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use Swoole\Table;

class ChatService
{

    /**
     * 日志类
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    /**
     * swoole 共享内存
     * @var Table
     */
    private Table $table;

    /**
     * 服务容器
     * @var ContainerInterface
     */
    private ContainerInterface $container;

    private \Swoole\Lock $lock;

    public function __construct(ContainerInterface $container, LoggerFactory $loggerFactory, Table $table)
    {
        // 设置共享锁,用于保护多线程的共享资源读写安全
        $this->lock = new \Swoole\Lock(SWOOLE_MUTEX);
        $this->logger = $loggerFactory->get('log', 'default');
        $this->table = $table;
        $this->container = $container;
    }

    /**
     * 客服上线
     *
     * @param int $fd
     * @param string $name
     * @return void
     */
    public function staffOnline(int $fd, string $name)
    {
        $this->saveData('onlineStaff', $fd, [
            'name' => $name,
            'customer_fd' => null,
            'queue' => []
        ]);
    }

    /**
     * 客服下线
     *
     * @param int $fd
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function staffOffline(int $fd): void
    {
        $staff = $this->findData('onlineStaff', $fd);
        if ($staff) {
            $this->removeAllCustomer($staff['customer_fd'], $staff['queue']);
            $this->del('onlineStaff', $fd);
        }
    }

    /**
     * 新建客户
     *
     * @param int $fd
     * @return void
     */
    public function addCustomer(int $fd): void
    {
        $this->saveData('customers', $fd, [
            'staff_fd' => null,
            'timeout_timer' => null,
        ]);
        $this->logger->info(json_encode($this->findData('customers')));
    }

    /**
     * 移除客户
     *
     * @param int $fd
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function removeCustomer(int $fd): void
    {
        $customer = $this->findData('customers', $fd);
        if ($customer && $customer['staff_fd']) {
            $this->freeStaff($customer['staff_fd'], $fd);
        }
        if ($customer && $customer['timeout_timer']) {
            // 清除计时器
            swoole_timer_clear($customer['timeout_timer']);
        }
        $this->del('customers', $fd);
    }

    /**
     * 移除所有跟下线客服关联的客户
     *
     * @param int $fd
     * @param array $queue
     * @return void
     * @throws \Psr\Container\ContainerExceptionInterface
     * @throws \Psr\Container\NotFoundExceptionInterface
     */
    public function removeAllCustomer(int $fd, array $queue): void
    {
        $this->container->get(Sender::class)->push($fd, json_encode([
            'action' => 'chat_close',
            'payload' => [
                'message' => '当前客服已下线,聊天结束',
            ],
        ]));
        $this->del('customers', $fd);
        foreach ($queue as $q_fd) {
            $this->container->get(Sender::class)->push($q_fd, json_encode([
                'action' => 'queue_close',
                'payload' => [
                    'message' => '当前客服已下线,是否连接新的客服?',
                ],
            ]));
            $this->del('customers', $q_fd);
        }
    }

    /**
     * 建立聊天通道
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    public function setStaffForCustomer(int $customerFd, int $staffFd): void
    {
        $this->bindStaffToCustomer($customerFd, $staffFd);

        $staff = $this->findData('onlineStaff', $staffFd);
        $staff['customer_fd'] = $customerFd;
        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 客户加入排队
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    public function setCustomerQueueUp(int $customerFd, int $staffFd): void
    {
        $this->bindStaffToCustomer($customerFd, $staffFd);

        $staff = $this->findData('onlineStaff', $staffFd);
        $staff['queue'][] = $customerFd;
        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 聊天结束释放客服,并且更新排队信息
     *
     * @param int $staffFd
     * @param int $fd
     * @return void
     * @throws ContainerExceptionInterface
     * @throws NotFoundExceptionInterface
     */
    public function freeStaff(int $staffFd, int $fd): void
    {
        $staff = $this->findData('onlineStaff', $staffFd);
        if ($fd === $staff['customer_fd']) {
            if (count($staff['queue']) > 0) {
                $staff['customer_fd'] = $staff['queue'][0];

                $this->container->get(Sender::class)->push($staff['customer_fd'], json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '排队结束',
                    ],
                ]));
                $this->resetCustomerTimer($staff['customer_fd'], 60000); // 设置超时为1分钟(60000毫秒)

                $this->container->get(Sender::class)->push($staffFd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '新客户已接入,可以开始聊天了',
                    ],
                ]));

                unset($staff['queue'][0]);
                $staff['queue'] = array_values($staff['queue']);
                foreach ($staff['queue'] as $key => $customerFd) {
                    $this->container->get(Sender::class)->push($customerFd, json_encode([
                        'action' => 'message',
                        'payload' => [
                            'message' => '剩余排队人数: ' . ((int)$key + 1),
                        ],
                    ]));
                }
            } else {
                $staff['customer_fd'] = null;
            }
        } else {
            unset($staff['queue'][array_search($fd, $staff['queue'])]);
            $staff['queue'] = array_values($staff['queue']);
            foreach ($staff['queue'] as $key => $customerFd) {
                $this->container->get(Sender::class)->push($customerFd, json_encode([
                    'action' => 'message',
                    'payload' => [
                        'message' => '剩余排队人数: ' . ((int)$key + 1),
                    ],
                ]));
            }
        }

        $this->saveData('onlineStaff', $staffFd, $staff);
    }

    /**
     * 重置计时器
     *
     * @param int $fd
     * @param int $timeout
     * @return void
     */
    public function resetCustomerTimer(int $fd, int $timeout): void
    {
        $customer = $this->findData('customers', $fd);
        if ($customer) {
            if ($customer['timeout_timer']) {
                swoole_timer_clear($customer['timeout_timer']);
            }
            $customer['timeout_timer'] = swoole_timer_after($timeout, function () use ($fd) {
                $this->removeCustomer($fd);
            });
            $this->saveData('customers', $fd, $customer);
        }
    }

    /**
     * 获取可聊天/可排队的客服
     *
     * @return array
     */
    public function findAvailableStaff(): array
    {
        $minQueue = null;
        $selectedStaffFd = null;

        $onlineStaff = $this->findData('onlineStaff');
        $freeStaff = [];
        $queueStaff = [];
        foreach ($onlineStaff as $fd => $staff) {
            $staff['fd'] = $fd;
            // 若有空闲客服直接进入聊天
            if ($staff['customer_fd'] === null && count($staff['queue']) === 0) {
                $freeStaff[] = $staff;
            } else {
                $queueStaff[] = $staff;
            }
        }
        if (count($freeStaff) > 0) {
            $selectedStaffFd = $freeStaff[0]['fd'];
        }

        if (count($freeStaff) === 0 && count($queueStaff) > 0) {
            array_multisort(array_column($queueStaff, 'queue'), SORT_ASC, $queueStaff);
            $selectedStaffFd = $queueStaff[0]['fd'];
            $minQueue = count($queueStaff[0]['queue']);
        }
        return compact('minQueue', 'selectedStaffFd');
    }

    /**
     * 获取客户信息
     *
     * @param int $fd
     * @return array|null
     */
    public function getCustomer(int $fd): ?array
    {
        return $this->findData('customers', $fd);
    }

    /**
     * 获取客服信息
     *
     * @param int $fd
     * @return array|null
     */
    public function getStaff(int $fd): ?array
    {
        return $this->findData('onlineStaff', $fd);
    }

    /**
     * 给客户分配客服
     *
     * @param int $customerFd
     * @param int $staffFd
     * @return void
     */
    private function bindStaffToCustomer (int $customerFd, int $staffFd): void
    {
        $customer = $this->findData('customers', $customerFd);
        $this->logger->info('查询客户数据'. json_encode($customer));
        $customer['staff_fd'] = $staffFd;
        $this->logger->info('修改客户数据'. json_encode($customer));
        $this->saveData('customers', $customerFd, $customer);
    }

    /**
     * 从共享内存查询数据
     *
     * @param string $key
     * @param int|null $id
     * @return mixed|null
     */
    private function findData(string $key, int $id = null): mixed
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        $this->lock->unlock();
        if ($id) {
            return $data[$id] ?? null;
        } else {
            return $data ?? [];
        }
    }

    /**
     * 更新数据到共享内存
     *
     * @param string $key
     * @param int $id
     * @param array $body
     * @return void
     */
    private function saveData(string $key, int $id, array $body): void
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        $data[$id] = $body;
        $this->table->set('user', [$key => json_encode($data)]);
        $this->lock->unlock();
    }

    /**
     * 从共享内存中删除数据
     *
     * @param string $key
     * @param int $id
     * @return void
     */
    private function del(string $key, int $id): void
    {
        $this->lock->lock();
        $data = json_decode($this->table->get('user', $key), true);
        unset($data[$id]);
        $this->table->set('user', [$key => json_encode($data)]);
        $this->lock->unlock();
    }
}

创建路由

Router::addServer('ws', function () {
    Router::get('/', 'App\Controller\WebSocketController');
});

运行项目并且安装命令行请求工具

运行项目

php bin/hyperf.php start

安装webscoat

sudo apt update
sudo apt install curl
sudo apt install cargo
cargo install websocat

发起请求

websocat ws://localhost:9502

内容json

// 客服上线
{"action":"staffOnline","payload":{"name":"客服1"}}
// 客户连接
{"action":"customerConnect","payload":{}}
// 发送消息
{"action":"message","payload":{"message":"一条消息"}}

最终效果

最后附上demo代码仓库链接:https://github.com/dyjh/chat_demo