3636public class CachingFilteringUpdateReconciler
3737 implements Reconciler <CachingFilteringUpdateCustomResource > {
3838
39+ public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex" ;
3940 private final AtomicBoolean issueFound = new AtomicBoolean (false );
4041
42+ InformerEventSource <ConfigMap , CachingFilteringUpdateCustomResource > configMapEventSource ;
43+
4144 @ Override
4245 public UpdateControl <CachingFilteringUpdateCustomResource > reconcile (
4346 CachingFilteringUpdateCustomResource resource ,
4447 Context <CachingFilteringUpdateCustomResource > context ) {
4548
46- context .resourceOperations ().serverSideApply (prepareCM (resource , 1 ));
49+ var updated = context .resourceOperations ().serverSideApply (prepareCM (resource , 1 ));
4750 var cachedCM = context .getSecondaryResource (ConfigMap .class );
4851 if (cachedCM .isEmpty ()) {
4952 issueFound .set (true );
5053 throw new IllegalStateException ("Error for resource: " + ResourceID .fromResource (resource ));
5154 }
52-
53- var updated = context .resourceOperations ().serverSideApply (prepareCM (resource , 2 ));
55+ checkListContainsCM (updated );
56+ checkIfResourceVersionIndexContainsUpdated (updated );
57+ updated = context .resourceOperations ().serverSideApply (prepareCM (resource , 2 ));
5458 cachedCM = context .getSecondaryResource (ConfigMap .class );
5559 if (!cachedCM
5660 .orElseThrow ()
@@ -61,12 +65,43 @@ public UpdateControl<CachingFilteringUpdateCustomResource> reconcile(
6165 throw new IllegalStateException (
6266 "Update error for resource: " + ResourceID .fromResource (resource ));
6367 }
68+ checkListContainsCM (updated );
69+ checkIfResourceVersionIndexContainsUpdated (updated );
6470
6571 ensureStatusExists (resource );
6672 resource .getStatus ().setUpdated (true );
6773 return UpdateControl .patchStatus (resource );
6874 }
6975
76+ private void checkIfResourceVersionIndexContainsUpdated (ConfigMap updated ) {
77+ if (configMapEventSource
78+ .byIndex (RESOURCE_VERSION_INDEX , updated .getMetadata ().getResourceVersion ())
79+ .stream ()
80+ .noneMatch (
81+ r ->
82+ ResourceID .fromResource (r ).equals (ResourceID .fromResource (updated ))
83+ && r .getMetadata ()
84+ .getResourceVersion ()
85+ .equals (updated .getMetadata ().getResourceVersion ()))) {
86+ throw new IllegalStateException (
87+ "Index does not contain resource: " + ResourceID .fromResource (updated ));
88+ }
89+ }
90+
91+ private void checkListContainsCM (ConfigMap updated ) {
92+ if (configMapEventSource
93+ .list ()
94+ .noneMatch (
95+ r ->
96+ ResourceID .fromResource (r ).equals (ResourceID .fromResource (updated ))
97+ && r .getMetadata ()
98+ .getResourceVersion ()
99+ .equals (updated .getMetadata ().getResourceVersion ()))) {
100+ throw new IllegalStateException (
101+ "List does not contain resource: " + ResourceID .fromResource (updated ));
102+ }
103+ }
104+
70105 private static ConfigMap prepareCM (CachingFilteringUpdateCustomResource p , int num ) {
71106 var cm =
72107 new ConfigMapBuilder ()
@@ -84,13 +119,15 @@ private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int n
84119 @ Override
85120 public List <EventSource <?, CachingFilteringUpdateCustomResource >> prepareEventSources (
86121 EventSourceContext <CachingFilteringUpdateCustomResource > context ) {
87- InformerEventSource < ConfigMap , CachingFilteringUpdateCustomResource > cmES =
122+ configMapEventSource =
88123 new InformerEventSource <>(
89124 InformerEventSourceConfiguration .from (
90125 ConfigMap .class , CachingFilteringUpdateCustomResource .class )
91126 .build (),
92127 context );
93- return List .of (cmES );
128+ configMapEventSource .addIndexers (
129+ Map .of (RESOURCE_VERSION_INDEX , cm -> List .of (cm .getMetadata ().getResourceVersion ())));
130+ return List .of (configMapEventSource );
94131 }
95132
96133 private void ensureStatusExists (CachingFilteringUpdateCustomResource resource ) {
0 commit comments