This project aims to help developers quickly integrate RocketMQ with Spring Boot.
We are always happy to receive contributions, whether trivial cleanups or big new features. Please see the RocketMQ main website for details.
- JDK 1.8 and above
- Maven 3.0 and above
mvn clean install
- synchronous transmission
- synchronous ordered transmission
- asynchronous transmission
- asynchronous ordered transmission
- ordered consumption
- concurrent consumption (broadcasting/clustering)
- one-way transmission
- transaction transmission
- pull consumption
Please see the complete samples in rocketmq-spring-boot-samples.
Note: Current RELEASE.VERSION=2.0.0
<!--add dependency in pom.xml-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

## application.properties
spring.rocketmq.nameServer=127.0.0.1:9876
spring.rocketmq.producer.group=my-group

Note: You may need to replace 127.0.0.1:9876 with the real NameServer address of your RocketMQ deployment.
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send a plain String payload
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
        // Send a Spring Message built with MessageBuilder
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // Send a custom object payload
        rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));

        // rocketMQTemplate.destroy(); // note: once rocketMQTemplate is destroyed, it can no longer send messages
    }

    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable {
        private String orderId;
        private BigDecimal paidMoney;
    }
}

More relevant configurations for producing:
spring.rocketmq.producer.retryTimesWhenSendAsyncFailed=0
spring.rocketmq.producer.sendMessageTimeout=300000
spring.rocketmq.producer.compressMessageBodyOverHowmuch=4096
spring.rocketmq.producer.maxMessageSize=4194304
spring.rocketmq.producer.retryAnotherBrokerWhenNotStoreOk=false
spring.rocketmq.producer.retryTimesWhenSendFailed=2
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        try {
            // Build a Spring Message to send in a transaction
            Message msg = MessageBuilder.withPayload(..)...
            // The first parameter of sendMessageInTransaction() ("test") must match
            // the txProducerGroup attribute of @RocketMQTransactionListener
            rocketMQTemplate.sendMessageInTransaction("test", "test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define the transaction listener with the @RocketMQTransactionListener annotation
    @RocketMQTransactionListener(txProducerGroup = "test")
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... run the local transaction, then return rollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check the local transaction status, then return rollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}

## application.properties
spring.rocketmq.nameServer=127.0.0.1:9876

Note: You may need to replace 127.0.0.1:9876 with the real NameServer address of your RocketMQ deployment.
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
        @Override
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}

More relevant configurations for consuming:
- How do I connect to multiple NameServers in a production environment?

  spring.rocketmq.nameServer supports multiple NameServer addresses separated by semicolons (;). For example: 172.19.0.1:9876;172.19.0.2:9876

- When is rocketMQTemplate destroyed?

  Developers do not need to call rocketMQTemplate.destroy() manually when using rocketMQTemplate to send messages in a project; rocketMQTemplate is destroyed automatically when the Spring container is destroyed.

- Startup exception: Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

  By design, RocketMQ does not want one consumer to handle multiple types of messages at the same time, so consumers in the same consumerGroup should share the same responsibility and should not do different things (that is, consume multiple topics). A one-to-one correspondence between consumerGroup and topic is recommended.

- How is the message body serialized and deserialized?

  RocketMQ stores the message body as byte[]. If the message content of the business system is a java.lang.String, it is encoded to byte[] as UTF-8. If it is not a java.lang.String, it is first serialized to a JSON string with jackson-databind and then encoded to byte[] as UTF-8.

- How do I specify tags for a topic?

  RocketMQ best practice recommends that an application use a single topic as far as possible and distinguish message subtypes with tags, which the application can set freely.

  When you send a message with rocketMQTemplate, set the destination of the message through the destination parameter of the send method. The destination format is topicName:tagName; the topic name comes before the colon and the tag name after it.

  Note: Although the name tags looks plural, a destination can specify only one tag under a topic when sending a message, not several.

- How do I set the message key when sending a message?

  Use an overloaded method of the form xxxSend(String destination, Message<?> msg, ...) and set the key in the headers of msg. For example:

      Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
      rocketMQTemplate.send("topic-test", message);

  Similarly, you can set the message FLAG, WAIT_STORE_MSG_OK and other user-defined headers in the same way.

  Note: When Spring's Message is converted to RocketMQ's Message, the prefix USERS_ is added to every header name to prevent the headers from conflicting with RocketMQ's system properties. To read a custom header when consuming, look it up under the key that starts with USERS_.

- When consuming a message, how do I get other RocketMQ system attributes in addition to the payload?

  Implement the RocketMQListener interface with MessageExt as the generic type; the onMessage method then receives RocketMQ's native MessageExt message.

      @Slf4j
      @Service
      @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
      public class MyConsumer2 implements RocketMQListener<MessageExt> {
          @Override
          public void onMessage(MessageExt messageExt) {
              log.info("received messageExt: {}", messageExt);
          }
      }
- How do I specify where consumers start consuming messages?

  For the default consume offset, please refer to the RocketMQ FAQ. To customize the consumer's starting position, have the consumer class also implement the RocketMQPushConsumerLifecycleListener interface. For example:

      @Slf4j
      @Service
      @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
      public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
          @Override
          public void onMessage(String message) {
              log.info("received message: {}", message);
          }

          @Override
          public void prepareStart(final DefaultMQPushConsumer consumer) {
              // start consuming from the current time
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
              consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
          }
      }

  Any other configuration of DefaultMQPushConsumer can be done in the same way.

- How do I send transactional messages?

  Two steps are needed on the client side:

  a) Define a class that is annotated with @RocketMQTransactionListener and implements the RocketMQLocalTransactionListener interface, including its executeLocalTransaction() and checkLocalTransaction() methods.

  b) Invoke the sendMessageInTransaction() method of the RocketMQTemplate API.

  Note: The first parameter of this method corresponds to the txProducerGroup attribute of @RocketMQTransactionListener. It can be null if the default transaction producer group is used.
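As a small illustration of the topicName:tagName destination format described in the tags question above, the following sketch builds such a string and splits it again. DestinationSketch and the tag name TagA are hypothetical and are not part of rocketmq-spring; the sketch assumes at most one tag per destination, as the note in that answer states.

```java
import java.util.Objects;

/** Hypothetical helper, not part of rocketmq-spring: builds and splits "topicName:tagName" strings. */
final class DestinationSketch {
    private DestinationSketch() { }

    /** Joins a topic and an optional single tag into a destination string. */
    static String destination(String topic, String tag) {
        Objects.requireNonNull(topic, "topic");
        if (topic.isEmpty() || topic.contains(":")) {
            throw new IllegalArgumentException("topic must be non-empty and must not contain ':'");
        }
        // Without a tag, the destination is just the topic name.
        return (tag == null || tag.isEmpty()) ? topic : topic + ":" + tag;
    }

    /** Splits a destination at the first ':' into {topic, tag}; the tag is "" when absent. */
    static String[] split(String destination) {
        int idx = destination.indexOf(':');
        if (idx < 0) {
            return new String[] {destination, ""};
        }
        return new String[] {destination.substring(0, idx), destination.substring(idx + 1)};
    }

    public static void main(String[] args) {
        System.out.println(destination("test-topic-1", "TagA")); // prints "test-topic-1:TagA"
        System.out.println(destination("test-topic-1", null));   // prints "test-topic-1"
    }
}
```

The resulting string could then be passed as the destination argument, for example rocketMQTemplate.convertAndSend(DestinationSketch.destination("test-topic-1", "TagA"), "Hello, World!").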
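To make the String branch of the serialization answer above concrete, here is a minimal JDK-only sketch of encoding a String payload to byte[] as UTF-8 and decoding it again. PayloadEncodingSketch is a hypothetical name; the starter performs this conversion itself, so this is an illustration of the encoding and not the starter's implementation. The JSON branch (jackson-databind) is not shown.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration, not rocketmq-spring code: String payload <-> UTF-8 bytes. */
final class PayloadEncodingSketch {
    private PayloadEncodingSketch() { }

    /** Encodes a String payload as UTF-8 bytes, the form in which a message body is stored. */
    static byte[] encode(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 message body back into a String. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("Hello, World!");
        System.out.println(body.length);  // prints 13 (one byte per ASCII character)
        System.out.println(decode(body)); // prints "Hello, World!"
    }
}
```

Non-ASCII characters take more than one byte in UTF-8, so the body length can exceed the String length.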
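The USERS_ prefix mentioned in the message key question above can be pictured with plain maps. The sketch below is an assumption-laden illustration and not the starter's conversion code: HeaderPrefixSketch and the header name traceId are hypothetical, and headers are modelled as a simple Map<String, String>.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration, not rocketmq-spring code: header names gain a "USERS_" prefix. */
final class HeaderPrefixSketch {
    static final String PREFIX = "USERS_";

    private HeaderPrefixSketch() { }

    /** Returns a copy of the headers in which every name carries the prefix. */
    static Map<String, String> toUserProperties(Map<String, String> headers) {
        Map<String, String> props = new LinkedHashMap<>();
        headers.forEach((name, value) -> props.put(PREFIX + name, value));
        return props;
    }

    /** Looks up a custom header by its original name; returns null if it is absent. */
    static String customHeader(Map<String, String> props, String name) {
        return props.get(PREFIX + name);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("traceId", "abc-123"); // hypothetical custom header
        Map<String, String> props = toUserProperties(headers);
        System.out.println(props.keySet());                 // prints [USERS_traceId]
        System.out.println(customHeader(props, "traceId")); // prints abc-123
    }
}
```

On the consuming side, the same idea applies: a header set as traceId when sending would be read back under the key USERS_traceId.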