Skip to content

Commit e10ab25

Browse files
committed
Support for get and get-put under local transaction
1 parent 9cd66b3 commit e10ab25

File tree

4 files changed

+260
-14
lines changed

4 files changed

+260
-14
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ your own error handling or logging.
118118
* Send a message as Persistent or NonPersistent - [deliverymode_test.go](deliverymode_test.go)
119119
* Get by CorrelationID - [getbycorrelid_test.go](getbycorrelid_test.go)
120120
* Request/reply messaging pattern - [requestreply_test.go](requestreply_test.go)
121+
* Send and receive under a local transaction - [local_transaction_test.go](local_transaction_test.go)
121122
* Sending a message that expires after a period of time - [timetolive_test.go](timetolive_test.go)
122123
* Handle error codes returned by the queue manager - [sample_errorhandling_test.go](sample_errorhandling_test.go)
123124

local_transaction_test.go

Lines changed: 250 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,16 +125,253 @@ func TestPutTransaction(t *testing.T) {
125125

126126
}
127127

128-
// get-transaction
129-
// - get without transaction, immediately disappears
130-
// - get multiple with transaction, immedatiately disappears
131-
// - rollback, multiple reappear
132-
// - get multiple with transaction, commit, not available
133-
134-
// get-put transaction
135-
// - place initial message on queue
136-
// - get message under transaction, put reply message to different queue
137-
// - neither request nor reply message is available
138-
// - rollback; request message is available, reply message is not
139-
// - (again) get message under transaction, put reply message to different queue
140-
// - commit; request message is gone, and reply message is available
128+
/**
129+
* Test the behaviour of receiving a message under a transaction.
130+
*
131+
* - get without transaction, immediately disappears
132+
* - get multiple with transaction, immediately disappears
133+
* - rollback, multiple reappear
134+
* - get with transaction, commit, not available
135+
* - receive under transaction, close connection - should be available again (rollback)
136+
*/
137+
func TestGetTransaction(t *testing.T) {
138+
139+
// Create a ConnectionFactory using some property files
140+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
141+
assert.Nil(t, cfErr)
142+
143+
// Creates a connection to the queue manager.
144+
untransactedContext, errCtx := cf.CreateContext()
145+
assert.Nil(t, errCtx)
146+
if untransactedContext != nil {
147+
defer untransactedContext.Close()
148+
}
149+
150+
transactedContext, errCtx := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
151+
assert.Nil(t, errCtx)
152+
if transactedContext != nil {
153+
defer transactedContext.Close()
154+
}
155+
156+
// Create queue objects that points at an IBM MQ queue
157+
queueName := "DEV.QUEUE.1"
158+
unQueue := untransactedContext.CreateQueue(queueName)
159+
trQueue := transactedContext.CreateQueue(queueName)
160+
161+
// Create an transacted consumer
162+
transactedConsumer, errCons := transactedContext.CreateConsumer(trQueue)
163+
assert.Nil(t, errCons)
164+
if transactedConsumer != nil {
165+
defer transactedConsumer.Close()
166+
}
167+
168+
// Create an untransacted consumer
169+
untransactedConsumer, errCons := untransactedContext.CreateConsumer(unQueue)
170+
assert.Nil(t, errCons)
171+
if untransactedConsumer != nil {
172+
defer untransactedConsumer.Close()
173+
}
174+
175+
// Send an untransacted message and check it is immediately available for untransacted receive
176+
untransactedProducer := untransactedContext.CreateProducer().SetTimeToLive(20000)
177+
bodyTxt := "untransacted-get"
178+
errSend := untransactedProducer.SendString(unQueue, bodyTxt)
179+
assert.Nil(t, errSend)
180+
rcvBody, errRcv := untransactedConsumer.ReceiveStringBodyNoWait()
181+
assert.Nil(t, errRcv)
182+
assert.Equal(t, bodyTxt, *rcvBody)
183+
rcvBody, errRcv = transactedConsumer.ReceiveStringBodyNoWait()
184+
assert.Nil(t, errRcv)
185+
assert.Nil(t, rcvBody) // Has been consumed by the untransacted consumer
186+
187+
// get multiple with transaction, immediately disappears
188+
bodyTxt1 := "transacted-get-1"
189+
bodyTxt2 := "transacted-get-2"
190+
errSend = untransactedProducer.SendString(unQueue, bodyTxt1)
191+
assert.Nil(t, errSend)
192+
errSend = untransactedProducer.SendString(unQueue, bodyTxt2)
193+
assert.Nil(t, errSend)
194+
195+
rcvBody, errRcv = transactedConsumer.ReceiveStringBodyNoWait()
196+
assert.Nil(t, errRcv)
197+
assert.Equal(t, bodyTxt1, *rcvBody)
198+
rcvBody, errRcv = transactedConsumer.ReceiveStringBodyNoWait()
199+
assert.Nil(t, errRcv)
200+
assert.Equal(t, bodyTxt2, *rcvBody)
201+
202+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
203+
assert.Nil(t, errRcv)
204+
assert.Nil(t, rcvBody) // Message is not available (consumed pending transaction)
205+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
206+
assert.Nil(t, errRcv)
207+
assert.Nil(t, rcvBody)
208+
209+
// rollback, messages reappear
210+
transactedContext.Rollback() // puts the two messages back on the queue
211+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
212+
assert.Nil(t, errRcv)
213+
assert.Equal(t, bodyTxt1, *rcvBody)
214+
215+
// get the second reappeared message with transaction, commit, not available
216+
rcvBody, errRcv = transactedConsumer.ReceiveStringBodyNoWait()
217+
assert.Nil(t, errRcv)
218+
assert.Equal(t, bodyTxt2, *rcvBody)
219+
transactedContext.Commit() // commit the consumption of the one message.
220+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
221+
assert.Nil(t, errRcv)
222+
assert.Nil(t, rcvBody) // No message should be available
223+
224+
// receive under transaction, close connection - should be available again (rollback)
225+
bodyTxt3 := "transacted-get-3"
226+
errSend = untransactedProducer.SendString(unQueue, bodyTxt3)
227+
assert.Nil(t, errSend)
228+
rcvBody, errRcv = transactedConsumer.ReceiveStringBodyNoWait()
229+
assert.Nil(t, errRcv)
230+
assert.Equal(t, bodyTxt3, *rcvBody)
231+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
232+
assert.Nil(t, errRcv)
233+
assert.Nil(t, rcvBody) // message not available
234+
transactedContext.Close() // causes rollback
235+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
236+
assert.Nil(t, errRcv)
237+
assert.Equal(t, bodyTxt3, *rcvBody) // message now available
238+
239+
}
240+
241+
/**
242+
* Test the behaviour of receiving and sending a message under the same local transaction.
243+
*
244+
* - place initial message on queue
245+
* - get message under transaction, put reply message to different queue
246+
* - neither request nor reply message is available
247+
* - rollback; request message is available, reply message is not
248+
* - (again) get message under transaction, put reply message to different queue
249+
* - commit; request message is gone, and reply message is available
250+
*/
251+
func TestPutGetTransaction(t *testing.T) {
252+
253+
// Create a ConnectionFactory using some property files
254+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
255+
assert.Nil(t, cfErr)
256+
257+
// Creates a connection to the queue manager.
258+
untransactedContext, errCtx := cf.CreateContext()
259+
assert.Nil(t, errCtx)
260+
if untransactedContext != nil {
261+
defer untransactedContext.Close()
262+
}
263+
264+
transactedContext, errCtx := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
265+
assert.Nil(t, errCtx)
266+
if transactedContext != nil {
267+
defer transactedContext.Close()
268+
}
269+
270+
senderTransactedContext, errCtx := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
271+
assert.Nil(t, errCtx)
272+
if senderTransactedContext != nil {
273+
defer senderTransactedContext.Close()
274+
}
275+
276+
// Create Request + Reply queue objects that points at IBM MQ queues
277+
reqQueueName := "DEV.QUEUE.1"
278+
unReqQueue := untransactedContext.CreateQueue(reqQueueName)
279+
trReqQueue := transactedContext.CreateQueue(reqQueueName)
280+
281+
replyQueueName := "DEV.QUEUE.2"
282+
unReplyQueue := untransactedContext.CreateQueue(replyQueueName)
283+
trReplyQueue := transactedContext.CreateQueue(replyQueueName)
284+
285+
// Create an unrelated transacted producer (different connection)
286+
transactedReqProducer := senderTransactedContext.CreateProducer().SetTimeToLive(20000)
287+
288+
// Create an transacted consumer for the request queue
289+
transactedReqConsumer, errCons := transactedContext.CreateConsumer(trReqQueue)
290+
assert.Nil(t, errCons)
291+
if transactedReqConsumer != nil {
292+
defer transactedReqConsumer.Close()
293+
}
294+
295+
// Create an untransacted consumer for the request queue
296+
untransactedReqConsumer, errCons := untransactedContext.CreateConsumer(unReqQueue)
297+
assert.Nil(t, errCons)
298+
if untransactedReqConsumer != nil {
299+
defer untransactedReqConsumer.Close()
300+
}
301+
302+
// Create a transacted producer for the reply
303+
transactedReplyProducer := transactedContext.CreateProducer().SetTimeToLive(20000)
304+
305+
// Create an untransacted consumer for the reply queue
306+
untransactedReplyConsumer, errCons := untransactedContext.CreateConsumer(unReplyQueue)
307+
assert.Nil(t, errCons)
308+
if untransactedReplyConsumer != nil {
309+
defer untransactedReplyConsumer.Close()
310+
}
311+
312+
// First check that both queues are empty
313+
rcvBody, errRcv := untransactedReqConsumer.ReceiveStringBodyNoWait()
314+
assert.Nil(t, errRcv)
315+
assert.Nil(t, rcvBody)
316+
rcvBody, errRcv = untransactedReplyConsumer.ReceiveStringBodyNoWait()
317+
assert.Nil(t, errRcv)
318+
assert.Nil(t, rcvBody)
319+
320+
// Use the transacted sender context to send a request message (under a transaction)
321+
msgBody := "putget-transaction"
322+
errSend := transactedReqProducer.SendString(trReqQueue, msgBody)
323+
assert.Nil(t, errSend)
324+
rcvBody, errRcv = untransactedReqConsumer.ReceiveStringBodyNoWait()
325+
assert.Nil(t, errRcv)
326+
assert.Nil(t, rcvBody) // Not yet visible for consumption
327+
senderTransactedContext.Commit() // Make the message visible
328+
329+
// get message under transaction, put reply message to different queue
330+
rcvBody, errRcv = transactedReqConsumer.ReceiveStringBodyNoWait()
331+
assert.Nil(t, errRcv)
332+
assert.Equal(t, msgBody, *rcvBody)
333+
replyMsgBody := "putget-transaction-reply"
334+
errSend = transactedReplyProducer.SendString(trReplyQueue, replyMsgBody)
335+
assert.Nil(t, errSend)
336+
337+
// neither request nor reply message is available
338+
rcvBody, errRcv = untransactedReqConsumer.ReceiveStringBodyNoWait()
339+
assert.Nil(t, errRcv)
340+
assert.Nil(t, rcvBody)
341+
rcvBody, errRcv = untransactedReplyConsumer.ReceiveStringBodyNoWait()
342+
assert.Nil(t, errRcv)
343+
assert.Nil(t, rcvBody)
344+
345+
// rollback; request message is available, reply message is not
346+
transactedContext.Rollback()
347+
rcvBody, errRcv = untransactedReqConsumer.ReceiveStringBodyNoWait()
348+
assert.Nil(t, errRcv)
349+
assert.Equal(t, msgBody, *rcvBody)
350+
rcvBody, errRcv = untransactedReplyConsumer.ReceiveStringBodyNoWait()
351+
assert.Nil(t, errRcv)
352+
assert.Nil(t, rcvBody)
353+
354+
// Put a new request message
355+
msgBody2 := "putget-transaction-2"
356+
errSend = transactedReqProducer.SendString(trReqQueue, msgBody2)
357+
assert.Nil(t, errSend)
358+
senderTransactedContext.Commit()
359+
360+
// (again) get message under transaction, put reply message to the other queue
361+
rcvBody, errRcv = transactedReqConsumer.ReceiveStringBodyNoWait()
362+
assert.Nil(t, errRcv)
363+
assert.Equal(t, msgBody2, *rcvBody)
364+
replyMsgBody2 := "putget-transaction-reply-2"
365+
errSend = transactedReplyProducer.SendString(trReplyQueue, replyMsgBody2)
366+
assert.Nil(t, errSend)
367+
368+
// commit; request message is gone, and reply message is available
369+
transactedContext.Commit()
370+
rcvBody, errRcv = untransactedReqConsumer.ReceiveStringBodyNoWait()
371+
assert.Nil(t, errRcv)
372+
assert.Nil(t, rcvBody)
373+
rcvBody, errRcv = untransactedReplyConsumer.ReceiveStringBodyNoWait()
374+
assert.Nil(t, errRcv)
375+
assert.Equal(t, replyMsgBody2, *rcvBody)
376+
377+
}

mqjms/ConsumerImpl.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
// ConsumerImpl defines a struct that contains the necessary objects for
2222
// receiving messages from a queue on an IBM MQ queue manager.
2323
type ConsumerImpl struct {
24+
ctx ContextImpl
2425
qObject ibmmq.MQObject
2526
selector string
2627
}
@@ -63,8 +64,14 @@ func (consumer ConsumerImpl) receiveInternal(gmo *ibmmq.MQGMO) (jms20subset.Mess
6364
getmqmd := ibmmq.NewMQMD()
6465
buffer := make([]byte, 32768)
6566

67+
// Calculate the syncpoint value
68+
syncpointSetting := ibmmq.MQGMO_NO_SYNCPOINT
69+
if consumer.ctx.sessionMode == jms20subset.JMSContextSESSIONTRANSACTED {
70+
syncpointSetting = ibmmq.MQGMO_SYNCPOINT
71+
}
72+
6673
// Set the GMO (get message options)
67-
gmo.Options |= ibmmq.MQGMO_NO_SYNCPOINT
74+
gmo.Options |= syncpointSetting
6875
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
6976

7077
// Apply the selector if one has been specified in the Consumer

mqjms/ContextImpl.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination,
9090
// Success - store the necessary objects away for later use to receive
9191
// messages.
9292
consumer = ConsumerImpl{
93+
ctx: ctx,
9394
qObject: qObject,
9495
selector: selector,
9596
}

0 commit comments

Comments
 (0)