Redisson实现延迟队列
本文是对redisson的delayedQueue延迟队列进行封装,做到开箱即用,使用延迟队列解决某些情况下定时任务的局限性。
1.引入Redission包
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.6</version>
</dependency>
<!-- 序列化所需要的fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.19</version>
</dependency>
2.配置Redis连接
spring:
# ....
# redis 配置
redis:
host: 127.0.0.1
port: 6379
# password:
3.项目启动时开启监听类RedisDelayedQueueInit
- 这里要注意使用
redissonClient.getDelayedQueue(blockingFairQueue);
解决项目重启后无法正常消费旧消息的问题。
package com.boot.hodgepodge.redisson.config;
import com.alibaba.fastjson2.JSONObject;
import com.boot.hodgepodge.redisson.listener.RedisDelayedQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author: huanghwh
* @Date: 2023/01/05 上午9:10
* @Description:
*/
@Slf4j
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {
@Autowired
RedissonClient redissonClient;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
String listenerName = taskEventListenerEntry.getValue().getClass().getName();
startThread(listenerName, taskEventListenerEntry.getValue());
}
}
/**
* 启动线程获取队列*
*
* @param queueName queueName
* @param redisDelayedQueueListener 任务回调监听
* @param <T> 泛型
* @return
*/
private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
//由于此线程需要常驻,可以新建线程,不用交给线程池管理
Thread thread = new Thread(() -> {
log.info("启动监听队列线程" + queueName);
while (true) {
// 解决项目重新启动并不会消费之前队列里的消息
redissonClient.getDelayedQueue(blockingFairQueue);
try {
T t = blockingFairQueue.take();
log.info("监听队列线程 {},获取到值:{}", queueName, JSONObject.toJSONString(t));
new Thread(() -> {
redisDelayedQueueListener.invoke(t);
}).start();
} catch (Exception e) {
log.info("监听队列线程错误,", e);
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
}
}
}
});
thread.setName(queueName);
thread.start();
}
}
4.新增队列监听接口RedisDelayedQueueListener
- 每条队列都可通过实现这一接口进行监听
package com.boot.hodgepodge.redisson.listener;
/**
* @Author: huanghwh
* @Date: 2023/01/05 上午9:18
* @Description: 队列事件监听接口,需要实现这个方法
*/
public interface RedisDelayedQueueListener<T> {
/**
* 执行方法
*
* @param t
*/
void invoke(T t);
}
5.新增队列监听实现类
- 实现队列监听接口,可理解为一个监听类为一条队列
- 通过此类消费到期的数据
package com.boot.hodgepodge.redisson.listener;
import com.boot.hodgepodge.redisson.entity.TaskBodyDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @Author: huanghwh
* @Date: 2023/01/05 上午9:23
* @Description: 监听器
*/
@Component
@Slf4j
public class TestListener implements RedisDelayedQueueListener<TaskBodyDto> {
@Override
public void invoke(TaskBodyDto taskBodyDto) {
// 这里调用你延迟之后的代码
log.info("----------> 执行...." + taskBodyDto.getBody() + "===" + taskBodyDto.getName());
}
}
6.队列工具类RedisDelayedQueue
- 调用此工具类向队列中插入数据
package com.boot.hodgepodge.redisson.util;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @Author: huanghwh
* @Date: 2023/01/05 上午9:22
* @Description:
*/
@Slf4j
@Component
public class RedisDelayedQueue {
@Autowired
RedissonClient redissonClient;
/**
* 添加队列
*
* @param t DTO传输类
* @param delay 时间数量
* @param timeUnit 时间单位
* @param <T> 队列监听实现类listener
*/
public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
log.info("添加队列{},delay:{},timeUnit:{}" + queueName, delay, timeUnit);
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
}
7.模拟测试
- 调用队列工具类向指定队列中插入数据
- addQueue(实体, 延迟时间, 时间单位, 队列监听实现类-可理解为队列名称)
@GetMapping("/add")
public void addQueue() {
TaskBodyDto taskBody10 = new TaskBodyDto();
taskBody10.setBody("测试DTO实体类的BODY,10秒之后执行");
taskBody10.setName("测试DTO实体类的姓名,10秒之后执行");
redisDelayedQueue.addQueue(taskBody10, 10, TimeUnit.SECONDS, TestListener.class.getName());
TaskBodyDto taskBody20 = new TaskBodyDto();
taskBody20.setBody("测试DTO实体类的BODY,20秒之后执行");
taskBody20.setName("测试DTO实体类的姓名,20秒之后执行");
redisDelayedQueue.addQueue(taskBody20, 20, TimeUnit.SECONDS, TestListener.class.getName());
TaskBodyDto taskBody30 = new TaskBodyDto();
taskBody30.setBody("测试DTO实体类的BODY,30秒之后执行");
taskBody30.setName("测试DTO实体类的姓名,30秒之后执行");
redisDelayedQueue.addQueue(taskBody30, 30, TimeUnit.SECONDS, TestListener.class.getName());
TaskBodyDto taskBody60 = new TaskBodyDto();
taskBody60.setBody("测试DTO实体类的BODY,60秒之后执行");
taskBody60.setName("测试DTO实体类的姓名,60秒之后执行");
redisDelayedQueue.addQueue(taskBody60, 60, TimeUnit.SECONDS, TestListener.class.getName());
}
8.源码地址
源码地址:https://gitee.com/sensationhhw/springboot-a-hodgepodge.git
本文参考:https://cloud.tencent.com/developer/article/1589324