blob: ef2144dcace35eeb4331200cc9f4c4f2faa4391a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rpc/thrift-thread.h"
#include <boost/bind.hpp>
#include <sstream>
#include <thrift/concurrency/Exception.h>
#include "common/names.h"
#include "common/status.h"
using namespace impala;
// Can't import the whole namespace, since impala::Thread will clash with atc::Thread
namespace atc = apache::thrift::concurrency;
void ThriftThread::start() {
Promise<atc::Thread::id_t> promise;
Status status = impala::Thread::Create(group_, name_,
bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
// Thread creation failed. Thrift expects an exception in this case. See
// the implementation of atc::PosixThreadFactory.cpp or atc::BoostThreadFactory.cpp.
if (!status.ok()) {
throw atc::SystemResourceException(
Substitute("Thread::Create() failed: $0", status.GetDetail()));
}
// Blocks until the thread id has been set
tid_ = promise.Get();
}
atc::Thread::id_t ThriftThread::getId() {
return tid_;
}
void ThriftThread::join() {
impala_thread_->Join();
}
boost::shared_ptr<atc::Thread> ThriftThreadFactory::newThread(
boost::shared_ptr<atc::Runnable> runnable) const {
stringstream name;
name << prefix_ << "-" << count_++;
boost::shared_ptr<ThriftThread> result =
boost::shared_ptr<ThriftThread>(new ThriftThread(group_, name.str(), runnable));
runnable->thread(result);
return result;
}
void ThriftThread::RunRunnable(boost::shared_ptr<atc::Runnable> runnable,
Promise<atc::Thread::id_t>* promise) {
promise->Set(get_current());
// Passing runnable in to this method (rather than reading from this->runnable())
// ensures that it will live as long as this method, otherwise the ThriftThread could be
// destroyed between the previous statement and this one (according to my reading of
// PosixThread)
runnable->run();
}
atc::Thread::id_t ThriftThreadFactory::getCurrentThreadId() const {
return atc::Thread::get_current();
}
ThriftThread::ThriftThread(const string& group, const string& name,
boost::shared_ptr<atc::Runnable> runnable)
: group_(group), name_(name) {
// Sets this::runnable (and no, I don't know why it's not protected in atc::Thread)
this->Thread::runnable(runnable);
}