From 47635c67c81b534d75711db0f5e70a2ef5d29c24 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Wed, 3 Mar 2021 16:45:30 -0500 Subject: [PATCH 1/6] SOLR-15045: Execute local leader commit in parallel with distributed commits in DistributedZkUpdateProcessor --- .../processor/DistributedZkUpdateProcessor.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index ece4f53381c..0fc38962fe2 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -194,6 +194,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { // zk ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + boolean issuedDistribCommit = false; List useNodes = null; if (req.getParams().get(COMMIT_END_POINT) == null) { useNodes = nodes; @@ -203,11 +204,16 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); cmdDistrib.distribCommit(cmd, useNodes, params); - cmdDistrib.blockAndDoRetries(); + issuedDistribCommit = true; } } if (isLeader) { + if (issuedDistribCommit) { + // defensive copy of params, which was passed into distribCommit(...) above; will unconditionally replace + // DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the new `params` val will actually be used + params = new ModifiableSolrParams(params); + } params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); params.set(COMMIT_END_POINT, "replicas"); @@ -218,14 +224,15 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl( zkController.getBaseUrl(), req.getCore().getName())); + // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER distrib commits cmdDistrib.distribCommit(cmd, useNodes, params); + issuedDistribCommit = true; } doLocalCommit(cmd); - - if (useNodes != null) { - cmdDistrib.blockAndDoRetries(); - } + } + if (issuedDistribCommit) { + cmdDistrib.blockAndDoRetries(); } } } From 2ee16aa5c70753d366d95c1171e8d9e009761b13 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Thu, 4 Mar 2021 03:52:31 -0500 Subject: [PATCH 2/6] add test NOTE: this test jumps through hoops to support "failure" expressed as success, for compatibility with the way distrib shard commit errors are (not) currently propagated back to the client. This behavior is likely a bug in its own right that should be addressed separately, probably before this fix is committed --- .../conf/solrconfig-parallel-commit.xml | 52 ++++++ .../cloud/ParallelCommitExecutionTest.java | 172 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml create mode 100644 solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml new file mode 100644 index 00000000000..3e619948e18 --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml @@ -0,0 +1,52 @@ + + + + + + + ${tests.luceneMatchVersion:LATEST} + + + + + + + + ${solr.ulog.dir:} + + + + ${solr.commitwithin.softcommit:true} + + + + + + + ensure-parallel-commit + + + + + + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java new file mode 100644 index 00000000000..205baf4a16b --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.util.TestUtil; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ParallelCommitExecutionTest extends SolrCloudTestCase { + + private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName(); + private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection"; + + /** A basic client for operations at the cloud level, default collection will be set */ + private static CloudSolrClient CLOUD_CLIENT; + private static int expectCount; + private static volatile int tooHigh; + + private static volatile CountDownLatch countdown; + private static final AtomicInteger countup = new AtomicInteger(); + + @BeforeClass + public static void beforeClass() throws Exception { + // multi replicas matters; for the initial parallel commit execution tests, only consider repFactor=1 + final int repFactor = 1;//random().nextBoolean() ? 1 : 2; + final int numShards = TestUtil.nextInt(random(), 1, 4); + final int numNodes = (numShards * repFactor); + expectCount = numNodes; + + final String configName = DEBUG_LABEL + "_config-set"; + final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf"); + + configureCluster(numNodes).addConfig(configName, configDir).configure(); + + Map collectionProperties = new LinkedHashMap<>(); + collectionProperties.put("config", "solrconfig-parallel-commit.xml"); + collectionProperties.put("schema", "schema_latest.xml"); + CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor) + .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE) + .setProperties(collectionProperties) + .process(cluster.getSolrClient()); + + CLOUD_CLIENT = cluster.getSolrClient(); + CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME); + waitForRecoveriesToFinish(CLOUD_CLIENT); + } + + @AfterClass + private static void afterClass() throws Exception { + if (null != CLOUD_CLIENT) { + CLOUD_CLIENT.close(); + CLOUD_CLIENT = null; + } + } + + private static void initSyncVars(boolean tooHigh) { + final int ct; + if (tooHigh) { + ParallelCommitExecutionTest.tooHigh = TOO_HIGH_INCREMENT; + ct = expectCount + TOO_HIGH_INCREMENT; + } else { + ParallelCommitExecutionTest.tooHigh = 0; + ct = expectCount; + } + countdown = new CountDownLatch(ct); + countup.set(0); + } + + @Test + public void testParallelOk() throws Exception { + initSyncVars(false); + UpdateResponse rsp = CLOUD_CLIENT.commit(true, true); + // nocommit: why are we getting status:0 in response that clearly failed? It looks like no matter + // how many distrib commits fail, as long as the local commit succeeds, it reports success. (if the + // local commit fails, it apparently does report failure). + // that's why we're doing this "countup" thing -- because there's apparently no other way to know + // whether the commit succeeded or not? + // we expect `countup` to be incremented by exactly `expectCount` number of parallel commits + assertEquals(expectCount, countup.get()); + } + + private static final int TOO_HIGH_INCREMENT = 1; + @Test + public void testParallelFail() throws Exception { + // artificially set countdown too high; we're really testing the test code, since there's no way + // to mock the "incorrect" behavior (i.e., "serial" requests) + initSyncVars(true); + UpdateResponse rsp = CLOUD_CLIENT.commit(true, true); + + // If we could mock a "real" scenario, we'd expect the distrib requests to timeout waiting + // on the countdown latch (and thus not increment `countup`), but the "local" shard would + // then succeed, resulting in `countup` being incremented exactly once. Here we've set `countdown` + // artificially high, but to approximately mock this, we get the "local" shard to recognize + // the "tooHigh" scenario, and succeed anyway, incrementing `countup`. + assertEquals(1, countup.get()); + // sanity check: because we actually expect things to be working properly, we expect `countdown` + // to be exactly "TOO_HIGH_INCREMENT" too high. + assertEquals(TOO_HIGH_INCREMENT, countdown.getCount()); + } + + public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception { + assert null != client.getDefaultCollection(); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(), + client.getZkStateReader(), + true, true, 330); + } + + public static class CheckFactory extends UpdateRequestProcessorFactory { + @Override + public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + return new Check(next); + } + } + + public static class Check extends UpdateRequestProcessor { + + public Check(UpdateRequestProcessor next) { + super(next); + } + + @Override + public void processCommit(CommitUpdateCommand cmd) throws IOException { + super.processCommit(cmd); + countdown.countDown(); + // In the "tooHigh" scenario, for the one-and-only local (non-distrib) shard, force success (after waiting) to + // mock the "serial blocking" behavior. + final boolean mustSucceed = tooHigh == TOO_HIGH_INCREMENT && cmd.getReq().getParams().get("update.distrib") == null; + try { + if (!countdown.await(5, TimeUnit.SECONDS) && !mustSucceed) { + throw new RuntimeException("done waiting"); + } + countup.incrementAndGet(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } +} From 99cb91bd852e60c0ce4811365de1fbb02c95a876 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Wed, 19 Jan 2022 15:23:36 -0500 Subject: [PATCH 3/6] simplify by removing the `testParallelFail()` test it was jumping through hoops to try to recreate superficial aspects of the initial problem, but for no real purpose, and at the expense of much additional complexity. --- .../cloud/ParallelCommitExecutionTest.java | 49 +++---------------- 1 file changed, 8 insertions(+), 41 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java index 205baf4a16b..b92a5c61104 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java @@ -35,7 +35,6 @@ import org.apache.lucene.util.TestUtil; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.response.UpdateResponse; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -48,7 +47,6 @@ public class ParallelCommitExecutionTest extends SolrCloudTestCase { /** A basic client for operations at the cloud level, default collection will be set */ private static CloudSolrClient CLOUD_CLIENT; private static int expectCount; - private static volatile int tooHigh; private static volatile CountDownLatch countdown; private static final AtomicInteger countup = new AtomicInteger(); @@ -87,51 +85,21 @@ private static void afterClass() throws Exception { } } - private static void initSyncVars(boolean tooHigh) { + private static void initSyncVars() { final int ct; - if (tooHigh) { - ParallelCommitExecutionTest.tooHigh = TOO_HIGH_INCREMENT; - ct = expectCount + TOO_HIGH_INCREMENT; - } else { - ParallelCommitExecutionTest.tooHigh = 0; - ct = expectCount; - } + ct = expectCount; countdown = new CountDownLatch(ct); countup.set(0); } @Test public void testParallelOk() throws Exception { - initSyncVars(false); - UpdateResponse rsp = CLOUD_CLIENT.commit(true, true); - // nocommit: why are we getting status:0 in response that clearly failed? It looks like no matter - // how many distrib commits fail, as long as the local commit succeeds, it reports success. (if the - // local commit fails, it apparently does report failure). - // that's why we're doing this "countup" thing -- because there's apparently no other way to know - // whether the commit succeeded or not? - // we expect `countup` to be incremented by exactly `expectCount` number of parallel commits + initSyncVars(); + CLOUD_CLIENT.commit(true, true); + assertEquals(0, countdown.getCount()); assertEquals(expectCount, countup.get()); } - private static final int TOO_HIGH_INCREMENT = 1; - @Test - public void testParallelFail() throws Exception { - // artificially set countdown too high; we're really testing the test code, since there's no way - // to mock the "incorrect" behavior (i.e., "serial" requests) - initSyncVars(true); - UpdateResponse rsp = CLOUD_CLIENT.commit(true, true); - - // If we could mock a "real" scenario, we'd expect the distrib requests to timeout waiting - // on the countdown latch (and thus not increment `countup`), but the "local" shard would - // then succeed, resulting in `countup` being incremented exactly once. Here we've set `countdown` - // artificially high, but to approximately mock this, we get the "local" shard to recognize - // the "tooHigh" scenario, and succeed anyway, incrementing `countup`. - assertEquals(1, countup.get()); - // sanity check: because we actually expect things to be working properly, we expect `countdown` - // to be exactly "TOO_HIGH_INCREMENT" too high. - assertEquals(TOO_HIGH_INCREMENT, countdown.getCount()); - } - public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception { assert null != client.getDefaultCollection(); AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(), @@ -156,11 +124,10 @@ public Check(UpdateRequestProcessor next) { public void processCommit(CommitUpdateCommand cmd) throws IOException { super.processCommit(cmd); countdown.countDown(); - // In the "tooHigh" scenario, for the one-and-only local (non-distrib) shard, force success (after waiting) to - // mock the "serial blocking" behavior. - final boolean mustSucceed = tooHigh == TOO_HIGH_INCREMENT && cmd.getReq().getParams().get("update.distrib") == null; try { - if (!countdown.await(5, TimeUnit.SECONDS) && !mustSucceed) { + // NOTE: this ensures that all commits are executed in parallel; no commit can complete successfully + // until all commits have entered the `processCommit(...)` method. + if (!countdown.await(5, TimeUnit.SECONDS)) { throw new RuntimeException("done waiting"); } countup.incrementAndGet(); From f87dac45333274b56c3236590b12c83e6336705f Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Wed, 19 Jan 2022 15:37:35 -0500 Subject: [PATCH 4/6] comment wrt intentionally leaving probably-superfluous code in place --- .../update/processor/DistributedZkUpdateProcessor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index 07f780e285c..ab864006179 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -195,6 +195,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { // zk ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); + // TODO: revisit the need for tracking `issuedDistribCommit` -- see below, and SOLR-15045 boolean issuedDistribCommit = false; List useNodes = null; if (req.getParams().get(COMMIT_END_POINT) == null) { @@ -233,6 +234,12 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException { doLocalCommit(cmd); } if (issuedDistribCommit) { + // TODO: according to discussion on SOLR-15045, this call (and all tracking of `issuedDistribCommit`) may + // well be superfluous, and can probably simply be removed. It is left in place for now, intentionally + // punting on the question of whether this internal `blockAndDoRetries()` is necessary. At worst, its + // presence is misleading; but it should be harmless, and allows the change fixing SOLR-15045 to be as + // tightly scoped as possible, leaving the behavior of the code otherwise functionally equivalent (for + // better or worse!) cmdDistrib.blockAndDoRetries(); } } From 18beb1dd0a7b8a479071bb708a6fde3d87a8acf5 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Wed, 19 Jan 2022 15:49:51 -0500 Subject: [PATCH 5/6] add provisional/draft CHANGES.txt entry (in a temporary file) --- solr/CHANGES-SOLR-15045.txt | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 solr/CHANGES-SOLR-15045.txt diff --git a/solr/CHANGES-SOLR-15045.txt b/solr/CHANGES-SOLR-15045.txt new file mode 100644 index 00000000000..21a3b1d87cf --- /dev/null +++ b/solr/CHANGES-SOLR-15045.txt @@ -0,0 +1,7 @@ +DRAFT TO BE ADDED TO CHANGES.txt FOR WHICHEVER VERSION IN WHICH THIS LANDS + +Bug Fixes +--------------------- +* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel, + halving the latency of synchronous commits (Michael Gibney) + From 9b79064cce40141154c5428d7ea4149c1c5513f5 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Mon, 24 Jan 2022 12:18:48 -0500 Subject: [PATCH 6/6] initial CHANGES.txt entry under 10.0.0 let this change bake on main, per @dsmiley's suggestion --- solr/CHANGES-SOLR-15045.txt | 7 ------- solr/CHANGES.txt | 3 ++- 2 files changed, 2 insertions(+), 8 deletions(-) delete mode 100644 solr/CHANGES-SOLR-15045.txt diff --git a/solr/CHANGES-SOLR-15045.txt b/solr/CHANGES-SOLR-15045.txt deleted file mode 100644 index 21a3b1d87cf..00000000000 --- a/solr/CHANGES-SOLR-15045.txt +++ /dev/null @@ -1,7 +0,0 @@ -DRAFT TO BE ADDED TO CHANGES.txt FOR WHICHEVER VERSION IN WHICH THIS LANDS - -Bug Fixes ---------------------- -* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel, - halving the latency of synchronous commits (Michael Gibney) - diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 030baeff36e..8c7761bc5e6 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -22,7 +22,8 @@ Optimizations Bug Fixes --------------------- -(No changes) +* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel, + halving the latency of synchronous commits (Michael Gibney) Other Changes ---------------------