From 55405ee87a316fc70c454dc0470911a17fd0887f Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 16 Oct 2024 18:21:36 +0200 Subject: [PATCH 1/4] MINIFICPP-2470 Add SSL and mTLS authentication support to CouchbaseClusterService --- .../cluster/DockerTestDirectoryBindings.py | 10 +- .../containers/CouchbaseServerContainer.py | 93 +++++++++++++++++-- .../MiNiFi_integration_test_driver.py | 5 +- .../integration/features/couchbase.feature | 57 ++++++++++++ .../test/integration/features/steps/steps.py | 30 +++++- .../controllers/CouchbaseClusterService.py | 9 +- .../minifi/core/ControllerService.py | 1 + .../Minifi_flow_json_serializer.py | 13 +-- .../Minifi_flow_yaml_serializer.py | 38 ++++---- examples/couchbase_mtls_authentication.json | 74 +++++++++++++++ examples/couchbase_mtls_authentication.yml | 45 +++++++++ .../CouchbaseClusterService.cpp | 66 ++++++++++++- .../CouchbaseClusterService.h | 12 +-- .../StandardControllerServiceNode.cpp | 15 ++- 14 files changed, 415 insertions(+), 53 deletions(-) create mode 100644 examples/couchbase_mtls_authentication.json create mode 100644 examples/couchbase_mtls_authentication.yml diff --git a/docker/test/integration/cluster/DockerTestDirectoryBindings.py b/docker/test/integration/cluster/DockerTestDirectoryBindings.py index 87cf2b4ec6..25add90ae7 100644 --- a/docker/test/integration/cluster/DockerTestDirectoryBindings.py +++ b/docker/test/integration/cluster/DockerTestDirectoryBindings.py @@ -18,7 +18,7 @@ import hashlib import subprocess import OpenSSL.crypto -from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert +from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert, make_client_cert class DockerTestDirectoryBindings: @@ -214,3 +214,11 @@ def create_cert_files(self): os.path.join(base, "root_ca.crt"), ] subprocess.run(cmd, check=True) + + clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=self.root_ca_cert, ca_key=self.root_ca_key) + self.put_test_resource('clientuser.crt', + OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, + cert=clientuser_cert)) + self.put_test_resource('clientuser.key', + OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, + pkey=clientuser_key)) diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index 9baf01f088..71a135f12c 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -12,19 +12,72 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os +import OpenSSL.crypto +import tempfile +import docker +import requests +import logging +from requests.auth import HTTPBasicAuth from .Container import Container from utils import retry_check +from ssl_utils.SSL_cert_utils import make_server_cert class CouchbaseServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command) + def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False): + self.ssl = ssl + engine = "couchbase-server" if not ssl else "couchbase-server-ssl" + super().__init__(feature_context, name, engine, vols, network, image_store, command) + couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) + + self.root_ca_file = tempfile.NamedTemporaryFile(delete=False) + self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert)) + self.root_ca_file.close() + os.chmod(self.root_ca_file.name, 0o666) + + self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False) + self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert)) + self.couchbase_cert_file.close() + os.chmod(self.couchbase_cert_file.name, 0o666) + + self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False) + self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key)) + self.couchbase_key_file.close() + os.chmod(self.couchbase_key_file.name, 0o666) def get_startup_finished_log_entry(self): # after startup the logs are only available in the container, only this message is shown return "logs available in" + @retry_check(12, 5) + def _run_couchbase_cli_command(self, command): + (code, _) = self.client.containers.get(self.name).exec_run(command) + if code != 0: + logging.error(f"Failed to run command '{command}', returned error code: {code}") + return False + return True + + def _run_couchbase_cli_commands(self, commands): + for command in commands: + if not self._run_couchbase_cli_command(command): + return False + return True + @retry_check(15, 2) + def _load_couchbase_certs(self): + response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) + if response.status_code != 200: + logging.error(f"Failed to load CA certificates, with status code: {response.status_code}") + return False + + response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123")) + if response.status_code != 200: + logging.error(f"Failed to reload certificates, with status code: {response.status_code}") + return False + + return True + def run_post_startup_commands(self): if self.post_startup_commands_finished: return True @@ -33,12 +86,18 @@ def run_post_startup_commands(self): ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024", "--max-ttl", "36000"] + "--bucket-ramsize", "1024", "--max-ttl", "36000"], + ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123", + "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"], + ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''], + ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json'] ] - for command in commands: - (code, _) = self.client.containers.get(self.name).exec_run(command) - if code != 0: - return False + if not self._run_couchbase_cli_commands(commands): + return False + + if not self._load_couchbase_certs(): + return False + self.post_startup_commands_finished = True return True @@ -46,10 +105,26 @@ def deploy(self): if not self.set_deployed(): return + mounts = [ + docker.types.Mount( + type='bind', + source=self.couchbase_key_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'), + docker.types.Mount( + type='bind', + source=self.couchbase_cert_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'), + docker.types.Mount( + type='bind', + source=self.root_ca_file.name, + target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt') + ] + self.docker_container = self.client.containers.run( "couchbase:enterprise-7.2.5", detach=True, name=self.name, network=self.network.name, - ports={'11210/tcp': 11210}, - entrypoint=self.command) + ports={'8091/tcp': 8091, '11210/tcp': 11210}, + entrypoint=self.command, + mounts=mounts) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 57526342db..874a6b6acb 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -86,8 +86,9 @@ def start_minifi_c2_server(self, context): self.cluster.deploy_container('minifi-c2-server') assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output() - def start_couchbase_server(self, context): - self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server') + def start_couchbase_server(self, context, ssl=False): + engine = 'couchbase-server-ssl' if ssl else 'couchbase-server' + self.cluster.acquire_container(context=context, name='couchbase-server', engine=engine) self.cluster.deploy_container('couchbase-server') assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output() diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature index 646eaacd97..a9f2b25f01 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/docker/test/integration/features/couchbase.feature @@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And all instances start up Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using SSL connection + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And the "Keep Source File" property of the GetFile processor is set to "true" + And the scheduling period of the GetFile processor is set to "20 seconds" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up with SSL connection with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds + + Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication + Given a MiNiFi CPP server with yaml config + And a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" + And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" + And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService" + + And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey + And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile + And the "success" relationship of the PutFile processor is connected to the LogAttribute + + When a Couchbase server is started + And all instances start up + + Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds + And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds + And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds + And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 6d4e464e90..de484caa67 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -573,7 +573,7 @@ def step_impl(context): minifi_crt_file = '/tmp/resources/minifi_client.crt' minifi_key_file = '/tmp/resources/minifi_client.key' root_ca_crt_file = '/tmp/resources/root_ca.crt' - ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) + ssl_context_service = SSLContextService(name='SSLContextService', cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.root_ca_cert, context.root_ca_key) put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP") @@ -1384,6 +1384,34 @@ def step_impl(context, service_name): container.add_controller(couchbase_cluster_controller_service) +@given("a CouchbaseClusterService is setup up with SSL connection with the name \"{service_name}\"") +def step_impl(context, service_name): + ssl_context_service = SSLContextService(name="SSLContextService", + ca_cert='/tmp/resources/root_ca.crt') + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(ssl_context_service) + couchbase_cluster_controller_service = CouchbaseClusterService( + name=service_name, + connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), + ssl_context_service=ssl_context_service) + container.add_controller(couchbase_cluster_controller_service) + + @then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase") def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str): context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type) + + +@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"") +def step_impl(context, service_name): + ssl_context_service = SSLContextService(name="SSLContextService", + cert='/tmp/resources/clientuser.crt', + key='/tmp/resources/clientuser.key', + ca_cert='/tmp/resources/root_ca.crt') + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(ssl_context_service) + couchbase_cluster_controller_service = CouchbaseClusterService( + name=service_name, + connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), + ssl_context_service=ssl_context_service) + container.add_controller(couchbase_cluster_controller_service) diff --git a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py index e06c1e6ffb..94494fe175 100644 --- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py +++ b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py @@ -18,10 +18,13 @@ class CouchbaseClusterService(ControllerService): - def __init__(self, name, connection_string): + def __init__(self, name, connection_string, ssl_context_service=None): super(CouchbaseClusterService, self).__init__(name=name) self.service_class = 'CouchbaseClusterService' self.properties['Connection String'] = connection_string - self.properties['User Name'] = "Administrator" - self.properties['User Password'] = "password123" + if ssl_context_service: + self.linked_services.append(ssl_context_service) + if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties: + self.properties['User Name'] = "Administrator" + self.properties['User Password'] = "password123" diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py index f295131393..02d32e4a77 100644 --- a/docker/test/integration/minifi/core/ControllerService.py +++ b/docker/test/integration/minifi/core/ControllerService.py @@ -34,3 +34,4 @@ def __init__(self, name=None, properties=None): properties = {} self.properties = properties + self.linked_services = [] diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index c1043d5067..a100ae0382 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -118,12 +118,7 @@ def serialize_node(self, connectable, root, visited): continue visited.append(svc) - root['controllerServices'].append({ - 'name': svc.name, - 'identifier': svc.id, - 'type': svc.service_class, - 'properties': svc.properties - }) + self.serialize_controller(svc, root) if isinstance(connectable, Funnel): root['funnels'].append({ @@ -159,3 +154,9 @@ def serialize_controller(self, controller, root): 'type': controller.service_class, 'properties': controller.properties }) + + if controller.linked_services: + if len(controller.linked_services) == 1: + root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name + else: + root['controllerServices'][-1]['properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services] diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index dc912b1224..14cbabc980 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -119,12 +119,7 @@ def serialize_node(self, connectable, res=None, visited=None): continue visited.append(svc) - res['Controller Services'].append({ - 'name': svc.name, - 'id': svc.id, - 'class': svc.service_class, - 'Properties': svc.properties - }) + self._add_controller_service_node(svc, res) if isinstance(connectable, Funnel): res['Funnels'].append({ @@ -160,6 +155,25 @@ def serialize_node(self, connectable, res=None, visited=None): return (res, visited) + def _add_controller_service_node(self, controller, parent): + if hasattr(controller, 'name'): + connectable_name = controller.name + else: + connectable_name = str(controller.uuid) + + parent['Controller Services'].append({ + 'name': connectable_name, + 'id': controller.id, + 'class': controller.service_class, + 'Properties': controller.properties + }) + + if controller.linked_services: + if len(controller.linked_services) == 1: + parent['Controller Services'][-1]['Properties']['Linked Services'] = controller.linked_services[0].name + else: + parent['Controller Services'][-1]['Properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services] + def serialize_controller(self, controller, root=None): if root is None: res = { @@ -175,16 +189,6 @@ def serialize_controller(self, controller, root=None): else: res = root - if hasattr(controller, 'name'): - connectable_name = controller.name - else: - connectable_name = str(controller.uuid) - - res['Controller Services'].append({ - 'name': connectable_name, - 'id': controller.id, - 'class': controller.service_class, - 'Properties': controller.properties - }) + self._add_controller_service_node(controller, res) return res diff --git a/examples/couchbase_mtls_authentication.json b/examples/couchbase_mtls_authentication.json new file mode 100644 index 0000000000..7ac14df254 --- /dev/null +++ b/examples/couchbase_mtls_authentication.json @@ -0,0 +1,74 @@ +{ + "parameterContexts": [], + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [ + { + "name": "Get Couchbase document file from local directory", + "identifier": "21b1e56e-e8d5-4543-9f6b-be148f91fb02", + "type": "org.apache.nifi.processors.standard.GetFile", + "schedulingStrategy": "TIMER_DRIVEN", + "schedulingPeriod": "2 sec", + "penaltyDuration": "30 sec", + "properties": { + "Input Directory": "/home/user/couchbase/input" + }, + "autoTerminatedRelationships": [], + "concurrentlySchedulableTaskCount": 1 + }, + { + "name": "Insert Couchbase document", + "identifier": "df762d53-0f94-4611-be01-e689b8992573", + "type": "org.apache.nifi.processors.standard.PutCouchbaseKey", + "schedulingStrategy": "EVENT_DRIVEN", + "penaltyDuration": "30 sec", + "properties": { + "Bucket Name": "test_bucket", + "Document Id": "test_doc_id", + "Couchbase Cluster Controller Service": "CouchbaseClusterService for mTLS authentication" + }, + "autoTerminatedRelationships": [ + "success", + "failure", + "retry" + ] + } + ], + "controllerServices": [ + { + "name": "SSLContextService for Couchbase", + "identifier": "33e03d54-9917-494e-8ba0-8caeb3fdf4de", + "type": "SSLContextService", + "properties": { + "Client Certificate": "/home/user/couchbase/certs/clientuser.crt", + "Private Key": "/home/user/couchbase/certs/clientuser.key", + "CA Certificate": "/home/user/couchbase/certsroot_ca.crt" + } + }, + { + "name": "CouchbaseClusterService for mTLS authentication", + "identifier": "747bae3c-e68e-40af-8933-02179bd6cf85", + "type": "CouchbaseClusterService", + "properties": { + "Connection String": "couchbases://couchbase-server-hLkYUYq55djwrW5A26XNJD", + "Linked Services": "SSLContextService for Couchbase" + } + } + ], + "connections": [ + { + "identifier": "94fdd7b1-7857-44c3-8cf2-d373a5578420", + "name": "GetFile/success/PutCouchbaseKey", + "source": { + "id": "21b1e56e-e8d5-4543-9f6b-be148f91fb02" + }, + "destination": { + "id": "df762d53-0f94-4611-be01-e689b8992573" + }, + "selectedRelationships": [ + "success" + ] + } + ], + } +} diff --git a/examples/couchbase_mtls_authentication.yml b/examples/couchbase_mtls_authentication.yml new file mode 100644 index 0000000000..7ef1033e70 --- /dev/null +++ b/examples/couchbase_mtls_authentication.yml @@ -0,0 +1,45 @@ +Flow Controller: + name: MiNiFi Flow +Processors: +- id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02 + name: Get Couchbase document file from local directory + class: org.apache.nifi.processors.standard.GetFile + scheduling strategy: TIMER_DRIVEN + scheduling period: 2 sec + penalization period: 30 sec + Properties: + Input Directory: /tmp/input + auto-terminated relationships list: [] +- id: df762d53-0f94-4611-be01-e689b8992573 + name: Insert Couchbase document + class: org.apache.nifi.processors.standard.PutCouchbaseKey + scheduling strategy: EVENT_DRIVEN + penalization period: 30 sec + Properties: + Bucket Name: test_bucket + Couchbase Cluster Controller Service: CouchbaseClusterService for mTLS authentication + Document Id: test_doc_id + auto-terminated relationships list: + - success + - failure + - retry +Controller Services: +- id: 33e03d54-9917-494e-8ba0-8caeb3fdf4de + name: SSLContextService for Couchbase + class: SSLContextService + Properties: + CA Certificate: /tmp/resources/root_ca.crt + Client Certificate: /tmp/resources/clientuser.crt + Private Key: /tmp/resources/clientuser.key +- id: 747bae3c-e68e-40af-8933-02179bd6cf85 + name: CouchbaseClusterService for mTLS authentication + class: CouchbaseClusterService + Properties: + Connection String: couchbases://couchbase-server-VPQDsPD2pj35q5WzHNt9ER + Linked Services: SSLContextService for Couchbase +Connections: +- id: 94fdd7b1-7857-44c3-8cf2-d373a5578420 + destination id: df762d53-0f94-4611-be01-e689b8992573 + name: GetFile/success/PutCouchbaseKey + source id: 21b1e56e-e8d5-4543-9f6b-be148f91fb02 + source relationship name: success diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index c40697cf71..12b8b977e6 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -50,6 +50,50 @@ CouchbaseErrorType getErrorType(const std::error_code& error_code) { } // namespace +CouchbaseClient::CouchbaseClient(std::string connection_string, std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service, + const std::shared_ptr& logger) + : connection_string_(std::move(connection_string)), logger_(logger), cluster_options_(buildClusterOptions(std::move(username), std::move(password), ssl_context_service)) { +} + +::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service) { + if (username.empty() && (!ssl_context_service || (ssl_context_service && ssl_context_service->getCertificateFile().empty()))) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Neither username and password nor SSLContextService is provided for Couchbase authentication"); + } + + if (!username.empty() && ssl_context_service && !ssl_context_service->getCertificateFile().empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Username and password authentication or mTLS authentication using certificate defined in SSLConextService " + "linked service should be provided exclusively for Couchbase"); + } + + if (!username.empty()) { + logger_->log_debug("Using username and password authentication for Couchbase server"); + if (password.empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Password missing for Couchbase server authentication"); + } + ::couchbase::cluster_options cluster_options(std::move(username), std::move(password)); + if (ssl_context_service && !ssl_context_service->getCACertificate().empty()) { + logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string()); + cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string()); + } + return cluster_options; + } + + logger_->log_debug("Using mTLS authentication for Couchbase server"); + logger_->log_debug("Setting Couchbase client SSL key file path to '{}'", ssl_context_service->getPrivateKeyFile().string()); + logger_->log_debug("Setting Couchbase client certificate file path to '{}'", ssl_context_service->getCertificateFile().string()); + if (ssl_context_service->getPrivateKeyFile().empty() || ssl_context_service->getCertificateFile().empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Couchbase client private key path or client certificate path is empty"); + } + + ::couchbase::cluster_options cluster_options(::couchbase::certificate_authenticator(ssl_context_service->getCertificateFile().string(), ssl_context_service->getPrivateKeyFile().string())); + if (!ssl_context_service->getCACertificate().empty()) { + logger_->log_debug("Setting Couchbase client CA certificate path to '{}'", ssl_context_service->getCACertificate().string()); + cluster_options.security().trust_certificate(ssl_context_service->getCACertificate().string()); + } + cluster_options.security().tls_verify(::couchbase::tls_verify_mode::peer); + return cluster_options; +} + nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::getCollection(const CouchbaseCollection& collection) { auto connection_result = establishConnection(); if (!connection_result) { @@ -155,8 +199,7 @@ nonstd::expected CouchbaseClient::establishConnection( return {}; } - auto options = ::couchbase::cluster_options(username_, password_); - auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get(); + auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, cluster_options_).get(); if (connect_err.ec()) { logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message()); return nonstd::make_unexpected(getErrorType(connect_err.ec())); @@ -179,11 +222,24 @@ void CouchbaseClusterService::onEnable() { getProperty(UserName, username); std::string password; getProperty(UserPassword, password); - if (connection_string.empty() || username.empty() || password.empty()) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string, username or password"); + if (connection_string.empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string"); + } + + if ((username.empty() || password.empty()) && linked_services_.empty()) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing username and password or SSLConextService as a linked service"); + } + + minifi::controllers::SSLContextService* ssl_context_service_ptr = nullptr; + if (!linked_services_.empty()) { + auto ssl_context_service = std::dynamic_pointer_cast(linked_services_[0]); + if (!ssl_context_service) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Linked service is not an SSLContextService"); + } + ssl_context_service_ptr = ssl_context_service.get(); } + client_ = std::make_unique(connection_string, username, password, ssl_context_service_ptr, logger_); - client_ = std::make_unique(connection_string, username, password, logger_); auto result = client_->establishConnection(); if (!result) { if (result.error() == CouchbaseErrorType::FATAL) { diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h index 28c6390dd5..2174497233 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.h +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h @@ -31,6 +31,7 @@ #include "couchbase/cluster.hxx" #include "core/ProcessContext.h" #include "core/logging/LoggerConfiguration.h" +#include "controllers/SSLContextService.h" namespace org::apache::nifi::minifi::couchbase { @@ -69,9 +70,8 @@ enum class CouchbaseErrorType { class CouchbaseClient { public: - CouchbaseClient(std::string connection_string, std::string username, std::string password, const std::shared_ptr& logger) - : connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) { - } + CouchbaseClient(std::string connection_string, std::string username, std::string password, controllers::SSLContextService* ssl_context_service, + const std::shared_ptr& logger); ~CouchbaseClient() { close(); @@ -89,14 +89,14 @@ class CouchbaseClient { void close(); private: + ::couchbase::cluster_options buildClusterOptions(std::string username, std::string password, minifi::controllers::SSLContextService* ssl_context_service); nonstd::expected<::couchbase::collection, CouchbaseErrorType> getCollection(const CouchbaseCollection& collection); std::string connection_string_; - std::string username_; - std::string password_; + std::shared_ptr logger_; + ::couchbase::cluster_options cluster_options_; std::mutex cluster_mutex_; std::optional<::couchbase::cluster> cluster_; - std::shared_ptr logger_; }; namespace controllers { diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 6bc423b15c..0d63420d0c 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -23,11 +23,13 @@ namespace org::apache::nifi::minifi::core::controller { bool StandardControllerServiceNode::enable() { - Property property("Linked Services", "Referenced Controller Services"); - controller_service_->setState(ENABLED); logger_->log_trace("Enabling CSN {}", getName()); + if (active) { + logger_->log_debug("CSN {} is already enabled", getName()); + return true; + } + Property property("Linked Services", "Referenced Controller Services"); if (getProperty(property.getName(), property)) { - active = true; for (const auto& linked_service : property.getValues()) { ControllerServiceNode* csNode = provider->getControllerServiceNode(linked_service, controller_service_->getUUID()); if (nullptr != csNode) { @@ -40,13 +42,20 @@ bool StandardControllerServiceNode::enable() { if (nullptr != impl) { std::lock_guard lock(mutex_); std::vector> services; + std::vector service_nodes; services.reserve(linked_controller_services_.size()); for (const auto& service : linked_controller_services_) { services.push_back(service->getControllerServiceImplementation()); + if (!service->enable()) { + logger_->log_debug("Linked Service '{}' could not be enabled", service->getName()); + return false; + } } impl->setLinkedControllerServices(services); impl->onEnable(); } + active = true; + controller_service_->setState(ENABLED); return true; } From 0abf6fbc2bc0d13c0c28fa40a01da816911a12b8 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Fri, 22 Nov 2024 14:59:51 +0100 Subject: [PATCH 2/4] Review update --- .../couchbase/controllerservices/CouchbaseClusterService.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index 12b8b977e6..cdf1259f56 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -61,7 +61,7 @@ ::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string us } if (!username.empty() && ssl_context_service && !ssl_context_service->getCertificateFile().empty()) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Username and password authentication or mTLS authentication using certificate defined in SSLConextService " + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Username and password authentication or mTLS authentication using certificate defined in SSLContextService " "linked service should be provided exclusively for Couchbase"); } @@ -227,7 +227,7 @@ void CouchbaseClusterService::onEnable() { } if ((username.empty() || password.empty()) && linked_services_.empty()) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing username and password or SSLConextService as a linked service"); + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing username and password or SSLContextService as a linked service"); } minifi::controllers::SSLContextService* ssl_context_service_ptr = nullptr; From c3519585ea81b9d4c89383e5138be9d37980a99e Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 25 Nov 2024 09:45:23 +0100 Subject: [PATCH 3/4] Review update --- .../cluster/containers/CouchbaseServerContainer.py | 11 +++-------- .../features/MiNiFi_integration_test_driver.py | 5 ++--- docker/test/integration/features/couchbase.feature | 2 +- docker/test/integration/features/steps/steps.py | 2 +- .../controllerservices/CouchbaseClusterService.cpp | 4 ++-- 5 files changed, 9 insertions(+), 15 deletions(-) diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index 71a135f12c..f5d9674f90 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -26,9 +26,7 @@ class CouchbaseServerContainer(Container): def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False): - self.ssl = ssl - engine = "couchbase-server" if not ssl else "couchbase-server-ssl" - super().__init__(feature_context, name, engine, vols, network, image_store, command) + super().__init__(feature_context, name, "couchbase-server", vols, network, image_store, command) couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) self.root_ca_file = tempfile.NamedTemporaryFile(delete=False) @@ -50,7 +48,7 @@ def get_startup_finished_log_entry(self): # after startup the logs are only available in the container, only this message is shown return "logs available in" - @retry_check(12, 5) + @retry_check(max_tries=12, retry_interval=5) def _run_couchbase_cli_command(self, command): (code, _) = self.client.containers.get(self.name).exec_run(command) if code != 0: @@ -59,10 +57,7 @@ def _run_couchbase_cli_command(self, command): return True def _run_couchbase_cli_commands(self, commands): - for command in commands: - if not self._run_couchbase_cli_command(command): - return False - return True + return all(self._run_couchbase_cli_command(command) for command in commands) @retry_check(15, 2) def _load_couchbase_certs(self): diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 874a6b6acb..57526342db 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -86,9 +86,8 @@ def start_minifi_c2_server(self, context): self.cluster.deploy_container('minifi-c2-server') assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output() - def start_couchbase_server(self, context, ssl=False): - engine = 'couchbase-server-ssl' if ssl else 'couchbase-server' - self.cluster.acquire_container(context=context, name='couchbase-server', engine=engine) + def start_couchbase_server(self, context): + self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server') self.cluster.deploy_container('couchbase-server') assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output() diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature index a9f2b25f01..7b74a6c3dd 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/docker/test/integration/features/couchbase.feature @@ -187,7 +187,7 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with SSL connection with the name "CouchbaseClusterService" + And a CouchbaseClusterService is set up up with SSL connection with the name "CouchbaseClusterService" And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index de484caa67..70a1938886 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1384,7 +1384,7 @@ def step_impl(context, service_name): container.add_controller(couchbase_cluster_controller_service) -@given("a CouchbaseClusterService is setup up with SSL connection with the name \"{service_name}\"") +@given("a CouchbaseClusterService is set up up with SSL connection with the name \"{service_name}\"") def step_impl(context, service_name): ssl_context_service = SSLContextService(name="SSLContextService", ca_cert='/tmp/resources/root_ca.crt') diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp index cdf1259f56..ac340f8640 100644 --- a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp +++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp @@ -61,8 +61,8 @@ ::couchbase::cluster_options CouchbaseClient::buildClusterOptions(std::string us } if (!username.empty() && ssl_context_service && !ssl_context_service->getCertificateFile().empty()) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Username and password authentication or mTLS authentication using certificate defined in SSLContextService " - "linked service should be provided exclusively for Couchbase"); + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Either username and password or mTLS certificate authentication should be used in the SSLContextService for Couchbase, " + "but not both"); } if (!username.empty()) { From 579e378d6343930fbd22ecc2e8852df74314e815 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Mon, 25 Nov 2024 10:31:25 +0100 Subject: [PATCH 4/4] Review update --- .../cluster/containers/CouchbaseServerContainer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py index f5d9674f90..b2f13b7329 100644 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py @@ -25,7 +25,7 @@ class CouchbaseServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False): + def __init__(self, feature_context, name, vols, network, image_store, command=None): super().__init__(feature_context, name, "couchbase-server", vols, network, image_store, command) couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) @@ -59,7 +59,7 @@ def _run_couchbase_cli_command(self, command): def _run_couchbase_cli_commands(self, commands): return all(self._run_couchbase_cli_command(command) for command in commands) - @retry_check(15, 2) + @retry_check(max_tries=15, retry_interval=2) def _load_couchbase_certs(self): response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) if response.status_code != 200: