blob: da7244d003c45b00be21be4f13798be98c2556ff [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/rpc_sidecar.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>
#include <boost/container/vector.hpp>
#include <google/protobuf/repeated_field.h>
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/faststring.h"
#include "kudu/util/status.h"
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace rpc {
// A sidecar backed by a single, non-owning Slice.
//
// Only the Slice itself (pointer + length) is stored; the bytes it refers to are never
// copied. Whoever creates the sidecar must therefore keep the referenced memory valid
// for at least as long as the sidecar is alive.
class SliceSidecar : public RpcSidecar {
 public:
  explicit SliceSidecar(Slice slice) : slice_(slice) { }

  // Number of bytes referenced by the wrapped slice.
  size_t TotalSize() const override {
    return slice_.size();
  }

  // Appends the wrapped slice, as a single entry, to the end of 'payload'.
  void AppendSlices(TransferPayload* payload) const override {
    payload->push_back(slice_);
  }

 private:
  // View onto caller-owned memory; never reassigned after construction.
  const Slice slice_;
};
class FaststringSidecar : public RpcSidecar {
public:
explicit FaststringSidecar(faststring data) {
data_.emplace_back(std::move(data));
}
explicit FaststringSidecar(vector<faststring> data) : data_(std::move(data)) { }
void AppendSlices(TransferPayload* payload) const override {
for (const auto& fs : data_) {
payload->push_back(Slice(fs));
}
}
size_t TotalSize() const override {
size_t ret = 0;
for (const auto& fs : data_) {
ret += fs.size();
}
return ret;
}
private:
vector<faststring> data_;
};
// Creates a sidecar that takes over the contents of the single buffer 'data'.
unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(faststring data) {
  unique_ptr<RpcSidecar> sidecar(new FaststringSidecar(std::move(data)));
  return sidecar;
}
// Creates a sidecar that takes over the contents of every buffer in 'data'.
unique_ptr<RpcSidecar> RpcSidecar::FromFaststrings(vector<faststring> data) {
  unique_ptr<RpcSidecar> sidecar(new FaststringSidecar(std::move(data)));
  return sidecar;
}
// Creates a sidecar that refers to the memory described by 'slice' without copying it.
// The caller must keep that memory valid for as long as the returned sidecar is used.
unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
  unique_ptr<RpcSidecar> sidecar(new SliceSidecar(slice));
  return sidecar;
}
// Splits 'buffer' into one Slice per entry of 'offsets' and stores them in '*sidecars'.
//
// Sidecar i covers the byte range [offsets[i], offsets[i + 1]) of 'buffer'; the final
// sidecar runs from its offset to the end of 'buffer'. The produced slices point into
// the memory referenced by 'buffer'; no bytes are copied.
//
// Returns OK immediately, leaving '*sidecars' untouched, when 'offsets' is empty.
// Returns Corruption when:
//   - there are more than TransferLimits::kMaxSidecars offsets,
//   - 'buffer' is larger than TransferLimits::kMaxTotalSidecarBytes,
//   - an offset lies past the end of 'buffer', or
//   - an offset is smaller than the one preceding it.
// When an offset is rejected, '*sidecars' has already been resized to offsets.size()
// and may hold the slices parsed before the bad offset was found.
Status RpcSidecar::ParseSidecars(
    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
    Slice buffer,
    SidecarSliceVector* sidecars) {
  if (offsets.empty()) return Status::OK();

  // Index of the final sidecar; it has no following offset to mark where it ends.
  int last = offsets.size() - 1;
  if (last >= TransferLimits::kMaxSidecars) {
    // Report the number of sidecars actually received, not the index of the last one.
    return Status::Corruption(strings::Substitute(
        "Received $0 additional payload slices, expected at most $1",
        offsets.size(), TransferLimits::kMaxSidecars));
  }

  if (buffer.size() > TransferLimits::kMaxTotalSidecarBytes) {
    return Status::Corruption(strings::Substitute(
        "Received $0 payload bytes, expected at most $1",
        buffer.size(), TransferLimits::kMaxTotalSidecarBytes));
  }

  sidecars->resize(offsets.size());
  for (int i = 0; i < last; ++i) {
    // The offsets are unsigned 32-bit values; hold them in int64_t so that the
    // difference computed below is done in signed arithmetic.
    int64_t cur_offset = offsets.Get(i);
    int64_t next_offset = offsets.Get(i + 1);
    if (next_offset > buffer.size()) {
      return Status::Corruption(strings::Substitute(
          "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
          " has length $2, but the entire message has length $3",
          i, cur_offset, (next_offset - cur_offset), buffer.size()));
    }
    if (next_offset < cur_offset) {
      // $2 is the end offset; the start offset is already printed as $1.
      return Status::Corruption(strings::Substitute(
          "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
          " but ends before that at offset $2.", i, cur_offset, next_offset));
    }

    (*sidecars)[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
  }

  // The final sidecar extends from its offset to the end of the buffer.
  int64_t cur_offset = offsets.Get(last);
  if (cur_offset > buffer.size()) {
    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
        "starts at offset $1 after message ends (message length $2).", last,
        cur_offset, buffer.size()));
  }
  (*sidecars)[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);

  return Status::OK();
}
} // namespace rpc
} // namespace kudu