Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env)
throws SubscriptionException {
LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");

alterConsumerGroupProcedure = null;
createPipeProcedures = new ArrayList<>();

subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);

// Construct AlterConsumerGroupProcedure
Expand Down Expand Up @@ -160,8 +163,7 @@ protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv env)
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& response.getSubStatusSize() > 0) {
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new SubscriptionException(
String.format(
"Failed to create subscription with request %s on config nodes, because %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env)
throws SubscriptionException {
LOGGER.info("DropSubscriptionProcedure: executeFromValidate");

alterConsumerGroupProcedure = null;
dropPipeProcedures = new ArrayList<>();

subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq);

// Construct AlterConsumerGroupProcedure
Expand Down Expand Up @@ -141,8 +144,7 @@ protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv env)
response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
response.setMessage(e.getMessage());
}
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& response.getSubStatusSize() > 0) {
if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new SubscriptionException(
String.format(
"Failed to drop subscription with request %s on config nodes, because %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,36 @@

package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.PermissionManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;

import org.apache.tsfile.utils.PublicBAOS;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.DataOutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -39,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -102,4 +121,119 @@
fail();
}
}

@Test
public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception {
final CreateSubscriptionProcedure proc =
new CreateSubscriptionProcedure(
new TSubscribeReq(
"old_consumer", "test_consumer_group", Collections.singleton("test_topic")));
proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));

final CreatePipeProcedureV2 createPipeProcedure = Mockito.mock(CreatePipeProcedureV2.class);
Mockito.when(createPipeProcedure.constructPlan())
.thenReturn(Mockito.mock(CreatePipePlanV2.class));
proc.setCreatePipeProcedures(Collections.singletonList(createPipeProcedure));

try {

Check warning on line 138 in iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor the body of this try/catch to have only one invocation possibly throwing a runtime exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ37jNqru9_C1pCLerT6&open=AZ37jNqru9_C1pCLerT6&pullRequest=17599
proc.executeFromOperateOnConfigNodes(
mockConsensusFailureEnv(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("consensus write failed")));
fail();
} catch (SubscriptionException e) {
Assert.assertTrue(e.getMessage().contains("Failed to create subscription"));
}
}

@Test
public void executeFromValidateShouldResetCreatePipeProceduresOnRetry() throws Exception {
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put("username", "user");
consumerAttributes.put("password", "password");

final ConsumerGroupMeta consumerGroupMeta =
new ConsumerGroupMeta(
"test_consumer_group", 1, new ConsumerMeta("old_consumer", 1, consumerAttributes));
final TopicMeta topicMeta = new TopicMeta("test_topic", 1, Collections.emptyMap());

final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class);
Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group"))
.thenReturn(consumerGroupMeta);
Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group"))
.thenAnswer(invocation -> consumerGroupMeta.deepCopy());
Mockito.when(
subscriptionInfo.isTopicSubscribedByConsumerGroup("test_topic", "test_consumer_group"))
.thenReturn(false);
Mockito.when(subscriptionInfo.deepCopyTopicMeta("test_topic")).thenReturn(topicMeta);

final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);
Mockito.when(pipeTaskInfo.checkBeforeCreatePipe(Mockito.any(TCreatePipeReq.class)))
.thenReturn(true);

final CreateSubscriptionProcedure proc =
new CreateSubscriptionProcedure(
new TSubscribeReq(
"old_consumer", "test_consumer_group", Collections.singleton("test_topic")));
setField(proc, "subscriptionInfo", new AtomicReference<>(subscriptionInfo));
setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));

final ConfigNodeProcedureEnv env = mockCreateSubscriptionValidationEnv();
proc.executeFromValidate(env);
Assert.assertEquals(1, proc.getCreatePipeProcedures().size());

proc.executeFromValidate(env);
Assert.assertEquals(1, proc.getCreatePipeProcedures().size());
}

private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response)
throws Exception {
final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);

Mockito.when(env.getConfigManager()).thenReturn(configManager);
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response);

return env;
}

private static ConfigNodeProcedureEnv mockCreateSubscriptionValidationEnv() {
final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
final PermissionManager permissionManager = Mockito.mock(PermissionManager.class);
final PipeManager pipeManager = Mockito.mock(PipeManager.class);
final PipePluginCoordinator pipePluginCoordinator = Mockito.mock(PipePluginCoordinator.class);
final PipePluginInfo pipePluginInfo = Mockito.mock(PipePluginInfo.class);
final LoadManager loadManager = Mockito.mock(LoadManager.class);

Mockito.when(env.getConfigManager()).thenReturn(configManager);
Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager);
Mockito.when(configManager.getPipeManager()).thenReturn(pipeManager);
Mockito.when(pipeManager.getPipePluginCoordinator()).thenReturn(pipePluginCoordinator);
Mockito.when(pipePluginCoordinator.getPipePluginInfo()).thenReturn(pipePluginInfo);
Mockito.when(configManager.getLoadManager()).thenReturn(loadManager);
Mockito.when(loadManager.getRegionLeaderMap()).thenReturn(Collections.emptyMap());
Mockito.when(permissionManager.login4Pipe(Mockito.anyString(), Mockito.any()))
.thenReturn("hashedPassword");

return env;
}

private static void setField(final Object target, final String fieldName, final Object value)
throws Exception {
Class<?> clazz = target.getClass();
while (clazz != null) {
try {
final Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
return;
} catch (NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
}
throw new NoSuchFieldException(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,37 @@

package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;

import org.apache.tsfile.utils.PublicBAOS;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.DataOutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -91,4 +104,90 @@
fail();
}
}

@Test
public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception {
final DropSubscriptionProcedure proc =
new DropSubscriptionProcedure(
new TUnsubscribeReq(
"old_consumer", "test_consumer_group", Collections.singleton("test_topic")));
proc.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));

final DropPipeProcedureV2 dropPipeProcedure = Mockito.mock(DropPipeProcedureV2.class);
Mockito.when(dropPipeProcedure.getPipeName()).thenReturn("pipe_topic");
proc.setDropPipeProcedures(Collections.singletonList(dropPipeProcedure));

try {

Check warning on line 120 in iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor the body of this try/catch to have only one invocation possibly throwing a runtime exception.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ37jNsfu9_C1pCLerT7&open=AZ37jNsfu9_C1pCLerT7&pullRequest=17599
proc.executeFromOperateOnConfigNodes(
mockConsensusFailureEnv(
new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("consensus write failed")));
fail();
} catch (SubscriptionException e) {
Assert.assertTrue(e.getMessage().contains("Failed to drop subscription"));
}
}

@Test
public void executeFromValidateShouldResetDropPipeProceduresOnRetry() throws Exception {
final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put("username", "user");
consumerAttributes.put("password", "password");

final ConsumerGroupMeta consumerGroupMeta =
new ConsumerGroupMeta(
"test_consumer_group", 1, new ConsumerMeta("old_consumer", 1, consumerAttributes));
consumerGroupMeta.addSubscription("old_consumer", Collections.singleton("test_topic"));

final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class);
Mockito.when(subscriptionInfo.getConsumerGroupMeta("test_consumer_group"))
.thenReturn(consumerGroupMeta);
Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta("test_consumer_group"))
.thenAnswer(invocation -> consumerGroupMeta.deepCopy());

final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);

final DropSubscriptionProcedure proc =
new DropSubscriptionProcedure(
new TUnsubscribeReq(
"old_consumer", "test_consumer_group", Collections.singleton("test_topic")));
setField(proc, "subscriptionInfo", new AtomicReference<>(subscriptionInfo));
setField(proc, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));

final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
proc.executeFromValidate(env);
Assert.assertEquals(1, proc.getDropPipeProcedures().size());

proc.executeFromValidate(env);
Assert.assertEquals(1, proc.getDropPipeProcedures().size());
}

private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response)
throws Exception {
final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
final ConfigManager configManager = Mockito.mock(ConfigManager.class);
final ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);

Mockito.when(env.getConfigManager()).thenReturn(configManager);
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
Mockito.when(consensusManager.write(Mockito.any())).thenReturn(response);

return env;
}

private static void setField(final Object target, final String fieldName, final Object value)
throws Exception {
Class<?> clazz = target.getClass();
while (clazz != null) {
try {
final Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
return;
} catch (NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
}
throw new NoSuchFieldException(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ public ConsumerGroupMeta deepCopy() {
final ConsumerGroupMeta copied = new ConsumerGroupMeta();
copied.consumerGroupId = consumerGroupId;
copied.creationTime = creationTime;
copied.topicNameToSubscribedConsumerIdSet =
new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
copied.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
topicNameToSubscribedConsumerIdSet.forEach(
(topicName, subscribedConsumerIds) ->
copied.topicNameToSubscribedConsumerIdSet.put(
topicName, new HashSet<>(subscribedConsumerIds)));
copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta);
copied.topicNameToSubscriptionCreationTime =
new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,24 @@ public void test() throws IOException {
consumerGroupMeta.getConsumerGroupId(), consumerGroupMeta2.getConsumerGroupId());
Assert.assertEquals(consumerGroupMeta.getCreationTime(), consumerGroupMeta2.getCreationTime());
}

@Test
public void testDeepCopyShouldNotShareSubscribedConsumerSets() {
Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put("username", "user");
consumerAttributes.put("password", "password");

ConsumerGroupMeta consumerGroupMeta =
new ConsumerGroupMeta(
"test_consumer_group", 1, new ConsumerMeta("test_consumer1", 1, consumerAttributes));
consumerGroupMeta.addSubscription("test_consumer1", Collections.singleton("test_topic"));

ConsumerGroupMeta copiedConsumerGroupMeta = consumerGroupMeta.deepCopy();
copiedConsumerGroupMeta.removeSubscription(
"test_consumer1", Collections.singleton("test_topic"));

Assert.assertTrue(
consumerGroupMeta.getConsumersSubscribingTopic("test_topic").contains("test_consumer1"));
Assert.assertTrue(copiedConsumerGroupMeta.getConsumersSubscribingTopic("test_topic").isEmpty());
}
}
Loading