Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
31da616
WIP
anton-vinogradov Nov 18, 2025
9492376
WIP
anton-vinogradov Nov 18, 2025
a52a71c
WIP
anton-vinogradov Nov 18, 2025
bf4fc47
WIP
anton-vinogradov Nov 18, 2025
b2b9092
WIP
anton-vinogradov Nov 18, 2025
e33f606
WIP
anton-vinogradov Nov 19, 2025
d367510
WIP
anton-vinogradov Nov 19, 2025
dbc5475
WIP
anton-vinogradov Nov 19, 2025
509cf23
WIP
anton-vinogradov Nov 19, 2025
8a3cd3d
WIP
anton-vinogradov Nov 19, 2025
abc7b3e
WIP
anton-vinogradov Nov 19, 2025
eb4e7be
WIP
anton-vinogradov Nov 19, 2025
b0519c3
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Nov 20, 2025
04710f3
WIP
anton-vinogradov Nov 20, 2025
5c2817e
WIP
anton-vinogradov Nov 20, 2025
e16f6e8
WIP
anton-vinogradov Nov 20, 2025
85f67bf
WIP
anton-vinogradov Nov 20, 2025
2c249a0
WIP
anton-vinogradov Nov 20, 2025
1c72889
WIP
anton-vinogradov Nov 21, 2025
e0441ad
WIP
anton-vinogradov Nov 21, 2025
0189213
WIP
anton-vinogradov Nov 24, 2025
2a1810e
WIP
anton-vinogradov Nov 24, 2025
c5a6047
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Nov 24, 2025
8f6f028
WIP
anton-vinogradov Nov 25, 2025
211a52b
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Nov 25, 2025
8fb697a
WIP
anton-vinogradov Nov 25, 2025
790ecd5
WIP
anton-vinogradov Nov 25, 2025
d835543
WIP
anton-vinogradov Nov 26, 2025
c977447
WIP
anton-vinogradov Nov 26, 2025
772c3d3
WIP
anton-vinogradov Nov 26, 2025
8cd74c6
WIP
anton-vinogradov Nov 26, 2025
ac2a762
WIP
anton-vinogradov Nov 26, 2025
4a82bfb
WIP
anton-vinogradov Nov 26, 2025
5bc605d
WIP
anton-vinogradov Nov 26, 2025
ddf6701
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Nov 26, 2025
8e9c706
WIP
anton-vinogradov Nov 27, 2025
fc4051f
WIP
anton-vinogradov Nov 27, 2025
c40455f
WIP
anton-vinogradov Nov 27, 2025
fdd04eb
WIP
anton-vinogradov Nov 27, 2025
878ed9d
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Nov 27, 2025
089ab13
WIP
anton-vinogradov Nov 27, 2025
e83a227
WIP
anton-vinogradov Nov 27, 2025
75c73c1
WIP
anton-vinogradov Nov 27, 2025
908fa01
WIP
anton-vinogradov Nov 27, 2025
d2d4202
WIP
anton-vinogradov Nov 27, 2025
4297d15
WIP
anton-vinogradov Nov 28, 2025
3771a7c
WIP
anton-vinogradov Nov 28, 2025
271d2fc
WIP
anton-vinogradov Nov 28, 2025
56913ef
WIP
anton-vinogradov Nov 28, 2025
3be87f6
WIP
anton-vinogradov Nov 28, 2025
42ff4c7
WIP
anton-vinogradov Dec 1, 2025
b81fabe
WIP
anton-vinogradov Dec 1, 2025
3a88f21
WIP
anton-vinogradov Dec 1, 2025
5ba7753
WIP
anton-vinogradov Dec 1, 2025
02341b3
WIP
anton-vinogradov Dec 1, 2025
b9872aa
WIP
anton-vinogradov Dec 1, 2025
270dc39
WIP
anton-vinogradov Dec 1, 2025
5129948
WIP
anton-vinogradov Dec 1, 2025
f2defda
WIP
anton-vinogradov Dec 1, 2025
7db5da5
WIP
anton-vinogradov Dec 1, 2025
94143ad
WIP
anton-vinogradov Dec 1, 2025
4889784
WIP
anton-vinogradov Dec 1, 2025
e949d59
WIP
anton-vinogradov Dec 1, 2025
cbcab6b
WIP
anton-vinogradov Dec 1, 2025
8b29eb8
WIP
anton-vinogradov Dec 2, 2025
c0903a5
WIP
anton-vinogradov Dec 2, 2025
f6e5c85
WIP
anton-vinogradov Dec 2, 2025
f14abe9
WIP
anton-vinogradov Dec 2, 2025
dc91994
WIP
anton-vinogradov Dec 2, 2025
b8cda3c
WIP
anton-vinogradov Dec 2, 2025
f1a9aa5
WIP
anton-vinogradov Dec 2, 2025
c3d05ae
WIP
anton-vinogradov Dec 2, 2025
8b41e49
WIP
anton-vinogradov Dec 2, 2025
3adc19a
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Dec 10, 2025
174df6f
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Dec 10, 2025
126d0be
WIP
anton-vinogradov Dec 12, 2025
13704c5
Merge remote-tracking branch 'origin/master' into ignite-26584
anton-vinogradov Dec 12, 2025
ebed582
WIP
anton-vinogradov Dec 12, 2025
776f906
WIP
anton-vinogradov Dec 16, 2025
af4bf93
WIP
anton-vinogradov Dec 16, 2025
3860481
WIP
anton-vinogradov Dec 16, 2025
67f6311
WIP
anton-vinogradov Dec 16, 2025
ac91b59
WIP
anton-vinogradov Dec 23, 2025
18ce7ae
WIP
anton-vinogradov Dec 24, 2025
3d7c702
WIP
anton-vinogradov Dec 24, 2025
c7e1ce3
WIP
anton-vinogradov Dec 24, 2025
429a37b
WIP
anton-vinogradov Dec 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
Expand Down Expand Up @@ -221,12 +222,14 @@ public boolean disableRebalancingCancellationOptimization() {
assert part != null;
assert part.id() == p;

GridDhtPartitionState state = part.state();

// Do not rebalance OWNING or LOST partitions.
if (part.state() == OWNING || part.state() == LOST)
if (state == OWNING || state == LOST)
continue;

// State should be switched to MOVING during PME.
if (part.state() != MOVING) {
if (state != MOVING) {
throw new AssertionError("Partition has invalid state for rebalance "
+ aff.topologyVersion() + " " + part);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable Tcp

Collection<ClusterNode> top = topHist.get(topVer);

assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']';
assert top != null : "Failed to find topology history [top=" + topVer + ", msg=" + msg + ", hist=" + topHist + ']';

return top;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1837,6 +1837,7 @@ private void prepareNodeAddedMessage(
TcpDiscoveryAbstractMessage msg,
UUID destNodeId,
@Nullable Collection<PendingMessage> msgs,
@Nullable IgniteUuid discardMsgId,
@Nullable IgniteUuid discardCustomMsgId
) {
assert destNodeId != null;
Expand Down Expand Up @@ -1874,10 +1875,7 @@ private void prepareNodeAddedMessage(
}
}

// No need to send discardMsgId because we already filtered out
// cleaned up messages.
// TODO IGNITE-11271
nodeAddedMsg.messages(msgs0, null, discardCustomMsgId);
nodeAddedMsg.messages(msgs0, discardMsgId, discardCustomMsgId);

Map<Long, Collection<ClusterNode>> hist;

Expand Down Expand Up @@ -2659,7 +2657,7 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI

TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg);

prepareNodeAddedMessage(msg0, destNodeId, null, null);
prepareNodeAddedMessage(msg0, destNodeId, null, null, null);

msg0.topology(addedMsg.clientTopology());

Expand Down Expand Up @@ -3401,7 +3399,7 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
msg0 = U.unmarshal(spi.marshaller(), msgBytes,
U.resolveClassLoader(spi.ignite().configuration()));

prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null);
prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null, null);

msgBytes0 = null;
}
Expand Down Expand Up @@ -3751,7 +3749,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof
long tsNanos = System.nanoTime();

prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
pendingMsgs.customDiscardId);
pendingMsgs.discardId, pendingMsgs.customDiscardId);

addFailedNodes(pendingMsg, failedNodes);

Expand Down Expand Up @@ -3793,7 +3791,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof

if (!(msg instanceof TcpDiscoveryConnectionCheckMessage))
prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs,
pendingMsgs.customDiscardId);
pendingMsgs.discardId, pendingMsgs.customDiscardId);

try {
SecurityUtils.serializeVersion(1);
Expand Down Expand Up @@ -4007,7 +4005,7 @@ private void processPendingMessagesLocally(TcpDiscoveryAbstractMessage curMsg) {

for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs,
pendingMsgs.customDiscardId);
pendingMsgs.discardId, pendingMsgs.customDiscardId);

pendingMsg.senderNodeId(locNodeId);

Expand Down Expand Up @@ -4214,7 +4212,7 @@ private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg)
if (isLocalNodeCoordinator()) {
TcpDiscoveryNode existingNode = ring.node(node.id());

if (existingNode != null) {
if (existingNode != null && existingNode.visible()) {
if (!node.socketAddresses().equals(existingNode.socketAddresses())) {
if (!pingNode(existingNode)) {
U.warn(log, "Sending node failed message for existing node: " + node);
Expand Down Expand Up @@ -5213,12 +5211,6 @@ else if (spiState == CONNECTING)

pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
msg.discardedCustomMessageId());

// Clear data to minimize message size.
msg.messages(null, null, null);
msg.topology(null);
msg.topologyHistory(null);
msg.clearDiscoveryData();
}
else {
if (log.isDebugEnabled())
Expand Down Expand Up @@ -5285,7 +5277,7 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms
UUID locNodeId = getLocalNodeId();

if (locNodeCoord) {
if (msg.verified()) {
if (msg.verified() && msg.verifierNodeId().equals(locNodeId)) {
addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));

return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.tcp.internal;

import java.util.Comparator;

/** Compares nodes using the Data Center id as a primary factor. */
public class MdcAwareNodesComparator implements Comparator<TcpDiscoveryNode> {
/** */
@Override public int compare(TcpDiscoveryNode n1, TcpDiscoveryNode n2) {
String n1DcId = n1.dataCenterId() == null ? "" : n1.dataCenterId();
String n2DcId = n2.dataCenterId() == null ? "" : n2.dataCenterId();

int res = n1DcId.compareTo(n2DcId);

if (res == 0) {
res = n1.compareTo(n2);
}

return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,10 @@ public void clear() {
if (filtered.size() < 2)
return null;

Iterator<TcpDiscoveryNode> iter = filtered.iterator();
NavigableSet<TcpDiscoveryNode> sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(filtered);

Iterator<TcpDiscoveryNode> iter = sorted.iterator();

while (iter.hasNext()) {
TcpDiscoveryNode node = iter.next();
Expand All @@ -515,7 +518,7 @@ public void clear() {
break;
}

return iter.hasNext() ? iter.next() : F.first(filtered);
return iter.hasNext() ? iter.next() : F.first(sorted);
}
finally {
rwLock.readLock().unlock();
Expand All @@ -541,10 +544,13 @@ public void clear() {
if (filtered.size() < 2)
return null;

NavigableSet<TcpDiscoveryNode> sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(filtered);

TcpDiscoveryNode previous = null;

// Get last node that is previous in a ring
for (TcpDiscoveryNode node : filtered) {
for (TcpDiscoveryNode node : sorted) {
if (locNode.equals(node) && previous != null)
break;

Expand All @@ -569,11 +575,14 @@ public TcpDiscoveryNode previousNodeOf(TcpDiscoveryNode ringNode) {
try {
TcpDiscoveryNode prev = null;

for (TcpDiscoveryNode node : nodes) {
NavigableSet<TcpDiscoveryNode> sorted = new TreeSet<>(new MdcAwareNodesComparator());
sorted.addAll(nodes);

for (TcpDiscoveryNode node : sorted) {
if (node.equals(ringNode)) {
if (prev == null)
// ringNode is the first node, return last node in the ring.
return nodes.last();
return sorted.last();

return prev;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.IgniteConfiguration;

/** */
public class CacheExchangeMergeMdcTest extends CacheExchangeMergeTest {
/** */
protected static final String DC_ID_0 = "DC0";

/** */
protected static final String DC_ID_1 = "DC1";

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

applyDC();

return cfg;
}

/** */
protected void applyDC() {
ThreadLocalRandom rnd = ThreadLocalRandom.current();

boolean mainDc = rnd.nextBoolean();

System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, mainDc ? DC_ID_0 : DC_ID_1);
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
}
}
Loading
Loading