blob: 65893fdae8094f7e06ba67fb48de3f5132b1e935 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
/**
*
*/
class PreviousQueries {
/** */
private static class Node {
/** */
@GridToStringInclude
boolean init;
/** */
@GridToStringInclude
Set<Long> cntrs;
/** */
boolean isDone() {
return init && (cntrs == null || cntrs.stream().allMatch(l -> l < 0));
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Node.class, this);
}
}
/** */
private Map<UUID, Node> active = new HashMap<>();
/** */
private boolean init;
/** */
private volatile boolean done;
/**
* @param nodes Waiting nodes.
* @param alivePredicate Alive nodes filter.
*/
synchronized void init(Collection<ClusterNode> nodes, Predicate<UUID> alivePredicate) {
assert !init && !done;
nodes.stream().map(ClusterNode::id).forEach(uuid -> active.putIfAbsent(uuid, new Node()));
active.entrySet().removeIf(e -> !alivePredicate.test(e.getKey()) || e.getValue().isDone());
if (active.isEmpty())
done = true;
init = true;
}
/**
* @param nodeId Node ID.
*/
void onNodeFailed(@NotNull UUID nodeId) {
if (done())
return;
synchronized (this) {
if (init)
removeAndCheckDone(nodeId);
}
}
/**
* @param nodeId Node ID.
* @param qryId Query tracker Id.
*/
void onQueryDone(@NotNull UUID nodeId, long qryId) {
if (!done())
onQueryDone0(nodeId, qryId);
}
/**
* @param nodeId Node ID.
* @param queryIds Query tracker Ids.
*/
void addActiveQueries(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
if (!done())
addActiveQueries0(nodeId, queryIds);
}
/**
* @return {@code True} if all queries mapped on previous coordinator are done.
*/
boolean done() {
return done;
}
/** */
private synchronized void onQueryDone0(@NotNull UUID nodeId, long qryId) {
assert qryId > 0;
Node node = active.get(nodeId);
if (node == null && !init)
active.put(nodeId, node = new Node());
if (node != null) {
Set<Long> cntrs = node.cntrs;
boolean wasNull = cntrs == null;
if (cntrs == null)
cntrs = node.cntrs = new HashSet<>();
if (wasNull || !cntrs.remove(qryId))
cntrs.add(-qryId);
if (init && node.isDone())
removeAndCheckDone(nodeId);
}
}
/** */
private synchronized void addActiveQueries0(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
Node node = active.get(nodeId);
if (node == null && !init)
active.put(nodeId, node = new Node());
if (node != null) {
Set<Long> cntrs = node.cntrs;
boolean wasNull = cntrs == null, hasQueries = false;
for (int i = 0; i < queryIds.size(); i++) {
long qryId = queryIds.get(i);
assert qryId > 0;
if (cntrs == null)
cntrs = node.cntrs = new HashSet<>();
if (wasNull || !cntrs.remove(-qryId))
hasQueries |= cntrs.add(qryId);
}
if (init && !hasQueries)
removeAndCheckDone(nodeId);
else
node.init = true;
}
}
/** */
private void removeAndCheckDone(@NotNull UUID nodeId) {
assert init;
active.remove(nodeId);
if (active.isEmpty())
done = true;
}
}