22
33import com .google .common .annotations .VisibleForTesting ;
44import io .split .client .SplitClientConfig ;
5+ import io .split .client .dtos .DecoratedImpression ;
56import io .split .client .dtos .KeyImpression ;
67import io .split .client .dtos .TestImpressions ;
8+ import io .split .client .impressions .strategy .ProcessImpressionNone ;
79import io .split .client .impressions .strategy .ProcessImpressionStrategy ;
810import io .split .client .utils .SplitExecutorFactory ;
911import io .split .telemetry .domain .enums .ImpressionsDataTypeEnum ;
1315
1416import java .io .Closeable ;
1517import java .net .URISyntaxException ;
18+ import java .util .ArrayList ;
1619import java .util .List ;
20+ import java .util .Objects ;
1721import java .util .concurrent .ScheduledExecutorService ;
1822import java .util .concurrent .TimeUnit ;
1923import java .util .stream .Collectors ;
24+ import java .util .stream .Stream ;
2025
2126import static com .google .common .base .Preconditions .checkNotNull ;
2227
@@ -40,37 +45,42 @@ public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
4045 private TelemetryRuntimeProducer _telemetryRuntimeProducer ;
4146 private ImpressionCounter _counter ;
4247 private ProcessImpressionStrategy _processImpressionStrategy ;
48+ private ProcessImpressionNone _processImpressionNone ;
49+
4350 private final int _impressionsRefreshRate ;
4451
4552 public static ImpressionsManagerImpl instance (SplitClientConfig config ,
4653 TelemetryRuntimeProducer telemetryRuntimeProducer ,
4754 ImpressionsStorageConsumer impressionsStorageConsumer ,
4855 ImpressionsStorageProducer impressionsStorageProducer ,
4956 ImpressionsSender impressionsSender ,
57+ ProcessImpressionNone processImpressionNone ,
5058 ProcessImpressionStrategy processImpressionStrategy ,
5159 ImpressionCounter counter ,
5260 ImpressionListener listener ) throws URISyntaxException {
5361 return new ImpressionsManagerImpl (config , impressionsSender , telemetryRuntimeProducer , impressionsStorageConsumer ,
54- impressionsStorageProducer , processImpressionStrategy , counter , listener );
62+ impressionsStorageProducer , processImpressionNone , processImpressionStrategy , counter , listener );
5563 }
5664
5765 public static ImpressionsManagerImpl instanceForTest (SplitClientConfig config ,
5866 ImpressionsSender impressionsSender ,
5967 TelemetryRuntimeProducer telemetryRuntimeProducer ,
6068 ImpressionsStorageConsumer impressionsStorageConsumer ,
6169 ImpressionsStorageProducer impressionsStorageProducer ,
70+ ProcessImpressionNone processImpressionNone ,
6271 ProcessImpressionStrategy processImpressionStrategy ,
6372 ImpressionCounter counter ,
6473 ImpressionListener listener ) {
6574 return new ImpressionsManagerImpl (config , impressionsSender , telemetryRuntimeProducer , impressionsStorageConsumer ,
66- impressionsStorageProducer , processImpressionStrategy , counter , listener );
75+ impressionsStorageProducer , processImpressionNone , processImpressionStrategy , counter , listener );
6776 }
6877
6978 private ImpressionsManagerImpl (SplitClientConfig config ,
7079 ImpressionsSender impressionsSender ,
7180 TelemetryRuntimeProducer telemetryRuntimeProducer ,
7281 ImpressionsStorageConsumer impressionsStorageConsumer ,
7382 ImpressionsStorageProducer impressionsStorageProducer ,
83+ ProcessImpressionNone processImpressionNone ,
7484 ProcessImpressionStrategy processImpressionStrategy ,
7585 ImpressionCounter impressionCounter ,
7686 ImpressionListener impressionListener ) {
@@ -81,6 +91,7 @@ private ImpressionsManagerImpl(SplitClientConfig config,
8191 _impressionsStorageConsumer = checkNotNull (impressionsStorageConsumer );
8292 _impressionsStorageProducer = checkNotNull (impressionsStorageProducer );
8393 _telemetryRuntimeProducer = checkNotNull (telemetryRuntimeProducer );
94+ _processImpressionNone = checkNotNull (processImpressionNone );
8495 _processImpressionStrategy = checkNotNull (processImpressionStrategy );
8596 _impressionsSender = impressionsSender ;
8697 _counter = impressionCounter ;
@@ -110,15 +121,29 @@ public void start(){
110121 }
111122
112123 @ Override
113- public void track (List <Impression > impressions ) {
114- if (null == impressions ) {
124+ public void track (List <DecoratedImpression > decoratedImpressions ) {
125+ if (null == decoratedImpressions ) {
115126 return ;
116127 }
117-
118- ImpressionsResult impressionsResult = _processImpressionStrategy .process (impressions );
119- List <Impression > impressionsForLogs = impressionsResult .getImpressionsToQueue ();
120- List <Impression > impressionsToListener = impressionsResult .getImpressionsToListener ();
121-
128+ List <Impression > impressionsForLogs = new ArrayList <>();
129+ List <Impression > impressionsToListener = new ArrayList <>();
130+
131+ for (int i = 0 ; i < decoratedImpressions .size (); i ++) {
132+ ImpressionsResult impressionsResult ;
133+ if (decoratedImpressions .get (i ).track ) {
134+ impressionsResult = _processImpressionStrategy .process (Stream .of (
135+ decoratedImpressions .get (i ).impression ).collect (Collectors .toList ()));
136+ } else {
137+ impressionsResult = _processImpressionNone .process (Stream .of (
138+ decoratedImpressions .get (i ).impression ).collect (Collectors .toList ()));
139+ }
140+ if (!Objects .isNull (impressionsResult .getImpressionsToQueue ())) {
141+ _log .info ("Adding impression to queue" );
142+ impressionsForLogs .addAll (impressionsResult .getImpressionsToQueue ());
143+ }
144+ if (!Objects .isNull (impressionsResult .getImpressionsToListener ()))
145+ impressionsToListener .addAll (impressionsResult .getImpressionsToListener ());
146+ }
122147 int totalImpressions = impressionsForLogs .size ();
123148 long queued = _impressionsStorageProducer .put (impressionsForLogs .stream ().map (KeyImpression ::fromImpression ).collect (Collectors .toList ()));
124149 if (queued < totalImpressions ) {
0 commit comments