简单的tp6队列queue闭环
php实现队列,遵循标准的生产者和消费者模型,可以使用控制器生产,然后定义一个消费者使用shell监控执行即可实现闭环。
首先,需要使用composer安装这个库composer require topthink/think-queue
;
然后,将config/queue.php
的配置同步sync改为redis
驱动;
源码参阅:https://github.com/top-think/think-queue
生产者控制器的方法
<?php
declare (strict_types=1);
namespace app\expay\controller;
use app\expay\BaseController;
use think\facade\Queue;
class Job extends BaseController
{
/**
* 生产者
*/
public function producer()
{
$producer = '';//生产者
$consumer = 'app\expay\job\Consumer';//消费类,队列任务对应
$queueData = ['email' => 'abc@qq.com'];
$queueName = 'imQueueName';//队列唯一名称
//新名称会重新创建队列,默认fire,否则为消费类@任务名,如app\expay\job\Consumer@otname
//立即执行,进队列
$result = Queue::push($consumer, $queueData, $queueName);
//延迟后,进队列
//Queue::later(3, $consumer, $queueData, $queueName);
//php think queue:listen
//php think queue:work
//php think queue:work --queue imQueueName
//php think queue:work --queue imQueueName --daemon
dump(sprintf('Queue::push()=%s', $result));
}
}
消费者的定义
<?php
declare (strict_types=1);
namespace app\expay\job;
use think\queue\Job;
/**
* 消费者Class Consumer
* @package app\expay\job
*/
class Consumer
{
public function fire(Job $job, $data)
{
//执行任务操作一下,返回执行状态
$works = $job->attempts();
dump(sprintf('%s 消费任务start(第%s次)', $job->getName(), $works));
$ok = rand(0, 1);
dump('任务执行ing...');
if ($ok) {
//任务执行成功
dump('任务执行成功,该任务将被删除');
$job->delete();
} else {
//任务执行失败
if ($works >= 3) {
//如果attempts超过3次就删除?
dump('失败超限,异常结束');
//$job->failed();
$job->delete();
} else {
//执行少于n次就重新加入队列
dump('失败了重发任务');
$job->release();
}
}
dump(sprintf('%s 消费任务end', $job->getName()));
}
public function failed($data)
{
//任务达到最大重试次数后失败了
}
}
自动检查启动队列脚本
一个脚本,不需要supervisor那么复杂,
如果因为windows和linux的换行问题,一个命令:sed -i "s/\r//" queue.sh
然后直接sh queue.sh
即可,或者使用crontab方式:*/1 * * * * sh /datas/website/com.xxx/docs/scripts/queue.sh queueName>/dev/null 2>&1 &
queue.sh脚本内容:
#!/bin/bash
BASEDIR=$(dirname $(dirname $(cd $(dirname ${BASH_SOURCE[0]}); pwd)));
PHPWORK="/usr/local/php/bin/php ${BASEDIR}/think queue:work --queue"
echo ""
echo `date +"%Y-%m-%d %H:%M:%S"` "正在校验 ${1} 队列...";
if [[ $1 ]]; then
TASKNUM=$(ps -ef |grep "${PHPWORK} ${1}"|grep -v grep|wc -l)
if [[ $TASKNUM = 0 ]] ;then
echo "找不到 ${1} ,正在启动队列..."
echo "执行:${PHPWORK} ${1}>> ${BASEDIR}/runtime/queue_${1}_working.log &"
${PHPWORK} ${1}>> ${BASEDIR}/runtime/queue_${1}_working.log &
echo "队列开启成功"
else
echo "发现${TASKNUM}任务,执行:ps -ef |grep \"${PHPWORK} ${1}\"|grep -v grep"
ps -ef |grep "${PHPWORK} ${1}"|grep -v grep
echo "TASK runing..."
fi
else
echo "USAGE:"
echo "example:sh PATHTO/docs/scripts/queue.sh QUEUENAME"
echo "source:php think queue:work --queue QUEUENAME"
echo "BASEDIR=${BASEDIR}"
echo "PHPWORK=${PHPWORK} QUEUENAME"
echo "所有队列:ps -ef |grep \"${PHPWORK}\"|grep -v grep"
ps -ef |grep "${PHPWORK}"|grep -v grep
fi
echo ""