From 5bd6d6b17c358cbe41697f67111dca598f369255 Mon Sep 17 00:00:00 2001 From: imzs Date: Thu, 18 Dec 2025 15:34:33 +0800 Subject: [PATCH] #9928 add Priority IT for GRPC protocol. --- .../rocketmq/test/grpc/v2/ClusterGrpcIT.java | 5 ++ .../rocketmq/test/grpc/v2/GrpcBaseIT.java | 50 +++++++++++++++++++ .../rocketmq/test/grpc/v2/LocalGrpcIT.java | 5 ++ 3 files changed, 60 insertions(+) diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java index 7c9625ecd55..6eb7a773762 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java @@ -118,4 +118,9 @@ public void testSimpleConsumerToDLQ() throws Exception { public void testConsumeOrderly() throws Exception { super.testConsumeOrderly(); } + + @Test + public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception { + super.testSimpleConsumerSendAndRecvPriorityMessage(); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java index 2d186373764..724aa114f1d 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java @@ -629,6 +629,56 @@ public void testConsumeOrderly() throws Exception { } } + public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception { + String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.PRIORITY); + String group = MQRandomUtils.getRandomConsumerGroup(); + + // init consumer offset + this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get(); + receiveMessage(blockingStub, topic, group, 1); + + this.sendClientSettings(stub, buildProducerClientSettings(topic)).get(); + for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) { + String messageId = createUniqID(); + SendMessageResponse sendResponse = blockingStub.sendMessage(SendMessageRequest.newBuilder() + .addMessages(Message.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic) + .build()) + .setSystemProperties(SystemProperties.newBuilder() + .setMessageId(messageId) + .setQueueId(0) + .setMessageType(MessageType.PRIORITY) + .setBodyEncoding(Encoding.GZIP) + .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis())) + .setBornHost(StringUtils.defaultString(NetworkUtil.getLocalAddress(), "127.0.0.1:1234")) + .setPriority(i) + .build()) + .setBody(ByteString.copyFromUtf8("hello")) + .build()) + .build()); + assertSendMessage(sendResponse, messageId); + } + + this.sendClientSettings(stub, buildSimpleConsumerClientSettings(group)).get(); + List recvList = new ArrayList<>(); + try { + await().atMost(java.time.Duration.ofSeconds(10)).until(() -> { + List messageList = getMessageFromReceiveMessageResponse(receiveMessage(blockingStub, topic, group)); + if (messageList.isEmpty()) { + return false; + } + recvList.addAll(messageList); + return recvList.size() == BaseConf.QUEUE_NUMBERS; + }); + } catch (Exception e) { + } + for (int i = 0; i < BaseConf.QUEUE_NUMBERS; i++) { + // default priority order: 0 as lowest priority + assertThat(recvList.get(i).getSystemProperties().getPriority()).isEqualTo(BaseConf.QUEUE_NUMBERS - i - 1); + } + } + public List receiveMessage(MessagingServiceGrpc.MessagingServiceBlockingStub stub, String topic, String group) { return receiveMessage(stub, topic, group, 15); diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java index 43471c7b2a8..a53f3afc1ab 100644 --- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java @@ -106,4 +106,9 @@ public void testSimpleConsumerToDLQ() throws Exception { public void testConsumeOrderly() throws Exception { super.testConsumeOrderly(); } + + @Test + public void testSimpleConsumerSendAndRecvPriorityMessage() throws Exception { + super.testSimpleConsumerSendAndRecvPriorityMessage(); + } }