Spring_event发布订阅模式

2023-03-26

DEMO源码地址:https://gitee.com/sensationhhw/springboot-event

1、前言

在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

image-20230326224215811

使用发布订阅模式来解决这些问题:把这些操作抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。

image-20230326224234708

2、Spring Event实现发布/订阅模式

步1 定义事件(监听接口)

1、该事件接口继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。

2、消息发布者通过调用此接口发布消息

import com.huanghwh.demo.event.entity.OrderMessage;
import org.springframework.context.ApplicationEvent;

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午8:46
 * @Description: 订单事件监听事件(接口):继承ApplicationEvent,并重写构造函数
 */
public class OrderEvent extends ApplicationEvent {
    
    public OrderEvent(OrderMessage orderMessage) {
        super(orderMessage);
    }
}

步2 消息传输实体

1、用于过程中传输信息的专用实体

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午8:47
 * @Description: 订单消息实体
 */
 @Data
public class OrderMessage implements Serializable {
    
    private String orderNo;
    
    private String orderName;

    public OrderMessage(String orderNo, String orderName) {
        this.orderNo = orderNo;
        this.orderName = orderName;
    }
}

步3 定义监听(消费)者

1、实际消费者,会消费指定接口中的消息

2、有2种方式:可使用注解@EventListener(OrderEvent.class)或实现ApplicationListener,二选一

补充@TransactionalEventListener(OrderEvent.class) //事务完成后才执行

3、这里定义了2种监听者为例

4、异步执行:@Async(“myThreadExecutor”) ,myThreadExecutor是自定义线程池

import cn.hutool.log.StaticLog;
import com.huanghwh.demo.event.entity.OrderMessage;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午8:45
 * @Description: 订单消费者1:日志记录
 */
@Service
public class OrderLogListener implements ApplicationListener<OrderEvent> {
    
    //@EventListener(OrderEvent.class) //可使用注解或实现ApplicationListener,二选一
  	//@TransactionalEventListener(OrderEvent.class) //事务完成后才执行
    @Async("myThreadExecutor") // 异步执行
    @Override
    public void onApplicationEvent(OrderEvent orderEvent) {
        OrderMessage source = (OrderMessage) orderEvent.getSource();
        StaticLog.info("异步消费者1:日志记录,订单号{}", source.getOrderNo());
    }
    
}
import cn.hutool.log.StaticLog;
import com.huanghwh.demo.event.entity.OrderMessage;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午8:45
 * @Description: 订单消费者2:消息通知
 */
@Service
public class OrderMessageListener implements ApplicationListener<OrderEvent> {

    //@EventListener(OrderEvent.class) //可使用注解或实现ApplicationListener,二选一
    @Async("myThreadExecutor") // 异步执行
    @Override
    public void onApplicationEvent(OrderEvent orderEvent) {
        OrderMessage source = (OrderMessage) orderEvent.getSource();
        StaticLog.info("异步消费者2:消息通知,订单号{}", source.getOrderNo());
    }
    
}

步4 发布消息

1、使用Spring 提供的`ApplicationEventPublisher`来发布自定义事件。

import cn.hutool.log.StaticLog;
import com.huanghwh.demo.event.entity.OrderMessage;
import com.huanghwh.demo.event.listener.OrderEvent;
import com.huanghwh.demo.event.publisher.IOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午9:50
 * @Description: 消息发布demo
 */
@Service
public class OrderServiceImpl implements IOrderService {
    
    /* 使用Spring 提供的ApplicationEventPublisher来发布自定义事件 */
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    
    @Override
    public void saveOrder() {
        StaticLog.info("生产者:准备生成订单...");
        // ..生成订单(略)
        String orderNum = "001";
        String orderName = "眼镜";
        // 订单消息类
        StaticLog.info("准备发布订单号:{}", orderNum);
        OrderMessage orderMessage = new OrderMessage(orderNum, orderName);
        // 发布消息 入参: 订单事件监听接口(消息传输实体类)
        applicationEventPublisher.publishEvent(new OrderEvent(orderMessage));
        StaticLog.info("发布完成:{}", orderNum);
    }
}

3、测试

1.引入包

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>

2.测试方法编写

1、注意:测试类需要和springboot启动类处于同一层目录下

/**
 * @Author: huanghwh
 * @Date: 2023/03/26 下午9:58
 * @Description:
 */
@SpringBootTest
public class OrderTest {

    @Autowired
    private IOrderService orderService;

    /**
     * 测试发布订单消息
     */
    @Test
    public void testPublish() {
        orderService.saveOrder();
    }
}

输出:

2023-03-26 22:01:29.759  INFO 57428 --- [           main] c.h.d.e.publisher.impl.OrderServiceImpl  : 生产者:准备生成订单...
2023-03-26 22:01:29.765  INFO 57428 --- [           main] c.h.d.e.publisher.impl.OrderServiceImpl  : 准备发布订单号:001
2023-03-26 22:01:29.771  INFO 57428 --- [thread-execute1] c.h.d.event.listener.OrderLogListener    : 异步消费者1:日志记录,订单号001
2023-03-26 22:01:29.771  INFO 57428 --- [           main] c.h.d.e.publisher.impl.OrderServiceImpl  : 发布完成:001
2023-03-26 22:01:29.771  INFO 57428 --- [thread-execute2] c.h.d.e.listener.OrderMessageListener    : 异步消费者2:消息通知,订单号001

参考:https://blog.csdn.net/BASK2312/article/details/127961590