blob: 3693f8875e337efaf39f125dad5d9d7f941ee0b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_CLI_NETWORK_IO_HPP_
#define QUICKSTEP_CLI_NETWORK_IO_HPP_
#include <grpc++/grpc++.h>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>
#include "cli/IOInterface.hpp"
#include "cli/NetworkCli.grpc.pb.h"
#include "cli/NetworkCli.pb.h"
#include "threading/ConditionVariable.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Macros.hpp"
#include "utility/MemStream.hpp"
#include "utility/ThreadSafeQueue.hpp"
#include "gflags/gflags.h"
#include "glog/logging.h"
using grpc::Channel;
using grpc::ClientContext;
using grpc::Server;
using grpc::Status;
namespace quickstep {
DECLARE_int32(cli_network_port);
DECLARE_string(cli_network_ip);
namespace networkio_internal {
/**
* Contains state and helper methods for managing the interaction between a producer thread and a consumer thread.
* The producer thread waits on a condition variable. When the consumer thread finishes, it notifies the producer.
*/
class RequestState {
 public:
  /**
   * @brief Constructor.
   * @param request The query request from the producer. It is stored by reference (not copied), so it must
   *        outlive this RequestState.
   */
  explicit RequestState(const QueryRequest& request)
      : response_ready_(false),
        canceled_(false),
        request_(request),
        condition_(mutex_.createConditionVariable()) {}

  /**
   * @brief Notifies that the consumer has finished consuming and that a response is ready.
   *        To be called after the consumer has executed.
   * @param stdout_str Stdout from Quickstep.
   * @param stderr_str Stderr from Quickstep.
   */
  void responseReady(const std::string& stdout_str, const std::string& stderr_str) {
    std::unique_lock<Mutex> lock(mutex_);
    response_message_.set_query_result(stdout_str);
    response_message_.set_error_result(stderr_str);
    response_ready_ = true;
    condition_->signalOne();
  }

  /**
   * @brief The producer thread blocks until Quickstep signals that it has finished.
   * @note Quickstep may either produce a query response or cancel. Both these actions must notify the condition.
   */
  void waitForResponse() {
    // response_ready_ is written under mutex_ by responseReady() and cancel(), so it has to be read under the
    // same mutex. Holding the mutex across the check and the await also closes the window in which the
    // notification could arrive between the two, and keeps this thread from returning (and the owner from
    // destroying this object) while the notifying thread is still inside its critical section.
    std::unique_lock<Mutex> lock(mutex_);
    while (!response_ready_) {
      condition_->await();
    }
  }

  /**
   * @brief Notifies the producer that its request will not be served by Quickstep.
   */
  void cancel() {
    std::unique_lock<Mutex> lock(mutex_);
    canceled_ = true;
    // A cancellation also counts as "ready" so that waitForResponse() wakes up.
    response_ready_ = true;
    condition_->signalOne();
  }

  /**
   * @return The producer's query for Quickstep to process.
   */
  std::string getRequest() const {
    return request_.query();
  }

  /**
   * @brief Gets the response. Only meaningful once the response is ready, i.e. after waitForResponse() returned.
   * @return The response message from Quickstep.
   */
  QueryResponse getResponse() const {
    DCHECK(response_ready_);
    return response_message_;
  }

  /**
   * @brief Checks for cancellation. Only meaningful once the response is ready, i.e. after waitForResponse()
   *        returned.
   * @return True if query was canceled.
   */
  bool getCanceled() const {
    DCHECK(response_ready_);
    return canceled_;
  }

 private:
  bool response_ready_;  // Guarded by mutex_.
  bool canceled_;        // Guarded by mutex_.
  const QueryRequest& request_;
  QueryResponse response_message_;
  Mutex mutex_;
  ConditionVariable *condition_;  // note: owned by the mutex.

  DISALLOW_COPY_AND_ASSIGN(RequestState);
};
} // namespace networkio_internal
using networkio_internal::RequestState;
/**
* @brief Contains the callback methods which the gRPC service defines.
* When a request is made of gRPC, a gRPC worker thread is spun up and enters one of the callback methods
* (e.g. SendQuery). This thread keeps the network connection open while Quickstep processes the query.
* Synchronization between the gRPC worker thread and the Quickstep thread is handled by a RequestState object
* which is created for each interaction.
*/
class NetworkCliServiceImpl final : public NetworkCli::Service {
public:
NetworkCliServiceImpl()
: running_(true) {}
/**
* @brief Handles gRPC request.
* Sets the buffer in the RequestState, places the request on a queue, then waits for a response. The response shall
* be triggered by the Quickstep system.
*/
Status SendQuery(grpc::ServerContext *context,
const QueryRequest *request,
QueryResponse *response) override {
std::unique_ptr<RequestState> request_state(new RequestState(*request));
// Check to see if the gRPC service has been shutdown.
{
SpinSharedMutexSharedLock<true> lock(service_mtx_);
if (!running_) {
return Status::CANCELLED;
}
// While we have a service lock, we add to the Queue. Note that we keep the service lock to protect ourselves from
// a race condition in the kill() method.
// Pushing to the queue will notify consumers.
request_queue_.push(request_state.get());
}
DCHECK(request_state);
// We have pushed to the request queue, so all there is to do now is wait for Quickstep to process the request.
request_state->waitForResponse();
if (request_state->getCanceled()) {
return Status::CANCELLED;
}
*response = request_state->getResponse();
return Status::OK;
}
/**
* @brief The consumer thread waits for a request to materialize.
* @return A non-owned RequestState.
*/
RequestState* waitForRequest() {
return request_queue_.popOne();
}
/**
* @brief Stops accepting further requests and cancels all pending requests.
*/
void kill() {
{
// This action guarantees that no further requests are added to the queue.
SpinSharedMutexExclusiveLock<true> lock(service_mtx_);
running_ = false;
}
// Go through each pending request, and cancel them.
while (!request_queue_.empty()) {
request_queue_.popOne()->cancel();
}
}
private:
SpinSharedMutex<true> service_mtx_;
bool running_;
ThreadSafeQueue<RequestState*> request_queue_;
DISALLOW_COPY_AND_ASSIGN(NetworkCliServiceImpl);
};
/**
 * @brief An IOHandle bound to a single network request. Output written to out() and err() is collected in
 *        memory streams and handed to the RequestState when the handle is destroyed.
 */
class NetworkIOHandle final : public IOHandle {
 public:
  /**
   * @brief Constructor.
   * @param state The request this handle serves. The pointer is stored as-is and is not deleted by this class.
   */
  explicit NetworkIOHandle(RequestState* state)
      : request_state_(state) {}

  /**
   * @brief Publishes the collected output as the response to the request, which signals the waiting producer
   *        thread that the interaction is complete.
   */
  ~NetworkIOHandle() override {
    const std::string &collected_out = out_stream_.str();
    const std::string &collected_err = err_stream_.str();
    request_state_->responseReady(collected_out, collected_err);
  }

  /**
   * @return The file backing the output stream of this handle.
   */
  FILE* out() override {
    return out_stream_.file();
  }

  /**
   * @return The file backing the error stream of this handle.
   */
  FILE* err() override {
    return err_stream_.file();
  }

  /**
   * @return The query text carried by the underlying request.
   */
  std::string getCommand() const override {
    return request_state_->getRequest();
  }

 private:
  MemStream out_stream_;
  MemStream err_stream_;
  RequestState *request_state_;

  DISALLOW_COPY_AND_ASSIGN(NetworkIOHandle);
};
/**
* A network interface that uses gRPC to accept commands.
*/
class NetworkIO final : public IOInterface {
 public:
  NetworkIO();

  /**
   * @brief Cancels pending requests, then shuts the gRPC server down and waits for it to finish.
   */
  ~NetworkIO() override {
    service_.kill();
    // NOTE(review): the constructor is defined out of line; if it can leave server_ unset (for example when the
    // server fails to start), dereferencing it here would crash during destruction, so guard against null.
    if (server_) {
      server_->Shutdown();
      server_->Wait();
    }
  }

  /**
   * @brief Waits for the next request from the service and wraps it in a handle.
   * @return A newly allocated NetworkIOHandle for the next request.
   */
  IOHandle* getNextIOHandle() override {
    return new NetworkIOHandle(service_.waitForRequest());
  }

  /**
   * @brief Kills the underlying gRPC service.
   */
  void killService() {
    service_.kill();
  }

  /**
   * @return IP address and port of the network address specified by the user flags.
   */
  static std::string GetAddress() {
    return FLAGS_cli_network_ip + ":" + std::to_string(FLAGS_cli_network_port);
  }

 private:
  std::unique_ptr<Server> server_;
  NetworkCliServiceImpl service_;

  DISALLOW_COPY_AND_ASSIGN(NetworkIO);
};
} // namespace quickstep
#endif // QUICKSTEP_CLI_NETWORK_IO_HPP_