Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f388bf9
try
Vladsz83 May 29, 2026
15ac011
Merge branch 'master' into Replace-SecurityAwareMessage-with-OpCtxMsg
Vladsz83 Jun 17, 2026
ab108d4
Merge branch 'master' into Replace-SecurityAwareMessage-with-OpCtxMsg
Vladsz83 Jun 26, 2026
86dfef8
raw
Vladsz83 Jun 26, 2026
5defc7a
raw
Vladsz83 Jun 26, 2026
6d3bf1c
split from Disco changes
Vladsz83 Jun 29, 2026
a5534fa
Merge branch 'master' into Replace-SecurityAwareMessage-with-OpCtxMsg
Vladsz83 Jun 29, 2026
9529036
Minor javadoc, test fix
Vladsz83 Jun 30, 2026
cf98fb8
even better test
Vladsz83 Jun 30, 2026
a81e18b
Merge branch 'master' into Replace-SecurityAwareMessage-with-OpCtxMsg
Vladsz83 Jun 30, 2026
e689503
test fix
Vladsz83 Jun 30, 2026
02f1609
fix
Vladsz83 Jul 1, 2026
54231db
review fix
Vladsz83 Jul 2, 2026
eb3ee17
minority
Vladsz83 Jul 2, 2026
9dbc300
minority
Vladsz83 Jul 2, 2026
5ca3b6b
minority
Vladsz83 Jul 2, 2026
e494920
review fixes
Vladsz83 Jul 2, 2026
dd57b81
fix
Vladsz83 Jul 2, 2026
1d94212
renaming, fix
Vladsz83 Jul 2, 2026
d9fd00b
renaming, fix
Vladsz83 Jul 2, 2026
136f1f7
renaming, fix
Vladsz83 Jul 2, 2026
97d977b
Merge remote-tracking branch 'my/Replace-SecurityAwareMessage-with-Op…
Vladsz83 Jul 2, 2026
a499dd8
renaming
Vladsz83 Jul 2, 2026
075b120
minority
Vladsz83 Jul 2, 2026
278ddae
minority
Vladsz83 Jul 2, 2026
eb3094d
minor renaming
Vladsz83 Jul 2, 2026
a380d27
review fixes
Vladsz83 Jul 2, 2026
81af6dc
review fixes
Vladsz83 Jul 2, 2026
5a85a7c
Merge branch 'master' into Replace-SecurityAwareMessage-with-OpCtxMsg_v2
Vladsz83 Jul 3, 2026
8299333
review fix
Vladsz83 Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoSecurityAwareMessage;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage;
import org.apache.ignite.internal.managers.communication.SessionChannelMessage;
Expand Down Expand Up @@ -240,6 +239,7 @@
import org.apache.ignite.internal.processors.rollingupgrade.RollingUpgradeNodeData;
import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureSet;
import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteProductFeatures;
import org.apache.ignite.internal.processors.security.SecurityContextWrapper;
import org.apache.ignite.internal.processors.service.ServiceChangeBatchRequest;
import org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResult;
import org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResultBatch;
Expand Down Expand Up @@ -608,12 +608,13 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
// [11500 - 11600]: IO, networking messages.
msgIdx = NODE_ID_MSG_TYPE;
withNoSchema(NodeIdMessage.class);
msgIdx = HANDSHAKE_MSG_TYPE;
withNoSchema(HandshakeMessage.class);
msgIdx = HANDSHAKE_WAIT_MSG_TYPE;
withNoSchema(HandshakeWaitMessage.class);
withNoSchema(GridIoMessage.class);
withNoSchema(IgniteIoTestMessage.class);
withSchema(GridIoUserMessage.class);
withSchema(GridIoSecurityAwareMessage.class);
withNoSchema(RecoveryLastReceivedMessage.class);
withNoSchema(TcpInverseConnectionResponseMessage.class);
withNoSchema(SessionChannelMessage.class);
Expand Down Expand Up @@ -690,9 +691,10 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
// [13400 - 13500]: Operation context messages.
msgIdx = 13400;
withNoSchema(OperationContextMessage.class);
withNoSchema(SecurityContextWrapper.class);

// [13500 - 13600]: Rolling Upgrade messages.
msgIdx = 13500;
// [13600 - 13700]: Rolling Upgrade messages.
msgIdx = 13600;
withNoSchema(IgniteFeatureSet.class);
withNoSchema(IgniteProductFeatures.class);
withNoSchema(RollingUpgradeNodeData.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@
import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor;
import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
Expand Down Expand Up @@ -1317,7 +1316,7 @@ private void processP2PMessage(

assert obj != null;

invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
invokeListener(msg.policy(), lsnr, nodeId, obj);
}
finally {
threadProcessingMessage(false, null);
Expand Down Expand Up @@ -1455,7 +1454,7 @@ private void processRegularMessage0(GridIoMessage msg, UUID nodeId) {

assert obj != null;

invokeListener(msg.policy(), lsnr, nodeId, obj, secSubjId(msg));
invokeListener(msg.policy(), lsnr, nodeId, obj);
}

/**
Expand Down Expand Up @@ -1819,9 +1818,8 @@ private void unwindMessageSet(GridCommunicationMessageSet msgSet, GridMessageLis
* @param lsnr Listener.
* @param nodeId Node ID.
* @param msg Message.
* @param secSubjId Security subject that will be used to open a security session.
*/
private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg, UUID secSubjId) {
private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) {
MTC.span().addLog(() -> "Invoke listener");

Byte oldPlc = CUR_PLC.get();
Expand All @@ -1831,9 +1829,7 @@ private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Obj
if (change)
CUR_PLC.set(plc);

UUID newSecSubjId = secSubjId != null ? secSubjId : nodeId;

try (Scope ignored = ctx.security().withContext(newSecSubjId)) {
try (Scope ignored = withRemoteSecurityContext(nodeId)) {
lsnr.onMessage(nodeId, msg, plc);
}
finally {
Expand All @@ -1842,6 +1838,19 @@ private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Obj
}
}

/** */
private Scope withRemoteSecurityContext(UUID nodeId) {
// No remote Security Context has been attached to the message processing thread so far.
// This means that the message was sent as part of an operation initiated by the sender node.
if (ctx.security().isDefaultContext())
return ctx.security().withContext(nodeId);

// Verify that the Security Context currently attached to the thread is valid.
ctx.security().securityContext();
Comment thread
Vladsz83 marked this conversation as resolved.

return Scope.NOOP_SCOPE;
}

/**
* @return Current IO policy
*/
Expand Down Expand Up @@ -2029,11 +2038,8 @@ private long getInverseConnectionWaitTimeout() {
return ctx.config().getFailureDetectionTimeout();
}

/**
* @return One of two message wrappers. The first is {@link GridIoMessage}, the second is secured version {@link
* GridIoSecurityAwareMessage}.
*/
private @NotNull GridIoMessage createGridIoMessage(
/** @return A {@link GridIoMessage} wrapper for {@code msg}. */
public GridIoMessage createGridIoMessage(
Object topic,
Message msg,
byte plc,
Expand All @@ -2043,16 +2049,7 @@ private long getInverseConnectionWaitTimeout() {
) {
GridIoMessage res;

if (ctx.security().enabled()) {
UUID secSubjId = null;

if (!ctx.security().isDefaultContext())
secSubjId = ctx.security().securityContext().subject().id();

res = new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout);
}
else
res = new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout);
res = new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout);

res.opCtxMsg = ctx.operationContextDispatcher().collectDistributedAttributes();

Expand Down Expand Up @@ -3812,7 +3809,7 @@ void unwind(GridMessageListener lsnr) {

MTC.span().addTag(SpanTags.MESSAGE, () -> traceName(fmc.message));

invokeListener(plc, lsnr, nodeId, mc.message.message(), secSubjId(mc.message));
invokeListener(plc, lsnr, nodeId, mc.message.message());
}
finally {
if (mc.closure != null)
Expand Down Expand Up @@ -4241,19 +4238,6 @@ public long binLatencyMcs() {
}
}

/**
* @return Security subject id.
*/
private UUID secSubjId(GridIoMessage msg) {
if (ctx.security().enabled()) {
assert msg instanceof GridIoSecurityAwareMessage;

return ((GridIoSecurityAwareMessage)msg).securitySubjectId();
}

return null;
}

/**
* Responsible for handling network situation where server cannot open connection to client and
* has to ask client to establish a connection to specific server.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
};

/** Discovery cached history size. */
private final int DISCOVERY_HISTORY_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, DFLT_DISCOVERY_HISTORY_SIZE);
private final int discoHistSz = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, DFLT_DISCOVERY_HISTORY_SIZE);

/** */
private final Object discoEvtMux = new Object();
Expand Down Expand Up @@ -254,7 +254,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {

/** Topology cache history. */
private final GridBoundedConcurrentLinkedHashMap<AffinityTopologyVersion, DiscoCache> discoCacheHist =
new GridBoundedConcurrentLinkedHashMap<>(DISCOVERY_HISTORY_SIZE);
new GridBoundedConcurrentLinkedHashMap<>(discoHistSz);

/** Topology snapshots history. */
private volatile NavigableMap<Long, Collection<ClusterNode>> topHist = Collections.emptyNavigableMap();
Expand Down Expand Up @@ -1107,7 +1107,7 @@ private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg

rcvdCustomMsgs.addLast(customMsg.id());

while (rcvdCustomMsgs.size() > DISCOVERY_HISTORY_SIZE)
while (rcvdCustomMsgs.size() > discoHistSz)
rcvdCustomMsgs.pollFirst();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ private RefreshUsersStorageWorker(ArrayList<User> usrs) {
}

/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
@Override protected void body() {
if (ctx.clientNode())
return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.internal.processors.security.sandbox.NoOpSandbox;
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextAttribute;
import org.apache.ignite.internal.thread.context.OperationContextDispatcher;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -55,6 +56,7 @@
import static org.apache.ignite.internal.processors.security.SecurityUtils.MSG_SEC_PROC_CLS_IS_INVALID;
import static org.apache.ignite.internal.processors.security.SecurityUtils.hasSecurityManager;
import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext;
import static org.apache.ignite.internal.thread.context.DistributedOperationContextAttribute.SECURITY;
import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_USER_ACCESS;
import static org.apache.ignite.plugin.security.SecurityPermission.JOIN_AS_SERVER;

Expand Down Expand Up @@ -88,8 +90,12 @@ static boolean hasSandboxedNodes() {
return SANDBOXED_NODES_COUNTER.get() > 0;
}

/** Context attribute that holds Security Context. */
private static final OperationContextAttribute<SecurityContext> SEC_CTX = OperationContextAttribute.newInstance();
/**
* Attribute that holds local and distributed Security Context.
*
* @see OperationContextDispatcher
*/
private static final OperationContextAttribute<SecurityContextWrapper> SEC_CTX_ATTR = OperationContextAttribute.newInstance();

/** Security processor. */
private final GridSecurityProcessor secPrc;
Expand Down Expand Up @@ -126,28 +132,12 @@ public IgniteSecurityProcessor(GridKernalContext ctx, GridSecurityProcessor secP

/** {@inheritDoc} */
@Override public Scope withContext(SecurityContext secCtx) {
return OperationContext.set(SEC_CTX, secCtx == dfltSecCtx ? null : secCtx);
return OperationContext.set(SEC_CTX_ATTR, secCtx == dfltSecCtx ? null : new SecurityContextWrapper(secCtx));
}

/** {@inheritDoc} */
@Override public Scope withContext(UUID subjId) {
try {
SecurityContext res = secPrc.securityContext(subjId);

if (res == null) {
res = findNodeSecurityContext(subjId);

if (res == null)
throw new IllegalStateException("Failed to find security context for subject with given ID : " + subjId);
}

return withContext(res);
}
catch (Throwable e) {
log.error(FAILED_OBTAIN_SEC_CTX_MSG, e);

throw e;
}
return withContext(securityContext(subjId));
}

/**
Expand All @@ -172,14 +162,41 @@ public IgniteSecurityProcessor(GridKernalContext ctx, GridSecurityProcessor secP

/** {@inheritDoc} */
@Override public boolean isDefaultContext() {
return OperationContext.get(SEC_CTX) == null;
return OperationContext.get(SEC_CTX_ATTR) == null;
}

/** {@inheritDoc} */
@Override public SecurityContext securityContext() {
SecurityContext res = OperationContext.get(SEC_CTX);
SecurityContextWrapper secCtx = OperationContext.get(SEC_CTX_ATTR);

if (secCtx == null)
return dfltSecCtx;

if (secCtx.delegate() == null)
secCtx.delegate(securityContext(secCtx.subjId));

return res == null ? dfltSecCtx : res;
return secCtx.delegate();
}

/** */
private SecurityContext securityContext(UUID subjId) {
try {
SecurityContext res = secPrc.securityContext(subjId);

if (res == null) {
res = findNodeSecurityContext(subjId);

if (res == null)
throw new IllegalStateException("Failed to find security context for subject with given ID : " + subjId);
}

return res;
}
catch (Throwable e) {
log.error(FAILED_OBTAIN_SEC_CTX_MSG, e);

throw e;
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -236,6 +253,8 @@ public IgniteSecurityProcessor(GridKernalContext ctx, GridSecurityProcessor secP
@Override public void start() throws IgniteCheckedException {
super.start();

ctx.operationContextDispatcher().registerDistributedAttribute(SECURITY.id(), SEC_CTX_ATTR);

ctx.addNodeAttribute(ATTR_GRID_SEC_PROC_CLASS, secPrc.getClass().getName());

secPrc.start();
Expand Down
Loading
Loading