Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ public void testSimpleConsumerToDLQ() throws Exception {
public void testConsumeOrderly() throws Exception {
super.testConsumeOrderly();
}

@Test
public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
super.testSimpleConsumerSendAndRecvPriorityMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,56 @@ public void testConsumeOrderly() throws Exception {
}
}

public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.PRIORITY);
String group = MQRandomUtils.getRandomConsumerGroup();

// init consumer offset
this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
receiveMessage(blockingStub, topic, group, 1);

this.sendClientSettings(stub, buildProducerClientSettings(topic)).get();
for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
String messageId = createUniqID();
SendMessageResponse sendResponse = blockingStub.sendMessage(SendMessageRequest.newBuilder()
.addMessages(Message.newBuilder()
.setTopic(Resource.newBuilder()
.setName(topic)
.build())
.setSystemProperties(SystemProperties.newBuilder()
.setMessageId(messageId)
.setQueueId(0)
.setMessageType(MessageType.PRIORITY)
.setBodyEncoding(Encoding.GZIP)
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
.setPriority(i)
.build())
.setBody(ByteString.copyFromUtf8("hello"))
.build())
.build());
assertSendMessage(sendResponse, messageId);
}

this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
List<Message> recvList = new ArrayList<>();
try {
await().atMost(java.time.Duration.ofSeconds(10)).until(() -> {
List<Message> messageList = getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic, group));
if (messageList.isEmpty()) {
return false;
}
recvList.addAll(messageList);
return recvList.size() == BaseConf.QUEUE_NUMBERS;
});
} catch (Exception e) {
}
for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
// default priority order: 0 as lowest priority
assertThat(recvList.get(i).getSystemProperties().getPriority()).isEqualTo(BaseConf.QUEUE_NUMBERS - i - 1);
}
}

public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub,
String topic, String group) {
return receiveMessage(stub, topic, group, 15);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ public void testSimpleConsumerToDLQ() throws Exception {
public void testConsumeOrderly() throws Exception {
super.testConsumeOrderly();
}

@Test
public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
super.testSimpleConsumerSendAndRecvPriorityMessage();
}
}
Loading