Skip to content

Commit a741f19

Browse files
author
hovoodd
committed
Add node-mongo-kafka-emitter
1 parent c860b40 commit a741f19

File tree

13 files changed

+123
-50
lines changed

13 files changed

+123
-50
lines changed

src/kafka-processor.js

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,22 +81,40 @@ class KafkaProcessor {
8181
}
8282
}
8383

84-
on(eventName, listener) {
85-
let listeners = this.listeners.get(eventName);
84+
addListener(event, listener) {
85+
let listeners = this.listeners.get(event);
8686

8787
if (!listeners) {
8888
listeners = new Set();
89-
this.listeners.set(eventName, listeners);
89+
this.listeners.set(event, listeners);
9090
}
9191

9292
listeners.add(listener);
93+
return this;
9394
}
9495

95-
off(eventName, listener) {
96-
const listeners = this.listeners.get(eventName);
97-
if (!listeners) return;
96+
on(event, listener) {
97+
return this.addListener(event, listener);
98+
}
99+
100+
removeListener(event, listener) {
101+
const listeners = this.listeners.get(event);
102+
if (listeners) listeners.delete(listener);
103+
return this;
104+
}
105+
106+
off(event, listener) {
107+
return this.removeListener(event, listener);
108+
}
109+
110+
once(eventName, listener) {
111+
const wrapper = async (...args) => {
112+
this.removeListener(eventName, wrapper);
113+
await listener(...args);
114+
};
98115

99-
listeners.delete(listener);
116+
this.addListener(eventName, wrapper);
117+
return this;
100118
}
101119
}
102120

src/kafka.js

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,41 @@
1-
const { Kafka } = require('kafkajs');
1+
const KafkaJS = require('kafkajs');
22

33
const { KafkaProcessor } = require('./kafka-processor');
44

5-
const kafka = new Kafka({
6-
clientId: 'ship',
7-
brokers: ['kafka:9092'],
8-
});
5+
class Kafka {
6+
constructor(config) {
7+
this.kafka = new KafkaJS.Kafka(config);
98

10-
const processors = {
11-
user: new KafkaProcessor(kafka, { groupId: 'ship' }, { topic: 'user' }),
12-
};
9+
this.producer = this.kafka.producer();
10+
this.processors = {};
11+
}
1312

14-
async function run() {
15-
await Promise.all(Object.values(processors).map((p) => p.run()));
16-
}
13+
addProcessor(topic, consumerConfig, subscribeConfig = { topic }) {
14+
if (this.processors[topic]) {
15+
throw new Error(`Processor for "${topic}" topic already exists`);
16+
}
17+
18+
this.processors[topic] = new KafkaProcessor(this.kafka, consumerConfig, subscribeConfig);
19+
return this;
20+
}
21+
22+
async run() {
23+
await this.producer.connect();
24+
await Promise.all(Object.values(this.processors).map((p) => p.run()));
25+
}
1726

18-
async function send({ event, data, ...record }) {
19-
const producer = kafka.producer();
20-
await producer.connect();
21-
await producer.send({
22-
...record,
23-
messages: [
24-
{ value: JSON.stringify({ event, data }) },
25-
],
26-
});
27-
await producer.disconnect();
27+
async send({ event, data, ...record }) {
28+
await this.producer.send({
29+
...record,
30+
messages: [
31+
{ value: JSON.stringify({ event, data }) },
32+
],
33+
});
34+
}
2835
}
2936

30-
module.exports = {
31-
kafka,
32-
processors,
33-
run,
34-
send,
35-
};
37+
const kafka = new Kafka({ clientId: 'ship', brokers: ['kafka:9092'] });
38+
39+
kafka.addProcessor('user', { groupId: 'ship' });
40+
41+
module.exports = kafka;

src/migrations/migration.schema.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ const schema = Joi.object({
88
.required(),
99
});
1010

11-
module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
11+
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });

src/migrations/migration.service.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
const db = require('db');
22
const fs = require('fs');
33
const path = require('path');
4-
const validateSchema = require('./migration.schema');
4+
const { validate } = require('./migration.schema');
55

6-
const service = db.createService('__migrationVersion', { validateSchema });
6+
const service = db.createService('__migrationVersion', { validate });
77
const migrationsPath = path.join(__dirname, 'migrations');
88
const _id = 'migration_version';
99

src/migrations/migrations-log/migration-log.schema.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ const schema = Joi.object({
1616
.required(),
1717
});
1818

19-
module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
19+
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });

src/migrations/migrations-log/migration-log.service.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
const db = require('db');
22

3-
const validateSchema = require('./migration-log.schema.js');
3+
const { validate } = require('./migration-log.schema.js');
44

5-
const service = db.createService('__migrationLog', { validateSchema });
5+
const service = db.createService('__migrationLog', { validate });
66

77
service.startMigrationLog = (_id, startTime, migrationVersion) => {
88
return service.atomic.findOneAndUpdate({ _id }, {

src/node-mongo-kafka-emitter.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
class NodeMongoKafkaEmitter {
2+
constructor(_topic, _kafka, _logger = console) {
3+
this.topic = _topic;
4+
this.kafka = _kafka;
5+
this.logger = _logger;
6+
7+
this.processor = this.kafka.processors[this.topic];
8+
if (!this.processor) {
9+
throw new Error(`Processor for "${this.topic}" topic doesn't exist`);
10+
}
11+
}
12+
13+
castEvent(event) {
14+
return `${this.topic}:${event}`;
15+
}
16+
17+
async emit(event, data) {
18+
try {
19+
await this.kafka.send({
20+
topic: this.topic,
21+
event: this.castEvent(event),
22+
data,
23+
});
24+
} catch (error) {
25+
this.logger.error(error);
26+
}
27+
}
28+
29+
on(event, listener) {
30+
this.processor.on(this.castEvent(event), async (message) => {
31+
await listener(message.data);
32+
});
33+
}
34+
35+
once(event, listener) {
36+
this.processor.once(this.castEvent(event), async (message) => {
37+
await listener(message.data);
38+
});
39+
}
40+
}
41+
42+
module.exports = NodeMongoKafkaEmitter;

src/resources/account/account.spec.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ const db = require('tests/db');
66
const { USER, ERRORS } = require('tests/constants');
77
const testsHelper = require('tests/tests.helper');
88
const UserBuilder = require('resources/user/user.builder');
9-
const validateSchema = require('resources/user/user.schema');
9+
const { validate } = require('resources/user/user.schema');
1010

11-
const userService = db.createService(USER.COLLECTION, { validateSchema });
11+
const userService = db.createService(USER.COLLECTION, { validate });
1212
const app = server.listen();
1313

1414
const request = supertest.agent(app);

src/resources/token/token.schema.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ const schema = Joi.object({
1616
isShadow: Joi.boolean(),
1717
});
1818

19-
module.exports = (obj) => schema.validate(obj, { allowUnknown: false });
19+
module.exports.validate = (obj) => schema.validate(obj, { allowUnknown: false });

src/resources/token/token.service.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ const db = require('db');
22
const securityUtil = require('security.util');
33
const { DATABASE_DOCUMENTS, TOKEN_SECURITY_LENGTH, TOKEN_TYPES } = require('app.constants');
44

5-
const validateSchema = require('./token.schema');
5+
const { validate } = require('./token.schema');
66

7-
const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validateSchema });
7+
const service = db.createService(DATABASE_DOCUMENTS.TOKENS, { validate });
88

99
const createToken = async (userId, type) => {
1010
const value = await securityUtil.generateSecureToken(TOKEN_SECURITY_LENGTH);

0 commit comments

Comments
 (0)