/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.compute;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeTaskNoResultCache;
import org.apache.ignite.internal.IgniteComputeImpl;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Interop task which requires a full execution cycle.
*/
@ComputeTaskNoResultCache
public final class PlatformFullTask extends PlatformAbstractTask {
/** */
private static final long serialVersionUID = 0L;

/** Initial topology version. */
private final long topVer;

/** Compute instance. */
private final IgniteComputeImpl compute;

/**
* Constructor.
*
* @param ctx Platform context.
* @param compute Target compute instance.
* @param taskPtr Pointer to the task in the native platform.
* @param topVer Initial topology version.
*/
public PlatformFullTask(PlatformContext ctx, IgniteComputeImpl compute, long taskPtr, long topVer) {
super(ctx, taskPtr);
this.compute = compute;
this.topVer = topVer;
}

/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
@Nullable Object arg) {
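// The Java-side argument is always null: the actual task argument is kept on the native platform.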
assert arg == null;
lock.readLock().lock();
try {
assert !done;
Collection<ClusterNode> nodes = compute.clusterGroup().nodes();
PlatformMemoryManager memMgr = ctx.memory();
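// Marshal topology information into platform memory, delegate the map step to the native task,
// then read back the resulting job-to-node mapping.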
try (PlatformMemory mem = memMgr.allocate()) {
PlatformOutputStream out = mem.output();
BinaryRawWriterEx writer = ctx.writer(out);
writer.writeLong(taskPtr);
write(writer, nodes, subgrid);
out.synchronize();
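// Call into the native platform to perform the actual map step; the result is written back into the same memory chunk.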
ctx.gateway().computeTaskMap(mem.pointer());
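// Read back the map result produced by the native platform.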
PlatformInputStream in = mem.input();
in.synchronize();
BinaryRawReaderEx reader = ctx.reader(in);
return read(reader, nodes);
}
}
finally {
lock.readLock().unlock();
}
}

/**
* Write topology information.
*
* @param writer Writer.
* @param nodes Current topology nodes.
* @param subgrid Subgrid.
*/
private void write(BinaryRawWriterEx writer, Collection<ClusterNode> nodes, List<ClusterNode> subgrid) {
GridDiscoveryManager discoMgr = ctx.kernalContext().discovery();
long curTopVer = discoMgr.topologyVersion();
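// Send full topology only if it has changed since the task was created; otherwise a single "not changed" flag is enough.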
if (topVer != curTopVer) {
writer.writeBoolean(true);
writer.writeLong(curTopVer);
writer.writeInt(nodes.size());
// Write subgrid size for more precise collection allocation on the native side.
writer.writeInt(subgrid.size());
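// For every node write its details plus a flag telling whether it belongs to the subgrid.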
for (ClusterNode node : nodes) {
ctx.writeNode(writer, node);
writer.writeBoolean(subgrid.contains(node));
}
}
else
writer.writeBoolean(false);
}

/**
* Read map result.
*
* @param reader Reader.
* @param nodes Current topology nodes.
* @return Map result.
*/
private Map<ComputeJob, ClusterNode> read(BinaryRawReaderEx reader, Collection<ClusterNode> nodes) {
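// Leading flag: 'true' means the native map step succeeded, 'false' means it failed and an error message follows.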
if (reader.readBoolean()) {
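// Next flag distinguishes a null map result from an actual mapping.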
if (!reader.readBoolean())
return null;
int size = reader.readInt();
Map<ComputeJob, ClusterNode> map = U.newHashMap(size);
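// Each entry consists of a native job pointer, an optional detached job object and the ID of the target node.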
for (int i = 0; i < size; i++) {
long ptr = reader.readLong();
Object nativeJob = reader.readBoolean() ? reader.readObjectDetached() : null;
PlatformJob job = ctx.createJob(this, ptr, nativeJob);
UUID jobNodeId = reader.readUuid();
assert jobNodeId != null;
ClusterNode jobNode = ctx.kernalContext().discovery().node(jobNodeId);
if (jobNode == null) {
// Special case when the node has left the grid at this point.
// We expect the task processor to perform the necessary failover.
for (ClusterNode node : nodes) {
if (node.id().equals(jobNodeId)) {
jobNode = node;
break;
}
}
assert jobNode != null;
}
map.put(job, jobNode);
}
return map;
}
else
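// Map step failed on the native side: surface the error message as an IgniteException.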
throw new IgniteException(reader.readString());
}
}