blob: d49d089205da20f0301a8d57f307193bff4d483d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.jobtracker;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.apache.ignite.hadoop.HadoopMapReducePlanner;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
import org.apache.ignite.internal.util.GridMutex;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_COMPLETE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_MAP;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_REDUCE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.ABORT;
import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMMIT;
import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.REDUCE;
import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.SETUP;
import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.COMPLETED;
import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED;
import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED;
import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.RUNNING;
/**
* Hadoop job tracker.
*/
public class HadoopJobTracker extends HadoopComponent {
/** Peek modes passed to {@code localEntries()} when scanning job metadata: off-heap only. */
private static final CachePeekMode[] OFFHEAP_PEEK_MODE = {CachePeekMode.OFFHEAP};
/** Mutex guarding lazy initialization of the job metadata projections, see {@link #jobMetaCache()}. */
private final GridMutex mux = new GridMutex();
/** Job metadata cache projection; stays {@code null} until {@link #jobMetaCache()} initializes it. */
private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaPrj;
/** Projection with expiry policy for finished job updates; initialized together with {@code jobMetaPrj}. */
private volatile IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
/** Map-reduce execution planner; assigned inside {@link #jobMetaCache()} while holding {@code mux}. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private HadoopMapReducePlanner mrPlanner;
/** All the known jobs: job ID mapped to the future of the job instance created by {@code job(...)}. */
private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobEx>> jobs = new ConcurrentHashMap<>();
/** Locally active jobs: job ID mapped to the local bookkeeping state. */
private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap<>();
/** Locally requested finish futures, completed when the job reaches the complete phase or the grid stops. */
private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts =
new ConcurrentHashMap<>();
/** Event processing service; created in {@code start(...)} as a fixed pool with a single thread. */
private ExecutorService evtProcSvc;
/** Component busy lock: read-locked by regular operations, write-locked in {@code onKernalStop(...)}. */
private GridSpinReadWriteLock busyLock;
/** Class to create HadoopJobEx instances from; loaded in {@code start(...)}. */
private Class<? extends HadoopJobEx> jobCls;
/**
* Closure to check result of async transform of system cache.
* Attached as a listener to {@code invokeAsync(...)} futures: it waits for the result and only logs a
* failure, it never rethrows.
*/
private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> gridFut) {
try {
// The result itself is ignored; get() is called only to surface a failure.
gridFut.get();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to transform system cache.", e);
}
}
};
/**
 * {@inheritDoc}
 * <p>
 * Creates the busy lock and the single-threaded event processing pool, then resolves the job
 * implementation class through the common Hadoop class loader.
 */
@SuppressWarnings("unchecked")
@Override public void start(final HadoopContext ctx) throws IgniteCheckedException {
    super.start(ctx);

    busyLock = new GridSpinReadWriteLock();
    evtProcSvc = Executors.newFixedThreadPool(1);

    assert jobCls == null;

    HadoopClassLoader commonLdr = ctx.kernalContext().hadoopHelper().commonClassLoader();

    try {
        Class<?> loaded = commonLdr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);

        jobCls = (Class<HadoopJobEx>)loaded;
    }
    catch (Exception e) {
        // Any load failure is reported as a checked start-up error carrying the original cause.
        throw new IgniteCheckedException("Failed to load job class [class=" +
            HadoopCommonUtils.JOB_CLS_NAME + ']', e);
    }
}
/**
 * Gets the job metadata cache projection, lazily initializing it on first access together with
 * the map-reduce planner and the finished-job projection.
 * <p>
 * {@code jobMetaPrj} is the field tested on the lock-free fast path, so it is written last: a
 * thread that observes it as non-null is guaranteed to also observe {@code mrPlanner} and
 * {@code finishedJobMetaPrj} (the latter is what {@link #finishedJobMetaCache()} relies on).
 *
 * @return Job meta projection.
 */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
    IgniteInternalCache<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;

    if (prj == null) {
        synchronized (mux) {
            if ((prj = jobMetaPrj) == null) {
                GridCacheAdapter<HadoopJobId, HadoopJobMetadata> sysCache = ctx.kernalContext().cache()
                    .internalCache(CU.SYS_CACHE_HADOOP_MR);

                assert sysCache != null;

                mrPlanner = ctx.planner();

                try {
                    ctx.kernalContext().resource().injectGeneric(mrPlanner);
                }
                catch (IgniteCheckedException e) { // Must not happen.
                    U.error(log, "Failed to inject resources.", e);

                    throw new IllegalStateException(e);
                }

                prj = sysCache;

                long ttl = ctx.configuration().getFinishedJobInfoTtl();

                if (ttl > 0) {
                    ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl));

                    finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
                }
                else
                    finishedJobMetaPrj = prj;

                // Publish the fast-path guard only after everything it guards has been assigned;
                // otherwise a concurrent caller could skip the lock and still see a null
                // finished-job projection.
                jobMetaPrj = prj;
            }
        }
    }

    return prj;
}
/**
 * Gets the projection used for finished job updates, triggering the lazy initialization done by
 * {@link #jobMetaCache()} when the projection has not been set yet.
 *
 * @return Projection with expiry policy for finished job updates.
 */
private IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
    IgniteInternalCache<HadoopJobId, HadoopJobMetadata> finishedPrj = finishedJobMetaPrj;

    if (finishedPrj != null)
        return finishedPrj;

    // Not initialized yet: initialization happens as part of the main projection setup.
    jobMetaCache();

    finishedPrj = finishedJobMetaPrj;

    assert finishedPrj != null;

    return finishedPrj;
}
/**
* {@inheritDoc}
* <p>
* Registers two callbacks: a cache entry listener on the job metadata cache and a local listener for
* node left/failed discovery events. Both only enqueue work on {@code evtProcSvc}; the actual processing
* happens in {@link #processJobMetadataUpdates} and {@link #processNodeLeft}.
*/
@Override public void onKernalStart() throws IgniteCheckedException {
super.onKernalStart();
// NOTE(review): the meaning of the trailing null/boolean arguments is defined by
// executeInternalQuery() - confirm against its signature before changing them.
jobMetaCache().context().continuousQueries().executeInternalQuery(
new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() {
@Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId,
? extends HadoopJobMetadata>> evts) {
// Read lock is unavailable once onKernalStop() has taken the write lock: drop the update.
if (!busyLock.tryReadLock())
return;
try {
// Must process query callback in a separate thread to avoid deadlocks.
evtProcSvc.execute(new EventHandler() {
@Override protected void body() throws IgniteCheckedException {
processJobMetadataUpdates(evts);
}
});
}
finally {
busyLock.readUnlock();
}
}
},
null,
true,
true,
false,
false
);
ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(final Event evt) {
// Same stop protection as for the cache listener above.
if (!busyLock.tryReadLock())
return;
try {
// Must process discovery callback in a separate thread to avoid deadlock.
evtProcSvc.execute(new EventHandler() {
@Override protected void body() {
// Listener is registered for discovery event types only, hence the cast.
processNodeLeft((DiscoveryEvent)evt);
}
});
}
finally {
busyLock.readUnlock();
}
}
}, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
}
/**
 * {@inheritDoc}
 * <p>
 * Takes the busy write lock (it is not released afterwards), shuts the event processing pool
 * down and fails every finish future that is still registered locally.
 */
@Override public void onKernalStop(boolean cancel) {
    super.onKernalStop(cancel);

    busyLock.writeLock();

    evtProcSvc.shutdown();

    // Nobody will complete these futures any more, so fail them explicitly.
    Collection<GridFutureAdapter<HadoopJobId>> pendingFuts = activeFinishFuts.values();

    for (GridFutureAdapter<HadoopJobId> pendingFut : pendingFuts) {
        IgniteCheckedException stopErr =
            new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping).");

        pendingFut.onDone(stopErr);
    }
}
/**
* Submits execution of Hadoop job to grid.
* <p>
* Builds the map-reduce plan and the initial job metadata, registers a local completion future and then
* publishes the metadata to the job metadata cache. Checked failures are logged and returned as an
* already-failed future instead of being thrown.
*
* @param jobId Job ID.
* @param info Job info.
* @return Job completion future, or a finished future holding the error if the grid is stopping or the
* submission failed.
*/
public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) {
if (!busyLock.tryReadLock()) {
return new GridFinishedFuture<>(new IgniteCheckedException("Failed to execute map-reduce job " +
"(grid is stopping): " + info));
}
try {
// Timestamp taken before planning; reported to the performance counter below.
long jobPrepare = U.currentTimeMillis();
// Early duplicate check; the authoritative check is the put-if-absent at the end.
if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
HadoopJobEx job = job(jobId, info);
// mrPlanner is assigned by the jobMetaCache() call above.
HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
logPlan(info, mrPlan);
HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
meta.mapReducePlan(mrPlan);
meta.pendingSplits(allSplits(mrPlan));
meta.pendingReducers(allReducers(mrPlan));
// The future is registered before the metadata is published to the cache.
GridFutureAdapter<HadoopJobId> completeFut = new GridFutureAdapter<>();
GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut);
assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']';
if (log.isDebugEnabled())
log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']');
// Timestamp taken right before the metadata is published.
long jobStart = U.currentTimeMillis();
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
ctx.localNodeId());
perfCntr.clientSubmissionEvents(info);
perfCntr.onJobPrepare(jobPrepare);
perfCntr.onJobStart(jobStart);
// NOTE(review): if this put fails or reports a duplicate, completeFut stays registered in
// activeFinishFuts - confirm whether it should be removed on that path.
if (jobMetaCache().getAndPutIfAbsent(jobId, meta) != null)
throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
return completeFut;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to submit job: " + jobId, e);
return new GridFinishedFuture<>(e);
}
finally {
busyLock.readUnlock();
}
}
/**
 * Writes a per-node summary of the map-reduce plan to the debug log. Does nothing when debug
 * logging is disabled.
 *
 * @param info Job info.
 * @param plan Plan.
 */
@SuppressWarnings("StringConcatenationInsideStringBufferAppend")
private void logPlan(HadoopJobInfo info, HadoopMapReducePlan plan) {
    if (!log.isDebugEnabled())
        return;

    // Node ID -> (splits mapped on the node, reducers assigned to the node).
    Map<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> perNode = new HashMap<>();

    for (UUID mapperNodeId : plan.mapperNodeIds()) {
        Collection<HadoopInputSplit> nodeSplits = plan.mappers(mapperNodeId);

        perNode.put(mapperNodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(nodeSplits, null));
    }

    for (UUID reducerNodeId : plan.reducerNodeIds()) {
        int[] nodeReducers = plan.reducers(reducerNodeId);

        IgniteBiTuple<Collection<HadoopInputSplit>, int[]> tuple = perNode.get(reducerNodeId);

        if (tuple != null)
            tuple.set2(nodeReducers);
        else
            perNode.put(reducerNodeId, new IgniteBiTuple<Collection<HadoopInputSplit>, int[]>(null, nodeReducers));
    }

    StringBuilder details = new StringBuilder("[");

    String sep = "";

    for (Map.Entry<UUID, IgniteBiTuple<Collection<HadoopInputSplit>, int[]>> e : perNode.entrySet()) {
        details.append(sep);

        sep = ", ";

        UUID nodeId = e.getKey();

        Collection<HadoopInputSplit> mappers = e.getValue().get1();

        if (mappers == null)
            mappers = Collections.emptyList();

        int[] reducers = e.getValue().get2();

        if (reducers == null)
            reducers = new int[0];

        details.append("[nodeId=").append(nodeId)
            .append(", mappers=").append(mappers.size())
            .append(", reducers=").append(reducers.length)
            .append(", mapperDetails=").append(mappers)
            .append(", reducerDetails=").append(Arrays.toString(reducers))
            .append(']');
    }

    details.append(']');

    log.debug("Prepared map-reduce plan [jobName=" + info.jobName() + ", mappers=" + plan.mappers() +
        ", reducers=" + plan.reducers() + ", details=" + details + ']');
}
/**
 * Builds a job status snapshot from job metadata. Missing pending collections are reported as
 * zero pending items, and the job counts as failed when a fail cause is recorded.
 *
 * @param meta Metadata.
 * @return Status.
 */
public static HadoopJobStatus status(HadoopJobMetadata meta) {
    HadoopJobInfo jobInfo = meta.jobInfo();

    int pendingSplitCnt = 0;

    if (meta.pendingSplits() != null)
        pendingSplitCnt = meta.pendingSplits().size();

    int pendingReducerCnt = 0;

    if (meta.pendingReducers() != null)
        pendingReducerCnt = meta.pendingReducers().size();

    boolean failed = meta.failCause() != null;

    return new HadoopJobStatus(
        meta.jobId(),
        jobInfo.jobName(),
        jobInfo.user(),
        pendingSplitCnt,
        pendingReducerCnt,
        meta.mapReducePlan().mappers(),
        meta.mapReducePlan().reducers(),
        meta.phase(),
        failed,
        meta.version()
    );
}
/**
 * Looks up job metadata by ID and converts it to a job status.
 *
 * @param jobId Job ID to get status for.
 * @return Job status for given job ID or {@code null} if job was not found or the grid is stopping.
 * @throws IgniteCheckedException If metadata lookup failed.
 */
@Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
    if (busyLock.tryReadLock()) {
        try {
            HadoopJobMetadata meta = jobMetaCache().get(jobId);

            if (meta == null)
                return null;

            return status(meta);
        }
        finally {
            busyLock.readUnlock();
        }
    }

    return null; // Grid is stopping.
}
/**
* Gets job finish future.
* <p>
* Returns an already finished future when the job is complete; otherwise registers (or reuses) a future in
* {@code activeFinishFuts} and re-reads the metadata to cover a completion that happened in between.
*
* @param jobId Job ID.
* @return Finish future or {@code null} if the job is unknown or the grid is stopping.
* @throws IgniteCheckedException If failed.
*/
@Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
if (!busyLock.tryReadLock())
return null; // Grid is stopping.
try {
HadoopJobMetadata meta = jobMetaCache().get(jobId);
if (meta == null)
return null;
if (log.isTraceEnabled())
log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
if (meta.phase() == PHASE_COMPLETE) {
if (log.isTraceEnabled())
log.trace("Job is complete, returning finished future: " + jobId);
return new GridFinishedFuture<>(jobId);
}
// Reuse the future registered for this job, if any, otherwise register a new one.
GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId,
new GridFutureAdapter<HadoopJobId>());
// Get meta from cache one more time to close the window.
meta = jobMetaCache().get(jobId);
if (log.isTraceEnabled())
log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
if (meta == null) {
// Metadata disappeared between the two reads: complete the future without a result.
fut.onDone();
activeFinishFuts.remove(jobId, fut);
}
else if (meta.phase() == PHASE_COMPLETE) {
// Job completed between the two reads: complete the future here, passing the fail cause along.
fut.onDone(jobId, meta.failCause());
activeFinishFuts.remove(jobId, fut);
}
return fut;
}
finally {
busyLock.readUnlock();
}
}
/**
 * Gets the map-reduce plan stored in the metadata of the given job.
 *
 * @param jobId Job ID.
 * @return Job plan, or {@code null} if the job metadata was not found or the grid is stopping.
 * @throws IgniteCheckedException If failed.
 */
public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException {
    if (busyLock.tryReadLock()) {
        try {
            HadoopJobMetadata meta = jobMetaCache().get(jobId);

            return meta == null ? null : meta.mapReducePlan();
        }
        finally {
            busyLock.readUnlock();
        }
    }

    return null;
}
/**
* Callback from task executor invoked when a task has been finished.
* <p>
* SETUP, MAP, REDUCE and COMBINE completions are delegated to the local job state; COMMIT and ABORT
* completions move the job to {@code PHASE_COMPLETE} through the finished-job projection. The callback is
* ignored when the grid is stopping.
*
* @param info Task info.
* @param status Task status.
*/
@SuppressWarnings({"ConstantConditions"})
public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) {
if (!busyLock.tryReadLock())
return;
try {
assert status.state() != RUNNING;
if (log.isDebugEnabled())
log.debug("Received task finished callback [info=" + info + ", status=" + status + ']');
JobLocalState state = activeJobs.get(info.jobId());
// Task CRASHes with null fail cause.
assert (status.state() != FAILED) || status.failCause() != null :
"Invalid task status [info=" + info + ", status=" + status + ']';
// Local state may legitimately be missing only for COMMIT/ABORT on the job update leader.
assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)) :
"Missing local state for finished task [info=" + info + ", status=" + status + ']';
// Counters are merged only for successfully completed tasks.
StackedProcessor incrCntrs = null;
if (status.state() == COMPLETED)
incrCntrs = new IncrementCountersProcessor(null, status.counters());
switch (info.type()) {
case SETUP: {
state.onSetupFinished(info, status, incrCntrs);
break;
}
case MAP: {
state.onMapFinished(info, status, incrCntrs);
break;
}
case REDUCE: {
state.onReduceFinished(info, status, incrCntrs);
break;
}
case COMBINE: {
state.onCombineFinished(info, status, incrCntrs);
break;
}
case COMMIT:
case ABORT: {
// Final task of the job: switch to the complete phase via the projection with expiry policy.
IgniteInternalCache<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
listen(failsLog);
break;
}
}
}
finally {
busyLock.readUnlock();
}
}
/**
 * Asynchronously applies an entry processor to the metadata of the given job; a failure of the
 * operation is only logged.
 *
 * @param jobId Job id.
 * @param c Closure of operation.
 */
private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) {
    IgniteInternalFuture<?> invokeFut = jobMetaCache().invokeAsync(jobId, c);

    invokeFut.listen(failsLog);
}
/**
 * Callback from task executor called when an external process is ready to receive shuffle
 * messages. Records the reducers and the process descriptor in the job metadata.
 *
 * @param jobId Job ID.
 * @param reducers Reducers.
 * @param desc Process descriptor.
 */
public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers,
    HadoopProcessDescriptor desc) {
    InitializeReducersProcessor initProc = new InitializeReducersProcessor(null, reducers, desc);

    transform(jobId, initProc);
}
/**
 * Collects all input splits of the plan and assigns each of them a sequential task number,
 * starting from zero, in the order the splits are encountered.
 *
 * @param plan Map-reduce plan.
 * @return Map of all input splits that should be processed to their task numbers.
 * @throws IllegalStateException If the same split occurs in the plan more than once.
 */
@SuppressWarnings("ConstantConditions")
private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) {
    Map<HadoopInputSplit, Integer> splitNums = new HashMap<>();

    int nextNum = 0;

    for (UUID mapperNodeId : plan.mapperNodeIds()) {
        for (HadoopInputSplit split : plan.mappers(mapperNodeId)) {
            Integer prevNum = splitNums.put(split, nextNum);

            nextNum++;

            if (prevNum != null)
                throw new IllegalStateException("Split duplicate.");
        }
    }

    return splitNums;
}
/**
 * Gets all reducer numbers of this job: {@code 0} (inclusive) up to the plan's reducer count
 * (exclusive).
 *
 * @param plan Map-reduce plan.
 * @return Collection of reducers.
 */
private Collection<Integer> allReducers(HadoopMapReducePlan plan) {
    Collection<Integer> rdcNums = new HashSet<>();

    int rdc = 0;

    while (rdc < plan.reducers()) {
        rdcNums.add(rdc);

        rdc++;
    }

    return rdcNums;
}
/**
* Processes node leave (or fail) event.
* <p>
* Only the job update leader reacts. For every locally stored job it either re-runs the setup task
* (job still in setup phase, no local state, and the departed node had a lower order than the local one)
* or, for jobs in map/reduce phase, cancels the job if any node of its plan is no longer in topology.
*
* @param evt Discovery event.
*/
@SuppressWarnings("ConstantConditions")
private void processNodeLeft(DiscoveryEvent evt) {
if (log.isDebugEnabled())
log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']');
// Check only if this node is responsible for job status updates.
if (ctx.jobUpdateLeader()) {
// Setup failover is considered only when the departed node had a lower order than the local node.
boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder();
Iterable<IgniteCache.Entry<HadoopJobId, HadoopJobMetadata>> entries;
try {
entries = jobMetaCache().localEntries(OFFHEAP_PEEK_MODE);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to get local entries", e);
return;
}
// Iteration over all local entries is correct since system cache is REPLICATED.
for (IgniteCache.Entry<HadoopJobId, HadoopJobMetadata> entry : entries) {
HadoopJobMetadata meta = entry.getValue();
HadoopJobId jobId = meta.jobId();
HadoopMapReducePlan plan = meta.mapReducePlan();
HadoopJobPhase phase = meta.phase();
try {
if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
// Failover setup task.
HadoopJobEx job = job(jobId, meta.jobInfo());
Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
// Cannot be null here: absence of local state was checked in the condition above.
assert setupTask != null;
ctx.taskExecutor().run(job, setupTask);
}
else if (phase == PHASE_MAP || phase == PHASE_REDUCE) {
// Must check all nodes, even that are not event node ID due to
// multiple node failure possibility.
Collection<HadoopInputSplit> cancelSplits = null;
for (UUID nodeId : plan.mapperNodeIds()) {
if (ctx.kernalContext().discovery().node(nodeId) == null) {
// Node has left the grid.
Collection<HadoopInputSplit> mappers = plan.mappers(nodeId);
if (cancelSplits == null)
cancelSplits = new HashSet<>();
cancelSplits.addAll(mappers);
}
}
Collection<Integer> cancelReducers = null;
for (UUID nodeId : plan.reducerNodeIds()) {
if (ctx.kernalContext().discovery().node(nodeId) == null) {
// Node has left the grid.
int[] reducers = plan.reducers(nodeId);
if (cancelReducers == null)
cancelReducers = new HashSet<>();
for (int rdc : reducers)
cancelReducers.add(rdc);
}
}
// Cancel the job only if at least one plan node is gone; unlike transform(), this invoke is synchronous.
if (cancelSplits != null || cancelReducers != null)
jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
"One or more nodes participating in map-reduce job execution failed."), cancelSplits,
cancelReducers));
}
}
catch (IgniteCheckedException e) {
// A failure for one job must not prevent the remaining jobs from being checked.
U.error(log, "Failed to cancel job: " + meta, e);
}
}
}
}
/**
 * Handles a batch of job metadata updates. Entries without a value and jobs the local node does
 * not participate in are skipped. For the rest the task executor is notified first; if that
 * notification fails the job is cancelled, otherwise the update is processed locally.
 *
 * @param updated Updated cache entries.
 * @throws IgniteCheckedException If failed.
 */
private void processJobMetadataUpdates(
    Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated)
    throws IgniteCheckedException {
    UUID locNodeId = ctx.localNodeId();

    for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> evt : updated) {
        HadoopJobId jobId = evt.getKey();
        HadoopJobMetadata meta = evt.getValue();

        boolean relevant = meta != null && ctx.isParticipating(meta);

        if (!relevant)
            continue;

        if (log.isDebugEnabled())
            log.debug("Processing job metadata update callback [locNodeId=" + locNodeId +
                ", meta=" + meta + ']');

        boolean notified;

        try {
            ctx.taskExecutor().onJobStateChanged(meta);

            notified = true;
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to process job state changed callback (will fail the job) " +
                "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);

            transform(jobId, new CancelJobProcessor(null, e));

            notified = false;
        }

        if (notified)
            processJobMetaUpdate(jobId, meta, locNodeId);
    }
}
/**
 * Prints the plan at info level: one line with the number of mappers per node and one line with
 * the reducer numbers per node. Does nothing when info logging is disabled.
 *
 * @param jobId Job ID.
 * @param plan Map-reduce plan.
 */
@SuppressWarnings({"unused", "ConstantConditions" })
private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) {
    if (!log.isInfoEnabled())
        return;

    log.info("Plan for " + jobId);

    SB mapLine = new SB();

    mapLine.a(" Map: ");

    for (UUID mapperNodeId : plan.mapperNodeIds())
        mapLine.a(mapperNodeId).a("=").a(plan.mappers(mapperNodeId).size()).a(' ');

    log.info(mapLine.toString());

    SB rdcLine = new SB();

    rdcLine.a(" Reduce: ");

    for (UUID reducerNodeId : plan.reducerNodeIds())
        rdcLine.a(reducerNodeId).a("=").a(Arrays.toString(plan.reducers(reducerNodeId))).a(' ');

    log.info(rdcLine.toString());
}
/**
* Reacts to a single job metadata update according to the job phase: schedules setup, map, reduce,
* commit or abort tasks on the local node, propagates cancellation, and on completion releases local
* state and completes the locally registered finish future.
*
* @param jobId Job ID.
* @param meta Job metadata.
* @param locNodeId Local node ID.
* @throws IgniteCheckedException If failed.
*/
private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
throws IgniteCheckedException {
JobLocalState state = activeJobs.get(jobId);
HadoopJobEx job = job(jobId, meta.jobInfo());
HadoopMapReducePlan plan = meta.mapReducePlan();
switch (meta.phase()) {
case PHASE_SETUP: {
// Setup task is run by the job update leader only; setupTask() returns null if local state already exists.
if (ctx.jobUpdateLeader()) {
Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
if (setupTask != null)
ctx.taskExecutor().run(job, setupTask);
}
break;
}
case PHASE_MAP: {
// Check if we should initiate new task on local node.
Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta);
if (tasks != null)
ctx.taskExecutor().run(job, tasks);
break;
}
case PHASE_REDUCE: {
// No pending reducers left: the leader submits the COMMIT task.
if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) {
HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null);
if (log.isDebugEnabled())
log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId +
", jobId=" + jobId + ']');
ctx.taskExecutor().run(job, Collections.singletonList(info));
break;
}
// Otherwise start the reducers planned for the local node that were not started yet.
Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job);
if (tasks != null)
ctx.taskExecutor().run(job, tasks);
break;
}
case PHASE_CANCELLING: {
// Prevent multiple task executor notification.
if (state != null && state.onCancel()) {
if (log.isDebugEnabled())
log.debug("Cancelling local task execution for job: " + meta);
ctx.taskExecutor().cancelTasks(jobId);
}
// Nothing is pending any more: the leader runs the ABORT task (once).
if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) {
if (ctx.jobUpdateLeader()) {
if (state == null)
state = initState(jobId);
// Prevent running multiple abort tasks.
if (state.onAborted()) {
HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null);
if (log.isDebugEnabled())
log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId +
", jobId=" + jobId + ']');
ctx.taskExecutor().run(job, Collections.singletonList(info));
}
}
break;
}
else {
// Check if there are unscheduled mappers or reducers.
Collection<HadoopInputSplit> cancelMappers = new ArrayList<>();
Collection<Integer> cancelReducers = new ArrayList<>();
Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
if (mappers != null) {
for (HadoopInputSplit b : mappers) {
if (state == null || !state.mapperScheduled(b))
cancelMappers.add(b);
}
}
int[] rdc = plan.reducers(ctx.localNodeId());
if (rdc != null) {
for (int r : rdc) {
if (state == null || !state.reducerScheduled(r))
cancelReducers.add(r);
}
}
// Report local mappers/reducers that were never scheduled to the job metadata.
if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
}
break;
}
case PHASE_COMPLETE: {
if (log.isDebugEnabled())
log.debug("Job execution is complete, will remove local state from active jobs " +
"[jobId=" + jobId + ", meta=" + meta + ']');
if (state != null) {
state = activeJobs.remove(jobId);
assert state != null;
ctx.shuffle().jobFinished(jobId);
}
// Complete the locally requested finish future, if any, passing the fail cause along.
GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId);
if (finishFut != null) {
if (log.isDebugEnabled())
log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']');
finishFut.onDone(jobId, meta.failCause());
}
assert job != null;
if (ctx.jobUpdateLeader())
job.cleanupStagingDirectory();
jobs.remove(jobId);
if (ctx.jobUpdateLeader()) {
// Optional counter writer, configured through a job property and loaded with the job's class loader.
ClassLoader ldr = job.getClass().getClassLoader();
try {
String statWriterClsName = job.info().property(HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY);
if (statWriterClsName != null) {
Class<?> cls = ldr.loadClass(statWriterClsName);
HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance();
HadoopCounters cntrs = meta.counters();
writer.write(job, cntrs);
}
}
catch (Exception e) {
// Writing statistics is best effort: a failure is logged and does not affect completion.
log.error("Can't write statistic due to: ", e);
}
}
job.dispose(false);
break;
}
default:
throw new IllegalStateException("Unknown phase: " + meta.phase());
}
}
/**
 * Creates the setup task for a job that has no local state yet, registering the local state as a
 * side effect.
 *
 * @param jobId Job ID.
 * @return Setup task wrapped in collection, or {@code null} if the job already has local state.
 */
@Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) {
    if (!activeJobs.containsKey(jobId)) {
        initState(jobId);

        HadoopTaskInfo setupInfo = new HadoopTaskInfo(SETUP, jobId, 0, 0, null);

        return Collections.singleton(setupInfo);
    }

    return null;
}
/**
 * Creates MAP task infos for those of the given splits that have not been registered in the
 * local job state yet. Local state is created on demand.
 *
 * @param mappers Mapper blocks (may be {@code null}).
 * @param meta Job metadata.
 * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
 */
private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) {
    UUID locNodeId = ctx.localNodeId();
    HadoopJobId jobId = meta.jobId();

    JobLocalState locState = activeJobs.get(jobId);

    if (mappers == null)
        return null;

    if (locState == null)
        locState = initState(jobId);

    Collection<HadoopTaskInfo> res = null;

    int nextMapperIdx = 0;

    for (HadoopInputSplit split : mappers) {
        // Skip splits that are already registered in the local state.
        if (!locState.addMapper(split))
            continue;

        if (log.isDebugEnabled())
            log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
                ", split=" + split + ']');

        HadoopTaskInfo mapTask = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);

        mapTask.mapperIndex(nextMapperIdx++);

        if (res == null)
            res = new ArrayList<>();

        res.add(mapTask);
    }

    return res;
}
/**
 * Creates REDUCE task infos for those of the given reducers that have not been registered in the
 * local job state yet. Local state is created on demand.
 *
 * @param reducers Reducers (may be {@code null}).
 * @param job Job instance.
 * @return Collection of task infos, or {@code null} if there is nothing new to run.
 */
private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJobEx job) {
    UUID locNodeId = ctx.localNodeId();
    HadoopJobId jobId = job.id();

    JobLocalState locState = activeJobs.get(jobId);

    if (reducers == null)
        return null;

    if (locState == null)
        locState = initState(job.id());

    Collection<HadoopTaskInfo> res = null;

    for (int rdc : reducers) {
        // Skip reducers that are already registered in the local state.
        if (!locState.addReducer(rdc))
            continue;

        if (log.isDebugEnabled())
            log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId +
                ", rdc=" + rdc + ']');

        HadoopTaskInfo rdcTask = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null);

        if (res == null)
            res = new ArrayList<>();

        res.add(rdcTask);
    }

    return res;
}
/**
 * Registers a new local state for the given job unless one is already registered.
 *
 * @param jobId Job ID.
 * @return Local state as returned by {@code F.addIfAbsent(...)} for the active jobs map.
 */
private JobLocalState initState(HadoopJobId jobId) {
    JobLocalState fresh = new JobLocalState();

    return F.addIfAbsent(activeJobs, jobId, fresh);
}
/**
 * Gets or creates job instance.
 * <p>
 * The first caller for a job ID registers a future in {@code jobs}, creates and initializes the
 * job and completes the future; every other caller waits on that future. If creation fails for
 * any reason (checked or unchecked), the future is completed with the error and removed, so that
 * waiters are released and a later call can retry.
 *
 * @param jobId Job ID.
 * @param jobInfo Job info; when {@code null} it is read from the job metadata cache.
 * @return Job.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public HadoopJobEx job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
    GridFutureAdapter<HadoopJobEx> fut = jobs.get(jobId);

    if (fut != null)
        return fut.get();

    // Keep a reference to our own future: re-reading it from the map could yield null if the
    // entry is removed concurrently.
    GridFutureAdapter<HadoopJobEx> newFut = new GridFutureAdapter<>();

    fut = jobs.putIfAbsent(jobId, newFut);

    if (fut != null)
        return fut.get();

    HadoopJobEx job = null;

    try {
        if (jobInfo == null) {
            HadoopJobMetadata meta = jobMetaCache().get(jobId);

            if (meta == null)
                throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);

            jobInfo = meta.jobInfo();
        }

        job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(),
            ctx.kernalContext().hadoopHelper());

        job.initialize(false, ctx.localNodeId());

        newFut.onDone(job);

        return job;
    }
    catch (IgniteCheckedException | RuntimeException | Error e) {
        // Unchecked failures must also complete the future, otherwise it stays in the map
        // forever and every subsequent caller blocks on it.
        newFut.onDone(e);

        jobs.remove(jobId, newFut);

        if (job != null) {
            try {
                job.dispose(false);
            }
            catch (IgniteCheckedException e0) {
                U.error(log, "Failed to dispose job: " + jobId, e0);
            }
        }

        throw e;
    }
}
/**
 * Kills job: requests cancellation unless the job is already complete or cancelling, then waits
 * for the job finish future.
 *
 * @param jobId Job ID.
 * @return {@code True} if job was killed, i.e. the finish future failed with a cause of type
 *      {@link HadoopTaskCancelledException}.
 * @throws IgniteCheckedException If failed.
 */
public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException {
    if (!busyLock.tryReadLock())
        return false; // Grid is stopping.

    try {
        HadoopJobMetadata meta = jobMetaCache().get(jobId);

        boolean cancellable = meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING;

        if (cancellable) {
            HadoopTaskCancelledException cancelErr = new HadoopTaskCancelledException("Job cancelled.");

            jobMetaCache().invoke(jobId, new CancelJobProcessor(null, cancelErr));
        }
    }
    finally {
        busyLock.readUnlock();
    }

    // The wait happens outside of the busy lock.
    IgniteInternalFuture<?> finishFut = finishFuture(jobId);

    if (finishFut == null)
        return false;

    try {
        finishFut.get();
    }
    catch (Exception e) {
        // Any other failure is treated as "not killed".
        return e.getCause() instanceof HadoopTaskCancelledException;
    }

    return false;
}
/**
 * Reads job counters from the job metadata cache.
 *
 * @param jobId Job identifier.
 * @return Job counters or {@code null} if the busy lock could not be acquired or job metadata is absent.
 * @throws IgniteCheckedException If failed.
 */
@Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException {
    if (!busyLock.tryReadLock())
        return null;

    try {
        final HadoopJobMetadata meta = jobMetaCache().get(jobId);

        if (meta == null)
            return null;

        return meta.counters();
    }
    finally {
        busyLock.readUnlock();
    }
}
/**
* Event handler protected by busy lock.
*/
private abstract class EventHandler implements Runnable {
/** {@inheritDoc} */
@Override public void run() {
if (!busyLock.tryReadLock())
return;
try {
body();
}
catch (Throwable e) {
U.error(log, "Unhandled exception while processing event.", e);
if (e instanceof Error)
throw (Error)e;
}
finally {
busyLock.readUnlock();
}
}
/**
* Handler body.
*/
protected abstract void body() throws Exception;
}
/**
 * Per-job state: the map splits and reducer numbers added for the job, the number of completed
 * mappers and the cancelled/aborted flags. Also translates task completion into job metadata updates.
 */
private class JobLocalState {
    /** Mappers. */
    private final Collection<HadoopInputSplit> currMappers = new HashSet<>();

    /** Reducers. */
    private final Collection<Integer> currReducers = new HashSet<>();

    /** Number of completed mappers. */
    private final AtomicInteger completedMappersCnt = new AtomicInteger();

    /** Cancelled flag. */
    private boolean cancelled;

    /** Aborted flag. */
    private boolean aborted;

    /**
     * @param mapSplit Map split to add.
     * @return {@code True} if the split was not present before and has been added.
     */
    private boolean addMapper(HadoopInputSplit mapSplit) {
        return currMappers.add(mapSplit);
    }

    /**
     * @param rdc Reducer number to add.
     * @return {@code True} if the reducer was not present before and has been added.
     */
    private boolean addReducer(int rdc) {
        return currReducers.add(rdc);
    }

    /**
     * Checks whether this split was added to the state.
     *
     * @param mapSplit Map split to check.
     * @return {@code True} if mapper was scheduled.
     */
    public boolean mapperScheduled(HadoopInputSplit mapSplit) {
        return currMappers.contains(mapSplit);
    }

    /**
     * Checks whether this reducer was added to the state.
     *
     * @param rdc Reducer number to check.
     * @return {@code True} if reducer was scheduled.
     */
    public boolean reducerScheduled(int rdc) {
        return currReducers.contains(rdc);
    }

    /**
     * Waits for the given future (if any) and extracts its failure.
     *
     * @param fut Future to check, possibly {@code null}.
     * @return Checked exception thrown by the future or {@code null} if there was none.
     */
    @Nullable private Throwable flushError(@Nullable IgniteInternalFuture<?> fut) {
        if (fut == null)
            return null;

        try {
            fut.get();

            return null;
        }
        catch (IgniteCheckedException e) {
            return e;
        }
    }

    /**
     * Cancels the job if setup failed or crashed, otherwise moves it to the map phase.
     *
     * @param taskInfo Task info.
     * @param status Task status.
     * @param prev Previous closure.
     */
    private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
        final HadoopJobId jobId = taskInfo.jobId();

        boolean failed = status.state() == FAILED || status.state() == CRASHED;

        StackedProcessor next = failed ?
            new CancelJobProcessor(prev, status.failCause()) :
            new UpdatePhaseProcessor(prev, PHASE_MAP);

        transform(jobId, next);
    }

    /**
     * Removes the finished mapper's split from job metadata. When the last mapper finishes
     * successfully, the metadata update is deferred until the shuffle flush completes.
     *
     * @param taskInfo Task info.
     * @param status Task status.
     * @param prev Previous closure.
     */
    private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status,
        final StackedProcessor prev) {
        final HadoopJobId jobId = taskInfo.jobId();

        // The counter is bumped before the failure check, so failed mappers are counted too.
        boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();

        boolean failed = status.state() == FAILED || status.state() == CRASHED;

        if (failed) {
            // Fail the whole job.
            transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));

            return;
        }

        IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() {
            @Override public void applyx(IgniteInternalFuture<?> f) {
                Throwable err = flushError(f);

                transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
            }
        };

        if (!lastMapperFinished)
            cacheUpdater.apply(null);
        else
            ctx.shuffle().flush(jobId).listen(cacheUpdater);
    }

    /**
     * Removes the finished reducer from job metadata, passing the failure cause if the task failed or crashed.
     *
     * @param taskInfo Task info.
     * @param status Task status.
     * @param prev Previous closure.
     */
    private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
        HadoopJobId jobId = taskInfo.jobId();

        boolean failed = status.state() == FAILED || status.state() == CRASHED;

        RemoveReducerProcessor proc = failed ?
            // Fail the whole job.
            new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()) :
            new RemoveReducerProcessor(prev, taskInfo.taskNumber());

        transform(jobId, proc);
    }

    /**
     * Removes all current mappers from job metadata: immediately with the failure cause if the
     * combiner failed or crashed, otherwise once the shuffle flush completes.
     *
     * @param taskInfo Task info.
     * @param status Task status.
     * @param prev Previous closure.
     */
    private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status,
        final StackedProcessor prev) {
        final HadoopJobId jobId = taskInfo.jobId();

        if (status.state() == FAILED || status.state() == CRASHED) {
            // Fail the whole job.
            transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));

            return;
        }

        ctx.shuffle().flush(jobId).listen(new CIX1<IgniteInternalFuture<?>>() {
            @Override public void applyx(IgniteInternalFuture<?> f) {
                Throwable err = flushError(f);

                transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
            }
        });
    }

    /**
     * Marks the job as cancelled unless it is already cancelled or aborted.
     *
     * @return {@code True} if job was cancelled by this (first) call.
     */
    public boolean onCancel() {
        if (cancelled || aborted)
            return false;

        cancelled = true;

        return true;
    }

    /**
     * Marks the job as aborted unless it is already aborted.
     *
     * @return {@code True} if job was aborted by this (first) call.
     */
    public boolean onAborted() {
        if (aborted)
            return false;

        aborted = true;

        return true;
    }
}
/**
 * Transform closure that sets the job phase on the metadata copy.
 */
private static class UpdatePhaseProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Phase to set. */
    private final HadoopJobPhase phase;

    /**
     * @param prev Previous closure.
     * @param phase Phase to set.
     */
    private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) {
        super(prev);

        this.phase = phase;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        cp.phase(this.phase);
    }
}
/**
 * Transform closure that removes mapper splits from the pending splits of a job and adjusts
 * the job phase accordingly.
 */
private static class RemoveMappersProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Mapper splits to remove. */
    private final Collection<HadoopInputSplit> splits;

    /** Error. */
    private final Throwable err;

    /**
     * @param prev Previous closure.
     * @param split Mapper split to remove.
     * @param err Error.
     */
    private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) {
        this(prev, Collections.singletonList(split), err);
    }

    /**
     * @param prev Previous closure.
     * @param splits Mapper splits to remove.
     * @param err Error.
     */
    private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits,
        Throwable err) {
        super(prev);

        this.splits = splits;
        this.err = err;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        Map<HadoopInputSplit, Integer> remaining = new HashMap<>(cp.pendingSplits());

        for (HadoopInputSplit split : splits)
            remaining.remove(split);

        cp.pendingSplits(remaining);

        if (err != null) {
            // Record the fail cause only if the job is not being cancelled already.
            if (cp.phase() != PHASE_CANCELLING)
                cp.failCause(err);

            cp.phase(PHASE_CANCELLING);
        }

        // No pending splits left: switch to reduce unless the job is being cancelled.
        if (remaining.isEmpty() && cp.phase() != PHASE_CANCELLING)
            cp.phase(PHASE_REDUCE);
    }
}
/**
 * Transform closure that removes a reducer from the pending reducers of a job and, when an
 * error is given, switches the job to the cancelling phase with that error as the fail cause.
 */
private static class RemoveReducerProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Reducer to remove. */
    private final int rdc;

    /** Error, {@code null} if the reducer did not fail. */
    private final Throwable err;

    /**
     * @param prev Previous closure.
     * @param rdc Reducer to remove.
     */
    private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
        this(prev, rdc, null);
    }

    /**
     * @param prev Previous closure.
     * @param rdc Reducer to remove.
     * @param err Error.
     */
    private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
        super(prev);

        this.rdc = rdc;
        this.err = err;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        Collection<Integer> remaining = new HashSet<>(cp.pendingReducers());

        remaining.remove(rdc);

        cp.pendingReducers(remaining);

        if (err == null)
            return;

        cp.phase(PHASE_CANCELLING);
        cp.failCause(err);
    }
}
/**
 * Transform closure that maps the given reducers to the external process descriptor in the
 * job's reducer addresses.
 */
private static class InitializeReducersProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Reducers. */
    private final Collection<Integer> rdc;

    /** Process descriptor for reducers. */
    private final HadoopProcessDescriptor desc;

    /**
     * @param prev Previous closure.
     * @param rdc Reducers to initialize, must not be empty.
     * @param desc External process descriptor, must not be {@code null}.
     */
    private InitializeReducersProcessor(@Nullable StackedProcessor prev,
        Collection<Integer> rdc,
        HadoopProcessDescriptor desc) {
        super(prev);

        assert !F.isEmpty(rdc);
        assert desc != null;

        this.rdc = rdc;
        this.desc = desc;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        // Start from the copy rather than the initial metadata: the copy already carries changes made
        // by previous closures in the stack, which would be lost otherwise. This matches how the other
        // closures read pending splits, pending reducers and counters.
        Map<Integer, HadoopProcessDescriptor> oldMap = cp.reducersAddresses();

        Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
            new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);

        for (Integer r : rdc)
            rdcMap.put(r, desc);

        cp.reducersAddresses(rdcMap);
    }
}
/**
 * Cancel job transform closure: optionally removes splits and reducers from the pending sets,
 * switches the job to the cancelling phase if it is neither complete nor cancelling yet, and
 * records the fail cause when one is given.
 */
private static class CancelJobProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Mapper splits to remove. */
    private final Collection<HadoopInputSplit> splits;

    /** Reducers to remove. */
    private final Collection<Integer> rdc;

    /** Error. */
    private final Throwable err;

    /**
     * @param prev Previous closure.
     * @param err Fail cause.
     */
    private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
        this(prev, err, null, null);
    }

    /**
     * @param prev Previous closure.
     * @param splits Splits to remove.
     * @param rdc Reducers to remove.
     */
    private CancelJobProcessor(@Nullable StackedProcessor prev,
        Collection<HadoopInputSplit> splits,
        Collection<Integer> rdc) {
        this(prev, null, splits, rdc);
    }

    /**
     * @param prev Previous closure.
     * @param err Error.
     * @param splits Splits to remove.
     * @param rdc Reducers to remove.
     */
    private CancelJobProcessor(@Nullable StackedProcessor prev,
        Throwable err,
        Collection<HadoopInputSplit> splits,
        Collection<Integer> rdc) {
        super(prev);

        this.splits = splits;
        this.rdc = rdc;
        this.err = err;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        // Phase is taken from the initial metadata, not from the copy.
        final HadoopJobPhase currPhase = meta.phase();

        boolean finishing = currPhase == PHASE_CANCELLING || currPhase == PHASE_COMPLETE;

        assert finishing || err != null : "Invalid phase for cancel: " + currPhase;

        Collection<Integer> pendingRdc = new HashSet<>(cp.pendingReducers());

        if (rdc != null)
            pendingRdc.removeAll(rdc);

        cp.pendingReducers(pendingRdc);

        Map<HadoopInputSplit, Integer> pendingSplits = new HashMap<>(cp.pendingSplits());

        if (splits != null) {
            for (HadoopInputSplit split : splits)
                pendingSplits.remove(split);
        }

        cp.pendingSplits(pendingSplits);

        if (!finishing)
            cp.phase(PHASE_CANCELLING);

        if (err != null)
            cp.failCause(err);
    }
}
/**
 * Transform closure that merges task counters into the job counters.
 */
private static class IncrementCountersProcessor extends StackedProcessor {
    /** */
    private static final long serialVersionUID = 0L;

    /** Task counters to merge. */
    private final HadoopCounters counters;

    /**
     * @param prev Previous closure.
     * @param counters Task counters to add into job counters, must not be {@code null}.
     */
    private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) {
        super(prev);

        assert counters != null;

        this.counters = counters;
    }

    /** {@inheritDoc} */
    @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
        // Merge into a new counters object built from the copy's counters, then store it on the copy.
        HadoopCounters merged = new HadoopCountersImpl(cp.counters());

        merged.merge(counters);

        cp.counters(merged);
    }
}
/**
 * Abstract stacked closure: an entry processor that first applies the previous closure (if any)
 * and then its own {@link #update(HadoopJobMetadata, HadoopJobMetadata)} to the same metadata copy.
 */
private abstract static class StackedProcessor implements
    EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Previous closure in the stack, {@code null} for the first one. */
    private final StackedProcessor prev;

    /**
     * @param prev Previous closure.
     */
    private StackedProcessor(@Nullable StackedProcessor prev) {
        this.prev = prev;
    }

    /** {@inheritDoc} */
    @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) {
        HadoopJobMetadata updated = apply(e.getValue());

        if (updated == null)
            e.remove();
        else
            e.setValue(updated);

        return null;
    }

    /**
     * Applies the whole stack of closures to the given metadata.
     *
     * @param meta Old value.
     * @return New value or {@code null} if the old value is {@code null}.
     */
    private HadoopJobMetadata apply(HadoopJobMetadata meta) {
        if (meta == null)
            return null;

        HadoopJobMetadata cp;

        // The bottom of the stack creates the copy; everyone above keeps updating it.
        if (prev == null)
            cp = new HadoopJobMetadata(meta);
        else
            cp = prev.apply(meta);

        update(meta, cp);

        return cp;
    }

    /**
     * Update given job metadata object.
     *
     * @param meta Initial job metadata.
     * @param cp Copy.
     */
    protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
}
}