Skip to content

Commit 1a562e5

Browse files
committed
Get READAHEAD option working again
1 parent d45b267 commit 1a562e5

File tree

3 files changed

+140
-28
lines changed

3 files changed

+140
-28
lines changed

mqmetric/globals.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,19 @@ import (
2929
)
3030

3131
type sessionInfo struct {
32-
qMgr ibmmq.MQQueueManager
33-
cmdQObj ibmmq.MQObject
34-
replyQObj ibmmq.MQObject
35-
qMgrObject ibmmq.MQObject
32+
qMgr ibmmq.MQQueueManager
33+
cmdQObj ibmmq.MQObject
34+
qMgrObject ibmmq.MQObject
35+
3636
replyQBaseName string
3737
replyQ2BaseName string
38-
statusReplyQObj ibmmq.MQObject
39-
statusReplyBuf []byte
38+
39+
replyQObj ibmmq.MQObject
40+
replyQReadAhead bool
41+
42+
statusReplyQObj ibmmq.MQObject
43+
statusReplyQReadAhead bool
44+
statusReplyBuf []byte
4045

4146
platform int32
4247
commandLevel int32

mqmetric/mqif.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,37 @@ func initConnectionKey(key string, qMgrName string, replyQ string, replyQ2 strin
280280
}
281281

282282
// MQOPEN of a reply queue also used for subscription delivery
283+
inputOpt := ibmmq.MQOO_INPUT_EXCLUSIVE
284+
// inputOpt = ibmmq.MQOO_INPUT_SHARED
283285
if err == nil {
284286
mqod := ibmmq.NewMQOD()
285-
openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE | ibmmq.MQOO_FAIL_IF_QUIESCING
287+
openOptions := inputOpt | ibmmq.MQOO_FAIL_IF_QUIESCING
286288
openOptions |= ibmmq.MQOO_INQUIRE
287289
mqod.ObjectType = ibmmq.MQOT_Q
288290
mqod.ObjectName = replyQ
289291
ci.si.replyQObj, err = ci.si.qMgr.Open(mqod, openOptions)
290292
ci.si.replyQBaseName = replyQ
291293
if err == nil {
292294
ci.si.queuesOpened = true
293-
clearQ(ci.si.replyQObj)
295+
ci.si.replyQReadAhead = false
296+
297+
// There may be performance benefits to using READAHEAD on the reply queues, but we
298+
// need to know if it's going to be used. We don't use the MQOO option to ask for
299+
// it explicitly, but rely on the queue's default option. So find that with an MQINQ
300+
// and stash it.
301+
selectors := []int32{ibmmq.MQIA_DEF_READ_AHEAD}
302+
vals, inqErr := ci.si.replyQObj.Inq(selectors)
303+
if inqErr == nil {
304+
ra := vals[ibmmq.MQIA_DEF_READ_AHEAD]
305+
if ra == ibmmq.MQREADA_YES {
306+
ci.si.replyQReadAhead = true
307+
}
308+
} else {
309+
// log the error but ignore it
310+
logWarn("Cannot find DEF_READ_AHEAD for %s: %v", mqod.ObjectName, inqErr)
311+
312+
}
313+
clearQ(ci.si.replyQObj, ci.si.replyQReadAhead)
294314
} else {
295315
errorString = "Cannot open queue " + mqod.ObjectName
296316
mqreturn = err.(*ibmmq.MQReturn)
@@ -300,7 +320,9 @@ func initConnectionKey(key string, qMgrName string, replyQ string, replyQ2 strin
300320
// MQOPEN of a second reply queue used for status polling
301321
if err == nil {
302322
mqod := ibmmq.NewMQOD()
303-
openOptions := ibmmq.MQOO_INPUT_EXCLUSIVE | ibmmq.MQOO_FAIL_IF_QUIESCING
323+
openOptions := inputOpt | ibmmq.MQOO_FAIL_IF_QUIESCING
324+
openOptions |= ibmmq.MQOO_INQUIRE
325+
304326
mqod.ObjectType = ibmmq.MQOT_Q
305327
ci.si.replyQ2BaseName = replyQ2
306328
if replyQ2 != "" {
@@ -313,13 +335,33 @@ func initConnectionKey(key string, qMgrName string, replyQ string, replyQ2 strin
313335
errorString = "Cannot open queue " + mqod.ObjectName
314336
mqreturn = err.(*ibmmq.MQReturn)
315337
} else {
316-
clearQ(ci.si.statusReplyQObj)
338+
// If replyQ2 is not set, we can reuse knowledge from the previous
339+
// block about how readahead is configured.
340+
if replyQ2 != "" {
341+
ci.si.statusReplyQReadAhead = false
342+
343+
selectors := []int32{ibmmq.MQIA_DEF_READ_AHEAD}
344+
vals, inqErr := ci.si.statusReplyQObj.Inq(selectors)
345+
if inqErr == nil {
346+
ra := vals[ibmmq.MQIA_DEF_READ_AHEAD]
347+
if ra == ibmmq.MQREADA_YES {
348+
ci.si.statusReplyQReadAhead = true
349+
}
350+
} else {
351+
// log the error but ignore it
352+
logWarn("Cannot find DEF_READ_AHEAD for %s: %v", mqod.ObjectName, inqErr)
353+
354+
}
355+
} else {
356+
ci.si.statusReplyQReadAhead = ci.si.replyQReadAhead
357+
}
358+
clearQ(ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
317359
}
318360
}
319361

320362
// Start from a clean set of subscriptions. Errors from this can be ignored.
321363
if err == nil && ci.durableSubPrefix != "" && ci.usePublications {
322-
clearDurableSubscriptions(ci.durableSubPrefix, ci.si.cmdQObj, ci.si.statusReplyQObj)
364+
clearDurableSubscriptions(ci.durableSubPrefix, ci.si.cmdQObj, ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
323365
}
324366

325367
// If anything has gone wrong in the initial connection and object access then return an error.
@@ -586,13 +628,13 @@ We can't use the resubscribe/close technique here because a) we don't know in ad
586628
subscription names are and b) we don't know which queue is attached - the collector configuration
587629
might have changed. So we do this cleanup using the PCF commands.
588630
*/
589-
func clearDurableSubscriptions(prefix string, cmdQObj ibmmq.MQObject, replyQObj ibmmq.MQObject) {
631+
func clearDurableSubscriptions(prefix string, cmdQObj ibmmq.MQObject, replyQObj ibmmq.MQObject, readAhead bool) {
590632
var err error
591633

592634
subNameList := make(map[string]string)
593635
traceEntry("clearDurableSubscriptions")
594636

595-
clearQ(replyQObj)
637+
clearQ(replyQObj, readAhead)
596638
putmqmd, pmo, cfh, buf := statusSetCommandHeaders()
597639

598640
// Can allow all the other fields to default
@@ -639,7 +681,7 @@ func clearDurableSubscriptions(prefix string, cmdQObj ibmmq.MQObject, replyQObj
639681
// For each of th returned subscription names, do the delete
640682
for subName, _ := range subNameList {
641683
logDebug("About to delete subscription %s", subName)
642-
clearQ(replyQObj)
684+
clearQ(replyQObj, readAhead)
643685

644686
putmqmd, pmo, cfh, buf := statusSetCommandHeaders()
645687
// Can allow all the other fields to default

mqmetric/status.go

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ import (
2828
)
2929

3030
// var statusDummy = fmt.Sprintf("dummy")
31-
var timeTravelWarningIssued = false
32-
var persistenceWarningIssued = false
31+
var (
32+
timeTravelWarningIssued = false
33+
persistenceWarningIssued = false
34+
clearQBuf = make([]byte, 32768)
35+
)
3336

3437
/*
3538
This file defines types and constructors for elements related to status
@@ -129,7 +132,7 @@ func statusTimeDiff(now time.Time, d string, t string) int64 {
129132
if diff < -(60 * 5) { // Cannot have status from the future but allow a tiny amount of flex
130133
if !timeTravelWarningIssued {
131134
logError("Status reports appear to be from the future. Difference is approximately %d seconds. Check the TZ Offset value in the program configuration.", int64(-diff))
132-
logDebug("statusTimeDiff d:%s t:%s diff:%f tzoffset: %f err:%v\n", d, t, diff, ci.tzOffsetSecs, err)
135+
logDebug("statusTimeDiff d:%s t:%s diff:%f tzoffset: %f err:%v", d, t, diff, ci.tzOffsetSecs, err)
133136
timeTravelWarningIssued = true
134137
}
135138
}
@@ -169,21 +172,77 @@ func statusTimeEpoch(d string, t string) int64 {
169172
return epoch
170173
}
171174

172-
func clearQ(hObj ibmmq.MQObject) {
175+
func clearMsgWithoutTruncation(hObj ibmmq.MQObject) (*ibmmq.MQMD, int, error) {
176+
var err error
177+
var md *ibmmq.MQMD
178+
179+
traceEntry("clearMsgWithoutTruncation")
180+
181+
msgLen := 0
182+
for trunc := true; trunc; {
183+
// Now get the response. Reset the MD and GMO on each iteration to ensure we don't get mixed up
184+
// with anything that gets modified (like the CCSID) even on failed/truncated GETs.
185+
md = ibmmq.NewMQMD()
186+
gmo := ibmmq.NewMQGMO()
187+
gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT
188+
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
189+
gmo.Options |= ibmmq.MQGMO_NO_WAIT
190+
gmo.Options |= ibmmq.MQGMO_CONVERT
191+
192+
logTrace("clearQWithoutTruncation: Trying MQGET with clearQBuffer size %d ", len(clearQBuf))
193+
msgLen, err = hObj.Get(md, gmo, clearQBuf)
194+
if err != nil {
195+
mqreturn := err.(*ibmmq.MQReturn)
196+
if mqreturn.MQCC != ibmmq.MQCC_OK && mqreturn.MQRC == ibmmq.MQRC_TRUNCATED_MSG_FAILED && len(clearQBuf) < maxBufSize {
197+
// Double the size, apart from capping it at 100MB
198+
clearQBuf = append(clearQBuf, make([]byte, len(clearQBuf))...)
199+
if len(clearQBuf) > maxBufSize {
200+
clearQBuf = clearQBuf[0:maxBufSize]
201+
}
202+
} else {
203+
traceExitF("clearMsgWithoutTruncation", 1, "BufSize %d Error %v", len(clearQBuf), err)
204+
return md, msgLen, err
205+
}
206+
} else {
207+
trunc = false
208+
}
209+
}
210+
211+
traceExit("clearMsgWithoutTruncation", 0)
212+
return md, msgLen, err
213+
}
214+
215+
func clearQ(hObj ibmmq.MQObject, usingReadAhead bool) {
216+
var err error
217+
var getmqmd *ibmmq.MQMD
218+
// msgLen := 0
173219
p := 0
174220
buf := make([]byte, 0)
221+
222+
traceEntry("clearQ")
223+
logTrace("clearQ: QueueName=%s readAhead=%v", hObj.Name, usingReadAhead)
224+
175225
// Empty reply and publication destination queues in case any left over from previous runs.
176226
// Do it in batches if the messages are persistent. Which they shouldn't be, but you
177227
// never know.
178228
for ok := true; ok; {
179-
getmqmd := ibmmq.NewMQMD()
180-
gmo := ibmmq.NewMQGMO()
181-
gmo.Options = ibmmq.MQGMO_SYNCPOINT_IF_PERSISTENT
182-
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
183-
gmo.Options |= ibmmq.MQGMO_NO_WAIT
184-
gmo.Options |= ibmmq.MQGMO_CONVERT
185-
gmo.Options |= ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG
186-
_, err := hObj.Get(getmqmd, gmo, buf)
229+
if !usingReadAhead {
230+
getmqmd = ibmmq.NewMQMD()
231+
gmo := ibmmq.NewMQGMO()
232+
gmo.Options = ibmmq.MQGMO_SYNCPOINT_IF_PERSISTENT
233+
gmo.Options = ibmmq.MQGMO_NO_SYNCPOINT
234+
gmo.Options |= ibmmq.MQGMO_FAIL_IF_QUIESCING
235+
gmo.Options |= ibmmq.MQGMO_NO_WAIT
236+
gmo.Options |= ibmmq.MQGMO_CONVERT
237+
gmo.Options |= ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG
238+
_, err = hObj.Get(getmqmd, gmo, buf)
239+
240+
} else {
241+
// logDebug("Reverting to clearMsgWithoutTruncation")
242+
getmqmd, _, err = clearMsgWithoutTruncation(hObj)
243+
}
244+
245+
// logDebug("clearQ: got message with err %v", err)
187246

188247
if err != nil && err.(*ibmmq.MQReturn).MQCC == ibmmq.MQCC_FAILED {
189248
ok = false
@@ -196,6 +255,7 @@ func clearQ(hObj ibmmq.MQObject) {
196255
if err != nil {
197256
logError("Problem committing removal of persistent messages: %v", err)
198257
} else {
258+
logDebug("Successful MQCMIT")
199259
p = 0
200260
}
201261
}
@@ -205,15 +265,20 @@ func clearQ(hObj ibmmq.MQObject) {
205265
logWarn("Response messages are unnecessarily persistent. Check the DEFPSIST value on the configured reply queues.")
206266
}
207267
}
268+
208269
}
209270

210271
// If we've not committed removal of a final batch of persistent messages, do it now.
211272
if p > 0 {
212273
err := hObj.GetHConn().Cmit()
213274
if err != nil {
214275
logError("Problem committing removal of persistent messages: %v", err)
276+
} else {
277+
logDebug("Successful MQCMIT")
278+
215279
}
216280
}
281+
traceExit("clearQ", 0)
217282

218283
return
219284
}
@@ -222,7 +287,7 @@ func statusClearReplyQ() {
222287
traceEntry("statusClearReplyQ")
223288
ci := getConnection(GetConnectionKey())
224289

225-
clearQ(ci.si.statusReplyQObj)
290+
clearQ(ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
226291

227292
traceExit("statusClearReplyQ", 0)
228293
return
@@ -316,7 +381,7 @@ func statusGetReply(correlId []byte) (*ibmmq.MQCFH, []byte, bool, error) {
316381
// command tries to use this replyQ.
317382
allDone = true
318383
if err.(*ibmmq.MQReturn).MQRC != ibmmq.MQRC_NO_MSG_AVAILABLE {
319-
logError("StatusGetReply error : %v\n", err)
384+
logError("StatusGetReply error : %v", err)
320385
}
321386
traceExitErr("statusGetReply", 3, err)
322387
return nil, nil, allDone, err

0 commit comments

Comments
 (0)