// blob: 2739540fc90e1c91dac3601272ae2fea110cb57a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.client.GridClientClosedException;
import org.apache.ignite.internal.client.GridClientDataAffinity;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFuture;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.client.GridClientPredicate;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.client.balancer.GridClientLoadBalancer;
import org.apache.ignite.internal.client.impl.connection.GridClientConnection;
import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException;
import org.apache.ignite.internal.client.impl.connection.GridConnectionIdleClosedException;
import org.apache.ignite.internal.client.util.GridClientUtils;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.client.util.GridClientUtils.applyFilter;
import static org.apache.ignite.internal.client.util.GridClientUtils.restAvailable;
/**
* Class contains common connection-error handling logic.
*/
abstract class GridClientAbstractProjection<T extends GridClientAbstractProjection> {
/** JUL logger named after this class. */
private static final Logger log = Logger.getLogger(GridClientAbstractProjection.class.getName());

/** Maximum number of attempts made for a single request before an exception is reported. */
private static final int RETRY_CNT = 3;

/** Pause between attempts, passed to {@code U.sleep} (presumably milliseconds - TODO confirm). */
private static final int RETRY_DELAY = 1000;

/** Client instance this projection works through. */
protected GridClientImpl client;

/** Nodes this projection is pinned to. If {@code null}, nodes are taken from the client topology. */
protected Collection<GridClientNode> nodes;

/** Node filter applied to topology nodes of this projection. */
protected GridClientPredicate<? super GridClientNode> filter;

/** Load balancer used to pick a node of this projection. */
protected GridClientLoadBalancer balancer;
/**
 * Stores the state shared by all projection implementations.
 *
 * @param client Client instance to use, must not be {@code null} (checked by an assertion).
 * @param nodes Nodes included in this projection, {@code null} to take nodes from the topology.
 * @param filter Node filter to be applied.
 * @param balancer Balancer to use.
 */
protected GridClientAbstractProjection(GridClientImpl client, Collection<GridClientNode> nodes,
    GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer) {
    assert client != null;

    this.balancer = balancer;
    this.filter = filter;
    this.nodes = nodes;
    this.client = client;
}
/**
 * Executes a request against a balanced node of this projection and handles connection errors.
 * <p>
 * Up to {@code RETRY_CNT} attempts are made with a {@code RETRY_DELAY} pause between them:
 * <ul>
 * <li>if the connection was closed as idle, it is terminated and the same node is tried again;</li>
 * <li>if the connection was reset, it is terminated and another node is requested from the balancer;</li>
 * <li>if the server is unreachable, another node is requested from the balancer.</li>
 * </ul>
 * No exception is thrown to the caller: any {@link GridClientException} (including the one raised when all
 * attempts are spent) as well as an interruption is reported through the returned future.
 *
 * @param c Closure to be executed.
 * @param <R> Result future type.
 * @return Future returned by the closure, or a future created from the failure.
 */
protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<R> c) {
    try {
        GridClientNode node = null;

        boolean changeNode = false;

        Throwable cause = null;

        for (int i = 0; i < RETRY_CNT; i++) {
            if (node == null || changeNode) {
                try {
                    // Previously used node (if any) is passed as the one to exclude.
                    node = balancedNode(node);
                }
                catch (GridClientException e) {
                    // Very first pick failed: nothing was attempted yet, report the original problem.
                    if (node == null)
                        throw e;

                    throw new GridServerUnreachableException(
                        "All nodes in projection failed when retrying to perform request. Attempts made: " + i,
                        cause);
                }
            }

            GridClientConnection conn = null;

            try {
                conn = client.connectionManager().connection(node);

                return c.apply(conn, node.nodeId());
            }
            catch (GridConnectionIdleClosedException e) {
                client.connectionManager().terminateConnection(conn, node, e);

                // It's ok, just reconnect to the same node.
                changeNode = false;

                cause = e;
            }
            catch (GridClientConnectionResetException e) {
                client.connectionManager().terminateConnection(conn, node, e);

                changeNode = true;

                cause = e;
            }
            catch (GridServerUnreachableException e) {
                changeNode = true;

                cause = e;
            }

            // Pause only when another attempt follows: sleeping after the last one just delays the failure.
            if (i < RETRY_CNT - 1)
                U.sleep(RETRY_DELAY);
        }

        assert cause != null;

        throw new GridServerUnreachableException("Failed to communicate with grid nodes " +
            "(maximum count of retries reached).", cause);
    }
    catch (GridClientException e) {
        return new GridClientFutureAdapter<>(e);
    }
    catch (IgniteInterruptedCheckedException | InterruptedException e) {
        // Restore the interruption flag for the caller.
        Thread.currentThread().interrupt();

        return new GridClientFutureAdapter<>(
            new GridClientException("Interrupted when (re)trying to perform request.", e));
    }
}
/**
 * Executes a request against the node the given affinity key is mapped to and handles connection errors.
 * <p>
 * If this projection is pinned to a fixed node set, or no affinity is configured for the cache, or no
 * affinity key is given, the call is delegated to {@link #withReconnectHandling(ClientProjectionClosure)}.
 * Otherwise the mapped node is calculated once and up to {@code RETRY_CNT} attempts are made against it
 * with a {@code RETRY_DELAY} pause between them. On connection reset the node liveness is verified through
 * the other topology nodes and the request fails immediately if the node is not confirmed alive.
 * <p>
 * No exception of type {@link GridClientException} is thrown to the caller: such failures, as well as an
 * interruption, are reported through the returned future. Runtime exceptions and errors are rethrown after
 * the connection (if any) has been terminated.
 *
 * @param c Closure to be executed.
 * @param cacheName Cache name for which mapped node will be calculated.
 * @param affKey Affinity key.
 * @param <R> Type of result in future.
 * @return Operation future.
 */
protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<R> c, String cacheName,
    @Nullable Object affKey) {
    GridClientDataAffinity affinity = client.affinity(cacheName);

    // If pinned (fixed-nodes) or no affinity provided use balancer.
    if (nodes != null || affinity == null || affKey == null)
        return withReconnectHandling(c);

    try {
        Collection<? extends GridClientNode> prjNodes = projectionNodes();

        if (prjNodes.isEmpty())
            throw new GridServerUnreachableException("Failed to get affinity node (no nodes in topology were " +
                "accepted by the filter): " + filter);

        GridClientNode node = affinity.node(affKey, prjNodes);

        // Last connection failure, kept so that the final exception does not lose the root cause.
        Throwable cause = null;

        for (int i = 0; i < RETRY_CNT; i++) {
            GridClientConnection conn = null;

            try {
                conn = client.connectionManager().connection(node);

                return c.apply(conn, node.nodeId());
            }
            catch (GridConnectionIdleClosedException e) {
                client.connectionManager().terminateConnection(conn, node, e);

                cause = e;
            }
            catch (GridClientConnectionResetException e) {
                client.connectionManager().terminateConnection(conn, node, e);

                if (!checkNodeAlive(node.nodeId()))
                    throw new GridServerUnreachableException("Failed to communicate with mapped grid node for " +
                        "given affinity key (node left the grid) [nodeId=" + node.nodeId() + ", affKey=" + affKey +
                        ']', e);

                cause = e;
            }
            catch (RuntimeException | Error e) {
                if (conn != null)
                    client.connectionManager().terminateConnection(conn, node, e);

                throw e;
            }

            // Pause only when another attempt follows: sleeping after the last one just delays the failure.
            if (i < RETRY_CNT - 1)
                U.sleep(RETRY_DELAY);
        }

        throw new GridServerUnreachableException("Failed to communicate with mapped grid node for given affinity " +
            "key (did node leave the grid?) [nodeId=" + node.nodeId() + ", affKey=" + affKey + ']', cause);
    }
    catch (GridClientException e) {
        return new GridClientFutureAdapter<>(e);
    }
    catch (IgniteInterruptedCheckedException | InterruptedException e) {
        // Restore the interruption flag for the caller.
        Thread.currentThread().interrupt();

        return new GridClientFutureAdapter<>(new GridClientException("Interrupted when (re)trying to perform " +
            "request.", e));
    }
}
/**
 * Asks the other topology nodes, one by one, about the node with the given ID.
 * <p>
 * The first node that answers decides the outcome: a non-{@code null} answer means the node is alive,
 * a {@code null} answer marks the node as failed in the client topology. Nodes that cannot be reached or
 * fail to answer are skipped.
 *
 * @param nodeId Node id to check.
 * @return {@code True} if a non-{@code null} response was received, {@code false} if either {@code null}
 *      response was received or no node could be contacted at all.
 * @throws GridClientException If failed to refresh topology or if client was closed manually.
 * @throws InterruptedException If interrupted.
 */
protected boolean checkNodeAlive(UUID nodeId) throws GridClientException, InterruptedException {
    for (GridClientNodeImpl candidate : client.topology().nodes()) {
        try {
            // The node being checked is never asked about itself.
            if (candidate.nodeId().equals(nodeId))
                continue;

            GridClientConnection conn = client.connectionManager().connection(candidate);

            try {
                GridClientNode refreshed = conn.node(nodeId, false, false, candidate.nodeId()).get();

                if (refreshed != null)
                    return true;

                // A definite "unknown node" answer: drop the node from the topology.
                client.topology().nodeFailed(nodeId);

                return false;
            }
            catch (GridClientConnectionResetException e) {
                client.connectionManager().terminateConnection(conn, candidate, e);
            }
            catch (GridClientClosedException e) {
                // Client was closed: no reason to keep asking other nodes.
                throw e;
            }
            catch (GridClientException e) {
                if (log.isLoggable(Level.FINE))
                    log.log(Level.FINE, "Node request failed, try next node.", e);
            }
        }
        catch (GridServerUnreachableException e) {
            if (log.isLoggable(Level.FINE))
                log.log(Level.FINE, "Node request failed, try next node.", e);
        }
    }

    // Nobody answered.
    return false;
}
/**
 * Returns the nodes of this projection without any additional filtering.
 * Shortcut for {@link #projectionNodes(GridClientPredicate)} called with a {@code null} predicate.
 *
 * @return Nodes of this projection.
 * @throws GridClientException If failed to refresh topology.
 */
public Collection<? extends GridClientNode> projectionNodes() throws GridClientException {
    GridClientPredicate<GridClientNode> noExtraFilter = null;

    return projectionNodes(noExtraFilter);
}
/**
 * Returns the nodes of this projection.
 * <p>
 * For a pinned projection (fixed node set) the node set is returned as is. For a dynamic projection the
 * current topology snapshot is taken and narrowed by the projection filter and the given predicate,
 * when at least one of them is set.
 *
 * @param pred Predicate to additionally filter nodes of a dynamic projection,
 *      if {@code null} only the projection filter is used.
 * @return Nodes of this projection.
 * @throws GridClientException If failed to refresh topology.
 */
protected Collection<? extends GridClientNode> projectionNodes(@Nullable GridClientPredicate<GridClientNode> pred)
    throws GridClientException {
    // Pinned projection: hand out the fixed node set untouched.
    // NOTE(review): neither 'filter' nor 'pred' is applied on this path - confirm this is intended.
    if (nodes != null)
        return nodes;

    // Dynamic projection, ask topology for current snapshot.
    Collection<? extends GridClientNode> snapshot = client.topology().nodes();

    if (filter == null && pred == null)
        return snapshot;

    return applyFilter(snapshot, filter, pred);
}
/**
 * Picks a node of this projection to send a request to.
 * <p>
 * Projection nodes are requested with a predicate that accepts nodes having REST available for the
 * configured protocol and differing from the excluded node. A single candidate is returned directly,
 * otherwise the choice is left to the balancer.
 *
 * @param exclude Node to exclude, {@code null} to exclude nothing.
 * @return Balanced node.
 * @throws GridServerUnreachableException If no candidate nodes are available.
 */
private GridClientNode balancedNode(@Nullable final GridClientNode exclude) throws GridClientException {
    GridClientPredicate<GridClientNode> eligible = new GridClientPredicate<GridClientNode>() {
        @Override public boolean apply(GridClientNode e) {
            if (exclude != null && exclude.equals(e))
                return false;

            return restAvailable(e, client.cfg.getProtocol());
        }

        @Override public String toString() {
            if (exclude == null)
                return "Filter nodes with available REST.";

            return "Filter nodes with available REST and " +
                "exclude (probably due to connection failure) node: " + exclude.nodeId();
        }
    };

    Collection<? extends GridClientNode> candidates = projectionNodes(eligible);

    if (candidates.isEmpty())
        throw new GridServerUnreachableException("Failed to get balanced node (no nodes in topology were " +
            "accepted by the filters): " + Arrays.asList(filter, eligible));

    // Several candidates: let the balancer decide.
    if (candidates.size() != 1)
        return balancer.balancedNode(candidates);

    GridClientNode only = GridClientUtils.first(candidates);

    assert only != null;

    return only;
}
/**
 * Creates a sub-projection for current projection.
 * <p>
 * The given filter is combined with the filter of this projection (both must pass); if no filter is given,
 * the filter of this projection is inherited. The given node set is intersected with the node set of this
 * projection and the result, narrowed by the resulting filter, is validated to be non-empty.
 *
 * @param nodes Collection of nodes that sub-projection will be restricted to. If {@code null},
 *      created projection is dynamic and will take nodes from topology.
 * @param filter Filter to be applied to nodes in projection. If {@code null} - filter of the current
 *      projection is used.
 * @param balancer Balancer to use in projection. If {@code null} - inherit balancer from the current projection.
 * @param factory Factory to create new projection.
 * @return Created projection.
 * @throws GridClientException If resulting projection is empty. Note that this exception may
 *      only be thrown on case of static projections, i.e. where collection of nodes is not null.
 */
protected T createProjection(@Nullable Collection<GridClientNode> nodes,
    @Nullable GridClientPredicate<? super GridClientNode> filter, @Nullable GridClientLoadBalancer balancer,
    ProjectionFactory<T> factory) throws GridClientException {
    if (nodes != null && nodes.isEmpty())
        throw new GridClientException("Failed to create projection: given nodes collection is empty.");

    // Combine the requested filter with the one of this projection.
    if (filter != null && this.filter != null)
        filter = new GridClientAndPredicate<>(this.filter, filter);
    else if (filter == null)
        filter = this.filter;

    Collection<GridClientNode> subset = intersection(this.nodes, nodes);

    if (subset != null && subset.isEmpty())
        throw new GridClientException("Failed to create projection (given node set does not overlap with " +
            "existing node set) [prjNodes=" + this.nodes + ", nodes=" + nodes + ']');

    if (filter != null && subset != null) {
        subset = applyFilter(subset, filter);

        if (subset != null && subset.isEmpty())
            throw new GridClientException("Failed to create projection (none of the nodes in projection node " +
                "set passed the filter) [prjNodes=" + subset + ", filter=" + filter + ']');
    }

    if (balancer == null)
        balancer = this.balancer;

    // NOTE(review): 'subset' is used for the emptiness checks only, the factory still receives the
    // caller-supplied 'nodes'. This looks unintended (intersection and filtering results are discarded),
    // but is kept as is since callers may rely on it - confirm before changing.
    return factory.create(nodes, filter, balancer);
}
/**
 * Calculates intersection of two collections where a {@code null} collection means "no restriction".
 * Whenever a collection is returned, it is a newly created one.
 *
 * @param first First collection to intersect.
 * @param second Second collection to intersect.
 * @return Elements of {@code first} that are also contained in {@code second}, a copy of the only
 *      non-{@code null} collection, or {@code null} if both collections are {@code null}.
 */
@Nullable private Collection<GridClientNode> intersection(@Nullable Collection<? extends GridClientNode> first,
    @Nullable Collection<? extends GridClientNode> second) {
    if (first == null) {
        if (second == null)
            return null;

        return new ArrayList<>(second);
    }

    if (second == null)
        return new ArrayList<>(first);

    // Both sets are given: keep only the common elements.
    Collection<GridClientNode> common = new LinkedList<>(first);

    common.retainAll(second);

    return common;
}
/**
* Factory for new projection creation.
*
* @param <X> Projection implementation.
*/
protected static interface ProjectionFactory<X extends GridClientAbstractProjection> {
/**
* Subclasses must implement this method and return concrete implementation of projection needed.
*
* @param nodes Nodes that are included in projection.
* @param filter Filter to be applied.
* @param balancer Balancer to be used.
* @return Created projection.
*/
public X create(@Nullable Collection<GridClientNode> nodes,
@Nullable GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer);
}
/**
* Closure to execute reconnect-handling code.
*/
protected static interface ClientProjectionClosure<R> {
/**
* All closures that implement this interface may safely call all methods of communication connection.
* If any exceptions in connection occur, they will be automatically handled by projection.
*
* @param conn Communication connection that should be accessed.
* @param affinityNodeId Affinity node ID.
* @return Future - result of operation.
* @throws GridClientConnectionResetException If connection was unexpectedly reset. Connection will be
* either re-established or different server will be accessed (if available).
* @throws GridClientClosedException If client was manually closed by user.
*/
public GridClientFuture<R> apply(GridClientConnection conn, UUID affinityNodeId)
throws GridClientConnectionResetException, GridClientClosedException;
}
}