| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.protocol.datatransfer; |
| |
| import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto; |
| import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos; |
| import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto; |
| |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.security.token.Token; |
| |
| import com.google.protobuf.Message; |
| |
| /** Sender */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class Sender implements DataTransferProtocol { |
| private final DataOutputStream out; |
| |
| /** Create a sender for DataTransferProtocol with a output stream. */ |
| public Sender(final DataOutputStream out) { |
| this.out = out; |
| } |
| |
| /** Initialize a operation. */ |
| private static void op(final DataOutput out, final Op op |
| ) throws IOException { |
| out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); |
| op.write(out); |
| } |
| |
| private static void send(final DataOutputStream out, final Op opcode, |
| final Message proto) throws IOException { |
| op(out, opcode); |
| proto.writeDelimitedTo(out); |
| out.flush(); |
| } |
| |
| @Override |
| public void readBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientName, |
| final long blockOffset, |
| final long length) throws IOException { |
| |
| OpReadBlockProto proto = OpReadBlockProto.newBuilder() |
| .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) |
| .setOffset(blockOffset) |
| .setLen(length) |
| .build(); |
| |
| send(out, Op.READ_BLOCK, proto); |
| } |
| |
| |
| @Override |
| public void writeBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientName, |
| final DatanodeInfo[] targets, |
| final DatanodeInfo source, |
| final BlockConstructionStage stage, |
| final int pipelineSize, |
| final long minBytesRcvd, |
| final long maxBytesRcvd, |
| final long latestGenerationStamp) throws IOException { |
| ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( |
| blk, clientName, blockToken); |
| |
| OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() |
| .setHeader(header) |
| .addAllTargets(toProtos(targets, 1)) |
| .setStage(toProto(stage)) |
| .setPipelineSize(pipelineSize) |
| .setMinBytesRcvd(minBytesRcvd) |
| .setMaxBytesRcvd(maxBytesRcvd) |
| .setLatestGenerationStamp(latestGenerationStamp); |
| |
| if (source != null) { |
| proto.setSource(toProto(source)); |
| } |
| |
| send(out, Op.WRITE_BLOCK, proto.build()); |
| } |
| |
| @Override |
| public void transferBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientName, |
| final DatanodeInfo[] targets) throws IOException { |
| |
| OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() |
| .setHeader(DataTransferProtoUtil.buildClientHeader( |
| blk, clientName, blockToken)) |
| .addAllTargets(toProtos(targets, 0)) |
| .build(); |
| |
| send(out, Op.TRANSFER_BLOCK, proto); |
| } |
| |
| @Override |
| public void replaceBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String delHint, |
| final DatanodeInfo source) throws IOException { |
| OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() |
| .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) |
| .setDelHint(delHint) |
| .setSource(toProto(source)) |
| .build(); |
| |
| send(out, Op.REPLACE_BLOCK, proto); |
| } |
| |
| @Override |
| public void copyBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken) throws IOException { |
| OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() |
| .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) |
| .build(); |
| |
| send(out, Op.COPY_BLOCK, proto); |
| } |
| |
| @Override |
| public void blockChecksum(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken) throws IOException { |
| OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() |
| .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) |
| .build(); |
| |
| send(out, Op.BLOCK_CHECKSUM, proto); |
| } |
| } |