Skip to content

消息队列

消息队列概述

在开发应用程序的时候,你可能需要执行一些任务,例如解析和存储一些大型文件,这些任务在Web 请求期间需要耗费很长时间才能执行完成。这时候我们就可以用消息队列来创建在后台处理的队列任务。通过将时间密集型任务移至队列,这样你的应用程序可以以极快的速度响应 Web 请求,并为你的程序提供更好的用户体验。

传统的程序执行流程一般是 即时|同步|串行的,在某些场景下,会存在并发低,吞吐量低,响应时间长等问题。在大型系统中,一般会引入消息队列的组件,将流程中部分任务抽离出来放入消息队列,并由专门的消费者作针对性的处理,从而降低系统耦合度,提高系统性能和可用性。

一般来说,可以抽离的任务具有以下的特点:
允许延后|异步|并行处理 (相对于传统的 即时|同步|串行 的执行方式):

1.允许延后:抢购活动时,先快速缓冲有限的参与人数到消息队列,后续再排队处理实际的抢购业务

2.允许异步:业务处理过程中的邮件,短信等通知

3.允许并行:用户支付成功之后,邮件通知,微信通知,短信通知可以由多个不同的消费者并行执行,通知到达的时间不要求先后顺序

允许失败和重试:

1.强一致性的业务放入核心流程处理

2.无一致性要求或最终一致即可的业务放入队列处理

配置

1.当前队列有三种驱动方式

驱动方式说明
sync同步执行,有新队列任务则通过事件 Event 来直接触发执行,不存储任务,直接执行
database数据库存储,新队列任务数据存储到数据库,队列执行程序再从数据库中读取任务数据
redis(推荐)Redis 存储,新队列任务数据存储到 Redis,队列执行程序再从 Redis 中读取任务数据

2.先打开config下的queue.php文件

php
<?php
return [
    'default'     => 'database',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            // 驱动方式
            'type'          => 'redis',

            'queue'      => 'default',
            // 服务器地址
            'host'          => env('redis.redis_hostname', '127.0.0.1'),
            // 端口
            'port'          => env('redis.port', '6379'),
            // 密码
            'password'      => env('redis.redis_password', ''),
            // 缓存有效期 0表示永久缓存
            'expire'        => 0 ,
            // 缓存前缀
            'prefix'     => 'QUERY',
            // 缓存标签前缀
            'tag_prefix'    => 'QUERY:',
            // 数据库 0号数据库
            'select'        => env('redis.select', 0),
            // 服务端主动关闭
            'timeout'       => 0,

            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'database',// none  不记录失败任务  database  将失败任务迁移到失败任务表
        'table' => 'jobs_failed',
    ],
];

default值:当前启动的队列驱动

消息的消费和删除

1.在app/job目录下创建一个任务消费类 例如:

php
<?php
namespace app\job;

use core\base\BaseJob;

/**
 * 订单延时关闭任务
 */
class OrderClose extends BaseJob
{


    /**
     * 消费
     * @param $order_id 订单id
     * @param $close_time 关闭时间
     * @return true
     */
    protected function doJob($order_id, $close_time)
    {
      	//业务代码
      	//......
        return true;
    }
}

消息的创建与推送

在业务控制器中新增一个控制器,创建一条消息,并推送到队列
1.新增一个控制器\app\adminapi\controller\test\JobTest.php,在该控制器中添加 helloWorldJob 方法,实例化刚刚创建的任务类,并调用dispatch函数

php
<?php
namespace app\adminapi\controller\test;

use app\job\OrderClose;
use core\base\BaseAdminController;
use think\Response;

/**
 * 测试队列任务
 */
class JobTest extends BaseAdminController
{
    /**
     * 创建消息
     * @return Response
     */
    public function helloWorldJob()
    {
        $order_id = 1;//订单id
        $close_time = time();//订单关闭时间
        $is_pushed = OrderClose::dispatch(['order_id' => $order_id, 'close_time' => $close_time]);
        if( $is_pushed !== false ){
            echo date('Y-m-d H:i:s') . " Hello World Job is Pushed.";
        }else{
            echo 'Hello World Job Error.';
        }
    }


}

dispatch函数的参数介绍:

  • action 任务消费类内部的方法, 非必填,不填写或类型为数组的话的话默认会调用doJob函数,如果是数组的话则视为当前值为data参数

  • data 数据传参, 他会将这儿的数组作为可变数量的参数传递给对应函数的给定变量参数

  • secs 延时执行时间(单位为秒),默认为0, 支持延时执行和定时执行,如果为0就是普通的异步任务,如果传值则视为当前时间的*秒后延时执行

  • queue_name 当前任务归属的队列名称,如果为新队列,会自动创建,结合命令行使用

  • is_async 默认为true,当前任务是否需要异步执行,最高优先级

2.新建一个路由配置文件\app\adminapi\route\job.php

php
<?php
use think\facade\Route;

Route::group('job', function () {
    //获取本地插件
    Route::get('test', 'test.JobTest/helloWorldJob');
});

至此,所有的代码都已准备完毕

发布任务

在浏览器中访问 http://your.project.com/adminapi/job/test ,可以看到消息推送成功。

处理任务

切换当前终端窗口的目录到项目niucloud目录下,执行

php think queue:listen

1.命令行参数

php
php think queue:listen\
--queue  helloJobQueue\         #监听的队列的名称
--delay  0\                     #如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128\                   #该进程允许使用的内存上限,以 M 为单位
--sleep  3\                     #如果队列中无任务,则多长时间后重新检查,默认为3秒
--tries  0\                     #如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60                    #work进程允许执行的最长时间,以秒为单位

这儿需要重点说明的是

  • --queue 参数 监听的队列名称,结合任务创建时的队列名来使用

  • --tries 任务失败几次后进入失败数理逻辑,防止任务一直重复执行

2.为了稳定要放在消息队列Supervisor下运行
至此,我们成功地完成了一个消息的 创建 -> 推送 -> 消费 -> 删除 的流程

计划任务字典定义

app/dict/schedule/ScheduleDict.php定义了计划任务的类型、状态和执行频率等常量:

php
namespace app\dict\schedule;

class ScheduleDict
{
    public const CRON = 'cron';//定时任务
    public const CROND = 'crond';//周期任务
    
    public const ON = 1;
    public const OFF = 2;
    public const MIN = 'min';
    public const HOUR = 'hour';
    public const DAY = 'day';
    public const WEEK = 'week';
    public const MONTH = 'month';
    
    // 获取任务模式
    public static function getType()
    {
        return [
            self::CRON => get_lang('dict_schedule.type_cron'),//定时任务
            self::CROND => get_lang('dict_schedule.type_crond'),//周期任务
        ];
    }
    
    // 获取任务启用状态
    public static function getStatus()
    {
        return [
            self::ON => get_lang('dict_schedule.on'),//启用
            self::OFF => get_lang('dict_schedule.off'),//关闭
        ];
    }
    
    // 获取日期类型
    public static function getDateType()
    {
        return [
            self::MIN => get_lang('dict_schedule.min'),
            self::HOUR => get_lang('dict_schedule.hour'),
            self::DAY => get_lang('dict_schedule.day'),
            self::WEEK => get_lang('dict_schedule.week'),
            self::MONTH => get_lang('dict_schedule.month'),
        ];
    }
}

计划任务配置定义

schedule.php文件定义了系统中的默认计划任务配置,包含了各个任务的执行时间、对应的处理类等信息。

计划任务实现类

在app/job/schedule/目录下实现了多个具体的计划任务类,所有任务类都继承自core/base/BaseJob基类。

海报和二维码文件清理

AutoClearPosterAndQrcode类用于定期清理系统生成的临时海报和二维码文件

计划任务日志清理

AutoClearScheduleLog类用于定期清理计划任务的执行日志

订单关闭任务

OrderClose类用于处理订单超时关闭的业务逻辑

站点到期自动关闭

SiteExpireClose类用于自动关闭到期的站点

计划任务配置

注意:计划任务配置前提条件,安装了Redis扩展

如果未安装可在宝塔侧边栏软件商店,搜索下载即可

1. 宝塔软件商店搜索 Supervisor 并安装

2.添加守护进程

2.1 安装完成之后,点击设置-添加守护进程

2.2 进程名称请使用英文,暂不支持中文!启动用户选择WWW,运行目录选择程序的根目录下的niucloud(站点名/niucloud),输入下方命令,保存即可。

 php think workerman

注意:使用的启动命令需在PHP8.0及以上环境运行才可生效

2.3 添加成功

注意:每一次上传代码后,都必须重启计划任务!!!

如果计划任务启动失败,试着操作以下步骤解决

1、将启动用户改为root

2、解除以下禁用函数(pcntl_signa_dispatch、pcntl_signal、pcntl_alarm、pcntl_fork、pcntl_wait)

3、如果你的redis设置过单独的密码,那要在niucloud/.env文件中也添加redis密码,如果没有配置过则忽略这一步。

任务命令行小知识:

ctrl+C结束终端进程

php think workerman stop 停止任务

php think workerman 启动计划任务

基于 MIT 协议发布