blob: 2f354d7a7890255127368093836c32203eca3c8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/wire_format_lite.h>
#include "drill/common.hpp"
#include "rpcEncoder.hpp"
#include "rpcMessage.hpp"
namespace Drill{
using google::protobuf::internal::WireFormatLite;
using exec::rpc::CompleteRpcMessage;
const uint32_t RpcEncoder::HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
const uint32_t RpcEncoder::PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
const uint32_t RpcEncoder::RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
const uint32_t RpcEncoder::HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
const uint32_t RpcEncoder::PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG);
const uint32_t RpcEncoder::RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
bool RpcEncoder::Encode(DataBuf& buf, OutBoundRpcMessage& msg) {
using exec::rpc::RpcHeader;
using google::protobuf::io::CodedOutputStream;
using google::protobuf::io::ArrayOutputStream;
// Todo:
//
// - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();`
// - builder pattern
//
#ifdef CODER_DEBUG
cerr << "\nEncoding outbound message " << msg << endl;
#endif
RpcHeader header;
header.set_mode(msg.m_mode);
header.set_coordination_id(msg.m_coord_id);
header.set_rpc_type(msg.m_rpc_type);
// calcute the length of the message
int header_length = header.ByteSize();
int proto_body_length = msg.m_pbody->ByteSize();
int full_length = HEADER_TAG_LENGTH + getRawVarintSize(header_length) + header_length + \
PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(proto_body_length) + proto_body_length;
/*
if(raw_body_length > 0) {
full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length);
}
*/
buf.resize(full_length + getRawVarintSize(full_length));
ArrayOutputStream* os = new ArrayOutputStream(buf.data(), buf.size());
CodedOutputStream* cos = new CodedOutputStream(os);
#ifdef CODER_DEBUG
cerr << "Writing full length " << full_length << endl;
#endif
// write full length first (this is length delimited stream).
cos->WriteVarint32(full_length);
#ifdef CODER_DEBUG
cerr << "Writing header length " << header_length << endl;
#endif
cos->WriteVarint32(HEADER_TAG);
cos->WriteVarint32(header_length);
header.SerializeToCodedStream(cos);
// write protobuf body length and body
#ifdef CODER_DEBUG
cerr << "Writing protobuf body length " << proto_body_length << endl;
#endif
cos->WriteVarint32(PROTOBUF_BODY_TAG);
cos->WriteVarint32(proto_body_length);
msg.m_pbody->SerializeToCodedStream(cos);
delete os;
delete cos;
// Done! no read to write data body for client
return true;
}
} // namespace Drill