From 170aa26d9ec3407a68fd7260eb6984630f934ed7 Mon Sep 17 00:00:00 2001 From: Kristijan Trajkovski Date: Tue, 17 Jun 2014 10:21:58 +0200 Subject: [PATCH 01/14] Added redis clients function --- index.js | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++- package.json | 2 +- 2 files changed, 103 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 46d9a86..679d32c 100644 --- a/index.js +++ b/index.js @@ -45,6 +45,7 @@ function adapter(uri, opts){ var port = Number(opts.port || 6379); var pub = opts.pubClient; var sub = opts.subClient; + var data = opts.dataClient; var prefix = opts.key || 'socket.io'; // init clients if needed @@ -52,12 +53,15 @@ function adapter(uri, opts){ if (!sub) sub = socket ? redis(socket, { detect_buffers: true }) : redis(port, host, {detect_buffers: true}); + if (!data) data = socket ? redis(socket) : redis(port, host); // this server's key var uid = uid2(6); var key = prefix + '#' + uid; + var self; + /** * Adapter constructor. * @@ -68,7 +72,7 @@ function adapter(uri, opts){ function Redis(nsp){ Adapter.call(this, nsp); - var self = this; + self = this; sub.psubscribe(prefix + '#*', function(err){ if (err) self.emit('error', err); }); @@ -100,6 +104,75 @@ function adapter(uri, opts){ this.broadcast.apply(this, args); }; + /** + * Adds a socket from a room. + * + * @param {String} socket id + * @param {String} room name + * @param {Function} callback + * @api public + */ + + Redis.prototype.add = function(id, room, fn){ + Adapter.prototype.add.call(this, id, room, fn); + // this.sids[id] = this.sids[id] || {}; + // this.sids[id][room] = true; + // this.rooms[room] = this.rooms[room] || []; + // this.rooms[room][id] = true; + data.sadd(room, id, fn); + data.sadd(id, room, fn); + + // if (fn) process.nextTick(fn.bind(null, null)); + }; + + /** + * Removes a socket from a room. + * + * @param {String} socket id + * @param {String} room name + * @param {Function} callback + * @api public + */ + + Redis.prototype.del = function(id, room, fn){ + Adapter.prototype.del.call(this, room, fn); + data.multi() + .srem(room, id) + .srem(id, room) + .exec(fn); + }; + + + /** + * Removes a socket from all rooms it's joined. + * + * @param {String} socket id + * @api public + */ + + Redis.prototype.delAll = function(id, fn){ + Adapter.prototype.delAll.call(this, id, fn); + + data.smembers(id, function(err, replies){ + var multi = data.multi(); + for(var i=0; i Date: Tue, 17 Jun 2014 12:08:33 +0200 Subject: [PATCH 02/14] Callback fixes --- index.js | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/index.js b/index.js index 679d32c..24e5c11 100644 --- a/index.js +++ b/index.js @@ -104,6 +104,7 @@ function adapter(uri, opts){ this.broadcast.apply(this, args); }; + /** * Adds a socket from a room. * @@ -114,15 +115,18 @@ function adapter(uri, opts){ */ Redis.prototype.add = function(id, room, fn){ - Adapter.prototype.add.call(this, id, room, fn); + Adapter.prototype.add.call(this, id, room); // this.sids[id] = this.sids[id] || {}; // this.sids[id][room] = true; // this.rooms[room] = this.rooms[room] || []; // this.rooms[room][id] = true; - data.sadd(room, id, fn); - data.sadd(id, room, fn); + data.multi() + .sadd(room, id) + .sadd(id, room) + .exec(function(){ + if (fn) process.nextTick(fn.bind(null, null)); + }); - // if (fn) process.nextTick(fn.bind(null, null)); }; /** @@ -135,11 +139,13 @@ function adapter(uri, opts){ */ Redis.prototype.del = function(id, room, fn){ - Adapter.prototype.del.call(this, room, fn); + Adapter.prototype.del.call(this, room); data.multi() .srem(room, id) .srem(id, room) - .exec(fn); + .exec(function(){ + if (fn) process.nextTick(fn.bind(null, null)); + }); }; @@ -151,7 +157,7 @@ function adapter(uri, opts){ */ Redis.prototype.delAll = function(id, fn){ - Adapter.prototype.delAll.call(this, id, fn); + Adapter.prototype.delAll.call(this, id); data.smembers(id, function(err, replies){ var multi = data.multi(); @@ -162,7 +168,7 @@ function adapter(uri, opts){ multi.exec(fn); }); }; - + /** * Get all clients in room. * From 7c63228442a2571f644588691792457085dd9850 Mon Sep 17 00:00:00 2001 From: Kristijan Trajkovski Date: Tue, 17 Jun 2014 12:54:48 +0200 Subject: [PATCH 03/14] Updated package --- index.js | 34 ++-------------------------------- package.json | 8 ++++---- 2 files changed, 6 insertions(+), 36 deletions(-) diff --git a/index.js b/index.js index 24e5c11..76a727f 100644 --- a/index.js +++ b/index.js @@ -60,8 +60,6 @@ function adapter(uri, opts){ var uid = uid2(6); var key = prefix + '#' + uid; - var self; - /** * Adapter constructor. * @@ -72,7 +70,7 @@ function adapter(uri, opts){ function Redis(nsp){ Adapter.call(this, nsp); - self = this; + var self = this; sub.psubscribe(prefix + '#*', function(err){ if (err) self.emit('error', err); }); @@ -104,7 +102,6 @@ function adapter(uri, opts){ this.broadcast.apply(this, args); }; - /** * Adds a socket from a room. * @@ -179,6 +176,7 @@ function adapter(uri, opts){ data.smembers(room, fn); }; + /** * Broadcasts a packet. * @@ -193,34 +191,6 @@ function adapter(uri, opts){ if (!remote) pub.publish(key, msgpack.encode([packet, opts])); }; - // Set up exit handlers so we can clean up this process's redis data before exiting - - process.stdin.resume(); //so the program will not close instantly - function exitHandler(options, err){ - var i; - var multi = data.multi(); - var execDone = false; - - var roomIds = Object.keys(self.rooms); - var socketIds = Object.keys(self.sids); - - for(i=0; i Date: Tue, 17 Jun 2014 13:57:10 +0200 Subject: [PATCH 04/14] Fixed bugs, testing completeness --- index.js | 35 +++++++++- test/index.js | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 76a727f..4b38278 100644 --- a/index.js +++ b/index.js @@ -67,10 +67,11 @@ function adapter(uri, opts){ * @api public */ + var self = this; + function Redis(nsp){ + self = this; Adapter.call(this, nsp); - - var self = this; sub.psubscribe(prefix + '#*', function(err){ if (err) self.emit('error', err); }); @@ -191,6 +192,36 @@ function adapter(uri, opts){ if (!remote) pub.publish(key, msgpack.encode([packet, opts])); }; + + // Set up exit handlers so we can clean up this process's redis data before exiting + + process.stdin.resume(); //so the program will not close instantly + function exitHandler(options, err){ + var i; + var multi = data.multi(); + var execDone = false; + + var roomIds = Object.keys(self.rooms); + var socketIds = Object.keys(self.sids); + for(i=0; i Date: Tue, 17 Jun 2014 13:58:38 +0200 Subject: [PATCH 05/14] removed commented testing lines --- test/index.js | 81 --------------------------------------------------- 1 file changed, 81 deletions(-) diff --git a/test/index.js b/test/index.js index 687e83d..9244f25 100644 --- a/test/index.js +++ b/test/index.js @@ -216,85 +216,4 @@ describe('socket.io-redis', function(){ }); }); }); - - // describe('clients', function(){ - // var sio; - - // beforeEach(function(done){ - // this.redisClients = []; - // var self = this; - - // async.times(2, function(n, next){ - // var pub = redis.createClient(); - // var sub = redis.createClient(null, null, {detect_buffers: true}); - // var srv = http(); - // sio = io(srv, {adapter: redisAdapter({pubClient: pub, subClient: sub})}); - // self.redisClients.push(pub, sub); - - // srv.listen(function(){ - // ['/', '/nsp'].forEach(function(name){ - // sio.of(name).on('connection', function(socket){ - // socket.on('join', function(callback){ - // socket.join('room', callback); - // }); - - // socket.on('socket broadcast', function(data){ - // socket.broadcast.to('room').emit('broadcast', data); - // }); - - // socket.on('namespace broadcast', function(data){ - // sio.of('/nsp').in('room').emit('broadcast', data); - // }); - // }); - // }); - - // async.parallel([ - // function(callback){ - // async.times(2, function(n, next){ - // var socket = client(srv, '/nsp', {forceNew: true}); - // socket.on('connect', function(){ - // socket.emit('join', function(){ - // next(null, socket); - // }); - // }); - // }, callback); - // }, - // function(callback){ - // // a socket of the same namespace but not joined in the room. - // var socket = client(srv, '/nsp', {forceNew: true}); - // socket.on('connect', function(){ - // socket.on('broadcast', function(){ - // throw new Error('Called unexpectedly: different room'); - // }); - // callback(); - // }); - // }, - // function(callback){ - // // a socket joined in a room but for a different namespace. - // var socket = client(srv, {forceNew: true}); - // socket.on('connect', function(){ - // socket.on('broadcast', function(){ - // throw new Error('Called unexpectedly: different namespace'); - // }); - // socket.emit('join', function(){ - // callback(); - // }); - // }); - // } - // ], function(err, results){ - // next(err, results[0]); - // }); - // }); - // }, function(err, sockets){ - // self.sockets = sockets.reduce(function(a, b){ return a.concat(b); }); - // done(err); - // }); - // }); - // it('should get all clients in a room', function(done){ - // sio.sockets.clients('room', function(err, clients){ - // console.log(clients); - // done(); - // }); - // }); - // }); }); From 215d26316af22a379c9f00c1644947eb37d18a3d Mon Sep 17 00:00:00 2001 From: Kristijan Trajkovski Date: Thu, 19 Jun 2014 13:02:27 +0200 Subject: [PATCH 06/14] Changed to standards-recommended prototype inheritance, added prefixes to key names, attempt Travis build --- index.js | 32 ++++++++++++++------------------ package.json | 6 +++--- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/index.js b/index.js index 4b38278..241aa2c 100644 --- a/index.js +++ b/index.js @@ -82,7 +82,7 @@ function adapter(uri, opts){ * Inherits from `Adapter`. */ - Redis.prototype.__proto__ = Adapter.prototype; + Redis.prototype = Object.create(Adapter.prototype); /** * Called with a subscription message @@ -96,9 +96,9 @@ function adapter(uri, opts){ var args = msgpack.decode(msg); if (args[0] && args[0].nsp === undefined) - args[0].nsp = '/' + args[0].nsp = '/'; - if (!args[0] || args[0].nsp != this.nsp.name) return debug('ignore different namespace') + if (!args[0] || args[0].nsp != this.nsp.name) return debug('ignore different namespace'); args.push(true); this.broadcast.apply(this, args); }; @@ -114,13 +114,9 @@ function adapter(uri, opts){ Redis.prototype.add = function(id, room, fn){ Adapter.prototype.add.call(this, id, room); - // this.sids[id] = this.sids[id] || {}; - // this.sids[id][room] = true; - // this.rooms[room] = this.rooms[room] || []; - // this.rooms[room][id] = true; data.multi() - .sadd(room, id) - .sadd(id, room) + .sadd(prefix + '#' + room, id) + .sadd(prefix + '#' + id, room) .exec(function(){ if (fn) process.nextTick(fn.bind(null, null)); }); @@ -139,8 +135,8 @@ function adapter(uri, opts){ Redis.prototype.del = function(id, room, fn){ Adapter.prototype.del.call(this, room); data.multi() - .srem(room, id) - .srem(id, room) + .srem(prefix + '#' + room, id) + .srem(prefix + '#' + id, room) .exec(function(){ if (fn) process.nextTick(fn.bind(null, null)); }); @@ -157,12 +153,12 @@ function adapter(uri, opts){ Redis.prototype.delAll = function(id, fn){ Adapter.prototype.delAll.call(this, id); - data.smembers(id, function(err, replies){ + data.smembers(id, function(err, rooms){ var multi = data.multi(); - for(var i=0; i Date: Thu, 7 Aug 2014 21:02:51 +0200 Subject: [PATCH 07/14] Call to Adapter.del corrected The client id was missing. --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 241aa2c..5fcd126 100644 --- a/index.js +++ b/index.js @@ -133,7 +133,7 @@ function adapter(uri, opts){ */ Redis.prototype.del = function(id, room, fn){ - Adapter.prototype.del.call(this, room); + Adapter.prototype.del.call(this, id, room); data.multi() .srem(prefix + '#' + room, id) .srem(prefix + '#' + id, room) From 2f2aaa86fe7931c149024e812f68d3dd9149d243 Mon Sep 17 00:00:00 2001 From: olive75 Date: Thu, 7 Aug 2014 22:24:39 +0200 Subject: [PATCH 08/14] Corrected key deletion on leave --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 241aa2c..3af7e34 100644 --- a/index.js +++ b/index.js @@ -153,7 +153,7 @@ function adapter(uri, opts){ Redis.prototype.delAll = function(id, fn){ Adapter.prototype.delAll.call(this, id); - data.smembers(id, function(err, rooms){ + data.smembers(prefix + '#' + id, function(err, rooms){ var multi = data.multi(); for(var i=0; i Date: Wed, 10 Sep 2014 12:27:06 +0200 Subject: [PATCH 09/14] Update README.md --- README.md | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e1b434b..77393e5 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,35 @@ By running socket.io with the `socket.io-redis` adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other. +`socket.io-redis` use Redis pub/sub mechanism to route events to different nodes/servers and +store rooms and sockets ids in Redis sets. + If you need to emit events to socket.io instances from a non-socket.io process, you should use [socket.io-emitter](http:///github.com/Automattic/socket.io-emitter). +## Known limitation + +**Warning! Current module implementation doesn't cleanup Redis storage on exit.** + +Consequence is that in a multi-node/server configuration with the out-of-the-box module, +shutting down a node process will let sockets and rooms data remain in Redis even if the +current sockets are now probably not longer connected. + +The reason of this limitation is the non ability for node to execute asynchronous tasks (like +Redis queries) on exit. + +**It is strongely adviced to implement your proper cleanup on exit or to take this point in consideration in your implementation**. + +## Stored schema + +For each new socket connected to a node the following HSET is created: + +... + +For each new room joined by a user to a node the following HSET is created: + +... + ## API ### adapter(uri[, opts]) @@ -36,8 +62,10 @@ The following options are allowed: be used instead of the host and port options if specified. - `pubClient`: optional, the redis client to publish events on - `subClient`: optional, the redis client to subscribe to events on +- `dataClient`: optional, the redis client used to store and read socket.io + sockets/rooms data -If you decide to supply `pubClient` and `subClient`, make sure you use +If you decide to supply `pubClient`, `subClient` or `dataClient` make sure you use [node_redis](https://github.com/mranney/node_redis) as a client or one with an equivalent API. From d48e77f0ad97507fb77bb5b20fca1fb6ec7376cc Mon Sep 17 00:00:00 2001 From: Damien Brugne Date: Wed, 10 Sep 2014 12:44:24 +0200 Subject: [PATCH 10/14] Update README.md --- README.md | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 77393e5..ca7d78f 100644 --- a/README.md +++ b/README.md @@ -36,13 +36,17 @@ Redis queries) on exit. ## Stored schema -For each new socket connected to a node the following HSET is created: +Every new keys in Redis are created with "socket.io" prefix (customizable with the *key* option). -... +For each new socket connected to a node a SET is created with the key: socket.io#{{socket uid}}. On creation the set contain only a record, the socket uid String. -For each new room joined by a user to a node the following HSET is created: +For each new room created by socket.io (generally when a user enter in) a SET is created with the key: socket.io#{{room id}} -... +Then each time a socket join a room the room id string is added to user Redis SET **and** socket uid is added to room Redis SET. +Also when a socket leave a room the corresponding record (socket uid) is removed from the room Redis SET and the room id is removed from socket SET. + +On disconnect corresponding user socket SET is automatically removed and corresponding record also removed from rooms SET. +Room SET are removed automatically when no more socket remain inside. ## API From 014b47e87885212f6ec30cd88e0de5247449a8ce Mon Sep 17 00:00:00 2001 From: Damien Brugne Date: Mon, 15 Sep 2014 10:46:55 +0200 Subject: [PATCH 11/14] Update README.md --- README.md | 62 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index ca7d78f..21b8548 100644 --- a/README.md +++ b/README.md @@ -21,32 +21,62 @@ store rooms and sockets ids in Redis sets. If you need to emit events to socket.io instances from a non-socket.io process, you should use [socket.io-emitter](http:///github.com/Automattic/socket.io-emitter). -## Known limitation +## Stored schema -**Warning! Current module implementation doesn't cleanup Redis storage on exit.** +The module store two different entity in Redis: **socket** and **room**. +Every key is prefixed with "socket.io". Prefix is customizable with the *key* option. -Consequence is that in a multi-node/server configuration with the out-of-the-box module, -shutting down a node process will let sockets and rooms data remain in Redis even if the -current sockets are now probably not longer connected. +### socket -The reason of this limitation is the non ability for node to execute asynchronous tasks (like -Redis queries) on exit. +The module create a new Redis SET for each new socket. -**It is strongely adviced to implement your proper cleanup on exit or to take this point in consideration in your implementation**. +The socket SET key is defined as __PREFIX__#__SOCKET_ID__ (e.g.: *socket.io#951wMmbBjkREmCapAAAD*). +The socket SET is created with only one record: the socket ID string. -## Stored schema +Then each time this socket join/leave a room module add/remove a Redis record in SET. + +Example for a socket with the ID *951wMmbBjkREmCapAAAD* in *foo* and *bar* rooms: + +``` +socket.io#951wMmbBjkREmCapAAAD + -> 951wMmbBjkREmCapAAAD + -> foo + -> bar +``` + +### room + +Each time a room is needed (= a socket join a room that not already exists) the module create a new Redis SET. -Every new keys in Redis are created with "socket.io" prefix (customizable with the *key* option). +The room SET key is defined as __PREFIX__#__ROOM_NAME__ (e.g.: *socket.io#foo*). +The room SET contain the socket IDs of the room sockets. -For each new socket connected to a node a SET is created with the key: socket.io#{{socket uid}}. On creation the set contain only a record, the socket uid String. +Then each time a socket join/leave the room the module add/remove the corresponding Redis record from the SET. + +Example for a room *foo* with the following socket in *951wMmbBjkREmCapAAAD*, *566Mm_BjkREmRff456*: + +``` +socket.io#foo + -> 951wMmbBjkREmCapAAAD + -> 566Mm_BjkREmRff456 +``` + +As with native adapter the not longer needed room SET are deleted automatically (except on application +exit, see below). + +## Known limitation + +**Warning! Current module implementation doesn't cleanup Redis storage on exit.** -For each new room created by socket.io (generally when a user enter in) a SET is created with the key: socket.io#{{room id}} +Consequence is that in a multi-node/server configuration with the out-of-the-box module, +shutting down a node process will let sockets and rooms SET remain in Redis even if the +current sockets are not longer connected. -Then each time a socket join a room the room id string is added to user Redis SET **and** socket uid is added to room Redis SET. -Also when a socket leave a room the corresponding record (socket uid) is removed from the room Redis SET and the room id is removed from socket SET. +The reason is the non ability for node to execute asynchronous tasks (like Redis queries) +on exit. -On disconnect corresponding user socket SET is automatically removed and corresponding record also removed from rooms SET. -Room SET are removed automatically when no more socket remain inside. +So, every developer should implement his proper cleanup logic in the context of +his particular project. ## API From 60fa002ddc777749aedde82fc5d68106650f3c13 Mon Sep 17 00:00:00 2001 From: Damien Brugne Date: Mon, 15 Sep 2014 10:50:05 +0200 Subject: [PATCH 12/14] Update README.md --- README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 21b8548..23d1da6 100644 --- a/README.md +++ b/README.md @@ -23,15 +23,19 @@ process, you should use [socket.io-emitter](http:///github.com/Automattic/socket ## Stored schema -The module store two different entity in Redis: **socket** and **room**. +The module store two different entities in Redis: **socket** and **room**. + +Each as a Redis SET. + Every key is prefixed with "socket.io". Prefix is customizable with the *key* option. ### socket -The module create a new Redis SET for each new socket. +The module creates a new Redis SET for each new socket. + +The socket SET key is defined as *{{PREFIX}}*#*{{SOCKET_ID}}* (e.g.: *socket.io#951wMmbBjkREmCapAAAD*). -The socket SET key is defined as __PREFIX__#__SOCKET_ID__ (e.g.: *socket.io#951wMmbBjkREmCapAAAD*). -The socket SET is created with only one record: the socket ID string. +The socket SET is created with one record: the socket ID string. Then each time this socket join/leave a room module add/remove a Redis record in SET. @@ -48,7 +52,7 @@ socket.io#951wMmbBjkREmCapAAAD Each time a room is needed (= a socket join a room that not already exists) the module create a new Redis SET. -The room SET key is defined as __PREFIX__#__ROOM_NAME__ (e.g.: *socket.io#foo*). +The room SET key is defined as *{{PREFIX}}*#*{{ROOM_NAME }}* (e.g.: *socket.io#foo*). The room SET contain the socket IDs of the room sockets. Then each time a socket join/leave the room the module add/remove the corresponding Redis record from the SET. From b2d352adc1b22bed50cc12394a8ba11f866ec0c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bar=C4=B1=C5=9F=20Soner=20U=C5=9Fakl=C4=B1?= Date: Fri, 21 Nov 2014 00:53:19 -0500 Subject: [PATCH 13/14] added roomClients method --- index.js | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/index.js b/index.js index bf1f004..758ebb7 100644 --- a/index.js +++ b/index.js @@ -173,6 +173,15 @@ function adapter(uri, opts){ data.smembers(prefix + '#' + room, fn); }; + /** + * Get all rooms the client is in. + * + * @param {String} client id + * @api public + */ + Redis.prototype.roomClients = function(id, fn){ + data.smembers(prefix + '#' + id, fn); + }; /** * Broadcasts a packet. From 1e840fa550da0c432ea9a103e3b1537991b012ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bar=C4=B1=C5=9F=20Soner=20U=C5=9Fakl=C4=B1?= Date: Fri, 21 Nov 2014 18:05:00 -0500 Subject: [PATCH 14/14] test for roomClients --- test/index.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/index.js b/test/index.js index 9244f25..dfc1558 100644 --- a/test/index.js +++ b/test/index.js @@ -215,5 +215,18 @@ describe('socket.io-redis', function(){ done(); }); }); + + it('should get all rooms the clients are in', function(done){ + var self = this; + self.sio.sockets.clients('room', function(err, clients){ + expect(clients).to.be.an('array'); + async.each(clients, function(client, next) { + self.sio.sockets.roomClients(client, function(err, rooms) { + expect(rooms).to.be.an('array'); + next(); + }); + }, done); + }); + }); }); });