| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.common.ipc; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.Map; |
| |
| import edu.uci.ics.hyracks.api.comm.NetworkAddress; |
| import edu.uci.ics.hyracks.api.dataflow.ActivityId; |
| import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId; |
| import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId; |
| import edu.uci.ics.hyracks.api.dataflow.TaskId; |
| import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy; |
| import edu.uci.ics.hyracks.api.job.JobId; |
| import edu.uci.ics.hyracks.api.job.JobStatus; |
| import edu.uci.ics.hyracks.api.partitions.PartitionId; |
| import edu.uci.ics.hyracks.control.common.application.ApplicationStatus; |
| import edu.uci.ics.hyracks.control.common.controllers.NodeParameters; |
| import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration; |
| import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData; |
| import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor; |
| import edu.uci.ics.hyracks.control.common.job.PartitionRequest; |
| import edu.uci.ics.hyracks.control.common.job.PartitionState; |
| import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor; |
| import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile; |
| import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile; |
| import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer; |
| import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; |
| |
| public class CCNCFunctions { |
| private static final int FID_CODE_SIZE = 1; |
| |
/**
 * Identifies the kind of payload carried by a message.
 * <p>
 * The ordinal of each constant is written as the single type-code byte that prefixes every
 * serialized payload, and that byte is used to index {@code values()} when a payload is read
 * back. Reordering, removing or inserting constants therefore changes the codes on the wire.
 */
public enum FunctionId {
    // NOTE(review): the two groups below mirror the original layout; they look like the two
    // directions of the protocol, but that is not established by this file -- confirm.
    REGISTER_NODE,
    UNREGISTER_NODE,
    NOTIFY_JOBLET_CLEANUP,
    NOTIFY_TASK_COMPLETE,
    NOTIFY_TASK_FAILURE,
    NODE_HEARTBEAT,
    REPORT_PROFILE,
    REGISTER_PARTITION_PROVIDER,
    REGISTER_PARTITION_REQUEST,
    APPLICATION_STATE_CHANGE_RESPONSE,

    NODE_REGISTRATION_RESULT,
    START_TASKS,
    ABORT_TASKS,
    CLEANUP_JOBLET,
    CREATE_APPLICATION,
    DESTROY_APPLICATION,
    REPORT_PARTITION_AVAILABILITY,

    /** Code used for every payload that is not a {@code Function}, including exceptions. */
    OTHER
}
| |
| public static abstract class Function implements Serializable { |
| private static final long serialVersionUID = 1L; |
| |
| public abstract FunctionId getFunctionId(); |
| } |
| |
| public static class RegisterNodeFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final NodeRegistration reg; |
| |
| public RegisterNodeFunction(NodeRegistration reg) { |
| this.reg = reg; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.REGISTER_NODE; |
| } |
| |
| public NodeRegistration getNodeRegistration() { |
| return reg; |
| } |
| } |
| |
| public static class UnregisterNodeFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String nodeId; |
| |
| public UnregisterNodeFunction(String nodeId) { |
| this.nodeId = nodeId; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.UNREGISTER_NODE; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| } |
| |
| public static class NotifyTaskCompleteFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final JobId jobId; |
| private final TaskAttemptId taskId; |
| private final String nodeId; |
| private final TaskProfile statistics; |
| |
| public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) { |
| this.jobId = jobId; |
| this.taskId = taskId; |
| this.nodeId = nodeId; |
| this.statistics = statistics; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.NOTIFY_TASK_COMPLETE; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public TaskAttemptId getTaskId() { |
| return taskId; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| |
| public TaskProfile getStatistics() { |
| return statistics; |
| } |
| } |
| |
| public static class NotifyTaskFailureFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final JobId jobId; |
| private final TaskAttemptId taskId; |
| private final String nodeId; |
| private final String details; |
| |
| public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) { |
| this.jobId = jobId; |
| this.taskId = taskId; |
| this.nodeId = nodeId; |
| this.details = details; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.NOTIFY_TASK_FAILURE; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public TaskAttemptId getTaskId() { |
| return taskId; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| |
| public String getDetails() { |
| return details; |
| } |
| } |
| |
| public static class NotifyJobletCleanupFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final JobId jobId; |
| private final String nodeId; |
| |
| public NotifyJobletCleanupFunction(JobId jobId, String nodeId) { |
| this.jobId = jobId; |
| this.nodeId = nodeId; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.NOTIFY_JOBLET_CLEANUP; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| } |
| |
| public static class NodeHeartbeatFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String nodeId; |
| private final HeartbeatData hbData; |
| |
| public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) { |
| this.nodeId = nodeId; |
| this.hbData = hbData; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.NODE_HEARTBEAT; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| |
| public HeartbeatData getHeartbeatData() { |
| return hbData; |
| } |
| } |
| |
| public static class ReportProfileFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String nodeId; |
| private final List<JobProfile> profiles; |
| |
| public ReportProfileFunction(String nodeId, List<JobProfile> profiles) { |
| this.nodeId = nodeId; |
| this.profiles = profiles; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.REPORT_PROFILE; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| |
| public List<JobProfile> getProfiles() { |
| return profiles; |
| } |
| } |
| |
| public static class RegisterPartitionProviderFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final PartitionDescriptor partitionDescriptor; |
| |
| public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) { |
| this.partitionDescriptor = partitionDescriptor; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.REGISTER_PARTITION_PROVIDER; |
| } |
| |
| public PartitionDescriptor getPartitionDescriptor() { |
| return partitionDescriptor; |
| } |
| |
| public static Object deserialize(ByteBuffer buffer, int length) throws Exception { |
| ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length); |
| DataInputStream dis = new DataInputStream(bais); |
| |
| // Read PartitionId |
| PartitionId pid = readPartitionId(dis); |
| |
| // Read nodeId |
| String nodeId = dis.readUTF(); |
| |
| // Read TaskAttemptId |
| TaskAttemptId taId = readTaskAttemptId(dis); |
| |
| // Read reusable flag |
| boolean reusable = dis.readBoolean(); |
| |
| // Read Partition State |
| PartitionState state = readPartitionState(dis); |
| |
| PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable); |
| pd.setState(state); |
| return new RegisterPartitionProviderFunction(pd); |
| } |
| |
| public static void serialize(OutputStream out, Object object) throws Exception { |
| RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object; |
| |
| DataOutputStream dos = new DataOutputStream(out); |
| |
| PartitionDescriptor pd = fn.getPartitionDescriptor(); |
| |
| // Write PartitionId |
| writePartitionId(dos, pd.getPartitionId()); |
| |
| // Write nodeId |
| dos.writeUTF(pd.getNodeId()); |
| |
| // Write TaskAttemptId |
| writeTaskAttemptId(dos, pd.getProducingTaskAttemptId()); |
| |
| // Write reusable flag |
| dos.writeBoolean(pd.isReusable()); |
| |
| // Write Partition State |
| writePartitionState(dos, pd.getState()); |
| } |
| } |
| |
| public static class RegisterPartitionRequestFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final PartitionRequest partitionRequest; |
| |
| public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) { |
| this.partitionRequest = partitionRequest; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.REGISTER_PARTITION_REQUEST; |
| } |
| |
| public PartitionRequest getPartitionRequest() { |
| return partitionRequest; |
| } |
| |
| public static Object deserialize(ByteBuffer buffer, int length) throws Exception { |
| ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length); |
| DataInputStream dis = new DataInputStream(bais); |
| |
| // Read PartitionId |
| PartitionId pid = readPartitionId(dis); |
| |
| // Read nodeId |
| String nodeId = dis.readUTF(); |
| |
| // Read TaskAttemptId |
| TaskAttemptId taId = readTaskAttemptId(dis); |
| |
| // Read Partition State |
| PartitionState state = readPartitionState(dis); |
| |
| PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state); |
| return new RegisterPartitionRequestFunction(pr); |
| } |
| |
| public static void serialize(OutputStream out, Object object) throws Exception { |
| RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object; |
| |
| DataOutputStream dos = new DataOutputStream(out); |
| |
| PartitionRequest pr = fn.getPartitionRequest(); |
| |
| // Write PartitionId |
| writePartitionId(dos, pr.getPartitionId()); |
| |
| // Write nodeId |
| dos.writeUTF(pr.getNodeId()); |
| |
| // Write TaskAttemptId |
| writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId()); |
| |
| // Write Partition State |
| writePartitionState(dos, pr.getMinimumState()); |
| } |
| } |
| |
| public static class ApplicationStateChangeResponseFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String nodeId; |
| private final String appName; |
| private final ApplicationStatus status; |
| |
| public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) { |
| this.nodeId = nodeId; |
| this.appName = appName; |
| this.status = status; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE; |
| } |
| |
| public String getNodeId() { |
| return nodeId; |
| } |
| |
| public String getApplicationName() { |
| return appName; |
| } |
| |
| public ApplicationStatus getStatus() { |
| return status; |
| } |
| } |
| |
| public static class NodeRegistrationResult extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final NodeParameters params; |
| |
| private final Exception exception; |
| |
| public NodeRegistrationResult(NodeParameters params, Exception exception) { |
| this.params = params; |
| this.exception = exception; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.NODE_REGISTRATION_RESULT; |
| } |
| |
| public NodeParameters getNodeParameters() { |
| return params; |
| } |
| |
| public Exception getException() { |
| return exception; |
| } |
| } |
| |
| public static class StartTasksFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String appName; |
| private final JobId jobId; |
| private final byte[] planBytes; |
| private final List<TaskAttemptDescriptor> taskDescriptors; |
| private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies; |
| |
| public StartTasksFunction(String appName, JobId jobId, byte[] planBytes, |
| List<TaskAttemptDescriptor> taskDescriptors, |
| Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) { |
| this.appName = appName; |
| this.jobId = jobId; |
| this.planBytes = planBytes; |
| this.taskDescriptors = taskDescriptors; |
| this.connectorPolicies = connectorPolicies; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.START_TASKS; |
| } |
| |
| public String getAppName() { |
| return appName; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public byte[] getPlanBytes() { |
| return planBytes; |
| } |
| |
| public List<TaskAttemptDescriptor> getTaskDescriptors() { |
| return taskDescriptors; |
| } |
| |
| public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() { |
| return connectorPolicies; |
| } |
| } |
| |
| public static class AbortTasksFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final JobId jobId; |
| private final List<TaskAttemptId> tasks; |
| |
| public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) { |
| this.jobId = jobId; |
| this.tasks = tasks; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.ABORT_TASKS; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public List<TaskAttemptId> getTasks() { |
| return tasks; |
| } |
| } |
| |
| public static class CleanupJobletFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final JobId jobId; |
| private final JobStatus status; |
| |
| public CleanupJobletFunction(JobId jobId, JobStatus status) { |
| this.jobId = jobId; |
| this.status = status; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.CLEANUP_JOBLET; |
| } |
| |
| public JobId getJobId() { |
| return jobId; |
| } |
| |
| public JobStatus getStatus() { |
| return status; |
| } |
| } |
| |
| public static class CreateApplicationFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String appName; |
| private final boolean deployHar; |
| private final byte[] serializedDistributedState; |
| |
| public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) { |
| this.appName = appName; |
| this.deployHar = deployHar; |
| this.serializedDistributedState = serializedDistributedState; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.CREATE_APPLICATION; |
| } |
| |
| public String getAppName() { |
| return appName; |
| } |
| |
| public boolean isDeployHar() { |
| return deployHar; |
| } |
| |
| public byte[] getSerializedDistributedState() { |
| return serializedDistributedState; |
| } |
| } |
| |
| public static class DestroyApplicationFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final String appName; |
| |
| public DestroyApplicationFunction(String appName) { |
| this.appName = appName; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.DESTROY_APPLICATION; |
| } |
| |
| public String getAppName() { |
| return appName; |
| } |
| } |
| |
| public static class ReportPartitionAvailabilityFunction extends Function { |
| private static final long serialVersionUID = 1L; |
| |
| private final PartitionId pid; |
| private final NetworkAddress networkAddress; |
| |
| public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) { |
| this.pid = pid; |
| this.networkAddress = networkAddress; |
| } |
| |
| @Override |
| public FunctionId getFunctionId() { |
| return FunctionId.REPORT_PARTITION_AVAILABILITY; |
| } |
| |
| public PartitionId getPartitionId() { |
| return pid; |
| } |
| |
| public NetworkAddress getNetworkAddress() { |
| return networkAddress; |
| } |
| |
| public static Object deserialize(ByteBuffer buffer, int length) throws Exception { |
| ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length); |
| DataInputStream dis = new DataInputStream(bais); |
| |
| // Read PartitionId |
| PartitionId pid = readPartitionId(dis); |
| |
| // Read NetworkAddress |
| NetworkAddress networkAddress = readNetworkAddress(dis); |
| |
| return new ReportPartitionAvailabilityFunction(pid, networkAddress); |
| } |
| |
| public static void serialize(OutputStream out, Object object) throws Exception { |
| ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object; |
| |
| DataOutputStream dos = new DataOutputStream(out); |
| |
| // Write PartitionId |
| writePartitionId(dos, fn.getPartitionId()); |
| |
| // Write NetworkAddress |
| writeNetworkAddress(dos, fn.getNetworkAddress()); |
| } |
| } |
| |
| public static class SerializerDeserializer implements IPayloadSerializerDeserializer { |
| private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde; |
| |
| public SerializerDeserializer() { |
| javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer(); |
| } |
| |
| @Override |
| public Object deserializeObject(ByteBuffer buffer, int length) throws Exception { |
| if (length < FID_CODE_SIZE) { |
| throw new IllegalStateException("Message size too small: " + length); |
| } |
| byte fid = buffer.get(); |
| return deserialize(fid, buffer, length - FID_CODE_SIZE); |
| } |
| |
| @Override |
| public Exception deserializeException(ByteBuffer buffer, int length) throws Exception { |
| if (length < FID_CODE_SIZE) { |
| throw new IllegalStateException("Message size too small: " + length); |
| } |
| byte fid = buffer.get(); |
| if (fid != FunctionId.OTHER.ordinal()) { |
| throw new IllegalStateException("Expected FID for OTHER, found: " + fid); |
| } |
| return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE); |
| } |
| |
| @Override |
| public byte[] serializeObject(Object object) throws Exception { |
| if (object instanceof Function) { |
| Function fn = (Function) object; |
| return serialize(object, (byte) fn.getFunctionId().ordinal()); |
| } else { |
| return serialize(object, (byte) FunctionId.OTHER.ordinal()); |
| } |
| } |
| |
| @Override |
| public byte[] serializeException(Exception object) throws Exception { |
| return serialize(object, (byte) FunctionId.OTHER.ordinal()); |
| } |
| |
| private byte[] serialize(Object object, byte fid) throws Exception { |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| baos.write(fid); |
| serialize(baos, object, fid); |
| JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object); |
| baos.close(); |
| return baos.toByteArray(); |
| } |
| |
| private void serialize(OutputStream out, Object object, byte fid) throws Exception { |
| switch (FunctionId.values()[fid]) { |
| case REGISTER_PARTITION_PROVIDER: |
| RegisterPartitionProviderFunction.serialize(out, object); |
| return; |
| |
| case REGISTER_PARTITION_REQUEST: |
| RegisterPartitionRequestFunction.serialize(out, object); |
| return; |
| |
| case REPORT_PARTITION_AVAILABILITY: |
| ReportPartitionAvailabilityFunction.serialize(out, object); |
| return; |
| } |
| JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object); |
| } |
| |
| private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception { |
| switch (FunctionId.values()[fid]) { |
| case REGISTER_PARTITION_PROVIDER: |
| return RegisterPartitionProviderFunction.deserialize(buffer, length); |
| |
| case REGISTER_PARTITION_REQUEST: |
| return RegisterPartitionRequestFunction.deserialize(buffer, length); |
| |
| case REPORT_PARTITION_AVAILABILITY: |
| return ReportPartitionAvailabilityFunction.deserialize(buffer, length); |
| } |
| |
| return javaSerde.deserializeObject(buffer, length); |
| } |
| } |
| |
| private static PartitionId readPartitionId(DataInputStream dis) throws IOException { |
| long jobId = dis.readLong(); |
| int cdid = dis.readInt(); |
| int senderIndex = dis.readInt(); |
| int receiverIndex = dis.readInt(); |
| PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex); |
| return pid; |
| } |
| |
| private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException { |
| dos.writeLong(pid.getJobId().getId()); |
| dos.writeInt(pid.getConnectorDescriptorId().getId()); |
| dos.writeInt(pid.getSenderIndex()); |
| dos.writeInt(pid.getReceiverIndex()); |
| } |
| |
| private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException { |
| int odid = dis.readInt(); |
| int aid = dis.readInt(); |
| int partition = dis.readInt(); |
| int attempt = dis.readInt(); |
| TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), |
| partition), attempt); |
| return taId; |
| } |
| |
| private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException { |
| TaskId tid = taId.getTaskId(); |
| ActivityId aid = tid.getActivityId(); |
| OperatorDescriptorId odId = aid.getOperatorDescriptorId(); |
| dos.writeInt(odId.getId()); |
| dos.writeInt(aid.getLocalId()); |
| dos.writeInt(tid.getPartition()); |
| dos.writeInt(taId.getAttempt()); |
| } |
| |
| private static PartitionState readPartitionState(DataInputStream dis) throws IOException { |
| PartitionState state = PartitionState.values()[dis.readInt()]; |
| return state; |
| } |
| |
| private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException { |
| dos.writeInt(state.ordinal()); |
| } |
| |
| private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException { |
| int bLen = dis.readInt(); |
| byte[] ipAddress = new byte[bLen]; |
| dis.read(ipAddress); |
| int port = dis.readInt(); |
| NetworkAddress networkAddress = new NetworkAddress(ipAddress, port); |
| return networkAddress; |
| } |
| |
| private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException { |
| byte[] ipAddress = networkAddress.getIpAddress(); |
| dos.writeInt(ipAddress.length); |
| dos.write(ipAddress); |
| dos.writeInt(networkAddress.getPort()); |
| } |
| } |