Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.List;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface IndexedResourceCache<T extends HasMetadata> extends ResourceCache<T> {

List<T> byIndex(String indexName, String indexKey);

default Stream<T> byIndexStream(String indexName, String indexKey) {
return byIndex(indexName, indexKey).stream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,22 @@
@SuppressWarnings("unchecked")
public interface ResourceCache<T extends HasMetadata> extends Cache<T> {

/**
* Lists all resources in the given namespace.
*
* @param namespace the namespace to list resources from
* @return a stream of all cached resources in the namespace
*/
default Stream<T> list(String namespace) {
return list(namespace, TRUE);
}

/**
* Lists resources in the given namespace that match the provided predicate.
*
* @param namespace the namespace to list resources from
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(String namespace, Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
/**
* Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an
* idiomatic way, in particular to make sure that the latest version of the resource is present in
* the caches for the next reconciliation.
* the caches for the next reconciliation. In other words, it provides read-cache-after-write
* consistency.
*
* @param <P> the resource type on which this object operates
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,45 @@
public interface Cache<T> {
Predicate TRUE = (a) -> true;

/**
* Retrieves a resource from the cache by its {@link ResourceID}.
*
* @param resourceID the identifier of the resource
* @return an Optional containing the resource if present in the cache
*/
Optional<T> get(ResourceID resourceID);

/**
* Checks whether a resource with the given {@link ResourceID} exists in the cache.
*
* @param resourceID the identifier of the resource
* @return {@code true} if the resource is present in the cache
*/
default boolean contains(ResourceID resourceID) {
return get(resourceID).isPresent();
}

/**
* Returns a stream of all {@link ResourceID}s currently in the cache.
*
* @return a stream of resource identifiers
*/
Stream<ResourceID> keys();

/**
* Lists all resources in the cache.
*
* @return a stream of all cached resources
*/
default Stream<T> list() {
return list(TRUE);
}

/**
* Lists resources in the cache that match the provided predicate.
*
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ private void initSources() {
}
}

C configuration() {
return configuration;
}

public void changeNamespaces(Set<String> namespaces) {
var sourcesToRemove =
sources.keySet().stream().filter(k -> !namespaces.contains(k)).collect(Collectors.toSet());
Expand Down Expand Up @@ -256,12 +252,14 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

Comment thread
csviri marked this conversation as resolved.
@Override
public Stream<R> byIndexStream(String indexName, String indexKey) {
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
return sources.values().stream().flatMap(s -> s.byIndexStream(indexName, indexKey));
}

@Override
public List<R> byIndex(String indexName, String indexKey) {
return sources.values().stream()
.map(s -> s.byIndex(indexName, indexKey))
.flatMap(List::stream)
.collect(Collectors.toList());
return byIndexStream(indexName, indexKey).toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand Down Expand Up @@ -111,7 +113,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();

// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
Expand Down Expand Up @@ -219,11 +220,6 @@ public Optional<R> getCachedValue(ResourceID resourceID) {
return get(resourceID);
}

@Override
public Stream<R> list(String namespace, Predicate<R> predicate) {
return manager().list(namespace, predicate);
}

void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache) {
this.temporaryResourceCache = temporaryResourceCache;
}
Expand All @@ -236,19 +232,163 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public List<R> byIndex(String indexName, String indexKey) {
return manager().byIndex(indexName, indexKey);
public Stream<R> list(String namespace, Predicate<R> predicate) {
return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate);
Comment thread
csviri marked this conversation as resolved.
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<ResourceID> keys() {
return cache.keys();
public Stream<R> list(Predicate<R> predicate) {
return mergeWithTempCacheForList(manager().list(), null, predicate);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<R> list(Predicate<R> predicate) {
return cache.list(predicate);
public Stream<R> byIndexStream(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public List<R> byIndex(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey)
.collect(Collectors.toList());
}

// namespace is filtered on informer manager level
private Stream<R> mergeWithTempCacheForList(
Stream<R> stream, String namespace, Predicate<R> predicate) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {

return stream.filter(filterResourceByPredicate(predicate));
}
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
Comment thread
csviri marked this conversation as resolved.
return stream.filter(filterResourceByPredicate(predicate));
}

var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
var tempResource = tempResources.remove(resourceID);
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
return tempResource;
}
return r;
})
// we filter on predicate only since namespace changes would not be detected anyway.
.filter(filterResourceByPredicate(predicate))
.toList();

return Stream.concat(
tempResources.values().stream()
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)),
upToDateList.stream());
}

private Stream<R> mergeWithTempCacheForIndex(
Stream<R> stream, String indexName, String indexKey) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return stream;
}
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
return stream;
}

var indexer = indexers.get(indexName);
if (indexer == null) {
throw new IllegalArgumentException("Indexer not found for: " + indexName);
}

var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
var tempResource = tempResources.remove(resourceID);
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
if (!indexer.apply(tempResource).contains(indexKey)) {
return null;
}
return tempResource;
}
return r;
})
.filter(Objects::nonNull)
.toList();

// remaining temp resources are ghost resources — include only those matching the index
return Stream.concat(
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)),
upToDateList.stream());
}

private static <R extends HasMetadata> Predicate<R> filterResourceByPredicate(
Predicate<R> predicate) {
return filterResourceByNamespaceAndPredicate(null, predicate);
}

private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate(
String namespace, Predicate<R> predicate) {
return r -> {
if (namespace != null) {
if (!Optional.of(r)
.map(rr -> Objects.equals(namespace, rr.getMetadata().getNamespace()))
.orElse(false)) {
return false;
}
}
if (predicate != null) {
return predicate.test(r);
}
return true;
};
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Keys from the temporary resource
* cache (ghost resources) are included in the result.
*/
@Override
public Stream<ResourceID> keys() {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return manager().keys();
}
var managerKeys = manager().keys().collect(Collectors.toSet());
var tempKeys = temporaryResourceCache.getResources().keySet();
return Stream.concat(
managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -265,4 +266,12 @@ public void checkGhostResources() {
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}

synchronized boolean isEmpty() {
return cache.isEmpty();
}

synchronized Map<ResourceID, T> getResources() {
Comment thread
csviri marked this conversation as resolved.
return Collections.unmodifiableMap(cache);
}
}
Loading
Loading