diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
index aefc8023cb3f..5528a8447ec6 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
@@ -32,6 +32,11 @@
nifi-couchbase-processors2.8.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-couchbase-services
+ 2.8.0-SNAPSHOT
+ org.apache.nifinifi-couchbase-services-api-nar
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
index 6d84e4e953e3..fc2f788df0dd 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -50,8 +50,6 @@
public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
- protected CouchbaseConnectionService connectionService;
-
public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder()
.name("Document ID")
.description("Couchbase document identifier, or an expression to construct the Couchbase document identifier.")
@@ -128,6 +126,8 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_RETRY);
+ protected volatile CouchbaseConnectionService connectionService;
+
@Override
protected List getSupportedPropertyDescriptors() {
return PROPERTIES;
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
index a2bc22816dd9..1ffe1045f1cb 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbase.java
@@ -28,7 +28,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.services.couchbase.CouchbaseClient;
-import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
@@ -69,7 +68,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
}
final long startNanos = System.nanoTime();
- final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
final CouchbaseContext couchbaseContext = getCouchbaseContext(context, flowFile);
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
new file mode 100644
index 000000000000..5cbf02439214
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessorTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.services.couchbase.CouchbaseClient;
+import org.apache.nifi.services.couchbase.CouchbaseConnectionService;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseProcessorTest {
+
+ protected static final String SERVICE_ID = "couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}";
+ protected static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
+ protected static final long TEST_CAS = 1L;
+
+ protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class);
+ when(connectionService.getIdentifier()).thenReturn(SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+ when(connectionService.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ return connectionService;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
index 97595b7d1d4b..4f535e34f9f0 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/GetCouchbaseTest.java
@@ -31,8 +31,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,12 +60,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class GetCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class GetCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,20 +71,15 @@ public void init() {
@Test
public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -101,7 +91,7 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION);
@@ -116,24 +106,18 @@ public void testOnTriggerWithProvidedDocumentId() throws CouchbaseException, Ini
@Test
public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, InitializationException {
- final String content = "{\"key\":\"value\"}";
-
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
- final Map attributes = new HashMap<>();
- attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
- flowFile.putAttributes(attributes);
+ flowFile.putAttributes(Collections.singletonMap("flowfile_document_id", TEST_DOCUMENT_ID));
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(flowFile);
@@ -145,7 +129,7 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, DEFAULT_BUCKET);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, DEFAULT_SCOPE);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, DEFAULT_COLLECTION);
@@ -154,24 +138,20 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
@Test
public void testWithFlowFileAttributes() throws CouchbaseException, InitializationException {
- final String documentId = "test-document-id";
- final String content = "{\"key\":\"value\"}";
final String testBucket = "test-bucket";
final String testScope = "test-scope";
final String testCollection = "test-collection";
- final CouchbaseGetResult result = new CouchbaseGetResult(content.getBytes(), TEST_CAS);
+ final CouchbaseGetResult result = new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS);
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.getDocument(anyString())).thenReturn(result);
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
- runner.setProperty(DOCUMENT_ID, documentId);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
+ runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(BUCKET_NAME, "${bucket.attribute}");
runner.setProperty(SCOPE_NAME, "${scope.attribute}");
runner.setProperty(COLLECTION_NAME, "${collection.attribute}");
@@ -181,17 +161,16 @@ public void testWithFlowFileAttributes() throws CouchbaseException, Initializati
attributes.put("bucket.attribute", testBucket);
attributes.put("scope.attribute", testScope);
attributes.put("collection.attribute", testCollection);
- final byte[] input = documentId.getBytes(StandardCharsets.UTF_8);
- runner.enqueue(input, attributes);
+ runner.enqueue(new byte[0], attributes);
runner.run();
- verify(client, times(1)).getDocument(eq(documentId));
+ verify(client, times(1)).getDocument(eq(TEST_DOCUMENT_ID));
runner.assertTransferCount(REL_SUCCESS, 1);
runner.assertTransferCount(REL_FAILURE, 0);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
- outFile.assertContentEquals(content);
+ outFile.assertContentEquals(TEST_DOCUMENT_CONTENT);
outFile.assertAttributeEquals(BUCKET_ATTRIBUTE, testBucket);
outFile.assertAttributeEquals(SCOPE_ATTRIBUTE, testScope);
outFile.assertAttributeEquals(COLLECTION_ATTRIBUTE, testCollection);
@@ -204,13 +183,11 @@ public void testWithFailure() throws CouchbaseException, InitializationException
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.getDocument(anyString())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
runner.run();
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
index 60cd3a114a26..320beb848ebc 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/PutCouchbaseTest.java
@@ -57,12 +57,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-public class PutCouchbaseTest {
-
- private static final String SERVICE_ID = "couchbaseConnectionService";
- private static final String TEST_DOCUMENT_ID = "test-document-id";
- private static final String TEST_SERVICE_LOCATION = "couchbase://test-location";
- private static final long TEST_CAS = 1L;
+public class PutCouchbaseTest extends AbstractCouchbaseProcessorTest {
private TestRunner runner;
@@ -76,18 +71,15 @@ public void testWithDocumentIdAsFlowFileAttribute() throws CouchbaseException, I
final CouchbaseClient client = mock(CouchbaseClient.class);
when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
- when(service.getServiceLocation()).thenReturn(TEST_SERVICE_LOCATION);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
final MockFlowFile flowFile = new MockFlowFile(0);
final Map attributes = new HashMap<>();
attributes.put("flowfile_document_id", TEST_DOCUMENT_ID);
flowFile.putAttributes(attributes);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, "${flowfile_document_id}");
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.setValidateExpressionUsage(false);
@@ -120,12 +112,10 @@ public void testWithFailure() throws CouchbaseException, InitializationException
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -143,12 +133,10 @@ public void testWithRetry() throws CouchbaseException, InitializationException {
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.RETRY);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -166,12 +154,10 @@ public void testWithRollback() throws CouchbaseException, InitializationExceptio
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.ROLLBACK);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
@@ -189,12 +175,10 @@ public void testWithUnknownException() throws CouchbaseException, Initialization
when(client.getExceptionCategory(any())).thenReturn(ExceptionCategory.FAILURE);
when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("", new TestCouchbaseException("Test exception")));
- final CouchbaseConnectionService service = mock(CouchbaseConnectionService.class);
- when(service.getIdentifier()).thenReturn(SERVICE_ID);
- when(service.getClient(any())).thenReturn(client);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
- runner.addControllerService(SERVICE_ID, service);
- runner.enableControllerService(service);
+ runner.addControllerService(SERVICE_ID, connectionService);
+ runner.enableControllerService(connectionService);
runner.setProperty(DOCUMENT_ID, TEST_DOCUMENT_ID);
runner.setProperty(COUCHBASE_CONNECTION_SERVICE, SERVICE_ID);
runner.enqueue(new byte[0]);
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
index 9ad04fbe01df..96797c1a5d21 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/CouchbaseClient.java
@@ -19,6 +19,7 @@
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
public interface CouchbaseClient {
@@ -27,5 +28,15 @@ public interface CouchbaseClient {
CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException;
+ boolean documentExists(String documentId) throws CouchbaseException;
+
+ void insertDocument(String documentId, byte[] content) throws CouchbaseException;
+
+ void removeDocument(String documentId) throws CouchbaseException;
+
+ void replaceDocument(String documentId, byte[] content, long cas) throws CouchbaseException;
+
+ CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) throws CouchbaseException;
+
ExceptionCategory getExceptionCategory(Throwable throwable);
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
new file mode 100644
index 000000000000..87187884439b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseCasMismatchException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase.exception;
+
+public class CouchbaseCasMismatchException extends CouchbaseException {
+
+ public CouchbaseCasMismatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
new file mode 100644
index 000000000000..dda65be21e9f
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocExistsException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase.exception;
+
+public class CouchbaseDocExistsException extends CouchbaseException {
+
+ public CouchbaseDocExistsException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
new file mode 100644
index 000000000000..4a7910052317
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseDocNotFoundException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase.exception;
+
+public class CouchbaseDocNotFoundException extends CouchbaseException {
+
+ public CouchbaseDocNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
index ea2436e621fa..600e38dc1c9d 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/exception/CouchbaseException.java
@@ -23,6 +23,6 @@ public CouchbaseException(final String message) {
}
public CouchbaseException(final String message, final Throwable cause) {
- super(cause);
+ super(message, cause);
}
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
new file mode 100644
index 000000000000..c916e29f3435
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services-api/src/main/java/org/apache/nifi/services/couchbase/utils/CouchbaseLookupInResult.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase.utils;
+
+public record CouchbaseLookupInResult(Object resultContent, long cas) {
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
new file mode 100644
index 000000000000..a03361f05297
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-couchbase-bundle
+ 2.8.0-SNAPSHOT
+
+
+ nifi-couchbase-services
+ jar
+
+
+
+
+ org.apache.nifi
+ nifi-couchbase-services-api
+ 2.8.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ nifi-lookup-service-api
+
+
+ org.apache.nifi
+ nifi-record
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+ org.apache.nifi
+ nifi-json-record-utils
+ 2.8.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-avro-record-utils
+ 2.8.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-distributed-cache-client-service-api
+ provided
+
+
+
+
+ org.apache.nifi
+ nifi-mock-record-utils
+ 2.8.0-SNAPSHOT
+ test
+
+
+
+
\ No newline at end of file
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
new file mode 100644
index 000000000000..d1946bc040b8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/AbstractCouchbaseService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseContext;
+import org.apache.nifi.services.couchbase.utils.DocumentType;
+
+import java.util.Set;
+
+public abstract class AbstractCouchbaseService extends AbstractControllerService {
+
+ protected static final String KEY = "key";
+ protected static final Set REQUIRED_KEYS = Set.of(KEY);
+
+ private static final String DEFAULT_BUCKET = "default";
+ private static final String DEFAULT_SCOPE = "_default";
+ private static final String DEFAULT_COLLECTION = "_default";
+
+ public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("Couchbase Connection Service")
+ .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.")
+ .required(true)
+ .identifiesControllerService(CouchbaseConnectionService.class)
+ .build();
+
+ public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
+ .name("Bucket Name")
+ .description("The name of the bucket where documents will be stored. Each bucket contains a hierarchy of scopes and collections to group keys and values logically.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_BUCKET)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor SCOPE_NAME = new PropertyDescriptor.Builder()
+ .name("Scope Name")
+ .description("The name of the scope which is a logical namespace within a bucket, serving to categorize and organize related collections.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_SCOPE)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ public static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
+ .name("Collection Name")
+ .description("The name of collection which is a logical container within a scope, used to hold documents.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(DEFAULT_COLLECTION)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ protected volatile CouchbaseClient couchbaseClient;
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final CouchbaseConnectionService connectionService = context.getProperty(COUCHBASE_CONNECTION_SERVICE).asControllerService(CouchbaseConnectionService.class);
+ final CouchbaseContext couchbaseContext = getCouchbaseContext(context);
+ couchbaseClient = connectionService.getClient(couchbaseContext);
+ }
+
+ public Set getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+
+ private CouchbaseContext getCouchbaseContext(ConfigurationContext context) {
+ final String bucketName = context.getProperty(BUCKET_NAME).evaluateAttributeExpressions().getValue();
+ final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue();
+ final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
+
+ return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.BINARY);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
new file mode 100644
index 000000000000..97e58978ed99
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupService.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Tags({"lookup", "enrich", "key", "value", "couchbase"})
+@CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService {
+
+ public static final PropertyDescriptor LOOKUP_SUB_DOC_PATH = new PropertyDescriptor.Builder()
+ .name("Lookup Sub-Document Path")
+ .description("The Sub-Document lookup path within the target JSON document.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .build();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME,
+ LOOKUP_SUB_DOC_PATH
+ );
+
+ private volatile String subDocPath;
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ super.onEnabled(context);
+ subDocPath = context.getProperty(LOOKUP_SUB_DOC_PATH).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+ try {
+ final CouchbaseLookupInResult result = couchbaseClient.lookupIn(documentId.toString(), subDocPath);
+ return Optional.ofNullable(result.resultContent()).map(Object::toString);
+ } catch (Exception e) {
+ throw new LookupFailureException("Key-value lookup from Couchbase failed", e);
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
new file mode 100644
index 000000000000..355e3bef3c31
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+
+@Tags({"distributed", "cache", "map", "cluster", "couchbase"})
+@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." +
+ " This can be used in order to share a Map between nodes in a NiFi cluster." +
+ " Couchbase Server cluster can provide a high available and persistent cache storage.")
+public class CouchbaseMapCacheClient extends AbstractCouchbaseService implements AtomicDistributedMapCacheClient {
+
+ public static final PropertyDescriptor COUCHBASE_CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+ .name("Couchbase Connection Service")
+ .description("A Couchbase Connection Service which manages connections to a Couchbase cluster.")
+ .required(true)
+ .identifiesControllerService(CouchbaseConnectionService.class)
+ .build();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME
+ );
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ try {
+ final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
+ return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas());
+ } catch (CouchbaseDocNotFoundException e) {
+ return null;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to fetch cache entry [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(entry.getKey(), keySerializer);
+ final byte[] document = serializeDocument(entry.getValue(), valueSerializer);
+ final Optional revision = entry.getRevision();
+
+ if (revision.isEmpty()) {
+ try {
+ couchbaseClient.insertDocument(documentId, document);
+ return true;
+ } catch (CouchbaseDocExistsException e) {
+ return false;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ try {
+ final long casValue = revision.get();
+ couchbaseClient.replaceDocument(documentId, document, casValue);
+ return true;
+ } catch (CouchbaseDocNotFoundException | CouchbaseCasMismatchException e) {
+ return false;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to replace cache entry [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.insertDocument(documentId, document);
+ return true;
+ } catch (CouchbaseDocExistsException e) {
+ return false;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException {
+ final V document = get(key, keySerializer, valueDeserializer);
+ if (document != null) {
+ return document;
+ }
+
+ putIfAbsent(key, value, keySerializer, valueSerializer);
+ return value;
+ }
+
+ @Override
+ public boolean containsKey(K key, Serializer keySerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ return couchbaseClient.documentExists(documentId);
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to check existence of cache entry [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+ final byte[] document = serializeDocument(value, valueSerializer);
+
+ try {
+ couchbaseClient.upsertDocument(documentId, document);
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to insert cache entry [%s] into Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, keySerializer);
+
+ try {
+ final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
+ return deserializeDocument(valueDeserializer, result.resultContent());
+ } catch (CouchbaseDocNotFoundException e) {
+ return null;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to fetch cache entry [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean remove(K key, Serializer serializer) throws IOException {
+ final String documentId = serializeDocumentKey(key, serializer);
+
+ try {
+ couchbaseClient.removeDocument(documentId);
+ return true;
+ } catch (CouchbaseDocNotFoundException e) {
+ return false;
+ } catch (CouchbaseException e) {
+ throw new IOException("Failed to remove cache entry [%s] from Couchbase".formatted(documentId), e);
+ }
+ }
+
+ private String serializeDocumentKey(final S key, final Serializer serializer) throws IOException {
+ final String result;
+
+ if (key instanceof String) {
+ result = (String) key;
+ } else {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(key, stream);
+ result = stream.toString(StandardCharsets.UTF_8);
+ }
+
+ if (result.isEmpty()) {
+ throw new IOException("Cache entry key cannot be empty!");
+ }
+
+ return result;
+ }
+
+ private byte[] serializeDocument(final S value, final Serializer serializer) throws IOException {
+ final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ serializer.serialize(value, stream);
+ return stream.toByteArray();
+ }
+
+ private static V deserializeDocument(final Deserializer deserializer, final byte[] value) throws IOException {
+ return deserializer.deserialize(value);
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
new file mode 100644
index 000000000000..554f37cbb102
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@Tags({"lookup", "enrich", "couchbase"})
+@CapabilityDescription("Lookup a record from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
+public class CouchbaseRecordLookupService extends AbstractCouchbaseService implements RecordLookupService {
+
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use for parsing fetched document from Couchbase Server.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ private static final List PROPERTIES = List.of(
+ COUCHBASE_CONNECTION_SERVICE,
+ BUCKET_NAME,
+ SCOPE_NAME,
+ COLLECTION_NAME,
+ RECORD_READER
+ );
+
+ private volatile RecordReaderFactory readerFactory;
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ super.onEnabled(context);
+ readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) throws LookupFailureException {
+ final Object documentId = coordinates.get(KEY);
+
+ if (documentId == null) {
+ return Optional.empty();
+ }
+
+ CouchbaseGetResult result;
+ try {
+ result = couchbaseClient.getDocument(documentId.toString());
+ } catch (CouchbaseDocNotFoundException e) {
+ return Optional.empty();
+ } catch (Exception e) {
+ throw new LookupFailureException("Record lookup from Couchbase failed", e);
+ }
+
+ try (final InputStream input = new ByteArrayInputStream(result.resultContent())) {
+ final long inputLength = result.resultContent().length;
+ final Map stringMap = coordinates.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> String.valueOf(e.getValue())
+ ));
+
+ final RecordReader recordReader = readerFactory.createRecordReader(stringMap, input, inputLength, getLogger());
+ return Optional.ofNullable(recordReader.nextRecord());
+ } catch (Exception e) {
+ throw new LookupFailureException("Failed to parse the looked-up record", e);
+ }
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 000000000000..772920f46fea
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.couchbase.CouchbaseKeyValueLookupService
+org.apache.nifi.services.couchbase.CouchbaseRecordLookupService
+org.apache.nifi.services.couchbase.CouchbaseMapCacheClient
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
new file mode 100644
index 000000000000..132ebd005677
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/AbstractCouchbaseServiceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public abstract class AbstractCouchbaseServiceTest {
+
+ protected static final String CONNECTION_SERVICE_ID = "couchbaseConnectionService";
+ protected static final String TEST_DOCUMENT_ID = "test-document-id";
+ protected static final String TEST_DOCUMENT_CONTENT = "{\"key\":\"value\"}";
+ protected static final long TEST_CAS = 1L;
+
+ protected static CouchbaseConnectionService mockConnectionService(CouchbaseClient client) {
+ final CouchbaseConnectionService connectionService = mock(CouchbaseConnectionService.class);
+ when(connectionService.getIdentifier()).thenReturn(CONNECTION_SERVICE_ID);
+ when(connectionService.getClient(any())).thenReturn(client);
+ return connectionService;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
new file mode 100644
index 000000000000..18397a75645c
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseKeyValueLookupServiceTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseKeyValueLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private CouchbaseKeyValueLookupService lookupService;
+
+ @BeforeEach
+ public void init() {
+ lookupService = new CouchbaseKeyValueLookupService();
+ }
+
+ @Test
+ public void testSuccessfulLookup() throws CouchbaseException, LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookupIn(anyString(), any())).thenReturn(new CouchbaseLookupInResult("test result", TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+ final Optional result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+ assertEquals("test result", result.get());
+ }
+
+ @Test
+ public void testLookupFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+ }
+
+ @Test
+ public void testMissingKey() throws LookupFailureException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+ lookupService.onEnabled(context);
+
+ final Optional result = lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
new file mode 100644
index 000000000000..25c89b7e353b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseMapCacheClientTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseMapCacheClientTest extends AbstractCouchbaseServiceTest {
+
+ private final Serializer stringSerializer = (value, output) -> output.write(value.getBytes(StandardCharsets.UTF_8));
+ private final Deserializer stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
+ private CouchbaseMapCacheClient mapCacheClient;
+
+ @BeforeEach
+ public void init() {
+ mapCacheClient = new CouchbaseMapCacheClient();
+ }
+
+ @Test
+ public void testCacheGet() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+ assertEquals(TEST_DOCUMENT_CONTENT, result);
+ }
+
+ @Test
+ public void testCacheGetFailure() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenThrow(new CouchbaseDocNotFoundException("Test exception", null));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ final String result = mapCacheClient.get(TEST_DOCUMENT_ID, stringSerializer, stringDeserializer);
+
+ assertNull(result);
+ }
+
+ @Test
+ public void testCachePut() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.upsertDocument(anyString(), any())).thenReturn(new CouchbaseUpsertResult(TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer);
+ verify(client, times(1)).upsertDocument(eq(TEST_DOCUMENT_ID), eq(TEST_DOCUMENT_CONTENT.getBytes()));
+ }
+
+ @Test
+ public void testCachePutFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.upsertDocument(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ assertThrows(IOException.class, () -> mapCacheClient.put(TEST_DOCUMENT_ID, TEST_DOCUMENT_CONTENT, stringSerializer, stringSerializer));
+ }
+
+ @Test
+ public void testCacheRemove() throws CouchbaseException, IOException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ mapCacheClient.onEnabled(context);
+
+ mapCacheClient.remove(TEST_DOCUMENT_ID, stringSerializer);
+ verify(client, times(1)).removeDocument(eq(TEST_DOCUMENT_ID));
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
new file mode 100644
index 000000000000..f28e1b67336b
--- /dev/null
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseRecordLookupServiceTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.couchbase;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.services.couchbase.exception.CouchbaseException;
+import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.COUCHBASE_CONNECTION_SERVICE;
+import static org.apache.nifi.services.couchbase.AbstractCouchbaseService.KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CouchbaseRecordLookupServiceTest extends AbstractCouchbaseServiceTest {
+
+ private static final String LOOKUP_SERVICE_ID = "lookupService";
+ private static final String RECORD_READER_ID = "recordReaderService";
+
+ private CouchbaseRecordLookupService lookupService;
+
+ public void initLookupService() throws InitializationException, CouchbaseException {
+ final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ lookupService = new CouchbaseRecordLookupService();
+
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.getDocument(anyString())).thenReturn(new CouchbaseGetResult(TEST_DOCUMENT_CONTENT.getBytes(), TEST_CAS));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+ final MockRecordParser readerFactory = new MockRecordParser();
+ readerFactory.addSchemaField("key", RecordFieldType.STRING);
+ readerFactory.addRecord("value");
+
+ runner.addControllerService(LOOKUP_SERVICE_ID, lookupService);
+
+ runner.addControllerService(CONNECTION_SERVICE_ID, connectionService);
+ runner.addControllerService(RECORD_READER_ID, readerFactory);
+
+ runner.enableControllerService(connectionService);
+ runner.enableControllerService(readerFactory);
+
+ runner.setProperty(lookupService, CouchbaseRecordLookupService.COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ runner.setProperty(lookupService, CouchbaseRecordLookupService.RECORD_READER, RECORD_READER_ID);
+
+ runner.enableControllerService(lookupService);
+ }
+
+ @Test
+ public void testSuccessfulLookUp() throws LookupFailureException, CouchbaseException, InitializationException {
+ initLookupService();
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+ final Optional result = lookupService.lookup(coordinates);
+
+ assertTrue(result.isPresent());
+
+ final List fields = Collections.singletonList(new RecordField("key", RecordFieldType.STRING.getDataType()));
+ final Record expectedRecord = new MapRecord(new SimpleRecordSchema(fields), Collections.singletonMap("key", "value"));
+
+ assertEquals(expectedRecord, result.get());
+ }
+
+ @Test
+ public void testLookUpFailure() throws CouchbaseException {
+ final CouchbaseClient client = mock(CouchbaseClient.class);
+ when(client.lookupIn(anyString(), any())).thenThrow(new CouchbaseException("Test exception"));
+
+ final CouchbaseConnectionService connectionService = mockConnectionService(client);
+
+ final MockControllerServiceInitializationContext serviceInitializationContext = new MockControllerServiceInitializationContext(connectionService, CONNECTION_SERVICE_ID);
+ final Map properties = Collections.singletonMap(COUCHBASE_CONNECTION_SERVICE, CONNECTION_SERVICE_ID);
+ final MockConfigurationContext context = new MockConfigurationContext(properties, serviceInitializationContext, new HashMap<>());
+
+ lookupService = new CouchbaseRecordLookupService();
+ lookupService.onEnabled(context);
+
+ final Map coordinates = Collections.singletonMap(KEY, TEST_DOCUMENT_ID);
+
+ assertThrows(LookupFailureException.class, () -> lookupService.lookup(coordinates));
+ }
+
+ @Test
+ public void testMissingKey() throws LookupFailureException, CouchbaseException, InitializationException {
+ initLookupService();
+ final Optional result = lookupService.lookup(Collections.emptyMap());
+
+ assertTrue(result.isEmpty());
+ }
+}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml
index a6662135e77b..043e36076bdc 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/pom.xml
@@ -27,7 +27,7 @@
jar
- 3.10.1
+ 3.11.0
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
index dce98be91963..f1c3aaac1acf 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/main/java/org/apache/nifi/services/couchbase/StandardCouchbaseClient.java
@@ -45,20 +45,34 @@
import com.couchbase.client.java.codec.RawBinaryTranscoder;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
+import com.couchbase.client.java.json.JsonArray;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.kv.ExistsResult;
import com.couchbase.client.java.kv.GetOptions;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.InsertOptions;
+import com.couchbase.client.java.kv.LookupInResult;
+import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
+import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.kv.ReplicateTo;
import com.couchbase.client.java.kv.UpsertOptions;
+import org.apache.nifi.services.couchbase.exception.CouchbaseCasMismatchException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocExistsException;
+import org.apache.nifi.services.couchbase.exception.CouchbaseDocNotFoundException;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.exception.ExceptionCategory;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.apache.nifi.services.couchbase.utils.DocumentType;
import org.apache.nifi.services.couchbase.utils.JsonValidator;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
@@ -117,6 +131,8 @@ public CouchbaseGetResult getDocument(String documentId) throws CouchbaseExcepti
final GetResult result = collection.get(documentId, GetOptions.getOptions().transcoder(getTranscoder(documentType)));
return new CouchbaseGetResult(result.contentAsBytes(), result.cas());
+ } catch (DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
} catch (Exception e) {
throw new CouchbaseException("Failed to get document [%s] from Couchbase".formatted(documentId), e);
}
@@ -124,11 +140,11 @@ public CouchbaseGetResult getDocument(String documentId) throws CouchbaseExcepti
@Override
public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) throws CouchbaseException {
- try {
- if (!getInputValidator(documentType).test(content)) {
- throw new CouchbaseException("The provided input is invalid");
- }
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+ try {
final MutationResult result = collection.upsert(documentId, content,
UpsertOptions.upsertOptions()
.durability(persistTo, replicateTo)
@@ -141,6 +157,91 @@ public CouchbaseUpsertResult upsertDocument(String documentId, byte[] content) t
}
}
+ @Override
+ public boolean documentExists(String documentId) throws CouchbaseException {
+ try {
+ final ExistsResult result = collection.exists(documentId);
+ return result.exists();
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to check document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void insertDocument(String documentId, byte[] content) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.insert(documentId, content,
+ InsertOptions.insertOptions()
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (DocumentExistsException e) {
+ throw new CouchbaseDocExistsException("Document with key [%s] already exists".formatted(documentId), e);
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to insert document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void removeDocument(String documentId) throws CouchbaseException {
+ try {
+ collection.remove(documentId);
+ } catch (DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to remove document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public void replaceDocument(String documentId, byte[] content, long cas) throws CouchbaseException {
+ if (!getInputValidator(documentType).test(content)) {
+ throw new CouchbaseException("The provided input is invalid for document [%s]".formatted(documentId));
+ }
+
+ try {
+ collection.replace(documentId, content,
+ ReplaceOptions.replaceOptions()
+ .cas(cas)
+ .durability(persistTo, replicateTo)
+ .transcoder(getTranscoder(documentType))
+ .clientContext(new HashMap<>()));
+ } catch (CasMismatchException e) {
+ throw new CouchbaseCasMismatchException("Couchbase document with key [%s] has been concurrently modified".formatted(documentId), e);
+ } catch (DocumentNotFoundException e) {
+ throw new CouchbaseDocNotFoundException("Couchbase document with key [%s] not found".formatted(documentId), e);
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to replace document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
+ @Override
+ public CouchbaseLookupInResult lookupIn(String documentId, String subDocPath) throws CouchbaseException {
+ try {
+ final String documentPath = subDocPath == null ? "" : subDocPath;
+ final LookupInResult result = collection.lookupIn(documentId, Collections.singletonList(LookupInSpec.get(documentPath)));
+
+ if (!result.exists(0)) {
+ throw new CouchbaseException("No value found on the requested path [%s] in Couchbase".formatted(subDocPath));
+ }
+
+ Object lookupInResult;
+ try {
+ lookupInResult = result.contentAs(0, Object.class);
+ } catch (DecodingFailureException e) {
+ lookupInResult = result.contentAs(0, byte[].class);
+ }
+
+ return new CouchbaseLookupInResult(deserializeLookupInResult(lookupInResult), result.cas());
+ } catch (Exception e) {
+ throw new CouchbaseException("Failed to look up in document [%s] in Couchbase".formatted(documentId), e);
+ }
+ }
+
@Override
public ExceptionCategory getExceptionCategory(Throwable throwable) {
return exceptionMapping.getOrDefault(throwable.getClass(), FAILURE);
@@ -159,4 +260,16 @@ private Predicate getInputValidator(DocumentType documentType) {
case BINARY -> v -> true;
};
}
+
+ private String deserializeLookupInResult(Object result) {
+ return switch (result) {
+ case null -> null;
+ case String s -> s;
+ case Map map -> JsonObject.from(map).toString();
+ case List list -> JsonArray.from(list).toString();
+ case byte[] bytes -> new String(bytes, StandardCharsets.UTF_8);
+ default -> result.toString();
+ };
+
+ }
}
diff --git a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
similarity index 50%
rename from nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
rename to nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
index ad3d7e7d8785..62a9bc064b4a 100644
--- a/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/TestCouchbaseClient.java
+++ b/nifi-extension-bundles/nifi-couchbase-bundle/nifi-couchbase-standard-services/src/test/java/org/apache/nifi/services/couchbase/CouchbaseClientTest.java
@@ -18,16 +18,23 @@
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.kv.GetResult;
+import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.PersistTo;
import com.couchbase.client.java.kv.ReplicateTo;
import org.apache.nifi.services.couchbase.exception.CouchbaseException;
import org.apache.nifi.services.couchbase.utils.CouchbaseGetResult;
+import org.apache.nifi.services.couchbase.utils.CouchbaseLookupInResult;
import org.apache.nifi.services.couchbase.utils.CouchbaseUpsertResult;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.services.couchbase.utils.DocumentType.JSON;
@@ -36,11 +43,12 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestCouchbaseClient {
+public class CouchbaseClientTest {
private static final String TEST_DOCUMENT_ID = "test-document-id";
private static final long TEST_CAS = 1L;
@@ -72,7 +80,24 @@ void testPutJsonDocumentValidationFailure() {
final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
final Exception exception = assertThrows(CouchbaseException.class, () -> client.upsertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is invalid"));
+ }
+
+ @Test
+ void testInsertJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ final Exception exception = assertThrows(CouchbaseException.class, () -> client.insertDocument(TEST_DOCUMENT_ID, content.getBytes()));
+ assertTrue(exception.getMessage().contains("The provided input is invalid"));
+ }
+
+ @Test
+ void testReplaceJsonDocumentValidationFailure() {
+ final String content = "{invalid-json}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+ final Exception exception = assertThrows(CouchbaseException.class, () -> client.replaceDocument(TEST_DOCUMENT_ID, content.getBytes(), TEST_CAS));
assertTrue(exception.getMessage().contains("The provided input is invalid"));
}
@@ -94,4 +119,61 @@ void testGetDocument() throws CouchbaseException {
assertEquals(TEST_CAS, getResult.cas());
assertArrayEquals(content.getBytes(), getResult.resultContent());
}
+
+ @Test
+ void testLookupInWithMapResult() throws CouchbaseException {
+ final String expectedResult = "{\"name\":\"John\",\"age\":\"20\"}";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ Map lookupInContent = new HashMap<>();
+ lookupInContent.put("name", "John");
+ lookupInContent.put("age", "20");
+
+ final LookupInResult result = mock(LookupInResult.class);
+ when(result.contentAs(anyInt(), any(Class.class))).thenReturn(lookupInContent);
+ when(result.exists(anyInt())).thenReturn(true);
+ when(result.cas()).thenReturn(TEST_CAS);
+
+ when(collection.lookupIn(anyString(), any())).thenReturn(result);
+
+ final CouchbaseLookupInResult lookupInResult = client.lookupIn(TEST_DOCUMENT_ID, "");
+
+ assertEquals(expectedResult, lookupInResult.resultContent());
+ assertEquals(TEST_CAS, lookupInResult.cas());
+ }
+
+ @Test
+ void testLookupInWithArrayResult() throws CouchbaseException {
+ final String expectedResult = "[{\"name\":\"John\"},{\"name\":\"Jack\"}]";
+ final StandardCouchbaseClient client = new StandardCouchbaseClient(collection, JSON, PersistTo.ONE, ReplicateTo.ONE);
+
+ List