简单的tp6队列queue闭环


php实现队列,遵循标准的生产者和消费者模型,可以使用控制器生产,然后定义一个消费者使用shell监控执行即可实现闭环。

首先,需要使用composer安装这个库composer require topthink/think-queue
然后,将config/queue.php的配置同步sync改为redis驱动;

源码参阅:https://github.com/top-think/think-queue

生产者控制器的方法

<?php
declare (strict_types=1);

namespace app\expay\controller;

use app\expay\BaseController;
use think\facade\Queue;

class Job extends BaseController
{
    /**
     * 生产者
     */
    public function producer()
    {
        $producer = '';//生产者
        $consumer = 'app\expay\job\Consumer';//消费类,队列任务对应

        $queueData = ['email' => 'abc@qq.com'];
        $queueName = 'imQueueName';//队列唯一名称
        //新名称会重新创建队列,默认fire,否则为消费类@任务名,如app\expay\job\Consumer@otname


        //立即执行,进队列
        $result = Queue::push($consumer, $queueData, $queueName);

        //延迟后,进队列
        //Queue::later(3, $consumer, $queueData, $queueName);

        //php think queue:listen
        //php think queue:work
        //php think queue:work --queue imQueueName
        //php think queue:work --queue imQueueName --daemon


        dump(sprintf('Queue::push()=%s', $result));
    }

}

消费者的定义

<?php
declare (strict_types=1);

namespace app\expay\job;

use think\queue\Job;

/**
 * 消费者Class Consumer
 * @package app\expay\job
 */
class Consumer
{

    public function fire(Job $job, $data)
    {
        //执行任务操作一下,返回执行状态
        $works = $job->attempts();
        dump(sprintf('%s 消费任务start(第%s次)', $job->getName(), $works));

        $ok = rand(0, 1);
        dump('任务执行ing...');
        if ($ok) {
            //任务执行成功
            dump('任务执行成功,该任务将被删除');
            $job->delete();

        } else {
            //任务执行失败
            if ($works >= 3) {
                //如果attempts超过3次就删除?
                dump('失败超限,异常结束');
                //$job->failed();
                $job->delete();
            } else {
                //执行少于n次就重新加入队列
                dump('失败了重发任务');
                $job->release();
            }

        }

        dump(sprintf('%s 消费任务end', $job->getName()));
    }


    public function failed($data)
    {
        //任务达到最大重试次数后失败了

    }
}

自动检查启动队列脚本

一个脚本,不需要supervisor那么复杂,
如果因为windows和linux的换行问题,一个命令:sed -i "s/\r//" queue.sh
然后直接sh queue.sh即可,或者使用crontab方式:
*/1 * * * * sh /datas/website/com.xxx/docs/scripts/queue.sh queueName>/dev/null 2>&1 &

queue.sh脚本内容:

#!/bin/bash

BASEDIR=$(dirname $(dirname $(cd $(dirname ${BASH_SOURCE[0]}); pwd)));
PHPWORK="/usr/local/php/bin/php ${BASEDIR}/think queue:work --queue"
echo ""
echo `date +"%Y-%m-%d %H:%M:%S"` "正在校验 ${1} 队列...";
if [[ $1 ]]; then
    TASKNUM=$(ps -ef |grep "${PHPWORK} ${1}"|grep -v grep|wc -l)
    if [[ $TASKNUM = 0 ]] ;then
        echo "找不到 ${1} ,正在启动队列..."
        echo "执行:${PHPWORK} ${1}>> ${BASEDIR}/runtime/queue_${1}_working.log &"
        ${PHPWORK} ${1}>> ${BASEDIR}/runtime/queue_${1}_working.log &
        echo "队列开启成功"
    else
        echo "发现${TASKNUM}任务,执行:ps -ef |grep \"${PHPWORK} ${1}\"|grep -v grep"
        ps -ef |grep "${PHPWORK} ${1}"|grep -v grep
        echo "TASK runing..."
    fi

else
    echo "USAGE:"
    echo "example:sh PATHTO/docs/scripts/queue.sh QUEUENAME"
    echo "source:php think queue:work --queue QUEUENAME"
    echo "BASEDIR=${BASEDIR}"
    echo "PHPWORK=${PHPWORK} QUEUENAME"
    echo "所有队列:ps -ef |grep \"${PHPWORK}\"|grep -v grep"
    ps -ef |grep "${PHPWORK}"|grep -v grep
fi

echo ""

原文链接:https://blog.yongit.com/note/715981.html