Skip to content

Commit a842551

Browse files
committed
Do a real CLEAR QL if possible
1 parent 1a562e5 commit a842551

File tree

2 files changed

+109
-51
lines changed

2 files changed

+109
-51
lines changed

mqmetric/mqif.go

Lines changed: 91 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -279,83 +279,97 @@ func initConnectionKey(key string, qMgrName string, replyQ string, replyQ2 strin
279279

280280
}
281281

282-
// MQOPEN of a reply queue also used for subscription delivery
282+
// MQOPEN of a reply queue used for status polling, and responses to any other PCF commands.
283+
// Although this is replyQ2, we now open it first to give a chance of executing CLEAR QL(replyQ) before
284+
// that subscription delivery queue has been opened
283285
inputOpt := ibmmq.MQOO_INPUT_EXCLUSIVE
284-
// inputOpt = ibmmq.MQOO_INPUT_SHARED
285286
if err == nil {
286287
mqod := ibmmq.NewMQOD()
287288
openOptions := inputOpt | ibmmq.MQOO_FAIL_IF_QUIESCING
288289
openOptions |= ibmmq.MQOO_INQUIRE
290+
289291
mqod.ObjectType = ibmmq.MQOT_Q
290-
mqod.ObjectName = replyQ
291-
ci.si.replyQObj, err = ci.si.qMgr.Open(mqod, openOptions)
292-
ci.si.replyQBaseName = replyQ
293-
if err == nil {
294-
ci.si.queuesOpened = true
295-
ci.si.replyQReadAhead = false
292+
ci.si.replyQ2BaseName = replyQ2
293+
if replyQ2 != "" {
294+
mqod.ObjectName = replyQ2
295+
} else {
296+
mqod.ObjectName = replyQ
297+
}
298+
ci.si.statusReplyQObj, err = ci.si.qMgr.Open(mqod, openOptions)
299+
if err != nil {
300+
errorString = "Cannot open queue " + mqod.ObjectName
301+
mqreturn = err.(*ibmmq.MQReturn)
302+
} else {
296303

297304
// There may be performance benefits to using READAHEAD on the reply queues, but we
298305
// need to know if it's going to be used. We don't use the MQOO option to ask for
299306
// it explicitly, but rely on the queue's default option. So find that with an MQINQ
300307
// and stash it.
308+
ci.si.statusReplyQReadAhead = false
309+
301310
selectors := []int32{ibmmq.MQIA_DEF_READ_AHEAD}
302-
vals, inqErr := ci.si.replyQObj.Inq(selectors)
311+
vals, inqErr := ci.si.statusReplyQObj.Inq(selectors)
303312
if inqErr == nil {
304313
ra := vals[ibmmq.MQIA_DEF_READ_AHEAD]
305314
if ra == ibmmq.MQREADA_YES {
306-
ci.si.replyQReadAhead = true
315+
ci.si.statusReplyQReadAhead = true
307316
}
317+
logTrace("DEF_READ_AHEAD for %s: %d", mqod.ObjectName, ra)
318+
308319
} else {
309320
// log the error but ignore it
310321
logWarn("Cannot find DEF_READ_AHEAD for %s: %v", mqod.ObjectName, inqErr)
311322

312323
}
313-
clearQ(ci.si.replyQObj, ci.si.replyQReadAhead)
314-
} else {
315-
errorString = "Cannot open queue " + mqod.ObjectName
316-
mqreturn = err.(*ibmmq.MQReturn)
324+
325+
// This queue ought always to be empty, but we'll clear it via repeated MQGETs anyway.
326+
clearQ(ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
327+
328+
// Before we open the subscription delivery queue, we will try to clear it using the CLEAR QLOCAL.
329+
// command. We can't do that once it's been opened. But if this fails, ignore the error. The reply queue
330+
// names have to be different, and replyQ must refer to a local queue. But we can't check the queue type
331+
// in advance, so we'll try it anyway.
332+
if replyQ2 != "" && replyQ2 != replyQ {
333+
clearQPCF(replyQ, ci.si.cmdQObj, ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
334+
}
317335
}
318336
}
319337

320-
// MQOPEN of a second reply queue used for status polling
338+
// MQOPEN of a reply queue used for subscription delivery
321339
if err == nil {
322340
mqod := ibmmq.NewMQOD()
323341
openOptions := inputOpt | ibmmq.MQOO_FAIL_IF_QUIESCING
324342
openOptions |= ibmmq.MQOO_INQUIRE
325-
326343
mqod.ObjectType = ibmmq.MQOT_Q
327-
ci.si.replyQ2BaseName = replyQ2
328-
if replyQ2 != "" {
329-
mqod.ObjectName = replyQ2
330-
} else {
331-
mqod.ObjectName = replyQ
332-
}
333-
ci.si.statusReplyQObj, err = ci.si.qMgr.Open(mqod, openOptions)
334-
if err != nil {
335-
errorString = "Cannot open queue " + mqod.ObjectName
336-
mqreturn = err.(*ibmmq.MQReturn)
337-
} else {
338-
// If replyQ2 is not set, we can reuse knowledge from the previous
339-
// block about how readahead is configured.
340-
if replyQ2 != "" {
341-
ci.si.statusReplyQReadAhead = false
344+
mqod.ObjectName = replyQ
345+
ci.si.replyQObj, err = ci.si.qMgr.Open(mqod, openOptions)
346+
ci.si.replyQBaseName = replyQ
347+
if err == nil {
348+
ci.si.queuesOpened = true
349+
ci.si.replyQReadAhead = false
342350

351+
// Do the inquire again, this time for the main replyQ, if it's not the same as the 2ary reply queue
352+
if replyQ2 != "" && replyQ2 != replyQ {
343353
selectors := []int32{ibmmq.MQIA_DEF_READ_AHEAD}
344-
vals, inqErr := ci.si.statusReplyQObj.Inq(selectors)
354+
vals, inqErr := ci.si.replyQObj.Inq(selectors)
345355
if inqErr == nil {
346356
ra := vals[ibmmq.MQIA_DEF_READ_AHEAD]
347357
if ra == ibmmq.MQREADA_YES {
348-
ci.si.statusReplyQReadAhead = true
358+
ci.si.replyQReadAhead = true
349359
}
360+
logTrace("DEF_READ_AHEAD for %s: %d", mqod.ObjectName, ra)
350361
} else {
351362
// log the error but ignore it
352363
logWarn("Cannot find DEF_READ_AHEAD for %s: %v", mqod.ObjectName, inqErr)
353-
354364
}
355365
} else {
356-
ci.si.statusReplyQReadAhead = ci.si.replyQReadAhead
366+
ci.si.replyQReadAhead = ci.si.statusReplyQReadAhead
357367
}
358-
clearQ(ci.si.statusReplyQObj, ci.si.statusReplyQReadAhead)
368+
369+
clearQ(ci.si.replyQObj, ci.si.replyQReadAhead)
370+
} else {
371+
errorString = "Cannot open queue " + mqod.ObjectName
372+
mqreturn = err.(*ibmmq.MQReturn)
359373
}
360374
}
361375

@@ -717,6 +731,47 @@ func clearDurableSubscriptions(prefix string, cmdQObj ibmmq.MQObject, replyQObj
717731

718732
}
719733

734+
func clearQPCF(qName string, cmdQObj ibmmq.MQObject, replyQObj ibmmq.MQObject, readAhead bool) {
735+
var err error
736+
737+
traceEntryF("clearQPCF", "for queue %s", qName)
738+
739+
clearQ(replyQObj, readAhead)
740+
putmqmd, pmo, cfh, buf := statusSetCommandHeaders()
741+
742+
// Can allow all the other fields to default
743+
cfh.Command = ibmmq.MQCMD_CLEAR_Q
744+
745+
// Add the parameters one at a time into a buffer
746+
pcfparm := new(ibmmq.PCFParameter)
747+
pcfparm.Type = ibmmq.MQCFT_STRING
748+
pcfparm.Parameter = ibmmq.MQCA_Q_NAME
749+
pcfparm.String = []string{qName}
750+
cfh.ParameterCount++
751+
buf = append(buf, pcfparm.Bytes()...)
752+
753+
// Once we know the total number of parameters, put the
754+
// CFH header on the front of the buffer.
755+
buf = append(cfh.Bytes(), buf...)
756+
757+
// And now put the command to the queue
758+
err = cmdQObj.Put(putmqmd, pmo, buf)
759+
if err != nil {
760+
traceExitErr("clearQPCF", 1, err)
761+
return
762+
}
763+
764+
// Don't really care about the responses, just loop until
765+
// the operation is complete one way or the other
766+
for allReceived := false; !allReceived; {
767+
_, _, allReceived, err = statusGetReply(putmqmd.MsgId)
768+
}
769+
770+
traceExitErr("clearQPCF", 0, err)
771+
772+
return
773+
}
774+
720775
// Given a PCF response message, parse it to extract the desired fields
721776
func parseInqSubData(cfh *ibmmq.MQCFH, buf []byte) (string, string) {
722777
var elem *ibmmq.PCFParameter

mqmetric/status.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,11 @@ func statusTimeEpoch(d string, t string) int64 {
172172
return epoch
173173
}
174174

175-
func clearMsgWithoutTruncation(hObj ibmmq.MQObject) (*ibmmq.MQMD, int, error) {
175+
func getMsgWithoutTruncation(hObj ibmmq.MQObject) (*ibmmq.MQMD, int, error) {
176176
var err error
177177
var md *ibmmq.MQMD
178178

179-
traceEntry("clearMsgWithoutTruncation")
179+
traceEntry("getMsgWithoutTruncation")
180180

181181
msgLen := 0
182182
for trunc := true; trunc; {
@@ -189,7 +189,7 @@ func clearMsgWithoutTruncation(hObj ibmmq.MQObject) (*ibmmq.MQMD, int, error) {
189189
gmo.Options |= ibmmq.MQGMO_NO_WAIT
190190
gmo.Options |= ibmmq.MQGMO_CONVERT
191191

192-
logTrace("clearQWithoutTruncation: Trying MQGET with clearQBuffer size %d ", len(clearQBuf))
192+
// logTrace("clearQWithoutTruncation: Trying MQGET with clearQBuffer size %d ", len(clearQBuf))
193193
msgLen, err = hObj.Get(md, gmo, clearQBuf)
194194
if err != nil {
195195
mqreturn := err.(*ibmmq.MQReturn)
@@ -200,15 +200,21 @@ func clearMsgWithoutTruncation(hObj ibmmq.MQObject) (*ibmmq.MQMD, int, error) {
200200
clearQBuf = clearQBuf[0:maxBufSize]
201201
}
202202
} else {
203-
traceExitF("clearMsgWithoutTruncation", 1, "BufSize %d Error %v", len(clearQBuf), err)
204-
return md, msgLen, err
203+
if mqreturn.MQRC != ibmmq.MQRC_NO_MSG_AVAILABLE {
204+
traceExitF("getMsgWithoutTruncation", 1, "BufSize %d Error %v", len(clearQBuf), err)
205+
return md, msgLen, err
206+
} else {
207+
// Quit cleanly
208+
trunc = false
209+
// err = nil
210+
}
205211
}
206212
} else {
207213
trunc = false
208214
}
209215
}
210216

211-
traceExit("clearMsgWithoutTruncation", 0)
217+
traceExit("getMsgWithoutTruncation", 0)
212218
return md, msgLen, err
213219
}
214220

@@ -219,8 +225,7 @@ func clearQ(hObj ibmmq.MQObject, usingReadAhead bool) {
219225
p := 0
220226
buf := make([]byte, 0)
221227

222-
traceEntry("clearQ")
223-
logTrace("clearQ: QueueName=%s readAhead=%v", hObj.Name, usingReadAhead)
228+
traceEntryF("clearQ", "QueueName=%s readAhead=%v", hObj.Name, usingReadAhead)
224229

225230
// Empty reply and publication destination queues in case any left over from previous runs.
226231
// Do it in batches if the messages are persistent. Which they shouldn't be, but you
@@ -239,23 +244,21 @@ func clearQ(hObj ibmmq.MQObject, usingReadAhead bool) {
239244

240245
} else {
241246
// logDebug("Reverting to clearMsgWithoutTruncation")
242-
getmqmd, _, err = clearMsgWithoutTruncation(hObj)
247+
getmqmd, _, err = getMsgWithoutTruncation(hObj)
243248
}
244249

245-
// logDebug("clearQ: got message with err %v", err)
250+
// logDebug("clearQ: got message with err %v p=%d", err, getmqmd.Persistence)
246251

247252
if err != nil && err.(*ibmmq.MQReturn).MQCC == ibmmq.MQCC_FAILED {
248253
ok = false
249-
}
250-
251-
if getmqmd.Persistence == ibmmq.MQPER_PERSISTENT {
254+
} else if getmqmd.Persistence == ibmmq.MQPER_PERSISTENT {
252255
p++
253256
if (p % 50) == 0 {
254257
err = hObj.GetHConn().Cmit()
255258
if err != nil {
256259
logError("Problem committing removal of persistent messages: %v", err)
257260
} else {
258-
logDebug("Successful MQCMIT")
261+
logTrace("Successful MQCMIT")
259262
p = 0
260263
}
261264
}
@@ -274,7 +277,7 @@ func clearQ(hObj ibmmq.MQObject, usingReadAhead bool) {
274277
if err != nil {
275278
logError("Problem committing removal of persistent messages: %v", err)
276279
} else {
277-
logDebug("Successful MQCMIT")
280+
logTrace("Successful MQCMIT")
278281

279282
}
280283
}

0 commit comments

Comments
 (0)