Skip to content
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.split.client.impressions;

import com.google.common.collect.Lists;
import io.split.client.dtos.UniqueKeys;
import io.split.client.impressions.filters.BloomFilterImp;
import io.split.client.impressions.filters.Filter;
Expand All @@ -21,12 +22,14 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueKeysTrackerImp implements UniqueKeysTracker{
private static final Logger _log = LoggerFactory.getLogger(UniqueKeysTrackerImp.class);
private static final double MARGIN_ERROR = 0.01;
private static final int MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS = 30000;
private static final int MAX_UNIQUE_KEYS_POST_SIZE = 5000;
private static final int MAX_AMOUNT_OF_KEYS = 10000000;
private final AtomicInteger trackerKeysSize = new AtomicInteger(0);
private FilterAdapter filterAdapter;
private final TelemetrySynchronizer _telemetrySynchronizer;
private final ScheduledExecutorService _uniqueKeysSyncScheduledExecutorService;
Expand Down Expand Up @@ -59,10 +62,11 @@ public boolean track(String featureFlagName, String key) {
(feature, current) -> {
HashSet<String> keysByFeature = Optional.ofNullable(current).orElse(new HashSet<>());
keysByFeature.add(key);
trackerKeysSize.incrementAndGet();
return keysByFeature;
});
_logger.debug("The feature flag " + featureFlagName + " and key " + key + " was added");
if (uniqueKeysTracker.size() >= MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS){
if (trackerKeysSize.intValue() >= MAX_UNIQUE_KEYS_POST_SIZE){
_logger.warn("The UniqueKeysTracker size reached the maximum limit");
try {
sendUniqueKeys();
Expand Down Expand Up @@ -107,6 +111,7 @@ public HashMap<String,HashSet<String>> popAll(){
HashSet<String> value = uniqueKeysTracker.remove(key);
toReturn.put(key, value);
}
trackerKeysSize.set(0);
return toReturn;
}

Expand All @@ -115,26 +120,71 @@ private void sendUniqueKeys(){
_log.debug("SendUniqueKeys already running");
return;
}

try {
if (uniqueKeysTracker.size() == 0) {
if (uniqueKeysTracker.isEmpty()) {
_log.debug("The Unique Keys Tracker is empty");
return;
}

HashMap<String, HashSet<String>> uniqueKeysHashMap = popAll();
List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
uniqueKeysFromPopAll.add(uniqueKey);
}
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(uniqueKeysFromPopAll));
uniqueKeysFromPopAll = capChunksToMaxSize(uniqueKeysFromPopAll);

for (List<UniqueKeys.UniqueKey> chunk : getChunks(uniqueKeysFromPopAll)) {
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(chunk));
}
} finally {
sendGuard.set(false);
}
}

private List<UniqueKeys.UniqueKey> capChunksToMaxSize(List<UniqueKeys.UniqueKey> uniqueKeys) {
List<UniqueKeys.UniqueKey> finalChunk = new ArrayList<>();
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) {
if (uniqueKey.keysDto.size() > MAX_UNIQUE_KEYS_POST_SIZE) {
for(List<String> subChunk : Lists.partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) {
finalChunk.add(new UniqueKeys.UniqueKey(uniqueKey.featureName, subChunk));
}
continue;
}
finalChunk.add(uniqueKey);
}
return finalChunk;
}

private List<List<UniqueKeys.UniqueKey>> getChunks(List<UniqueKeys.UniqueKey> uniqueKeys) {
List<List<UniqueKeys.UniqueKey>> chunks = new ArrayList<>();
List<UniqueKeys.UniqueKey> intermediateChunk = new ArrayList<>();
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) {
if ((getChunkSize(intermediateChunk) + uniqueKey.keysDto.size()) > MAX_UNIQUE_KEYS_POST_SIZE) {
chunks.add(intermediateChunk);
intermediateChunk = new ArrayList<>();
}
intermediateChunk.add(uniqueKey);
}
if (!intermediateChunk.isEmpty()) {
chunks.add(intermediateChunk);
}
return chunks;
}

private int getChunkSize(List<UniqueKeys.UniqueKey> uniqueKeysChunk) {
int totalSize = 0;
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeysChunk) {
totalSize += uniqueKey.keysDto.size();
}
return totalSize;
}

private interface ExecuteUniqueKeysAction{
void execute();
}

private class ExecuteCleanFilter implements ExecuteUniqueKeysAction {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package io.split.client.impressions;

import io.split.client.dtos.UniqueKeys;
import io.split.telemetry.synchronizer.TelemetryInMemorySubmitter;
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueKeysTrackerImpTest {
private static TelemetrySynchronizer _telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
Expand Down Expand Up @@ -100,4 +108,71 @@ public void testStopSynchronization() throws Exception {
uniqueKeysTrackerImp.stop();
Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeUniqueKeys(Mockito.anyObject());
}

@Test
public void testUniqueKeysChunks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_telemetrySynchronizer, 10000, 10000, null);
HashMap<String, HashSet<String>> uniqueKeysHashMap = new HashMap<>();
HashSet<String> feature1 = new HashSet<>();
HashSet<String> feature2 = new HashSet<>();
HashSet<String> feature3 = new HashSet<>();
HashSet<String> feature4 = new HashSet<>();
HashSet<String> feature5 = new HashSet<>();
for (Integer i=1; i<6000; i++) {
if (i <= 1000) {
feature1.add("key" + i);
}
if (i <= 2000) {
feature2.add("key" + i);
}
if (i <= 3000) {
feature3.add("key" + i);
}
if (i <= 4000) {
feature4.add("key" + i);
}
feature5.add("key" + i);
}
uniqueKeysHashMap.put("feature1", feature1);
uniqueKeysHashMap.put("feature2", feature2);
uniqueKeysHashMap.put("feature3", feature3);
uniqueKeysHashMap.put("feature4", feature4);
uniqueKeysHashMap.put("feature5", feature5);

List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
uniqueKeysFromPopAll.add(uniqueKey);
}
Method methodCapChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("capChunksToMaxSize", List.class);
methodCapChunks.setAccessible(true);
uniqueKeysFromPopAll = (List<UniqueKeys.UniqueKey>)methodCapChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);

Method methodGetChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("getChunks", List.class);
methodGetChunks.setAccessible(true);
List<List<UniqueKeys.UniqueKey>> keysChunks = (List<List<UniqueKeys.UniqueKey>>) methodGetChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
for (List<UniqueKeys.UniqueKey> chunk : keysChunks) {
int chunkSize = 0;
for (UniqueKeys.UniqueKey keys : chunk) {
chunkSize += keys.keysDto.size();
}
Assert.assertTrue(chunkSize <= 5000);
}
}

@Test
public void testTrackReachMaxKeys() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class);
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(telemetrySynchronizer, 10000, 10000, null);
for (int i=1; i<6000; i++) {
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1", "key" + i));
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2", "key" + i));
}
Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeUniqueKeys(Mockito.anyObject());

Field getTrackerSize = uniqueKeysTrackerImp.getClass().getDeclaredField("trackerKeysSize");
getTrackerSize.setAccessible(true);
AtomicInteger trackerSize = (AtomicInteger) getTrackerSize.get(uniqueKeysTrackerImp);
Assert.assertTrue(trackerSize.intValue() == 1998);
}
}
4 changes: 2 additions & 2 deletions testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</parent>
<artifactId>java-client-testing</artifactId>
<packaging>jar</packaging>
<version>4.16.0</version>
<version>4.16.1</version>
<name>Java Client For Testing</name>
<description>Testing suite for Java SDK for Split</description>
<dependencies>
Expand Down Expand Up @@ -39,7 +39,7 @@
<publishingServerId>central</publishingServerId>
<autoPublish>false</autoPublish>
<waitUntil>published</waitUntil>
<ignorePublishedComponents>true</ignorePublishedComponents>
<ignorePublishedComponents>false</ignorePublishedComponents>
</configuration>
</plugin>
</plugins>
Expand Down