Spring事件如何使用
所谓千里之行始于足下,在研究Spring的事件的机制之前,我们先来看一下Spring事件是如何使用的。通常情况下,我们使用自定义事件和内置事件,自定义事件主要是配合业务使用,自定义事件则多是做系统启动时的初始化工作或者收尾工作。
自定义事件的使用
「定义自定义事件」自定义一个事件在使用上很简单,继承ApplicationEvent即可:
// 事件需要继承ApplicationEvent
public class MyApplicationEvent extends ApplicationEvent {
private Long id;
public MyApplicationEvent(Long id) {
super(id);
this.id = id;
}
public Long getId() {
return id;
}
}
「发布自定义事件」现在自定义事件已经有了,该如何进行发布呢?Spring提供了ApplicationEventPublisher进行事件的发布,我们平常使用最多的ApplicationContext也继承了该发布器,所以我们可以直接使用applicationContext进行事件的发布。
// 发布MyApplicationEvent类型事件
applicationContext.publishEvent(new MyApplicationEvent(1L));
「处理自定义事件」现在事件已经发布了,谁负责处理事件呢?当然是监听器了,Spring要求监听器需要实现ApplicationListener接口,同时需要通过泛型参数指定处理的事件类型。有了监听器需要处理的事件类型信息,Spring在进行事件广播的时候,就能找到需要广播的监听器了,从而准确传递事件了。
// 需要继承ApplicationListener,并指定事件类型
public class MyEventListener implements ApplicationListener<MyApplicationEvent> {
// 处理指定类型的事件
@Override
public void onApplicationEvent(MyApplicationEvent event) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource());
}
}
Spring内置事件
ContextRefreshedEvent
在ConfigurableApplicationContext的refresh()执行完成时,会发出ContextRefreshedEvent事件。refresh()是Spring最核心的方法,该方法内部完成的Spring容器的启动,是研究Spring的重中之重。在该方法内部,当Spring容器启动完成,会在finishRefresh()发出ContextRefreshedEvent事件,通知容器刷新完成。我们一起来看一下源码:
// ConfigurableApplicationContext.java
public void refresh() throws BeansException, IllegalStateException {
try {
// ...省略部分非关键代码
//完成普通单例Bean的实例化(非延迟的)
this.finishBeanFactoryInitialization(beanFactory);
// 初始化声明周期处理器,并发出对应的时间通知
this.finishRefresh();
}
}
protected void finishRefresh() {
// ...省略部分非核心代码
// 发布上下文已经刷新完成的事件
this.publishEvent(new ContextRefreshedEvent(this));
}
其实这是Spring提供给我们的拓展点,此时容器已经启动完成,容器中的bean也已经创建完成,对应的属性、init()、Aware回调等,也全部执行。很适合我们做一些系统启动后的准备工作,此时我们就可以监听该事件,作为系统启动后初始预热的契机。
其实Spring内部也是这样使用ContextRefreshedEvent的, 比如我们常用的Spring内置的调度器,就是在接收到该事件后,才进行调度器的执行的。
public class ScheduledAnnotationBeanPostProcessor implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
}
Spring Boot 启动事件顺序
1、ApplicationStartingEvent
这个事件在 Spring Boot 应用运行开始时,且进行任何处理之前发送(除了监听器和初始化器注册之外)。
2、ApplicationEnvironmentPreparedEvent
这个事件在当已知要在上下文中使用 Spring 环境(Environment)时,在 Spring 上下文(context)创建之前发送。
3、ApplicationContextInitializedEvent
这个事件在当 Spring 应用上下文(ApplicationContext)准备好了,并且应用初始化器(ApplicationContextInitializers)已经被调用,在 bean 的定义(bean definitions)被加载之前发送。
4、ApplicationPreparedEvent
这个事件是在 Spring 上下文(context)刷新之前,且在 bean 的定义(bean definitions)被加载之后发送。
5、ApplicationStartedEvent
这个事件是在 Spring 上下文(context)刷新之后,且在 application/ command-line runners 被调用之前发送。
6、AvailabilityChangeEvent
这个事件紧随上个事件之后发送,状态:ReadinessState.CORRECT,表示应用已处于活动状态。
7、ApplicationReadyEvent
这个事件在任何 application/ command-line runners 调用之后发送。
8、AvailabilityChangeEvent
这个事件紧随上个事件之后发送,状态:ReadinessState.ACCEPTING_TRAFFIC,表示应用可以开始准备接收请求了。
9、ApplicationFailedEvent
这个事件在应用启动异常时进行发送。
上面所介绍的这些事件列表仅包括绑定到 SpringApplication 的 SpringApplicationEvents 事件,除了这些事件以外,以下事件也会在 ApplicationPreparedEvent 之后和 ApplicationStartedEvent 之前发送:
WebServerInitializedEvent
这个 Web 服务器初始化事件在 WebServer 启动之后发送,对应的还有 ServletWebServerInitializedEvent(Servlet Web 服务器初始化事件)、ReactiveWebServerInitializedEvent(响应式 Web 服务器初始化事件)。
ContextRefreshedEvent
这个上下文刷新事件是在 Spring 应用上下文(ApplicationContext)刷新之后发送。
Spring事件是如何运转的
经过第一章节的探讨,我们已经清楚Spring事件是如何使用的,然而这只是皮毛而已,我们的目标是把Spring事件机制脱光扒净的展示给大家看。所以这一章节我们深入探讨一下,Spring事件的运行机制,重点我们看一下:
- 事件是怎么广播给监听器的?会不会发送阻塞?
- 系统中bean那么多,ApplicationListener是被如何识别为监听器的?
- 监听器处理事件的时候,是同步处理还是异步处理的?
- 处理的时候发生异常怎么办,后面的监听器还能执行吗?
事件发布
在第一章节,我们直接通过applicationContext发布了事件,同时也提到了,它之所以能发布事件,是因为它是ApplicationEventPublisher的子类,因此是具备事件发布能力的。但按照接口隔离原则,如果我们只需要进行事件发布,applicationContext提供的能力太多,还是推荐直接使用ApplicationEventPublisher进行操作。
获取事件发布器的方式
我们先来ApplicationEventPublisher的提供的能力,它是一个接口,结构如下:
@FunctionalInterface
public interface ApplicationEventPublisher {
//发布ApplicationEvent事件
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
//发布PayloadApplicationEvent事件
void publishEvent(Object event);
}
通过源码我们发现ApplicationEventPublisher仅仅提供了事件发布的能力,支持自定义类型和PayloadApplicationEvent类型(如果没有定义事件类型,默认包装为该类型)。
那我们如何获取该发布器呢,我们最常使用的@Autowired注入是否可以呢,试一下呗。
[通过@Autowired 注入 ApplicationEventPublisher]
通过debug,我们可以直观的看到:是可以的,而且注入的就是ApplicationContext实例。也就是说注入ApplicationContext和注入ApplicationEventPublisher是等价的,都是一个ApplicationContext实例。
「通过ApplicationEventPublisherAware获取 ApplicationEventPublisher」
除了@Autowired注入,Spring还提供了使用ApplicationEventPublisherAware获取 ApplicationEventPublisher的方式,如果实现了这个感知接口,Spring会在合适的时机,回调setApplicationEventPublisher(),将applicationEventPublisher传递给我们。使用起来也很方便。
代码所示:
public class UserService implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
public void login(String username, String password){
// 1: 进行登录处理
...
// 2: 发送登录事件,用于记录操作
applicationEventPublisher.publishEvent(new UserLoginEvent(userId));
}
// Aware接口回调注入applicationEventPublisher
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}
现在我们已经知道通过@Autowired和ApplicationEventPublisherAware回调都能获取到事件发布器,两种有什么区别吗? 其实区别不大,主要是调用时机的细小差别,另外就是默写特殊场景下,@Autowired注入可能无法正常注入,实际开发中完成可以忽略不计。
所以优先推荐小伙伴们使用ApplicationEventPublisherAware,如果觉得麻烦,使用@Autowired也未尝不可。
事件的广播方式
现在我们已经知道,可以通过ApplicationEventPublisher发送事件了,那么这个事件发送后肯定是要分发给对应的监听器处理啊,谁处理这个分发逻辑呢?又是怎么匹配对应的监听器的呢?我们带着这两个问题来看ApplicationEventMulticaster。
事件是如何广播的
要探查事件是如何广播的,需要跟随事件发布后的逻辑一起看一下:
@Override
public void publishEvent(ApplicationEvent event) {
this.publishEvent(event, null);
}
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
// ...省略部分代码
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 将事件广播给Listener
this.getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
}
// 获取事件广播器
ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException {
if (this.applicationEventMulticaster == null) {
throw new IllegalStateException("ApplicationEventMulticaster not initialized - " +
"call 'refresh' before multicasting events via the context: " + this);
}
return this.applicationEventMulticaster;
}
通过上面源码,我们发现发布器直接把事件转交给applicationEventMulticaster了,我们再去里面看一下广播器里面做了什么。
// SimpleApplicationEventMulticaster.java
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
// ...省略部分代码
// getApplicationListeners 获取符合的监听器
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
// 执行每个监听器的逻辑
invokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
// 调用监听器的onApplicationEvent方法进行处理
listener.onApplicationEvent(event);
}
}
看到这里,我们发现事件的分发逻辑:先找到匹配的监听器,然后逐个调用onApplicationEvent()进行事件处理。
事件和监听器是如何匹配的
通过上述源码,我们发现通过getApplicationListeners(event, type)找到了所有匹配的监听器,我们继续跟踪看一下是如何匹配的。
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
// 省略缓存相关代码
return retrieveApplicationListeners(eventType, sourceType, newRetriever);
}
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) {
// 1: 获取所有的ApplicationListener
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.defaultRetriever) {
listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans);
}
for (ApplicationListener<?> listener : listeners) {
// 2: 遍历判断是否匹配
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
filteredListeners.add(listener);
}
allListeners.add(listener);
}
}
}
protected boolean supportsEvent(
ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {
GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
// supportsEventType 根据ApplicationListener的泛型, 和事件类型,看是否匹配
// supportsSourceType 根据事件源类型,判断是否匹配
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}
通过源码跟踪,我们发现监听器匹配是根据事件类型匹配的,先获取容器中所有的监听器,在用supportsEvent()去判断对应的监听器是否匹配事件。这里匹配主要看两点:
判断事件类型和监听器上的泛型类型,是否匹配(子类也能匹配)。
监听器是否支持事件源类型,默认情况下,都是支持的。 如果两者都匹配,就转发给处理器处理。
ApplicationEventMulticaster是如何获取的(选读)
在事件广播时,Spring直接调用getApplicationEventMulticaster()去获取属性applicationEventMulticaster,并且当applicationEventMulticaster为空时,直接异常终止了。那么就要求该成员变量提早初始化,那么它是何时初始化的呢。
public void refresh() throws BeansException, IllegalStateException {
// ...省略无关代码
// 初始化事件广播器(转发ApplicationEvent给对应的ApplicationListener处理)
this.initApplicationEventMulticaster();
}
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
// spring容器中存在,直接返回
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 容器中不存在,创建SimpleApplicationEventMulticaster,放入容器
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (this.logger.isTraceEnabled()) {
this.logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
看到这里,是不是豁然开朗,原来在容器启动的时候,专门调用了initApplicationEventMulticaster()对applicationEventMulticaster进行了初始化,并放到了spring容器中。
事件监听器
监听器是负责处理事件的,在广播器将对应的事件广播给它之后,它正式上岗开始处理事件。Spring默认的监听器是同步执行的,并且支持一个事件由多个监听器处理,并可通过@Order指定监听器处理顺序。
定义监听器的方式
实现ApplicationListener定义监听器
第一种方式定义的方式当然是通过直接继承ApplicationListener,同时不要忘记通过泛型指定事件类型,它可是将事件广播给监听器的核心匹配标志。
public class MyEventListener implements ApplicationListener<MyApplicationEvent> {
@Override
public void onApplicationEvent(MyApplicationEvent event) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource());
}
}
使用@EventListener定义监听器
第二种方式我们还可以使用@EventListener标注方法为监听器,该注解标注的方法上,方法参数为事件类型,标注该监听器要处理的事件类型。
public class AnnotationEventListener {
// 使用@EventListener标注方法为监听器,参数类型为事件类型
@EventListener
public void onApplicationEvent(MyApplicationEvent event) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource());
}
@EventListener
public void onApplicationEvent(PayloadApplicationEvent payloadApplicationEvent) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+payloadApplicationEvent.getPayload());
}
}
通过广播器分发事件的逻辑,我们知道事件只能分发给ApplicationListener类型的监听器实例处理,这里仅仅是标注了@EventListener的方法,也能被是识别成ApplicationListener类型的监听器吗?
答案是肯定的,只是Spring在底层进行了包装,偷偷把@EventListener标注的方法包装成了ApplicationListenerMethodAdapter,它也是ApplicationListener的子类,这样就成功的把方法转换成ApplicationListener实例了.
后续章节我们会详细揭露Spring偷梁换柱的小把戏,小伙伴们稍安勿躁。
全局异常处理
通过我们长时间的啰嗦,聪明的你肯定清楚:Spring事件的处理,默认是同步依次执行。那如果前面的监听器出现了异常,并且没有处理异常,会对后续的监听器还能顺利接收该事件吗?
其实不能的,因为异常中断了事件的发送了,这里我们不做演示了,有兴趣的同学们可以自行验证一下。
难道同步执行我们就要在每个监听器都try catch一下,避免相互影响吗,不能全局处理吗?当前可以了,贴心的Spring为了简化我们的开发逻辑,特意提供了ErrorHandler来统一处理,话不多说,我们赶紧来试一下吧。
public class AnnotationEventListener {
@EventListener
@Order(1)
public void onApplicationEvent(MyApplicationEvent event) {
Date start = new Date();
// 制造异常
int i = 1/0;
System.out.printf("线程:[%s],监听器1,接收时间:[%s],处理完成时间:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
Date start = new Date();
System.out.printf("线程:[%s],监听器2,接收时间:[%s],处理完成时间:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
}
// 测试方法
public void applicationListenerTest() throws InterruptedException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(AnnotationEventListener.class);
context.refresh();
ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (multicaster instanceof SimpleApplicationEventMulticaster) {
// 简单打印异常信息
((SimpleApplicationEventMulticaster) multicaster).setErrorHandler(t -> System.out.println(t));
}
System.out.printf("线程:[%s],时间:[%s],开始发布事件\n", new Date(), Thread.currentThread().getName());
context.publishEvent(new MyApplicationEvent(1L));
System.out.printf("线程:[%s],时间:[%s],发布事件完成\n", new Date(), Thread.currentThread().getName());
context.stop();
}
// 输出信息:
// 线程:[main],时间[23:35:15],开始发布事件
// java.lang.ArithmeticException: / by zero
// 线程:[main],监听器2,接收时间:[23:35:15],处理完成时间:[23:35:15],接收到事件:1
// 线程:[main],时间[23:35:15],,发布事件完成
经过测试发现:设置了ErrorHandler之后,确实可以对异常进行统一的管理了,再也不用繁琐的try catch了,今天又多了快乐划水五分钟的理由呢。
老规矩,我们不光要做到知其然,还要做到知其所以然,我们探究一下为什么加了ErrorHandler之后,就可以全局处理呢?
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
// 获取ErrorHandler
ErrorHandler errorHandler = getErrorHandler();
// 如果ErrorHandler存在,监听器执行出现异常,交给errorHandler处理,不会传递向上抛出异常。
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
// 调用监听器处理
doInvokeListener(listener, event);
}
}
经过阅读源码,我们发现:Sring先查找是否配置了ErrorHandler,如果配置了,在发生异常的时候,把异常信息转交给errorHandler处理,并且不会在向上传递异常了。这样可以达到异常全局处理的效果了。
工作中的使用
监听应用启动完成的事件—AvailabilityChangeEvent
@Component
public class AppStartCompleteEventListener implements ApplicationListener<AvailabilityChangeEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(AppStartCompleteEventListener.class);
@Autowired
Environment environment;
@Autowired
private RedisCache redisCache;
@Override
public void onApplicationEvent(AvailabilityChangeEvent event) {
LOGGER.info("监听到启动事件:{}",event);
try {
String serverPort = environment.getProperty("local.server.port");
String applicationName = environment.getProperty("spring.application.name");
String ip = InetAddress.getLocalHost().getHostAddress();
// 将服务实例id 和 服务名称、ip 、port,缓存到redis中
String instanceId = ServiceInstance.getInstanceId();
ServiceInstanceDTO serviceInstanceDTO = new ServiceInstanceDTO();
serviceInstanceDTO.setInstanceId(instanceId);
serviceInstanceDTO.setServiceName(applicationName);
serviceInstanceDTO.setIp(ip);
serviceInstanceDTO.setPort(serverPort);
redisCache.setCacheObject(CacheConstants.SERVICE_INSTANCE_INFO.concat(instanceId), JSON.toJSONString(serviceInstanceDTO),2, TimeUnit.HOURS);
} catch (UnknownHostException e) {
LOGGER.info("未找到ip,error:{}", ExceptionUtils.getStackTrace(e));
}
}
}
一个模块处理完成后,通过Spring Event 通知其他模块,实现解耦
事件:
public class ScadaTabelPanelRefreshEventContent {
private String id;
private String graphId;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGraphId() {
return graphId;
}
public void setGraphId(String graphId) {
this.graphId = graphId;
}
}
//-------------------------------------------------
public class ScadaTablePanelRefreshEvent extends ApplicationEvent {
private ScadaTabelPanelRefreshEventContent scadaTabelPanelRefreshEventContent;
public ScadaTablePanelRefreshEvent(ScadaTabelPanelRefreshEventContent scadaTabelPanelRefreshEventContent) {
super(scadaTabelPanelRefreshEventContent);
this.scadaTabelPanelRefreshEventContent = scadaTabelPanelRefreshEventContent;
}
public ScadaTabelPanelRefreshEventContent getScadaTabelPanelRefreshEventContent() {
return scadaTabelPanelRefreshEventContent;
}
}
监听器:
@Component
public class ScadaTablePanelEventListener implements ApplicationListener<ScadaTablePanelRefreshEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(ScadaTablePanelEventListener.class);
@Autowired
private ScadaTablePanelBroadcast scadaTablePanelBroadcast;
@Autowired
private ScadaTablePanelService scadaTablePanelService;
@Override
public void onApplicationEvent(ScadaTablePanelRefreshEvent event) {
// 这里采用线程池,异步处理
CompletableFuture.runAsync(() -> {
ScadaTabelPanelRefreshEventContent scadaTabelPanelRefreshEventContent = event.getScadaTabelPanelRefreshEventContent();
String scadaGraphId = scadaTabelPanelRefreshEventContent.getGraphId();
if (StringUtils.isEmpty(scadaGraphId)) {
// 查询
ScadaTablePanelBO scadaTablePanelBO = new ScadaTablePanelBO();
scadaTablePanelBO.setId(scadaTabelPanelRefreshEventContent.getId());
List<ScadaTablePanelBO> scadaTablePanelBOList = scadaTablePanelService.selectScadaTablePanelListByCondition(scadaTablePanelBO);
if (CollectionUtils.isNotEmpty(scadaTablePanelBOList)) {
scadaGraphId = scadaTablePanelBOList.get(0).getGraphId();
}
}
if (StringUtils.isEmpty(scadaGraphId)) {
LOGGER.info("ScadaTablePanelRefreshEvent中,未找到有效的组态id,eventContent:{}", JSON.toJSONString(scadaTabelPanelRefreshEventContent));
return;
}
scadaTablePanelBroadcast.cacheTablePanelByGraphId(scadaGraphId);
});
}
}
发布器:
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void finishRefresh(ScadaTabelPanelRefreshEventContent scadaTabelPanelRefreshEventContent) {
applicationEventPublisher.publishEvent(new ScadaTablePanelRefreshEvent(scadaTabelPanelRefreshEventContent));
}