blob: 7ff44a890ccb8457ca62fa3b9c5ff2de67ff23dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "testutil/in-process-servers.h"
#include <boost/scoped_ptr.hpp>
#include <stdlib.h>
#include "statestore/statestore.h"
#include "rpc/thrift-util.h"
#include "rpc/thrift-server.h"
#include "util/network-util.h"
#include "util/webserver.h"
#include "util/default-path-handlers.h"
#include "util/metrics.h"
#include "util/openssl-util.h"
#include "runtime/exec-env.h"
#include "service/impala-server.h"
#include "common/names.h"
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
DECLARE_int32(be_port);
DECLARE_int32(krpc_port);
using namespace apache::thrift;
using namespace impala;
InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
const string& statestore_host, int statestore_port) {
for (int tries = 0; tries < 10; ++tries) {
vector<int> used_ports;
int backend_port = FindUnusedEphemeralPort(&used_ports);
if (backend_port == -1) continue;
// This flag is read directly in several places to find the address of the local
// backend interface.
FLAGS_be_port = backend_port;
int krpc_port = FindUnusedEphemeralPort(&used_ports);
if (krpc_port == -1) continue;
FLAGS_krpc_port = krpc_port;
int subscriber_port = FindUnusedEphemeralPort(&used_ports);
if (subscriber_port == -1) continue;
int webserver_port = FindUnusedEphemeralPort(&used_ports);
if (webserver_port == -1) continue;
int beeswax_port = FindUnusedEphemeralPort(&used_ports);
if (beeswax_port == -1) continue;
int hs2_port = FindUnusedEphemeralPort(&used_ports);
if (hs2_port == -1) continue;
InProcessImpalaServer* impala = new InProcessImpalaServer(FLAGS_hostname,
backend_port, krpc_port, subscriber_port, webserver_port, statestore_host,
statestore_port);
// Start the daemon and check if it works, if not delete the current server object and
// pick a new set of ports
Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
if (started.ok()) return impala;
LOG(WARNING) << started.GetDetail();
delete impala;
}
DCHECK(false) << "Could not find port to start Impalad.";
return NULL;
}
InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
int krpc_port, int subscriber_port, int webserver_port, const string& statestore_host,
int statestore_port)
: hostname_(hostname), backend_port_(backend_port),
beeswax_port_(0),
hs2_port_(0),
impala_server_(NULL),
exec_env_(new ExecEnv(hostname, backend_port, krpc_port, subscriber_port,
webserver_port, statestore_host, statestore_port)) {
}
void InProcessImpalaServer::SetCatalogIsReady() {
DCHECK(impala_server_ != NULL) << "Call Start*() first.";
exec_env_->frontend()->SetCatalogIsReady();
}
Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
RETURN_IF_ERROR(exec_env_->Init());
beeswax_port_ = beeswax_port;
hs2_port_ = hs2_port;
impala_server_.reset(new ImpalaServer(exec_env_.get()));
SetCatalogIsReady();
RETURN_IF_ERROR(impala_server_->Start(backend_port_, beeswax_port, hs2_port));
// Wait for up to 1s for the backend server to start
RETURN_IF_ERROR(WaitForServer(hostname_, backend_port_, 10, 100));
return Status::OK();
}
Status InProcessImpalaServer::Join() {
impala_server_->Join();
return Status::OK();
}
InProcessStatestore* InProcessStatestore::StartWithEphemeralPorts() {
for (int tries = 0; tries < 10; ++tries) {
vector<int> used_ports;
int statestore_port = FindUnusedEphemeralPort(&used_ports);
if (statestore_port == -1) continue;
int webserver_port = FindUnusedEphemeralPort(&used_ports);
if (webserver_port == -1) continue;
InProcessStatestore* ips = new InProcessStatestore(statestore_port, webserver_port);
if (ips->Start().ok()) return ips;
delete ips;
}
DCHECK(false) << "Could not find port to start Statestore.";
return NULL;
}
InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port)
: webserver_(new Webserver(webserver_port)),
metrics_(new MetricGroup("statestore")),
statestore_port_(statestore_port),
statestore_(new Statestore(metrics_.get())) {
AddDefaultUrlCallbacks(webserver_.get());
statestore_->RegisterWebpages(webserver_.get());
}
Status InProcessStatestore::Start() {
RETURN_IF_ERROR(statestore_->Init());
RETURN_IF_ERROR(webserver_->Start());
boost::shared_ptr<TProcessor> processor(
new StatestoreServiceProcessor(statestore_->thrift_iface()));
ThriftServerBuilder builder("StatestoreService", processor, statestore_port_);
if (IsInternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for Statestore";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
}
ThriftServer* server;
ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server));
statestore_server_.reset(server);
RETURN_IF_ERROR(Thread::Create("statestore", "main-loop",
&Statestore::MainLoop, statestore_.get(), &statestore_main_loop_));
RETURN_IF_ERROR(statestore_server_->Start());
return WaitForServer("localhost", statestore_port_, 10, 100);
}