blob: 9ea4636dd0c738c648edfee6ebfeccf86d3c47e5 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.common.ipc;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class CCNCFunctions {
private static final int FID_CODE_SIZE = 1;
public enum FunctionId {
REGISTER_NODE,
UNREGISTER_NODE,
NOTIFY_JOBLET_CLEANUP,
NOTIFY_TASK_COMPLETE,
NOTIFY_TASK_FAILURE,
NODE_HEARTBEAT,
REPORT_PROFILE,
REGISTER_PARTITION_PROVIDER,
REGISTER_PARTITION_REQUEST,
APPLICATION_STATE_CHANGE_RESPONSE,
NODE_REGISTRATION_RESULT,
START_TASKS,
ABORT_TASKS,
CLEANUP_JOBLET,
CREATE_APPLICATION,
DESTROY_APPLICATION,
REPORT_PARTITION_AVAILABILITY,
OTHER
}
public static abstract class Function implements Serializable {
private static final long serialVersionUID = 1L;
public abstract FunctionId getFunctionId();
}
public static class RegisterNodeFunction extends Function {
private static final long serialVersionUID = 1L;
private final NodeRegistration reg;
public RegisterNodeFunction(NodeRegistration reg) {
this.reg = reg;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.REGISTER_NODE;
}
public NodeRegistration getNodeRegistration() {
return reg;
}
}
public static class UnregisterNodeFunction extends Function {
private static final long serialVersionUID = 1L;
private final String nodeId;
public UnregisterNodeFunction(String nodeId) {
this.nodeId = nodeId;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.UNREGISTER_NODE;
}
public String getNodeId() {
return nodeId;
}
}
public static class NotifyTaskCompleteFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final TaskAttemptId taskId;
private final String nodeId;
private final TaskProfile statistics;
public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
this.jobId = jobId;
this.taskId = taskId;
this.nodeId = nodeId;
this.statistics = statistics;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.NOTIFY_TASK_COMPLETE;
}
public JobId getJobId() {
return jobId;
}
public TaskAttemptId getTaskId() {
return taskId;
}
public String getNodeId() {
return nodeId;
}
public TaskProfile getStatistics() {
return statistics;
}
}
public static class NotifyTaskFailureFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final TaskAttemptId taskId;
private final String nodeId;
private final String details;
public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
this.jobId = jobId;
this.taskId = taskId;
this.nodeId = nodeId;
this.details = details;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.NOTIFY_TASK_FAILURE;
}
public JobId getJobId() {
return jobId;
}
public TaskAttemptId getTaskId() {
return taskId;
}
public String getNodeId() {
return nodeId;
}
public String getDetails() {
return details;
}
}
public static class NotifyJobletCleanupFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final String nodeId;
public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
this.jobId = jobId;
this.nodeId = nodeId;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.NOTIFY_JOBLET_CLEANUP;
}
public JobId getJobId() {
return jobId;
}
public String getNodeId() {
return nodeId;
}
}
public static class NodeHeartbeatFunction extends Function {
private static final long serialVersionUID = 1L;
private final String nodeId;
private final HeartbeatData hbData;
public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
this.nodeId = nodeId;
this.hbData = hbData;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.NODE_HEARTBEAT;
}
public String getNodeId() {
return nodeId;
}
public HeartbeatData getHeartbeatData() {
return hbData;
}
}
public static class ReportProfileFunction extends Function {
private static final long serialVersionUID = 1L;
private final String nodeId;
private final List<JobProfile> profiles;
public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
this.nodeId = nodeId;
this.profiles = profiles;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.REPORT_PROFILE;
}
public String getNodeId() {
return nodeId;
}
public List<JobProfile> getProfiles() {
return profiles;
}
}
public static class RegisterPartitionProviderFunction extends Function {
private static final long serialVersionUID = 1L;
private final PartitionDescriptor partitionDescriptor;
public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
this.partitionDescriptor = partitionDescriptor;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.REGISTER_PARTITION_PROVIDER;
}
public PartitionDescriptor getPartitionDescriptor() {
return partitionDescriptor;
}
public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
DataInputStream dis = new DataInputStream(bais);
// Read PartitionId
PartitionId pid = readPartitionId(dis);
// Read nodeId
String nodeId = dis.readUTF();
// Read TaskAttemptId
TaskAttemptId taId = readTaskAttemptId(dis);
// Read reusable flag
boolean reusable = dis.readBoolean();
// Read Partition State
PartitionState state = readPartitionState(dis);
PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
pd.setState(state);
return new RegisterPartitionProviderFunction(pd);
}
public static void serialize(OutputStream out, Object object) throws Exception {
RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
DataOutputStream dos = new DataOutputStream(out);
PartitionDescriptor pd = fn.getPartitionDescriptor();
// Write PartitionId
writePartitionId(dos, pd.getPartitionId());
// Write nodeId
dos.writeUTF(pd.getNodeId());
// Write TaskAttemptId
writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
// Write reusable flag
dos.writeBoolean(pd.isReusable());
// Write Partition State
writePartitionState(dos, pd.getState());
}
}
public static class RegisterPartitionRequestFunction extends Function {
private static final long serialVersionUID = 1L;
private final PartitionRequest partitionRequest;
public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
this.partitionRequest = partitionRequest;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.REGISTER_PARTITION_REQUEST;
}
public PartitionRequest getPartitionRequest() {
return partitionRequest;
}
public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
DataInputStream dis = new DataInputStream(bais);
// Read PartitionId
PartitionId pid = readPartitionId(dis);
// Read nodeId
String nodeId = dis.readUTF();
// Read TaskAttemptId
TaskAttemptId taId = readTaskAttemptId(dis);
// Read Partition State
PartitionState state = readPartitionState(dis);
PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
return new RegisterPartitionRequestFunction(pr);
}
public static void serialize(OutputStream out, Object object) throws Exception {
RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
DataOutputStream dos = new DataOutputStream(out);
PartitionRequest pr = fn.getPartitionRequest();
// Write PartitionId
writePartitionId(dos, pr.getPartitionId());
// Write nodeId
dos.writeUTF(pr.getNodeId());
// Write TaskAttemptId
writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
// Write Partition State
writePartitionState(dos, pr.getMinimumState());
}
}
public static class ApplicationStateChangeResponseFunction extends Function {
private static final long serialVersionUID = 1L;
private final String nodeId;
private final String appName;
private final ApplicationStatus status;
public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
this.nodeId = nodeId;
this.appName = appName;
this.status = status;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
}
public String getNodeId() {
return nodeId;
}
public String getApplicationName() {
return appName;
}
public ApplicationStatus getStatus() {
return status;
}
}
public static class NodeRegistrationResult extends Function {
private static final long serialVersionUID = 1L;
private final NodeParameters params;
private final Exception exception;
public NodeRegistrationResult(NodeParameters params, Exception exception) {
this.params = params;
this.exception = exception;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.NODE_REGISTRATION_RESULT;
}
public NodeParameters getNodeParameters() {
return params;
}
public Exception getException() {
return exception;
}
}
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
private final String appName;
private final JobId jobId;
private final byte[] planBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
this.appName = appName;
this.jobId = jobId;
this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPolicies = connectorPolicies;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.START_TASKS;
}
public String getAppName() {
return appName;
}
public JobId getJobId() {
return jobId;
}
public byte[] getPlanBytes() {
return planBytes;
}
public List<TaskAttemptDescriptor> getTaskDescriptors() {
return taskDescriptors;
}
public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
return connectorPolicies;
}
}
public static class AbortTasksFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final List<TaskAttemptId> tasks;
public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
this.jobId = jobId;
this.tasks = tasks;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.ABORT_TASKS;
}
public JobId getJobId() {
return jobId;
}
public List<TaskAttemptId> getTasks() {
return tasks;
}
}
public static class CleanupJobletFunction extends Function {
private static final long serialVersionUID = 1L;
private final JobId jobId;
private final JobStatus status;
public CleanupJobletFunction(JobId jobId, JobStatus status) {
this.jobId = jobId;
this.status = status;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.CLEANUP_JOBLET;
}
public JobId getJobId() {
return jobId;
}
public JobStatus getStatus() {
return status;
}
}
public static class CreateApplicationFunction extends Function {
private static final long serialVersionUID = 1L;
private final String appName;
private final boolean deployHar;
private final byte[] serializedDistributedState;
public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
this.appName = appName;
this.deployHar = deployHar;
this.serializedDistributedState = serializedDistributedState;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.CREATE_APPLICATION;
}
public String getAppName() {
return appName;
}
public boolean isDeployHar() {
return deployHar;
}
public byte[] getSerializedDistributedState() {
return serializedDistributedState;
}
}
public static class DestroyApplicationFunction extends Function {
private static final long serialVersionUID = 1L;
private final String appName;
public DestroyApplicationFunction(String appName) {
this.appName = appName;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.DESTROY_APPLICATION;
}
public String getAppName() {
return appName;
}
}
public static class ReportPartitionAvailabilityFunction extends Function {
private static final long serialVersionUID = 1L;
private final PartitionId pid;
private final NetworkAddress networkAddress;
public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
this.pid = pid;
this.networkAddress = networkAddress;
}
@Override
public FunctionId getFunctionId() {
return FunctionId.REPORT_PARTITION_AVAILABILITY;
}
public PartitionId getPartitionId() {
return pid;
}
public NetworkAddress getNetworkAddress() {
return networkAddress;
}
public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
DataInputStream dis = new DataInputStream(bais);
// Read PartitionId
PartitionId pid = readPartitionId(dis);
// Read NetworkAddress
NetworkAddress networkAddress = readNetworkAddress(dis);
return new ReportPartitionAvailabilityFunction(pid, networkAddress);
}
public static void serialize(OutputStream out, Object object) throws Exception {
ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
DataOutputStream dos = new DataOutputStream(out);
// Write PartitionId
writePartitionId(dos, fn.getPartitionId());
// Write NetworkAddress
writeNetworkAddress(dos, fn.getNetworkAddress());
}
}
public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
public SerializerDeserializer() {
javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
}
@Override
public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
if (length < FID_CODE_SIZE) {
throw new IllegalStateException("Message size too small: " + length);
}
byte fid = buffer.get();
return deserialize(fid, buffer, length - FID_CODE_SIZE);
}
@Override
public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
if (length < FID_CODE_SIZE) {
throw new IllegalStateException("Message size too small: " + length);
}
byte fid = buffer.get();
if (fid != FunctionId.OTHER.ordinal()) {
throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
}
return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
}
@Override
public byte[] serializeObject(Object object) throws Exception {
if (object instanceof Function) {
Function fn = (Function) object;
return serialize(object, (byte) fn.getFunctionId().ordinal());
} else {
return serialize(object, (byte) FunctionId.OTHER.ordinal());
}
}
@Override
public byte[] serializeException(Exception object) throws Exception {
return serialize(object, (byte) FunctionId.OTHER.ordinal());
}
private byte[] serialize(Object object, byte fid) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
baos.write(fid);
serialize(baos, object, fid);
JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
baos.close();
return baos.toByteArray();
}
private void serialize(OutputStream out, Object object, byte fid) throws Exception {
switch (FunctionId.values()[fid]) {
case REGISTER_PARTITION_PROVIDER:
RegisterPartitionProviderFunction.serialize(out, object);
return;
case REGISTER_PARTITION_REQUEST:
RegisterPartitionRequestFunction.serialize(out, object);
return;
case REPORT_PARTITION_AVAILABILITY:
ReportPartitionAvailabilityFunction.serialize(out, object);
return;
}
JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
}
private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
switch (FunctionId.values()[fid]) {
case REGISTER_PARTITION_PROVIDER:
return RegisterPartitionProviderFunction.deserialize(buffer, length);
case REGISTER_PARTITION_REQUEST:
return RegisterPartitionRequestFunction.deserialize(buffer, length);
case REPORT_PARTITION_AVAILABILITY:
return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
}
return javaSerde.deserializeObject(buffer, length);
}
}
private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
long jobId = dis.readLong();
int cdid = dis.readInt();
int senderIndex = dis.readInt();
int receiverIndex = dis.readInt();
PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
return pid;
}
private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
dos.writeLong(pid.getJobId().getId());
dos.writeInt(pid.getConnectorDescriptorId().getId());
dos.writeInt(pid.getSenderIndex());
dos.writeInt(pid.getReceiverIndex());
}
private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
int odid = dis.readInt();
int aid = dis.readInt();
int partition = dis.readInt();
int attempt = dis.readInt();
TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
partition), attempt);
return taId;
}
private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
TaskId tid = taId.getTaskId();
ActivityId aid = tid.getActivityId();
OperatorDescriptorId odId = aid.getOperatorDescriptorId();
dos.writeInt(odId.getId());
dos.writeInt(aid.getLocalId());
dos.writeInt(tid.getPartition());
dos.writeInt(taId.getAttempt());
}
private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
PartitionState state = PartitionState.values()[dis.readInt()];
return state;
}
private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
dos.writeInt(state.ordinal());
}
private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
int bLen = dis.readInt();
byte[] ipAddress = new byte[bLen];
dis.read(ipAddress);
int port = dis.readInt();
NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
return networkAddress;
}
private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
byte[] ipAddress = networkAddress.getIpAddress();
dos.writeInt(ipAddress.length);
dos.write(ipAddress);
dos.writeInt(networkAddress.getPort());
}
}