消息队列

消息队列

说明

将之前在定时任务和事件里面的众多方法转移消息队列中,提升系统运行速度。

文件目录结构

├─app        CREMB核心类库
│  ├─jobs      消息队列

Queue三种使用方式

消息队列类名::dispatch('参数');  单任务模式,直接请求此消息队列类的doJob方法
消息队列类名::dispatchDo('方法名称', ‘参数’); 子任务模式,请求此消息队列类的对应方法
消息队列类名::dispatchSece(’int类型的秒数‘, '方法名称', ‘参数’); 延迟队列模式,多少秒后请求此消息队列类中的对应方法

子任务和单任务

子任务

//下载商品轮播图
foreach ($slider_image as $s_image) {
    ProductCopyJob::dispatchDo('copySliderImage', [$res->id, $s_image, count($slider_image)]);
}
/**
 * 复制商品
 * Class ProductCopyJob
 * @package app\jobs
 */
class ProductCopyJob extends BaseJobs
{
    use QueueTrait;

    /**
     * 下载商品轮播图片
     * @param $id
     * @return bool
     */
    public function copySliderImage($id, $image, $count)
    {
        try {
            /** @var CopyTaobaoServices $copyTaobao */
            $copyTaobao = app()->make(CopyTaobaoServices::class);
            /** @var StoreProductServices $StoreProductServices */
            $StoreProductServices = app()->make(StoreProductServices::class);
            /** @var StoreProductAttrValueServices $StoreProductAttrValueServices */
            $StoreProductAttrValueServices = app()->make(StoreProductAttrValueServices::class);
            //下载图片
            $res = $copyTaobao->downloadCopyImage($image);
            //获取缓存中的轮播图
            $slider_images = CacheService::getTokenBucket('slider_images_' . $id);
            //缓存为null则赋值[]
            if ($slider_images === null) $slider_images = [];
            //将下载的图片插入数组
            array_push($slider_images, $res);
            //如果$slider_images中图片数量和传入的$count相等,说明已经下载完成,写入商品表,如果不等则继续插入缓存
            if (count($slider_images) == $count) {
                CacheService::clearToken('slider_images_' . $id);
                $image = $slider_images[0];
                $slider_images = $slider_images ? json_encode($slider_images) : '';
                $StoreProductServices->update($id, ['slider_image' => $slider_images, 'image' => $image]);
                $StoreProductAttrValueServices->update(['product_id' => $id], ['image' => $image]);
            } else {
                CacheService::setTokenBucket('slider_images_' . $id, $slider_images);
            }
        } catch (\Throwable $e) {
            Log::error('下载商品轮播图片失败,失败原因:' . $e->getMessage());
        }
        return true;
    }
}

单任务

//推广新人 处理自己、上级分销等级升级
AgentJob::dispatch([$uid]);
/**
 * 检测分销员等级升级
 * Class OrderJob
 * @package crmeb\jobs
 */
class AgentJob extends BaseJobs
{
    use QueueTrait;

    /**
     * 执行检测升级
     * @param $order
     * @return bool
     */
    public function doJob(int $uid)
    {
        //检测分销员等级升级
        try {
            //商城分销是否开启
            if (!sys_config('brokerage_func_status')) {
                return true;
            }
            /** @var UserServices $userServices */
            $userServices = app()->make(UserServices::class);
            $userInfo = $userServices->getUserInfo($uid);
            if (!$userInfo) {
                return true;
            }
            //获取上级uid || 开启自购返回自己uid
            $spread_uid = $userServices->getSpreadUid($uid, $userInfo);
            $two_spread_uid = 0;
            if ($spread_uid > 0 && $one_user_info = $userServices->getUserInfo($spread_uid)) {
                $two_spread_uid = $userServices->getSpreadUid($spread_uid, $one_user_info, false);
            }
            $uids = array_unique([$uid, $spread_uid, $two_spread_uid]);

            /** @var AgentLevelServices $agentLevelServices */
            $agentLevelServices = app()->make(AgentLevelServices::class);
            //检测升级
            $agentLevelServices->checkUserLevelFinish($uid, $uids);

            return true;
        } catch (\Throwable $e) {
            Log::error('检测分销等级升级失败,失败原因:' . $e->getMessage());
        }
    }
}

延迟队列

未支付10分钟后发送短信
UnpaidOrderSend::dispatchSece(600, [$orderId]);