blob: f7fe9cb7212828020e759ea4e982035406f0b8b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
* Represents partition update counters delivery to remote nodes.
*/
public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdentityFuture<Void> {
/** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** */
private boolean trackable = true;
/** */
private final GridCacheSharedContext<?, ?> cctx;
/** */
private final IgniteInternalTx tx;
/** */
private final IgniteLogger log;
/** */
public PartitionCountersNeighborcastFuture(
IgniteInternalTx tx, GridCacheSharedContext<?, ?> cctx) {
super(null);
this.tx = tx;
this.cctx = cctx;
log = cctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
}
/**
* Starts processing.
*/
public void init() {
if (log.isInfoEnabled()) {
log.info("Starting delivery partition countres to remote nodes [txId=" + tx.nearXidVersion() +
", futId=" + futId);
}
HashSet<UUID> siblings = siblingBackups();
cctx.mvcc().addFuture(this, futId);
for (UUID peer : siblings) {
List<PartitionUpdateCountersMessage> cntrs = cctx.tm().txHandler()
.filterUpdateCountersForBackupNode(tx, cctx.node(peer));
if (F.isEmpty(cntrs))
continue;
MiniFuture miniFut = new MiniFuture(peer);
try {
cctx.io().send(peer, new PartitionCountersNeighborcastRequest(cntrs, futId), SYSTEM_POOL);
add(miniFut);
}
catch (IgniteCheckedException e) {
if (!(e instanceof ClusterTopologyCheckedException))
log.warning("Failed to send partition counters to remote node [node=" + peer + ']', e);
else
logNodeLeft(peer);
miniFut.onDone();
}
}
markInitialized();
}
/** */
private HashSet<UUID> siblingBackups() {
Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
assert txNodes != null;
UUID locNodeId = cctx.localNodeId();
HashSet<UUID> siblings = new HashSet<>();
txNodes.values().stream()
.filter(backups -> backups.contains(locNodeId))
.forEach(siblings::addAll);
siblings.remove(locNodeId);
return siblings;
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
boolean comp = super.onDone(res, err);
if (comp)
cctx.mvcc().removeFuture(futId);
return comp;
}
/**
* Processes a response from a remote peer. Completes a mini future for that peer.
*
* @param nodeId Remote peer node id.
*/
public void onResult(UUID nodeId) {
if (log.isInfoEnabled())
log.info("Remote peer acked partition counters delivery [futId=" + futId +
", node=" + nodeId + ']');
completeMini(nodeId);
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
logNodeLeft(nodeId);
// if a left node is one of remote peers then a mini future for it is completed successfully
completeMini(nodeId);
return true;
}
/** */
private void completeMini(UUID nodeId) {
for (IgniteInternalFuture<?> fut : futures()) {
assert fut instanceof MiniFuture;
MiniFuture mini = (MiniFuture)fut;
if (mini.nodeId.equals(nodeId)) {
cctx.kernalContext().closure().runLocalSafe(mini::onDone);
break;
}
}
}
/** */
private void logNodeLeft(UUID nodeId) {
if (log.isInfoEnabled()) {
log.info("Failed during partition counters delivery to remote node. " +
"Node left cluster (will ignore) [futId=" + futId +
", node=" + nodeId + ']');
}
}
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futId;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return trackable;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
trackable = false;
}
/**
* Component of compound parent future. Represents interaction with one of remote peers.
*/
private static class MiniFuture extends GridFutureAdapter<Void> {
/** */
private final UUID nodeId;
/** */
private MiniFuture(UUID nodeId) {
this.nodeId = nodeId;
}
}
}