// blob: eb75a5b0d1c5c8cf437b93768c5bf317420ffffc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.service;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.services.Service;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY;
/**
* Wrapper for making {@link org.apache.ignite.services.Service} class proxies.
*/
public class GridServiceProxy<T> implements Serializable {
/** */
private static final long serialVersionUID = 0L;
/** */
private static final IgniteProductVersion SVC_POOL_SINCE_VER = IgniteProductVersion.fromString("1.8.5");
/** Grid logger. */
@GridToStringExclude
private final IgniteLogger log;
/** Proxy object. */
private final T proxy;
/** Grid projection. */
private final ClusterGroup prj;
/** Kernal context. */
@GridToStringExclude
private final GridKernalContext ctx;
/** Remote node to use for proxy invocation. */
private final AtomicReference<ClusterNode> rmtNode = new AtomicReference<>();
/** {@code True} if projection includes local node. */
private boolean hasLocNode;
/** Service name. */
private final String name;
/** Whether the node chosen for invocation is remembered and reused for subsequent calls. */
private final boolean sticky;
/** Service availability wait timeout. */
private final long waitTimeout;
/**
* @param prj Grid projection.
* @param name Service name.
* @param svc Service type class.
* @param sticky Whether multi-node request should be done.
* @param timeout Service availability wait timeout. Cannot be negative.
* @param ctx Context.
*/
@SuppressWarnings("unchecked")
public GridServiceProxy(ClusterGroup prj,
String name,
Class<? super T> svc,
boolean sticky,
long timeout,
GridKernalContext ctx)
{
assert timeout >= 0 : timeout;
this.prj = prj;
this.ctx = ctx;
this.name = name;
this.sticky = sticky;
this.waitTimeout = timeout;
hasLocNode = hasLocalNode(prj);
log = ctx.log(getClass());
proxy = (T)Proxy.newProxyInstance(
svc.getClassLoader(),
new Class[] {svc},
new ProxyInvocationHandler()
);
}
/**
 * Checks whether at least one node of the projection reports itself as local.
 *
 * @param prj Grid nodes projection.
 * @return {@code True} if the projection contains a local node, {@code false} otherwise.
 */
private boolean hasLocalNode(ClusterGroup prj) {
    boolean locFound = false;

    for (ClusterNode node : prj.nodes()) {
        if (node.isLocal()) {
            locFound = true;

            // One local node is enough, no need to look further.
            break;
        }
    }

    return locFound;
}
/**
 * Invokes the given service method on a node where the service is deployed.
 * <p>
 * {@code hashCode}, {@code equals} and {@code toString} are answered by the proxy wrapper itself
 * and never reach the service. For any other method a node is selected; if that node is local
 * the service instance is invoked directly via reflection, otherwise the call is sent to the
 * remote node. When the service is not found or the topology changes, the remembered sticky node
 * is cleared and the call is retried after a short pause until {@code waitTimeout} elapses
 * (a timeout of {@code 0} disables the deadline check).
 * <p>
 * An exception thrown by the service method itself is rethrown as is, both for local and
 * for remote invocations.
 *
 * @param mtd Method.
 * @param args Arguments.
 * @return Result.
 * @throws Throwable Original exception thrown by the service method, or {@link IgniteException}
 *      if the service could not be found, the wait timeout was reached, the thread was
 *      interrupted or the invocation failed for another reason.
 */
@SuppressWarnings("BusyWait")
public Object invokeMethod(final Method mtd, final Object[] args) throws Throwable {
    // Object identity methods are handled by the proxy, not by the service.
    if (U.isHashCodeMethod(mtd))
        return System.identityHashCode(proxy);
    else if (U.isEqualsMethod(mtd))
        return proxy == args[0];
    else if (U.isToStringMethod(mtd))
        return GridServiceProxy.class.getSimpleName() + " [name=" + name + ", sticky=" + sticky + ']';

    ctx.gateway().readLock();

    try {
        final long startTime = U.currentTimeMillis();

        while (true) {
            ClusterNode node = null;

            try {
                node = nodeForService(name, sticky);

                if (node == null)
                    throw new IgniteException("Failed to find deployed service: " + name);

                // If service is deployed locally, then execute locally.
                if (node.isLocal()) {
                    ServiceContextImpl svcCtx = ctx.service().serviceContext(name);

                    if (svcCtx != null) {
                        Service svc = svcCtx.service();

                        if (svc != null)
                            return mtd.invoke(svc, args);
                    }

                    // Local service instance is not available: fall through to the retry logic.
                }
                else {
                    // Nodes of this version or newer get the call routed with the service pool policy.
                    if (node.version().compareTo(SVC_POOL_SINCE_VER) >= 0)
                        ctx.task().setThreadContext(TC_IO_POLICY, GridIoPolicy.SERVICE_POOL);

                    // Execute service remotely.
                    return ctx.closure().callAsyncNoFailover(
                        GridClosureCallMode.BROADCAST,
                        new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args),
                        Collections.singleton(node),
                        false,
                        waitTimeout,
                        true).get();
                }
            }
            catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) {
                if (log.isDebugEnabled())
                    log.debug("Service was not found or topology changed (will retry): " + e.getMessage());
            }
            catch (RuntimeException | Error e) {
                throw e;
            }
            catch (IgniteCheckedException e) {
                // Check if ignorable exceptions are in the cause chain.
                Throwable ignorableCause = X.cause(e, GridServiceNotFoundException.class);

                if (ignorableCause == null)
                    ignorableCause = X.cause(e, ClusterTopologyCheckedException.class);

                if (ignorableCause != null) {
                    if (log.isDebugEnabled())
                        log.debug("Service was not found or topology changed (will retry): " + ignorableCause.getMessage());
                }
                else {
                    // Rethrow original service method exception so that calling user code can handle it correctly.
                    ServiceProxyException svcProxyE = X.cause(e, ServiceProxyException.class);

                    if (svcProxyE != null)
                        throw svcProxyE.getCause();

                    throw U.convertException(e);
                }
            }
            catch (InvocationTargetException e) {
                // The locally invoked service method threw: rethrow its original exception
                // so that local and remote invocations fail in the same way for the caller.
                Throwable target = e.getTargetException();

                throw target != null ? target : new IgniteException(e);
            }
            catch (Exception e) {
                throw new IgniteException(e);
            }

            // If we are here, that means that service was not found
            // or topology was changed. In this case, we erase the
            // previous sticky node and try again.
            rmtNode.compareAndSet(node, null);

            // Add sleep between retries to avoid busy-wait loops.
            try {
                Thread.sleep(10);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();

                throw new IgniteException(e);
            }

            if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout)
                throw new IgniteException("Service acquire timeout was reached, stopping. [timeout=" + waitTimeout + "]");
        }
    }
    finally {
        ctx.gateway().readUnlock();
    }
}
/**
 * Picks the node to invoke the service on. In sticky mode the node chosen first is remembered
 * in {@code rmtNode} and returned on later calls until that reference is cleared.
 *
 * @param name Service name.
 * @param sticky Whether the chosen node should be remembered and reused.
 * @return Node with deployed service or {@code null} if there is no such node.
 * @throws IgniteCheckedException If node selection failed.
 */
private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException {
    // Non-sticky proxies select a node anew on every call.
    if (!sticky)
        return randomNodeForService(name);

    for (;;) { // Repeat if reference to remote node was changed.
        ClusterNode cached = rmtNode.get();

        if (cached != null)
            return cached;

        ClusterNode candidate = randomNodeForService(name);

        // Either nothing was found, or we won the race to publish the candidate.
        if (candidate == null || rmtNode.compareAndSet(null, candidate))
            return candidate;
    }
}
/**
 * Selects a node for the given service: the local node when it is part of the projection
 * and the service is available locally, otherwise a randomly chosen node from the service
 * topology snapshot.
 *
 * @param name Service name.
 * @return Local node if it has a given service deployed or randomly chosen remote node,
 *      otherwise {@code null} if given service is not deployed on any node.
 * @throws IgniteCheckedException If the service topology could not be obtained.
 */
private ClusterNode randomNodeForService(String name) throws IgniteCheckedException {
    if (hasLocNode && ctx.service().service(name) != null)
        return ctx.discovery().localNode();

    // Node ID -> count (presumably the number of service instances deployed on that node).
    Map<UUID, Integer> top = ctx.service().serviceTopology(name, waitTimeout);

    if (top == null || top.isEmpty())
        return null;

    // Optimization for cluster singletons.
    if (top.size() == 1)
        return prj.node(top.keySet().iterator().next());

    Collection<ClusterNode> prjNodes = prj.nodes();

    // Optimization for 1 node in projection.
    if (prjNodes.size() == 1) {
        ClusterNode only = prjNodes.iterator().next();

        if (top.containsKey(only.id()))
            return only;

        return null;
    }

    // Filtered projection: choose among projection nodes that have a positive count.
    if (prj.predicate() != F.<ClusterNode>alwaysTrue()) {
        List<ClusterNode> candidates = new ArrayList<>(prjNodes.size());

        for (ClusterNode n : prjNodes) {
            Integer cnt = top.get(n.id());

            if (cnt != null && cnt > 0)
                candidates.add(n);
        }

        if (candidates.isEmpty())
            return null;

        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    // Optimization if projection is the whole grid: start at a random position of the snapshot.
    int start = ThreadLocalRandom.current().nextInt(top.size());

    int pos = 0;

    // First pass: from the random position to the end.
    for (Map.Entry<UUID, Integer> e : top.entrySet()) {
        boolean reached = pos >= start;

        pos++;

        if (reached && e.getValue() > 0)
            return ctx.discovery().node(e.getKey());
    }

    pos = 0;

    // Circle back: from the beginning up to (and including) the random position.
    for (Map.Entry<UUID, Integer> e : top.entrySet()) {
        if (e.getValue() > 0)
            return ctx.discovery().node(e.getKey());

        if (pos == start)
            return null;

        pos++;
    }

    return null;
}
/**
 * Gets the dynamic proxy created by this wrapper.
 *
 * @return Proxy object for a given instance.
 */
T proxy() {
    return this.proxy;
}
/**
 * Invocation handler for service proxy. Forwards every intercepted call to
 * {@link GridServiceProxy#invokeMethod(Method, Object[])} of the enclosing wrapper.
 */
private class ProxyInvocationHandler implements InvocationHandler {
    /** {@inheritDoc} */
    @SuppressWarnings("BusyWait")
    @Override public Object invoke(Object ignored, Method mtd, Object[] args) throws Throwable {
        // The proxy instance passed by the JDK is not needed: the enclosing wrapper holds all state.
        return GridServiceProxy.this.invokeMethod(mtd, args);
    }
}
/**
 * Callable that carries a service method invocation (service name, method name,
 * argument types and arguments) and performs it against the service context
 * found on the node where it runs.
 */
private static class ServiceProxyCallable implements IgniteCallable<Object>, Externalizable {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Name of the service method to invoke. */
    private String mtdName;

    /** Name of the target service. */
    private String svcName;

    /** Parameter types used to look the method up. */
    private Class[] argTypes;

    /** Invocation arguments. */
    private Object[] args;

    /** Grid instance (annotated for resource injection, not serialized). */
    @IgniteInstanceResource
    private transient Ignite ignite;

    /**
     * Empty constructor required for {@link Externalizable}.
     */
    public ServiceProxyCallable() {
        // No-op.
    }

    /**
     * @param mtdName Service method to invoke.
     * @param svcName Service name.
     * @param argTypes Argument types.
     * @param args Arguments for invocation.
     */
    private ServiceProxyCallable(String mtdName, String svcName, Class[] argTypes, Object[] args) {
        this.mtdName = mtdName;
        this.svcName = svcName;
        this.argTypes = argTypes;
        this.args = args;
    }

    /** {@inheritDoc} */
    @Override public Object call() throws Exception {
        IgniteEx igniteEx = (IgniteEx)ignite;

        ServiceContextImpl svcCtx = igniteEx.context().service().serviceContext(svcName);

        boolean available = svcCtx != null && svcCtx.service() != null;

        if (!available)
            throw new GridServiceNotFoundException(svcName);

        // Resolve the target method by name and parameter types.
        Method mtd = svcCtx.method(new GridServiceMethodReflectKey(mtdName, argTypes));

        if (mtd == null)
            throw new GridServiceMethodNotFoundException(svcName, mtdName, argTypes);

        try {
            return mtd.invoke(svcCtx.service(), args);
        }
        catch (InvocationTargetException e) {
            // Mark the exception as coming from the service method itself.
            throw new ServiceProxyException(e.getCause());
        }
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        // Field order here must mirror readExternal().
        U.writeString(out, svcName);
        U.writeString(out, mtdName);
        U.writeArray(out, argTypes);
        U.writeArray(out, args);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // Field order here must mirror writeExternal().
        svcName = U.readString(in);
        mtdName = U.readString(in);
        argTypes = U.readClassArray(in);
        args = U.readArray(in);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(ServiceProxyCallable.class, this);
    }
}
/**
 * Exception class that wraps an exception thrown by the service implementation.
 */
private static class ServiceProxyException extends RuntimeException {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /**
     * @param svcErr Exception thrown by the service method; becomes the cause of this exception.
     */
    ServiceProxyException(Throwable svcErr) {
        super(svcErr);
    }
}
}