| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.service; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.Serializable; |
| import java.lang.reflect.InvocationHandler; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Proxy; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.GridClosureCallMode; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.communication.GridIoPolicy; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.services.Service; |
| |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_IO_POLICY; |
| |
| /** |
| * Wrapper for making {@link org.apache.ignite.services.Service} class proxies. |
| */ |
| public class GridServiceProxy<T> implements Serializable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private static final IgniteProductVersion SVC_POOL_SINCE_VER = IgniteProductVersion.fromString("1.8.5"); |
| |
| /** Grid logger. */ |
| @GridToStringExclude |
| private final IgniteLogger log; |
| |
| /** Proxy object. */ |
| private final T proxy; |
| |
| /** Grid projection. */ |
| private final ClusterGroup prj; |
| |
| /** Kernal context. */ |
| @GridToStringExclude |
| private final GridKernalContext ctx; |
| |
| /** Remote node to use for proxy invocation. */ |
| private final AtomicReference<ClusterNode> rmtNode = new AtomicReference<>(); |
| |
| /** {@code True} if projection includes local node. */ |
| private boolean hasLocNode; |
| |
| /** Service name. */ |
| private final String name; |
| |
| /** Whether multi-node request should be done. */ |
| private final boolean sticky; |
| |
| /** Service availability wait timeout. */ |
| private final long waitTimeout; |
| |
| /** |
| * @param prj Grid projection. |
| * @param name Service name. |
| * @param svc Service type class. |
| * @param sticky Whether multi-node request should be done. |
| * @param timeout Service availability wait timeout. Cannot be negative. |
| * @param ctx Context. |
| */ |
| @SuppressWarnings("unchecked") |
| public GridServiceProxy(ClusterGroup prj, |
| String name, |
| Class<? super T> svc, |
| boolean sticky, |
| long timeout, |
| GridKernalContext ctx) |
| { |
| assert timeout >= 0 : timeout; |
| |
| this.prj = prj; |
| this.ctx = ctx; |
| this.name = name; |
| this.sticky = sticky; |
| this.waitTimeout = timeout; |
| hasLocNode = hasLocalNode(prj); |
| |
| log = ctx.log(getClass()); |
| |
| proxy = (T)Proxy.newProxyInstance( |
| svc.getClassLoader(), |
| new Class[] {svc}, |
| new ProxyInvocationHandler() |
| ); |
| } |
| |
| /** |
| * @param prj Grid nodes projection. |
| * @return Whether given projection contains any local node. |
| */ |
| private boolean hasLocalNode(ClusterGroup prj) { |
| for (ClusterNode n : prj.nodes()) { |
| if (n.isLocal()) |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Invoek the method. |
| * |
| * @param mtd Method. |
| * @param args Arugments. |
| * @return Result. |
| */ |
| @SuppressWarnings("BusyWait") |
| public Object invokeMethod(final Method mtd, final Object[] args) throws Throwable { |
| if (U.isHashCodeMethod(mtd)) |
| return System.identityHashCode(proxy); |
| else if (U.isEqualsMethod(mtd)) |
| return proxy == args[0]; |
| else if (U.isToStringMethod(mtd)) |
| return GridServiceProxy.class.getSimpleName() + " [name=" + name + ", sticky=" + sticky + ']'; |
| |
| ctx.gateway().readLock(); |
| |
| try { |
| final long startTime = U.currentTimeMillis(); |
| |
| while (true) { |
| ClusterNode node = null; |
| |
| try { |
| node = nodeForService(name, sticky); |
| |
| if (node == null) |
| throw new IgniteException("Failed to find deployed service: " + name); |
| |
| // If service is deployed locally, then execute locally. |
| if (node.isLocal()) { |
| ServiceContextImpl svcCtx = ctx.service().serviceContext(name); |
| |
| if (svcCtx != null) { |
| Service svc = svcCtx.service(); |
| |
| if (svc != null) |
| return mtd.invoke(svc, args); |
| } |
| } |
| else { |
| if (node.version().compareTo(SVC_POOL_SINCE_VER) >= 0) |
| ctx.task().setThreadContext(TC_IO_POLICY, GridIoPolicy.SERVICE_POOL); |
| |
| // Execute service remotely. |
| return ctx.closure().callAsyncNoFailover( |
| GridClosureCallMode.BROADCAST, |
| new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), |
| Collections.singleton(node), |
| false, |
| waitTimeout, |
| true).get(); |
| } |
| } |
| catch (GridServiceNotFoundException | ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Service was not found or topology changed (will retry): " + e.getMessage()); |
| } |
| catch (RuntimeException | Error e) { |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| // Check if ignorable exceptions are in the cause chain. |
| Throwable ignorableCause = X.cause(e, GridServiceNotFoundException.class); |
| |
| if (ignorableCause == null) |
| ignorableCause = X.cause(e, ClusterTopologyCheckedException.class); |
| |
| if (ignorableCause != null) { |
| if (log.isDebugEnabled()) |
| log.debug("Service was not found or topology changed (will retry): " + ignorableCause.getMessage()); |
| } |
| else { |
| // Rethrow original service method exception so that calling user code can handle it correctly. |
| ServiceProxyException svcProxyE = X.cause(e, ServiceProxyException.class); |
| |
| if (svcProxyE != null) |
| throw svcProxyE.getCause(); |
| |
| throw U.convertException(e); |
| } |
| } |
| catch (Exception e) { |
| throw new IgniteException(e); |
| } |
| |
| // If we are here, that means that service was not found |
| // or topology was changed. In this case, we erase the |
| // previous sticky node and try again. |
| rmtNode.compareAndSet(node, null); |
| |
| // Add sleep between retries to avoid busy-wait loops. |
| try { |
| Thread.sleep(10); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteException(e); |
| } |
| |
| if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout) |
| throw new IgniteException("Service acquire timeout was reached, stopping. [timeout=" + waitTimeout + "]"); |
| } |
| } |
| finally { |
| ctx.gateway().readUnlock(); |
| } |
| } |
| |
| /** |
| * @param sticky Whether multi-node request should be done. |
| * @param name Service name. |
| * @return Node with deployed service or {@code null} if there is no such node. |
| */ |
| private ClusterNode nodeForService(String name, boolean sticky) throws IgniteCheckedException { |
| do { // Repeat if reference to remote node was changed. |
| if (sticky) { |
| ClusterNode curNode = rmtNode.get(); |
| |
| if (curNode != null) |
| return curNode; |
| |
| curNode = randomNodeForService(name); |
| |
| if (curNode == null) |
| return null; |
| |
| if (rmtNode.compareAndSet(null, curNode)) |
| return curNode; |
| } |
| else |
| return randomNodeForService(name); |
| } |
| while (true); |
| } |
| |
| /** |
| * @param name Service name. |
| * @return Local node if it has a given service deployed or randomly chosen remote node, |
| * otherwise ({@code null} if given service is not deployed on any node. |
| */ |
| private ClusterNode randomNodeForService(String name) throws IgniteCheckedException { |
| if (hasLocNode && ctx.service().service(name) != null) |
| return ctx.discovery().localNode(); |
| |
| Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, waitTimeout); |
| |
| if (snapshot == null || snapshot.isEmpty()) |
| return null; |
| |
| // Optimization for cluster singletons. |
| if (snapshot.size() == 1) { |
| UUID nodeId = snapshot.keySet().iterator().next(); |
| |
| return prj.node(nodeId); |
| } |
| |
| Collection<ClusterNode> nodes = prj.nodes(); |
| |
| // Optimization for 1 node in projection. |
| if (nodes.size() == 1) { |
| ClusterNode n = nodes.iterator().next(); |
| |
| return snapshot.containsKey(n.id()) ? n : null; |
| } |
| |
| // Optimization if projection is the whole grid. |
| if (prj.predicate() == F.<ClusterNode>alwaysTrue()) { |
| int idx = ThreadLocalRandom.current().nextInt(snapshot.size()); |
| |
| int i = 0; |
| |
| // Get random node. |
| for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) { |
| if (i++ >= idx) { |
| if (e.getValue() > 0) |
| return ctx.discovery().node(e.getKey()); |
| } |
| } |
| |
| i = 0; |
| |
| // Circle back. |
| for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) { |
| if (e.getValue() > 0) |
| return ctx.discovery().node(e.getKey()); |
| |
| if (i++ == idx) |
| return null; |
| } |
| } |
| else { |
| List<ClusterNode> nodeList = new ArrayList<>(nodes.size()); |
| |
| for (ClusterNode n : nodes) { |
| Integer cnt = snapshot.get(n.id()); |
| |
| if (cnt != null && cnt > 0) |
| nodeList.add(n); |
| } |
| |
| if (nodeList.isEmpty()) |
| return null; |
| |
| int idx = ThreadLocalRandom.current().nextInt(nodeList.size()); |
| |
| return nodeList.get(idx); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @return Proxy object for a given instance. |
| */ |
| T proxy() { |
| return proxy; |
| } |
| |
| /** |
| * Invocation handler for service proxy. |
| */ |
| private class ProxyInvocationHandler implements InvocationHandler { |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("BusyWait") |
| @Override public Object invoke(Object proxy, final Method mtd, final Object[] args) throws Throwable { |
| return invokeMethod(mtd, args); |
| } |
| } |
| |
| /** |
| * Callable proxy class. |
| */ |
| private static class ServiceProxyCallable implements IgniteCallable<Object>, Externalizable { |
| /** Serial version UID. */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Method name. */ |
| private String mtdName; |
| |
| /** Service name. */ |
| private String svcName; |
| |
| /** Argument types. */ |
| private Class[] argTypes; |
| |
| /** Args. */ |
| private Object[] args; |
| |
| /** Grid instance. */ |
| @IgniteInstanceResource |
| private transient Ignite ignite; |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| */ |
| public ServiceProxyCallable() { |
| // No-op. |
| } |
| |
| /** |
| * @param mtdName Service method to invoke. |
| * @param svcName Service name. |
| * @param argTypes Argument types. |
| * @param args Arguments for invocation. |
| */ |
| private ServiceProxyCallable(String mtdName, String svcName, Class[] argTypes, Object[] args) { |
| this.mtdName = mtdName; |
| this.svcName = svcName; |
| this.argTypes = argTypes; |
| this.args = args; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object call() throws Exception { |
| ServiceContextImpl svcCtx = ((IgniteEx)ignite).context().service().serviceContext(svcName); |
| |
| if (svcCtx == null || svcCtx.service() == null) |
| throw new GridServiceNotFoundException(svcName); |
| |
| GridServiceMethodReflectKey key = new GridServiceMethodReflectKey(mtdName, argTypes); |
| |
| Method mtd = svcCtx.method(key); |
| |
| if (mtd == null) |
| throw new GridServiceMethodNotFoundException(svcName, mtdName, argTypes); |
| |
| try { |
| return mtd.invoke(svcCtx.service(), args); |
| } |
| catch (InvocationTargetException e) { |
| throw new ServiceProxyException(e.getCause()); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeString(out, svcName); |
| U.writeString(out, mtdName); |
| U.writeArray(out, argTypes); |
| U.writeArray(out, args); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| svcName = U.readString(in); |
| mtdName = U.readString(in); |
| argTypes = U.readClassArray(in); |
| args = U.readArray(in); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(ServiceProxyCallable.class, this); |
| } |
| } |
| |
| /** |
| * Exception class that wraps an exception thrown by the service implementation. |
| */ |
| private static class ServiceProxyException extends RuntimeException { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| ServiceProxyException(Throwable cause) { |
| super(cause); |
| } |
| } |
| } |