共计 8704 个字符,预计需要花费 22 分钟才能阅读完成。
如何在优雅地 Spring 中实现消息的发送和消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
丸趣 TV 小编将对 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以了解将 RocketMQ
Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
作者简介:辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对 Microservice、Messaging 和 Storage 等领域有深刻理解,目前专注 RocketMQ 内核优化以及 Messaging 生态建设。
通过本文,您将了解到:
Spring 的消息框架介绍
rocketmq-spring-boot 具体实现
使用示例
插播一条广告:本周六下午,Apache RocketMQ 开发者沙龙将来到杭州,欢迎大家到现场,活动详情请点击“阅读原文”。
前言
上世纪 90 年代末,随着 Java EE(Enterprise Edition)的出现,特别是 Enterprise Java
Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain
Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection),
控制反转 (Inversion of Control) 和面向切面编程 (AOP) 的技术更加敏捷地解决了传统 Java 企业及版本的不足。
随着 Spring 的持续演进,基于注解 (Annotation) 的配置逐渐取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot
1.0.0 正式发布,它基于“约定大于配置”(Convention over
configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如
spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可以使用约定的配置并且简化部署,受到越来越多的开发者的欢迎。
Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成:
其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;
另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。
为了利用 Spring Boot 的快速开发和让用户能够更灵活地使用 RocketMQ 消息客户端,Apache
RocketMQ 社区推出了 spring-boot-starter 实现。随着分布式事务消息功能在 RocketMQ
4.3.0 版本的发布,近期升级了相关的 spring-boot 代码,通过注解方式支持分布式事务的回查和事务消息的发送。
本文将对当前的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ
Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
Spring 中的消息框架
顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud
Stream。它们都能够与 Spring
Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
2.1 Spring Messaging
Spring Messaging 是 Spring Framework
4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring
AMQP 提供了该协议所要求的类似的功能集。在与 Spring
Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。
单纯对于客户端而言,Spring
Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java
Bean,结合 Spring
Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring
Boot 的自动化选项和一些定制化的属性。
如果有兴趣深入的了解 Spring Messaging 及针对不同的消息产品的使用,推荐阅读这个文件。参考 Spring
Messaging 的既有实现,RocketMQ 的 spring-boot-starter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特点提供了相应的 API(如,顺序,异步和事务半消息等)。
2.2 Spring Cloud Stream
Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下:
该图片引自 spring cloud stream
Stream 框架中提供一个独立的应用内核,它通过输入 (@Input) 和输出 (@Output) 通道与外部世界进行通信,消息源端 (Source) 通过输入通道发送消息,消费目标端 (Sink) 通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring
Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。
这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。
Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud
Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring
Boo 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。
Binder SPI 的方式来让消息中间件产品使用可扩展的 API 来编写相应的 Binder,并集成到 Spring Cloud
Steam 环境,目前 RocketMQ 还没有提供相关的 Binder,我们计划在下一步将完善这一功能,也希望社区里有这方面经验的同学积极尝试,贡献 PR 或建议。
spring-boot-starter 的实现
在开始的时候我们已经知道,spring boot
starter 构造的启动器对于使用者是非常方便的,使用者只要在 pom.xml 引入 starter 的依赖定义,相应的编译,运行和部署功能就全部自动引入。因此常用的开源组件都会为 Spring 的用户提供一个 spring-boot-starter 封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下 RocketMQ(客户端)的 starter 实现过程。
3.1. spring-boot-starter 的实现步骤
对于一个 spring-boot-starter 实现需要包含如下几个部分:
在 pom.xml 的定义
定义最终要生成的 starter 组件信息
groupId org.apache.rocketmq /groupId artifactId spring-boot-starter-rocketmq /artifactId version 1.0.0-SNAPSHOT /version
定义依赖包,
它分为两个部分: A、Spring 自身的依赖包;B、RocketMQ 的依赖包
dependencies
!-- spring-boot-start internal depdencies --
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter /artifactId
/dependency
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-test /artifactId
scope test /scope
/dependency
!-- rocketmq dependencies --
dependency
groupId org.apache.rocketmq /groupId
artifactId rocketmq-client /artifactId
version ${rocketmq-version} /version
/dependency /dependencies
dependencyManagement
dependencies
!-- spring-boot-start parent depdency definition --
dependency
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-parent /artifactId
version ${spring.boot.version} /version
type pom /type
scope import /scope
/dependency
/dependencies /dependencyManagement
配置文件类
定义应用属性配置文件类 RocketMQProperties, 这个 Bean 定义一组默认的属性值。用户在使用最终的 starter 时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,而是 spring-boot 应用中对应的配置文件:src/main/resources/application.properties.
定义自动加载类
定义 src/resources/META-INF/spring.factories 文件中的自动加载类,其目的是让 spring boot 更具文中中所指定的自动化配置类来自动初始化相关的 Bean,Component 或 Service,它的内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
在 RocketMQAutoConfiguration 类的具体实现中,定义开放给用户直接使用的 Bean 对象. 包括:
RocketMQProperties 加载应用属性配置文件的处理类;
RocketMQTemplate 发送端用户发送消息的发送模板类;
ListenerContainerConfiguration 容器 Bean 负责发现和注册消费端消费实现接口类,这个类要求:由 @RocketMQMessageListener 注解标注;实现 RocketMQListener 泛化接口。
最后具体的 RocketMQ 相关的封装
在发送端(producer)和消费端 (consumer) 客户端分别进行封装,在当前的实现版本提供了对 Spring Messaging 接口的兼容方式。
3.2. 消息发送端实现
普通发送端
发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调用关系图:
为了与 SpringMessaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给 doSend()方法;doSend()以及 RocoketMQ 所特有的一些方法如异步,单向和顺序等方法直接添加到 RoketMQTempalte 中,这些方法直接代理调用到 RocketMQ 的 Producer
API 来进行消息的发送。
事务消息发送端
对于事务消息的处理,在消息发送端进行了部分的扩展,参考下图的调用关系类图:
RocketMQTemplate 里加入了一个发送事务消息的方法 sendMessageInTransaction(),
并且最终这个方法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后能够对 TransactionListener 里的方法实现进行调用。
3.3. 消息消费端实现
在消费端 Spring-Boot 应用启动后,会扫描所有包含 @RocketMQMessageListener 注解的类 (这些类需要集成 RocketMQListener 接口,并实现 onMessage() 方法),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQConsumer 对象,启动并监听定制的 Topic 消息,如果有消费消息,则回调到 Listener 的 onMessage()方法。
使用示例
上面的一章介绍了 RocketMQ 在 spring-boot-starter 方式的实现,这里通过一个最简单的消息发送和消费的例子来介绍如何使这个 rocketmq-spring-boot-starter。
4.1 RocketMQ 服务端的准备
启动 NameServer 和 Broker
要验证 RocketMQ 的 Spring-Boot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。
创建实例中所需要的 Topics
在执行启动命令的目录下执行下面的命令行操作
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
4.2. 编译 rocketmq-spring-boot-starter
目前的 spring-boot-starter 依赖还没有提交的 Maven 的中心库,用户使用前需要自行下载 git 源码,然后执行 mvn clean install 安装到本地仓库。
git clone https://github.com/apache/rocketmq-externals.gitcd rocketmq-spring-boot-starter
mvn clean install
4.3. 编写客户端代码
用户如果使用它,需要在消息的发布和消费客户端的 maven 配置文件 pom.xml 中添加如下的依赖:
properties spring-boot-starter-rocketmq-version 1.0.0-SNAPSHOT /spring-boot-starter-rocketmq-version /properties dependency
groupId org.apache.rocketmq /groupId
artifactId spring-boot-starter-rocketmq /artifactId
version ${spring-boot-starter-rocketmq-version} /version /dependency
属性 spring-boot-starter-rocketmq-version 的取值为:1.0.0-SNAPSHOT,这与上一步骤中执行安装到本地仓库的版本一致。
消息发送端的代码
发送端的配置文件 application.properties
# 定义 name-server 地址 spring.rocketmq.name-server=localhost:9876# 定义发布者组名 spring.rocketmq.producer.group=my-group1# 定义要发送的 topicspring.rocketmq.topic=string-topic
发送端的 Java 代码
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
...@SpringBootApplicationpublic class ProducerApplication implements CommandLineRunner { // 声明并引用 RocketMQTemplate
@Resource
private RocketMQTemplate rocketMQTemplate; // 使用 application.properties 里定义的 topic 属性
@Value(${spring.rocketmq.springTopic} ) private String springTopic;
public static void main(String[] args){ SpringApplication.run(ProducerApplication.class, args);
}
public void run(String... args) throws Exception { // 以同步的方式发送字符串消息给指定的 topic
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, Hello, World! // 打印发送结果信息
System.out.printf(string-topic syncSend1 sendResult=%s %n , sendResult);
}
}
消息消费端代码
消费端的配置文件 application.properties
# 定义 name-server 地址 spring.rocketmq.name-server=localhost:9876# 定义发布者组名 spring.rocketmq.consumer.group=my-customer-group1# 定义要发送的 topicspring.rocketmq.topic=string-topic
消费端的 Java 代码
@SpringBootApplicationpublic class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args);
}
}// 声明消费消息的类,并在注解中指定,相关的消费信息 @Service@RocketMQMessageListener(topic = ${spring.rocketmq.topic} , consumerGroup = ${spring.rocketmq.consumer.group} )class StringConsumer implements RocketMQListener String { @Override
public void onMessage(String message) { System.out.printf( ------- StringConsumer received: %s %f , message);
}
}
这里只是简单的介绍了使用 spring-boot 来编写最基本的消息发送和接收的代码,如果需要了解更多的调用方式,如: 异步发送,对象消息体,指定 tag 标签以及指定事务消息,请参看 github 的说明文档和详细的代码。我们后续还会对这些高级功能进行陆续的介绍。
看完上述内容,你们掌握如何在优雅地 Spring 中实现消息的发送和消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!