blob: 2efb012aa20a49e494ab58935c3b4fd4b1e58ba7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "util/thrift_util.h"
#include <gen_cpp/Types_types.h>
#include <thrift/TOutput.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportException.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <string>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "exec/tablet_info.h"
#include "olap/tablet_schema.h"
#include "util/thrift_server.h"
namespace apache::thrift::protocol {
class TProtocol;
} // namespace apache::thrift::protocol
// TCompactProtocol requires some #defines to work right. They also define UNLIKELY
// so we need to undef this.
// TODO: is there a better include to use?
#ifdef UNLIKELY
#undef UNLIKELY
#endif
#ifndef SIGNED_RIGHT_SHIFT_IS
#define SIGNED_RIGHT_SHIFT_IS 1
#endif
#ifndef ARITHMETIC_RIGHT_SHIFT
#define ARITHMETIC_RIGHT_SHIFT 1
#endif
#include <thrift/protocol/TCompactProtocol.h>
#include <sstream>
#include <thread>
namespace doris {
ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size)
: _mem_buffer(new apache::thrift::transport::TMemoryBuffer(initial_buffer_size)) {
if (compact) {
apache::thrift::protocol::TCompactProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
factory;
_protocol = factory.getProtocol(_mem_buffer);
} else {
apache::thrift::protocol::TBinaryProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
factory;
_protocol = factory.getProtocol(_mem_buffer);
}
}
std::shared_ptr<apache::thrift::protocol::TProtocol> create_deserialize_protocol(
std::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem, bool compact) {
if (compact) {
apache::thrift::protocol::TCompactProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
tproto_factory;
return tproto_factory.getProtocol(mem);
} else {
apache::thrift::protocol::TBinaryProtocolFactoryT<apache::thrift::transport::TMemoryBuffer>
tproto_factory;
return tproto_factory.getProtocol(mem);
}
}
// Comparator for THostPorts. Thrift declares this (in gen-cpp/Types_types.h) but
// never defines it.
bool TNetworkAddress::operator<(const TNetworkAddress& that) const {
if (this->hostname < that.hostname) {
return true;
} else if ((this->hostname == that.hostname) && (this->port < that.port)) {
return true;
}
return false;
};
static void thrift_output_function(const char* output) {
VLOG_QUERY << output;
}
void init_thrift_logging() {
apache::thrift::GlobalOutput.setOutputFunction(thrift_output_function);
}
Status wait_for_local_server(const ThriftServer& server, int num_retries, int retry_interval_ms) {
return wait_for_server("localhost", server.port(), num_retries, retry_interval_ms);
}
Status wait_for_server(const std::string& host, int port, int num_retries, int retry_interval_ms) {
int retry_count = 0;
while (retry_count < num_retries) {
try {
apache::thrift::transport::TSocket socket(host, port);
// Timeout is in ms
socket.setConnTimeout(500);
socket.open();
socket.close();
return Status::OK();
} catch (apache::thrift::transport::TTransportException& e) {
VLOG_QUERY << "Connection failed: " << e.what();
}
++retry_count;
VLOG_QUERY << "Waiting " << retry_interval_ms << "ms for Thrift server at " << host << ":"
<< port << " to come up, failed attempt " << retry_count << " of "
<< num_retries;
std::this_thread::sleep_for(std::chrono::milliseconds(retry_interval_ms));
}
return Status::InternalError("Server did not come up");
}
void t_network_address_to_string(const TNetworkAddress& address, std::string* out) {
std::stringstream ss;
ss << address;
*out = ss.str();
}
bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddress& b) {
int cmp = a.hostname.compare(b.hostname);
if (cmp < 0) {
return true;
}
if (cmp == 0) {
return a.port < b.port;
}
return false;
}
std::string to_string(const TUniqueId& id) {
return std::to_string(id.hi).append(std::to_string(id.lo));
}
bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink) {
OlapTableSchemaParam schema;
if (!schema.init(sink.schema).ok()) {
return false;
}
if (schema.is_partial_update()) {
return true;
}
for (const auto& index_schema : schema.indexes()) {
for (const auto& index : index_schema->indexes) {
if (index->index_type() == INVERTED) {
if (sink.schema.inverted_index_file_storage_format ==
TInvertedIndexFileStorageFormat::V1) {
return true;
} else {
return false;
}
}
}
}
return false;
}
} // namespace doris