blob: 59204ae0fc0371ada4222da3193a381aaaea672d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/thrift/client.h"
#include <memory>
#include <mutex>
#include <ostream>
#include <glog/logging.h>
#include <thrift/TOutput.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include "kudu/thrift/sasl_client_transport.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/status.h"
namespace apache {
namespace thrift {
namespace transport {
class TTransport;
} // namespace transport
} // namespace thrift
} // namespace apache
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::protocol::TProtocol;
using apache::thrift::transport::TBufferedTransport;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
using std::make_shared;
using std::shared_ptr;
namespace kudu {
namespace thrift {
namespace {
// Output hook handed to Thrift's GlobalOutput: each message Thrift emits is
// forwarded to glog at INFO severity.
void ThriftOutputFunction(const char* message) {
  LOG(INFO) << message;
}
} // anonymous namespace
// Builds a binary Thrift protocol object for the server at 'address'.
//
// The underlying socket takes its send, receive, and connect timeouts from
// 'options'. When 'options.enable_kerberos' is set, the socket is wrapped in a
// SaslClientTransport ('options.service_principal' is DCHECK'd to be
// non-empty); otherwise it is wrapped in a TBufferedTransport.
//
// The first invocation also installs ThriftOutputFunction as Thrift's global
// output function.
shared_ptr<TProtocol> CreateClientProtocol(const HostPort& address, const ClientOptions& options) {
  // The Thrift output function is a global setting, so register it exactly
  // once regardless of how many protocols are created.
  static std::once_flag logging_hook_installed;
  std::call_once(logging_hook_installed, [] {
    apache::thrift::GlobalOutput.setOutputFunction(ThriftOutputFunction);
  });

  // Raw socket to the remote endpoint, configured with the caller's timeouts.
  auto sock = make_shared<TSocket>(address.host(), address.port());
  sock->setSendTimeout(options.send_timeout.ToMilliseconds());
  sock->setRecvTimeout(options.recv_timeout.ToMilliseconds());
  sock->setConnTimeout(options.conn_timeout.ToMilliseconds());

  // Pick the transport layered on top of the socket.
  shared_ptr<TTransport> channel;
  if (!options.enable_kerberos) {
    channel = make_shared<TBufferedTransport>(std::move(sock));
  } else {
    DCHECK(!options.service_principal.empty());
    channel = make_shared<SaslClientTransport>(options.service_principal,
                                               address.host(),
                                               std::move(sock),
                                               options.max_buf_size);
  }

  return make_shared<TBinaryProtocol>(std::move(channel));
}
bool IsFatalError(const Status& error) {
// Whitelist of errors which are not fatal. This errs on the side of
// considering an error fatal since the consequences are low; just an
// unnecessary reconnect. If a fatal error is not recognized it could cause
// another RPC to fail, since there is no way to check the status of the
// connection before sending an RPC.
return !(error.IsAlreadyPresent()
|| error.IsNotFound()
|| error.IsInvalidArgument()
|| error.IsIllegalState()
|| error.IsNotSupported()
|| error.IsRemoteError());
}
} // namespace thrift
} // namespace kudu