diff --git a/app/_event_gateway_policies/decrypt-fields/index.md b/app/_event_gateway_policies/decrypt-fields/index.md index 95b354d3f6..64e0dcc193 100644 --- a/app/_event_gateway_policies/decrypt-fields/index.md +++ b/app/_event_gateway_policies/decrypt-fields/index.md @@ -28,6 +28,8 @@ related_resources: url: /event-gateway/entities/static-key/ - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + - text: Encrypt and decrypt Kafka message fields dynamically with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-dynamic/ phases: - consume diff --git a/app/_event_gateway_policies/encrypt-fields/index.md b/app/_event_gateway_policies/encrypt-fields/index.md index b6fbf212b7..1bd90be194 100644 --- a/app/_event_gateway_policies/encrypt-fields/index.md +++ b/app/_event_gateway_policies/encrypt-fields/index.md @@ -38,6 +38,8 @@ related_resources: url: /event-gateway/entities/static-key/ - text: Encrypt and decrypt Kafka message fields with {{site.event_gateway}} url: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + - text: Encrypt and decrypt Kafka message fields dynamically with {{site.event_gateway}} + url: /event-gateway/encrypt-kafka-message-fields-dynamic/ min_version: event-gateway: '1.2' diff --git a/app/_how-tos/event-gateway/encrypt-kafka-message-fields-dynamic.md b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-dynamic.md new file mode 100644 index 0000000000..3e322df8de --- /dev/null +++ b/app/_how-tos/event-gateway/encrypt-kafka-message-fields-dynamic.md @@ -0,0 +1,522 @@ +--- +title: Encrypt and decrypt Kafka fields dynamically in message values with {{site.event_gateway}} +permalink: /event-gateway/encrypt-kafka-message-fields-dynamic/ +content_type: how_to +breadcrumbs: + - /event-gateway/ + +products: + - event-gateway + - identity + +works_on: + - konnect + +tags: + - event-gateway + - kafka + +description: Use this tutorial to dynamically generate a list of fields to encrypt and decrypt with {{site.event_gateway}}. + +tldr: + q: How can I encrypt and decrypt Kafka specific fields depending on the connection context with {{site.event_gateway}}? + a: | + Generate a key and create a [static key](/event-gateway/entities/static-key/) entity, then create [field encryption](/event-gateway/policies/encrypt-fields/) and [field decryption](/event-gateway/policies/decrypt-fields/) policies with path expressions to enable message encryption and decryption. + +tools: + - konnect-api + +prereqs: + inline: + - title: Install kafkactl + position: before + content: | + Install [kafkactl](https://github.com/deviceinsight/kafkactl?tab=readme-ov-file#installation). You'll need it to interact with Kafka clusters. + + - title: Start a local Kafka cluster + position: before + include_content: knep/docker-compose-start + +cleanup: + inline: + - title: Clean up {{site.event_gateway}} resources + include_content: cleanup/products/event-gateway + icon_url: /assets/icons/gateway.svg + +related_resources: + - text: "{{site.event_gateway_short}} Control Plane API" + url: /api/konnect/event-gateway/ + - text: Event Gateway + url: /event-gateway/ + - text: Static keys + url: /event-gateway/entities/static-key/ + - text: Encrypt Fields policy + url: /event-gateway/policies/encrypt-fields/ + - text: Decrypt Fields policy + url: /event-gateway/policies/decrypt-fields/ + - text: Encrypt and decrypt Kafka messages + url: /event-gateway/encrypt-kafka-messages-with-event-gateway/ +--- + +{:.info} +> If you need to encrypt and decrypt whole Kafka messages instead of specific fields, use the Decrypt and Encrypt policies. +See [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) for a complete how-to guide. + +## Create an auth server in {{site.identity}} + +Before you can configure a dynamic field decryption based on authentication, you must first create an auth server in {{site.identity}}. We recommend creating different auth servers for different environments or subsidiaries. The auth server name is unique per each organization and each {{site.konnect_short_name}} region. + +Create an auth server using the [`/v1/auth-servers` endpoint](/api/konnect/kong-identity/v1/#/operations/createAuthServer): + + +{% konnect_api_request %} +url: /v1/auth-servers +status_code: 201 +method: POST +headers: + - 'Content-Type: application/json' +body: + name: "Appointments Dev" + audience: "http://myhttpbin.dev" + description: "Auth server for the Appointment dev environment" +extract_body: + - name: 'id' + variable: AUTH_SERVER_ID + - name: 'issuer' + variable: ISSUER_URL +capture: + - variable: AUTH_SERVER_ID + jq: '.id' + - variable: ISSUER_URL + jq: '.issuer' +{% endkonnect_api_request %} + + +## Create a client in the auth server + +The client is the machine-to-machine credential. In this tutorial, {{site.konnect_short_name}} will autogenerate the client ID and secret, but you can alternatively specify one yourself. + +Configure the client using the [`/v1/auth-servers/$AUTH_SERVER_ID/clients` endpoint](/api/konnect/kong-identity/v1/#/operations/createAuthServerClient): + + +{% konnect_api_request %} +url: /v1/auth-servers/$AUTH_SERVER_ID/clients +status_code: 201 +method: POST +headers: + - 'Content-Type: application/json' +body: + name: Client + grant_types: + - client_credentials + allow_all_scopes: true + access_token_duration: 3600 + id_token_duration: 3600 + response_types: + - id_token + - token +extract_body: + - name: 'client_secret' + variable: CLIENT_SECRET + - name: 'id' + variable: CLIENT_ID +capture: + - variable: CLIENT_SECRET + jq: '.client_secret' + - variable: CLIENT_ID + jq: '.id' +{% endkonnect_api_request %} + + +## Add a backend cluster + +Run the following command to create a new backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters +status_code: 201 +method: POST +body: + name: default_backend_cluster + bootstrap_servers: + - kafka1:9092 + - kafka2:9092 + - kafka3:9092 + authentication: + type: anonymous + insecure_allow_anonymous_virtual_cluster_auth: true + tls: + enabled: false +extract_body: + - name: id + variable: BACKEND_CLUSTER_ID +capture: + - variable: BACKEND_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a virtual cluster + +Run the following command to create a new virtual cluster associated with our backend cluster: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters +status_code: 201 +method: POST +body: + name: example_virtual_cluster + destination: + id: $BACKEND_CLUSTER_ID + dns_label: vcluster-1 + authentication: + - type: anonymous + - type: oauth_bearer + mediation: terminate + jwks: + endpoint: $ISSUER_URL/.well-known/jwks + acl_mode: passthrough +extract_body: + - name: id + variable: VIRTUAL_CLUSTER_ID +capture: + - variable: VIRTUAL_CLUSTER_ID + jq: ".id" +{% endkonnect_api_request %} + + + +## Add a listener + +A [listener](/event-gateway/entities/listener/) represents hostname-port or IP-port combinations that connect to TCP sockets. +In this example, we're going to use port mapping, so we need to expose a range of ports. + +Run the following command to create a new listener: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners +status_code: 201 +method: POST +body: + name: example_listener + addresses: + - 0.0.0.0 + ports: + - 19092-19095 +extract_body: + - name: id + variable: LISTENER_ID +capture: + - variable: LISTENER_ID + jq: ".id" +{% endkonnect_api_request %} + + +## Add a listener policy + +The listener needs a policy to tell it how to process requests and what to do with them. +In this example, we're going to use the [Forward to Virtual Cluster](/event-gateway/policies/forward-to-virtual-cluster/) policy, +which will forward requests based on a defined mapping to our virtual cluster. + +Run the following command to add the listener policy: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies +status_code: 201 +method: POST +body: + type: forward_to_virtual_cluster + name: forward + config: + type: port_mapping + advertised_host: localhost + destination: + id: $VIRTUAL_CLUSTER_ID +{% endkonnect_api_request %} + + +For demo purposes, we're using port mapping, which assigns each Kafka broker to a dedicated port on the {{site.event_gateway_short}}. +In production, we recommend using [SNI routing](/event-gateway/architecture/#hostname-mapping) instead. + +## Generate a key + +Use OpenSSL to generate the key that will be used to encrypt and decrypt messages: + + +{% env_variables %} +MY_KEY: $(openssl rand -base64 32) +{% endenv_variables %} + + +## Add a static key + +Run the following command to create a new [static key](/event-gateway/entities/static-key/) named `my-key` with the key we generated: + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/static-keys +status_code: 201 +method: POST +body: + name: my-key + value: $MY_KEY +{% endkonnect_api_request %} + + +## Create a Schema Validation produce policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-produce/) that validates that all produced values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: produce_validate_json + config: + type: json + value_validation_action: reject +extract_body: + - name: id + variable: PRODUCE_SCHEMA_VALIDATION_ID +capture: + - variable: PRODUCE_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: reject` setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error. +Alternatively, you can use `mark`, which passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field encryption policy + +Use the following command to create a [field encryption policy](/event-gateway/policies/encrypt-fields/) to enable encryption of one field in the JSON value: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies +status_code: 201 +method: POST +body: + name: encrypt-fields-static-key + parent_policy_id: $PRODUCE_SCHEMA_VALIDATION_ID + type: encrypt_fields + config: + failure_mode: reject + encrypt_fields: + - paths: + - match: "personal.ssn" + encryption_key: + type: static + key: + name: my-key +{% endkonnect_api_request %} + + +## Create a Schema Validation consume policy + +Create a [Schema Validation policy](/event-gateway/policies/schema-validation-consume/) that validates that all consumed values are JSON encoded. + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + type: schema_validation + name: consume_validate_json + config: + type: json + value_validation_action: mark +extract_body: + - name: id + variable: CONSUME_SCHEMA_VALIDATION_ID +capture: + - variable: CONSUME_SCHEMA_VALIDATION_ID + jq: ".id" +{% endkonnect_api_request %} + + +The `value_validation_action: mark` passes the message to the broker but adds a `kong/sverr-value` header to flag it as invalid. + +## Add a field decryption policy + +This [field decryption policy](/event-gateway/policies/decrypt-fields/) is where {{ site.event_gateway_short }} will make a dynamic decision +on which fields to decrypt based on the authenticated user: + + +{% konnect_api_request %} +url: /v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies +status_code: 201 +method: POST +body: + name: decrypt-fields-static-key + parent_policy_id: $CONSUME_SCHEMA_VALIDATION_ID + type: decrypt_fields + config: + failure_mode: error + key_sources: + - type: static + decrypt_fields: + paths: "context.auth.principal.name == \"$CLIENT_ID\" ? [\"personal.ssn\"] : []" +{% endkonnect_api_request %} + + +## Set up `kafkactl` to use OAuth + +{:.warning} +> This step requires a `kafkactl` version >= 5.17.0. To check your version, run `kafkactl version`. +>

+> Note that this script is for demo purposes only and hard-codes client ID and client secret. +> For production, we recommend securing sensitive data. + +`kafkactl` will generate tokens using a script. Let's create the script: + + +{% validation custom-command %} +command: | + cat < get-oauth-token.sh + #!/bin/bash + curl -s --fail -X POST "$ISSUER_URL/oauth/token" \ + -H "Content-Type: application/x-www-form-urlencoded" \ + -d "grant_type=client_credentials" \ + -d "client_id=$CLIENT_ID" \ + -d "client_secret=$CLIENT_SECRET" | jq -r '{"token": .access_token}' + EOF + chmod u+x get-oauth-token.sh +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +Next, create a `kafkactl` configuration with both non-authenticated and authenticated access: + + +{% validation custom-command %} +command: | + cat < kafkactl.yaml + contexts: + direct: + brokers: + - localhost:9095 + - localhost:9096 + - localhost:9094 + vc: + brokers: + - localhost:19092 + vc-oauth: + sasl: + enabled: true + mechanism: oauth + tokenprovider: + plugin: generic + options: + script: ./get-oauth-token.sh + args: [] + brokers: + - localhost:19092 + EOF +expected: + return_code: 0 +render_output: false +{% endvalidation %} + + +## Validate + +Let's check that the encryption/decryption works. +First, create a topic using the `direct` context, which is a direct connection to our Kafka cluster: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct create topic my-test-topic +expected: + message: "topic created: my-test-topic" + return_code: 0 +render_output: false +{% endvalidation %} + +Produce a message using the `vc` context which should encrypt the field: +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc produce my-test-topic --value='{"personal": {"ssn": "100-00-00001"}}' +expected: + message: "message produced (partition=0 offset=0)" + return_code: 0 +render_output: false +{% endvalidation %} + + +You should see the following response: +```shell +message produced (partition=0 offset=0) +``` +{:.no-copy-code} + +Now let's verify that the message was encrypted by consuming the message directly: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context direct consume my-test-topic --exit --output json --from-beginning --print-headers +expected: + message: '"kong/enc": "\u0000\u0004\u0000-static://' + return_code: 0 +render_output: false +{% endvalidation %} + +You should see the following response: +```json +{ + "Partition": 0, + "Offset": 0, + "Headers": { + "kong/enc": "\u0000\u0004\u0000-static://" + }, + "Value": "{\"personal\":{\"ssn\":\"AHry69Jl4oJzafOlu/xOjVa37hpfYTAVXoAolj94NoBQSKz7dkEF/gg=\"}}" +} +``` +{:.no-copy-code} + +The field encryption policy appends a `kong/enc` header to each message. This header identifies the encryption key by its ID. + +Now let's verify that the field decryption policy works by consuming the message through the virtual cluster with OAuth: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc-oauth consume my-test-topic --from-beginning --exit +expected: + message: '{"personal": {"ssn": "100-00-00001"}}' + return_code: 0 +render_output: false +{% endvalidation %} + +The output should look like this, with the value decrypted: + +```json +{"personal": {"ssn": "100-00-00001"}} +``` +{:.no-copy-code} + +We can then see that our field remains encrypted if we aren't authenticated: + +{% validation custom-command %} +command: | + kafkactl -C kafkactl.yaml --context vc consume my-test-topic --from-beginning --exit +expected: + message: '{"personal": {"ssn": "..."}}' + return_code: 0 +render_output: false +{% endvalidation %} + +The output should look like this, with the value obfuscated by encryption: + +```json +{"personal": {"ssn": "..."}} +``` +{:.no-copy-code} + diff --git a/app/_how-tos/event-gateway/kong-identity-oauth.md b/app/_how-tos/event-gateway/kong-identity-oauth.md index 5efbd532a9..b0abbd6257 100644 --- a/app/_how-tos/event-gateway/kong-identity-oauth.md +++ b/app/_how-tos/event-gateway/kong-identity-oauth.md @@ -321,7 +321,7 @@ body: This ACL policy grants full access to all topics with the prefix in the `topic_prefix` claim. -## Setup `kafkactl` to use OAuth +## Set up `kafkactl` to use OAuth {:.warning} > This step requires a `kafkactl` version >= 5.17.0. To check your version, run `kafkactl version`. diff --git a/app/_indices/event-gateway.yaml b/app/_indices/event-gateway.yaml index fd64b649ad..ea39fd1e45 100644 --- a/app/_indices/event-gateway.yaml +++ b/app/_indices/event-gateway.yaml @@ -46,6 +46,7 @@ sections: - path: /event-gateway/validate-avro-messages-with-schema-registry/ - path: /event-gateway/configure-topic-aliases/ - path: /event-gateway/encrypt-kafka-message-fields-with-event-gateway/ + - path: /event-gateway/encrypt-kafka-message-fields-dynamic/ - title: "References" items: - title: Event Gateway OpenAPI specification diff --git a/app/_landing_pages/event-gateway.yaml b/app/_landing_pages/event-gateway.yaml index 7798e1d9c4..e404dfc570 100644 --- a/app/_landing_pages/event-gateway.yaml +++ b/app/_landing_pages/event-gateway.yaml @@ -271,8 +271,9 @@ rows: [Encryption policy](/event-gateway/policies/encrypt/), [Decryption policy](/event-gateway/policies/decrypt/), [Encrypt Fields policy](/event-gateway/policies/encrypt-fields/), [Decrypt Fields policy](/event-gateway/policies/decrypt-fields/) guide: | - [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/) - or [encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) + [Encrypt and decrypt Kafka messages](/event-gateway/encrypt-kafka-messages-with-event-gateway/), + [encrypt and decrypt Kafka message fields](/event-gateway/encrypt-kafka-message-fields-with-event-gateway/) + or [encrypt and decrypt Kafka message fields dynamically](/event-gateway/encrypt-kafka-message-fields-dynamic/) - usecase: | **Simplify your Kafka deployment** because it's too complicated and expensive outcomes: