| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #include "storage/DataExchangerAsync.hpp" |
| |
| #include <grpc++/grpc++.h> |
| |
| #include <iostream> |
| #include <memory> |
| #include <string> |
| |
| #include "storage/DataExchange.grpc.pb.h" |
| #include "storage/DataExchange.pb.h" |
| #include "storage/StorageManager.hpp" |
| #include "utility/NetworkUtil.hpp" |
| |
| #include "glog/logging.h" |
| |
| using grpc::ServerCompletionQueue; |
| |
| namespace quickstep { |
| namespace { |
| |
| /** |
| * @brief RPC Request Context Instance. |
| **/ |
| class CallContext { |
| public: |
| /** |
| * @brief Constructor. |
| * |
| * @param service The async service. |
| * @param queue The RPC request queue. |
| * @param storage_manager The StorageManager to use. |
| **/ |
| CallContext(DataExchange::AsyncService *service, |
| ServerCompletionQueue *queue, |
| StorageManager *storage_manager) |
| : service_(service), |
| queue_(queue), |
| storage_manager_(DCHECK_NOTNULL(storage_manager)), |
| responder_(&context_), |
| status_(CallStatus::CREATE) { |
| Proceed(); |
| } |
| |
| /** |
| * @brief Process the RPC request. |
| **/ |
| void Proceed(); |
| |
| private: |
| DataExchange::AsyncService *service_; |
| ServerCompletionQueue *queue_; |
| |
| StorageManager *storage_manager_; |
| |
| grpc::ServerContext context_; |
| |
| PullRequest request_; |
| PullResponse response_; |
| |
| grpc::ServerAsyncResponseWriter<PullResponse> responder_; |
| |
| enum class CallStatus { |
| CREATE = 0, |
| PROCESS, |
| FINISH, |
| }; |
| CallStatus status_; |
| }; |
| |
| void CallContext::Proceed() { |
| switch (status_) { |
| case CallStatus::CREATE: { |
| // Change this instance progress to the PROCESS state. |
| status_ = CallStatus::PROCESS; |
| |
| // As part of the initial CREATE state, we *request* that the system |
| // start processing Pull requests. In this request, "this" acts are |
| // the tag uniquely identifying the request (so that different CallContext |
| // instances can serve different requests concurrently), in this case |
| // the memory address of this CallContext instance. |
| service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this); |
| break; |
| } |
| case CallStatus::PROCESS: { |
| // Spawn a new CallContext instance to serve new clients while we process |
| // the one for this CallContext. The instance will deallocate itself as |
| // part of its FINISH state. |
| new CallContext(service_, queue_, storage_manager_); |
| |
| // The actual processing. |
| storage_manager_->pullBlockOrBlob(request_.block_id(), &response_); |
| |
| // And we are done! Let the gRPC runtime know we've finished, using the |
| // memory address of this instance as the uniquely identifying tag for |
| // the event. |
| status_ = CallStatus::FINISH; |
| responder_.Finish(response_, grpc::Status::OK, this); |
| break; |
| } |
| case CallStatus::FINISH: { |
| // Once in the FINISH state, deallocate ourselves (CallContext). |
| delete this; |
| break; |
| } |
| default: |
| LOG(FATAL) << "Unknown call status."; |
| } |
| } |
| |
| } // namespace |
| |
| DataExchangerAsync::DataExchangerAsync() { |
| const std::string ipv4_address(GetIpv4Address() + ':'); |
| |
| grpc::ServerBuilder builder; |
| builder.AddListeningPort(ipv4_address, grpc::InsecureServerCredentials(), &port_); |
| builder.RegisterService(&service_); |
| |
| queue_ = builder.AddCompletionQueue(); |
| server_ = builder.BuildAndStart(); |
| |
| DCHECK_GT(port_, 0); |
| server_address_ = ipv4_address + std::to_string(port_); |
| LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_; |
| } |
| |
| void DataExchangerAsync::run() { |
| // Self-destruct upon success. |
| new CallContext(&service_, queue_.get(), storage_manager_); |
| |
| void *tag = nullptr; // Uniquely identify a request. |
| bool ok = false; |
| |
| while (true) { |
| if (queue_->Next(&tag, &ok)) { |
| CallContext *call_context = static_cast<CallContext*>(tag); |
| if (ok) { |
| call_context->Proceed(); |
| } else { |
| LOG(WARNING) << "DataExchangerAsync " << server_address_ << " is not ok"; |
| delete call_context; |
| } |
| } else { |
| LOG(INFO) << "DataExchangerAsync " << server_address_ << " shuts down"; |
| return; |
| } |
| } |
| } |
| |
| } // namespace quickstep |