Skip to content

Commit 0792474

Browse files
authored
[ISSUE #9928] Add Priority IT for GRPC protocol
1 parent c1ccc3c commit 0792474

File tree

3 files changed

+60
-0
lines changed

3 files changed

+60
-0
lines changed

test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,9 @@ public void testSimpleConsumerToDLQ() throws Exception {
118118
public void testConsumeOrderly() throws Exception {
119119
super.testConsumeOrderly();
120120
}
121+
122+
@Test
123+
public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
124+
super.testSimpleConsumerSendAndRecvPriorityMessage();
125+
}
121126
}

test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,56 @@ public void testConsumeOrderly() throws Exception {
629629
}
630630
}
631631

632+
public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
633+
String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.PRIORITY);
634+
String group = MQRandomUtils.getRandomConsumerGroup();
635+
636+
// init consumer offset
637+
this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
638+
receiveMessage(blockingStub, topic, group, 1);
639+
640+
this.sendClientSettings(stub, buildProducerClientSettings(topic)).get();
641+
for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
642+
String messageId = createUniqID();
643+
SendMessageResponse sendResponse = blockingStub.sendMessage(SendMessageRequest.newBuilder()
644+
.addMessages(Message.newBuilder()
645+
.setTopic(Resource.newBuilder()
646+
.setName(topic)
647+
.build())
648+
.setSystemProperties(SystemProperties.newBuilder()
649+
.setMessageId(messageId)
650+
.setQueueId(0)
651+
.setMessageType(MessageType.PRIORITY)
652+
.setBodyEncoding(Encoding.GZIP)
653+
.setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
654+
.setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234"))
655+
.setPriority(i)
656+
.build())
657+
.setBody(ByteString.copyFromUtf8("hello"))
658+
.build())
659+
.build());
660+
assertSendMessage(sendResponse, messageId);
661+
}
662+
663+
this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get();
664+
List<Message> recvList = new ArrayList<>();
665+
try {
666+
await().atMost(java.time.Duration.ofSeconds(10)).until(() -> {
667+
List<Message> messageList = getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic, group));
668+
if (messageList.isEmpty()) {
669+
return false;
670+
}
671+
recvList.addAll(messageList);
672+
return recvList.size() == BaseConf.QUEUE_NUMBERS;
673+
});
674+
} catch (Exception e) {
675+
}
676+
for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) {
677+
// default priority order: 0 as lowest priority
678+
assertThat(recvList.get(i).getSystemProperties().getPriority()).isEqualTo(BaseConf.QUEUE_NUMBERS - i - 1);
679+
}
680+
}
681+
632682
public List<ReceiveMessageResponse> receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub,
633683
String topic, String group) {
634684
return receiveMessage(stub, topic, group, 15);

test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,9 @@ public void testSimpleConsumerToDLQ() throws Exception {
106106
public void testConsumeOrderly() throws Exception {
107107
super.testConsumeOrderly();
108108
}
109+
110+
@Test
111+
public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception {
112+
super.testSimpleConsumerSendAndRecvPriorityMessage();
113+
}
109114
}

0 commit comments

Comments
 (0)