Skip to content

Commit 6a05e55

Browse files
committed
Add peeking
Added peeking for messages without removing them untill explicitly done. This is great for situations where you want to be absolutely sure that the message was parsed and dealt with properly removing being removed from the queue.
1 parent cbe7788 commit 6a05e55

File tree

4 files changed

+140
-5
lines changed

4 files changed

+140
-5
lines changed

MSMQLib/MSMQInterface.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,47 @@ public async Task<object> ReceiveMessages(dynamic input)
9999
}
100100
}
101101

102+
/// <summary>
103+
/// Removes a message with the given id from the queue.
104+
/// </summary>
105+
/// <param name="input">The queue's path and message id.</param>
106+
public async Task<object> ReceiveMessageById(dynamic input)
107+
{
108+
var path = (string)input.path;
109+
var id = (string)input.id;
110+
111+
MessageQueue queue = new MessageQueue(path);
112+
queue.Formatter = new BinaryMessageFormatter();
113+
queue.MessageReadPropertyFilter.SetAll();
114+
115+
try
116+
{
117+
var msg = (MSMQMessage)queue.ReceiveById(id);
118+
return true;
119+
}
120+
catch (InvalidOperationException) // Thrown if message with given id is not in queue.
121+
{
122+
return false;
123+
}
124+
}
125+
126+
/// <summary>
127+
/// Peeks a message from a queue without removing it.
128+
/// </summary>
129+
/// <param name="path">The queue's path.</param>
130+
public async Task<object> Peek(string path)
131+
{
132+
MessageQueue queue = new MessageQueue(path);
133+
queue.Formatter = new BinaryMessageFormatter();
134+
queue.MessageReadPropertyFilter.SetAll();
135+
136+
var msg = await Task.Factory.FromAsync<Message>(
137+
queue.BeginPeek(),
138+
queue.EndPeek);
139+
140+
return (MSMQMessage)msg;
141+
}
142+
102143
/// <summary>
103144
/// Gets all messages from a queue without removing them.
104145
/// </summary>

proxy.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ const baseOptions = {
77
};
88

99
function getMethod(methodName) {
10-
return edge.func(Object.assign({}, baseOptions, {methodName}));
10+
return edge.func(Object.assign({}, baseOptions, { methodName }));
1111
}
1212

1313
export var queueProxy = {
1414
exists: getMethod('ExistsQueue'),
1515
create: getMethod('CreateQueue'),
1616
send: getMethod('SendMessage'),
1717
receive: getMethod('ReceiveMessages'),
18+
peek: getMethod('Peek'),
19+
remove: getMethod('ReceiveMessageById'),
1820
list: getMethod('GetAllMessages'),
19-
clear: getMethod('PurgeQueue'),
20-
connectRemote: getMethod('ConnectRemote')
21+
clear: getMethod('PurgeQueue'),
22+
connectRemote: getMethod('ConnectRemote')
2123
};

queue.js

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,49 @@ export default class Queue extends EventEmitter {
4747
});
4848
}
4949

50+
startPeeking() {
51+
if (this.receiving) {
52+
throw new Error('Already receiving messages from this queue');
53+
}
54+
55+
this.receiving = true;
56+
57+
const start = () => {
58+
this.peek().then(msg => {
59+
this.emit('peek', {
60+
msg,
61+
next: async () => {
62+
await this.remove(msg.id);
63+
start();
64+
}
65+
})
66+
})
67+
};
68+
69+
start();
70+
}
71+
72+
peek() {
73+
return new Promise((resolve, reject) => {
74+
queueProxy.peek(this.path, (error, msg) => {
75+
if (error) reject(error);
76+
else resolve(msg);
77+
});
78+
});
79+
}
80+
81+
remove(id) {
82+
return new Promise((resolve, reject) => {
83+
queueProxy.remove({
84+
path: this.path,
85+
id
86+
}, (error, result) => {
87+
if (error) reject(error);
88+
else resolve(result);
89+
});
90+
})
91+
}
92+
5093
send(message, cb) {
5194
let formattedMessage = JSON.stringify(message);
5295

readme.md

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ NPM package is published as updated-node-msmq. https://www.npmjs.com/package/upd
99
## Differences from `node-msmq`
1010

1111
* Support for Node.Js 6.x, 7.x, 8.x, 9.x, 10.x
12-
* Support to push objects to the queue instead of just strings.
12+
* Support to push objects to the queue instead of just strings.
1313
* Support to send/receive messages to/from a queue on a **remote** machine.
1414

1515
## Install
@@ -51,6 +51,55 @@ queue.on('receive', (msg) => {
5151
queue.startReceiving();
5252
```
5353

54+
### Peek messages
55+
56+
Start listeng for messages without removing them until you are ready to do so. Callbacks for the 'peek' event receive an object containing two properties:
57+
58+
* `msg` The MSMQ Message.
59+
* `next()` Pops the message from the queue and start peeking for the next one.
60+
61+
```js
62+
queue.on('peek', (event) => {
63+
console.log(event.msg.body);
64+
65+
// Do some logic that might fail.
66+
if (Math.random() > 0.5) {
67+
throw Error('number is too high!');
68+
}
69+
70+
// Remove the message from the queue and peek for next message.
71+
event.next();
72+
})
73+
74+
queue.startPeeking();
75+
```
76+
77+
### Peek
78+
79+
Promised based method to peek for a message in the queue without removing it. Resolves to a MSMQMessage.
80+
81+
```js
82+
queue.peek().then(msg => console.log(msg.body));
83+
84+
//or
85+
86+
let msg = await queue.peek();
87+
```
88+
89+
### Remove
90+
91+
Promise based method to remove a message with given id from the queue. Resolves to `true` if message was removed and `false` if message didn't exist in the queue.
92+
93+
```js
94+
queue.remove('12345')
95+
.then(status => console.log(status ? 'Message was removed'
96+
: 'Message was not in queue'));
97+
98+
//or
99+
100+
let status = await queue.remove('12345');
101+
```
102+
54103
### Get all messages
55104

56105
Gets all messages without removing them from queue.
@@ -124,7 +173,7 @@ var queue = msmq.connectToRemoteQueue('.\\Private$\\MyAwesomeQueue');
124173
var messages = queue.getAllMessages();
125174
```
126175

127-
#### Note:
176+
#### Note:
128177
* Creating a queue / Checking if a queue exists on a remote machine is currently not supported by MSMQ.
129178
* To communicate with a remote queue, MSMQ should be enabled in the sender's machine too. Also, in the _Security_ tab of the queue on the remote machine should have the appropriate permissions set for _Everyone_ and _ANONYMOUS LOGON_.
130179
* The queue should already be created on the remote machine.

0 commit comments

Comments
 (0)