Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ public void clear() {
size = 0;
}

// Walks each bucket back-to-front so the action can remove the entry it's currently on (as metric
// collection does to drop stale series) without the next entry being skipped.
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
for (int j = 0; j < table.length; j++) {
ArrayList<Entry<K, V>> bucket = table[j];
if (bucket != null) {
for (int i = 0; i < bucket.size(); i++) {
for (int i = bucket.size() - 1; i >= 0; i--) {
Entry<K, V> entry = bucket.get(i);
action.accept(entry.key, entry.value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -413,6 +416,41 @@ void collect_CumulativeSeriesDisappearsAndReappears(MemoryMode memoryMode) {
.hasValue(7)));
}

@ParameterizedTest
@EnumSource(MemoryMode.class)
void collect_CumulativeRetainsLiveSeriesWhenStaleSeriesAreRemoved(MemoryMode memoryMode) {
setup(memoryMode);

// Enough series to share buckets, but under the cardinality limit so there's no overflow series.
int seriesCount = 20;

// First collection reports everything.
testClock.advance(Duration.ofSeconds(10));
for (int i = 0; i < seriesCount; i++) {
longCounterStorage.record(Attributes.builder().put("key", "v" + i).build(), 1);
}
longCounterStorage.collect(resource, scope, testClock.now());
registeredReader.setLastCollectEpochNanos(testClock.now());

// Next time only the even series report; the odd ones go stale and get removed while iterating.
// None of the live series should be skipped.
testClock.advance(Duration.ofSeconds(10));
Set<Attributes> expected = new LinkedHashSet<>();
for (int i = 0; i < seriesCount; i += 2) {
Attributes attributes = Attributes.builder().put("key", "v" + i).build();
longCounterStorage.record(attributes, 2);
expected.add(attributes);
}

MetricData metricData = longCounterStorage.collect(resource, scope, testClock.now());
Set<Attributes> collected =
metricData.getLongSumData().getPoints().stream()
.map(PointData::getAttributes)
.collect(Collectors.toCollection(LinkedHashSet::new));

assertThat(collected).isEqualTo(expected);
}

@ParameterizedTest
@EnumSource(MemoryMode.class)
void collect_DeltaComputesDiff(MemoryMode memoryMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,24 @@ void forEachTest() {

assertThat(actualMap).containsOnlyKeys("One", "Two").containsValues(1, 2);
}

@Test
void forEachAllowsRemovalOfCurrentEntry() {
// 1 and 17 land in the same bucket, like collection removing a stale series mid-iteration.
PooledHashMap<Integer, Integer> collidingMap = new PooledHashMap<>();
collidingMap.put(1, 1);
collidingMap.put(17, 17);

Map<Integer, Integer> visited = new HashMap<>();
collidingMap.forEach(
(key, value) -> {
visited.put(key, value);
if (visited.size() == 1) {
collidingMap.remove(key); // drop the first one we see
}
});

assertThat(visited).containsOnlyKeys(1, 17).containsValues(1, 17);
assertThat(collidingMap.size()).isEqualTo(1);
}
}
Loading