blob: b78660e3746c89a9ed35866798e7e83fea8ecf99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.closure;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.binary.BinaryReader;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeLoadBalancer;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridClosureCallMode;
import org.apache.ignite.internal.GridInternalWrapper;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.resource.GridNoImplicitInjection;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.GridWorkerFuture;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.LoadBalancerResource;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SKIP_AUTH;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
/**
*
*/
public class GridClosureProcessor extends GridProcessorAdapter {
/** Pool processor. */
private final PoolProcessor pools;
/** Lock to control execution after stop. */
private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
/** Stopping flag. */
private boolean stopping;
/**
* @param ctx Kernal context.
*/
public GridClosureProcessor(GridKernalContext ctx) {
super(ctx);
pools = ctx.pools();
assert pools != null;
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Started closure processor.");
}
/** {@inheritDoc} */
@SuppressWarnings("BusyWait")
@Override public void onKernalStop(boolean cancel) {
boolean interrupted = false;
// Busy wait is intentional.
while (true) {
try {
if (busyLock.tryWriteLock(200, TimeUnit.MILLISECONDS))
break;
else
Thread.sleep(200);
}
catch (InterruptedException ignore) {
// Preserve interrupt status & ignore.
// Note that interrupted flag is cleared.
interrupted = true;
}
}
try {
if (interrupted)
Thread.currentThread().interrupt();
stopping = true;
}
finally {
busyLock.writeUnlock();
}
if (log.isDebugEnabled())
log.debug("Stopped closure processor.");
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
@Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
return runAsync(mode, jobs, nodes, false, execName);
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Collection<? extends Runnable> jobs,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
@Nullable String execName)
{
assert mode != null;
assert !F.isEmpty(jobs) : jobs;
busyLock.readLock();
try {
if (stopping) {
return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class,
new IgniteCheckedException("Closure processor cannot be used on stopped grid: " + ctx.igniteInstanceName()));
}
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T1.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
@Nullable Collection<ClusterNode> nodes) {
return runAsync(mode, job, nodes, null);
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
@Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
return runAsync(mode, job, nodes, false, execName);
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Runnable job,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
@Nullable String execName)
{
assert mode != null;
assert job != null;
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T2.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T2(mode, job), null, sys, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* Maps {@link Runnable} jobs to specified nodes based on distribution mode.
*
* @param mode Distribution mode.
* @param jobs Closures to map.
* @param nodes Grid nodes.
* @param lb Load balancer.
* @throws IgniteException Thrown in case of any errors.
* @return Mapping.
*/
private Map<ComputeJob, ClusterNode> absMap(GridClosureCallMode mode, Collection<? extends Runnable> jobs,
Collection<ClusterNode> nodes, ComputeLoadBalancer lb) throws IgniteException {
assert mode != null;
assert jobs != null;
assert nodes != null;
assert lb != null;
try {
if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) {
JobMapper mapper = new JobMapper(jobs.size());
switch (mode) {
case BROADCAST: {
for (ClusterNode n : nodes)
for (Runnable r : jobs)
mapper.map(job(r), n);
break;
}
case BALANCE: {
for (Runnable r : jobs) {
ComputeJob job = job(r);
ClusterNode n = lb.getBalancedNode(job, null);
mapper.map(job, n);
}
break;
}
}
return mapper.map();
}
else
return Collections.emptyMap();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* Maps {@link Callable} jobs to specified nodes based on distribution mode.
*
* @param mode Distribution mode.
* @param jobs Closures to map.
* @param nodes Grid nodes.
* @param lb Load balancer.
* @throws IgniteException Thrown in case of any errors.
* @return Mapping.
*/
private <R> Map<ComputeJob, ClusterNode> outMap(GridClosureCallMode mode,
Collection<? extends Callable<R>> jobs, Collection<ClusterNode> nodes, ComputeLoadBalancer lb)
throws IgniteException {
assert mode != null;
assert jobs != null;
assert nodes != null;
assert lb != null;
try {
if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) {
JobMapper mapper = new JobMapper(jobs.size());
switch (mode) {
case BROADCAST: {
for (ClusterNode n : nodes)
for (Callable<R> c : jobs)
mapper.map(job(c), n);
break;
}
case BALANCE: {
for (Callable<R> c : jobs) {
ComputeJob job = job(c);
ClusterNode n = lb.getBalancedNode(job, null);
mapper.map(job, n);
}
break;
}
}
return mapper.map();
}
else
return Collections.emptyMap();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param rdc Reducer.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @param <R1> Type.
* @param <R2> Type.
* @return Reduced result.
*/
public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
Collection<? extends Callable<R1>> jobs,
IgniteReducer<R1, R2> rdc,
@Nullable Collection<ClusterNode> nodes,
@Nullable String execName)
{
assert mode != null;
assert rdc != null;
assert !F.isEmpty(jobs);
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T3.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T3<>(mode, jobs, rdc), null, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
GridClosureCallMode mode,
@Nullable Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes) {
return callAsync(mode, jobs, nodes, null);
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
GridClosureCallMode mode,
@Nullable Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes,
@Nullable String execName) {
return callAsync(mode, jobs, nodes, false, execName);
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
@Nullable String execName)
{
assert mode != null;
assert !F.isEmpty(jobs);
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T6.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T6<>(mode, jobs), null, sys, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
@Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
return callAsync(mode, job, nodes, null);
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
@Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes,
@Nullable String execName) {
return callAsync(mode, job, nodes, false, execName);
}
/**
* @param cacheNames Cache names.
* @param partId Partition.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Grid future for collection of closure results.
* @throws IgniteCheckedException If failed.
*/
public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
int partId,
Callable<R> job,
@Nullable Collection<ClusterNode> nodes,
@Nullable String execName) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
final String cacheName = F.first(cacheNames);
final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null,
false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param cacheNames Cache names.
* @param partId Partition.
* @param job Job.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Job future.
* @throws IgniteCheckedException If failed.
*/
public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
int partId,
Runnable job,
@Nullable Collection<ClusterNode> nodes,
@Nullable String execName) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
final String cacheName = F.first(cacheNames);
final AffinityTopologyVersion mapTopVer = ctx.cache().context().exchange().readyAffinityVersion();
final ClusterNode node = ctx.affinity().mapPartitionToNode(cacheName, partId, mapTopVer);
if (node == null)
return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null,
false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param <R> Type.
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param timeout Timeout.
* @param skipAuth Skip authorization check.
* @return Grid future for collection of closure results.
*/
public <R> IgniteInternalFuture<R> callAsyncNoFailover(
GridClosureCallMode mode,
@Nullable Callable<R> job,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
long timeout,
boolean skipAuth) {
assert mode != null;
assert timeout >= 0 : timeout;
busyLock.readLock();
try {
if (job == null)
return new GridFinishedFuture<>();
if (F.isEmpty(nodes))
return new GridFinishedFuture<>(U.emptyTopologyException());
ctx.task().setThreadContext(TC_NO_FAILOVER, true);
ctx.task().setThreadContext(TC_SUBGRID, nodes);
if (skipAuth)
ctx.task().setThreadContext(TC_SKIP_AUTH, true);
if (timeout > 0)
ctx.task().setThreadContext(TC_TIMEOUT, timeout);
return ctx.task().execute(new T7<>(mode, job), null, sys);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param timeout If greater than 0 limits task execution. Cannot be negative.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(
GridClosureCallMode mode,
@Nullable Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
long timeout
) {
assert mode != null;
assert timeout >= 0 : timeout;
busyLock.readLock();
try {
if (F.isEmpty(jobs))
return new GridFinishedFuture<>();
if (F.isEmpty(nodes))
return new GridFinishedFuture<>(U.emptyTopologyException());
ctx.task().setThreadContext(TC_NO_FAILOVER, true);
ctx.task().setThreadContext(TC_SUBGRID, nodes);
if (timeout > 0)
ctx.task().setThreadContext(TC_TIMEOUT, timeout);
return ctx.task().execute(new T6<>(mode, jobs), null, sys);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
* @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
Callable<R> job,
@Nullable Collection<ClusterNode> nodes,
boolean sys,
@Nullable String execName)
{
assert mode != null;
assert job != null;
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T7.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T7<>(mode, job), null, sys, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param job Job closure.
* @param arg Optional job argument.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
@Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T8.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T8(job, arg), null, false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param job Job closure.
* @param arg Optional job argument.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg,
@Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return new GridFinishedFuture<>(U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T11<>(job), arg, false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param job Job closure.
* @param args Job arguments.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
@Nullable Collection<? extends T> args,
@Nullable Collection<ClusterNode> nodes,
@Nullable String execName)
{
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T9.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T9<>(job, args), null, false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param job Job closure.
* @param args Job arguments.
* @param rdc Reducer.
* @param nodes Grid nodes.
* @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes,
@Nullable String execName) {
busyLock.readLock();
try {
if (F.isEmpty(nodes))
return ComputeTaskInternalFuture.finishedFuture(ctx, T10.class, U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
return ctx.task().execute(new T10<>(job, args, rdc), null, false, execName);
}
finally {
busyLock.readUnlock();
}
}
/**
* @param thread Thread.
* @param c Closure.
*/
public void runLocalWithThreadPolicy(IgniteThread thread, Runnable c) {
assert thread.stripe() >= 0 || thread.policy() != GridIoPolicy.UNDEFINED : thread;
if (thread.stripe() >= 0)
ctx.pools().getStripedExecutorService().execute(thread.stripe(), c);
else {
try {
ctx.pools().poolForPolicy(thread.policy()).execute(c);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to get pool for policy: " + thread.policy(), e);
}
}
}
/**
* @param c Closure to execute.
* @param plc Whether to run on system or public pool.
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc)
throws IgniteCheckedException {
if (c == null)
return new GridFinishedFuture();
busyLock.readLock();
try {
// Inject only if needed.
if (!(c instanceof GridPlainRunnable))
ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c);
final ClassLoader ldr = Thread.currentThread().getContextClassLoader();
final GridWorkerFuture fut = new GridWorkerFuture();
GridWorker w = new GridWorker(ctx.igniteInstanceName(), "closure-proc-worker", log) {
@Override protected void body() {
try {
if (ldr != null)
U.wrapThreadLoader(ldr, c);
else
c.run();
fut.onDone();
}
catch (Throwable e) {
if (e instanceof Error)
U.error(log, "Closure execution failed with error.", e);
fut.onDone(U.cast(e));
if (e instanceof Error)
throw e;
}
}
};
fut.setWorker(w);
try {
pools.poolForPolicy(plc).execute(w);
}
catch (RejectedExecutionException e) {
U.error(log, "Failed to execute worker due to execution rejection " +
"(increase upper bound on executor service) [policy=" + plc + ']', e);
w.run();
}
return fut;
}
finally {
busyLock.readUnlock();
}
}
/**
* Executes closure on system pool. In case of rejected execution re-runs the closure in the current
* thread (blocking).
*
* @param c Closure to execute.
* @return Future.
*/
public IgniteInternalFuture<?> runLocalSafe(Runnable c) {
return runLocalSafe(c, true);
}
/**
* In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
* @return Future.
*/
public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) {
return runLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL);
}
/**
* In case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param plc Policy to choose executor pool.
* @return Future.
*/
public IgniteInternalFuture<?> runLocalSafe(Runnable c, byte plc) {
try {
return runLocal(c, plc);
}
catch (Throwable e) {
if (e instanceof Error) {
U.error(log, "Closure execution failed with error.", e);
throw (Error)e;
}
// If execution was rejected - rerun locally.
if (e.getCause() instanceof RejectedExecutionException) {
U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc +
", closure=" + c + ']');
try {
c.run();
return new GridFinishedFuture();
}
catch (Throwable t) {
if (t instanceof Error) {
U.error(log, "Closure execution failed with error.", t);
throw t;
}
return new GridFinishedFuture(U.cast(t));
}
}
// If failed for other reasons - return error future.
else
return new GridFinishedFuture(U.cast(e));
}
}
/**
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException {
return callLocal(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL);
}
/**
* @param c Closure to execute.
* @param plc Whether to run on system or public pool.
* @param <R> Type of closure return value.
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
throws IgniteCheckedException {
if (c == null)
return new GridFinishedFuture<>();
busyLock.readLock();
try {
// Inject only if needed.
if (!(c instanceof GridPlainCallable))
ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c);
final ClassLoader ldr = Thread.currentThread().getContextClassLoader();
final GridWorkerFuture<R> fut = new GridWorkerFuture<>();
GridWorker w = new GridWorker(ctx.igniteInstanceName(), "closure-proc-worker", log) {
@Override protected void body() {
try {
if (ldr != null)
fut.onDone(U.wrapThreadLoader(ldr, c));
else
fut.onDone(c.call());
}
catch (Throwable e) {
if (e instanceof Error)
U.error(log, "Closure execution failed with error.", e);
fut.onDone(U.cast(e));
if (e instanceof Error)
throw (Error)e;
}
}
};
fut.setWorker(w);
try {
pools.poolForPolicy(plc).execute(w);
}
catch (RejectedExecutionException e) {
U.error(log, "Failed to execute worker due to execution rejection " +
"(increase upper bound on executor service) [policy=" + plc + ']', e);
w.run();
}
return fut;
}
finally {
busyLock.readUnlock();
}
}
/**
* Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
* but in case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @return Future.
*/
public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c) {
return callLocalSafe(c, true);
}
/**
* Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)}
* but in case of rejected execution re-runs the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
* @return Future.
*/
public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean sys) {
return callLocalSafe(c, sys ? GridIoPolicy.SYSTEM_POOL : GridIoPolicy.PUBLIC_POOL);
}
/**
* Companion to {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs
* the closure in the current thread (blocking).
*
* @param c Closure to execute.
* @param plc Policy to choose executor pool.
* @return Future.
*/
public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
try {
return callLocal(c, plc);
}
catch (IgniteCheckedException e) {
// If execution was rejected - rerun locally.
if (e.getCause() instanceof RejectedExecutionException) {
U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc +
", closure=" + c + ']');
try {
return new GridFinishedFuture<>(c.call());
}
// If failed again locally - return error future.
catch (Exception e2) {
return new GridFinishedFuture<>(U.cast(e2));
}
}
// If failed for other reasons - return error future.
else
return new GridFinishedFuture<>(U.cast(e));
}
}
/**
* Converts given closure with arguments to grid job.
* @param job Job.
* @param arg Optional argument.
* @return Job.
*/
private static <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) {
A.notNull(job, "job");
return job instanceof ComputeJobMasterLeaveAware ? new C1MLA<>(job, arg) : new C1<>(job, arg);
}
/**
* Converts given closure to a grid job.
*
* @param c Closure to convert to grid job.
* @return Grid job made out of closure.
*/
private static <R> ComputeJob job(final Callable<R> c) {
A.notNull(c, "job");
return c instanceof ComputeJobMasterLeaveAware ? new C2MLA<>(c) : new C2<>(c);
}
/**
* Converts given closure to a grid job.
*
* @param r Closure to convert to grid job.
* @return Grid job made out of closure.
*/
private static ComputeJob job(final Runnable r) {
A.notNull(r, "job");
return r instanceof ComputeJobMasterLeaveAware ? new C4MLA(r) : new C4(r);
}
/**
* Get collection of actual results.
*
* @param res Initial results.
* @return Converted results.
*/
private static <R> Collection<R> jobResults(List<ComputeJobResult> res) {
if (res == null)
return Collections.emptyList();
else {
List<R> res0 = new ArrayList<>(res.size());
for (int i = 0; i < res.size(); i++)
res0.add(res.get(i).<R>getData());
return res0;
}
}
/**
*
*/
private class JobMapper {
/** */
private final Map<ComputeJob, ClusterNode> map;
/** */
private boolean hadLocNode;
/**
* @param expJobCnt Expected Jobs count.
*/
private JobMapper(int expJobCnt) {
map = IgniteUtils.newHashMap(expJobCnt);
}
/**
* @param job Job.
* @param node Node.
* @throws IgniteCheckedException In case of error.
*/
public void map(@NotNull ComputeJob job, @NotNull ClusterNode node) throws IgniteCheckedException {
if (ctx.localNodeId().equals(node.id())) {
if (hadLocNode) {
Marshaller marsh = ctx.config().getMarshaller();
job = U.unmarshal(marsh, U.marshal(marsh, job), U.resolveClassLoader(ctx.config()));
}
else
hadLocNode = true;
}
map.put(job, node);
}
/**
* @return Map.
*/
public Map<ComputeJob, ClusterNode> map() {
return map;
}
}
/**
* No-reduce task adapter.
*/
private abstract static class TaskNoReduceAdapter<T> extends GridPeerDeployAwareTaskAdapter<T, Void> {
/** */
private static final long serialVersionUID = 0L;
/**
* @param pda Peer deploy aware instance.
*/
protected TaskNoReduceAdapter(@Nullable GridPeerDeployAware pda) {
super(pda);
}
/** {@inheritDoc} */
@Nullable @Override public Void reduce(List<ComputeJobResult> results) {
return null;
}
}
/**
* Task that is free of dragged in enclosing context for the method
* {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection,String)}.
*/
private class T1 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/** */
private IgniteBiTuple<GridClosureCallMode, Collection<? extends Runnable>> t;
/**
* @param mode Call mode.
* @param jobs Collection of jobs.
*/
private T1(GridClosureCallMode mode, Collection<? extends Runnable> jobs) {
super(U.peerDeployAware0(jobs));
t = F.t(mode, jobs);
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return absMap(t.get1(), t.get2(), subgrid, lb);
}
}
/**
* Task that is free of dragged in enclosing context for the method
* {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection, String)}.
*/
private class T2 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/** */
private IgniteBiTuple<GridClosureCallMode, Runnable> t;
/**
* @param mode Call mode.
* @param job Job.
*/
private T2(GridClosureCallMode mode, Runnable job) {
super(U.peerDeployAware(job));
t = F.t(mode, job);
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return absMap(t.get1(), F.asList(t.get2()), subgrid, lb);
}
}
/**
* Task that is free of dragged in enclosing context for the method
* {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)}
*/
private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/** */
private GridTuple3<GridClosureCallMode,
Collection<? extends Callable<R1>>,
IgniteReducer<R1, R2>> t;
/**
*
* @param mode Call mode.
* @param jobs Collection of jobs.
* @param rdc Reducer.
*/
private T3(GridClosureCallMode mode, Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc) {
super(U.peerDeployAware0(jobs));
t = F.t(mode, jobs, rdc);
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return outMap(t.get1(), t.get2(), subgrid, lb);
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
ComputeJobResultPolicy resPlc = super.result(res, rcvd);
if (res.getException() == null && resPlc != FAILOVER && !t.get3().collect((R1)res.getData()))
resPlc = REDUCE; // If reducer returned false - reduce right away.
return resPlc;
}
/** {@inheritDoc} */
@Override public R2 reduce(List<ComputeJobResult> res) {
return t.get3().reduce();
}
}
/**
*/
private static class T4 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection, AffinityTask {
/** */
private static final long serialVersionUID = 0L;
/** */
private ClusterNode node;
/** */
private Runnable job;
/** */
private int partId;
/** */
private AffinityTopologyVersion topVer;
/** */
private Collection<String> affCacheNames;
/**
* @param node Cluster node.
* @param job Job affinity partition.
* @param affCacheNames Affinity caches.
* @param partId Partition.
* @param topVer Affinity topology version.
*/
private T4(ClusterNode node, Runnable job, Collection<String> affCacheNames, int partId,
AffinityTopologyVersion topVer) {
super(U.peerDeployAware0(job));
assert partId >= 0;
this.node = node;
this.job = job;
this.affCacheNames = affCacheNames;
this.partId = partId;
this.topVer = topVer;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return Collections.singletonMap(job(job), node);
}
/** {@inheritDoc} */
@Override public int partition() {
return partId;
}
/** {@inheritDoc} */
@Nullable @Override public Collection<String> affinityCacheNames() {
return affCacheNames;
}
/** {@inheritDoc} */
@Nullable @Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
}
/**
*/
private static class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements
GridNoImplicitInjection, AffinityTask {
/** */
private static final long serialVersionUID = 0L;
/** */
private ClusterNode node;
/** */
private Callable<R> job;
/** */
private int partId;
/** */
private AffinityTopologyVersion topVer;
/** */
private Collection<String> affCacheNames;
/**
* @param node Cluster node.
* @param job Job affinity partition.
* @param affCacheNames Affinity caches.
* @param partId Partition.
* @param topVer Affinity topology version.
*/
private T5(ClusterNode node,
Callable<R> job,
Collection<String> affCacheNames,
int partId,
AffinityTopologyVersion topVer) {
super(U.peerDeployAware0(job));
this.node = node;
this.job = job;
this.affCacheNames = affCacheNames;
this.partId = partId;
this.topVer = topVer;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return Collections.singletonMap(job(job), node);
}
/** {@inheritDoc} */
@Override public R reduce(List<ComputeJobResult> res) {
for (ComputeJobResult r : res) {
if (r.getException() == null)
return r.getData();
}
throw new IgniteException("Failed to find successful job result: " + res);
}
/** {@inheritDoc} */
@Override public int partition() {
return partId;
}
/** {@inheritDoc} */
@Nullable @Override public Collection<String> affinityCacheNames() {
return affCacheNames;
}
/** {@inheritDoc} */
@Nullable @Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
}
/**
* Task that is free of dragged in enclosing context for the method
* {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection, String)}
*/
private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private final GridClosureCallMode mode;
/** */
private final Collection<? extends Callable<R>> jobs;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/**
*
* @param mode Call mode.
* @param jobs Collection of jobs.
*/
private T6(
GridClosureCallMode mode,
Collection<? extends Callable<R>> jobs) {
super(U.peerDeployAware0(jobs));
this.mode = mode;
this.jobs = jobs;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return outMap(mode, jobs, subgrid, lb);
}
/** {@inheritDoc} */
@Override public Collection<R> reduce(List<ComputeJobResult> res) {
return jobResults(res);
}
}
/**
* Task that is free of dragged in enclosing context for the method
* {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection, String)}
*/
private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private IgniteBiTuple<GridClosureCallMode, Callable<R>> t;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/**
* @param mode Call mode.
* @param job Job.
*/
private T7(GridClosureCallMode mode, Callable<R> job) {
super(U.peerDeployAware(job));
t = F.t(mode, job);
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
return outMap(t.get1(), F.asList(t.get2()), subgrid, lb);
}
/** {@inheritDoc} */
@Override public R reduce(List<ComputeJobResult> res) {
for (ComputeJobResult r : res)
if (r.getException() == null)
return r.getData();
throw new IgniteException("Failed to find successful job result: " + res);
}
}
/**
*/
private static class T8<T, R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private IgniteClosure<T, R> job;
/** */
private T arg;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/**
* @param job Job.
* @param arg Optional job argument.
*/
private T8(IgniteClosure<T, R> job, @Nullable T arg) {
super(U.peerDeployAware(job));
this.job = job;
this.arg = arg;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
ComputeJob job = job(this.job, this.arg);
ClusterNode node = lb.getBalancedNode(job, null);
return Collections.singletonMap(job, node);
}
/** {@inheritDoc} */
@Override public R reduce(List<ComputeJobResult> res) throws IgniteException {
for (ComputeJobResult r : res)
if (r.getException() == null)
return r.getData();
throw new IgniteException("Failed to find successful job result: " + res);
}
}
/**
*/
private class T9<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>>
implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private IgniteClosure<T, R> job;
/** */
private Collection<? extends T> args;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/**
* @param job Job.
* @param args Job arguments.
*/
private T9(IgniteClosure<T, R> job, Collection<? extends T> args) {
super(U.peerDeployAware(job));
this.job = job;
this.args = args;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
try {
JobMapper mapper = new JobMapper(args.size());
for (T jobArg : args) {
ComputeJob job = job(this.job, jobArg);
ClusterNode node = lb.getBalancedNode(job, null);
mapper.map(job, node);
}
return mapper.map();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public Collection<R> reduce(List<ComputeJobResult> res) throws IgniteException {
return jobResults(res);
}
}
/**
*/
private class T10<T, R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private IgniteClosure<T, R1> job;
/** */
private Collection<? extends T> args;
/** */
private IgniteReducer<R1, R2> rdc;
/** */
@LoadBalancerResource
private ComputeLoadBalancer lb;
/**
* @param job Job.
* @param args Job arguments.
* @param rdc Reducer.
*/
private T10(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc) {
super(U.peerDeployAware(job));
this.job = job;
this.args = args;
this.rdc = rdc;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) {
try {
JobMapper mapper = new JobMapper(args.size());
for (T jobArg : args) {
ComputeJob job = job(this.job, jobArg);
ClusterNode node = lb.getBalancedNode(job, null);
mapper.map(job, node);
}
return mapper.map();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
ComputeJobResultPolicy resPlc = super.result(res, rcvd);
if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1)res.getData()))
resPlc = REDUCE; // If reducer returned false - reduce right away.
return resPlc;
}
/** {@inheritDoc} */
@Override public R2 reduce(List<ComputeJobResult> res) {
return rdc.reduce();
}
}
/**
*/
private class T11<T, R> extends GridPeerDeployAwareTaskAdapter<T, Collection<R>>
implements GridNoImplicitInjection {
/** */
private static final long serialVersionUID = 0L;
/** */
private final IgniteClosure<T, R> job;
/**
* @param job Job.
*/
private T11(IgniteClosure<T, R> job) {
super(U.peerDeployAware(job));
this.job = job;
}
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable T arg) {
if (F.isEmpty(subgrid))
return Collections.emptyMap();
try {
JobMapper mapper = new JobMapper(subgrid.size());
for (ClusterNode n : subgrid)
mapper.map(job(job, arg), n);
return mapper.map();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public Collection<R> reduce(List<ComputeJobResult> res) {
return jobResults(res);
}
}
/**
*
*/
public static class C1<T, R> implements ComputeJob, Binarylizable, GridNoImplicitInjection,
GridInternalWrapper<IgniteClosure> {
/** */
private static final long serialVersionUID = 0L;
/** */
@GridToStringInclude
protected IgniteClosure<T, R> job;
/** */
@GridToStringInclude(sensitive = true)
protected T arg;
/**
*
*/
public C1() {
// No-op.
}
/**
* @param job Job.
* @param arg Argument.
*/
C1(IgniteClosure<T, R> job, T arg) {
this.job = job;
this.arg = arg;
}
/** {@inheritDoc} */
@Nullable @Override public Object execute() {
return job.apply(arg);
}
/** {@inheritDoc} */
@Override public void cancel() {
// No-op.
}
/** {@inheritDoc} */
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
BinaryRawWriter rawWriter = writer.rawWriter();
rawWriter.writeObject(job);
rawWriter.writeObject(arg);
}
/** {@inheritDoc} */
@Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
BinaryRawReader rawReader = reader.rawReader();
job = rawReader.readObject();
arg = rawReader.readObject();
}
/** {@inheritDoc} */
@Override public IgniteClosure userObject() {
return job;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C1.class, this);
}
}
/**
*
*/
public static class C1MLA<T, R> extends C1<T, R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
/**
*
*/
public C1MLA() {
// No-op.
}
/**
* @param job Job.
* @param arg Argument.
*/
private C1MLA(IgniteClosure<T, R> job, T arg) {
super(job, arg);
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C1MLA.class, this, super.toString());
}
}
/**
*
*/
public static class C2<R> implements ComputeJob, Binarylizable, GridNoImplicitInjection,
GridInternalWrapper<Callable> {
/** */
private static final long serialVersionUID = 0L;
/** */
@GridToStringInclude
protected Callable<R> c;
/**
*
*/
public C2() {
// No-op.
}
/**
* @param c Callable.
*/
private C2(Callable<R> c) {
this.c = c;
}
/** {@inheritDoc} */
@Override public Object execute() {
try {
return c.call();
}
catch (Exception e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void cancel() {
// No-op.
}
/** {@inheritDoc} */
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
writer.rawWriter().writeObject(c);
}
/** {@inheritDoc} */
@Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
c = reader.rawReader().readObject();
}
/** {@inheritDoc} */
@Override public Callable userObject() {
return c;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C2.class, this);
}
}
/**
*
*/
public static class C2MLA<R> extends C2<R> implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
/**
*
*/
public C2MLA() {
// No-op.
}
/**
* @param c Callable.
*/
private C2MLA(Callable<R> c) {
super(c);
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C2MLA.class, this, super.toString());
}
}
/**
*/
public static class C4 implements ComputeJob, Binarylizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
/** */
private static final long serialVersionUID = 0L;
/** */
@GridToStringInclude
protected Runnable r;
/**
*
*/
public C4() {
// No-op.
}
/**
* @param r Runnable.
*/
private C4(Runnable r) {
this.r = r;
}
/** {@inheritDoc} */
@Nullable @Override public Object execute() {
r.run();
return null;
}
/** {@inheritDoc} */
@Override public void cancel() {
// No-op.
}
/** {@inheritDoc} */
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
writer.rawWriter().writeObject(r);
}
/** {@inheritDoc} */
@Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
r = reader.rawReader().readObject();
}
/** {@inheritDoc} */
@Override public Runnable userObject() {
return r;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C4.class, this);
}
}
/**
*
*/
public static class C4MLA extends C4 implements ComputeJobMasterLeaveAware {
/** */
private static final long serialVersionUID = 0L;
/**
*
*/
public C4MLA() {
// No-op.
}
/**
* @param r Runnable.
*/
private C4MLA(Runnable r) {
super(r);
}
/** {@inheritDoc} */
@Override public void onMasterNodeLeft(ComputeTaskSession ses) {
((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(C4MLA.class, this, super.toString());
}
}
}