blob: 81940167f5bbcf94580b9bb3ddc0b828ee419ada [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RPC_THRIFT_CLIENT_H
#define IMPALA_RPC_THRIFT_CLIENT_H
#include <ostream>
#include <sstream>
#include <boost/shared_ptr.hpp>
#include <common/status.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TSSLSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <sstream>
#include <gflags/gflags.h>
#include "transport/TSaslClientTransport.h"
#include "transport/TSasl.h"
#include "rpc/authentication.h"
#include "rpc/thrift-server.h"
#include "gen-cpp/Types_types.h"
DECLARE_string(principal);
DECLARE_string(hostname);
namespace impala {
/// Super class for templatized thrift clients.
class ThriftClientImpl {
public:
~ThriftClientImpl() {
Close();
}
const TNetworkAddress& address() const { return address_; }
/// Open the connection to the remote server. May be called repeatedly, is idempotent
/// unless there is a failure to connect.
/// If Open() fails, the connection remains closed.
Status Open();
/// Retry the Open num_retries time waiting wait_ms milliseconds between retries.
/// If num_retries == 0, the connection is retried indefinitely.
Status OpenWithRetry(uint32_t num_retries, uint64_t wait_ms);
/// Close the connection with the remote server. May be called repeatedly.
void Close();
/// Set receive timeout on the underlying TSocket.
void setRecvTimeout(int32_t ms) { socket_->setRecvTimeout(ms); }
/// Set send timeout on the underlying TSocket.
void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); }
Status init_status() { return init_status_; }
protected:
ThriftClientImpl(const std::string& ipaddress, int port, bool ssl);
/// Create a new socket without opening it. Returns an error if the socket could not
/// be created.
Status CreateSocket();
/// Address of the server this client communicates with.
TNetworkAddress address_;
/// True if ssl encryption is enabled on this connection.
bool ssl_;
Status init_status_;
/// Sasl Client object. Contains client kerberos identification data.
/// Will be NULL if kerberos is not being used.
boost::shared_ptr<sasl::TSasl> sasl_client_;
/// This factory sets up the openSSL library state and needs to be alive as long as its
/// owner(a ThriftClientImpl instance) does. Otherwise the OpenSSL state is lost
/// (refer IMPALA-2747).
boost::scoped_ptr<apache::thrift::transport::TSSLSocketFactory> ssl_factory_;
/// All shared pointers, because Thrift requires them to be
boost::shared_ptr<apache::thrift::transport::TSocket> socket_;
boost::shared_ptr<apache::thrift::transport::TTransport> transport_;
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> protocol_;
};
/// Utility client to a Thrift server. The parameter type is the Thrift interface type
/// that the server implements.
/// TODO: Consider a builder class to make constructing this class easier.
template <class InterfaceType>
class ThriftClient : public ThriftClientImpl {
public:
/// Creates, but does not connect, a new ThriftClient for a remote server.
/// - ipaddress: address of remote server
/// - port: port on which remote service runs
/// - service_name: If set, the target service to connect to.
/// - auth_provider: Authentication scheme to use. If NULL, use the global default
/// client<->demon authentication scheme.
/// - ssl: if true, SSL is enabled on this connection
ThriftClient(const std::string& ipaddress, int port,
const std::string& service_name = "", AuthProvider* auth_provider = NULL,
bool ssl = false);
/// Returns the object used to actually make RPCs against the remote server
InterfaceType* iface() { return iface_.get(); }
private:
boost::shared_ptr<InterfaceType> iface_;
AuthProvider* auth_provider_;
};
template <class InterfaceType>
ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port,
const std::string& service_name, AuthProvider* auth_provider, bool ssl)
: ThriftClientImpl(ipaddress, port, ssl),
iface_(new InterfaceType(protocol_)),
auth_provider_(auth_provider) {
if (auth_provider_ == NULL) {
auth_provider_ = AuthManager::GetInstance()->GetInternalAuthProvider();
DCHECK(auth_provider_ != NULL);
}
// If socket_ is NULL (because ThriftClientImpl::CreateSocket() failed in the base
// class constructor, nothing else should be constructed. Open()/Reopen() will return
// the error that the socket couldn't be created and the caller should be careful to
// not use the client after that.
// TODO: Move initialization code that can fail into a separate Init() method.
if (socket_ == NULL) {
DCHECK(!init_status_.ok());
return;
}
// transport_ is created by wrapping the socket_ in the TTransport provided by the
// auth_provider_ and then a TBufferedTransport (IMPALA-1928).
transport_ = socket_;
init_status_ = auth_provider_->WrapClientTransport(address_.hostname, transport_,
service_name, &transport_);
if (!init_status_.ok()) return; // The caller will decide what to do with the Status.
ThriftServer::BufferedTransportFactory factory;
transport_ = factory.getTransport(transport_);
protocol_.reset(new apache::thrift::protocol::TBinaryProtocol(transport_));
iface_.reset(new InterfaceType(protocol_));
}
}
#endif