/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.shuffle;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.hadoop.HadoopInputSplit;
import org.apache.ignite.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
 * Shuffle component. Creates per-job shuffle state on demand and routes shuffle messages between
 * cluster nodes.
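 * <p>
 * A minimal usage sketch; {@code hadoopCtx}, {@code taskCtx} and {@code jobId} are assumed to be
 * provided by the surrounding Hadoop processor and are illustrative, not part of this class:
 * <pre>{@code
 * HadoopShuffle shuffle = new HadoopShuffle();
 * shuffle.start(hadoopCtx);                        // Subscribe to shuffle message topics.
 *
 * HadoopTaskOutput out = shuffle.output(taskCtx);  // Map-side output for a task.
 * HadoopTaskInput in = shuffle.input(taskCtx);     // Reduce-side input for a task.
 *
 * shuffle.flush(jobId).get();                      // Push buffered data to remote nodes.
 * shuffle.jobFinished(jobId);                      // Release per-job shuffle state.
 * }</pre>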
*/
public class HadoopShuffle extends HadoopComponent {
    /** Active shuffle jobs keyed by job ID. */
    private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();

    /** Offheap memory used by shuffle jobs. */
    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);

    /** Mutex for internal synchronization. */
    private final Object mux = new Object();

    /** {@inheritDoc} */
@Override public void start(HadoopContext ctx) throws IgniteCheckedException {
super.start(ctx);
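        // Direct path: shuffle messages implementing Message arrive on the dedicated
        // Hadoop topic and are handed to onMessageReceived() (see send0() below).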
ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
onMessageReceived(nodeId, (HadoopMessage)msg);
}
});
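        // Fallback path: shuffle messages sent as user messages on TOPIC_HADOOP
        // (those that do not implement Message) are dispatched the same way.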
ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
new IgniteBiPredicate<UUID, Object>() {
@Override public boolean apply(UUID nodeId, Object msg) {
return onMessageReceived(nodeId, (HadoopMessage)msg);
}
});
    }

    /**
     * Stops shuffle.
     *
     * @param cancel Whether to cancel ongoing activities.
     */
@Override public void stop(boolean cancel) {
for (HadoopShuffleJob job : jobs.values()) {
try {
job.close();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to close job.", e);
}
}
jobs.clear();
    }

    /**
* Creates new shuffle job.
*
* @param jobId Job ID.
* @return Created shuffle job.
* @throws IgniteCheckedException If job creation failed.
*/
private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null),
mem, plan.reducers(), plan.reducers(ctx.localNodeId()), localMappersCount(plan), true);
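        // Resolve the destination node ID of every reducer up front so shuffle
        // output can be addressed without consulting the plan again.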
UUID[] rdcAddrs = new UUID[plan.reducers()];
for (int i = 0; i < rdcAddrs.length; i++) {
UUID nodeId = plan.nodeForReducer(i);
assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
rdcAddrs[i] = nodeId;
}
boolean init = job.initializeReduceAddresses(rdcAddrs);
assert init;
return job;
    }

    /**
     * Gets the number of mappers planned for the local node.
*
* @param plan Plan.
* @return Number of local mappers.
*/
private int localMappersCount(HadoopMapReducePlan plan) {
Collection<HadoopInputSplit> locMappers = plan.mappers(ctx.localNodeId());
return F.isEmpty(locMappers) ? 0 : locMappers.size();
    }

    /**
* @param nodeId Node ID to send message to.
* @param msg Message to send.
* @throws IgniteCheckedException If send failed.
*/
private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
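        // Messages implementing Message take the optimized direct-topic path;
        // anything else falls back to the generic user-message facility.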
if (msg instanceof Message)
ctx.kernalContext().io().sendToGridTopic(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
else
ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
    }

    /**
     * @param jobId Job ID.
* @return Shuffle job.
* @throws IgniteCheckedException If failed.
*/
private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
HadoopShuffleJob<UUID> res = jobs.get(jobId);
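        // Double-checked locking: probe the map without the mutex first, then
        // re-check under it so that each shuffle job is created exactly once.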
if (res == null) {
synchronized (mux) {
res = jobs.get(jobId);
if (res == null) {
res = newJob(jobId);
HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
if (old != null) {
res.close();
res = old;
}
else if (res.reducersInitialized())
startSending(res);
}
}
}
return res;
    }

    /**
* Starts message sending thread.
*
* @param shuffleJob Job to start sending for.
*/
private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
shuffleJob.startSending(ctx.kernalContext().igniteInstanceName(),
new IgniteInClosure2X<UUID, HadoopMessage>() {
@Override public void applyx(UUID dest, HadoopMessage msg) throws IgniteCheckedException {
send0(dest, msg);
}
}
);
    }

    /**
     * Message received callback.
     *
     * @param src Sender node ID.
     * @param msg Received message.
     * @return Always {@code true} (the message is consumed).
*/
public boolean onMessageReceived(UUID src, HadoopMessage msg) {
try {
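            // Dispatch by concrete message type; each branch resolves (or lazily
            // creates) the shuffle job identified by the message's job ID.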
if (msg instanceof HadoopShuffleMessage) {
HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
job(m.jobId()).onShuffleMessage(src, m);
}
else if (msg instanceof HadoopDirectShuffleMessage) {
HadoopDirectShuffleMessage m = (HadoopDirectShuffleMessage)msg;
job(m.jobId()).onDirectShuffleMessage(src, m);
}
else if (msg instanceof HadoopShuffleAck) {
HadoopShuffleAck m = (HadoopShuffleAck)msg;
job(m.jobId()).onShuffleAck(m);
}
else if (msg instanceof HadoopShuffleFinishRequest) {
HadoopShuffleFinishRequest m = (HadoopShuffleFinishRequest)msg;
job(m.jobId()).onShuffleFinishRequest(src, m);
}
else if (msg instanceof HadoopShuffleFinishResponse) {
HadoopShuffleFinishResponse m = (HadoopShuffleFinishResponse)msg;
job(m.jobId()).onShuffleFinishResponse(src);
}
else
                throw new IllegalStateException("Unknown message type received by Hadoop shuffle [src=" + src +
                    ", msg=" + msg + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Message handling failed.", e);
}
return true;
    }

    /**
     * @param taskCtx Task context.
     * @return Output.
     * @throws IgniteCheckedException If the shuffle job could not be obtained.
     */
public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return job(taskCtx.taskInfo().jobId()).output(taskCtx);
    }

    /**
     * @param taskCtx Task context.
     * @return Input.
     * @throws IgniteCheckedException If the shuffle job could not be obtained.
     */
public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
return job(taskCtx.taskInfo().jobId()).input(taskCtx);
    }

    /**
     * Callback invoked when a job finishes: closes and removes its shuffle state.
     *
     * @param jobId Job ID.
     */
public void jobFinished(HadoopJobId jobId) {
HadoopShuffleJob job = jobs.remove(jobId);
if (job != null) {
try {
job.close();
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to close job: " + jobId, e);
}
}
    }

    /**
* Flushes all the outputs for the given job to remote nodes.
*
* @param jobId Job ID.
* @return Future.
*/
public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
HadoopShuffleJob job = jobs.get(jobId);
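        // Nothing to flush: the job either produced no shuffle output on this
        // node or has already been closed.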
if (job == null)
return new GridFinishedFuture<>();
try {
return job.flush();
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
    }

    /**
* @return Memory.
*/
public GridUnsafeMemory memory() {
return mem;
}
}