RocketMQ發送普通消息的所有方法以及代碼示例-創新互聯
- 一、使用RocketMQTemplate發送消息(整合Springboot)
- (1)void send(Message\message) throws MessagingException;同步發送
- (2)void send(D destination, Message\message) throws MessagingException;同步發送
- (3)SendResult syncSend(String destination, Message\message);同步發送
- (4)SendResult syncSend(String destination, Message\message, long timeout);同步發送
- (5)\
SendResult syncSend(String destination, Collection\ messages);同步發送 - (6)\
SendResult syncSend(String destination, Collection\ messages, long timeout);同步發送 - (7)SendResult syncSend(String destination, Object payload);同步發送
- (8)SendResult syncSend(String destination, Object payload, long timeout);同步發送
- (9)void sendOneWay(String destination, Message\message);one-way模式,異步發送
- (10)void sendOneWay(String destination, Object payload);one-way模式,異步發送
- (11)void asyncSend(String destination, Message\message, SendCallback sendCallback);異步發送
- (12)void asyncSend(String destination, Message\message, SendCallback sendCallback, long timeout);異步發送
- (13)void asyncSend(String destination, Object payload, SendCallback sendCallback);異步發送
- (14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);異步發送
- (15)\
void asyncSend(String destination, Collection\ messages, SendCallback sendCallback);異步發送 - (16)\
void asyncSend(String destination, Collection\ messages, SendCallback sendCallback, long timeout);異步發送 - (17)void convertAndSend(Object payload) throws MessagingException;同步發送
- (18)void convertAndSend(D destination, Object payload) throws MessagingException;同步發送
- (19)void convertAndSend(D destination, Object payload, @Nullable Map
headers) throws MessagingException;同步發送 - (20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發送
- (21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發送
- (22)void convertAndSend(D destination, Object payload, @Nullable Map
headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發送
- 二、使用DefaultMQProducer發送消息
- 2.1 DefaultMQProducer的創建
- (1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);
- (2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);
- (3)DefaultMQProducer(String namespace, String producerGroup);
- (4)DefaultMQProducer(String producerGroup);
- (5)DefaultMQProducer(RPCHook rpcHook);
- (6)DefaultMQProducer();
- (7)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, String customizedTraceTopic);
- (8)DefaultMQProducer(String producerGroup, boolean enableMsgTrace, String customizedTraceTopic);
- (9)DefaultMQProducer(String producerGroup, boolean enableMsgTrace);
- 2.2 發送普通消息
- (1)SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送
- (2)SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送
- (3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;異步發送
- (4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;異步發送
- (5)SendResult send(Collection
msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送 - (6)SendResult send(Collection
msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送 - (7)void send(Collection
msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發送 - (8)void send(Collection
msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發送 - (9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,異步發送
- 總結與展望
RocketMQ發送消息主要分3大模式:
同步發送sysnc
、異步發送async
、直接發送one-way
。成都創新互聯專注于東寶企業網站建設,響應式網站開發,購物商城網站建設。東寶網站建設公司,為東寶等地區提供建站服務。全流程按需制作,專業設計,全程項目跟蹤,成都創新互聯專業和態度為您提供的服務- 同步發送模式只有在消息完全發送完成之后才返回結果,此方式存在需要同步等待發送結果的時間代價。這種方式具有內部重試機制,即在主動聲明本次消息發送失敗之前,內部實現將重試一定次數,默認為2次。 發送的結果存在同一個消息可能被多次發送給給broker,這里需要應用的開發者自己在消費端處理冪等性問題。
- 異步發送模式在消息發送后立刻返回,當消息完全完成發送后,會調用回調函數sendCallback來告知發送者本次發送是成功或者失敗。異步模式通常用于響應時間敏感業務場景,即承受不了同步發送消息時等待返回的耗時代價。同同步發送一樣,異步模式也在內部實現了重試機制,默認次數為2次。發送的結果同樣存在同一個消息可能被多次發送給給broker,需要應用的開發者自己在消費端處理冪等性問題。
- 采用one-way發送模式發送消息的時候,發送端發送完消息后會立即返回,不會等待來自broker的ack來告知本次消息發送是否完全完成發送。這種模式吞吐量很大,但是存在消息丟失的風險,所以其適用于不重要的消息發送,比如日志收集。one-way模式本質上是沒有sendCallback的異步發送方式。
每種發送模式都有很多發送消息的方法,接下來對每個發送方法進行講解。
使用RocketMQTemplate必須要在配置文件中配置RocketMQ的屬性,Springboot在加載時才會創建RocketMQTemplate的Bean。配置文件示例如下:
rocketmq:
name-server: nameServer的集群IP
compress-message-body-threshold: 4096
consumer:
access-key: username
secret-key: password
max-message-size: 536870912
producer:
access-key: username
secret-key: password
group: producerGroup
retry-next-server: true
retry-times-when-send-async-failed: 2
retry-times-when-send-failed: 2
send-message-timeout: 3000
(1)void send(Messagemessage) throws MessagingException;同步發送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類型
MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發送帶key的消息,請求頭的鍵必須寫成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.send(sendMessage);
}
}
RocketMQTemplate.send(Messagemessage) 方法只有一個Message類型的參數,沒有設置topic,這個消息會發送到RocketMQ的默認topic
,這個默認topic是在安裝RocketMQ Client的時候配置的,如果沒有這個topic會拋出"No 'defaultDestination' configured"
異常。這個方法幾乎不會被使用,我們發送消息一般都是要發送到我們想去的一個topic。此方法會將消息同步發送至topic,此方法沒有返回值,我們無法獲取SendResult。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類型
MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發送帶key的消息,請求頭的鍵必須寫成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
rocketMQTemplate.send("topicA:tagA", sendMessage);
}
}
RocketMQTemplate.send(D destination, Messagemessage) 方法有兩個參數,第一個參數就是topic,第二個參數是要發送的消息。這個方法是一個委托方法,其實最終調用的是RocketMQTemplate.syncSend(String destination, Messagemessage)方法,也就是說destination雖然是個泛型,但是我們應該傳入一個字符串類型的topic,此方法會將消息同步發送至topic
。此方法沒有返回值,我們無法獲取SendResult。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類型
MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發送帶key的消息,請求頭的鍵必須寫成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Messagemessage) 方法有兩個參數,第一個參數就是topic,第二個參數是要發送的消息。此方法的返回值為SendResult,我們可以通過這個類來確定消息是否發送成功,獲取消息的MessageId等
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類型
MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發送帶key的消息,請求頭的鍵必須寫成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", sendMessage, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Messagemessage, long timeout) 方法有三個參數,第一個參數就是topic,第二個參數是要發送的消息,第三個參數是超時時間
。其實方法(3)的底層也是調用此方法,只不過由于我們沒有設置timeout,系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
。此方法會將消息批量發送到topicA下的tagA下。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
SendResult sendResult =rocketMQTemplate.syncSend("topicA:tagA", messages, 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Collection消息的集合
,第三個參數是超時時間。此方法會將消息批量發送到topicA下的tagA下。其實方法(5)的底層也是調用此方法,只不過由于我們沒有設置timeout系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設置消息體");
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有兩個參數,第一個參數就是topic,第二個參數是要發送的消息的消息體。此方法不需要我們自己創建Message對象了,底層會幫我們創建。但是缺點就是不能設置消息的屬性和key
。
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
SendResult sendResult = rocketMQTemplate.syncSend("topicA:tagA", "這里設置消息體", 3000L);
System.out.println(sendResult);
}
}
RocketMQTemplate.syncSend(String destination, Object payload, long timeout) 方法有三個參數,第一個參數就是topic,第二個參數是要發送的消息的消息體
,第三個參數是超時時間。此方法就是不需要我們自己創建Message對象了,底層會幫我們創建。但是缺點就是不能設置消息的屬性和key
。其實方法(7)的底層也是調用此方法,只不過由于我們沒有設置timeout,系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里的Message是org.springframework.messaging.Message類型
MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
// 想發送帶key的消息,請求頭的鍵必須寫成KEYS
.setHeader("KEYS", "消息的key的值")
.build();
// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "這里設置消息體");
}
}
RocketMQTemplate.sendOneWay(String destination, Messagemessage) 方法有兩個參數,第一個參數就是topic,第二個參數是要發送的消息。此方法可以異步發送消息,具有很高的發送效率,但是沒有返回值,我們無法獲取SendResult。
(10)void sendOneWay(String destination, Object payload);one-way模式,異步發送import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 這里指定將消息發送到topicA的tagA下,也可以不指定tagA只寫topicA
rocketMQTemplate.sendOneWay("topicA:tagA", "這里設置消息體");
}
}
RocketMQTemplate.sendOneWay(String destination, Object payload) 方法有兩個參數,第一個參數就是topic,第二個參數是要發送的消息的消息體
。此方法不需要我們自己創建Message對象了,底層會幫我們創建。但是缺點就是不能設置消息的屬性和key。此方法可以異步發送消息,具有很高的發送效率,但是沒有返回值,我們無法獲取SendResult。
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
RocketMQTemplate.asyncSend(String destination, Messagemessage, SendCallback sendCallback) 方法有三個參數,第一個參數就是topic,第二個參數是要發送的消息,第三個參數是異步消息的回調對象。此方法允許我們設置回調函數,知道異步消息是否發送成功以此來做相應的事情。方法(9)和(10)底層也是調用了此方法,只不過把SendCallback對象設為了null。
(12)void asyncSend(String destination, Messagemessage, SendCallback sendCallback, long timeout);異步發送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體")
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
rocketMQTemplate.asyncSend("topicA:tagA", sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
RocketMQTemplate.asyncSend(String destination, Messagemessage, SendCallback sendCallback, long timeout) 方法有四個參數,第一個參數就是topic,第二個參數是要發送的消息,第三個參數是異步消息的回調對象,第四個參數是超時時間。此方法允許我們設置回調函數,知道異步消息是否發送成功以此來做相應的事情。方法(11)也是調用了此方法,只不過由于我們沒有設置timeout,系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
(13)void asyncSend(String destination, Object payload, SendCallback sendCallback);異步發送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設置消息體", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一樣,只不過第二個參數由Message類型換成了Object類型,可以直接傳入要發送的消息體,不用我們自己創建Message對象,缺點就是不能設置消息的屬性和key。
(14)void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout);異步發送import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {rocketMQTemplate.asyncSend("topicA:tagA", "這里設置消息體", new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(13)的底層也是調用此方法,只不過由于我們沒有設置timeout,系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
(15)import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
}
}
同方法(11)一樣,只不過第二個參數由Message類型換成了Collection
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.Collection;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {Collection>messages = new ArrayList<>();
for (int i=0; i<3; i++) {MessagesendMessage = MessageBuilder.withPayload("這里設置消息體" + i)
.setHeader("消息的屬性的key", "消息的屬性的值")
.setHeader("KEYS", "消息的key的值")
.build();
messages.add(sendMessage);
}
rocketMQTemplate.asyncSend("topicA:tagA", messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
}
}
方法(15)的底層也是調用此方法,只不過由于我們沒有設置timeout,系統會使用默認的timeout,默認值為3000毫秒。注意:超時時間設置的過小會導致消息發送失敗。
(17)void convertAndSend(Object payload) throws MessagingException;同步發送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity);
}
}
同方法(1)一樣,此方法也是將消息發送到默認的topic。payload可以是一個實體類、集合等也可以是字符串。如果payload是實體類、集合等,底層會將實體類轉化成json對象,例如上述代碼發送消息的結果就是{"name":"Tom"}
,如果傳入的是集合對象這會轉換從jsonArray。如果payload是字符串,則發送消息的結果就是原字符串。
import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity);
}
}
在方法(17)的基礎上可以指定topic將消息發送到指定的topic。
(19)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的屬性
Mapmap = new HashMap<>();
map.put("消息的屬性的鍵", "消息的屬性的值");
map.put("KEYS", "消息的key");
// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map);
}
}
在方法(18)的基礎上增加了第三個Map類型的參數,我們可以使用這個參數來設置消息的屬性和key。
(20)void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend(exampleEntity, new MessagePostProcessor() {@Override
public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(17)的基礎上增加了MessagePostProcessor對象,MessagePostProcessor顧名思義就是消息的后處理。我們傳入的參數中,destination就是消息要去往的topic(這里沒有destination則發送默認的topic),payload就是消息的消息體,headers就是消息的屬性(這里沒有headers則無法設置屬性),RocketMQ底層會根據payload和headers生成Message對象,MessagePostProcessor就是對這個生成的Message對象做一些事情,最后再發往destination。
(21)void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException;同步發送import com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, new MessagePostProcessor() {@Override
public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(20)的基礎上可以指定topic將消息發送到指定的topic。
(22)void convertAndSend(D destination, Object payload, @Nullable Mapimport com.sgm.esb.gateway.model.ExampleEntity;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.core.MessagePostProcessor;
import java.util.HashMap;
import java.util.Map;
public class CommonMessageProducer {@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendMessages() {// 消息的屬性
Mapmap = new HashMap<>();
map.put("消息的屬性的鍵", "消息的屬性的值");
map.put("KEYS", "消息的key");
// 實體類
ExampleEntity exampleEntity = new ExampleEntity();
exampleEntity.setName("Tom");
rocketMQTemplate.convertAndSend("topicA:tagA", exampleEntity, map, new MessagePostProcessor() {@Override
public MessagepostProcessMessage(Messagemessage) {MessageHeaders headers = message.getHeaders();
String keys = headers.get("KEYS", String.class);
System.out.println(keys);
return message;
}
});
}
}
在方法(21)的基礎上增加了Map類型的參數,我們可以使用這個參數來設置消息的屬性和key。
至此,使用RocketMQTemplate發送普通消息的方法就全部講解完了,其實還有RocketMQTemplate.sendAndReceive()方法也可以發送普通消息,但是這個要配合消費者一起使用,我會另寫一篇文章講解這個方法。
二、使用DefaultMQProducer發送消息有些場景下我們不想使用springboot自動創建的RocketMQTemplate的Bean來發送消息,而是想自己創建生產者以使用不同的nameServer來發送消息到不同的集群或者想在main函數中創建生產者,可以使用DefaultMQProducer來發送消息。其實RocketMQTemplate底層也是使用DefaultMQProducer來發送消息的,只不過進行了包裝讓用戶使用起來更方便。
2.1 DefaultMQProducer的創建DefaultMQProducer有多個構造函數,我們可以根據不同的場景使用不同的構造函數創建對象。
(1)DefaultMQProducer(String namespace, String producerGroup, RPCHook rpcHook);DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產者組", new AclClientRPCHook(new SessionCredentials("用戶名","密碼")));
此構造函數的第一個參數是命名空間,命名空間需要在服務端提前創建。第二個參數是生產者組,一個生產者組可以包含多個生產者,生產者組不需要提前創建,在創建DefaultMQProducer對象的時候賦值一個生產者組就可以。第三個參數是RPCHook對象用于權限認證,相當于你登陸一個網站需要輸入用戶名和密碼。
(2)DefaultMQProducer(String producerGroup, RPCHook rpcHook);命名空間是RocketMQ中的一個資源管理概念。用戶不同的業務場景一般都可以通過命名空間做隔離,并且針對不同的業務場景設置專門的配置,例如消息保留時間。不同命名空間之間的 Topic 相互隔離,訂閱相互隔離,角色權限相互隔離。
DefaultMQProducer producer = new DefaultMQProducer("生產者組", new AclClientRPCHook(new SessionCredentials("用戶名","密碼")));
此構造函數底層還是調用了構造方法(1)
,只不過將namespace設為了null,在沒有命名空間的時候可以使用此構造函數。
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產者組");
此構造函數底層還是調用了構造方法(1)
,只不過將RPCHook 設為了null,在不需要acl認證的時候可以使用此構造函數。
DefaultMQProducer producer = new DefaultMQProducer("生產者組");
此構造函數底層還是調用了構造方法(1)
,只不過將namespace和RPCHook設為了null,在沒有命名空間和不需要acl認證的時候可以使用此構造函數。
DefaultMQProducer producer = new DefaultMQProducer(new AclClientRPCHook(new SessionCredentials("用戶名","密碼")));
此構造函數底層還是調用了構造方法(1)
,只不過將namespace設為了null,由于prodcuerGroup不能為null,所以RocketMQ會使用默認的生產者組:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer();
此構造函數底層還是調用了構造方法(1)
,只不過將namespace和RPCHook設為了null,由于prodcuerGroup不能為null,所以RocketMQ會使用默認的生產者組:DEFAULT_PRODUCER
。
DefaultMQProducer producer = new DefaultMQProducer("命名空間", "生產者組", new AclClientRPCHook(new SessionCredentials("用戶名","密碼")), true, "traceTopic");
此構造函數的第一個參數是命名空間,命名空間需要在服務端提前創建。第二個參數是生產者組,一個生產者組可以包含多個生產者,生產者組不需要提前創建,在創建DefaultMQProducer對象的時候賦值一個生產者組就可以。第三個參數是RPCHook對象用于權限認證,相當于你登陸一個網站需要輸入用戶名和密碼。第四個參數是布爾類型,表示是否開啟消息追蹤。第五個參數是消息跟蹤的topic的名稱,這個topic專門用來做消息追蹤的,一般不會用這個topic生產和消費業務數據。開啟追蹤后,追蹤topic內會記錄生產者的一些信息,比如生產者IP、消息的MessageID等
。例如下面的代碼就是開啟追蹤并設置trace-topic
為追蹤topic,然后將消息發送到topicA中,于是topicA里面是業務數據,trace-topic里面是用于消息追蹤的追蹤數據。也就是發送一次消息會發送一份業務數據和一份追蹤數據到業務topic和追蹤topic
,
package com.sgm.esb.gateway.service;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage);
producer.shutdown();
}
}
如下是追蹤topic中的消息內容:
DefaultMQProducer producer = new DefaultMQProducer("生產者組", true, "traceTopic");
此構造函數底層還是調用了構造方法(7)
,只不過將namespace和RPCHook設為了null,使用于沒有命名空間和不需要acl認證的時候。
DefaultMQProducer producer = new DefaultMQProducer("生產者組", true);
此構造函數底層還是調用了構造方法(7)
,只不過將namespace、RPCHook和customizedTraceTopic設為了null。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 這里的Message類型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "這里設置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage);
System.out.println(sendResult);
producer.shutdown();
}
}
我們根據需要使用上面提到的某一種構造方法創建生產者后必須調用setNamesrvAddr()方法來設置NameServer(要不然消息發到哪呢),然后調用start()開始發送消息,調用send()方法將創建的Message對象發送到某個topic下,最后調用shutdown()來釋放資源。DefaultMQProducer.send(Message msg)方法只有一個Message類型的參數,這個Message的類型為org.apache.rocketmq.common.message.Message
,不同于之前RocketMQTemplate中的org.springframework.messaging.Message
。我們可以在此Message中設置topic和tag而不是在send()方法中設置topic和tag,通過putUserProperty()方法設置消息的屬性,通過setKeys()方法設置消息的key等。在RocketMQTemplate中我們使用org.springframework.messaging.Message來創建消息,其實RocketMQ底層最終會將org.springframework.messaging.Message轉化為org.apache.rocketmq.common.message.Message類型進行消息的發送
。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
// 這里的Message類型是org.apache.rocketmq.common.message.Message
Message sendMessage = new Message("topicA", "tagA", "這里設置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
SendResult sendResult = producer.send(sendMessage, 3000L);
System.out.println(sendResult);
producer.shutdown();
}
}
在方法(1)的基礎上增加timeout參數來設置超時時間。
(3)void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;異步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
第二個參數設置異步回調,同RocketMQTemplate異步發送消息一樣,不再贅述。
(4)void send(Message msg, SendCallback sendCallback,long timeout) throws MQClientException, RemotingException, InterruptedException;異步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", "這里設置消息體".getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.send(sendMessage, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
在方法(3)的基礎上增加timeout參數來設置超時時間。
(5)SendResult send(Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量發送消息到某一個topic。
(6)SendResult send(Collection msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;同步發送注意:這里List中Message的topic都必須是同一個,否則會報錯。
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
SendResult sendResult = producer.send(messages);
System.out.println(sendResult);
producer.shutdown();
}
}
批量發送消息到某一個topic,在方法(5)的基礎上增加timeout參數來設置超時時間。
(7)void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
});
producer.shutdown();
}
}
批量異步發送消息到某一個topic。
(8)void send(Collection msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;異步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Listmessages = new ArrayList<>();
for (int i=0; i<3; i++) {Message sendMessage = new Message("topicA", "tagA", ("這里設置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
messages.add(sendMessage);
}
producer.send(messages, new SendCallback() {@Override
public void onSuccess(SendResult sendResult) {System.out.println("Send success");
}
@Override
public void onException(Throwable throwable) {System.out.println("Send fail");
}
}, 3000L);
producer.shutdown();
}
}
批量異步發送消息到某一個topic,在方法(7)的基礎上增加timeout參數來設置超時時間。
(9)void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;one-way模式,異步發送import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.nio.charset.StandardCharsets;
public class DefaultMQProducerTest {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("生產者組",
new AclClientRPCHook(new SessionCredentials("用戶名","密碼")),
true, "trace-topic");
producer.setNamesrvAddr("nameServer集群IP");
producer.start();
Message sendMessage = new Message("topicA", "tagA", ("這里設置消息體" + i).getBytes(StandardCharsets.UTF_8));
sendMessage.putUserProperty("消息的屬性的鍵", "消息的屬性的值");
sendMessage.setKeys("消息的key");
producer.sendOneway(sendMessage);
producer.shutdown();
}
}
使用one-way模式異步發送消息。
總結與展望至此,使用RocketMQTemplate和DefaultMQProducer發送普通消息的全部方法就講解完了,本文的主要目的是幫助讀者快速學習使用RocketMQ發送普通消息,本文總結了所有的發送普通消息的方法以滿足實際工作中不同的業務場景。RocketMQTemplate和DefaultMQProducer中還有一些發送消息的方法是用來發送順序、定時/延時消息的(DefaultMQProducer不能用來發送事務消息),之后我會繼續寫文章來講解這些方法以及所有的消費消息的方法。
這是我在寫的第一篇原創文章,之后我還準備寫一些使用RocketMQ時踩過的坑,最后可能會寫一些RocketMQ更底層的東西,我還準備寫一些webflux、springCloudGateway這些我比較感興趣的東西,希望看到這篇文章的人能和我一起成長。
你是否還在尋找穩定的海外服務器提供商?創新互聯www.cdcxhl.cn海外機房具備T級流量清洗系統配攻擊溯源,準確流量調度確保服務器高可用性,企業級服務器適合批量采購,新人活動首月15元起,快前往官網查看詳情吧
本文名稱:RocketMQ發送普通消息的所有方法以及代碼示例-創新互聯
瀏覽路徑:http://www.xueling.net.cn/article/dheosd.html