diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 9266a53e510..49e424b4563 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; @@ -286,10 +287,20 @@ public Iterator> iterator() { } SortedKeyValueIterator iterator; + boolean shuffled = tableConf.getBoolean(Property.TABLE_SHUFFLE_SOURCES); + if (opts.bounds != null) { - iterator = new MultiIterator(readers, opts.bounds); + if (shuffled) { + iterator = new MultiShuffledIterator(readers, opts.bounds); + } else { + iterator = new MultiIterator(readers, opts.bounds); + } } else { - iterator = new MultiIterator(readers, false); + if (shuffled) { + iterator = new MultiShuffledIterator(readers); + } else { + iterator = new MultiIterator(readers); + } } Set families = Collections.emptySet(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index 7ec99bd9c49..251523b445c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -243,7 +244,12 @@ private SortedKeyValueIterator createIterator(KeyExtent extent, readers.add(reader); } - MultiIterator multiIter = new MultiIterator(readers, extent); + MultiIterator multiIter; + if (tableCC.getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { + multiIter = new MultiShuffledIterator(readers, extent.toDataRange()); + } else { + multiIter = new MultiIterator(readers, extent.toDataRange()); + } ClientIteratorEnvironment.Builder iterEnvBuilder = new ClientIteratorEnvironment.Builder().withAuthorizations(authorizations) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d6acb504fad..8fd7896bf5a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1194,6 +1194,9 @@ public enum Property { "The maximum amount of memory that will be used to cache results of a client query/scan. " + "Once this limit is reached, the buffered data is sent to the client.", "1.3.5"), + TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN, + "Shuffle the opening order for Rfiles to reduce thread contention on file open operations.", + "2.1.5"), TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT, "Change the type of file a table writes.", "1.3.5"), TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer", diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java index 865210a9708..3937ca383b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java @@ -338,7 +338,7 @@ private TreeSet getSplitsFromFullScan(SiteConfiguration accumuloConf, readers.add(reader); fileReaders.add(reader); } - iterator = new MultiIterator(readers, false); + iterator = new MultiIterator(readers); iterator.seek(new Range(), Collections.emptySet(), false); splitArray = getQuantiles(iterator, numSplits); } finally { @@ -372,7 +372,7 @@ private TreeSet getSplitsBySize(AccumuloConfiguration accumuloConf, readers.add(reader); fileReaders.add(reader); } - iterator = new MultiIterator(readers, false); + iterator = new MultiIterator(readers); iterator.seek(new Range(), Collections.emptySet(), false); while (iterator.hasTop()) { Key key = iterator.getTopKey(); diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 3b3defc5462..a7b7a76dae9 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@ -28,7 +28,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -37,7 +36,7 @@ */ public class MultiIterator extends HeapIterator { - private final List> iters; + private List> iters; private final Range fence; // deep copy with no seek/scan state @@ -48,11 +47,16 @@ public MultiIterator deepCopy(IteratorEnvironment env) { private MultiIterator(MultiIterator other, IteratorEnvironment env) { super(other.iters.size()); - this.iters = new ArrayList<>(); + var tmpIters = new ArrayList>(); this.fence = other.fence; for (SortedKeyValueIterator iter : other.iters) { - iters.add(iter.deepCopy(env)); + tmpIters.add(iter.deepCopy(env)); } + setIters(tmpIters); + } + + protected void setIters(List> iters) { + this.iters = iters; } private void init() { @@ -67,25 +71,30 @@ private MultiIterator(List> iters, Range seekF if (seekFence != null && init) { // throw this exception because multi-iterator does not seek on init, therefore the - // fence would not be enforced in anyway, so do not want to give the impression it + // fence would not be enforced in any way, so do not want to give the impression it // will enforce this throw new IllegalArgumentException("Initializing not supported when seek fence set"); } this.fence = seekFence; - this.iters = iters; + setIters(iters); if (init) { init(); } } - public MultiIterator(List> iters, Range seekFence) { - this(iters, seekFence, false); + /** + * Creates a MultiIterator that doesn't have a fence range and therefore doesn't seek on creation. + * + * @param iters List of source iterators + */ + public MultiIterator(List> iters) { + this(iters, null, false); } - public MultiIterator(List> iters2, KeyExtent extent) { - this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true), false); + public MultiIterator(List> iters, Range seekFence) { + this(iters, seekFence, false); } public MultiIterator(List> readers, boolean init) { diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java new file mode 100644 index 00000000000..e563c5467e0 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iteratorsImpl.system; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * An iterator capable of iterating over other iterators in sorted order while shuffling the initial + * seek ordering to avoid thread contention. + */ +public class MultiShuffledIterator extends MultiIterator { + + public MultiShuffledIterator(List> readers) { + super(readers); + } + + public MultiShuffledIterator(List> iters, Range seekFence) { + super(iters, seekFence); + } + + public MultiShuffledIterator(List> readers, boolean init) { + super(readers, init); + } + + @Override + protected void setIters(List> iters) { + var copy = new ArrayList<>(iters); + Collections.shuffle(copy); + super.setIters(copy); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index c9a52242dc2..d669565c011 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -45,6 +45,14 @@ public class MultiIteratorTest { private static final Collection EMPTY_COL_FAMS = new ArrayList<>(); + protected MultiIterator makeIterator(List> list, Range range) { + return new MultiIterator(list, range); + } + + protected MultiIterator makeIterator(List> list, Boolean init) { + return new MultiIterator(list, init); + } + public static Key newKey(int row, long ts) { return new Key(newRow(row), ts); } @@ -74,7 +82,7 @@ void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boole MultiIterator mi; if (endRow == null && prevEndRow == null) { - mi = new MultiIterator(iters, init); + mi = makeIterator(iters, init); } else { Range range = new Range(prevEndRow, false, endRow, true); if (init) { @@ -82,7 +90,7 @@ void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boole iter.seek(range, Set.of(), false); } } - mi = new MultiIterator(iters, range); + mi = makeIterator(iters, range); if (init) { mi.seek(range, Set.of(), false); @@ -204,7 +212,7 @@ public void test4() throws IOException { List> skvil = new ArrayList<>(1); skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); + MultiIterator mi = makeIterator(skvil, true); assertFalse(mi.hasTop()); @@ -285,7 +293,7 @@ public void test6() throws IOException { List> skvil = new ArrayList<>(1); skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); + MultiIterator mi = makeIterator(skvil, true); mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false); assertTrue(mi.hasTop()); @@ -368,7 +376,7 @@ public void test7() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent); + MultiIterator mi = makeIterator(skvil, extent.toDataRange()); Range r1 = new Range((Text) null, (Text) null); mi.seek(r1, EMPTY_COL_FAMS, false); @@ -422,4 +430,58 @@ public void test7() throws IOException { mi.seek(r7, EMPTY_COL_FAMS, false); assertFalse(mi.hasTop()); } + + @Test + public void testDeepCopy() throws IOException { + // TEst setting an endKey + TreeMap tm1 = new TreeMap<>(); + newKeyValue(tm1, 0, 3, false, "1"); + newKeyValue(tm1, 0, 2, false, "2"); + newKeyValue(tm1, 0, 1, false, "3"); + newKeyValue(tm1, 0, 0, false, "4"); + newKeyValue(tm1, 1, 2, false, "5"); + newKeyValue(tm1, 1, 1, false, "6"); + newKeyValue(tm1, 1, 0, false, "7"); + newKeyValue(tm1, 2, 1, false, "8"); + newKeyValue(tm1, 2, 0, false, "9"); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + + KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); + + MultiIterator mi = makeIterator(skvil, extent.toDataRange()); + MultiIterator miCopy = mi.deepCopy(null); + + Range r1 = new Range((Text) null, null); + mi.seek(r1, EMPTY_COL_FAMS, false); + miCopy.seek(r1, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("7", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + + miCopy.seek(r1, EMPTY_COL_FAMS, false); + + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("6", miCopy.getTopValue().toString()); + assertFalse(mi.hasTop()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("7", miCopy.getTopValue().toString()); + miCopy.next(); + assertFalse(miCopy.hasTop()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java new file mode 100644 index 00000000000..50ab6fae5d6 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.system; + +import java.util.List; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; + +public class MultiShuffledIteratorTest extends MultiIteratorTest { + + @Override + protected MultiIterator makeIterator(List> list, Range range) { + return new MultiShuffledIterator(list, range); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index b2f9daeccd3..151a343c482 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -467,6 +467,7 @@ public class ScanFileManager { private final KeyExtent tablet; private boolean continueOnFailure; private final CacheProvider cacheProvider; + private final boolean shuffleFiles; ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) { tabletReservedReaders = new ArrayList<>(); @@ -477,6 +478,9 @@ public class ScanFileManager { continueOnFailure = context.getTableConfiguration(tablet.tableId()) .getBoolean(Property.TABLE_FAILURES_IGNORE); + shuffleFiles = context.getTableConfiguration(tablet.tableId()) + .getBoolean(Property.TABLE_SHUFFLE_SOURCES); + if (tablet.isMeta()) { continueOnFailure = false; } @@ -494,6 +498,10 @@ private Map openFiles(List files) + maxOpen + " tablet = " + tablet); } + if (shuffleFiles) { + Collections.shuffle(files); + } + Map newlyReservedReaders = reserveReaders(tablet, files, continueOnFailure, cacheProvider); diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java index 2705703ad6f..3ef3a5fc9b1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java @@ -125,7 +125,7 @@ public ServerContext getServerContext() { } ArrayList> allIters = new ArrayList<>(topLevelIterators); allIters.add(iter); - return new MultiIterator(allIters, false); + return new MultiIterator(allIters); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..2455d70a472 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -20,13 +20,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.thrift.IterInfo; @@ -35,9 +35,11 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; +import org.apache.accumulo.core.iteratorsImpl.system.HeapIterator; import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource; import org.apache.accumulo.core.iteratorsImpl.system.StatsIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -177,7 +179,7 @@ private SortedKeyValueIterator createIterator() files = reservation.getSecond(); } - Collection mapfiles = + List mapfiles = fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig); List.of(mapfiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); @@ -188,7 +190,13 @@ private SortedKeyValueIterator createIterator() iters.addAll(mapfiles); iters.addAll(memIters); - MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); + HeapIterator multiIter; + if (tablet.getContext().getTableConfiguration(tablet.getExtent().tableId()) + .getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { + multiIter = new MultiShuffledIterator(iters, tablet.getExtent().toDataRange()); + } else { + multiIter = new MultiIterator(iters, tablet.getExtent().toDataRange()); + } var builder = new SystemIteratorEnvironmentImpl.Builder(tablet.getContext()) .withTopLevelIterators(new ArrayList<>()).withScope(IteratorScope.scan) diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index 62e034bbef5..ae789e0d88b 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -438,7 +438,7 @@ private static SortedKeyValueIterator createScanIterator(KeyExtent ke iters.addAll(mapfiles); iters.add(smi); - MultiIterator multiIter = new MultiIterator(iters, ke); + MultiIterator multiIter = new MultiIterator(iters, ke.toDataRange()); SortedKeyValueIterator delIter = DeletingIterator.wrap(multiIter, false, Behavior.PROCESS); ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter);