SpringBoot与RocketMQ客户端集成原理解读与示例

  • 时间:
  • 浏览:1
  • 来源:大发5分快乐8APP下载_大发5分快乐8APP官方

2 Spring的消息框架介绍

3.3. 消息消费端实现

发送端的Java代码

Spring中的消息框架

Spring Messaging是Spring Framework 4中加在的模块,是Spring与消息系统集成的有一一有十个 扩展性的支持。它实现了从基于JmsTemplate的简单的使用JMS接口到异步接收消息的一整套完整版的基础架构,Spring AMQP提供了该协议所要求的类式于的功能集。 在与Spring Boot的集成后,它拥有了自动配置能力,都都可不可不可以 在测试和运行时与相应的消息传递系统进行集成。

在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对Spring Messaging接口的兼容法子 。

使用示例

Binder抽象使得Spring Cloud Stream应用还需要灵活的连接到后边件,加之Spring Cloud Stream使用利用了Spring Boot的灵活配置配置能力,那我的配置还需要通过内部人员配置的属性和Spring Boo支持的任何形式来提供(包括应用启动参数、环境变量和application.yml可能性application.properties文件),部署人员还需要在运行时动态选取通道连接destination(类式于,Kafkatopic可能性RabbitMQexchange)。

rocketmq-spring-boot的实现

用户可能性使用它,需要在消息的发布和消费客户端的maven配置文件pom.xml中加在如下的依赖:



在执行启动命令的目录下执行下面的命令行操作

org.springframework.boot.autoconfigure.EnableAutoConfiguration=

org
.apache.rocketmq.spring.starter.RocketMQAutoConfiguration

Spring Cloud Stream结合了Spring Integration的注解和功能,它的应用模型如下:

目前的spring-boot-starter依赖还太难提交的Maven的中心库,用户使用前需要自行下载git源码,过后执行mvn clean install 安装下 本地仓库。

在过后过后刚开始的过后朋友可能性知道,spring boot starter构造的启动器对于使用者是非常方便的,使用者假若在pom.xml引入starter的依赖定义,相应的编译,运行和部署功能就完整版自动引入。过后常用的开源组件后要为Spring的用户提供有一一有十个 spring-boot-starter封装给开发者,让开发者非常方便集成和使用,这里朋友完整版的介绍一下RocketMQ(客户端)的starter实现过程。



3.1. spring-boot-starter的实现步骤

对于事务消息的补救,在消息发送端进行了要素的扩展,参考下图的调用关系类图:

定义 src/resources/META-INF/spring.factories文件中的自动加载类, 其目的是让spring boot更具文中中所指定的自动化配置类来自动初始化相关的Bean,Component或Service,它的内容如下:

4. 最后具体的RocketMQ相关的封装

 ●  定义依赖包,

1. 消息发送端的代码

文章主要内容包括以下有几块方面:

上世纪90年代末,随着Java EE(Enterprise Edition)的经常出现,有点硬是Enterprise Java Beans的使用需要简化的描述符配置和死板简化的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的XML配置和普通Java对象(Plain Old Java Objects)的Spring技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地补救了传统Java企业及版本的缺乏。随着Spring的持续演进,基于注解(Annotation)的配置逐渐取代了XML文件配置, 2014年4月1日,Spring Boot 1.0.0正式发布,它基于“约定大于配置”(Convention over configuration)这俩 理念来快速地开发,测试,运行和部署Spring应用,都可不可不可以 通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的法子 运行,不需再部署到独立容器中。这俩 简便直接快速构建和开发应用的过程,还需要使用约定的配置过后简化部署,受到太难来太满的开发者的欢迎。

git clone https://github.com/apache/rocketmq-externals.git

cd rocketmq-spring-boot-starter

mvn clean install

Binder SPI的法子 来让消息后边件产品使用可扩展的API来编写相应的Binder,并集成到Spring Cloud Steam环境,目前RocketMQ还太难提供相关的Binder,朋友计划在下一步将完善这俩 功能,也希望社区里有这方面经验的同学积极尝试,贡献PR或建议。

2 创建实例中所需要的Topics

1 前言

 ●  RocketMQTemplate 发送端用户发送消息的发送模板类

 ●  ListenerContainerConfiguration 容器Bean负责发现和注册消费端消费实现接口类,这俩 类要求:1. 由@RocketMQMessageListener注解标注;2. 实现RocketMQListener泛化接口。

定义应用属性配置文件类RocketMQProperties,这俩 Bean定义一组默认的属性值。用户在使用最终的starter时,还需要根据这俩 类定义的属性来修改取值,当然也有直接修改这俩 类的配置,就让spring-boot应用中对应的配置文件:src/main/resources/application.properties.

后边的一章介绍了RocketMQ在spring-boot-starter法子 的实现,这里通过有一一有十个 最简单的消息发送和消费的例子来介绍要怎样使这俩 rocketmq-spring-boot-starter。

3. 定义自动加载类

发送端的配置文件application.properties

1 启动NameServer和Broker

2 事务消息发送端

单纯对于客户端而言,Spring Messaging提供了一套抽象的API可能性说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息后边件提供商还需要在这俩 模式下提供本人的Spring实现:在消息发送端需要实现的是有一一有十个 XXXTemplate形式的Java Bean,结合Spring Boot的自动化配置选项提供多个不同的发送消息法子 ;在消息的消费端是有一一有十个 XXXMessageListener接口(实现法子 通常会使用有一一有十个 注解来声明有一一有十个 消息驱动的POJO),提供回调法子 来监听和消费消息,这俩 接口同样还需要使用Spring Boot的自动化选项和某些定制化的属性。可能性有兴趣深入的了解Spring Messaging及针对不同的消息产品的使用,推荐阅读这俩 文件。参考Spring Messaging的既有实现,RocketMQ的spring-boot-starter中遵循了相关的设计模式并结合RocketMQ自身的功能特点提供了相应的API(如,顺序,异步和事务半消息等)。

1. 普通发送端

# 定义name-server地址

spring.rocketmq.name-server=localhost:9876

# 定义发布者组名

spring.rocketmq.producer.group=my-group1

# 定义要发送的topic

spring.rocketmq.topic=string-topic

属性spring-boot-starter-rocketmq-version的取值为:1.0.0-SNAPSHOT, 这与上一步骤中执行安装下 本地仓库的版本一致。

 ●  编译rocketmq-spring-boot-starter ●  代码示例

前言

1. 在pom.xml的定义

对于有一一有十个 spring-boot-starter实现需要含高如下有几块要素:

<dependencies>

<!-- spring-boot-start internal depdencies -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency><!-- rocketmq dependencies -->

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>${rocketmq-version}</version>

</dependency>

</dependencies>

<dependencyManagement>

<dependencies>

<!-- spring-boot-start parent depdency definition -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>${spring.boot.version}</version>

<type>pom</type>

<scope>import</scope>

</dependency>

</dependencies>

</dependencyManagement>

 ●  Sping Cloud Stream

3 rocketmq-spring-boot具体实现

发送端的代码封装下 RocketMQTemplate POJO中,下图是发送端的相关代码的调用关系图:

RocketMQTemplate里加入了有一一有十个 发送事务消息的法子 sendMessageInTransaction(), 过后最终这俩 法子 会代理到RocketMQ的TransactionProducer进行调用,在这俩 Producer上会注册其关联的TransactionListener实现类,以便在发送消息还需要够对TransactionListener里的法子 实现进行调用。

bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

3.2. 消息发送端实现

4.1 RocketMQ服务端的准备

Spring Cloud Stream框架中提供有一一有十个 独立的应用内核,它通过输入(@Input)和输出(@Output)通道与内部人员世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消 费的消息。哪些地方地方通道通过专用的Binder实现与内部人员代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解法子 进行编程,而需要关心运行时具体的Binder绑定的消息后边件。在运行时,Spring Cloud Stream都都可不可不可以 自动探测并使用在classpath下找到的Binder。那我开发人员还需要轻松地在相同的代码中使用不类式于型的后边件:仅仅需要在构建时含高进不同的Binder。在更加简化的使用场景中,也还需要在应用中打包多个Binder并让它本人选取Binder,甚至在运行时为不同的通道使用不同的Binder

2.1 Spring Messaging

顺便在这里讨论一下在Spring中关于消息的有一一有十个 主要的框架,即Spring Messaging和Spring Cloud Stream。它们都都都可不可不可以 与Spring Boot整合并提供了某些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,还需要有效地简化开发人员对消息后边件的使用简化度,让系统开发人员还需要有更多的精力关注于核心业务逻辑的补救。

本文将对当前的设计实现做有一一有十个 简单的介绍,读者还需要通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,过后通过有一一有十个 简单的示例来一步一步的讲解要怎样使用这俩 spring-boot-starter工具包来配置,发送和消费RocketMQ消息。

import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;

...
@SpringBootApplication

public class ProducerApplication implements CommandLineRunner {

// 声明并引用RocketMQTemplate

@Resource

private RocketMQTemplate rocketMQTemplate;// 使用application.properties里定义的topic属性

@Value("${spring.rocketmq.springTopic}")

private String springTopic;public static

在消费端Spring-Boot应用启动后,会扫描所有含高@RocketMQMessageListener注解的类(哪些地方地方类需要集成RocketMQListener接口,并实现onMessage()法子 ),这俩 Listener会一对一的被放置到DefaultRocketMQListenerContainer容器对象中,容器对象会根据消费的法子 (并发或顺序),将RocketMQListener封装下 具体的RocketMQ内部人员的并发可能性顺序接口实现。在容器中创建RocketMQ Consumer对象,启动并监听定制的Topic消息,可能性有消费消息,则回调到Listener的onMessage()法子 。

<groupId>org.apache.rocketmq</groupId>

<artifactId>spring-boot-starter-rocketmq</artifactId><version>1.0.0-SNAPSHOT</version>

2.2 Spring Cloud Stream

本文将对rocktmq-spring-boot的设计实现做有一一有十个 简单的介绍,读者还需要通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,过后通过有一一有十个 简单的示例来一步一步的讲解要怎样使用这俩 spring-boot-starter工具包来配置,发送和消费RocketMQ消息。该项目git地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter

为了与Spring Messaging的发送模板兼容,在RocketMQTemplate集成了AbstractMessageSendingTemplate抽象类,来支持相关的消息转换和发送法子 ,哪些地方地法子子 最终会代理给doSend()法子 ;doSend()以及RocoketMQ所特有的某些法子 如异步,单向和顺序等法子 直接加在到RoketMQTempalte中,哪些地方地法子子 直接代理调用到RocketMQ的Producer API来进行消息的发送。

2. 配置文件类

在RocketMQAutoConfiguration类的具体实现中,定义开放给用户直接使用的Bean对象. 包括:

 ●  发送端实现 ●  消费端实现

4 使用示例

要验证RocketMQ的Spring-Boot客户端,首太难确保RocketMQ服务正确的下载并启动。还需要参考RocketMQ主站的快速过后过后刚开始来进行操作。确保启动NameServer和Broker可能性正确启动。

它分为有一一有十个 要素: A Spring自身的依赖包; B RocketMQ的依赖包

Apache RocketMQ是业界知名的分布式消息和流补救后边件,简单地理解,它由Broker服务器和客户端两要素组成,其中客户端有一一有十个 是消息发布者客户端(Producer),它负责向Broker服务器发送消息;另外有一一有十个 是消息的消费者客户端(Consumer),多个消费者还需要组成有一一有十个 消费组,来订阅和拉取消 费Broker服务器上存储的消息。为了利用Spring Boot的快速开发和让用户都都可不可不可以 更灵活地使用RocketMQ消息客户端,Apache RocketMQ社区推出了spring-boot-starter实现。随着分布式事务消息功能在RocketMQ 4.3.0版本的发布,近期升级了相关的spring-boot代码,通过注解法子 支持分布式事务的回查和事务消息的发送。

<properties> <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>

</properties><dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>spring-boot-starter-rocketmq</artifactId>

<version>${spring-boot-starter-rocketmq-version}</version>

该图片引自spring cloud stream