diff --git a/lib/mongodbQueue.js b/lib/mongodbQueue.js index ca8ccd7..565852c 100644 --- a/lib/mongodbQueue.js +++ b/lib/mongodbQueue.js @@ -5,7 +5,6 @@ var async = require('async'); var syncUtil = require('./util'); var debug = syncUtil.debug; var debugError = syncUtil.debugError; - /** * Make sure the real queue is created. Otherwise throws an error. */ @@ -38,8 +37,10 @@ function MongodbQueue(name, metrics, lock, opts) { this.lockName = opts.lockName || ('lock:sync:' + this.queueName); this.lockTimeout = opts.lockTimeout || 10000; this.mongodb = opts.mongodb; - this.queueOptions = {visibility: opts.visibility || 30}; - this.queueTTL = opts.queueMessagesTTL || 24*60*60; + this.queueOptions = { + visibility: opts.visibility || 30, + ttl: opts.queueMessagesTTL || 24*60*60 + }; this.queue; } @@ -72,59 +73,59 @@ MongodbQueue.prototype.create = function(cb) { } else { debug('[%s] lock %s acquired. Continue.', self.queueName, self.lockName); async.waterfall([ - function createIndexes(callback) { - debug('[%s] creating queue indexes', self.queueName); - self.queue.createIndexes(callback); - }, - function addExtraIdIndex(createdIndex, callback) { - debug('[%s] adding extra _id index for queue', self.queueName); - collection.createIndex({ deleted : 1, visible : 1, _id : 1}, callback); - }, - function addExtraAckIndex(createdIndex, callback) { - debug('[%s] adding extra ack index for queue', self.queueName); - collection.createIndex({ ack: 1, visible : 1, deleted : 1}, callback); - }, - function listIndexes(createdIndex, callback) { + // Check if there's already a TTL index for the 'deleted' field. + // If there is, and the TTL value is different, delete it so it can be + // recreated by the mongodb-queue module + function listIndexes(callback) { debug('[%s] list existing indexes', self.queueName); - collection.indexInformation({full: true}, callback); + collection.indexInformation({full: true}, function(err, indexInfo) { + if (err) { + debug('[%s] error getting indexInfo. skipping ttl index check: %s', self.queueName, err); + return callback(null, null); + } + return callback(null, indexInfo); + }); }, function checkIndexTTL(indexInfo, callback) { + if (!indexInfo) { + // skipping ttl index check + return callback(null, false); + } debug('[%s] found queue indexInfo : %j', self.queueName, indexInfo); var existingIndex = _.findWhere(indexInfo, {name: indexName}); - var needDrop = false; - var needCreate = false; if (!existingIndex) { - needCreate = true; - } else if (existingIndex.expireAfterSeconds !== self.queueTTL) { - needDrop = true; - needCreate = true; + return callback(null, false); + } else if (existingIndex.expireAfterSeconds !== self.queueOptions.ttl) { + return callback(null, true); } - return callback(null, needDrop, needCreate); }, - function dropTTLIndex(needDrop, needCreate, callback) { + function dropTTLIndex(needDrop, callback) { if (needDrop) { debug('[%s] dropping ttl index: %s', self.queueName, indexName); - collection.dropIndex(indexName, function(err){ - return callback(err, needCreate); + collection.dropIndex(indexName, function(err) { + return callback(err); }); } else { debug('[%s] skip dropping ttl index', self.queueName); - return callback(null, needCreate); + return callback(null); } }, - function createTTLIndex(needCreate, callback) { - if (needCreate) { - debug('[%s] creating ttl index: %s', self.queueName, indexName); - collection.createIndex({'deleted': 1}, {'expireAfterSeconds': self.queueTTL, 'background': true}, callback); - } else { - debug('[%s] skip creating ttl index', self.queueName); - return callback(); - } + function createIndexes(callback) { + debug('[%s] creating queue indexes', self.queueName); + self.queue.createIndexes(callback); + }, + function addExtraIdIndex(createdIndex, callback) { + debug('[%s] adding extra _id index for queue', self.queueName); + collection.createIndex({ deleted : 1, visible : 1, _id : 1}, callback); + }, + function addExtraAckIndex(createdIndex, callback) { + debug('[%s] adding extra ack index for queue', self.queueName); + collection.createIndex({ ack: 1, visible : 1, deleted : 1}, callback); } ], function(err){ if (err) { - debugError('[%s] failed to create queue index due to error: %s', self.queueName, err); + debugError('[%s] failed to create queue index due to error: %s %s', self.queueName, err, err.stack); return cb(err); } self.lock.release(self.lockName, lockCode, function(releaseErr){ diff --git a/package.json b/package.json index a2b571c..9369657 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fh-sync", - "version": "1.0.3", + "version": "1.0.4", "description": "FeedHenry Data Synchronization Server", "main": "index.js", "dependencies": { @@ -10,7 +10,7 @@ "fh-component-metrics": "2.7.0", "mongodb": "2.1.18", "mongodb-lock": "0.4.0", - "mongodb-queue": "2.2.0", + "mongodb-queue": "david-martin/mongodb-queue#ttl-index-01", "parse-duration": "0.1.1", "redis": "2.6.5", "underscore": "1.7.0"