blob: 99762c89ee9e410e8b3f1bdb9573efdc6ec9c43f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <functional>
#include "request.h"
#include "rpc_engine.h"
#include "sasl_protocol.h"
#include "hdfspp/ioservice.h"
#include "RpcHeader.pb.h"
#include "ProtobufRpcEngine.pb.h"
#include "IpcConnectionContext.pb.h"
#include <sstream>
namespace hdfs {
namespace pb = ::google::protobuf;
namespace pbio = ::google::protobuf::io;
using namespace ::hadoop::common;
using namespace ::std::placeholders;
// Sentinel stored in Request::retry_count_ when the engine has no retry policy.
// SetRequestHeader() leaves the retryCount field of the RPC header unset when
// it sees this value.
static const int kNoRetry = -1;
// Protobuf helper functions.
// Note/todo: Using the zero-copy protobuf API here makes the simple procedures
// below tricky to read and debug while providing minimal benefit. Reducing
// allocations in BlockReader (HDFS-11266) and smarter use of std::stringstream
// will have a much larger impact according to cachegrind profiles on common
// workloads.
static void AddHeadersToPacket(std::string *res,
std::initializer_list<const pb::MessageLite *> headers,
const std::string *payload) {
size_t len = 0;
std::for_each(
headers.begin(), headers.end(),
[&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); });
if (payload) {
len += payload->size();
}
int net_len = htonl(len);
res->reserve(res->size() + sizeof(net_len) + len);
pbio::StringOutputStream ss(res);
pbio::CodedOutputStream os(&ss);
os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len));
uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len);
assert(buf);
std::for_each(
headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) {
buf = pbio::CodedOutputStream::WriteVarint64ToArray(v->ByteSizeLong(), buf);
buf = v->SerializeWithCachedSizesToArray(buf);
});
if (payload) {
buf = os.WriteStringToArray(*payload, buf);
}
}
/**
 * Serializes a single message into *res in size-delimited form: a varint
 * holding the message's byte size followed by the message body.
 *
 * @param res     Destination string. Capacity is reserved for the delimited
 *                message only, not for any existing content.
 * @param header  Message to serialize.
 */
static void ConstructPayload(std::string *res, const pb::MessageLite *header) {
  const auto delimited_size = DelimitedPBMessageSize(header);
  res->reserve(delimited_size);

  pbio::StringOutputStream string_sink(res);
  pbio::CodedOutputStream coded(&string_sink);

  // Grab the whole output region at once and fill it in by hand.
  uint8_t *size_prefix = coded.GetDirectBufferForNBytesAndAdvance(delimited_size);
  assert(size_prefix);

  uint8_t *body =
      pbio::CodedOutputStream::WriteVarint64ToArray(header->ByteSizeLong(), size_prefix);
  header->SerializeWithCachedSizesToArray(body);
}
/**
 * Fills in the RPC header and the request header for one outgoing call.
 *
 * If the engine can no longer be locked, or it has no client id, an error is
 * logged and both headers are left exactly as they were passed in; the caller
 * gets no other indication of the failure.
 *
 * @param weak_engine  Engine supplying client id, protocol name and version.
 * @param call_id      Value stored in the RPC header's call id.
 * @param method_name  Value stored in the request header's method name.
 * @param retry_count  Stored in the RPC header unless it equals kNoRetry.
 * @param rpc_header   Output: RPC-level header.
 * @param req_header   Output: method-level request header.
 */
static void SetRequestHeader(std::weak_ptr<LockFreeRpcEngine> weak_engine, int call_id,
                             const std::string &method_name, int retry_count,
                             RpcRequestHeaderProto *rpc_header,
                             RequestHeaderProto *req_header)
{
  // Hold a strong reference for the duration of this call. A failed lock
  // means the engine is gone (the FileSystem is being destructed).
  const std::shared_ptr<LockFreeRpcEngine> engine = weak_engine.lock();
  if (!engine) {
    LOG_ERROR(kRPC, << "SetRequestHeader attempted to access an invalid RpcEngine");
    return;
  }

  const auto &id = engine->client_id();
  if (id == nullptr) {
    LOG_ERROR(kRPC, << "Failed to generate client ID");
    return;
  }

  // RPC-level header.
  rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER);
  rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET);
  rpc_header->set_callid(call_id);
  rpc_header->set_clientid(*id);
  const bool retries_enabled = (retry_count != kNoRetry);
  if (retries_enabled) {
    rpc_header->set_retrycount(retry_count);
  }

  // Method-level header.
  req_header->set_methodname(method_name);
  req_header->set_declaringclassprotocolname(engine->protocol_name());
  req_header->set_clientprotocolversion(engine->protocol_version());
}
// Request implementation
// Builds a request for a named RPC method.
//
// engine      - RPC engine; must be non-null, it is dereferenced below to
//               obtain the io_service and to query the retry policy.
// method_name - name of the remote method, stored for header construction.
// call_id     - identifier stored for the RPC header.
// request     - message serialized (size-delimited) into payload_ right away
//               via ConstructPayload().
// handler     - callback moved into the request; invoked from
//               OnResponseArrived().
//
// retry_count_ starts at 0 when the engine has a retry policy and at kNoRetry
// otherwise; failover_count_ starts at 0.
Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, const std::string &method_name, int call_id,
const pb::MessageLite *request, Handler &&handler)
: engine_(engine),
method_name_(method_name),
call_id_(call_id),
timer_(engine->io_service()->GetRaw()),
handler_(std::move(handler)),
retry_count_(engine->retry_policy() ? 0 : kNoRetry),
failover_count_(0)
{
ConstructPayload(&payload_, request);
}
// Builds a request that carries no method name and no payload; the call id is
// fixed at -1 (the handshake id).
//
// engine  - RPC engine; must be non-null, it is dereferenced below to obtain
//           the io_service and to query the retry policy.
// handler - callback moved into the request; invoked from OnResponseArrived().
//
// Neither method_name_ nor payload_ is set here. GetPacket() returns without
// writing anything while payload_ is empty.
Request::Request(std::shared_ptr<LockFreeRpcEngine> engine, Handler &&handler)
: engine_(engine),
call_id_(-1/*Handshake ID*/),
timer_(engine->io_service()->GetRaw()),
handler_(std::move(handler)),
retry_count_(engine->retry_policy() ? 0 : kNoRetry),
failover_count_(0) {
}
/**
 * Appends the wire representation of this request to *res.
 *
 * Nothing is written when the request has no payload. Otherwise the frame
 * holds the RPC header, then the request header (omitted for SASL messages),
 * then the stored payload.
 *
 * @param res  Destination string; the frame is appended to it.
 */
void Request::GetPacket(std::string *res) const {
  LOG_TRACE(kRPC, << "Request::GetPacket called");

  if (payload_.empty()) {
    return;
  }

  RpcRequestHeaderProto rpc_header;
  RequestHeaderProto req_header;
  // NOTE(review): SetRequestHeader() returns void, so if it bails out (engine
  // gone / no client id) the headers are serialized as-is -- confirm that is
  // acceptable to callers.
  SetRequestHeader(engine_, call_id_, method_name_, retry_count_,
                   &rpc_header, &req_header);

  // SASL messages are sent with the RPC header only.
  const bool is_sasl_message = (method_name_ == SASL_METHOD_NAME);
  if (is_sasl_message) {
    AddHeadersToPacket(res, {&rpc_header}, &payload_);
  } else {
    AddHeadersToPacket(res, {&rpc_header, &req_header}, &payload_);
  }
}
// Emits a trace log entry, then forwards the response stream and status
// unchanged to the handler that was supplied at construction.
//
// is     - coded input stream passed through to the handler.
// status - status passed through to the handler.
void Request::OnResponseArrived(pbio::CodedInputStream *is,
const Status &status) {
LOG_TRACE(kRPC, << "Request::OnResponseArrived called");
handler_(is, status);
}
/**
 * Returns a multi-line, human-readable summary of this request (method name,
 * call id, retry count and failover count) intended for debugging output.
 */
std::string Request::GetDebugString() const {
  std::ostringstream description;
  description << "\nRequest Object:\n"
              << "\tMethod name = \"" << method_name_ << "\"\n"
              << "\tCall id = " << call_id_ << "\n"
              << "\tRetry Count = " << retry_count_ << "\n"
              << "\tFailover count = " << failover_count_ << "\n";
  return description.str();
}
/**
 * Records one more failover for this request and restarts retry counting.
 *
 * @return the failover count as it was before this call.
 */
int Request::IncrementFailoverCount() {
  const int count_before_failover = failover_count_;

  // Retries are counted afresh after a failover.
  retry_count_ = 0;
  ++failover_count_;

  return count_before_failover;
}
} // end namespace hdfs