diff --git a/client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java b/client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java index 80b3703d..c0034b6b 100644 --- a/client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java +++ b/client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java @@ -1,5 +1,6 @@ package io.split.client.impressions; +import com.google.common.collect.Lists; import io.split.client.dtos.UniqueKeys; import io.split.client.impressions.filters.BloomFilterImp; import io.split.client.impressions.filters.Filter; @@ -21,12 +22,14 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class UniqueKeysTrackerImp implements UniqueKeysTracker{ private static final Logger _log = LoggerFactory.getLogger(UniqueKeysTrackerImp.class); private static final double MARGIN_ERROR = 0.01; - private static final int MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS = 30000; + private static final int MAX_UNIQUE_KEYS_POST_SIZE = 5000; private static final int MAX_AMOUNT_OF_KEYS = 10000000; + private final AtomicInteger trackerKeysSize = new AtomicInteger(0); private FilterAdapter filterAdapter; private final TelemetrySynchronizer _telemetrySynchronizer; private final ScheduledExecutorService _uniqueKeysSyncScheduledExecutorService; @@ -59,10 +62,11 @@ public boolean track(String featureFlagName, String key) { (feature, current) -> { HashSet keysByFeature = Optional.ofNullable(current).orElse(new HashSet<>()); keysByFeature.add(key); + trackerKeysSize.incrementAndGet(); return keysByFeature; }); _logger.debug("The feature flag " + featureFlagName + " and key " + key + " was added"); - if (uniqueKeysTracker.size() >= MAX_AMOUNT_OF_TRACKED_UNIQUE_KEYS){ + if (trackerKeysSize.intValue() >= MAX_UNIQUE_KEYS_POST_SIZE){ _logger.warn("The UniqueKeysTracker size reached the maximum limit"); try { sendUniqueKeys(); @@ -107,6 +111,7 @@ public HashMap> popAll(){ HashSet value = uniqueKeysTracker.remove(key); toReturn.put(key, value); } + trackerKeysSize.set(0); return toReturn; } @@ -115,26 +120,71 @@ private void sendUniqueKeys(){ _log.debug("SendUniqueKeys already running"); return; } + try { - if (uniqueKeysTracker.size() == 0) { + if (uniqueKeysTracker.isEmpty()) { _log.debug("The Unique Keys Tracker is empty"); return; } + HashMap> uniqueKeysHashMap = popAll(); List uniqueKeysFromPopAll = new ArrayList<>(); for (Map.Entry> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) { UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue())); uniqueKeysFromPopAll.add(uniqueKey); } - _telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(uniqueKeysFromPopAll)); + uniqueKeysFromPopAll = capChunksToMaxSize(uniqueKeysFromPopAll); + + for (List chunk : getChunks(uniqueKeysFromPopAll)) { + _telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(chunk)); + } } finally { sendGuard.set(false); } } + private List capChunksToMaxSize(List uniqueKeys) { + List finalChunk = new ArrayList<>(); + for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) { + if (uniqueKey.keysDto.size() > MAX_UNIQUE_KEYS_POST_SIZE) { + for(List subChunk : Lists.partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) { + finalChunk.add(new UniqueKeys.UniqueKey(uniqueKey.featureName, subChunk)); + } + continue; + } + finalChunk.add(uniqueKey); + } + return finalChunk; + } + + private List> getChunks(List uniqueKeys) { + List> chunks = new ArrayList<>(); + List intermediateChunk = new ArrayList<>(); + for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) { + if ((getChunkSize(intermediateChunk) + uniqueKey.keysDto.size()) > MAX_UNIQUE_KEYS_POST_SIZE) { + chunks.add(intermediateChunk); + intermediateChunk = new ArrayList<>(); + } + intermediateChunk.add(uniqueKey); + } + if (!intermediateChunk.isEmpty()) { + chunks.add(intermediateChunk); + } + return chunks; + } + + private int getChunkSize(List uniqueKeysChunk) { + int totalSize = 0; + for (UniqueKeys.UniqueKey uniqueKey : uniqueKeysChunk) { + totalSize += uniqueKey.keysDto.size(); + } + return totalSize; + } + private interface ExecuteUniqueKeysAction{ void execute(); } + private class ExecuteCleanFilter implements ExecuteUniqueKeysAction { @Override diff --git a/client/src/test/java/io/split/client/impressions/UniqueKeysTrackerImpTest.java b/client/src/test/java/io/split/client/impressions/UniqueKeysTrackerImpTest.java index a1594011..e758369e 100644 --- a/client/src/test/java/io/split/client/impressions/UniqueKeysTrackerImpTest.java +++ b/client/src/test/java/io/split/client/impressions/UniqueKeysTrackerImpTest.java @@ -1,13 +1,21 @@ package io.split.client.impressions; +import io.split.client.dtos.UniqueKeys; import io.split.telemetry.synchronizer.TelemetryInMemorySubmitter; import io.split.telemetry.synchronizer.TelemetrySynchronizer; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; public class UniqueKeysTrackerImpTest { private static TelemetrySynchronizer _telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class); @@ -100,4 +108,71 @@ public void testStopSynchronization() throws Exception { uniqueKeysTrackerImp.stop(); Mockito.verify(telemetrySynchronizer, Mockito.times(1)).synchronizeUniqueKeys(Mockito.anyObject()); } + + @Test + public void testUniqueKeysChunks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_telemetrySynchronizer, 10000, 10000, null); + HashMap> uniqueKeysHashMap = new HashMap<>(); + HashSet feature1 = new HashSet<>(); + HashSet feature2 = new HashSet<>(); + HashSet feature3 = new HashSet<>(); + HashSet feature4 = new HashSet<>(); + HashSet feature5 = new HashSet<>(); + for (Integer i=1; i<6000; i++) { + if (i <= 1000) { + feature1.add("key" + i); + } + if (i <= 2000) { + feature2.add("key" + i); + } + if (i <= 3000) { + feature3.add("key" + i); + } + if (i <= 4000) { + feature4.add("key" + i); + } + feature5.add("key" + i); + } + uniqueKeysHashMap.put("feature1", feature1); + uniqueKeysHashMap.put("feature2", feature2); + uniqueKeysHashMap.put("feature3", feature3); + uniqueKeysHashMap.put("feature4", feature4); + uniqueKeysHashMap.put("feature5", feature5); + + List uniqueKeysFromPopAll = new ArrayList<>(); + for (Map.Entry> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) { + UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue())); + uniqueKeysFromPopAll.add(uniqueKey); + } + Method methodCapChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("capChunksToMaxSize", List.class); + methodCapChunks.setAccessible(true); + uniqueKeysFromPopAll = (List)methodCapChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll); + + Method methodGetChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("getChunks", List.class); + methodGetChunks.setAccessible(true); + List> keysChunks = (List>) methodGetChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll); + for (List chunk : keysChunks) { + int chunkSize = 0; + for (UniqueKeys.UniqueKey keys : chunk) { + chunkSize += keys.keysDto.size(); + } + Assert.assertTrue(chunkSize <= 5000); + } + } + + @Test + public void testTrackReachMaxKeys() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException { + TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(TelemetryInMemorySubmitter.class); + UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(telemetrySynchronizer, 10000, 10000, null); + for (int i=1; i<6000; i++) { + Assert.assertTrue(uniqueKeysTrackerImp.track("feature1", "key" + i)); + Assert.assertTrue(uniqueKeysTrackerImp.track("feature2", "key" + i)); + } + Mockito.verify(telemetrySynchronizer, Mockito.times(2)).synchronizeUniqueKeys(Mockito.anyObject()); + + Field getTrackerSize = uniqueKeysTrackerImp.getClass().getDeclaredField("trackerKeysSize"); + getTrackerSize.setAccessible(true); + AtomicInteger trackerSize = (AtomicInteger) getTrackerSize.get(uniqueKeysTrackerImp); + Assert.assertTrue(trackerSize.intValue() == 1998); + } } \ No newline at end of file diff --git a/testing/pom.xml b/testing/pom.xml index 5f0aa0c5..f9f0d181 100644 --- a/testing/pom.xml +++ b/testing/pom.xml @@ -9,7 +9,7 @@ java-client-testing jar - 4.16.0 + 4.16.1 Java Client For Testing Testing suite for Java SDK for Split @@ -39,7 +39,7 @@ central false published - true + false