Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -286,10 +287,20 @@ public Iterator<Entry<Key,Value>> iterator() {
}

SortedKeyValueIterator<Key,Value> iterator;
boolean shuffled = tableConf.getBoolean(Property.TABLE_SHUFFLE_SOURCES);

if (opts.bounds != null) {
iterator = new MultiIterator(readers, opts.bounds);
if (shuffled) {
iterator = new MultiShuffledIterator(readers, opts.bounds);
} else {
iterator = new MultiIterator(readers, opts.bounds);
}
} else {
iterator = new MultiIterator(readers, false);
if (shuffled) {
iterator = new MultiShuffledIterator(readers);
} else {
iterator = new MultiIterator(readers);
}
}

Set<ByteSequence> families = Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand Down Expand Up @@ -243,7 +244,12 @@ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
readers.add(reader);
}

MultiIterator multiIter = new MultiIterator(readers, extent);
MultiIterator multiIter;
if (tableCC.getBoolean(Property.TABLE_SHUFFLE_SOURCES)) {
multiIter = new MultiShuffledIterator(readers, extent.toDataRange());
} else {
multiIter = new MultiIterator(readers, extent.toDataRange());
}

ClientIteratorEnvironment.Builder iterEnvBuilder =
new ClientIteratorEnvironment.Builder().withAuthorizations(authorizations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,9 @@ public enum Property {
"The maximum amount of memory that will be used to cache results of a client query/scan. "
+ "Once this limit is reached, the buffered data is sent to the client.",
"1.3.5"),
TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN,
"Shuffle the opening order for Rfiles to reduce thread contention on file open operations.",
"2.1.5"),
TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT,
"Change the type of file a table writes.", "1.3.5"),
TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private TreeSet<String> getSplitsFromFullScan(SiteConfiguration accumuloConf,
readers.add(reader);
fileReaders.add(reader);
}
iterator = new MultiIterator(readers, false);
iterator = new MultiIterator(readers);
iterator.seek(new Range(), Collections.emptySet(), false);
splitArray = getQuantiles(iterator, numSplits);
} finally {
Expand Down Expand Up @@ -372,7 +372,7 @@ private TreeSet<String> getSplitsBySize(AccumuloConfiguration accumuloConf,
readers.add(reader);
fileReaders.add(reader);
}
iterator = new MultiIterator(readers, false);
iterator = new MultiIterator(readers);
iterator.seek(new Range(), Collections.emptySet(), false);
while (iterator.hasTop()) {
Key key = iterator.getTopKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

Expand All @@ -37,7 +36,7 @@
*/
public class MultiIterator extends HeapIterator {

private final List<SortedKeyValueIterator<Key,Value>> iters;
private List<SortedKeyValueIterator<Key,Value>> iters;
private final Range fence;

// deep copy with no seek/scan state
Expand All @@ -48,11 +47,16 @@ public MultiIterator deepCopy(IteratorEnvironment env) {

private MultiIterator(MultiIterator other, IteratorEnvironment env) {
super(other.iters.size());
this.iters = new ArrayList<>();
var tmpIters = new ArrayList<SortedKeyValueIterator<Key,Value>>();
this.fence = other.fence;
for (SortedKeyValueIterator<Key,Value> iter : other.iters) {
iters.add(iter.deepCopy(env));
tmpIters.add(iter.deepCopy(env));
}
setIters(tmpIters);
}

/**
* Installs the list of source iterators this iterator reads from. The constructors route the
* list through this method rather than assigning the field directly, which lets a subclass
* override it to substitute a different list (for example a reordered copy).
*
* @param iters source iterators to store; this implementation keeps the reference as given
*/
protected void setIters(List<SortedKeyValueIterator<Key,Value>> iters) {
this.iters = iters;
}

private void init() {
Expand All @@ -67,25 +71,30 @@ private MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range seekF

if (seekFence != null && init) {
// throw this exception because multi-iterator does not seek on init, therefore the
// fence would not be enforced in anyway, so do not want to give the impression it
// fence would not be enforced in any way, so do not want to give the impression it
// will enforce this
throw new IllegalArgumentException("Initializing not supported when seek fence set");
}

this.fence = seekFence;
this.iters = iters;
setIters(iters);

if (init) {
init();
}
}

public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range seekFence) {
this(iters, seekFence, false);
/**
* Creates a MultiIterator that doesn't have a fence range and therefore doesn't seek on creation.
*
* @param iters List of source iterators
*/
public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters) {
this(iters, null, false);
}

public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters2, KeyExtent extent) {
this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true), false);
public MultiIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range seekFence) {
this(iters, seekFence, false);
}

public MultiIterator(List<SortedKeyValueIterator<Key,Value>> readers, boolean init) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.iteratorsImpl.system;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

/**
 * A {@link MultiIterator} that keeps its source iterators in a randomly permuted order. Results
 * are still produced in sorted order by the parent class; only the order in which the sources are
 * held, and therefore visited, differs. The intent is to keep concurrent scans from all touching
 * the same sources in the same sequence.
 */
public class MultiShuffledIterator extends MultiIterator {

  /**
   * Creates a shuffled iterator with no seek fence.
   *
   * @param readers source iterators; the supplied list itself is not reordered
   */
  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> readers) {
    super(readers);
  }

  /**
   * Creates a shuffled iterator restricted to a fence range.
   *
   * @param iters source iterators; the supplied list itself is not reordered
   * @param seekFence fence range handed to {@link MultiIterator}
   */
  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> iters, Range seekFence) {
    super(iters, seekFence);
  }

  /**
   * Creates a shuffled iterator with no seek fence, optionally initialized on construction.
   *
   * @param readers source iterators; the supplied list itself is not reordered
   * @param init initialization flag handed to {@link MultiIterator}
   */
  public MultiShuffledIterator(List<SortedKeyValueIterator<Key,Value>> readers, boolean init) {
    super(readers, init);
  }

  /**
   * Stores a shuffled copy of the given sources. The copy is made first so that the caller's list
   * keeps its original order.
   *
   * @param iters source iterators in caller order
   */
  @Override
  protected void setIters(List<SortedKeyValueIterator<Key,Value>> iters) {
    List<SortedKeyValueIterator<Key,Value>> shuffled = new ArrayList<>(iters);
    Collections.shuffle(shuffled);
    super.setIters(shuffled);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public class MultiIteratorTest {

private static final Collection<ByteSequence> EMPTY_COL_FAMS = new ArrayList<>();

/**
* Builds the iterator under test, fenced to the given range. Tests obtain their iterator through
* this factory so that a subclass can override it and run the same tests against a different
* {@link MultiIterator} implementation.
*
* @param list source iterators to merge
* @param range fence range passed to the iterator's constructor
* @return a new iterator over {@code list}
*/
protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Range range) {
return new MultiIterator(list, range);
}

/**
* Builds the iterator under test without a fence range. Tests obtain their iterator through this
* factory so that a subclass can override it and run the same tests against a different
* {@link MultiIterator} implementation.
*
* @param list source iterators to merge
* @param init passed to the iterator's constructor; the parameter is a boxed {@code Boolean} that
*        is unboxed at the constructor call, so it must not be null
* @return a new iterator over {@code list}
*/
protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Boolean init) {
return new MultiIterator(list, init);
}

public static Key newKey(int row, long ts) {
return new Key(newRow(row), ts);
}
Expand Down Expand Up @@ -74,15 +82,15 @@ void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boole

MultiIterator mi;
if (endRow == null && prevEndRow == null) {
mi = new MultiIterator(iters, init);
mi = makeIterator(iters, init);
} else {
Range range = new Range(prevEndRow, false, endRow, true);
if (init) {
for (SortedKeyValueIterator<Key,Value> iter : iters) {
iter.seek(range, Set.of(), false);
}
}
mi = new MultiIterator(iters, range);
mi = makeIterator(iters, range);

if (init) {
mi.seek(range, Set.of(), false);
Expand Down Expand Up @@ -204,7 +212,7 @@ public void test4() throws IOException {

List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
skvil.add(new SortedMapIterator(tm1));
MultiIterator mi = new MultiIterator(skvil, true);
MultiIterator mi = makeIterator(skvil, true);

assertFalse(mi.hasTop());

Expand Down Expand Up @@ -285,7 +293,7 @@ public void test6() throws IOException {

List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
skvil.add(new SortedMapIterator(tm1));
MultiIterator mi = new MultiIterator(skvil, true);
MultiIterator mi = makeIterator(skvil, true);
mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false);

assertTrue(mi.hasTop());
Expand Down Expand Up @@ -368,7 +376,7 @@ public void test7() throws IOException {

KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0));

MultiIterator mi = new MultiIterator(skvil, extent);
MultiIterator mi = makeIterator(skvil, extent.toDataRange());

Range r1 = new Range((Text) null, (Text) null);
mi.seek(r1, EMPTY_COL_FAMS, false);
Expand Down Expand Up @@ -422,4 +430,58 @@ public void test7() throws IOException {
mi.seek(r7, EMPTY_COL_FAMS, false);
assertFalse(mi.hasTop());
}

/**
* Verifies that a deep copy of a fenced iterator keeps its own position: advancing or exhausting
* the original does not move the copy, and re-seeking the copy does not revive the original.
*/
@Test
public void testDeepCopy() throws IOException {
// Nine entries with values "1" through "9"; the assertions below expect only "5", "6" and "7"
// to be visible through the fence
TreeMap<Key,Value> tm1 = new TreeMap<>();
newKeyValue(tm1, 0, 3, false, "1");
newKeyValue(tm1, 0, 2, false, "2");
newKeyValue(tm1, 0, 1, false, "3");
newKeyValue(tm1, 0, 0, false, "4");
newKeyValue(tm1, 1, 2, false, "5");
newKeyValue(tm1, 1, 1, false, "6");
newKeyValue(tm1, 1, 0, false, "7");
newKeyValue(tm1, 2, 1, false, "8");
newKeyValue(tm1, 2, 0, false, "9");

List<SortedKeyValueIterator<Key,Value>> skvil = new ArrayList<>(1);
skvil.add(new SortedMapIterator(tm1));

// NOTE(review): presumably this extent covers only row 1 (end row 1, previous end row 0) --
// confirm against the KeyExtent constructor
KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0));

MultiIterator mi = makeIterator(skvil, extent.toDataRange());
// the copy is taken before either iterator has been seeked; no iterator environment is supplied
MultiIterator miCopy = mi.deepCopy(null);

// seek both over an unbounded range, then advance only the original
Range r1 = new Range((Text) null, null);
mi.seek(r1, EMPTY_COL_FAMS, false);
miCopy.seek(r1, EMPTY_COL_FAMS, false);
assertTrue(mi.hasTop());
assertEquals("5", mi.getTopValue().toString());
mi.next();
assertTrue(mi.hasTop());
assertEquals("6", mi.getTopValue().toString());
// the copy must still be on its first entry even though the original moved on
assertTrue(miCopy.hasTop());
assertEquals("5", miCopy.getTopValue().toString());
mi.next();
assertTrue(mi.hasTop());
assertEquals("7", mi.getTopValue().toString());
mi.next();
// the original is now exhausted, yet the copy has not moved
assertFalse(mi.hasTop());
assertEquals("5", miCopy.getTopValue().toString());

// re-seek the copy and walk it to the end; the original must stay exhausted throughout
miCopy.seek(r1, EMPTY_COL_FAMS, false);

assertTrue(miCopy.hasTop());
assertEquals("5", miCopy.getTopValue().toString());
miCopy.next();
assertTrue(miCopy.hasTop());
assertEquals("6", miCopy.getTopValue().toString());
assertFalse(mi.hasTop());
miCopy.next();
assertTrue(miCopy.hasTop());
assertEquals("7", miCopy.getTopValue().toString());
miCopy.next();
assertFalse(miCopy.hasTop());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.iterators.system;

import java.util.List;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator;

/**
 * Runs every test inherited from {@link MultiIteratorTest} against {@link MultiShuffledIterator}
 * by overriding both iterator factory methods of the parent test class.
 */
public class MultiShuffledIteratorTest extends MultiIteratorTest {

  /**
   * Builds a fenced {@link MultiShuffledIterator} as the iterator under test.
   *
   * @param list source iterators to merge
   * @param range fence range passed to the iterator's constructor
   * @return a new shuffled iterator over {@code list}
   */
  @Override
  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Range range) {
    return new MultiShuffledIterator(list, range);
  }

  /**
   * Builds an unfenced {@link MultiShuffledIterator} as the iterator under test. Without this
   * override, the inherited tests that create their iterator from an init flag would keep
   * building a plain {@link MultiIterator} and never exercise the shuffled implementation.
   *
   * @param list source iterators to merge
   * @param init passed to the iterator's constructor; must not be null because it is unboxed
   * @return a new shuffled iterator over {@code list}
   */
  @Override
  protected MultiIterator makeIterator(List<SortedKeyValueIterator<Key,Value>> list, Boolean init) {
    return new MultiShuffledIterator(list, init);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ public class ScanFileManager {
private final KeyExtent tablet;
private boolean continueOnFailure;
private final CacheProvider cacheProvider;
private final boolean shuffleFiles;

ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
tabletReservedReaders = new ArrayList<>();
Expand All @@ -477,6 +478,9 @@ public class ScanFileManager {
continueOnFailure = context.getTableConfiguration(tablet.tableId())
.getBoolean(Property.TABLE_FAILURES_IGNORE);

shuffleFiles = context.getTableConfiguration(tablet.tableId())
.getBoolean(Property.TABLE_SHUFFLE_SOURCES);

if (tablet.isMeta()) {
continueOnFailure = false;
}
Expand All @@ -494,6 +498,10 @@ private Map<FileSKVIterator,String> openFiles(List<String> files)
+ maxOpen + " tablet = " + tablet);
}

if (shuffleFiles) {
Collections.shuffle(files);
}

Map<FileSKVIterator,String> newlyReservedReaders =
reserveReaders(tablet, files, continueOnFailure, cacheProvider);

Expand Down
Loading