fix: synchronize CsiManager.activate() and flush full queue atomically#3316
fix: synchronize CsiManager.activate() and flush full queue atomically#3316dwd wants to merge 5 commits into
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughactivate() in CsiManager now sets ChangesCSI Queue Delivery Refactor
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Comment |
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java (1)
121-126:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
process()still holds the same monitor duringactivate()delivery.Line 121 keeps
thislocked for the fullprocess()call, so even thoughactivate()releases its inner lock before delivery,queueOrPush()remains blocked whilesession.deliver(...)runs (Line 155). That defeats the “deliver outside lock” goal for the primary call path (process→activate).Suggested fix
-public synchronized void process(`@Nonnull` final Element nonza) +public void process(`@Nonnull` final Element nonza) { switch(nonza.getName()) { case "active": activate(); break; case "inactive": deactivate(); break; default: Log.warn("Unable to process element that was expected to be a CSI nonza for {}: {}", session, nonza); } } @@ -public void deactivate() +public synchronized void deactivate() { Log.trace("Session for '{}' to CSI 'inactive'", session.getAddress()); active = false; }Also applies to: 142-159
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java` around lines 121 - 126, process() currently holds the CsiManager instance monitor for the entire method which means calling activate() (and similar flows) keeps queueOrPush/session.deliver blocked; change process() to avoid holding synchronized(this) around the whole switch: read nonza.getName(), then for "active"/"inactive" call activate()/deactivate() outside any synchronized block; for branches that must touch shared state (the code around queueOrPush and the session.deliver path referenced in queueOrPush and lines 142-159) acquire the lock only to snapshot/modify minimal state, then release the lock before calling session.deliver; alternatively refactor queueOrPush to compute/collect the delivery payload under lock and invoke session.deliver after releasing the lock so no session.deliver occurs while holding this.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java`:
- Around line 121-126: process() currently holds the CsiManager instance monitor
for the entire method which means calling activate() (and similar flows) keeps
queueOrPush/session.deliver blocked; change process() to avoid holding
synchronized(this) around the whole switch: read nonza.getName(), then for
"active"/"inactive" call activate()/deactivate() outside any synchronized block;
for branches that must touch shared state (the code around queueOrPush and the
session.deliver path referenced in queueOrPush and lines 142-159) acquire the
lock only to snapshot/modify minimal state, then release the lock before calling
session.deliver; alternatively refactor queueOrPush to compute/collect the
delivery payload under lock and invoke session.deliver after releasing the lock
so no session.deliver occurs while holding this.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 80589b7e-17bc-4bb1-aea5-9a932e390f3d
📒 Files selected for processing (1)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java
|
Can we fix the synchronization issue without flushing the entire queue under lock? If Stream Management taught me anything, it's that long-lived locks cause trouble. |
There was a problem hiding this comment.
Pull request overview
This pull request updates CsiManager.activate() to avoid races with queueOrPush() by atomically draining the CSI delay queue under the manager lock, then delivering drained stanzas after releasing the lock to avoid holding the lock during I/O.
Changes:
- Atomically drains and clears the delayed-stanza queue during activation under synchronization.
- Sets CSI state to active before delivering drained stanzas.
- Delivers drained stanzas outside the lock to reduce lock contention during network I/O.
Comments suppressed due to low confidence (1)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java:159
- activate() now calls session.deliver() once per queued stanza. Each call re-enters queueOrPush(), which re-adds the stanza to the queue only to immediately flush it again, causing extra synchronization/logging overhead compared to flushing the whole drained queue in one pass. Consider delivering the drained list without re-queuing each packet (eg by refactoring LocalClientSession/CsiManager to support delivering a batch that bypasses CSI queueing while still applying stream management accounting).
for (final Packet packet : toDeliver) {
try {
session.deliver(packet);
} catch (UnauthorizedException e) {
Log.error("Unexpected exception while activating CSI.", e);
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java (1)
138-152:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
activate()still holds the lock during I/O and only delivers one queued stanzaOn Line 138-152, this implementation does not flush the full delayed queue:
queue.pollLast()removes only the tail, leaving earlier entries queued. Also, becauseactivate()issynchronized,session.deliver(...)runs while holdingthislock.✅ Suggested fix (atomic drain under lock, deliver outside lock)
-public synchronized void activate() +public void activate() { - Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); - active = true; - - // Re-submit the tail of the queue through the normal queueOrPush path. Because active is now true, queueOrPush - // will flush the entire queue (including all preceding stanzas) in one atomic operation. - final Packet tail = queue.pollLast(); - if (tail != null) { - try { - session.deliver(tail); - } catch (UnauthorizedException e) { - Log.error("Unexpected exception while activating CSI.", e); - } - } + final List<Packet> toDeliver; + synchronized (this) { + Log.trace("Session for '{}' to CSI 'active'", session.getAddress()); + active = true; + toDeliver = new LinkedList<>(queue); + queue.clear(); + lastPush = Instant.now(); + } + + for (final Packet packet : toDeliver) { + try { + session.deliver(packet); + } catch (UnauthorizedException e) { + Log.error("Unexpected exception while activating CSI.", e); + } + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java` around lines 138 - 152, activate() currently is synchronized and only calls queue.pollLast() then session.deliver(...) while holding the lock, which both leaves earlier queued stanzas and performs I/O under the object's monitor; change activate() to set active = true under the synchronized block and then atomically drain the entire queue into a local List (e.g., loop polling queue until empty) while still synchronized, release the lock, and then iterate that local list calling session.deliver(...) for each stanza outside of the synchronized block to avoid holding the lock during I/O and ensure the whole queue is flushed; update references to activate(), queue.pollLast(), and session.deliver(...) accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java`:
- Around line 138-152: activate() currently is synchronized and only calls
queue.pollLast() then session.deliver(...) while holding the lock, which both
leaves earlier queued stanzas and performs I/O under the object's monitor;
change activate() to set active = true under the synchronized block and then
atomically drain the entire queue into a local List (e.g., loop polling queue
until empty) while still synchronized, release the lock, and then iterate that
local list calling session.deliver(...) for each stanza outside of the
synchronized block to avoid holding the lock during I/O and ensure the whole
queue is flushed; update references to activate(), queue.pollLast(), and
session.deliver(...) accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 2a3ee7c0-cc04-4bd7-a643-534e6f1e0aa5
📒 Files selected for processing (1)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java
activate() was not synchronized, causing a race with queueOrPush() on the shared queue field. It also used an indirect flush (pollLast + re- deliver) that could lose stanzas queued between pollLast and the re- entry into queueOrPush. Fix: drain the entire queue atomically under the lock, set active=true, then deliver all stanzas outside the lock so I/O does not block other threads from queuing new stanzas. Co-authored-by: Junie <junie@jetbrains.com>
The implementation was partially synchronized, but lacked synchronization on activate/deactivate. By marking both methods as synchronized, there no longer is a data race. In activate, the rescheduling of queued stanzas was previously performed outside of a lock, although its execution (in `queueOrPush`) was fully synchronized. Because of that, I don't expect this commit to introduce significant more (possibly problematic, remember Stream Management?) locking. This commit also addresses a minor off-by-one error when checking the queue capacity.
|
I have rebased this to the latest head, and added changes to hopefully address all issues raised. |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java (1)
331-343: 💤 Low valueTypo in method name: "Withother" should be "WithOther".
Minor naming issue for consistency with other test method names.
📝 Suggested fix
- public void testCanDelayPresenceWithotherTypeReturnsFalse() throws Exception + public void testCanDelayPresenceWithOtherTypeReturnsFalse() throws Exception🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java` around lines 331 - 343, Rename the test method in CsiManagerTest from testCanDelayPresenceWithotherTypeReturnsFalse to testCanDelayPresenceWithOtherTypeReturnsFalse for consistent naming; update the method declaration and any references (the `@Test-annotated` method name) so the signature remains public void testCanDelayPresenceWithOtherTypeReturnsFalse() throws Exception and ensure the body still calls CsiManager.canDelay(presenceSubscribed) and asserts assertFalse(result, ...).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java`:
- Around line 594-608: The test method name
testCanDelayProceedingToGroupChatReturnsTrue contradicts its assertion; update
the test in CsiManagerTest so the name matches behavior (either rename
testCanDelayProceedingToGroupChatReturnsFalse or similar) and keep the logic
that calls CsiManager.canDelay(groupchatMessage) and asserts false; ensure the
method name change is applied to the test method declaration and any references
so the test intent is clear.
---
Nitpick comments:
In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java`:
- Around line 331-343: Rename the test method in CsiManagerTest from
testCanDelayPresenceWithotherTypeReturnsFalse to
testCanDelayPresenceWithOtherTypeReturnsFalse for consistent naming; update the
method declaration and any references (the `@Test-annotated` method name) so the
signature remains public void testCanDelayPresenceWithOtherTypeReturnsFalse()
throws Exception and ensure the body still calls
CsiManager.canDelay(presenceSubscribed) and asserts assertFalse(result, ...).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 485d967a-3505-4f27-870e-e74da02dea7e
📒 Files selected for processing (2)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.javaxmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
- xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java (1)
331-343: 💤 Low valueTypo in method name: "Withother" → "WithOther".
Minor naming inconsistency for readability.
📝 Suggested fix
- public void testCanDelayPresenceWithotherTypeReturnsFalse() throws Exception + public void testCanDelayPresenceWithOtherTypeReturnsFalse() throws Exception🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java` around lines 331 - 343, The test method name has a typo: rename CsiManagerTest.testCanDelayPresenceWithotherTypeReturnsFalse to testCanDelayPresenceWithOtherTypeReturnsFalse to fix the casing and readability; update the method declaration in class CsiManagerTest (keep the `@Test` annotation and body unchanged) and ensure any references (e.g., IDE-run configurations or other tests) are updated to the new name so compilation and test discovery continue to work.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java`:
- Around line 594-608: The test comment is incorrect: in
CsiManagerTest.testCanDelayProceedingToGroupChatReturnsFalse the fixture
includes a <body> and the assertion checks CsiManager.canDelay(...) returns
false, so update the comment to state that a normal groupchat message with a
body (and without subject/invites) is NOT delayable (e.g., "Normal groupchat
message with body and without subject/invites should not be delayable") so the
comment matches the test intent and the assertion message.
---
Nitpick comments:
In `@xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java`:
- Around line 331-343: The test method name has a typo: rename
CsiManagerTest.testCanDelayPresenceWithotherTypeReturnsFalse to
testCanDelayPresenceWithOtherTypeReturnsFalse to fix the casing and readability;
update the method declaration in class CsiManagerTest (keep the `@Test` annotation
and body unchanged) and ensure any references (e.g., IDE-run configurations or
other tests) are updated to the new name so compilation and test discovery
continue to work.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 11373f37-1c58-44ef-9b47-34fed3f92bda
📒 Files selected for processing (1)
xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java
|
@coderabbitai please review |
|
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java (1)
998-1005:⚠️ Potential issue | 🟠 Major | ⚡ Quick winDon't mark stanzas as sent when no connection exists.
In the stream-management branch,
streamManager.sentStanza(stanzaToPush)runs even whenconn == null. That records the stanza as sent/ackable although nothing went over the wire, which can break SM replay/ack state after detach/resume. Keep the bookkeeping inside the sameconn != nullguard as the actual delivery.Suggested fix
for (final Packet stanzaToPush : stanzasToPush) { if (conn != null) { conn.deliver(stanzaToPush); + streamManager.sentStanza(stanzaToPush); } - streamManager.sentStanza(stanzaToPush); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java` around lines 998 - 1005, The stream-management bookkeeping currently calls streamManager.sentStanza(stanzaToPush) even when conn is null; update LocalClientSession so that inside the synchronized(streamManager) block the call to streamManager.sentStanza(...) is moved into the same conn != null guard that calls conn.deliver(...), ensuring sentStanza is only invoked when deliver actually occurred; preserve the synchronization and ordering (call conn.deliver(...) then streamManager.sentStanza(...)) for each Packet in the stanzasToPush loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.java`:
- Around line 163-182: The loop in CsiManager (around drainQueue() and
session.pushPackets) can push stanzas after deactivate() wins the race; after
exiting the synchronized(this) block but before calling
session.pushPackets(stanzasToPush) re-check the CSI state (the active flag)
under the same lock and abort (set flushingOnActivate=false and return) or
re-queue the drained stanzas if the session is no longer active; specifically
update the code paths around drainQueue(), flushingOnActivate, active and
session.pushPackets(...) so you perform a synchronized check of active
immediately before calling session.pushPackets and handle the inactive case
(stop flushing or re-enqueue) to prevent delivering batches during deactivation.
- Around line 236-242: The current mustPush boolean disables all safety triggers
while flushingOnActivate is true, allowing unbounded queue growth; update the
mustPush logic (the declaration of mustPush in CsiManager) so that
flushingOnActivate only prevents regular immediate sends but does not ignore the
queue capacity or max-duration conditions—i.e., preserve checks for queue.size()
>= DELAY_QUEUE_CAPACITY.getValue() and
Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) even when
flushingOnActivate is true, keeping the other conditions (DELAY_ENABLED, active,
canDelay(packet)) as before; ensure references to flushingOnActivate,
DELAY_QUEUE_CAPACITY, DELAY_MAX_DURATION, lastPush, canDelay(...), active,
DELAY_ENABLED, and queue are used to implement this adjusted boolean.
---
Outside diff comments:
In
`@xmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.java`:
- Around line 998-1005: The stream-management bookkeeping currently calls
streamManager.sentStanza(stanzaToPush) even when conn is null; update
LocalClientSession so that inside the synchronized(streamManager) block the call
to streamManager.sentStanza(...) is moved into the same conn != null guard that
calls conn.deliver(...), ensuring sentStanza is only invoked when deliver
actually occurred; preserve the synchronization and ordering (call
conn.deliver(...) then streamManager.sentStanza(...)) for each Packet in the
stanzasToPush loop.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 02240cf6-a561-4cd9-b383-e7b26ce743f7
📒 Files selected for processing (3)
xmppserver/src/main/java/org/jivesoftware/openfire/csi/CsiManager.javaxmppserver/src/main/java/org/jivesoftware/openfire/session/LocalClientSession.javaxmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
- xmppserver/src/test/java/org/jivesoftware/openfire/csi/CsiManagerTest.java
…n flush Refactor CSI activation flushing to avoid holding the CsiManager monitor during delivery I/O and preserve stanza ordering across activation.
activate() was not synchronized, causing a race with queueOrPush() on the shared queue field. It also used an indirect flush (pollLast + re- deliver) that could lose stanzas queued between pollLast and the re- entry into queueOrPush.
Fix: drain the entire queue atomically under the lock, set active=true, then deliver all stanzas outside the lock so I/O does not block other threads from queuing new stanzas.
Summary by CodeRabbit
New Features
Bug Fixes
Tests