-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15145: Add RecordLookup, KeyValueLookup, and MapCacheClient Services for Couchbase bundle #10467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ices for Couchbase bundle
|
any updates on when this PR will be reviewed? |
|
@mark-bathori Do you need any help to resolve the conflicts? I can push a pr to your branch. |
|
@deniswsrosa The PR has been abandoned due to lack of reviewers. I'm reviewing it now and it should be merged soon. |
exceptionfactory
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mark-bathori and @turcsanyip, raising a general question, is it necessary to have all of the added components? Map Cache Client, Key Value Lookup, and Record Lookup are all for different use cases. If you have confirmed usage of all three, that's good, but just raising the question as opposed to simply carrying over what previously existed.
exceptionfactory
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@turcsanyip I will hold off on further comments until you have completed your review. On a very cursory scan, I notice a handful of mostly minor concerns related to exception handling, such as lack of detailed messages and exceptions that are never logged in some cases.
Answering the main question about component usage would help guide where to focus review cycles.
Thanks for the work on this!
|
If you all need any help, just let me know, I can also push some fixes.
…On Fri, Jan 16, 2026 at 3:50 PM David Handermann ***@***.***> wrote:
***@***.**** commented on this pull request.
@turcsanyip <https://github.com/turcsanyip> I will hold off on further
comments until you have completed your review. On a very cursory scan, I
notice a handful of mostly minor concerns related to exception handling,
such as lack of detailed messages and exceptions that are never logged in
some cases.
Answering the main question about component usage would help guide where
to focus review cycles.
Thanks for the work on this!
—
Reply to this email directly, view it on GitHub
<#10467 (review)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AABUXRQQTSJWABVQAJCBU3T4HD3D7AVCNFSM6AAAAACKKSEVCSVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMZTMNZRGIZTKNRSGI>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
|
@exceptionfactory @turcsanyip We could push another PR with the changes requested in this PR if needed. |
turcsanyip
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mark-bathori Please find my review comments below.
Please also rebase the branch to main and update the old 2.7.0 version references.
| <groupId>org.apache.nifi</groupId> | ||
| <artifactId>nifi-distributed-cache-client-service-api</artifactId> | ||
| <scope>compile</scope> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The service API dependency should be provided (no scope definition is needed but it can be inherited from dependency management).
| private static final String DEFAULT_SCOPE = "_default"; | ||
| private static final String DEFAULT_COLLECTION = "_default"; | ||
|
|
||
| protected CouchbaseClient couchbaseClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field should be volatile and please move it after the static property definitions below.
Please also fix the same in AbstractCouchbaseProcessor.
| @CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.") | ||
| public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService { | ||
|
|
||
| private volatile String subDocPath; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move it after the static members.
| final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue(); | ||
| final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); | ||
|
|
||
| return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to the fixed document type, CouchbaseKeyValueLookupService and CouchbaseMapCacheClient can only handle Json documents. They should also be able to process binary (raw) type too.
| final byte[] document = serializeDocument(entry.getValue(), valueSerializer); | ||
|
|
||
| try { | ||
| couchbaseClient.replaceDocument(documentId, document); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AtomicDistributedMapCacheClient interface expects to apply optimistic locking so AtomicCacheEntry.version should be passed as the CAS value to Couchbase if it is available.
| final byte[] document = serializeDocument(entry.getValue(), valueSerializer); | ||
|
|
||
| try { | ||
| couchbaseClient.replaceDocument(documentId, document); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Though the API specification of the replace() method says "Replace an existing key with new value", the only client of the method expects insert logic as well. So we may consider adding that logic too (as it was in the old implementation). I suggest checking the other replace() implementations to get an idea how it should work.
| try { | ||
| final CouchbaseGetResult result = couchbaseClient.getDocument(documentId); | ||
| return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas()); | ||
| } catch (CouchbaseException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only DocumentNotFoundException should be handled gracefully. Other exceptions should be wrapped into IOException and thrown.
Similarly: replace(), remove(), get()
DocumentExistsException: putIfAbsent()
| if (containsKey(key, keySerializer)) { | ||
| return get(key, keySerializer, valueDeserializer); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cache entry can be deleted or expired after containsKey() and get() may return null. So it would be more error-proof to just call get() and return if something is found.
| return get(key, keySerializer, valueDeserializer); | ||
| } | ||
|
|
||
| put(key, value, keySerializer, valueSerializer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the method name shouldn't it be putIfAbsent()?
| } | ||
|
|
||
| @Override | ||
| public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the Couchbase Java client type and method names, the correct spelling would be: lookupIn()
|
Thanks @turcsanyip for the review. I'll go through your comments and make the requested changes. |
Summary
NIFI-15145
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation