Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ public class CsiManager
*/
private final Deque<Packet> queue = new LinkedList<>();

/**
* Indicates that activation-triggered queue flushing is in progress.
*
* While true, this field serves two purposes:
* 1) queue gating: {@link #queueOrPush(Packet)} keeps new stanzas queued (not pushed) to preserve ordering;
* 2) flusher ownership: concurrent {@link #activate()} calls do not start a second flush loop.
*/
private boolean flushingOnActivate = false;

public CsiManager(@Nonnull final LocalClientSession session)
{
this.session = session;
Expand All @@ -118,7 +127,7 @@ public CsiManager(@Nonnull final LocalClientSession session)
*
* @param nonza The CSI nonza to be processed.
*/
public synchronized void process(@Nonnull final Element nonza)
public void process(@Nonnull final Element nonza)
{
switch(nonza.getName()) {
case "active":
Expand All @@ -137,23 +146,57 @@ public synchronized void process(@Nonnull final Element nonza)
*/
public void activate()
{
Log.trace("Session for '{}' to CSI 'active'", session.getAddress());
active = true;

// If there are delayed stanzas, cause them to be delivered by rescheduling the last one.
if (!queue.isEmpty()) {
try {
session.deliver(queue.pollLast());
} catch (UnauthorizedException e) {
Log.error("Unexpected exception while activating CSI.", e);
synchronized (this)
{
Log.trace("Session for '{}' to CSI 'active'", session.getAddress());
active = true;

// If another thread is already flushing the queue as part of activation, avoid starting a second flusher.
if (flushingOnActivate) {
return;
}

flushingOnActivate = true;
}

try {
while (true) {
final List<Packet> stanzasToPush;
synchronized (this) {
// Stop flushing as soon as the session became inactive again.
if (!active) {
flushingOnActivate = false;
return;
}

stanzasToPush = drainQueue();
if (stanzasToPush.isEmpty()) {
flushingOnActivate = false;
lastPush = Instant.now();
return;
}
}

// I/O is intentionally performed outside the CSI lock.
session.pushPackets(stanzasToPush);
Comment thread
guusdk marked this conversation as resolved.
Comment thread
guusdk marked this conversation as resolved.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
synchronized (this) {
lastPush = Instant.now();
}
}
} catch (UnauthorizedException e) {
Log.error("Unexpected exception while activating CSI.", e);
} finally {
synchronized (this) {
flushingOnActivate = false;
}
}
}

/**
* Switch to the client state of 'inactive'.
*/
public void deactivate()
public synchronized void deactivate()
{
Log.trace("Session for '{}' to CSI 'inactive'", session.getAddress());
active = false;
Expand Down Expand Up @@ -190,12 +233,13 @@ public synchronized List<Packet> queueOrPush(@Nonnull final Packet packet)
{
queue.add(packet);

final boolean mustPush =
!DELAY_ENABLED.getValue() // The feature is disabled by configuration. Always send stanzas immediately.
|| active // The client is active! Do not delay.
|| queue.size() > DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing.
|| Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) // Ensure that periodically, delayed data is sent anyway.
|| !canDelay(packet);
final boolean mustPush = !flushingOnActivate // Never flush while activation is in progress, as this can cause out-of-order delivery.
&& ( !DELAY_ENABLED.getValue() // The feature is disabled by configuration. Always send stanzas immediately.
|| active // The client is active! Do not delay.
|| queue.size() >= DELAY_QUEUE_CAPACITY.getValue() // The delay queue has reached its capacity. Flush the entire thing.
|| Instant.now().isAfter(lastPush.plus(DELAY_MAX_DURATION.getValue())) // Ensure that periodically, delayed data is sent anyway.
|| !canDelay(packet)
Comment thread
guusdk marked this conversation as resolved.
);
Comment thread
guusdk marked this conversation as resolved.

final List<Packet> result = new LinkedList<>();
if (mustPush) {
Expand All @@ -209,6 +253,18 @@ public synchronized List<Packet> queueOrPush(@Nonnull final Packet packet)
return result;
}

/**
* Returns all queued stanzas and clears the queue.
*
* @return The queued stanzas
*/
private List<Packet> drainQueue()
{
final List<Packet> result = new LinkedList<>(queue);
queue.clear();
return result;
}

/**
* Inspects a stanza and evaluates if it is eligible for delayed delivery to inactive clients.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,11 +962,21 @@ static void returnPrivacyListErrorToSender(Packet packet) {
}

@Override
public void deliver(Packet queueOrPushStanza) throws UnauthorizedException {
// Queue this stanza, possibly returning it immediately in line with any previously queued stanzas if this
// stanza needs to be pushed to the client immediately.
final List<Packet> stanzasToPush = csiManager.queueOrPush(queueOrPushStanza);
public void deliver(Packet queueOrPushStanza) throws UnauthorizedException
{
pushPackets(csiManager.queueOrPush(queueOrPushStanza));
}

/**
* Delivers stanzas to the client, without evaluating CSI.
*
* This method should generally not be used, as it is designed to be used by the CSI implementation specifically.
* Prefer using {@link #deliver(Packet)} instead.
*
* @param stanzasToPush The stanzas to deliver
*/
public void pushPackets(@Nonnull final List<Packet> stanzasToPush) throws UnauthorizedException
{
if (stanzasToPush.isEmpty()) {
return;
}
Expand Down
Loading
Loading