Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
"src/ConsumerConfig.cc",
"src/Reader.cc",
"src/ReaderConfig.cc",
"src/ThreadSafeDeferred.cc"
"src/ThreadSafeDeferred.cc",
"src/CryptoKeyReader.cc"
],
'conditions': [
['OS=="mac"', {
Expand Down
30 changes: 30 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export interface ProducerConfig {
properties?: { [key: string]: string };
publicKeyPath?: string;
encryptionKey?: string;
encryptionKeys?: string[];
cryptoKeyReader?: CryptoKeyReader;
cryptoFailureAction?: ProducerCryptoFailureAction;
chunkingEnabled?: boolean;
schema?: SchemaInfo;
Expand Down Expand Up @@ -99,6 +101,7 @@ export interface ConsumerConfig {
listener?: (message: Message, consumer: Consumer) => void;
readCompacted?: boolean;
privateKeyPath?: string;
cryptoKeyReader?: CryptoKeyReader;
cryptoFailureAction?: ConsumerCryptoFailureAction;
maxPendingChunkedMessage?: number;
autoAckOldestChunkedMessageOnQueueFull?: number;
Expand Down Expand Up @@ -171,6 +174,7 @@ export class Message {
getPartitionKey(): string;
getOrderingKey(): string;
getProducerName(): string;
getEncryptionContext(): EncryptionContext | null;
}

export class MessageId {
Expand Down Expand Up @@ -198,6 +202,22 @@ export interface TopicMetadata {
*/
export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number;

export interface EncryptionKey {
key: string;
value: string;
metadata: { [key: string]: string };
}

export interface EncryptionContext {
keys: EncryptionKey[];
param: string;
algorithm: string;
compressionType: CompressionType;
uncompressedMessageSize: number;
batchSize: number;
isDecryptionFailed: boolean;
}

export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
Expand Down Expand Up @@ -285,6 +305,16 @@ export class AuthenticationBasic {
});
}

export interface EncryptionKeyInfo {
key: Buffer;
metadata: { [key: string]: string };
}

export class CryptoKeyReader {
getPublicKey(keyName: string, metadata: { [key: string]: string }): EncryptionKeyInfo;
getPrivateKey(keyName: string, metadata: { [key: string]: string }): EncryptionKeyInfo;
}

export enum LogLevel {
DEBUG = 0,
INFO = 1,
Expand Down
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const Pulsar = {
Client,
Message: PulsarBinding.Message,
MessageId: PulsarBinding.MessageId,
CryptoKeyReader: PulsarBinding.CryptoKeyReader,
AuthenticationTls,
AuthenticationAthenz,
AuthenticationToken,
Expand Down
24 changes: 17 additions & 7 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ConsumerConfig.h"
#include "Consumer.h"
#include "SchemaInfo.h"
#include "CryptoKeyReader.h"
#include "Message.h"
#include "pulsar/ConsumerConfiguration.h"
#include <pulsar/c/consumer_configuration.h>
Expand Down Expand Up @@ -60,6 +61,7 @@ static const std::string CFG_KEY_SHARED_POLICY = "keySharedPolicy";
static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery";
static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand Down Expand Up @@ -249,13 +251,21 @@ void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
std::string privateKeyPath = consumerConfig.Get(CFG_PRIVATE_KEY_PATH).ToString().Utf8Value();
pulsar_consumer_configuration_set_default_crypto_key_reader(
this->cConsumerConfig.get(), publicKeyPath.c_str(), privateKeyPath.c_str());
if (consumerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
std::string cryptoFailureAction = consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
if (CONSUMER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction)) {
pulsar_consumer_configuration_set_crypto_failure_action(
this->cConsumerConfig.get(), CONSUMER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
}
}

if (consumerConfig.Has(CFG_CRYPTO_KEY_READER) && consumerConfig.Get(CFG_CRYPTO_KEY_READER).IsObject()) {
Napi::Object cryptoKeyReaderObj = consumerConfig.Get(CFG_CRYPTO_KEY_READER).As<Napi::Object>();
CryptoKeyReader *cryptoKeyReader = Napi::ObjectWrap<CryptoKeyReader>::Unwrap(cryptoKeyReaderObj);
this->cConsumerConfig.get()->consumerConfiguration.setCryptoKeyReader(
cryptoKeyReader->GetCCryptoKeyReader());
}

if (consumerConfig.Has(CFG_CRYPTO_FAILURE_ACTION) &&
consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).IsString()) {
std::string cryptoFailureAction = consumerConfig.Get(CFG_CRYPTO_FAILURE_ACTION).ToString().Utf8Value();
if (CONSUMER_CRYPTO_FAILURE_ACTION.count(cryptoFailureAction)) {
pulsar_consumer_configuration_set_crypto_failure_action(
this->cConsumerConfig.get(), CONSUMER_CRYPTO_FAILURE_ACTION.at(cryptoFailureAction));
}
}

Expand Down
182 changes: 182 additions & 0 deletions src/CryptoKeyReader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "CryptoKeyReader.h"
#include <pulsar/Result.h>
#include <pulsar/EncryptionKeyInfo.h>
#include <thread>
#include <future>
#include <iostream>

class CryptoKeyReaderWrapper : public pulsar::CryptoKeyReader {
public:
CryptoKeyReaderWrapper(const Napi::Object& jsObject) : mainThreadId_(std::this_thread::get_id()) {
jsObject_.Reset(jsObject, 1);
tsfn_ = Napi::ThreadSafeFunction::New(
jsObject.Env(), Napi::Function::New(jsObject.Env(), [](const Napi::CallbackInfo& info) {}), jsObject,
"CryptoKeyReader", 0, 1);
}

~CryptoKeyReaderWrapper() { tsfn_.Release(); }

pulsar::Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
pulsar::EncryptionKeyInfo& encKeyInfo) const override {
return executeCallback("getPublicKey", keyName, metadata, encKeyInfo);
}

pulsar::Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
pulsar::EncryptionKeyInfo& encKeyInfo) const override {
return executeCallback("getPrivateKey", keyName, metadata, encKeyInfo);
}

private:
mutable Napi::ObjectReference jsObject_;
Napi::ThreadSafeFunction tsfn_;
std::thread::id mainThreadId_;

struct CallbackData {
std::string method;
std::string keyName;
std::map<std::string, std::string> metadata;
std::shared_ptr<std::promise<pulsar::Result>> promise;
pulsar::EncryptionKeyInfo* encKeyInfo;
Napi::ObjectReference* jsObjectRef;
};

static void parseEncryptionKeyInfo(const Napi::Object& obj, pulsar::EncryptionKeyInfo& info) {
if (obj.Has("key") && obj.Get("key").IsBuffer()) {
Napi::Buffer<char> keyBuf = obj.Get("key").As<Napi::Buffer<char>>();
info.setKey(std::string(keyBuf.Data(), keyBuf.Length()));
}
if (obj.Has("metadata") && obj.Get("metadata").IsObject()) {
std::map<std::string, std::string> metadata;
Napi::Object metaObj = obj.Get("metadata").As<Napi::Object>();
Napi::Array keys = metaObj.GetPropertyNames();
for (uint32_t i = 0; i < keys.Length(); i++) {
std::string k = keys.Get(i).ToString().Utf8Value();
std::string v = metaObj.Get(k).ToString().Utf8Value();
metadata[k] = v;
}
info.setMetadata(metadata);
}
}

pulsar::Result executeCallback(const std::string& method, const std::string& keyName,
std::map<std::string, std::string>& metadata,
pulsar::EncryptionKeyInfo& encKeyInfo) const {
if (std::this_thread::get_id() == mainThreadId_) {
Napi::Env env = jsObject_.Env();
Napi::HandleScope scope(env);

if (jsObject_.IsEmpty()) {
return pulsar::Result::ResultCryptoError;
}
Napi::Object obj = jsObject_.Value();

if (!obj.Has(method)) {
return pulsar::Result::ResultCryptoError;
}
Napi::Value funcVal = obj.Get(method);
if (!funcVal.IsFunction()) {
return pulsar::Result::ResultCryptoError;
}
Napi::Function func = funcVal.As<Napi::Function>();

Napi::Object metadataObj = Napi::Object::New(env);
for (const auto& kv : metadata) {
metadataObj.Set(kv.first, kv.second);
}

try {
Napi::Value result = func.Call(obj, {Napi::String::New(env, keyName), metadataObj});
if (result.IsObject()) {
parseEncryptionKeyInfo(result.As<Napi::Object>(), encKeyInfo);
return pulsar::Result::ResultOk;
}
} catch (const Napi::Error& e) {
return pulsar::Result::ResultCryptoError;
}
return pulsar::Result::ResultCryptoError;
} else {
auto promise = std::make_shared<std::promise<pulsar::Result>>();
auto future = promise->get_future();

auto* data = new CallbackData{method, keyName, metadata, promise, &encKeyInfo, &jsObject_};

napi_status status =
tsfn_.BlockingCall(data, [](Napi::Env env, Napi::Function jsCallback, void* context) {
CallbackData* data = static_cast<CallbackData*>(context);

Napi::HandleScope scope(env);
Napi::Object obj = data->jsObjectRef->Value();

pulsar::Result res = pulsar::Result::ResultCryptoError;
if (obj.Has(data->method) && obj.Get(data->method).IsFunction()) {
Napi::Function func = obj.Get(data->method).As<Napi::Function>();
Napi::Object metadataObj = Napi::Object::New(env);
for (const auto& kv : data->metadata) {
metadataObj.Set(kv.first, kv.second);
}

try {
Napi::Value result = func.Call(obj, {Napi::String::New(env, data->keyName), metadataObj});
if (result.IsObject()) {
parseEncryptionKeyInfo(result.As<Napi::Object>(), *data->encKeyInfo);
res = pulsar::Result::ResultOk;
}
} catch (...) {
res = pulsar::Result::ResultCryptoError;
}
}

data->promise->set_value(res);
delete data;
});

if (status != napi_ok) {
delete data;
return pulsar::Result::ResultCryptoError;
}

future.wait();
return future.get();
}
}
};

Napi::FunctionReference CryptoKeyReader::constructor;

void CryptoKeyReader::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);

Napi::Function func = DefineClass(env, "CryptoKeyReader", {});

constructor = Napi::Persistent(func);
constructor.SuppressDestruct();

exports.Set("CryptoKeyReader", func);
}

CryptoKeyReader::CryptoKeyReader(const Napi::CallbackInfo& info) : Napi::ObjectWrap<CryptoKeyReader>(info) {}

CryptoKeyReader::~CryptoKeyReader() {}

std::shared_ptr<pulsar::CryptoKeyReader> CryptoKeyReader::GetCCryptoKeyReader() {
return std::make_shared<CryptoKeyReaderWrapper>(Value());
}
39 changes: 39 additions & 0 deletions src/CryptoKeyReader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef CRYPTO_KEY_READER_H
#define CRYPTO_KEY_READER_H

#include <napi.h>
#include <pulsar/CryptoKeyReader.h>
#include <thread>

class CryptoKeyReader : public Napi::ObjectWrap<CryptoKeyReader> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Object NewInstance(const Napi::CallbackInfo &info);
CryptoKeyReader(const Napi::CallbackInfo &info);
~CryptoKeyReader();
std::shared_ptr<pulsar::CryptoKeyReader> GetCCryptoKeyReader();

private:
static Napi::FunctionReference constructor;
};

#endif
Loading
Loading