From 09c2f2d9b4e2b3153b805ad2bfe2c027999fa323 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=CC=81=20F=2E=20Romaniello?= Date: Thu, 30 Mar 2017 14:29:44 -0300 Subject: [PATCH 1/2] wip --- .gitignore | 2 + bench/different_takes.js | 82 +++++++ bench/same_takes.js | 82 +++++++ lib/DbBuffer.js | 137 +++++++++++ lib/db.js | 73 +++++- package.json | 5 +- test/withFlushInterval.tests.js | 407 ++++++++++++++++++++++++++++++++ 7 files changed, 778 insertions(+), 10 deletions(-) create mode 100644 bench/different_takes.js create mode 100644 bench/same_takes.js create mode 100644 lib/DbBuffer.js create mode 100644 test/withFlushInterval.tests.js diff --git a/.gitignore b/.gitignore index 5702678..380ba86 100644 --- a/.gitignore +++ b/.gitignore @@ -63,3 +63,5 @@ typings/ # End of https://www.gitignore.io/api/node + +bench/db diff --git a/bench/different_takes.js b/bench/different_takes.js new file mode 100644 index 0000000..65c4e48 --- /dev/null +++ b/bench/different_takes.js @@ -0,0 +1,82 @@ +const bench = require('bench'); +const async = require('async'); +const _ = require('lodash'); +const rimraf = require('rimraf'); +const path = require('path'); +const fs = require('fs'); +const randomstring = require('randomstring'); + +const LimitDB = require('../lib/db'); + +const types = { + ip: { + size: 10, + per_second: 5 + } +}; + +const dbBase = path.join(__dirname, 'db'); + +if (fs.existsSync(dbBase)) { + rimraf.sync(dbBase); +} + +fs.mkdirSync(dbBase); + +const db1 = new LimitDB({ + path: path.join(dbBase, 'db1'), + types +}); + + +const db2 = new LimitDB({ + path: path.join(dbBase, 'db2'), + types, + flushInterval: '5m' +}); + + +exports.compareCount = 10; +exports.countPerLap = 10000; + +const fixture = _.range(exports.compareCount).map(() => { + return _.range(exports.countPerLap).map(() => { + return randomstring.generate(4); + }); +}); + +var lapOnTest1 = 0, lapOnTest2 = 0; + +exports.compare = { + 'random take on disk db' : function (done) { + const keys = fixture[lapOnTest1++]; + async.forEach(keys, (key, done) => { + db1.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + }, + 'random take with flushing' : function (done) { + const keys = fixture[lapOnTest2++]; + async.forEach(keys, (key, done) => { + db2.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + } +}; + +exports.done = (results) => { + bench.show(results); + process.exit(0); +}; + +require("bench").runMain(); diff --git a/bench/same_takes.js b/bench/same_takes.js new file mode 100644 index 0000000..4766e4c --- /dev/null +++ b/bench/same_takes.js @@ -0,0 +1,82 @@ +const bench = require('bench'); +const async = require('async'); +const _ = require('lodash'); +const rimraf = require('rimraf'); +const path = require('path'); +const fs = require('fs'); +const randomstring = require('randomstring'); + +const LimitDB = require('../lib/db'); + +const types = { + ip: { + size: 10, + per_second: 5 + } +}; + +const dbBase = path.join(__dirname, 'db'); + +if (fs.existsSync(dbBase)) { + rimraf.sync(dbBase); +} + +fs.mkdirSync(dbBase); + +const db1 = new LimitDB({ + path: path.join(dbBase, 'db1'), + types +}); + + +const db2 = new LimitDB({ + path: path.join(dbBase, 'db2'), + types, + flushInterval: '5m' +}); + + +exports.compareCount = 20; +exports.countPerLap = 1000; + +const fixture = _.range(exports.compareCount).map(() => { + return _.range(exports.countPerLap).map(() => { + return 
'x'; + }); +}); + +var lapOnTest1 = 0, lapOnTest2 = 0; + +exports.compare = { + 'same take on disk db' : function (done) { + const keys = fixture[lapOnTest1++]; + async.forEach(keys, (key, done) => { + db1.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + }, + 'same take on memory' : function (done) { + const keys = fixture[lapOnTest2++]; + async.forEach(keys, (key, done) => { + db2.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + } +}; + +exports.done = (results) => { + bench.show(results); + process.exit(0); +}; + +require("bench").runMain(); diff --git a/lib/DbBuffer.js b/lib/DbBuffer.js new file mode 100644 index 0000000..a3f1de9 --- /dev/null +++ b/lib/DbBuffer.js @@ -0,0 +1,137 @@ +'use strict'; + +const ms = require('ms'); +const async = require('async'); +const _ = require('lodash'); + +//inspired by https://github.com/dominictarr/hashlru + +class DbBuffer { + constructor(params) { + this.max = 5000; + this.size = 0; + this.current = Object.create(null); + this.previous = Object.create(null); + + this._db = params.db; + + const flushInterval = typeof params.flushInterval === 'string' ? + ms(params.flushInterval): + params.flushInterval; + + this._setFlushInterval(flushInterval || ms('5m')); + + this.flushed = false; + this.locks = {}; + } + + _setFlushInterval(flushInterval) { + setTimeout(() => { + this.flush(() => { + this._setFlushInterval(flushInterval); + }); + }, flushInterval); + } + + _flushStage(stage, done) { + async.forEach(Object.keys(stage), (key, saved) => { + this._db.put(key, stage[key], saved); + }, done); + } + + flush(callback) { + this.flushed = true; + + async.parallel([ + done => this._flushStage(this.current, done), + done => this._flushStage(this.previous, done) + ], callback || _.noop); + } + + _update(key, value) { + this.current[key] = value; + this.size++; + + if(this.size >= this.max) { + this.flush(); + this.size = 0; + this.previous = this.current; + this.current = Object.create(null); + } + } + + _set(key, value) { + if(this.current[key] !== undefined) { + this.current[key] = value; + } else { + this._update(key, value); + } + return value; + } + + _get(key) { + const fromCurrent = this.current[key]; + + if(fromCurrent !== undefined) { + return fromCurrent; + } + + const fromPrevious = this.previous[key]; + + if(fromPrevious) { + this._update(key, fromPrevious); + return fromPrevious; + } + } + + lock(key, run) { + if (!this.locks[key]) { + this.locks[key] = []; + run(() => { + const waiting = this.locks[key]; + var cb; + while((cb = waiting.shift())) { + cb(_.noop); + } + this.locks[key] = false; + }); + } else { + this.locks[key].push(run); + } + } + + get(key, typeParams, callback) { + this.flushed = false; + + this.lock(key, release => { + const fromBuffer = this._get(key); + if (fromBuffer) { + callback(null, fromBuffer); + return release(); + } + + this._db.get(key, (err, bucket) => { + if (err && err.name !== 'NotFoundError') { + callback(err); + return release(); + } + + if (typeof bucket === 'string') { + bucket = JSON.parse(bucket); + } else if (typeof bucket === 'undefined') { + bucket = { + lastDrip: Date.now(), + content: typeParams.size, + size: typeParams.size + }; + } + + this._set(key, bucket); + callback(null, bucket); + release(); + }); + }); + } +} + +module.exports = DbBuffer; diff --git a/lib/db.js b/lib/db.js index 4be7243..8924c4a 100644 --- a/lib/db.js +++ 
b/lib/db.js @@ -7,6 +7,7 @@ const ms = require('ms'); const _ = require('lodash'); const gms = require('./gms'); const LRU = require('lru-cache'); +const DbBuffer = require('./DbBuffer'); const INTERVAL_TO_MS = { 'per_second': ms('1s'), @@ -16,6 +17,7 @@ const INTERVAL_TO_MS = { }; const INTERVAL_SHORTCUTS = Object.keys(INTERVAL_TO_MS); + const GC_GRACE_PERIOD = ms('2m'); @@ -85,11 +87,22 @@ class LimitDB { this._types = _.reduce(params.types, (result, typeParams, name) => { const type = result[name] = normalizeType(typeParams); type.db = gms(spaces(this._db, name, { valueEncoding: 'json' })); + + if (typeof params.flushInterval !== 'undefined') { + type.buffer = new DbBuffer({ + db: type.db, + getTypeParams: (key) => this._getTypeParams(type, key), + flushInterval: params.flushInterval + }); + } + return result; }, {}); + + this._flushInterval = params.flushInterval; } - _drip(bucket, type) { + _drip(bucket, type, inPlace) { if (!type.per_interval) { return bucket; } @@ -99,6 +112,15 @@ class LimitDB { const dripAmount = deltaMS * (type.per_interval / type.interval); const content = Math.min(bucket.content + dripAmount, type.size); + if (inPlace) { + bucket.content = content; + bucket.lastDrip = now; + bucket.size = type.size; + bucket.beforeDrip = bucket.content; + + return bucket; + } + return { content: content, lastDrip: now, @@ -171,6 +193,26 @@ class LimitDB { }); } + if (this._flushInterval) { + return type.buffer.get(params.key, typeParams, (err, bucket) => { + if (err) { return callback(err); } + this._drip(bucket, typeParams, true); + + const conformant = bucket.content >= count; + + if (conformant) { + bucket.content -= count; + } + + return callback(null, { + conformant, + remaining: Math.floor(bucket.content), + reset: this._getResetTimestamp(bucket, typeParams), + limit: typeParams.size, + }); + }); + } + type.db.gms(params.key, (bucket) => { if (bucket && typeof bucket === 'string') { bucket = JSON.parse(bucket); @@ -178,16 +220,10 @@ class LimitDB { bucket = bucket ? this._drip(bucket, typeParams) : { lastDrip: Date.now(), - content: typeParams.size + content: typeParams.size, + size: typeParams.size }; - //this happen when we scale down a bucket - //imagine the size was 10 and the current content is 9. - //then we scale down the bucket to 6... - //The current content should be computed as 6, not 9. 
- bucket.content = Math.min(typeParams.size, bucket.content); - bucket.size = typeParams.size; - if (bucket.content >= count) { bucket.lastConformant = true; bucket.content -= count; @@ -272,6 +308,19 @@ class LimitDB { typeParams.size : params.count || typeParams.size; + if (this._flushInterval) { + return type.buffer.get(params.key, typeParams, (err, bucket) => { + + if (err) { return callback(err); } + bucket.content = Math.min(typeParams.size, bucket.content + count); + callback(null, { + remaining: Math.floor(bucket.content), + reset: bucket.reset, + limit: typeParams.size + }); + }); + } + type.db.gms(params.key, bucket => { if (bucket && typeof bucket === 'string') { bucket = JSON.parse(bucket); @@ -315,6 +364,12 @@ class LimitDB { return setImmediate(callback, new Error(`undefined bucket type ${params.type}`)); } + if (this._flushInterval && !type.buffer.flushed) { + return type.buffer.flush(() => { + this.status(params, callback); + }); + } + const items = []; var count = 0; diff --git a/package.json b/package.json index 871c032..2f94284 100644 --- a/package.json +++ b/package.json @@ -25,8 +25,11 @@ }, "devDependencies": { "async": "^2.1.5", + "bench": "^0.3.6", "chai": "^3.5.0", "mocha": "^3.2.0", - "mockdate": "^2.0.1" + "mockdate": "^2.0.1", + "randomstring": "^1.1.5", + "rimraf": "^2.6.1" } } diff --git a/test/withFlushInterval.tests.js b/test/withFlushInterval.tests.js new file mode 100644 index 0000000..0f23e23 --- /dev/null +++ b/test/withFlushInterval.tests.js @@ -0,0 +1,407 @@ +const LimitDB = require('../lib/db'); +const MockDate = require('mockdate'); +const assert = require('chai').assert; +const async = require('async'); +const _ = require('lodash'); +const ms = require('ms'); + +const types = { + ip: { + size: 10, + per_second: 5, + overrides: { + '127.0.0.1': { + per_second: 100 + }, + 'local-lan': { + match: /192\.168\./, + per_second: 50 + }, + '10.0.0.123': { + until: new Date(Date.now() - ms('24h')), //yesterday + per_second: 50 + }, + '10.0.0.1': { + size: 1, + per_hour: 2 + }, + '0.0.0.0': { + size: 100, + unlimited: true + }, + '8.8.8.8': { + size: 10 + } + } + } +}; + +describe('LimitDB (with flush interval)', () => { + it('should throw an error when path is not specified', () => { + assert.throws(() => new LimitDB({}), /path is required/); + }); + + it('should not throw an error when path is not specified and inMemory: true', () => { + assert.doesNotThrow(() => new LimitDB({ inMemory: true }), /path is required/); + }); + + afterEach(function () { + MockDate.reset(); + }); + + + describe('take', () => { + const db = new LimitDB({ + inMemory: true, + types, + flushInterval: '5m' + }); + + it('should fail when type is not provided', (done) => { + db.take({}, (err) => { + assert.match(err.message, /type is required/); + done(); + }); + }); + + it('should fail when type is not defined', (done) => { + db.take({ type: 'cc' }, (err) => { + assert.match(err.message, /undefined bucket type cc/); + done(); + }); + }); + + it('should fail when key is not provided', (done) => { + db.take({ type: 'ip' }, (err) => { + assert.match(err.message, /key is required/); + done(); + }); + }); + + + it('should work :)', (done) => { + const takeParams = { type: 'ip', key: '21.17.65.41'}; + db.take(takeParams, (err) => { + if (err) return done(err); + db.take(takeParams, (err, result) => { + if (err) { return done(err); } + assert.equal(result.conformant, true); + assert.equal(result.remaining, 8); + done(); + }); + }); + }); + + it('should return TRUE when traffic is conformant', 
(done) => { + var now = 1425920267; + MockDate.set(now * 1000); + db.take({ + type: 'ip', + key: '1.1.1.1' + }, function (err, result) { + if (err) return done(err); + assert.ok(result.conformant); + assert.equal(result.remaining, 9); + assert.equal(result.reset, now + 1); + assert.equal(result.limit, 10); + done(); + }); + }); + + it('should return FALSE when requesting more than the size of the bucket', (done) => { + var now = 1425920267; + MockDate.set(now * 1000); + db.take({ + type: 'ip', + key: '2.2.2.2', + count: 12 + }, function (err, result) { + if (err) return done(err); + assert.notOk(result.conformant); + assert.equal(result.remaining, 10); + assert.equal(result.reset, now); + assert.equal(result.limit, 10); + done(); + }); + }); + + it('should return FALSE when traffic is not conformant', (done) => { + const takeParams = { + type: 'ip', + key: '3.3.3.3' + }; + const now = 1425920267; + MockDate.set(now * 1000); + async.map(_.range(10), (i, done) => { + db.take(takeParams, done); + }, (err, responses) => { + if (err) return done(err); + assert.ok(responses.every(function (r) { return r.conformant; })); + db.take(takeParams, (err, response) => { + assert.notOk(response.conformant); + assert.equal(response.remaining, 0); + done(); + }); + }); + }); + + it('should return TRUE if an override by name allows more', (done) => { + const takeParams = { + type: 'ip', + key: '127.0.0.1' + }; + const now = 1425920267; + MockDate.set(now * 1000); + async.each(_.range(10), (i, done) => { + db.take(takeParams, done); + }, (err) => { + if (err) return done(err); + db.take(takeParams, function (err, result) { + assert.ok(result.conformant); + assert.ok(result.remaining, 89); + done(); + }); + }); + }); + + it('should return TRUE if an override by regex allows more', (done) => { + const takeParams = { + type: 'ip', + key: '192.168.0.1' + }; + const now = 1425920267; + MockDate.set(now * 1000); + async.each(_.range(10), (i, done) => { + db.take(takeParams, done); + }, (err) => { + if (err) return done(err); + db.take(takeParams, function (err, result) { + assert.ok(result.conformant); + assert.ok(result.remaining, 39); + done(); + }); + }); + }); + + + it('can expire an override', (done) => { + const takeParams = { + type: 'ip', + key: '10.0.0.123' + }; + async.each(_.range(10), (i, cb) => { + db.take(takeParams, cb); + }, (err) => { + if (err) return done(err); + db.take(takeParams, (err, response) => { + assert.notOk(response.conformant); + done(); + }); + }); + }); + + it('should use seconds ceiling for next reset', (done) => { + // it takes ~1790 msec to fill the bucket with this test + const now = 1425920267; + MockDate.set(now * 1000); + const requests = _.range(9).map(() => { + return cb => db.take({ type: 'ip', key: '211.123.12.36' }, cb); + }); + async.series(requests, function (err, results) { + if (err) return done(err); + var lastResult = results[results.length -1]; + assert.ok(lastResult.conformant); + assert.equal(lastResult.remaining, 1); + assert.equal(lastResult.reset, now + 2); + assert.equal(lastResult.limit, 10); + done(); + }); + }); + + it('should set reset to UNIX timestamp regardless of period', function(done){ + var now = 1425920267; + MockDate.set(now * 1000); + db.take({ type: 'ip', key: '10.0.0.1' }, (err, result) => { + if (err) { return done(err); } + assert.ok(result.conformant); + assert.equal(result.remaining, 0); + assert.equal(result.reset, now + 1800); + assert.equal(result.limit, 1); + done(); + }); + }); + + it('should work for unlimited', (done) => { + var 
now = 1425920267; + MockDate.set(now * 1000); + db.take({ type: 'ip', key: '0.0.0.0' }, (err, response) => { + if (err) return done(err); + assert.ok(response.conformant); + assert.equal(response.remaining, 100); + assert.equal(response.reset, now); + assert.equal(response.limit, 100); + done(); + }); + }); + + it('should work with a fixed bucket', (done) => { + async.map(_.range(10), (i, done) => { + db.take({ type: 'ip', key: '8.8.8.8' }, done); + }, (err, results) => { + if (err) return done(err); + results.forEach((r, i) => { + assert.equal(r.remaining + i + 1, 10); + }); + assert.ok(results.every(r => r.conformant)); + db.take({ type: 'ip', key: '8.8.8.8' }, (err, response) => { + assert.notOk(response.conformant); + done(); + }); + }); + }); + + }); + + describe('PUT', function () { + const db = new LimitDB({ + inMemory: true, + types, + flushInterval: '5m' + }); + + it('should restore the bucket when reseting', (done) => { + const bucketKey = { type: 'ip', key: '211.123.12.12'}; + db.take(bucketKey, (err) => { + if (err) return done(err); + db.put(bucketKey, (err) => { + if (err) return done(err); + db.take(bucketKey, function (err, response) { + if (err) return done(err); + assert.equal(response.remaining, 9); + done(); + }); + }); + }); + }); + + it('should restore the bucket when reseting with all', (done) => { + const takeParams = { type: 'ip', key: '21.17.65.41', count: 9 }; + db.take(takeParams, (err) => { + if (err) return done(err); + db.put({ type: 'ip', key: '21.17.65.41', count: 1, all: true }, (err) => { + if (err) return done(err); + db.take(takeParams, function (err, response) { + if (err) return done(err); + assert.equal(response.conformant, true); + assert.equal(response.remaining, 1); + done(); + }); + }); + }); + }); + + it('should be able to reset without callback', (done) => { + const bucketKey = { type: 'ip', key: '211.123.12.12'}; + db.take(bucketKey, (err) => { + if (err) return done(err); + db.put(bucketKey); + setImmediate(() => { + db.take(bucketKey, function (err, response) { + if (err) return done(err); + assert.equal(response.remaining, 9); + done(); + }); + }); + }); + }); + + it('should work for a fixed bucket', (done) => { + db.take({ type: 'ip', key: '8.8.8.8' }, function (err, result) { + assert.ok(result.conformant); + db.put({ type: 'ip', key: '8.8.8.8' }, function (err, result) { + if (err) return done(err); + assert.equal(result.remaining, 10); + // assert.isUndefined(result.reset); + done(); + }); + }); + }); + }); + + describe('STATUS', function () { + const db = new LimitDB({ + inMemory: true, + types, + flushInterval: '5m' + }); + + it('should return a list of buckets matching the prefix', (done) => { + const now = 1425920267; + MockDate.set(now * 1000); + async.map(_.range(10), (i, done) => { + db.take({ type: 'ip', key: `some-prefix-${i}` }, done); + }, (err, results) => { + if (err) return done(err); + assert.ok(results.every(r => r.conformant)); + db.status({ type: 'ip', prefix: 'some-prefix' }, (err, result) => { + if (err) { return done(err); } + assert.equal(result.items.length, 10); + for(var i = 0; i < 10; i++) { + assert.equal(result.items[i].key, `some-prefix-${i}`); + assert.equal(result.items[i].limit, 10); + assert.equal(result.items[i].reset, now + 1); + } + done(); + }); + }); + }); + }); + + describe('WAIT', function () { + const db = new LimitDB({ + inMemory: true, + types, + flushInterval: '5m' + }); + + it('should work with a simple request', (done) => { + var now = 1425920267; + MockDate.set(now * 1000); + db.wait({ 
type: 'ip', key: '211.76.23.4' }, (err, response) => { + if (err) return done(err); + assert.ok(response.conformant); + assert.notOk(response.delayed); + + + assert.equal(response.remaining, 9); + assert.equal(response.reset, now + 1); + + done(); + }); + }); + + it('should be delayed when traffic is non conformant', function (done) { + db.take({ + type: 'ip', + key: '211.76.23.5', + count: 10 + }, (err) => { + if (err) return done(err); + const waitingSince = Date.now(); + db.wait({ + type: 'ip', + key: '211.76.23.5', + count: 3 + }, function (err, response) { + if (err) { return done(err); } + var waited = Date.now() - waitingSince; + assert.ok(response.conformant); + assert.ok(response.delayed); + assert.closeTo(waited, 600, 20); + done(); + }); + }); + }); + }); + +}); From edaefb3e760be88a326bd0c76f13350d7a670240 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jose=CC=81=20F=2E=20Romaniello?= Date: Thu, 30 Mar 2017 15:03:44 -0300 Subject: [PATCH 2/2] minor --- bench/same_takes_diff_ticks.js | 79 ++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 bench/same_takes_diff_ticks.js diff --git a/bench/same_takes_diff_ticks.js b/bench/same_takes_diff_ticks.js new file mode 100644 index 0000000..8fe80b3 --- /dev/null +++ b/bench/same_takes_diff_ticks.js @@ -0,0 +1,79 @@ +const bench = require('bench'); +const async = require('async'); +const _ = require('lodash'); +const rimraf = require('rimraf'); +const path = require('path'); +const fs = require('fs'); + +const LimitDB = require('../lib/db'); + +const types = { + ip: { + size: 10, + per_second: 5 + } +}; + +const dbBase = path.join(__dirname, 'db'); + +if (fs.existsSync(dbBase)) { + rimraf.sync(dbBase); +} + +fs.mkdirSync(dbBase); + +const db1 = new LimitDB({ + path: path.join(dbBase, 'db1'), + types +}); + + +const db2 = new LimitDB({ + path: path.join(dbBase, 'db2'), + types, + flushInterval: '5m' +}); + + +exports.compareCount = 20; +exports.countPerLap = 5; + +const fixture = _.range(exports.compareCount).map(() => { + return ['a', 'b', 'c', 'd', 'e']; +}); + +var lapOnTest1 = 0, lapOnTest2 = 0; + +exports.compare = { + 'same take on disk db' : function (done) { + const keys = fixture[lapOnTest1++]; + async.forEach(keys, (key, done) => { + db1.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + }, + 'same take on memory' : function (done) { + const keys = fixture[lapOnTest2++]; + async.forEach(keys, (key, done) => { + db2.take({ + type: 'ip', + key + }, (err) => { + if (err) { console.error(err.message); process.exit(1); } + done(); + }); + }, done); + } +}; + +exports.done = (results) => { + bench.show(results); + process.exit(0); +}; + +require("bench").runMain();
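
Usage sketch for the new flushInterval option introduced by these patches. This is only an illustration pieced together from the tests and benchmarks above: the constructor options (path/inMemory, types, flushInterval) and the result fields come straight from the diff, while the database path and the '10.0.0.2' key are made-up placeholders.

    const LimitDB = require('./lib/db');

    const db = new LimitDB({
      path: '/tmp/limitdb-example',   // placeholder path; inMemory: true also works, as in the tests
      types: { ip: { size: 10, per_second: 5 } },
      flushInterval: '5m'             // ms()-style string, or a plain number of milliseconds
    });

    // With flushInterval set, take() reads and mutates buckets in the in-memory
    // DbBuffer; LevelDB only sees them when the interval timer fires, when the
    // buffer reaches its 5000-entry cap, or when status() forces a flush.
    db.take({ type: 'ip', key: '10.0.0.2' }, (err, result) => {
      if (err) { console.error(err); return; }
      console.log(result.conformant, result.remaining, result.reset, result.limit);
    });

DbBuffer itself keeps two generations of buckets (current/previous) in the style of hashlru, promotes entries from previous back into current on reads, and writes both generations to the type's sub-level with put() whenever it flushes.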