| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <brpc/closure_guard.h> |
| #include <gen_cpp/internal_service.pb.h> |
| |
| #include <string> |
| |
| #include "common/status.h" |
| #include "util/work_thread_pool.hpp" |
| |
| namespace google::protobuf { |
| class Closure; |
| class RpcController; |
| } // namespace google::protobuf |
| |
| namespace doris { |
| |
| class StorageEngine; |
| class ExecEnv; |
| class PHandShakeRequest; |
| class PHandShakeResponse; |
| class RuntimeState; |
| |
| class PInternalService : public PBackendService { |
| public: |
| PInternalService(ExecEnv* exec_env); |
| ~PInternalService() override; |
| |
| void exec_plan_fragment(google::protobuf::RpcController* controller, |
| const PExecPlanFragmentRequest* request, |
| PExecPlanFragmentResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void exec_plan_fragment_prepare(google::protobuf::RpcController* controller, |
| const PExecPlanFragmentRequest* request, |
| PExecPlanFragmentResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void exec_plan_fragment_start(google::protobuf::RpcController* controller, |
| const PExecPlanFragmentStartRequest* request, |
| PExecPlanFragmentResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void cancel_plan_fragment(google::protobuf::RpcController* controller, |
| const PCancelPlanFragmentRequest* request, |
| PCancelPlanFragmentResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, |
| PFetchDataResult* result, google::protobuf::Closure* done) override; |
| |
| void fetch_arrow_data(google::protobuf::RpcController* controller, |
| const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void outfile_write_success(google::protobuf::RpcController* controller, |
| const POutfileWriteSuccessRequest* request, |
| POutfileWriteSuccessResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void fetch_table_schema(google::protobuf::RpcController* controller, |
| const PFetchTableSchemaRequest* request, |
| PFetchTableSchemaResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void fetch_arrow_flight_schema(google::protobuf::RpcController* controller, |
| const PFetchArrowFlightSchemaRequest* request, |
| PFetchArrowFlightSchemaResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void tablet_writer_open(google::protobuf::RpcController* controller, |
| const PTabletWriterOpenRequest* request, |
| PTabletWriterOpenResult* response, |
| google::protobuf::Closure* done) override; |
| |
| void open_load_stream(google::protobuf::RpcController* controller, |
| const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void tablet_writer_add_block(google::protobuf::RpcController* controller, |
| const PTabletWriterAddBlockRequest* request, |
| PTabletWriterAddBlockResult* response, |
| google::protobuf::Closure* done) override; |
| |
| void tablet_writer_add_block_by_http(google::protobuf::RpcController* controller, |
| const ::doris::PEmptyRequest* request, |
| PTabletWriterAddBlockResult* response, |
| google::protobuf::Closure* done) override; |
| |
| void tablet_writer_cancel(google::protobuf::RpcController* controller, |
| const PTabletWriterCancelRequest* request, |
| PTabletWriterCancelResult* response, |
| google::protobuf::Closure* done) override; |
| |
| void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, |
| PProxyResult* response, google::protobuf::Closure* done) override; |
| |
| void update_cache(google::protobuf::RpcController* controller, |
| const PUpdateCacheRequest* request, PCacheResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, |
| PFetchCacheResult* result, google::protobuf::Closure* done) override; |
| |
| void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, |
| PCacheResponse* response, google::protobuf::Closure* done) override; |
| |
| void merge_filter(::google::protobuf::RpcController* controller, |
| const ::doris::PMergeFilterRequest* request, |
| ::doris::PMergeFilterResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void send_filter_size(::google::protobuf::RpcController* controller, |
| const ::doris::PSendFilterSizeRequest* request, |
| ::doris::PSendFilterSizeResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void sync_filter_size(::google::protobuf::RpcController* controller, |
| const ::doris::PSyncFilterSizeRequest* request, |
| ::doris::PSyncFilterSizeResponse* response, |
| ::google::protobuf::Closure* done) override; |
| void apply_filterv2(::google::protobuf::RpcController* controller, |
| const ::doris::PPublishFilterRequestV2* request, |
| ::doris::PPublishFilterResponse* response, |
| ::google::protobuf::Closure* done) override; |
| void transmit_block(::google::protobuf::RpcController* controller, |
| const ::doris::PTransmitDataParams* request, |
| ::doris::PTransmitDataResult* response, |
| ::google::protobuf::Closure* done) override; |
| void transmit_block_by_http(::google::protobuf::RpcController* controller, |
| const ::doris::PEmptyRequest* request, |
| ::doris::PTransmitDataResult* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, |
| PSendDataResult* response, google::protobuf::Closure* done) override; |
| void commit(google::protobuf::RpcController* controller, const PCommitRequest* request, |
| PCommitResult* response, google::protobuf::Closure* done) override; |
| void rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, |
| PRollbackResult* response, google::protobuf::Closure* done) override; |
| void fold_constant_expr(google::protobuf::RpcController* controller, |
| const PConstantExprRequest* request, PConstantExprResult* response, |
| google::protobuf::Closure* done) override; |
| void check_rpc_channel(google::protobuf::RpcController* controller, |
| const PCheckRPCChannelRequest* request, |
| PCheckRPCChannelResponse* response, |
| google::protobuf::Closure* done) override; |
| void reset_rpc_channel(google::protobuf::RpcController* controller, |
| const PResetRPCChannelRequest* request, |
| PResetRPCChannelResponse* response, |
| google::protobuf::Closure* done) override; |
| void hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, |
| PHandShakeResponse* response, google::protobuf::Closure* done) override; |
| |
| void report_stream_load_status(google::protobuf::RpcController* controller, |
| const PReportStreamLoadStatusRequest* request, |
| PReportStreamLoadStatusResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void glob(google::protobuf::RpcController* controller, const PGlobRequest* request, |
| PGlobResponse* response, google::protobuf::Closure* done) override; |
| |
| void group_commit_insert(google::protobuf::RpcController* controller, |
| const PGroupCommitInsertRequest* request, |
| PGroupCommitInsertResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void get_wal_queue_size(google::protobuf::RpcController* controller, |
| const PGetWalQueueSizeRequest* request, |
| PGetWalQueueSizeResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request, |
| PMultiGetResponse* response, google::protobuf::Closure* done) override; |
| |
| void multiget_data_v2(google::protobuf::RpcController* controller, |
| const PMultiGetRequestV2* request, PMultiGetResponseV2* response, |
| google::protobuf::Closure* done) override; |
| |
| void tablet_fetch_data(google::protobuf::RpcController* controller, |
| const PTabletKeyLookupRequest* request, |
| PTabletKeyLookupResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void test_jdbc_connection(google::protobuf::RpcController* controller, |
| const PJdbcTestConnectionRequest* request, |
| PJdbcTestConnectionResult* result, |
| google::protobuf::Closure* done) override; |
| |
| void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, |
| const PFetchRemoteSchemaRequest* request, |
| PFetchRemoteSchemaResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void get_be_resource(google::protobuf::RpcController* controller, |
| const PGetBeResourceRequest* request, PGetBeResourceResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void delete_dictionary(google::protobuf::RpcController* controller, |
| const PDeleteDictionaryRequest* request, |
| PDeleteDictionaryResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void commit_refresh_dictionary(google::protobuf::RpcController* controller, |
| const PCommitRefreshDictionaryRequest* request, |
| PCommitRefreshDictionaryResponse* response, |
| google::protobuf::Closure* done) override; |
| void abort_refresh_dictionary(google::protobuf::RpcController* controller, |
| const PAbortRefreshDictionaryRequest* request, |
| PAbortRefreshDictionaryResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| private: |
| void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, |
| const PExecPlanFragmentRequest* request, |
| PExecPlanFragmentResult* result, |
| google::protobuf::Closure* done); |
| |
| Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version, |
| bool compact, |
| const std::function<void(RuntimeState*, Status*)>& cb = |
| std::function<void(RuntimeState*, Status*)>()); |
| |
| void _transmit_block(::google::protobuf::RpcController* controller, |
| const ::doris::PTransmitDataParams* request, |
| ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, |
| const Status& extract_st, const int64_t wait_for_worker); |
| |
| Status _tablet_fetch_data(const PTabletKeyLookupRequest* request, |
| PTabletKeyLookupResponse* response); |
| |
| protected: |
| ExecEnv* _exec_env = nullptr; |
| |
| // every brpc service request should put into thread pool |
| // the reason see issue #16634 |
| // define the interface for reading and writing data as heavy interface |
| // otherwise as light interface |
| FifoThreadPool _heavy_work_pool; |
| FifoThreadPool _light_work_pool; |
| FifoThreadPool _arrow_flight_work_pool; |
| }; |
| |
| // `StorageEngine` mixin for `PInternalService` |
| class PInternalServiceImpl final : public PInternalService { |
| public: |
| PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env); |
| |
| ~PInternalServiceImpl() override; |
| void request_slave_tablet_pull_rowset(google::protobuf::RpcController* controller, |
| const PTabletWriteSlaveRequest* request, |
| PTabletWriteSlaveResult* response, |
| google::protobuf::Closure* done) override; |
| void response_slave_tablet_pull_rowset(google::protobuf::RpcController* controller, |
| const PTabletWriteSlaveDoneRequest* request, |
| PTabletWriteSlaveDoneResult* response, |
| google::protobuf::Closure* done) override; |
| |
| void get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, |
| const PFetchColIdsRequest* request, |
| PFetchColIdsResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| void get_tablet_rowset_versions(google::protobuf::RpcController* controller, |
| const PGetTabletVersionsRequest* request, |
| PGetTabletVersionsResponse* response, |
| google::protobuf::Closure* done) override; |
| |
| private: |
| void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, |
| int64_t txn_id, int64_t tablet_id, int64_t node_id, |
| bool is_succeed); |
| Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response); |
| |
| void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, |
| const PFetchColIdsRequest* request, |
| PFetchColIdsResponse* response, |
| google::protobuf::Closure* done); |
| |
| StorageEngine& _engine; |
| }; |
| } // namespace doris |