From 21e98af19e3bd82a8c25db0f6d6d4a3ff2d9437b Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Mon, 27 Oct 2025 15:22:59 +0100 Subject: [PATCH 1/3] NIFI-15145: Add RecordLookup, KeyValueLookup, and MapCacheClient Services for Couchbase bundle --- .../nifi-couchbase-nar/pom.xml | 5 + .../processors/couchbase/PutCouchbase.java | 2 - .../AbstractCouchbaseProcessorTest.java | 41 ++++ .../couchbase/GetCouchbaseTest.java | 71 +++---- .../couchbase/PutCouchbaseTest.java | 48 ++--- .../services/couchbase/CouchbaseClient.java | 11 ++ .../exception/CouchbaseException.java | 2 +- .../utils/CouchbaseLookupInResult.java | 20 ++ .../nifi-couchbase-services/pom.xml | 68 +++++++ .../couchbase/AbstractCouchbaseService.java | 93 +++++++++ .../CouchbaseKeyValueLookupService.java | 81 ++++++++ .../couchbase/CouchbaseMapCacheClient.java | 184 ++++++++++++++++++ .../CouchbaseRecordLookupService.java | 97 +++++++++ ...g.apache.nifi.controller.ControllerService | 17 ++ .../AbstractCouchbaseServiceTest.java | 36 ++++ .../CouchbaseKeyValueLookupServiceTest.java | 104 ++++++++++ .../CouchbaseMapCacheClientTest.java | 142 ++++++++++++++ .../CouchbaseRecordLookupServiceTest.java | 114 +++++++++++ .../couchbase/StandardCouchbaseClient.java | 105 +++++++++- ...seClient.java => CouchbaseClientTest.java} | 84 +++++++- .../nifi-couchbase-bundle/pom.xml | 1 + 21 files changed, 1239 insertions(+), 87 deletions(-) create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java rename nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/{TestCouchbaseClient.java => CouchbaseClientTest.java} (50%) diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml index aefc8023cb3f..4011cfa0c379 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml @@ -32,6 +32,11 @@ nifi-couchbase-processors 2.8.0-SNAPSHOT + + org.apache.nifi + nifi-couchbase-services + 2.7.0-SNAPSHOT + org.apache.nifi nifi-couchbase-services-api-nar diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java index a2bc22816dd9..1ffe1045f1cb 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java @@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.services.couchbase.CouchbaseClient; -import org.apache.nifi.services.couchbase.CouchbaseConnectionService; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.utils.CouchbaseContext; import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; @@ -69,7 +68,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro } final long startNanos = System.nanoTime(); - final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class); final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue(); final CouchbaseContext couchbaseContext = getCouchbaseContext(context, flowFile); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java new file mode 100644 index 000000000000..5cbf02439214 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.services.couchbase.CouchbaseClient; +import org.apache.nifi.services.couchbase.CouchbaseConnectionService; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public abstract class AbstractCouchbaseProcessorTest { + + protected static final String SERVICE_ID = "couchbaseConnectionService"; + protected static final String TEST_DOCUMENT_ID = "test-document-id"; + protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}"; + protected static final String TEST_SERVICE_LOCATION = "couchbase://test-location"; + protected static final long TEST_CAS = 1L; + + protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) { + final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class); + when(connectionService.getIdentifier()).thenReturn(SERVICE_ID); + when(connectionService.getClient(any())).thenReturn(client); + when(connectionService.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION); + return connectionService; + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java index 97595b7d1d4b..4f535e34f9f0 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java @@ -31,8 +31,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,12 +60,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class GetCouchbaseTest { - - private static final String SERVICE_ID = "couchbaseConnectionService"; - private static final String TEST_DOCUMENT_ID = "test-document-id"; - private static final String TEST_SERVICE_LOCATION = "couchbase://test-location"; - private static final long TEST_CAS = 1L; +public class GetCouchbaseTest extends AbstractCouchbaseProcessorTest { private TestRunner runner; @@ -76,20 +71,15 @@ public void init() { @Test public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, InitializationException { - final String content = "{\"key\":\"value\"}"; - - final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS); + final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS); final CouchbaseClient client = mock(CouchbaseClient.class); when(client.getDocument(anyString())).thenReturn(result); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); - when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); @@ -101,7 +91,7 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini runner.assertTransferCount(REL_FAILURE, 0); final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst(); - outFile.assertContentEquals(content); + outFile.assertContentEquals(TEST_DOCUMENT_CONTENT); outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET); outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE); outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION); @@ -116,24 +106,18 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini @Test public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, InitializationException { - final String content = "{\"key\":\"value\"}"; - - final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS); + final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS); final CouchbaseClient client = mock(CouchbaseClient.class); when(client.getDocument(anyString())).thenReturn(result); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); final MockFlowFile flowFile = new MockFlowFile(0); - final Map attributes = new HashMap<>(); - attributes.put("flowfile_document_id", TEST_DOCUMENT_ID); - flowFile.putAttributes(attributes); + flowFile.putAttributes(Collections.singletonMap("flowfile_document_id", TEST_DOCUMENT_ID)); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}"); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(flowFile); @@ -145,7 +129,7 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I runner.assertTransferCount(REL_FAILURE, 0); final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst(); - outFile.assertContentEquals(content); + outFile.assertContentEquals(TEST_DOCUMENT_CONTENT); outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET); outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE); outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION); @@ -154,24 +138,20 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I @Test public void testWithFlowFileAttributes() throws CouchbaseException, InitializationException { - final String documentId = "test-document-id"; - final String content = "{\"key\":\"value\"}"; final String testBucket = "test-bucket"; final String testScope = "test-scope"; final String testCollection = "test-collection"; - final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS); + final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS); final CouchbaseClient client = mock(CouchbaseClient.class); when(client.getDocument(anyString())).thenReturn(result); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); - runner.setProperty(DOCUMENT_ID, documentId); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); + runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(BUCKET_NAME, "${bucket.attribute}"); runner.setProperty(SCOPE_NAME, "${scope.attribute}"); runner.setProperty(COLLECTION_NAME, "${collection.attribute}"); @@ -181,17 +161,16 @@ public void testWithFlowFileAttributes() throws CouchbaseException, Initializati attributes.put("bucket.attribute", testBucket); attributes.put("scope.attribute", testScope); attributes.put("collection.attribute", testCollection); - final byte[] input = documentId.getBytes(StandardCharsets.UTF_8); - runner.enqueue(input, attributes); + runner.enqueue(new byte[0], attributes); runner.run(); - verify(client, times(1)).getDocument(eq(documentId)); + verify(client, times(1)).getDocument(eq(TEST_DOCUMENT_ID)); runner.assertTransferCount(REL_SUCCESS, 1); runner.assertTransferCount(REL_FAILURE, 0); final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst(); - outFile.assertContentEquals(content); + outFile.assertContentEquals(TEST_DOCUMENT_CONTENT); outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, testBucket); outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, testScope); outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, testCollection); @@ -204,13 +183,11 @@ public void testWithFailure() throws CouchbaseException, InitializationException when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE); when(client.getDocument(anyString())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception"))); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); runner.run(); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java index 60cd3a114a26..320beb848ebc 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java @@ -57,12 +57,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class PutCouchbaseTest { - - private static final String SERVICE_ID = "couchbaseConnectionService"; - private static final String TEST_DOCUMENT_ID = "test-document-id"; - private static final String TEST_SERVICE_LOCATION = "couchbase://test-location"; - private static final long TEST_CAS = 1L; +public class PutCouchbaseTest extends AbstractCouchbaseProcessorTest { private TestRunner runner; @@ -76,18 +71,15 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I final CouchbaseClient client = mock(CouchbaseClient.class); when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS)); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); - when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION); + final CouchbaseConnectionService connectionService = mockConnectionService(client); final MockFlowFile flowFile = new MockFlowFile(0); final Map attributes = new HashMap<>(); attributes.put("flowfile_document_id", TEST_DOCUMENT_ID); flowFile.putAttributes(attributes); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}"); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.setValidateExpressionUsage(false); @@ -120,12 +112,10 @@ public void testWithFailure() throws CouchbaseException, InitializationException when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE); when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception"))); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); @@ -143,12 +133,10 @@ public void testWithRetry() throws CouchbaseException, InitializationException { when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.RETRY); when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception"))); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); @@ -166,12 +154,10 @@ public void testWithRollback() throws CouchbaseException, InitializationExceptio when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.ROLLBACK); when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception"))); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); @@ -189,12 +175,10 @@ public void testWithUnknownException() throws CouchbaseException, Initialization when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE); when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception"))); - final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class); - when(service.getIdentifier()).thenReturn(SERVICE_ID); - when(service.getClient(any())).thenReturn(client); + final CouchbaseConnectionService connectionService = mockConnectionService(client); - runner.addControllerService(SERVICE_ID, service); - runner.enableControllerService(service); + runner.addControllerService(SERVICE_ID, connectionService); + runner.enableControllerService(connectionService); runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID); runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); runner.enqueue(new byte[0]); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java index 9ad04fbe01df..4cc354f26b7d 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java @@ -19,6 +19,7 @@ import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.exception.ExceptionCategory; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; public interface CouchbaseClient { @@ -27,5 +28,15 @@ public interface CouchbaseClient { CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException; + boolean documentExists(String documentId) throws CouchbaseException; + + void insertDocument(String documentId, byte[] content) throws CouchbaseException; + + void removeDocument(String documentId) throws CouchbaseException; + + void replaceDocument(String documentId, byte[] content) throws CouchbaseException; + + CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException; + ExceptionCategory getExceptionCategory(Throwable throwable); } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java index ea2436e621fa..600e38dc1c9d 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java @@ -23,6 +23,6 @@ public CouchbaseException(final String message) { } public CouchbaseException(final String message, final Throwable cause) { - super(cause); + super(message, cause); } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java new file mode 100644 index 000000000000..c916e29f3435 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase.utils; + +public record CouchbaseLookupInResult(Object resultContent, long cas) { +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml new file mode 100644 index 000000000000..e918d055e62f --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-couchbase-bundle + 2.7.0-SNAPSHOT + + + nifi-couchbase-services + jar + + + + + org.apache.nifi + nifi-couchbase-services-api + 2.7.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-lookup-service-api + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-json-record-utils + 2.7.0-SNAPSHOT + + + org.apache.nifi + nifi-avro-record-utils + 2.7.0-SNAPSHOT + + + org.apache.nifi + nifi-distributed-cache-client-service-api + compile + + + + + + \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java new file mode 100644 index 000000000000..47fdc4ac7657 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.utils.CouchbaseContext; +import org.apache.nifi.services.couchbase.utils.DocumentType; + +import java.util.Set; + +public class AbstractCouchbaseService extends AbstractControllerService { + + protected static final String KEY = "key"; + protected static final Set REQUIRED_KEYS = Set.of(KEY); + + private static final String DEFAULT_BUCKET = "default"; + private static final String DEFAULT_SCOPE = "_default"; + private static final String DEFAULT_COLLECTION = "_default"; + + protected CouchbaseClient couchbaseClient; + + public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Couchbase Connection Service") + .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseConnectionService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder() + .name("Bucket Name") + .description("The name of the bucket where documents will be stored. Each bucket contains a hierarchy of scopes and collections to group keys and values logically.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue(DEFAULT_BUCKET) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder() + .name("Scope Name") + .description("The name of the scope which is a logical namespace within a bucket, serving to categorize and organize related collections.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue(DEFAULT_SCOPE) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() + .name("Collection Name") + .description("The name of collection which is a logical container within a scope, used to hold documents.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue(DEFAULT_COLLECTION) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class); + final CouchbaseContext couchbaseContext = getCouchbaseContext(context); + couchbaseClient = connectionService.getClient(couchbaseContext); + } + + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + private CouchbaseContext getCouchbaseContext(ConfigurationContext context) { + final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue(); + final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue(); + final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); + + return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java new file mode 100644 index 000000000000..fdc5fd024939 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.StringLookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Tags({"lookup", "enrich", "key", "value", "couchbase"}) +@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.") +public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService { + + private volatile String subDocPath; + + public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder() + .name("Lookup Sub-Document Path") + .description("The Sub-Document lookup path within the target JSON document.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .build(); + + private static final List PROPERTIES = List.of( + COUCHBASE_CONNECTION_SERVICE, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME, + LOOKUP_SUB_DOC_PATH + ); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + super.onEnabled(context); + subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue(); + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + final Object documentId = coordinates.get(KEY); + + if (documentId == null) { + return Optional.empty(); + } + try { + final CouchbaseLookupInResult result = couchbaseClient.lookUpIn(documentId.toString(), subDocPath); + return Optional.ofNullable(result.resultContent().toString()); + } catch (Exception e) { + throw new LookupFailureException("Key-value lookup from Couchbase failed", e); + } + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java new file mode 100644 index 000000000000..28d2215c3685 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; +import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Tags({"distributed", "cache", "map", "cluster", "couchbase"}) +@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + + " This can be used in order to share a Map between nodes in a NiFi cluster." + + " Couchbase Server cluster can provide a high available and persistent cache storage.") +public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements AtomicDistributedMapCacheClient { + + public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Couchbase Connection Service") + .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseConnectionService.class) + .build(); + + private static final List PROPERTIES = List.of( + COUCHBASE_CONNECTION_SERVICE, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME + ); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + final String documentId = serializeDocumentKey(key, keySerializer); + try { + final CouchbaseGetResult result = couchbaseClient.getDocument(documentId); + return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas()); + } catch (CouchbaseException e) { + return null; + } + } + + @Override + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final String documentId = serializeDocumentKey(entry.getKey(), keySerializer); + final byte[] document = serializeDocument(entry.getValue(), valueSerializer); + + try { + couchbaseClient.replaceDocument(documentId, document); + return true; + } catch (CouchbaseException e) { + return false; + } + } + + @Override + public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final String documentId = serializeDocumentKey(key, keySerializer); + final byte[] document = serializeDocument(value, valueSerializer); + + try { + couchbaseClient.insertDocument(documentId, document); + return true; + } catch (CouchbaseException e) { + return false; + } + } + + @Override + public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + if (containsKey(key, keySerializer)) { + return get(key, keySerializer, valueDeserializer); + } + + put(key, value, keySerializer, valueSerializer); + return value; + } + + @Override + public boolean containsKey(K key, Serializer keySerializer) throws IOException { + final String documentId = serializeDocumentKey(key, keySerializer); + + try { + return couchbaseClient.documentExists(documentId); + } catch (CouchbaseException e) { + throw new IOException(e); + } + } + + @Override + public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + final String documentId = serializeDocumentKey(key, keySerializer); + final byte[] document = serializeDocument(value, valueSerializer); + + try { + couchbaseClient.upsertDocument(documentId, document); + } catch (CouchbaseException e) { + throw new IOException(e); + } + } + + @Override + public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + final String documentId = serializeDocumentKey(key, keySerializer); + + try { + final CouchbaseGetResult result = couchbaseClient.getDocument(documentId); + return deserializeDocument(valueDeserializer, result.resultContent()); + } catch (CouchbaseException e) { + return null; + } + } + + @Override + public void close() { + } + + @Override + public boolean remove(K key, Serializer serializer) throws IOException { + final String documentId = serializeDocumentKey(key, serializer); + + try { + couchbaseClient.removeDocument(documentId); + return true; + } catch (CouchbaseException e) { + return false; + } + } + + private String serializeDocumentKey(final S key, final Serializer serializer) throws IOException { + final String result; + + if (key instanceof String) { + result = (String) key; + } else { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + serializer.serialize(key, stream); + result = stream.toString(StandardCharsets.UTF_8); + } + + if (result.isEmpty()) { + throw new IOException("Cache record key cannot be empty!"); + } + + return result; + } + + private byte[] serializeDocument(final S value, final Serializer serializer) throws IOException { + final ByteArrayOutputStream stream = new ByteArrayOutputStream(); + serializer.serialize(value, stream); + return stream.toByteArray(); + } + + private static V deserializeDocument(final Deserializer deserializer, final byte[] value) throws IOException { + return deserializer.deserialize(value); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java new file mode 100644 index 000000000000..3ffae09b2c2b --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.json.JsonParserFactory; +import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.SchemaApplicationStrategy; +import org.apache.nifi.json.StartingFieldStrategy; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.lookup.RecordLookupService; +import org.apache.nifi.schema.access.InferenceSchemaStrategy; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@Tags({"lookup", "enrich", "couchbase"}) +@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.") +public class CouchbaseRecordLookupService extends AbstractCouchbaseService implements RecordLookupService { + + private static final String DATE_FORMAT = "yyyy-MM-dd"; + private static final String TIME_FORMAT = "HH:mm:ss.SSSZ"; + private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); + + private static final List PROPERTIES = List.of( + COUCHBASE_CONNECTION_SERVICE, + BUCKET_NAME, + SCOPE_NAME, + COLLECTION_NAME + ); + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Optional lookup(Map coordinates) throws LookupFailureException { + final Object documentId = coordinates.get(KEY); + + if (documentId == null) { + return Optional.empty(); + } + + try { + final CouchbaseGetResult result = couchbaseClient.getDocument(documentId.toString()); + final RecordSchema schema = new InferenceSchemaStrategy().getSchema(null, new ByteArrayInputStream(result.resultContent()), null); + + final JsonTreeRowRecordReader recordReader = createJsonReader(new ByteArrayInputStream(result.resultContent()), schema); + + return Optional.ofNullable(recordReader.nextRecord()); + } catch (Exception e) { + throw new LookupFailureException("Record lookup from Couchbase failed", e); + } + } + + private JsonTreeRowRecordReader createJsonReader(InputStream inputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException { + return new JsonTreeRowRecordReader( + inputStream, + getLogger(), + recordSchema, + DATE_FORMAT, + TIME_FORMAT, + DATE_TIME_FORMAT, + StartingFieldStrategy.ROOT_NODE, + null, + SchemaApplicationStrategy.SELECTED_PART, + null, + jsonParserFactory + ); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 000000000000..772920f46fea --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.services.couchbase.CouchbaseKeyValueLookupService +org.apache.nifi.services.couchbase.CouchbaseRecordLookupService +org.apache.nifi.services.couchbase.CouchbaseMapCacheClient diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java new file mode 100644 index 000000000000..daab9c887845 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public abstract class AbstractCouchbaseServiceTest { + + protected static final String SERVICE_ID = "couchbaseConnectionService"; + protected static final String TEST_DOCUMENT_ID = "test-document-id"; + protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}"; + protected static final long TEST_CAS = 1L; + + protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) { + final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class); + when(connectionService.getIdentifier()).thenReturn(SERVICE_ID); + when(connectionService.getClient(any())).thenReturn(client); + return connectionService; + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java new file mode 100644 index 000000000000..0531c1110630 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE; +import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CouchbaseKeyValueLookupServiceTest extends AbstractCouchbaseServiceTest { + + private CouchbaseKeyValueLookupService lookupService; + + @BeforeEach + public void init() { + lookupService = new CouchbaseKeyValueLookupService(); + } + + @Test + public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.lookUpIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult("test result", TEST_CAS)); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + lookupService.onEnabled(context); + + final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); + final Optional result = lookupService.lookup(coordinates); + + assertTrue(result.isPresent()); + assertEquals("test result", result.get()); + } + + @Test + public void testLookUpFailure() throws CouchbaseException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + lookupService.onEnabled(context); + + final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); + + assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates)); + } + + @Test + public void testMissingKey() throws LookupFailureException { + final CouchbaseClient client = mock(CouchbaseClient.class); + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + lookupService.onEnabled(context); + + final Optional result = lookupService.lookup(Collections.emptyMap()); + + assertTrue(result.isEmpty()); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java new file mode 100644 index 000000000000..36181411791c --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CouchbaseMapCacheClientTest extends AbstractCouchbaseServiceTest { + + private final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8)); + private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8); + private CouchbaseMapCacheClient mapCacheClient; + + @BeforeEach + public void init() { + mapCacheClient = new CouchbaseMapCacheClient(); + } + + @Test + public void testCacheGet() throws CouchbaseException, IOException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS)); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + mapCacheClient.onEnabled(context); + + final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer); + + assertEquals(TEST_DOCUMENT_CONTENT, result); + } + + @Test + public void testCacheGetFailure() throws CouchbaseException, IOException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.getDocument(anyString())).thenThrow(new CouchbaseException("Test exception")); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + mapCacheClient.onEnabled(context); + + final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer); + + assertNull(result); + } + + @Test + public void testCachePut() throws CouchbaseException, IOException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS)); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + mapCacheClient.onEnabled(context); + + mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer); + verify(client, times(1)).upsertDocument(eq(TEST_DOCUMENT_ID), eq(TEST_DOCUMENT_CONTENT.getBytes())); + } + + @Test + public void testCachePutFailure() throws CouchbaseException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + mapCacheClient.onEnabled(context); + + assertThrows(IOException.class, () -> mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer)); + } + + @Test + public void testCacheRemove() throws CouchbaseException, IOException { + final CouchbaseClient client = mock(CouchbaseClient.class); + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + mapCacheClient.onEnabled(context); + + mapCacheClient.remove(TEST_DOCUMENT_ID, stringSerializer); + verify(client, times(1)).removeDocument(eq(TEST_DOCUMENT_ID)); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java new file mode 100644 index 000000000000..4082cf7bcbd4 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.services.couchbase.exception.CouchbaseException; +import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockControllerServiceInitializationContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE; +import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest { + + private CouchbaseRecordLookupService lookupService; + + @BeforeEach + public void init() { + lookupService = new CouchbaseRecordLookupService(); + } + + @Test + public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS)); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + lookupService.onEnabled(context); + + final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); + final Optional result = lookupService.lookup(coordinates); + + assertTrue(result.isPresent()); + + final List fields = Collections.singletonList(new RecordField("key", RecordFieldType.STRING.getDataType())); + final Record expectedRecord = new MapRecord(new SimpleRecordSchema(fields), Collections.singletonMap("key", "value")); + + assertEquals(expectedRecord, result.get()); + } + + @Test + public void testLookUpFailure() throws CouchbaseException { + final CouchbaseClient client = mock(CouchbaseClient.class); + when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); + + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + + lookupService.onEnabled(context); + + final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); + + assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates)); + } + + @Test + public void testMissingKey() throws LookupFailureException { + final CouchbaseClient client = mock(CouchbaseClient.class); + final CouchbaseConnectionService connectionService = mockConnectionService(client); + + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + lookupService.onEnabled(context); + + final Optional result = lookupService.lookup(Collections.emptyMap()); + + assertTrue(result.isEmpty()); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java index dce98be91963..9a5fcbdbd197 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java @@ -45,22 +45,33 @@ import com.couchbase.client.java.codec.RawBinaryTranscoder; import com.couchbase.client.java.codec.RawJsonTranscoder; import com.couchbase.client.java.codec.Transcoder; +import com.couchbase.client.java.json.JsonObject; +import com.couchbase.client.java.kv.ExistsResult; import com.couchbase.client.java.kv.GetOptions; import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.InsertOptions; +import com.couchbase.client.java.kv.LookupInResult; +import com.couchbase.client.java.kv.LookupInSpec; import com.couchbase.client.java.kv.MutationResult; import com.couchbase.client.java.kv.PersistTo; +import com.couchbase.client.java.kv.ReplaceOptions; import com.couchbase.client.java.kv.ReplicateTo; import com.couchbase.client.java.kv.UpsertOptions; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.exception.ExceptionCategory; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; import org.apache.nifi.services.couchbase.utils.DocumentType; import org.apache.nifi.services.couchbase.utils.JsonValidator; +import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Map.entry; import static org.apache.nifi.services.couchbase.exception.ExceptionCategory.FAILURE; @@ -124,11 +135,11 @@ public CouchbaseGetResult getDocument(String documentId) throws CouchbaseExcepti @Override public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException { - try { - if (!getInputValidator(documentType).test(content)) { - throw new CouchbaseException("The provided input is invalid"); - } + if (!getInputValidator(documentType).test(content)) { + throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId)); + } + try { final MutationResult result = collection.upsert(documentId, content, UpsertOptions.upsertOptions() .durability(persistTo, replicateTo) @@ -141,6 +152,76 @@ public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) t } } + @Override + public boolean documentExists(String documentId) throws CouchbaseException { + try { + final ExistsResult result = collection.exists(documentId); + return result.exists(); + } catch (Exception e) { + throw new CouchbaseException("Failed to check document [%s] in Couchbase".formatted(documentId), e); + } + } + + @Override + public void insertDocument(String documentId, byte[] content) throws CouchbaseException { + if (!getInputValidator(documentType).test(content)) { + throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId)); + } + + try { + collection.insert(documentId, content, + InsertOptions.insertOptions() + .durability(persistTo, replicateTo) + .transcoder(getTranscoder(documentType)) + .clientContext(new HashMap<>())); + } catch (Exception e) { + throw new CouchbaseException("Failed to insert document [%s] in Couchbase".formatted(documentId), e); + } + } + + @Override + public void removeDocument(String documentId) throws CouchbaseException { + try { + collection.remove(documentId); + } catch (Exception e) { + throw new CouchbaseException("Failed to remove document [%s] in Couchbase".formatted(documentId), e); + } + } + + @Override + public void replaceDocument(String documentId, byte[] content) throws CouchbaseException { + if (!getInputValidator(documentType).test(content)) { + throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId)); + } + + try { + collection.replace(documentId, content, + ReplaceOptions.replaceOptions() + .durability(persistTo, replicateTo) + .transcoder(getTranscoder(documentType)) + .clientContext(new HashMap<>())); + } catch (Exception e) { + throw new CouchbaseException("Failed to replace document [%s] in Couchbase".formatted(documentId), e); + } + } + + @Override + public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException { + try { + final String documentPath = subDocPath == null ? "" : subDocPath; + final LookupInResult result = collection.lookupIn(documentId, Collections.singletonList(LookupInSpec.get(documentPath))); + + if (!result.exists(0)) { + throw new CouchbaseException("No value found on the requested path [%s] in Couchbase".formatted(subDocPath)); + } + + final Object lookUpInResult = result.contentAs(0, Object.class); + return new CouchbaseLookupInResult(deserializeLookupInResult(lookUpInResult), result.cas()); + } catch (Exception e) { + throw new CouchbaseException("Failed to look up in document [%s] in Couchbase".formatted(documentId), e); + } + } + @Override public ExceptionCategory getExceptionCategory(Throwable throwable) { return exceptionMapping.getOrDefault(throwable.getClass(), FAILURE); @@ -159,4 +240,20 @@ private Predicate getInputValidator(DocumentType documentType) { case BINARY -> v -> true; }; } + + private String deserializeLookupInResult(Object result) { + if (result instanceof String) { + return (String) result; + } else if (result instanceof Map) { + return JsonObject.from((Map) result).toString(); + } else if (result instanceof List) { + return ((List) result).stream() + .map(this::deserializeLookupInResult) + .collect(Collectors.joining(",", "[", "]")); + } else if (result instanceof byte[]) { + return new String((byte[]) result, StandardCharsets.UTF_8); + } + + return result.toString(); + } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java similarity index 50% rename from nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java rename to nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java index ad3d7e7d8785..104400fa9d10 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java @@ -18,16 +18,23 @@ import com.couchbase.client.java.Collection; import com.couchbase.client.java.kv.GetResult; +import com.couchbase.client.java.kv.LookupInResult; import com.couchbase.client.java.kv.MutationResult; import com.couchbase.client.java.kv.PersistTo; import com.couchbase.client.java.kv.ReplicateTo; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.nifi.services.couchbase.utils.DocumentType.JSON; @@ -36,11 +43,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TestCouchbaseClient { +public class CouchbaseClientTest { private static final String TEST_DOCUMENT_ID = "test-document-id"; private static final long TEST_CAS = 1L; @@ -72,7 +80,24 @@ void testPutJsonDocumentValidationFailure() { final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); final Exception exception = assertThrows(CouchbaseException.class, () -> client.upsertDocument(TEST_DOCUMENT_ID, content.getBytes())); + assertTrue(exception.getMessage().contains("The provided input is invalid")); + } + + @Test + void testInsertJsonDocumentValidationFailure() { + final String content = "{invalid-json}"; + final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); + + final Exception exception = assertThrows(CouchbaseException.class, () -> client.insertDocument(TEST_DOCUMENT_ID, content.getBytes())); + assertTrue(exception.getMessage().contains("The provided input is invalid")); + } + + @Test + void testReplaceJsonDocumentValidationFailure() { + final String content = "{invalid-json}"; + final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); + final Exception exception = assertThrows(CouchbaseException.class, () -> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes())); assertTrue(exception.getMessage().contains("The provided input is invalid")); } @@ -94,4 +119,61 @@ void testGetDocument() throws CouchbaseException { assertEquals(TEST_CAS, getResult.cas()); assertArrayEquals(content.getBytes(), getResult.resultContent()); } + + @Test + void testLookupInWithMapResult() throws CouchbaseException { + final String expectedResult = "{\"name\":\"John\",\"age\":\"20\"}"; + final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); + + Map lookupInContent = new HashMap<>(); + lookupInContent.put("name", "John"); + lookupInContent.put("age", "20"); + + final LookupInResult result = mock(LookupInResult.class); + when(result.contentAs(anyInt(), any(Class.class))).thenReturn(lookupInContent); + when(result.exists(anyInt())).thenReturn(true); + when(result.cas()).thenReturn(TEST_CAS); + + when(collection.lookupIn(anyString(), any())).thenReturn(result); + + final CouchbaseLookupInResult lookupInResult = client.lookUpIn(TEST_DOCUMENT_ID, ""); + + assertEquals(expectedResult, lookupInResult.resultContent()); + assertEquals(TEST_CAS, lookupInResult.cas()); + } + + @Test + void testLookupInWithArrayResult() throws CouchbaseException { + final String expectedResult = "[{\"name\":\"John\"},{\"name\":\"Jack\"}]"; + final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); + + List lookupInContent = new ArrayList<>(); + lookupInContent.add(Collections.singletonMap("name", "John")); + lookupInContent.add(Collections.singletonMap("name", "Jack")); + + final LookupInResult result = mock(LookupInResult.class); + when(result.contentAs(anyInt(), any(Class.class))).thenReturn(lookupInContent); + when(result.exists(anyInt())).thenReturn(true); + when(result.cas()).thenReturn(TEST_CAS); + + when(collection.lookupIn(anyString(), any())).thenReturn(result); + + final CouchbaseLookupInResult lookupInResult = client.lookUpIn(TEST_DOCUMENT_ID, ""); + + assertEquals(expectedResult, lookupInResult.resultContent()); + assertEquals(TEST_CAS, lookupInResult.cas()); + } + + @Test + void testLookupInWithNoResult() { + final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); + + final LookupInResult result = mock(LookupInResult.class); + when(result.exists(anyInt())).thenReturn(false); + + when(collection.lookupIn(anyString(), any())).thenReturn(result); + + final Exception exception = assertThrows(CouchbaseException.class, () -> client.lookUpIn(TEST_DOCUMENT_ID, "test-path")); + assertTrue(exception.getCause().getMessage().contains("No value found on the requested path [test-path] in Couchbase")); + } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml index 73c18a9c54eb..b827445e4bfe 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/pom.xml @@ -34,6 +34,7 @@ nifi-couchbase-standard-services-nar nifi-couchbase-services-api nifi-couchbase-services-api-nar + nifi-couchbase-services \ No newline at end of file From 4b055d4341c96c6827eea6e6a0cc8676b7a6876d Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Thu, 5 Feb 2026 15:34:47 +0100 Subject: [PATCH 2/3] Fix review comments --- .../nifi-couchbase-nar/pom.xml | 2 +- .../couchbase/AbstractCouchbaseProcessor.java | 4 +- .../services/couchbase/CouchbaseClient.java | 4 +- .../CouchbaseCasMismatchException.java | 24 ++++++++ .../CouchbaseDocExistsException.java | 24 ++++++++ .../CouchbaseDocNotFoundException.java | 24 ++++++++ .../nifi-couchbase-services/pom.xml | 12 ++-- .../couchbase/AbstractCouchbaseService.java | 19 ++++++- .../CouchbaseKeyValueLookupService.java | 12 +++- .../couchbase/CouchbaseMapCacheClient.java | 55 ++++++++++++++----- .../CouchbaseRecordLookupService.java | 9 ++- .../CouchbaseKeyValueLookupServiceTest.java | 8 +-- .../CouchbaseMapCacheClientTest.java | 3 +- .../CouchbaseRecordLookupServiceTest.java | 4 +- .../nifi-couchbase-standard-services/pom.xml | 2 +- .../couchbase/StandardCouchbaseClient.java | 18 +++++- .../couchbase/CouchbaseClientTest.java | 8 +-- 17 files changed, 186 insertions(+), 46 deletions(-) create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java create mode 100644 nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml index 4011cfa0c379..5528a8447ec6 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml @@ -35,7 +35,7 @@ org.apache.nifi nifi-couchbase-services - 2.7.0-SNAPSHOT + 2.8.0-SNAPSHOT org.apache.nifi diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java index 6d84e4e953e3..fc2f788df0dd 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -50,8 +50,6 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { - protected CouchbaseConnectionService connectionService; - public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder() .name("Document ID") .description("Couchbase document identifier, or an expression to construct the Couchbase document identifier.") @@ -128,6 +126,8 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY); + protected volatile CouchbaseConnectionService connectionService; + @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java index 4cc354f26b7d..96797c1a5d21 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java @@ -34,9 +34,9 @@ public interface CouchbaseClient { void removeDocument(String documentId) throws CouchbaseException; - void replaceDocument(String documentId, byte[] content) throws CouchbaseException; + void replaceDocument(String documentId, byte[] content, long cas) throws CouchbaseException; - CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException; + CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) throws CouchbaseException; ExceptionCategory getExceptionCategory(Throwable throwable); } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java new file mode 100644 index 000000000000..87187884439b --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase.exception; + +public class CouchbaseCasMismatchException extends CouchbaseException { + + public CouchbaseCasMismatchException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java new file mode 100644 index 000000000000..dda65be21e9f --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase.exception; + +public class CouchbaseDocExistsException extends CouchbaseException { + + public CouchbaseDocExistsException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java new file mode 100644 index 000000000000..4a7910052317 --- /dev/null +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.couchbase.exception; + +public class CouchbaseDocNotFoundException extends CouchbaseException { + + public CouchbaseDocNotFoundException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml index e918d055e62f..1203c01fc36d 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml @@ -20,7 +20,7 @@ org.apache.nifi nifi-couchbase-bundle - 2.7.0-SNAPSHOT + 2.8.0-SNAPSHOT nifi-couchbase-services @@ -31,7 +31,7 @@ org.apache.nifi nifi-couchbase-services-api - 2.7.0-SNAPSHOT + 2.8.0-SNAPSHOT provided @@ -49,20 +49,18 @@ org.apache.nifi nifi-json-record-utils - 2.7.0-SNAPSHOT + 2.8.0-SNAPSHOT org.apache.nifi nifi-avro-record-utils - 2.7.0-SNAPSHOT + 2.8.0-SNAPSHOT org.apache.nifi nifi-distributed-cache-client-service-api - compile + provided - - \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java index 47fdc4ac7657..afc345b1f575 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java @@ -36,8 +36,6 @@ public class AbstractCouchbaseService extends AbstractControllerService { private static final String DEFAULT_SCOPE = "_default"; private static final String DEFAULT_COLLECTION = "_default"; - protected CouchbaseClient couchbaseClient; - public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder() .name("Couchbase Connection Service") .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.") @@ -72,6 +70,16 @@ public class AbstractCouchbaseService extends AbstractControllerService { .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() + .name("Document Type") + .description("The content type for storing the document.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.JSON.toString()) + .build(); + + protected volatile CouchbaseClient couchbaseClient; + @OnEnabled public void onEnabled(final ConfigurationContext context) { final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class); @@ -83,11 +91,16 @@ public Set getRequiredKeys() { return REQUIRED_KEYS; } + protected DocumentType resolveDocumentType(ConfigurationContext context) { + return DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + } + private CouchbaseContext getCouchbaseContext(ConfigurationContext context) { final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue(); final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue(); final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); + final DocumentType documentType = resolveDocumentType(context); - return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON); + return new CouchbaseContext(bucketName, scopeName, collectionName, documentType); } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java index fdc5fd024939..fb9b5fde5971 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java @@ -26,6 +26,7 @@ import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; +import org.apache.nifi.services.couchbase.utils.DocumentType; import java.util.List; import java.util.Map; @@ -35,8 +36,6 @@ @CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.") public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService { - private volatile String subDocPath; - public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder() .name("Lookup Sub-Document Path") .description("The Sub-Document lookup path within the target JSON document.") @@ -52,6 +51,8 @@ public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService imp LOOKUP_SUB_DOC_PATH ); + private volatile String subDocPath; + @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; @@ -64,6 +65,11 @@ public void onEnabled(final ConfigurationContext context) { subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue(); } + @Override + protected DocumentType resolveDocumentType(ConfigurationContext context) { + return DocumentType.JSON; + } + @Override public Optional lookup(Map coordinates) throws LookupFailureException { final Object documentId = coordinates.get(KEY); @@ -72,7 +78,7 @@ public Optional lookup(Map coordinates) throws LookupFai return Optional.empty(); } try { - final CouchbaseLookupInResult result = couchbaseClient.lookUpIn(documentId.toString(), subDocPath); + final CouchbaseLookupInResult result = couchbaseClient.lookupIn(documentId.toString(), subDocPath); return Optional.ofNullable(result.resultContent().toString()); } catch (Exception e) { throw new LookupFailureException("Key-value lookup from Couchbase failed", e); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java index 28d2215c3685..1b3dbc2f03f7 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java @@ -23,6 +23,9 @@ import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; @@ -30,6 +33,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; @Tags({"distributed", "cache", "map", "cluster", "couchbase"}) @CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + @@ -48,7 +52,8 @@ public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements COUCHBASE_CONNECTION_SERVICE, BUCKET_NAME, SCOPE_NAME, - COLLECTION_NAME + COLLECTION_NAME, + DOCUMENT_TYPE ); @Override @@ -62,8 +67,10 @@ public AtomicCacheEntry fetch(K key, Serializer keySeriali try { final CouchbaseGetResult result = couchbaseClient.getDocument(documentId); return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas()); - } catch (CouchbaseException e) { + } catch (CouchbaseDocNotFoundException e) { return null; + } catch (CouchbaseException e) { + throw new IOException("Failed to fetch cache entry [%s] from Couchbase".formatted(documentId), e); } } @@ -71,12 +78,27 @@ public AtomicCacheEntry fetch(K key, Serializer keySeriali public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { final String documentId = serializeDocumentKey(entry.getKey(), keySerializer); final byte[] document = serializeDocument(entry.getValue(), valueSerializer); + final Optional revision = entry.getRevision(); + + if (revision.isEmpty()) { + try { + couchbaseClient.insertDocument(documentId, document); + return true; + } catch (CouchbaseDocExistsException e) { + return false; + } catch (CouchbaseException e) { + throw new IOException("Failed to insert cache entry [%s] into Couchbase".formatted(documentId), e); + } + } try { - couchbaseClient.replaceDocument(documentId, document); + final long casValue = revision.get(); + couchbaseClient.replaceDocument(documentId, document, casValue); return true; - } catch (CouchbaseException e) { + } catch (CouchbaseDocNotFoundException | CouchbaseCasMismatchException e) { return false; + } catch (CouchbaseException e) { + throw new IOException("Failed to replace cache entry [%s] in Couchbase".formatted(documentId), e); } } @@ -88,18 +110,21 @@ public boolean putIfAbsent(K key, V value, Serializer keySerializer, S try { couchbaseClient.insertDocument(documentId, document); return true; - } catch (CouchbaseException e) { + } catch (CouchbaseDocExistsException e) { return false; + } catch (CouchbaseException e) { + throw new IOException("Failed to insert cache entry [%s] into Couchbase".formatted(documentId), e); } } @Override public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { - if (containsKey(key, keySerializer)) { - return get(key, keySerializer, valueDeserializer); + final V document = get(key, keySerializer, valueDeserializer); + if (document != null) { + return document; } - put(key, value, keySerializer, valueSerializer); + putIfAbsent(key, value, keySerializer, valueSerializer); return value; } @@ -110,7 +135,7 @@ public boolean containsKey(K key, Serializer keySerializer) throws IOExce try { return couchbaseClient.documentExists(documentId); } catch (CouchbaseException e) { - throw new IOException(e); + throw new IOException("Failed to check existence of cache entry [%s] in Couchbase".formatted(documentId), e); } } @@ -122,7 +147,7 @@ public void put(K key, V value, Serializer keySerializer, Serializer V get(K key, Serializer keySerializer, Deserializer valueDes try { final CouchbaseGetResult result = couchbaseClient.getDocument(documentId); return deserializeDocument(valueDeserializer, result.resultContent()); - } catch (CouchbaseException e) { + } catch (CouchbaseDocNotFoundException e) { return null; + } catch (CouchbaseException e) { + throw new IOException("Failed to fetch cache entry [%s] from Couchbase".formatted(documentId), e); } } @@ -149,8 +176,10 @@ public boolean remove(K key, Serializer serializer) throws IOException { try { couchbaseClient.removeDocument(documentId); return true; - } catch (CouchbaseException e) { + } catch (CouchbaseDocNotFoundException e) { return false; + } catch (CouchbaseException e) { + throw new IOException("Failed to remove cache entry [%s] from Couchbase".formatted(documentId), e); } } @@ -166,7 +195,7 @@ private String serializeDocumentKey(final S key, final Serializer seriali } if (result.isEmpty()) { - throw new IOException("Cache record key cannot be empty!"); + throw new IOException("Cache entry key cannot be empty!"); } return result; diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java index 3ffae09b2c2b..f1b49c8c9b5f 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java @@ -19,6 +19,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.json.JsonParserFactory; import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.json.SchemaApplicationStrategy; @@ -27,9 +28,10 @@ import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.schema.access.InferenceSchemaStrategy; import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; +import org.apache.nifi.services.couchbase.utils.DocumentType; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -79,6 +81,11 @@ public Optional lookup(Map coordinates) throws LookupFai } } + @Override + protected DocumentType resolveDocumentType(ConfigurationContext context) { + return DocumentType.JSON; + } + private JsonTreeRowRecordReader createJsonReader(InputStream inputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException { return new JsonTreeRowRecordReader( inputStream, diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java index 0531c1110630..0cfc571ffa6e 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java @@ -50,9 +50,9 @@ public void init() { } @Test - public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException { + public void testSuccessfulLookup() throws CouchbaseException, LookupFailureException { final CouchbaseClient client = mock(CouchbaseClient.class); - when(client.lookUpIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult("test result", TEST_CAS)); + when(client.lookupIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult("test result", TEST_CAS)); final CouchbaseConnectionService connectionService = mockConnectionService(client); @@ -70,9 +70,9 @@ public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureExcep } @Test - public void testLookUpFailure() throws CouchbaseException { + public void testLookupFailure() throws CouchbaseException { final CouchbaseClient client = mock(CouchbaseClient.class); - when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); + when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); final CouchbaseConnectionService connectionService = mockConnectionService(client); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java index 36181411791c..a9ebadc30e6f 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java @@ -19,6 +19,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult; @@ -77,7 +78,7 @@ public void testCacheGet() throws CouchbaseException, IOException { @Test public void testCacheGetFailure() throws CouchbaseException, IOException { final CouchbaseClient client = mock(CouchbaseClient.class); - when(client.getDocument(anyString())).thenThrow(new CouchbaseException("Test exception")); + when(client.getDocument(anyString())).thenThrow(new CouchbaseDocNotFoundException("Test exception", null)); final CouchbaseConnectionService connectionService = mockConnectionService(client); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java index 4082cf7bcbd4..2299f7e68876 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java @@ -20,11 +20,11 @@ import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; -import org.apache.nifi.serialization.record.Record; import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.junit.jupiter.api.BeforeEach; @@ -82,7 +82,7 @@ public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureExcep @Test public void testLookUpFailure() throws CouchbaseException { final CouchbaseClient client = mock(CouchbaseClient.class); - when(client.lookUpIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); + when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception")); final CouchbaseConnectionService connectionService = mockConnectionService(client); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml index a6662135e77b..043e36076bdc 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml @@ -27,7 +27,7 @@ jar - 3.10.1 + 3.11.0 diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java index 9a5fcbdbd197..08e5965dfe2a 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java @@ -57,6 +57,9 @@ import com.couchbase.client.java.kv.ReplaceOptions; import com.couchbase.client.java.kv.ReplicateTo; import com.couchbase.client.java.kv.UpsertOptions; +import org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException; import org.apache.nifi.services.couchbase.exception.CouchbaseException; import org.apache.nifi.services.couchbase.exception.ExceptionCategory; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; @@ -128,6 +131,8 @@ public CouchbaseGetResult getDocument(String documentId) throws CouchbaseExcepti final GetResult result = collection.get(documentId, GetOptions.getOptions().transcoder(getTranscoder(documentType))); return new CouchbaseGetResult(result.contentAsBytes(), result.cas()); + } catch (DocumentNotFoundException e) { + throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e); } catch (Exception e) { throw new CouchbaseException("Failed to get document [%s] from Couchbase".formatted(documentId), e); } @@ -174,6 +179,8 @@ public void insertDocument(String documentId, byte[] content) throws CouchbaseEx .durability(persistTo, replicateTo) .transcoder(getTranscoder(documentType)) .clientContext(new HashMap<>())); + } catch (DocumentExistsException e) { + throw new CouchbaseDocExistsException("Document with key [%s] already exists".formatted(documentId), e); } catch (Exception e) { throw new CouchbaseException("Failed to insert document [%s] in Couchbase".formatted(documentId), e); } @@ -183,13 +190,15 @@ public void insertDocument(String documentId, byte[] content) throws CouchbaseEx public void removeDocument(String documentId) throws CouchbaseException { try { collection.remove(documentId); + } catch (DocumentNotFoundException e) { + throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e); } catch (Exception e) { throw new CouchbaseException("Failed to remove document [%s] in Couchbase".formatted(documentId), e); } } @Override - public void replaceDocument(String documentId, byte[] content) throws CouchbaseException { + public void replaceDocument(String documentId, byte[] content, long cas) throws CouchbaseException { if (!getInputValidator(documentType).test(content)) { throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId)); } @@ -197,16 +206,21 @@ public void replaceDocument(String documentId, byte[] content) throws CouchbaseE try { collection.replace(documentId, content, ReplaceOptions.replaceOptions() + .cas(cas) .durability(persistTo, replicateTo) .transcoder(getTranscoder(documentType)) .clientContext(new HashMap<>())); + } catch (CasMismatchException e) { + throw new CouchbaseCasMismatchException("Couchbase document with key [%s] has been concurrently modified".formatted(documentId), e); + } catch (DocumentNotFoundException e) { + throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e); } catch (Exception e) { throw new CouchbaseException("Failed to replace document [%s] in Couchbase".formatted(documentId), e); } } @Override - public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException { + public CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) throws CouchbaseException { try { final String documentPath = subDocPath == null ? "" : subDocPath; final LookupInResult result = collection.lookupIn(documentId, Collections.singletonList(LookupInSpec.get(documentPath))); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java index 104400fa9d10..62a9bc064b4a 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java @@ -97,7 +97,7 @@ void testReplaceJsonDocumentValidationFailure() { final String content = "{invalid-json}"; final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE); - final Exception exception = assertThrows(CouchbaseException.class, () -> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes())); + final Exception exception = assertThrows(CouchbaseException.class, () -> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes(), TEST_CAS)); assertTrue(exception.getMessage().contains("The provided input is invalid")); } @@ -136,7 +136,7 @@ void testLookupInWithMapResult() throws CouchbaseException { when(collection.lookupIn(anyString(), any())).thenReturn(result); - final CouchbaseLookupInResult lookupInResult = client.lookUpIn(TEST_DOCUMENT_ID, ""); + final CouchbaseLookupInResult lookupInResult = client.lookupIn(TEST_DOCUMENT_ID, ""); assertEquals(expectedResult, lookupInResult.resultContent()); assertEquals(TEST_CAS, lookupInResult.cas()); @@ -158,7 +158,7 @@ void testLookupInWithArrayResult() throws CouchbaseException { when(collection.lookupIn(anyString(), any())).thenReturn(result); - final CouchbaseLookupInResult lookupInResult = client.lookUpIn(TEST_DOCUMENT_ID, ""); + final CouchbaseLookupInResult lookupInResult = client.lookupIn(TEST_DOCUMENT_ID, ""); assertEquals(expectedResult, lookupInResult.resultContent()); assertEquals(TEST_CAS, lookupInResult.cas()); @@ -173,7 +173,7 @@ void testLookupInWithNoResult() { when(collection.lookupIn(anyString(), any())).thenReturn(result); - final Exception exception = assertThrows(CouchbaseException.class, () -> client.lookUpIn(TEST_DOCUMENT_ID, "test-path")); + final Exception exception = assertThrows(CouchbaseException.class, () -> client.lookupIn(TEST_DOCUMENT_ID, "test-path")); assertTrue(exception.getCause().getMessage().contains("No value found on the requested path [test-path] in Couchbase")); } } From 3606263f761222ec6b55d1765809c654c39e7cf6 Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Mon, 9 Feb 2026 16:24:38 +0100 Subject: [PATCH 3/3] Review items Signed-off-by: Mark Bathori --- .../nifi-couchbase-services/pom.xml | 8 ++ .../couchbase/AbstractCouchbaseService.java | 17 +--- .../CouchbaseKeyValueLookupService.java | 8 +- .../couchbase/CouchbaseMapCacheClient.java | 3 +- .../CouchbaseRecordLookupService.java | 78 +++++++++---------- .../AbstractCouchbaseServiceTest.java | 4 +- .../CouchbaseKeyValueLookupServiceTest.java | 12 +-- .../CouchbaseMapCacheClientTest.java | 20 ++--- .../CouchbaseRecordLookupServiceTest.java | 54 ++++++++----- .../couchbase/StandardCouchbaseClient.java | 32 ++++---- 10 files changed, 119 insertions(+), 117 deletions(-) diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml index 1203c01fc36d..a03361f05297 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml @@ -61,6 +61,14 @@ nifi-distributed-cache-client-service-api provided + + + + org.apache.nifi + nifi-mock-record-utils + 2.8.0-SNAPSHOT + test + \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java index afc345b1f575..d1946bc040b8 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java @@ -27,7 +27,7 @@ import java.util.Set; -public class AbstractCouchbaseService extends AbstractControllerService { +public abstract class AbstractCouchbaseService extends AbstractControllerService { protected static final String KEY = "key"; protected static final Set REQUIRED_KEYS = Set.of(KEY); @@ -70,14 +70,6 @@ public class AbstractCouchbaseService extends AbstractControllerService { .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .build(); - public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor.Builder() - .name("Document Type") - .description("The content type for storing the document.") - .required(true) - .allowableValues(DocumentType.values()) - .defaultValue(DocumentType.JSON.toString()) - .build(); - protected volatile CouchbaseClient couchbaseClient; @OnEnabled @@ -91,16 +83,11 @@ public Set getRequiredKeys() { return REQUIRED_KEYS; } - protected DocumentType resolveDocumentType(ConfigurationContext context) { - return DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); - } - private CouchbaseContext getCouchbaseContext(ConfigurationContext context) { final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue(); final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue(); final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue(); - final DocumentType documentType = resolveDocumentType(context); - return new CouchbaseContext(bucketName, scopeName, collectionName, documentType); + return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.BINARY); } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java index fb9b5fde5971..97e58978ed99 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java @@ -26,7 +26,6 @@ import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult; -import org.apache.nifi.services.couchbase.utils.DocumentType; import java.util.List; import java.util.Map; @@ -65,11 +64,6 @@ public void onEnabled(final ConfigurationContext context) { subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue(); } - @Override - protected DocumentType resolveDocumentType(ConfigurationContext context) { - return DocumentType.JSON; - } - @Override public Optional lookup(Map coordinates) throws LookupFailureException { final Object documentId = coordinates.get(KEY); @@ -79,7 +73,7 @@ public Optional lookup(Map coordinates) throws LookupFai } try { final CouchbaseLookupInResult result = couchbaseClient.lookupIn(documentId.toString(), subDocPath); - return Optional.ofNullable(result.resultContent().toString()); + return Optional.ofNullable(result.resultContent()).map(Object::toString); } catch (Exception e) { throw new LookupFailureException("Key-value lookup from Couchbase failed", e); } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java index 1b3dbc2f03f7..355e3bef3c31 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java @@ -52,8 +52,7 @@ public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements COUCHBASE_CONNECTION_SERVICE, BUCKET_NAME, SCOPE_NAME, - COLLECTION_NAME, - DOCUMENT_TYPE + COLLECTION_NAME ); @Override diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java index f1b49c8c9b5f..554f37cbb102 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java @@ -18,49 +18,58 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.json.JsonParserFactory; -import org.apache.nifi.json.JsonTreeRowRecordReader; -import org.apache.nifi.json.SchemaApplicationStrategy; -import org.apache.nifi.json.StartingFieldStrategy; import org.apache.nifi.lookup.LookupFailureException; import org.apache.nifi.lookup.RecordLookupService; -import org.apache.nifi.schema.access.InferenceSchemaStrategy; -import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException; import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; -import org.apache.nifi.services.couchbase.utils.DocumentType; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; @Tags({"lookup", "enrich", "couchbase"}) @CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.") public class CouchbaseRecordLookupService extends AbstractCouchbaseService implements RecordLookupService { - private static final String DATE_FORMAT = "yyyy-MM-dd"; - private static final String TIME_FORMAT = "HH:mm:ss.SSSZ"; - private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; - private static final JsonParserFactory jsonParserFactory = new JsonParserFactory(); + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The Record Reader to use for parsing fetched document from Couchbase Server.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); private static final List PROPERTIES = List.of( COUCHBASE_CONNECTION_SERVICE, BUCKET_NAME, SCOPE_NAME, - COLLECTION_NAME + COLLECTION_NAME, + RECORD_READER ); + private volatile RecordReaderFactory readerFactory; + @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; } + @Override + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + super.onEnabled(context); + readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + } + @Override public Optional lookup(Map coordinates) throws LookupFailureException { final Object documentId = coordinates.get(KEY); @@ -69,36 +78,27 @@ public Optional lookup(Map coordinates) throws LookupFai return Optional.empty(); } + CouchbaseGetResult result; try { - final CouchbaseGetResult result = couchbaseClient.getDocument(documentId.toString()); - final RecordSchema schema = new InferenceSchemaStrategy().getSchema(null, new ByteArrayInputStream(result.resultContent()), null); - - final JsonTreeRowRecordReader recordReader = createJsonReader(new ByteArrayInputStream(result.resultContent()), schema); - - return Optional.ofNullable(recordReader.nextRecord()); + result = couchbaseClient.getDocument(documentId.toString()); + } catch (CouchbaseDocNotFoundException e) { + return Optional.empty(); } catch (Exception e) { throw new LookupFailureException("Record lookup from Couchbase failed", e); } - } - @Override - protected DocumentType resolveDocumentType(ConfigurationContext context) { - return DocumentType.JSON; - } + try (final InputStream input = new ByteArrayInputStream(result.resultContent())) { + final long inputLength = result.resultContent().length; + final Map stringMap = coordinates.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> String.valueOf(e.getValue()) + )); - private JsonTreeRowRecordReader createJsonReader(InputStream inputStream, RecordSchema recordSchema) throws IOException, MalformedRecordException { - return new JsonTreeRowRecordReader( - inputStream, - getLogger(), - recordSchema, - DATE_FORMAT, - TIME_FORMAT, - DATE_TIME_FORMAT, - StartingFieldStrategy.ROOT_NODE, - null, - SchemaApplicationStrategy.SELECTED_PART, - null, - jsonParserFactory - ); + final RecordReader recordReader = readerFactory.createRecordReader(stringMap, input, inputLength, getLogger()); + return Optional.ofNullable(recordReader.nextRecord()); + } catch (Exception e) { + throw new LookupFailureException("Failed to parse the looked-up record", e); + } } } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java index daab9c887845..132ebd005677 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java @@ -22,14 +22,14 @@ public abstract class AbstractCouchbaseServiceTest { - protected static final String SERVICE_ID = "couchbaseConnectionService"; + protected static final String CONNECTION_SERVICE_ID = "couchbaseConnectionService"; protected static final String TEST_DOCUMENT_ID = "test-document-id"; protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}"; protected static final long TEST_CAS = 1L; protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) { final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class); - when(connectionService.getIdentifier()).thenReturn(SERVICE_ID); + when(connectionService.getIdentifier()).thenReturn(CONNECTION_SERVICE_ID); when(connectionService.getClient(any())).thenReturn(client); return connectionService; } diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java index 0cfc571ffa6e..18397a75645c 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java @@ -56,8 +56,8 @@ public void testSuccessfulLookup() throws CouchbaseException, LookupFailureExcep final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); lookupService.onEnabled(context); @@ -76,8 +76,8 @@ public void testLookupFailure() throws CouchbaseException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); lookupService.onEnabled(context); @@ -92,8 +92,8 @@ public void testMissingKey() throws LookupFailureException { final CouchbaseClient client = mock(CouchbaseClient.class); final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); lookupService.onEnabled(context); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java index a9ebadc30e6f..25c89b7e353b 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java @@ -64,8 +64,8 @@ public void testCacheGet() throws CouchbaseException, IOException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); mapCacheClient.onEnabled(context); @@ -82,8 +82,8 @@ public void testCacheGetFailure() throws CouchbaseException, IOException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); mapCacheClient.onEnabled(context); @@ -100,8 +100,8 @@ public void testCachePut() throws CouchbaseException, IOException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); mapCacheClient.onEnabled(context); @@ -117,8 +117,8 @@ public void testCachePutFailure() throws CouchbaseException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); mapCacheClient.onEnabled(context); @@ -131,8 +131,8 @@ public void testCacheRemove() throws CouchbaseException, IOException { final CouchbaseClient client = mock(CouchbaseClient.class); final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); mapCacheClient.onEnabled(context); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java index 2299f7e68876..f28e1b67336b 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java @@ -18,8 +18,10 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.lookup.LookupFailureException; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; @@ -27,7 +29,9 @@ import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult; import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext; -import org.junit.jupiter.api.BeforeEach; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -48,26 +52,40 @@ public class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest { + private static final String LOOKUP_SERVICE_ID = "lookupService"; + private static final String RECORD_READER_ID = "recordReaderService"; + private CouchbaseRecordLookupService lookupService; - @BeforeEach - public void init() { + public void initLookupService() throws InitializationException, CouchbaseException { + final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class); lookupService = new CouchbaseRecordLookupService(); - } - @Test - public void testSuccessfulLookUp() throws CouchbaseException, LookupFailureException { final CouchbaseClient client = mock(CouchbaseClient.class); when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS)); final CouchbaseConnectionService connectionService = mockConnectionService(client); + final MockRecordParser readerFactory = new MockRecordParser(); + readerFactory.addSchemaField("key", RecordFieldType.STRING); + readerFactory.addRecord("value"); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); - final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + runner.addControllerService(LOOKUP_SERVICE_ID, lookupService); - lookupService.onEnabled(context); + runner.addControllerService(CONNECTION_SERVICE_ID, connectionService); + runner.addControllerService(RECORD_READER_ID, readerFactory); + + runner.enableControllerService(connectionService); + runner.enableControllerService(readerFactory); + + runner.setProperty(lookupService, CouchbaseRecordLookupService.COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); + runner.setProperty(lookupService, CouchbaseRecordLookupService.RECORD_READER, RECORD_READER_ID); + + runner.enableControllerService(lookupService); + } + @Test + public void testSuccessfulLookUp() throws LookupFailureException, CouchbaseException, InitializationException { + initLookupService(); final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); final Optional result = lookupService.lookup(coordinates); @@ -86,10 +104,11 @@ public void testLookUpFailure() throws CouchbaseException { final CouchbaseConnectionService connectionService = mockConnectionService(client); - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); + final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID); + final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID); final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); + lookupService = new CouchbaseRecordLookupService(); lookupService.onEnabled(context); final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID); @@ -98,15 +117,8 @@ public void testLookUpFailure() throws CouchbaseException { } @Test - public void testMissingKey() throws LookupFailureException { - final CouchbaseClient client = mock(CouchbaseClient.class); - final CouchbaseConnectionService connectionService = mockConnectionService(client); - - final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, SERVICE_ID); - final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID); - final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>()); - lookupService.onEnabled(context); - + public void testMissingKey() throws LookupFailureException, CouchbaseException, InitializationException { + initLookupService(); final Optional result = lookupService.lookup(Collections.emptyMap()); assertTrue(result.isEmpty()); diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java index 08e5965dfe2a..f1c3aaac1acf 100644 --- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java +++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java @@ -45,6 +45,7 @@ import com.couchbase.client.java.codec.RawBinaryTranscoder; import com.couchbase.client.java.codec.RawJsonTranscoder; import com.couchbase.client.java.codec.Transcoder; +import com.couchbase.client.java.json.JsonArray; import com.couchbase.client.java.json.JsonObject; import com.couchbase.client.java.kv.ExistsResult; import com.couchbase.client.java.kv.GetOptions; @@ -74,7 +75,6 @@ import java.util.List; import java.util.Map; import java.util.function.Predicate; -import java.util.stream.Collectors; import static java.util.Map.entry; import static org.apache.nifi.services.couchbase.exception.ExceptionCategory.FAILURE; @@ -229,8 +229,14 @@ public CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) th throw new CouchbaseException("No value found on the requested path [%s] in Couchbase".formatted(subDocPath)); } - final Object lookUpInResult = result.contentAs(0, Object.class); - return new CouchbaseLookupInResult(deserializeLookupInResult(lookUpInResult), result.cas()); + Object lookupInResult; + try { + lookupInResult = result.contentAs(0, Object.class); + } catch (DecodingFailureException e) { + lookupInResult = result.contentAs(0, byte[].class); + } + + return new CouchbaseLookupInResult(deserializeLookupInResult(lookupInResult), result.cas()); } catch (Exception e) { throw new CouchbaseException("Failed to look up in document [%s] in Couchbase".formatted(documentId), e); } @@ -256,18 +262,14 @@ private Predicate getInputValidator(DocumentType documentType) { } private String deserializeLookupInResult(Object result) { - if (result instanceof String) { - return (String) result; - } else if (result instanceof Map) { - return JsonObject.from((Map) result).toString(); - } else if (result instanceof List) { - return ((List) result).stream() - .map(this::deserializeLookupInResult) - .collect(Collectors.joining(",", "[", "]")); - } else if (result instanceof byte[]) { - return new String((byte[]) result, StandardCharsets.UTF_8); - } + return switch (result) { + case null -> null; + case String s -> s; + case Map map -> JsonObject.from(map).toString(); + case List list -> JsonArray.from(list).toString(); + case byte[] bytes -> new String(bytes, StandardCharsets.UTF_8); + default -> result.toString(); + }; - return result.toString(); } }