blob: b338b71182a3e5b33b823386fb15f04067a0ed8e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <butil/logging.h>
#include <json2pb/pb_to_json.h>
#include <json2pb/json_to_pb.h>
#include "pb_util.h"
#include "json_loader.h"
#include <errno.h>
namespace brpc {
class JsonLoader::Reader {
public:
explicit Reader(int fd2)
: _fd(fd2)
, _brace_depth(0)
, _quoted(false)
, _quote_char(0)
{}
explicit Reader(const std::string& jsons)
: _fd(-1)
, _brace_depth(0)
, _quoted(false)
, _quote_char(0) {
_file_buf.append(jsons);
}
bool get_next_json(butil::IOBuf* json1);
bool read_some();
private:
int _fd;
int _brace_depth;
bool _quoted; // quoted by " or '
char _quote_char;
butil::IOPortal _file_buf;
};
// Load data from the file.
bool JsonLoader::Reader::read_some() {
if (_fd < 0) { // loading from a string.
return false;
}
ssize_t nr = _file_buf.append_from_file_descriptor(_fd, 65536);
if (nr < 0) {
if (errno == EINTR) {
return read_some();
}
PLOG(ERROR) << "Fail to read fd=" << _fd;
return false;
} else if (nr == 0) {
return false;
} else {
return true;
}
}
// Ignore json only with spaces and newline
static bool possibly_valid_json(const butil::IOBuf& json) {
butil::IOBufAsZeroCopyInputStream it(json);
const void* data = NULL;
for (int size = 0; it.Next(&data, &size); ) {
for (int i = 0; i < size; ++i) {
char c = ((const char*)data)[i];
if (!isspace(c) && c != '\n') {
return true;
}
}
}
return false;
}
// Separate jsons with closed braces.
bool JsonLoader::Reader::get_next_json(butil::IOBuf* json1) {
if (_file_buf.empty()) {
if (!read_some()) {
return false;
}
}
json1->clear();
while (1) {
butil::IOBufAsZeroCopyInputStream it(_file_buf);
const void* data = NULL;
int size = 0;
int total_size = 0;
int skipped = 0;
for (; it.Next(&data, &size); total_size += size) {
for (int i = 0; i < size; ++i) {
const char c = ((const char*)data)[i];
if (_brace_depth == 0) {
// skip any character until the first left brace is found.
if (c != '{') {
++skipped;
continue;
}
}
switch (c) {
case '{':
if (!_quoted) {
++_brace_depth;
} else {
VLOG(1) << "Quoted left brace";
}
break;
case '}':
if (!_quoted) {
--_brace_depth;
if (_brace_depth == 0) {
// the braces are closed, a complete object.
_file_buf.cutn(json1, total_size + i + 1);
json1->pop_front(skipped);
return possibly_valid_json(*json1);
} else if (_brace_depth < 0) {
LOG(ERROR) << "More right braces than left braces";
return false;
}
} else {
VLOG(1) << "Quoted right brace";
}
break;
case '"':
if (_quoted) {
if (_quote_char == '"') {
_quoted = false;
}
} else {
_quoted = true;
_quote_char = '"';
}
break;
case '\'':
if (_quoted) {
if (_quote_char == '\'') {
_quoted = false;
}
} else {
_quoted = true;
_quote_char = '\'';
}
break;
default:
break;
// just skip
}
}
}
if (!_file_buf.empty()) {
json1->append(_file_buf);
_file_buf.clear();
}
if (!read_some()) {
json1->pop_front(skipped);
if (!json1->empty()) {
return possibly_valid_json(*json1);
}
return false;
}
}
return false;
}
JsonLoader::JsonLoader(google::protobuf::compiler::Importer* importer,
google::protobuf::DynamicMessageFactory* factory,
const std::string& service_name,
const std::string& method_name)
: _importer(importer)
, _factory(factory)
, _service_name(service_name)
, _method_name(method_name)
{
_request_prototype = pbrpcframework::get_prototype_by_name(
_service_name, _method_name, true, _importer, _factory);
}
void JsonLoader::load_messages(
JsonLoader::Reader* ctx,
std::deque<google::protobuf::Message*>* out_msgs) {
out_msgs->clear();
butil::IOBuf request_json;
while (ctx->get_next_json(&request_json)) {
VLOG(1) << "Load " << out_msgs->size() + 1 << "-th json=`"
<< request_json << '\'';
std::string error;
google::protobuf::Message* request = _request_prototype->New();
butil::IOBufAsZeroCopyInputStream wrapper(request_json);
if (!json2pb::JsonToProtoMessage(&wrapper, request, &error)) {
LOG(WARNING) << "Fail to convert to pb: " << error << ", json=`"
<< request_json << '\'';
delete request;
continue;
}
out_msgs->push_back(request);
LOG_IF(INFO, (out_msgs->size() % 10000) == 0)
<< "Loaded " << out_msgs->size() << " jsons";
}
}
void JsonLoader::load_messages(
int fd,
std::deque<google::protobuf::Message*>* out_msgs) {
JsonLoader::Reader ctx(fd);
load_messages(&ctx, out_msgs);
}
void JsonLoader::load_messages(
const std::string& jsons,
std::deque<google::protobuf::Message*>* out_msgs) {
JsonLoader::Reader ctx(jsons);
load_messages(&ctx, out_msgs);
}
} // namespace brpc