// blob: 2ad7f31e20cd53a76ee85bfac4ab96d0a5e17a34 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ProcessUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionResponseProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
 * Server-side adapter between the protobuf RPC interface
 * {@link DatanodeProtocolPB} and a {@link DatanodeProtocol} implementation.
 *
 * <p>Each RPC method unpacks the protobuf request into the server-side types
 * (mostly through {@link PBHelper}), invokes the matching method on the wrapped
 * implementation, and packs the result back into a protobuf response. Any
 * {@link IOException} raised by the wrapped implementation is rethrown wrapped
 * in a {@link ServiceException}.
 */
public class DatanodeProtocolServerSideTranslatorPB implements
    DatanodeProtocolPB {

  // Responses that carry no fields are built once and shared by every call.
  private static final ErrorReportResponseProto ERROR_REPORT_RESPONSE_PROTO =
      ErrorReportResponseProto.newBuilder().build();
  private static final BlockReceivedAndDeletedResponseProto
      BLOCK_RECEIVED_AND_DELETE_RESPONSE =
          BlockReceivedAndDeletedResponseProto.newBuilder().build();
  private static final ReportBadBlocksResponseProto REPORT_BAD_BLOCK_RESPONSE =
      ReportBadBlocksResponseProto.newBuilder().build();
  private static final CommitBlockSynchronizationResponseProto
      COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
          CommitBlockSynchronizationResponseProto.newBuilder().build();

  /** The implementation every translated call is delegated to. */
  private final DatanodeProtocol impl;

  /**
   * Creates a translator that forwards all calls to the given implementation.
   *
   * @param impl the {@link DatanodeProtocol} implementation to delegate to
   */
  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
    this.impl = impl;
  }

  /**
   * Converts the registration and storages of the request, registers them with
   * the wrapped implementation and returns the resulting registration.
   *
   * @param controller RPC controller (not used)
   * @param request the registration request
   * @return a response holding the registration returned by the implementation
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public RegisterDatanodeResponseProto registerDatanode(
      RpcController controller, RegisterDatanodeRequestProto request)
      throws ServiceException {
    DatanodeRegistration requested =
        PBHelper.convert(request.getRegistration());

    int storageCount = request.getStoragesCount();
    DatanodeStorage[] storages = new DatanodeStorage[storageCount];
    int next = 0;
    while (next < request.getStoragesCount()) {
      storages[next] = PBHelper.convert(request.getStorages(next));
      next++;
    }

    DatanodeRegistration accepted;
    try {
      accepted = impl.registerDatanode(requested, storages);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }

    RegisterDatanodeResponseProto.Builder response =
        RegisterDatanodeResponseProto.newBuilder();
    response.setRegistration(PBHelper.convert(accepted));
    return response.build();
  }

  /**
   * Forwards a heartbeat to the wrapped implementation and returns the
   * commands it produced; {@code null} commands are left out of the response.
   *
   * @param controller RPC controller (not used)
   * @param request the heartbeat request
   * @return a response holding every non-null command from the implementation
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public HeartbeatResponseProto sendHeartbeat(RpcController controller,
      HeartbeatRequestProto request) throws ServiceException {
    DatanodeCommand[] commands = null;
    try {
      StorageReport[] reports = toStorageReports(request.getReportsList());
      commands = impl.sendHeartbeat(
          PBHelper.convert(request.getRegistration()), reports,
          request.getXmitsInProgress(), request.getXceiverCount(),
          request.getFailedVolumes());
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }

    HeartbeatResponseProto.Builder response =
        HeartbeatResponseProto.newBuilder();
    if (commands == null) {
      return response.build();
    }
    for (DatanodeCommand command : commands) {
      if (command != null) {
        response.addCmds(PBHelper.convert(command));
      }
    }
    return response.build();
  }

  /** Builds one {@link StorageReport} per protobuf storage report, in order. */
  private static StorageReport[] toStorageReports(
      List<StorageReportProto> protos) {
    StorageReport[] reports = new StorageReport[protos.size()];
    for (int k = 0; k < reports.length; k++) {
      StorageReportProto proto = protos.get(k);
      reports[k] = new StorageReport(proto.getStorageID(), proto.getFailed(),
          proto.getCapacity(), proto.getDfsUsed(), proto.getRemaining(),
          proto.getBlockPoolUsed());
    }
    return reports;
  }

  /**
   * Forwards a block report to the wrapped implementation. The command it
   * returns, if any, is placed in the response.
   *
   * @param controller RPC controller (not used)
   * @param request the block report request
   * @return a response whose command is set only when the implementation
   *         returned a non-null command
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public BlockReportResponseProto blockReport(RpcController controller,
      BlockReportRequestProto request) throws ServiceException {
    StorageBlockReport[] reports =
        new StorageBlockReport[request.getReportsCount()];
    List<StorageBlockReportProto> protos = request.getReportsList();
    for (int r = 0; r < protos.size(); r++) {
      StorageBlockReportProto proto = protos.get(r);
      long[] blocks = unbox(proto.getBlocksList());
      reports[r] = new StorageBlockReport(proto.getStorageID(), blocks);
    }

    DatanodeCommand command = null;
    try {
      command = impl.blockReport(PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), reports);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }

    BlockReportResponseProto.Builder response =
        BlockReportResponseProto.newBuilder();
    if (command == null) {
      return response.build();
    }
    return response.setCmd(PBHelper.convert(command)).build();
  }

  /** Copies a list of boxed longs into a primitive array of the same size. */
  private static long[] unbox(List<Long> values) {
    long[] result = new long[values.size()];
    int pos = 0;
    for (long value : values) {
      result[pos++] = value;
    }
    return result;
  }

  /**
   * Forwards the per-storage received/deleted block information to the
   * wrapped implementation.
   *
   * @param controller RPC controller (not used)
   * @param request the request listing received and deleted blocks
   * @return the shared empty response
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
      RpcController controller, BlockReceivedAndDeletedRequestProto request)
      throws ServiceException {
    List<StorageReceivedDeletedBlocksProto> perStorage =
        request.getBlocksList();
    StorageReceivedDeletedBlocks[] info =
        new StorageReceivedDeletedBlocks[perStorage.size()];
    int slot = 0;
    for (StorageReceivedDeletedBlocksProto storage : perStorage) {
      ReceivedDeletedBlockInfo[] blocks =
          toReceivedDeletedBlocks(storage.getBlocksList());
      info[slot++] =
          new StorageReceivedDeletedBlocks(storage.getStorageID(), blocks);
    }

    try {
      impl.blockReceivedAndDeleted(
          PBHelper.convert(request.getRegistration()),
          request.getBlockPoolId(), info);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    return BLOCK_RECEIVED_AND_DELETE_RESPONSE;
  }

  /** Converts each protobuf received/deleted block entry, keeping the order. */
  private static ReceivedDeletedBlockInfo[] toReceivedDeletedBlocks(
      List<ReceivedDeletedBlockInfoProto> protos) {
    ReceivedDeletedBlockInfo[] blocks =
        new ReceivedDeletedBlockInfo[protos.size()];
    int pos = 0;
    for (ReceivedDeletedBlockInfoProto proto : protos) {
      blocks[pos++] = PBHelper.convert(proto);
    }
    return blocks;
  }

  /**
   * Forwards an error report (registration, error code and message) to the
   * wrapped implementation.
   *
   * @param controller RPC controller (not used)
   * @param request the error report request
   * @return the shared empty response
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public ErrorReportResponseProto errorReport(RpcController controller,
      ErrorReportRequestProto request) throws ServiceException {
    try {
      // "getRegistartion" is the accessor name exposed by the request type.
      DatanodeRegistration registration =
          PBHelper.convert(request.getRegistartion());
      impl.errorReport(registration, request.getErrorCode(),
          request.getMsg());
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    return ERROR_REPORT_RESPONSE_PROTO;
  }

  /**
   * Asks the wrapped implementation for its namespace information and returns
   * it in protobuf form.
   *
   * @param controller RPC controller (not used)
   * @param request the version request (carries no data that is read here)
   * @return a response holding the converted {@link NamespaceInfo}
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public VersionResponseProto versionRequest(RpcController controller,
      VersionRequestProto request) throws ServiceException {
    NamespaceInfo namespaceInfo;
    try {
      namespaceInfo = impl.versionRequest();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    VersionResponseProto.Builder response = VersionResponseProto.newBuilder();
    response.setInfo(PBHelper.convert(namespaceInfo));
    return response.build();
  }

  /**
   * Forwards an upgrade command to the wrapped implementation. A request
   * without a command is forwarded as {@code null}; a {@code null} reply
   * produces a response without a command.
   *
   * @param controller RPC controller (not used)
   * @param request the upgrade request
   * @return a response whose command is set only when the implementation
   *         returned a non-null command
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public ProcessUpgradeResponseProto processUpgrade(RpcController controller,
      ProcessUpgradeRequestProto request) throws ServiceException {
    UpgradeCommand reply;
    try {
      UpgradeCommand incoming = null;
      if (request.hasCmd()) {
        incoming = PBHelper.convert(request.getCmd());
      }
      reply = impl.processUpgradeCommand(incoming);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }

    ProcessUpgradeResponseProto.Builder response =
        ProcessUpgradeResponseProto.newBuilder();
    if (reply == null) {
      return response.build();
    }
    return response.setCmd(PBHelper.convert(reply)).build();
  }

  /**
   * Forwards the reported bad blocks to the wrapped implementation.
   *
   * @param controller RPC controller (not used)
   * @param request the request listing the bad blocks
   * @return the shared empty response
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller,
      ReportBadBlocksRequestProto request) throws ServiceException {
    List<LocatedBlockProto> protos = request.getBlocksList();
    LocatedBlock[] badBlocks = new LocatedBlock[protos.size()];
    int pos = 0;
    for (LocatedBlockProto proto : protos) {
      badBlocks[pos++] = PBHelper.convert(proto);
    }

    try {
      impl.reportBadBlocks(badBlocks);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    return REPORT_BAD_BLOCK_RESPONSE;
  }

  /**
   * Forwards a block synchronization commit (block, new generation stamp, new
   * length, close/delete flags and new targets) to the wrapped implementation.
   *
   * @param controller RPC controller (not used)
   * @param request the commit request
   * @return the shared empty response
   * @throws ServiceException if the implementation throws an IOException
   */
  @Override
  public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
      RpcController controller, CommitBlockSynchronizationRequestProto request)
      throws ServiceException {
    // "getNewTaragetsList" is the accessor name exposed by the request type.
    List<DatanodeIDProto> targetProtos = request.getNewTaragetsList();
    DatanodeID[] targets = new DatanodeID[targetProtos.size()];
    int pos = 0;
    for (DatanodeIDProto targetProto : targetProtos) {
      targets[pos++] = PBHelper.convert(targetProto);
    }

    try {
      impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
          request.getNewGenStamp(), request.getNewLength(),
          request.getCloseFile(), request.getDeleteBlock(), targets);
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    return COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO;
  }

  /**
   * Returns the protocol version of {@link DatanodeProtocolPB}; both arguments
   * are ignored.
   *
   * @param protocol requested protocol name (not used)
   * @param clientVersion version reported by the client (not used)
   * @return the version obtained from {@link RPC#getProtocolVersion}
   */
  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    long version = RPC.getProtocolVersion(DatanodeProtocolPB.class);
    return version;
  }

  /**
   * Returns the protocol signature of {@link DatanodeProtocolPB} after
   * checking that the caller asked for exactly that protocol.
   *
   * @param protocol requested protocol name; must equal the protocol name of
   *        {@link DatanodeProtocolPB}
   * @param clientVersion version reported by the client (not used)
   * @param clientMethodsHash hash of the client's methods, passed through to
   *        {@link ProtocolSignature#getProtocolSignature}
   * @return the signature computed for {@link DatanodeProtocolPB}
   * @throws IOException if {@code protocol} names a different protocol
   */
  @Override
  public ProtocolSignature getProtocolSignature(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    // This is answered locally rather than forwarded to the wrapped
    // implementation: version and signature are those of DatanodeProtocolPB.
    String supported = RPC.getProtocolName(DatanodeProtocolPB.class);
    boolean known = protocol.equals(supported);
    if (known) {
      return ProtocolSignature.getProtocolSignature(clientMethodsHash,
          RPC.getProtocolVersion(DatanodeProtocolPB.class),
          DatanodeProtocolPB.class);
    }
    throw new IOException("Namenode Serverside implements " +
        RPC.getProtocolName(DatanodeProtocolPB.class) +
        ". The following requested protocol is unknown: " + protocol);
  }

  /**
   * Same as {@link #getProtocolSignature(String, long, int)}, with the result
   * converted to a {@link ProtocolSignatureWritable}.
   *
   * @param protocol requested protocol name
   * @param clientVersion version reported by the client
   * @param clientMethodsHash hash of the client's methods
   * @return the writable form of the protocol signature
   * @throws IOException if {@code protocol} names a different protocol
   */
  @Override
  public ProtocolSignatureWritable getProtocolSignature2(String protocol,
      long clientVersion, int clientMethodsHash) throws IOException {
    // Answered locally as well; the signature is that of DatanodeProtocolPB.
    ProtocolSignature signature =
        getProtocolSignature(protocol, clientVersion, clientMethodsHash);
    return ProtocolSignatureWritable.convert(signature);
  }
}