Skip to content

Commit 9cd66b3

Browse files
committed
Support for put under local transaction
1 parent 79dfab9 commit 9cd66b3

File tree

10 files changed

+212
-216
lines changed

10 files changed

+212
-216
lines changed

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
22
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
33
github.com/ibm-messaging/mq-golang v1.0.1-0.20190820103725-19b946c185a8 h1:kUwSXeftVen12FRnShG+Ykhb2Kd6Cd/DbpWwbYal7j0=
44
github.com/ibm-messaging/mq-golang v1.0.1-0.20190820103725-19b946c185a8/go.mod h1:qjsZDb7m1oKnbPeDma2JVJTKgyCA91I4bcJ1qHY+gcA=
5+
github.com/ibm-messaging/mq-golang v3.0.0+incompatible h1:Yc3c8emAyveT54uNDRMkgvS+EBAHeLNWHkc3hk5x+IY=
56
github.com/ibm-messaging/mq-golang/v5 v5.0.0 h1:9J8bsDoCo60rbSgB7ZAURPG3L5Kpr+F8dYNOwQ7Qnnk=
67
github.com/ibm-messaging/mq-golang/v5 v5.0.0/go.mod h1:ywCwmYbJOU/E0rl+z4GiNoxVMty68O+LVO39a1VMXrE=
78
github.com/ibm-messaging/mq-golang/v5 v5.1.2 h1:u0e1Vce2TNqJpH088vF77rDMsnMRWnGaOIlxZo4DMZc=
89
github.com/ibm-messaging/mq-golang/v5 v5.1.2/go.mod h1:ywCwmYbJOU/E0rl+z4GiNoxVMty68O+LVO39a1VMXrE=
910
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1011
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
12+
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
1113
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
1214
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
1315
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

jms20subset/ConnectionFactory.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
//
88
// SPDX-License-Identifier: EPL-2.0
99

10-
// Interfaces for messaging applications in the style of the Java Message Service (JMS) API.
10+
// Package jms20subset provides interfaces for messaging applications in the style of the Java Message Service (JMS) API.
1111
package jms20subset
1212

1313
// ConnectionFactory defines a Golang interface which provides similar
@@ -18,5 +18,12 @@ type ConnectionFactory interface {
1818

1919
// CreateContext creates a connection to the messaging provider using the
2020
// configuration parameters that are encapsulated by this ConnectionFactory.
21+
//
22+
// Defaults to sessionMode of JMSContextAUTOACKNOWLEDGE
2123
CreateContext() (JMSContext, JMSException)
24+
25+
// CreateContextWithSessionMode creates a connection to the messaging provider using the
26+
// configuration parameters that are encapsulated by this ConnectionFactory,
27+
// and the specified session mode.
28+
CreateContextWithSessionMode(sessionMode int) (JMSContext, JMSException)
2229
}

jms20subset/JMSContext.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,15 @@
77
//
88
// SPDX-License-Identifier: EPL-2.0
99

10-
//
10+
// Package jms20subset provides interfaces for messaging applications in the style of the Java Message Service (JMS) API.
1111
package jms20subset
1212

13+
// JMSContextAUTOACKNOWLEDGE is used to specify a sessionMode that automatically acknowledge message transmission.
14+
const JMSContextAUTOACKNOWLEDGE int = 1
15+
16+
// JMSContextSESSIONTRANSACTED is used to specify a sessionMode that requires manual commit/rollback of transactions.
17+
const JMSContextSESSIONTRANSACTED int = 0
18+
1319
// JMSContext represents a connection to the messaging provider, and
1420
// provides the capability for applications to create Producer and Consumer
1521
// objects so that it can send and receive messages.
@@ -54,6 +60,12 @@ type JMSContext interface {
5460
// name and different parameters we must use a different function name.
5561
CreateTextMessageWithString(txt string) TextMessage
5662

63+
// Commit confirms all messages sent/received during this transaction.
64+
Commit()
65+
66+
// Rollback releases all messages sent/received during this transaction.
67+
Rollback()
68+
5769
// Closes the connection to the messaging provider.
5870
//
5971
// Since the provider typically allocates significant resources on behalf of

local_transaction_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright (c) IBM Corporation 2019
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package main
11+
12+
import (
13+
"testing"
14+
15+
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
16+
17+
"github.com/ibm-messaging/mq-golang-jms20/mqjms"
18+
"github.com/stretchr/testify/assert"
19+
)
20+
21+
/*
22+
* Test the behaviour of sending a message under a transaction.
23+
*
24+
* - put without transaction, immediately available
25+
* - put multiple msgs with transaction, not immediately available
26+
* - available after commit
27+
* - put multiple msgs under transaction then rollback - not available
28+
* - put message under transaction, close connection - should not be available
29+
*/
30+
func TestPutTransaction(t *testing.T) {
31+
32+
// Create a ConnectionFactory using some property files
33+
cf, cfErr := mqjms.CreateConnectionFactoryFromDefaultJSONFiles()
34+
assert.Nil(t, cfErr)
35+
36+
// Creates a connection to the queue manager.
37+
untransactedContext, errCtx := cf.CreateContext()
38+
assert.Nil(t, errCtx)
39+
if untransactedContext != nil {
40+
defer untransactedContext.Close()
41+
}
42+
43+
transactedContext, errCtx := cf.CreateContextWithSessionMode(jms20subset.JMSContextSESSIONTRANSACTED)
44+
assert.Nil(t, errCtx)
45+
if transactedContext != nil {
46+
defer transactedContext.Close()
47+
}
48+
49+
// Create queue objects that points at an IBM MQ queue
50+
queueName := "DEV.QUEUE.1"
51+
unQueue := untransactedContext.CreateQueue(queueName)
52+
trQueue := transactedContext.CreateQueue(queueName)
53+
54+
// Create an untransacted consumer
55+
untransactedConsumer, errCons := untransactedContext.CreateConsumer(unQueue)
56+
assert.Nil(t, errCons)
57+
if untransactedConsumer != nil {
58+
defer untransactedConsumer.Close()
59+
}
60+
61+
// Send an untransacted message and check it is immediately available
62+
untransactedProducer := untransactedContext.CreateProducer().SetTimeToLive(20000)
63+
bodyTxt := "untransacted-put"
64+
errSend := untransactedProducer.SendString(unQueue, bodyTxt)
65+
assert.Nil(t, errSend)
66+
rcvBody, errRcv := untransactedConsumer.ReceiveStringBodyNoWait()
67+
assert.Nil(t, errRcv)
68+
assert.Equal(t, bodyTxt, *rcvBody)
69+
70+
// put multiple msgs with transaction, not immediately available
71+
transactedProducer := transactedContext.CreateProducer().SetTimeToLive(20000)
72+
bodyTxt1 := "transacted-put-1"
73+
bodyTxt2 := "transacted-put-2"
74+
errSend = transactedProducer.SendString(trQueue, bodyTxt1)
75+
assert.Nil(t, errSend)
76+
errSend = transactedProducer.SendString(trQueue, bodyTxt2)
77+
assert.Nil(t, errSend)
78+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
79+
assert.Nil(t, errRcv)
80+
assert.Nil(t, rcvBody) // Expect nil here - message should not have been received.
81+
82+
// Commit and messages should now be available (in order)
83+
transactedContext.Commit()
84+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
85+
assert.Nil(t, errRcv)
86+
assert.Equal(t, bodyTxt1, *rcvBody)
87+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
88+
assert.Nil(t, errRcv)
89+
assert.Equal(t, bodyTxt2, *rcvBody)
90+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
91+
assert.Nil(t, errRcv)
92+
assert.Nil(t, rcvBody) // Only expected two messages
93+
94+
// put multiple msgs under transaction then rollback - not available
95+
bodyTxt1 = "transacted-put-rollback-1"
96+
bodyTxt2 = "transacted-put-rollback-2"
97+
errSend = transactedProducer.SendString(trQueue, bodyTxt1)
98+
assert.Nil(t, errSend)
99+
errSend = transactedProducer.SendString(trQueue, bodyTxt2)
100+
assert.Nil(t, errSend)
101+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
102+
assert.Nil(t, errRcv)
103+
assert.Nil(t, rcvBody)
104+
transactedContext.Rollback() // Undo the messages
105+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
106+
assert.Nil(t, errRcv)
107+
assert.Nil(t, rcvBody)
108+
transactedContext.Commit() // Should no longer be under the transaction
109+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
110+
assert.Nil(t, errRcv)
111+
assert.Nil(t, rcvBody)
112+
113+
// put message under transaction, close connection - should not be available
114+
errSend = transactedProducer.SendString(trQueue, "orphan1")
115+
assert.Nil(t, errSend)
116+
errSend = transactedProducer.SendString(trQueue, "orphan2")
117+
assert.Nil(t, errSend)
118+
transactedContext.Close()
119+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
120+
assert.Nil(t, errRcv)
121+
assert.Nil(t, rcvBody)
122+
rcvBody, errRcv = untransactedConsumer.ReceiveStringBodyNoWait()
123+
assert.Nil(t, errRcv)
124+
assert.Nil(t, rcvBody)
125+
126+
}
127+
128+
// get-transaction
129+
// - get without transaction, immediately disappears
130+
// - get multiple with transaction, immedatiately disappears
131+
// - rollback, multiple reappear
132+
// - get multiple with transaction, commit, not available
133+
134+
// get-put transaction
135+
// - place initial message on queue
136+
// - get message under transaction, put reply message to different queue
137+
// - neither request nor reply message is available
138+
// - rollback; request message is available, reply message is not
139+
// - (again) get message under transaction, put reply message to different queue
140+
// - commit; request message is gone, and reply message is available

mqjms/ConnectionFactoryImpl.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
//
77
// SPDX-License-Identifier: EPL-2.0
88

9-
// Implementation of the JMS style Golang interfaces to communicate with IBM MQ.
9+
// Package mqjms provides the implementation of the JMS style Golang interfaces to communicate with IBM MQ.
1010
package mqjms
1111

1212
import (
13+
"strconv"
14+
1315
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1416
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
15-
"strconv"
1617
)
1718

1819
// ConnectionFactoryImpl defines a struct that contains attributes for
@@ -44,6 +45,12 @@ type ConnectionFactoryImpl struct {
4445
// CreateContext implements the JMS method to create a connection to an IBM MQ
4546
// queue manager.
4647
func (cf ConnectionFactoryImpl) CreateContext() (jms20subset.JMSContext, jms20subset.JMSException) {
48+
return cf.CreateContextWithSessionMode(jms20subset.JMSContextAUTOACKNOWLEDGE)
49+
}
50+
51+
// CreateContextWithSessionMode implements the JMS method to create a connection to an IBM MQ
52+
// queue manager using the specified session mode.
53+
func (cf ConnectionFactoryImpl) CreateContextWithSessionMode(sessionMode int) (jms20subset.JMSContext, jms20subset.JMSException) {
4754

4855
// Allocate the internal structures required to create an connection to IBM MQ.
4956
cno := ibmmq.NewMQCNO()
@@ -118,7 +125,8 @@ func (cf ConnectionFactoryImpl) CreateContext() (jms20subset.JMSContext, jms20su
118125
// Connection was created successfully, so we wrap the MQI object into
119126
// a new ContextImpl and return it to the caller.
120127
ctx = ContextImpl{
121-
qMgr: qMgr,
128+
qMgr: qMgr,
129+
sessionMode: sessionMode,
122130
}
123131

124132
} else {

mqjms/ContextImpl.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@
66
//
77
// SPDX-License-Identifier: EPL-2.0
88

9-
//
9+
// Package mqjms provides the implementation of the JMS style Golang interfaces to communicate with IBM MQ.
1010
package mqjms
1111

1212
import (
13+
"strconv"
14+
1315
"github.com/ibm-messaging/mq-golang-jms20/jms20subset"
1416
ibmmq "github.com/ibm-messaging/mq-golang/v5/ibmmq"
15-
"strconv"
1617
)
1718

1819
// ContextImpl encapsulates the objects necessary to maintain an active
1920
// connection to an IBM MQ queue manager.
2021
type ContextImpl struct {
21-
qMgr ibmmq.MQQueueManager
22+
qMgr ibmmq.MQQueueManager
23+
sessionMode int
2224
}
2325

2426
// CreateQueue implements the logic necessary to create a provider-specific
@@ -53,7 +55,7 @@ func (ctx ContextImpl) CreateConsumer(dest jms20subset.Destination) (jms20subset
5355
return ctx.CreateConsumerWithSelector(dest, "")
5456
}
5557

56-
// CreateConsumer creates a consumer object that allows an application to
58+
// CreateConsumerWithSelector creates a consumer object that allows an application to
5759
// receive messages that match the specified selector from the given Destination.
5860
func (ctx ContextImpl) CreateConsumerWithSelector(dest jms20subset.Destination, selector string) (jms20subset.JMSConsumer, jms20subset.JMSException) {
5961

@@ -110,18 +112,39 @@ func (ctx ContextImpl) CreateTextMessage() jms20subset.TextMessage {
110112
return &TextMessageImpl{}
111113
}
112114

113-
// CreateTextMessage is a JMS standard mechanism for creating a TextMessage
115+
// CreateTextMessageWithString is a JMS standard mechanism for creating a TextMessage
114116
// and initialise it with the chosen text string.
115117
func (ctx ContextImpl) CreateTextMessageWithString(txt string) jms20subset.TextMessage {
116118
return &TextMessageImpl{
117119
bodyStr: &txt,
118120
}
119121
}
120122

123+
// Commit confirms all messages that were sent under this transaction.
124+
func (ctx ContextImpl) Commit() {
125+
126+
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
127+
ctx.qMgr.Cmit()
128+
}
129+
130+
}
131+
132+
// Rollback releases all messages that were sent under this transaction.
133+
func (ctx ContextImpl) Rollback() {
134+
135+
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
136+
ctx.qMgr.Back()
137+
}
138+
139+
}
140+
121141
// Close this connection to the MQ queue manager, and release any resources
122142
// that were allocated to support this connection.
123143
func (ctx ContextImpl) Close() {
124144

145+
// JMS semantics are to roll back an active transaction on Close.
146+
ctx.Rollback()
147+
125148
if (ibmmq.MQQueueManager{}) != ctx.qMgr {
126149
ctx.qMgr.Disc()
127150
}

mqjms/ProducerImpl.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,15 @@ func (producer ProducerImpl) Send(dest jms20subset.Destination, msg jms20subset.
6868
putmqmd := ibmmq.NewMQMD()
6969
pmo := ibmmq.NewMQPMO()
7070

71+
// Calculate the syncpoint value
72+
syncpointSetting := ibmmq.MQPMO_NO_SYNCPOINT
73+
if producer.ctx.sessionMode == jms20subset.JMSContextSESSIONTRANSACTED {
74+
syncpointSetting = ibmmq.MQPMO_SYNCPOINT
75+
}
76+
7177
// Configure the put message options, including asking MQ to allocate a
7278
// unique message ID
73-
pmo.Options = ibmmq.MQPMO_NO_SYNCPOINT | ibmmq.MQPMO_NEW_MSG_ID
79+
pmo.Options = syncpointSetting | ibmmq.MQPMO_NEW_MSG_ID
7480

7581
// Convert the JMS persistence into the equivalent MQ message descriptor
7682
// attribute.

mqjms/TextMessageImpl.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,14 @@ func (msg *TextMessageImpl) GetJMSCorrelationID() string {
192192
// to turn it back into a string.
193193
realLength := len(correlIDBytes)
194194
if realLength > 0 {
195-
for correlIdBytes[realLength-1] == 0 {
195+
for correlIDBytes[realLength-1] == 0 {
196196
realLength--
197197
}
198198
}
199199

200200
// Attempt to decode the content back into a string.
201201
dst := make([]byte, hex.DecodedLen(realLength))
202-
n, err := hex.Decode(dst, correlIdBytes[0:realLength])
202+
n, err := hex.Decode(dst, correlIDBytes[0:realLength])
203203

204204
if err == nil {
205205
// The decode back to a string was successful so pass back that plain
@@ -210,7 +210,7 @@ func (msg *TextMessageImpl) GetJMSCorrelationID() string {
210210

211211
// An error occurred while decoding to a plain text string, so encode
212212
// the bytes that we have into a raw string representation themselves.
213-
correlID = hex.EncodeToString(correlIdBytes)
213+
correlID = hex.EncodeToString(correlIDBytes)
214214
}
215215

216216
}

next-features.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ project!
66
Not currently implemented:
77
--------------------------
88
- BytesMessage, receiveBytesBody
9-
- Local transactions (e.g. allow request/reply under transaction)
109
- MessageListener
1110
- SendToQmgr, ReplyToQmgr
1211
- Topics (pub/sub)

0 commit comments

Comments
 (0)