Redisson实现延迟队列

2023-01-05

Redisson实现延迟队列

本文是对redisson的delayedQueue延迟队列进行封装,做到开箱即用,使用延迟队列解决某些情况下定时任务的局限性。

1.引入Redission包

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.17.6</version>
</dependency>
<!-- 序列化所需要的fastjson2 -->
<dependency>
  <groupId>com.alibaba.fastjson2</groupId>
  <artifactId>fastjson2</artifactId>
  <version>2.0.19</version>
</dependency>

2.配置Redis连接

spring:
	# ....
  # redis 配置
  redis:
    host: 127.0.0.1
    port: 6379
    # password:

3.项目启动时开启监听类RedisDelayedQueueInit

  • 这里要注意使用redissonClient.getDelayedQueue(blockingFairQueue);解决项目重启后无法正常消费旧消息的问题
package com.boot.hodgepodge.redisson.config;

import com.alibaba.fastjson2.JSONObject;
import com.boot.hodgepodge.redisson.listener.RedisDelayedQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author: huanghwh
 * @Date: 2023/01/05 上午9:10
 * @Description:
 */
@Slf4j
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {

  @Autowired
  RedissonClient redissonClient;

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
    for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
      String listenerName = taskEventListenerEntry.getValue().getClass().getName();
      startThread(listenerName, taskEventListenerEntry.getValue());
    }
  }


  /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务回调监听
     * @param <T>                       泛型
     * @return
     */
  private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
    RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
    //由于此线程需要常驻,可以新建线程,不用交给线程池管理
    Thread thread = new Thread(() -> {
      log.info("启动监听队列线程" + queueName);
      while (true) {
        // 解决项目重新启动并不会消费之前队列里的消息
        redissonClient.getDelayedQueue(blockingFairQueue);
        try {
          T t = blockingFairQueue.take();
          log.info("监听队列线程 {},获取到值:{}", queueName, JSONObject.toJSONString(t));
          new Thread(() -> {
            redisDelayedQueueListener.invoke(t);
          }).start();
        } catch (Exception e) {
          log.info("监听队列线程错误,", e);
          try {
            Thread.sleep(10000);
          } catch (InterruptedException ex) {

          }
        }
      }
    });
    thread.setName(queueName);
    thread.start();
  }
}

4.新增队列监听接口RedisDelayedQueueListener

  • 每条队列都可通过实现这一接口进行监听
package com.boot.hodgepodge.redisson.listener;

/**
 * @Author: huanghwh
 * @Date: 2023/01/05 上午9:18
 * @Description: 队列事件监听接口,需要实现这个方法
 */
public interface RedisDelayedQueueListener<T> {

    /**
     * 执行方法
     *
     * @param t
     */
    void invoke(T t);

}

5.新增队列监听实现类

  • 实现队列监听接口,可理解为一个监听类为一条队列
  • 通过此类消费到期的数据
package com.boot.hodgepodge.redisson.listener;

import com.boot.hodgepodge.redisson.entity.TaskBodyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @Author: huanghwh
 * @Date: 2023/01/05 上午9:23
 * @Description: 监听器
 */
@Component
@Slf4j
public class TestListener implements RedisDelayedQueueListener<TaskBodyDto> {

    @Override
    public void invoke(TaskBodyDto taskBodyDto) {
        // 这里调用你延迟之后的代码
        log.info("----------> 执行...." + taskBodyDto.getBody() + "===" + taskBodyDto.getName());
    }

}

6.队列工具类RedisDelayedQueue

  • 调用此工具类向队列中插入数据
package com.boot.hodgepodge.redisson.util;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @Author: huanghwh
 * @Date: 2023/01/05 上午9:22
 * @Description:
 */
@Slf4j
@Component
public class RedisDelayedQueue {

  @Autowired
  RedissonClient redissonClient;


  /**
     * 添加队列
     *
     * @param t        DTO传输类
     * @param delay    时间数量
     * @param timeUnit 时间单位
     * @param <T>      队列监听实现类listener
     */
  public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
    log.info("添加队列{},delay:{},timeUnit:{}" + queueName, delay, timeUnit);
    RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
    RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
    delayedQueue.offer(t, delay, timeUnit);
  }

}

7.模拟测试

  • 调用队列工具类向指定队列中插入数据
  • addQueue(实体, 延迟时间, 时间单位, 队列监听实现类-可理解为队列名称)
@GetMapping("/add")
public void addQueue() {
  TaskBodyDto taskBody10 = new TaskBodyDto();
  taskBody10.setBody("测试DTO实体类的BODY,10秒之后执行");
  taskBody10.setName("测试DTO实体类的姓名,10秒之后执行");
  redisDelayedQueue.addQueue(taskBody10, 10, TimeUnit.SECONDS, TestListener.class.getName());

  TaskBodyDto taskBody20 = new TaskBodyDto();
  taskBody20.setBody("测试DTO实体类的BODY,20秒之后执行");
  taskBody20.setName("测试DTO实体类的姓名,20秒之后执行");
  redisDelayedQueue.addQueue(taskBody20, 20, TimeUnit.SECONDS, TestListener.class.getName());

  TaskBodyDto taskBody30 = new TaskBodyDto();
  taskBody30.setBody("测试DTO实体类的BODY,30秒之后执行");
  taskBody30.setName("测试DTO实体类的姓名,30秒之后执行");
  redisDelayedQueue.addQueue(taskBody30, 30, TimeUnit.SECONDS, TestListener.class.getName());

  TaskBodyDto taskBody60 = new TaskBodyDto();
  taskBody60.setBody("测试DTO实体类的BODY,60秒之后执行");
  taskBody60.setName("测试DTO实体类的姓名,60秒之后执行");
  redisDelayedQueue.addQueue(taskBody60, 60, TimeUnit.SECONDS, TestListener.class.getName());
}

8.源码地址

源码地址:https://gitee.com/sensationhhw/springboot-a-hodgepodge.git

本文参考:https://cloud.tencent.com/developer/article/1589324