From 37d24c7344d35d1fe25f1103a5d9175e561f2244 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 11:29:49 -0300 Subject: [PATCH 1/7] fix:race condition in db tracking module --- .../msgtracking/DbTrackingModule.java | 147 +++++++++--------- 1 file changed, 74 insertions(+), 73 deletions(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index 52c7e85d..ad02d621 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -96,89 +96,90 @@ protected CompositeParameters createParser() { } protected void persist(Message msg, Map map) { - try (Connection conn = dbHandler.getConnection()) { - Statement s = conn.createStatement(); - String msgIdField = FIELDS.MSG_ID; - ResultSet rs = s.executeQuery( - "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + map.get(msgIdField) + "'"); - ResultSetMetaData meta = rs.getMetaData(); - boolean isUpdate = rs.next(); // Record already exists so update - if (logger.isTraceEnabled()) { - logger.trace( - "\t\t *** Tracking record found: " + isUpdate + "\n\t\t *** Tracking record metadata: " + meta); - } - StringBuffer fieldStmt = new StringBuffer(); - StringBuffer valuesStmt = new StringBuffer(); - for (int i = 0; i < meta.getColumnCount(); i++) { - String colName = meta.getColumnLabel(i + 1); - if (colName.equalsIgnoreCase("id")) { - continue; - } else if (colName.equalsIgnoreCase(FIELDS.UPDATE_DT)) { - // Ignore if not update mode - if (isUpdate) { - appendFieldForUpdate(colName, DateUtil.getSqlTimestamp(), fieldStmt, meta.getColumnType(i + 1)); + synchronized (msg) { + try (Connection conn = dbHandler.getConnection()) { + Statement s = conn.createStatement(); + String msgIdField = FIELDS.MSG_ID; + ResultSet rs = s.executeQuery( + "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + map.get(msgIdField) + "'"); + ResultSetMetaData meta = rs.getMetaData(); + boolean isUpdate = rs.next(); // Record already exists so update + if (logger.isTraceEnabled()) { + logger.trace( + "\t\t *** Tracking record found: " + isUpdate + "\n\t\t *** Tracking record metadata: " + meta); + } + StringBuffer fieldStmt = new StringBuffer(); + StringBuffer valuesStmt = new StringBuffer(); + for (int i = 0; i < meta.getColumnCount(); i++) { + String colName = meta.getColumnLabel(i + 1); + if (colName.equalsIgnoreCase("id")) { + continue; + } else if (colName.equalsIgnoreCase(FIELDS.UPDATE_DT)) { + // Ignore if not update mode + if (isUpdate) { + appendFieldForUpdate(colName, DateUtil.getSqlTimestamp(), fieldStmt, meta.getColumnType(i + 1)); + } + } else if (colName.equalsIgnoreCase(FIELDS.CREATE_DT)) { + if (isUpdate) { + map.remove(FIELDS.CREATE_DT); + } else { + appendFieldForInsert(colName, DateUtil.getSqlTimestamp(), fieldStmt, valuesStmt, + meta.getColumnType(i + 1)); + } + } else if (isUpdate) { + /* + * Only write unchanged field values. Map is field names in LOWER case so + * convert in case DB server returns column names in uppercase + */ + String mapVal = map.get(colName.toLowerCase()); + if (mapVal == null) { + continue; + } + String dbVal = rs.getString(colName); + if (dbVal != null && mapVal.equals(dbVal)) { + // Unchanged value so remove from map + continue; + } + appendFieldForUpdate(colName, mapVal, fieldStmt, meta.getColumnType(i + 1)); + } else { + // For new record add every field that is not NULL + String mapVal = map.get(colName.toLowerCase()); + if (mapVal == null) { + continue; + } + appendFieldForInsert(colName, mapVal, fieldStmt, valuesStmt, meta.getColumnType(i + 1)); } - } else if (colName.equalsIgnoreCase(FIELDS.CREATE_DT)) { + } + if (fieldStmt.length() > 0) { + String stmt = ""; if (isUpdate) { - map.remove(FIELDS.CREATE_DT); + stmt = "UPDATE " + tableName + " SET " + fieldStmt.toString() + " WHERE " + FIELDS.MSG_ID + " = '" + + map.get(msgIdField) + "'"; } else { - appendFieldForInsert(colName, DateUtil.getSqlTimestamp(), fieldStmt, valuesStmt, - meta.getColumnType(i + 1)); - } - } else if (isUpdate) { - /* - * Only write unchanged field values. Map is field names in LOWER case so - * convert in case DB server returns column names in uppercase - */ - String mapVal = map.get(colName.toLowerCase()); - if (mapVal == null) { - continue; - } - String dbVal = rs.getString(colName); - if (dbVal != null && mapVal.equals(dbVal)) { - // Unchanged value so remove from map - continue; + stmt = "INSERT INTO " + tableName + " (" + fieldStmt.toString() + ") VALUES (" + + valuesStmt.toString() + ")"; } - appendFieldForUpdate(colName, mapVal, fieldStmt, meta.getColumnType(i + 1)); - } else { - // For new record add every field that is not NULL - String mapVal = map.get(colName.toLowerCase()); - if (mapVal == null) { - continue; + if (s.executeUpdate(stmt) > 0) { + if (logger.isTraceEnabled()) { + logger.trace("Tracking record SQL statement: " + stmt); + } + if (logger.isDebugEnabled()) { + logger.debug("Tracking record successfully persisted to database: " + map); + } + } else { + throw new OpenAS2Exception("Failed to persist tracking record to DB: " + map); } - appendFieldForInsert(colName, mapVal, fieldStmt, valuesStmt, meta.getColumnType(i + 1)); - } - } - if (fieldStmt.length() > 0) { - String stmt = ""; - if (isUpdate) { - stmt = "UPDATE " + tableName + " SET " + fieldStmt.toString() + " WHERE " + FIELDS.MSG_ID + " = '" - + map.get(msgIdField) + "'"; } else { - stmt = "INSERT INTO " + tableName + " (" + fieldStmt.toString() + ") VALUES (" - + valuesStmt.toString() + ")"; - } - if (s.executeUpdate(stmt) > 0) { - if (logger.isTraceEnabled()) { - logger.trace("Tracking record SQL statement: " + stmt); + if (logger.isInfoEnabled()) { + logger.info("No change from existing record in DB. Tracking record not updated: " + map); } - if (logger.isDebugEnabled()) { - logger.debug("Tracking record successfully persisted to database: " + map); - } - } else { - throw new OpenAS2Exception("Failed to persist tracking record to DB: " + map); - } - } else { - if (logger.isInfoEnabled()) { - logger.info("No change from existing record in DB. Tracking record not updated: " + map); } + } catch (Exception e) { + msg.setLogMsg("Failed to persist a tracking event: " + org.openas2.util.Logging.getExceptionMsg(e) + + " ::: Data map: " + map); + logger.error(msg.getLogMsg(), e); } - } catch (Exception e) { - msg.setLogMsg("Failed to persist a tracking event: " + org.openas2.util.Logging.getExceptionMsg(e) - + " ::: Data map: " + map); - logger.error(msg.getLogMsg(), e); } - } public ArrayList> listMessages() { From 35c65f29d9d35eee5b59124ba667d7f57bfbfd2e Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 11:38:41 -0300 Subject: [PATCH 2/7] fix:race condition in db tracking module --- .../org/openas2/processor/msgtracking/DbTrackingModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index ad02d621..9163d6ce 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -96,7 +96,7 @@ protected CompositeParameters createParser() { } protected void persist(Message msg, Map map) { - synchronized (msg) { + synchronized (this) { try (Connection conn = dbHandler.getConnection()) { Statement s = conn.createStatement(); String msgIdField = FIELDS.MSG_ID; From 5bddd1f7bc9716fe3210f1e2d6cff93ef92774a1 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 11:44:24 -0300 Subject: [PATCH 3/7] wip --- .../processor/msgtracking/DbTrackingModule.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index 9163d6ce..490c1116 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -19,10 +19,8 @@ import java.sql.ResultSetMetaData; import java.sql.Statement; import java.sql.Types; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; public class DbTrackingModule extends BaseMsgTrackingModule { public static final String PARAM_TCP_SERVER_START = "tcp_server_start"; @@ -54,6 +52,7 @@ public class DbTrackingModule extends BaseMsgTrackingModule { private String dbPlatform = "h2"; private String tableName = null; IDBHandler dbHandler = null; + private static final Map msgIdLocksMap = ConcurrentHashMap(); private Logger logger = LoggerFactory.getLogger(DbTrackingModule.class); @@ -96,12 +95,13 @@ protected CompositeParameters createParser() { } protected void persist(Message msg, Map map) { - synchronized (this) { + String msgIdField = FIELDS.MSG_ID; + String msgIdValue = map.get(msgIdField); + synchronized (msgIdLocksMap.computeIfAbsent(msgIdValue, k -> new Object())) { try (Connection conn = dbHandler.getConnection()) { Statement s = conn.createStatement(); - String msgIdField = FIELDS.MSG_ID; ResultSet rs = s.executeQuery( - "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + map.get(msgIdField) + "'"); + "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + msgIdValue + "'"); ResultSetMetaData meta = rs.getMetaData(); boolean isUpdate = rs.next(); // Record already exists so update if (logger.isTraceEnabled()) { @@ -180,6 +180,7 @@ protected void persist(Message msg, Map map) { logger.error(msg.getLogMsg(), e); } } + msgIdLocksMap.remove(msgIdValue); } public ArrayList> listMessages() { From 5b05fec4ccb5373f917a8fcee6c3cde44f21a898 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 11:46:56 -0300 Subject: [PATCH 4/7] wip --- .../org/openas2/processor/msgtracking/DbTrackingModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index 490c1116..a027befb 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -52,7 +52,7 @@ public class DbTrackingModule extends BaseMsgTrackingModule { private String dbPlatform = "h2"; private String tableName = null; IDBHandler dbHandler = null; - private static final Map msgIdLocksMap = ConcurrentHashMap(); + private static final Map msgIdLocksMap = new ConcurrentHashMap<>(); private Logger logger = LoggerFactory.getLogger(DbTrackingModule.class); From f9e4a08343aa4698d4318cab7872cf0bfb750773 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 11:47:33 -0300 Subject: [PATCH 5/7] wip --- .../org/openas2/processor/msgtracking/DbTrackingModule.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index a027befb..d60e678b 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -19,7 +19,10 @@ import java.sql.ResultSetMetaData; import java.sql.Statement; import java.sql.Types; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DbTrackingModule extends BaseMsgTrackingModule { From f3bfc2f62b07f5993a3763f602a07d322597c00b Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 13:26:03 -0300 Subject: [PATCH 6/7] wip --- .../msgtracking/DbTrackingModule.java | 148 +++++++++--------- 1 file changed, 72 insertions(+), 76 deletions(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index d60e678b..fdf984f4 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -55,7 +55,6 @@ public class DbTrackingModule extends BaseMsgTrackingModule { private String dbPlatform = "h2"; private String tableName = null; IDBHandler dbHandler = null; - private static final Map msgIdLocksMap = new ConcurrentHashMap<>(); private Logger logger = LoggerFactory.getLogger(DbTrackingModule.class); @@ -97,93 +96,90 @@ protected CompositeParameters createParser() { return params; } - protected void persist(Message msg, Map map) { + protected synchronized void persist(Message msg, Map map) { String msgIdField = FIELDS.MSG_ID; String msgIdValue = map.get(msgIdField); - synchronized (msgIdLocksMap.computeIfAbsent(msgIdValue, k -> new Object())) { - try (Connection conn = dbHandler.getConnection()) { - Statement s = conn.createStatement(); - ResultSet rs = s.executeQuery( - "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + msgIdValue + "'"); - ResultSetMetaData meta = rs.getMetaData(); - boolean isUpdate = rs.next(); // Record already exists so update - if (logger.isTraceEnabled()) { - logger.trace( - "\t\t *** Tracking record found: " + isUpdate + "\n\t\t *** Tracking record metadata: " + meta); - } - StringBuffer fieldStmt = new StringBuffer(); - StringBuffer valuesStmt = new StringBuffer(); - for (int i = 0; i < meta.getColumnCount(); i++) { - String colName = meta.getColumnLabel(i + 1); - if (colName.equalsIgnoreCase("id")) { - continue; - } else if (colName.equalsIgnoreCase(FIELDS.UPDATE_DT)) { - // Ignore if not update mode - if (isUpdate) { - appendFieldForUpdate(colName, DateUtil.getSqlTimestamp(), fieldStmt, meta.getColumnType(i + 1)); - } - } else if (colName.equalsIgnoreCase(FIELDS.CREATE_DT)) { - if (isUpdate) { - map.remove(FIELDS.CREATE_DT); - } else { - appendFieldForInsert(colName, DateUtil.getSqlTimestamp(), fieldStmt, valuesStmt, - meta.getColumnType(i + 1)); - } - } else if (isUpdate) { - /* - * Only write unchanged field values. Map is field names in LOWER case so - * convert in case DB server returns column names in uppercase - */ - String mapVal = map.get(colName.toLowerCase()); - if (mapVal == null) { - continue; - } - String dbVal = rs.getString(colName); - if (dbVal != null && mapVal.equals(dbVal)) { - // Unchanged value so remove from map - continue; - } - appendFieldForUpdate(colName, mapVal, fieldStmt, meta.getColumnType(i + 1)); - } else { - // For new record add every field that is not NULL - String mapVal = map.get(colName.toLowerCase()); - if (mapVal == null) { - continue; - } - appendFieldForInsert(colName, mapVal, fieldStmt, valuesStmt, meta.getColumnType(i + 1)); + try (Connection conn = dbHandler.getConnection()) { + Statement s = conn.createStatement(); + ResultSet rs = s.executeQuery( + "SELECT * FROM " + tableName + " WHERE " + msgIdField + " = '" + msgIdValue + "'"); + ResultSetMetaData meta = rs.getMetaData(); + boolean isUpdate = rs.next(); // Record already exists so update + if (logger.isTraceEnabled()) { + logger.trace( + "\t\t *** Tracking record found: " + isUpdate + "\n\t\t *** Tracking record metadata: " + meta); + } + StringBuffer fieldStmt = new StringBuffer(); + StringBuffer valuesStmt = new StringBuffer(); + for (int i = 0; i < meta.getColumnCount(); i++) { + String colName = meta.getColumnLabel(i + 1); + if (colName.equalsIgnoreCase("id")) { + continue; + } else if (colName.equalsIgnoreCase(FIELDS.UPDATE_DT)) { + // Ignore if not update mode + if (isUpdate) { + appendFieldForUpdate(colName, DateUtil.getSqlTimestamp(), fieldStmt, meta.getColumnType(i + 1)); } - } - if (fieldStmt.length() > 0) { - String stmt = ""; + } else if (colName.equalsIgnoreCase(FIELDS.CREATE_DT)) { if (isUpdate) { - stmt = "UPDATE " + tableName + " SET " + fieldStmt.toString() + " WHERE " + FIELDS.MSG_ID + " = '" - + map.get(msgIdField) + "'"; + map.remove(FIELDS.CREATE_DT); } else { - stmt = "INSERT INTO " + tableName + " (" + fieldStmt.toString() + ") VALUES (" - + valuesStmt.toString() + ")"; + appendFieldForInsert(colName, DateUtil.getSqlTimestamp(), fieldStmt, valuesStmt, + meta.getColumnType(i + 1)); } - if (s.executeUpdate(stmt) > 0) { - if (logger.isTraceEnabled()) { - logger.trace("Tracking record SQL statement: " + stmt); - } - if (logger.isDebugEnabled()) { - logger.debug("Tracking record successfully persisted to database: " + map); - } - } else { - throw new OpenAS2Exception("Failed to persist tracking record to DB: " + map); + } else if (isUpdate) { + /* + * Only write unchanged field values. Map is field names in LOWER case so + * convert in case DB server returns column names in uppercase + */ + String mapVal = map.get(colName.toLowerCase()); + if (mapVal == null) { + continue; + } + String dbVal = rs.getString(colName); + if (dbVal != null && mapVal.equals(dbVal)) { + // Unchanged value so remove from map + continue; + } + appendFieldForUpdate(colName, mapVal, fieldStmt, meta.getColumnType(i + 1)); + } else { + // For new record add every field that is not NULL + String mapVal = map.get(colName.toLowerCase()); + if (mapVal == null) { + continue; } + appendFieldForInsert(colName, mapVal, fieldStmt, valuesStmt, meta.getColumnType(i + 1)); + } + } + if (fieldStmt.length() > 0) { + String stmt = ""; + if (isUpdate) { + stmt = "UPDATE " + tableName + " SET " + fieldStmt.toString() + " WHERE " + FIELDS.MSG_ID + " = '" + + map.get(msgIdField) + "'"; } else { - if (logger.isInfoEnabled()) { - logger.info("No change from existing record in DB. Tracking record not updated: " + map); + stmt = "INSERT INTO " + tableName + " (" + fieldStmt.toString() + ") VALUES (" + + valuesStmt.toString() + ")"; + } + if (s.executeUpdate(stmt) > 0) { + if (logger.isTraceEnabled()) { + logger.trace("Tracking record SQL statement: " + stmt); + } + if (logger.isDebugEnabled()) { + logger.debug("Tracking record successfully persisted to database: " + map); } + } else { + throw new OpenAS2Exception("Failed to persist tracking record to DB: " + map); + } + } else { + if (logger.isInfoEnabled()) { + logger.info("No change from existing record in DB. Tracking record not updated: " + map); } - } catch (Exception e) { - msg.setLogMsg("Failed to persist a tracking event: " + org.openas2.util.Logging.getExceptionMsg(e) - + " ::: Data map: " + map); - logger.error(msg.getLogMsg(), e); } + } catch (Exception e) { + msg.setLogMsg("Failed to persist a tracking event: " + org.openas2.util.Logging.getExceptionMsg(e) + + " ::: Data map: " + map); + logger.error(msg.getLogMsg(), e); } - msgIdLocksMap.remove(msgIdValue); } public ArrayList> listMessages() { From 2c4cb559d080fafa08690a3d491212f53451cb69 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Tue, 27 Jan 2026 13:26:31 -0300 Subject: [PATCH 7/7] wip --- .../java/org/openas2/processor/msgtracking/DbTrackingModule.java | 1 - 1 file changed, 1 deletion(-) diff --git a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java index fdf984f4..d134557d 100644 --- a/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java +++ b/Server/src/main/java/org/openas2/processor/msgtracking/DbTrackingModule.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class DbTrackingModule extends BaseMsgTrackingModule { public static final String PARAM_TCP_SERVER_START = "tcp_server_start";