Skip to content

Conversation

@mark-bathori
Copy link
Contributor

Summary

NIFI-15145

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@deniswsrosa
Copy link

any updates on when this PR will be reviewed?

@turcsanyip turcsanyip self-requested a review January 15, 2026 18:16
@deniswsrosa
Copy link

@mark-bathori Do you need any help to resolve the conflicts? I can push a pr to your branch.

@turcsanyip
Copy link
Contributor

@deniswsrosa The PR has been abandoned due to lack of reviewers. I'm reviewing it now and it should be merged soon.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mark-bathori and @turcsanyip, raising a general question, is it necessary to have all of the added components? Map Cache Client, Key Value Lookup, and Record Lookup are all for different use cases. If you have confirmed usage of all three, that's good, but just raising the question as opposed to simply carrying over what previously existed.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@turcsanyip I will hold off on further comments until you have completed your review. On a very cursory scan, I notice a handful of mostly minor concerns related to exception handling, such as lack of detailed messages and exceptions that are never logged in some cases.

Answering the main question about component usage would help guide where to focus review cycles.

Thanks for the work on this!

@deniswsrosa
Copy link

deniswsrosa commented Jan 17, 2026 via email

@deniswsrosa
Copy link

@exceptionfactory @turcsanyip We could push another PR with the changes requested in this PR if needed.

Copy link
Contributor

@turcsanyip turcsanyip left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mark-bathori Please find my review comments below.

Please also rebase the branch to main and update the old 2.7.0 version references.

Comment on lines +60 to +62
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>compile</scope>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The service API dependency should be provided (no scope definition is needed but it can be inherited from dependency management).

private static final String DEFAULT_SCOPE = "_default";
private static final String DEFAULT_COLLECTION = "_default";

protected CouchbaseClient couchbaseClient;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field should be volatile and please move it after the static property definitions below.
Please also fix the same in AbstractCouchbaseProcessor.

@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService {

private volatile String subDocPath;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move it after the static members.

final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue();
final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();

return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the fixed document type, CouchbaseKeyValueLookupService and CouchbaseMapCacheClient can only handle Json documents. They should also be able to process binary (raw) type too.

final byte[] document = serializeDocument(entry.getValue(), valueSerializer);

try {
couchbaseClient.replaceDocument(documentId, document);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AtomicDistributedMapCacheClient interface expects to apply optimistic locking so AtomicCacheEntry.version should be passed as the CAS value to Couchbase if it is available.

final byte[] document = serializeDocument(entry.getValue(), valueSerializer);

try {
couchbaseClient.replaceDocument(documentId, document);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though the API specification of the replace() method says "Replace an existing key with new value", the only client of the method expects insert logic as well. So we may consider adding that logic too (as it was in the old implementation). I suggest checking the other replace() implementations to get an idea how it should work.

try {
final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas());
} catch (CouchbaseException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only DocumentNotFoundException should be handled gracefully. Other exceptions should be wrapped into IOException and thrown.

Similarly: replace(), remove(), get()

DocumentExistsException: putIfAbsent()

Comment on lines +98 to +100
if (containsKey(key, keySerializer)) {
return get(key, keySerializer, valueDeserializer);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache entry can be deleted or expired after containsKey() and get() may return null. So it would be more error-proof to just call get() and return if something is found.

return get(key, keySerializer, valueDeserializer);
}

put(key, value, keySerializer, valueSerializer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the method name shouldn't it be putIfAbsent()?

}

@Override
public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the Couchbase Java client type and method names, the correct spelling would be: lookupIn()

@mark-bathori
Copy link
Contributor Author

Thanks @turcsanyip for the review. I'll go through your comments and make the requested changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants