diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java index 80e4e5115541a..6d468908ed71a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java @@ -87,6 +87,9 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("CreateSubscriptionProcedure: executeFromValidate"); + alterConsumerGroupProcedure = null; + createPipeProcedures = new ArrayList<>(); + subscriptionInfo.get().validateBeforeSubscribe(subscribeReq); // Construct AlterConsumerGroupProcedure @@ -160,8 +163,7 @@ protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv env) response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); response.setMessage(e.getMessage()); } - if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && response.getSubStatusSize() > 0) { + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new SubscriptionException( String.format( "Failed to create subscription with request %s on config nodes, because %s", diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java index 6741a6c1e2a84..6f668f29c5dac 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java @@ -85,6 +85,9 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) throws SubscriptionException { LOGGER.info("DropSubscriptionProcedure: executeFromValidate"); + alterConsumerGroupProcedure = null; + dropPipeProcedures = new ArrayList<>(); + subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq); // Construct AlterConsumerGroupProcedure @@ -141,8 +144,7 @@ protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv env) response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); response.setMessage(e.getMessage()); } - if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && response.getSubStatusSize() > 0) { + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new SubscriptionException( String.format( "Failed to drop subscription with request %s on config nodes, because %s", diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java index 93d9941fbf333..e2a4d0615d886 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java @@ -19,18 +19,36 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.subscription; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta; +import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.PermissionManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; +import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager; +import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator; +import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo; +import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; +import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.tsfile.utils.PublicBAOS; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -102,4 +121,119 @@ public void serializeDeserializeTest() { fail(); } } + + @Test + public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception { + final CreateSubscriptionProcedure proc = + new CreateSubscriptionProcedure( + new TSubscribeReq( + "old_consumer", "test_consumer_group", Collections.singleton("test_topic"))); + proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class)); + + final CreatePipeProcedureV2 createPipeProcedure = Mockito.mock(CreatePipeProcedureV2.class); + Mockito.when(createPipeProcedure.constructPlan()) + .thenReturn(Mockito.mock(CreatePipePlanV2.class)); + proc.setCreatePipeProcedures(Collections.singletonList(createPipeProcedure)); + + try { + proc.executeFromOperateOnConfigNodes( + mockConsensusFailureEnv( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("consensus write failed"))); + fail(); + } catch (SubscriptionException e) { + Assert.assertTrue(e.getMessage().contains("Failed to create subscription")); + } + } + + @Test + public void executeFromValidateShouldResetCreatePipeProceduresOnRetry() throws Exception { + final Map consumerAttributes = new HashMap<>(); + consumerAttributes.put("username", "user"); + consumerAttributes.put("password", "password"); + + final ConsumerGroupMeta consumerGroupMeta = + new ConsumerGroupMeta( + "test_consumer_group", 1, new ConsumerMeta("old_consumer", 1, consumerAttributes)); + final TopicMeta topicMeta = new TopicMeta("test_topic", 1, Collections.emptyMap()); + + final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class); + Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group")) + .thenReturn(consumerGroupMeta); + Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group")) + .thenAnswer(invocation -> consumerGroupMeta.deepCopy()); + Mockito.when( + subscriptionInfo.isTopicSubscribedByConsumerGroup("test_topic", "test_consumer_group")) + .thenReturn(false); + Mockito.when(subscriptionInfo.deepCopyTopicMeta("test_topic")).thenReturn(topicMeta); + + final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class); + Mockito.when(pipeTaskInfo.checkBeforeCreatePipe(Mockito.any(TCreatePipeReq.class))) + .thenReturn(true); + + final CreateSubscriptionProcedure proc = + new CreateSubscriptionProcedure( + new TSubscribeReq( + "old_consumer", "test_consumer_group", Collections.singleton("test_topic"))); + setField(proc, "subscriptionInfo", new AtomicReference<>(subscriptionInfo)); + setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo)); + + final ConfigNodeProcedureEnv env = mockCreateSubscriptionValidationEnv(); + proc.executeFromValidate(env); + Assert.assertEquals(1, proc.getCreatePipeProcedures().size()); + + proc.executeFromValidate(env); + Assert.assertEquals(1, proc.getCreatePipeProcedures().size()); + } + + private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response) + throws Exception { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response); + + return env; + } + + private static ConfigNodeProcedureEnv mockCreateSubscriptionValidationEnv() { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final PermissionManager permissionManager = Mockito.mock(PermissionManager.class); + final PipeManager pipeManager = Mockito.mock(PipeManager.class); + final PipePluginCoordinator pipePluginCoordinator = Mockito.mock(PipePluginCoordinator.class); + final PipePluginInfo pipePluginInfo = Mockito.mock(PipePluginInfo.class); + final LoadManager loadManager = Mockito.mock(LoadManager.class); + + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); + Mockito.when(configManager.getPipeManager()).thenReturn(pipeManager); + Mockito.when(pipeManager.getPipePluginCoordinator()).thenReturn(pipePluginCoordinator); + Mockito.when(pipePluginCoordinator.getPipePluginInfo()).thenReturn(pipePluginInfo); + Mockito.when(configManager.getLoadManager()).thenReturn(loadManager); + Mockito.when(loadManager.getRegionLeaderMap()).thenReturn(Collections.emptyMap()); + Mockito.when(permissionManager.login4Pipe(Mockito.anyString(), Mockito.any())) + .thenReturn("hashedPassword"); + + return env; + } + + private static void setField(final Object target, final String fieldName, final Object value) + throws Exception { + Class clazz = target.getClass(); + while (clazz != null) { + try { + final Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + return; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java index 9ecce2a522ca4..910648bbe5132 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java @@ -19,24 +19,37 @@ package org.apache.iotdb.confignode.procedure.impl.subscription.subscription; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; +import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.tsfile.utils.PublicBAOS; +import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -91,4 +104,90 @@ public void serializeDeserializeTest() { fail(); } } + + @Test + public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception { + final DropSubscriptionProcedure proc = + new DropSubscriptionProcedure( + new TUnsubscribeReq( + "old_consumer", "test_consumer_group", Collections.singleton("test_topic"))); + proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class)); + + final DropPipeProcedureV2 dropPipeProcedure = Mockito.mock(DropPipeProcedureV2.class); + Mockito.when(dropPipeProcedure.getPipeName()).thenReturn("pipe_topic"); + proc.setDropPipeProcedures(Collections.singletonList(dropPipeProcedure)); + + try { + proc.executeFromOperateOnConfigNodes( + mockConsensusFailureEnv( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("consensus write failed"))); + fail(); + } catch (SubscriptionException e) { + Assert.assertTrue(e.getMessage().contains("Failed to drop subscription")); + } + } + + @Test + public void executeFromValidateShouldResetDropPipeProceduresOnRetry() throws Exception { + final Map consumerAttributes = new HashMap<>(); + consumerAttributes.put("username", "user"); + consumerAttributes.put("password", "password"); + + final ConsumerGroupMeta consumerGroupMeta = + new ConsumerGroupMeta( + "test_consumer_group", 1, new ConsumerMeta("old_consumer", 1, consumerAttributes)); + consumerGroupMeta.addSubscription("old_consumer", Collections.singleton("test_topic")); + + final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class); + Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group")) + .thenReturn(consumerGroupMeta); + Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group")) + .thenAnswer(invocation -> consumerGroupMeta.deepCopy()); + + final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class); + + final DropSubscriptionProcedure proc = + new DropSubscriptionProcedure( + new TUnsubscribeReq( + "old_consumer", "test_consumer_group", Collections.singleton("test_topic"))); + setField(proc, "subscriptionInfo", new AtomicReference<>(subscriptionInfo)); + setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo)); + + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + proc.executeFromValidate(env); + Assert.assertEquals(1, proc.getDropPipeProcedures().size()); + + proc.executeFromValidate(env); + Assert.assertEquals(1, proc.getDropPipeProcedures().size()); + } + + private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response) + throws Exception { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response); + + return env; + } + + private static void setField(final Object target, final String fieldName, final Object value) + throws Exception { + Class clazz = target.getClass(); + while (clazz != null) { + try { + final Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + return; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + throw new NoSuchFieldException(fieldName); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java index 498f3427690f5..640d154937b4b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java @@ -69,8 +69,11 @@ public ConsumerGroupMeta deepCopy() { final ConsumerGroupMeta copied = new ConsumerGroupMeta(); copied.consumerGroupId = consumerGroupId; copied.creationTime = creationTime; - copied.topicNameToSubscribedConsumerIdSet = - new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet); + copied.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>(); + topicNameToSubscribedConsumerIdSet.forEach( + (topicName, subscribedConsumerIds) -> + copied.topicNameToSubscribedConsumerIdSet.put( + topicName, new HashSet<>(subscribedConsumerIds))); copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta); copied.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java index d41b3706cd1f2..9ef7191ba6683 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java @@ -69,4 +69,24 @@ public void test() throws IOException { consumerGroupMeta.getConsumerGroupId(), consumerGroupMeta2.getConsumerGroupId()); Assert.assertEquals(consumerGroupMeta.getCreationTime(), consumerGroupMeta2.getCreationTime()); } + + @Test + public void testDeepCopyShouldNotShareSubscribedConsumerSets() { + Map consumerAttributes = new HashMap<>(); + consumerAttributes.put("username", "user"); + consumerAttributes.put("password", "password"); + + ConsumerGroupMeta consumerGroupMeta = + new ConsumerGroupMeta( + "test_consumer_group", 1, new ConsumerMeta("test_consumer1", 1, consumerAttributes)); + consumerGroupMeta.addSubscription("test_consumer1", Collections.singleton("test_topic")); + + ConsumerGroupMeta copiedConsumerGroupMeta = consumerGroupMeta.deepCopy(); + copiedConsumerGroupMeta.removeSubscription( + "test_consumer1", Collections.singleton("test_topic")); + + Assert.assertTrue( + consumerGroupMeta.getConsumersSubscribingTopic("test_topic").contains("test_consumer1")); + Assert.assertTrue(copiedConsumerGroupMeta.getConsumersSubscribingTopic("test_topic").isEmpty()); + } }