// blob: 99884074ca320d223d9427e66a49b89ea3a7c329 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/serverlet.h"
#include "runtime/task/task_code.h"
#include "utils/threadpool_code.h"
#include "utils/zlocks.h"
// Forward declaration only: the declarations in this header refer to dsn::message_ex
// exclusively through pointers, so the full definition is not needed here.
namespace dsn {
class message_ex;
} // namespace dsn
namespace pegasus {
namespace proxy {
// Task code that proxy_stub::open_service() binds to
// proxy_stub::on_recv_remove_session_request (handler name "raw_session_disconnect").
DEFINE_TASK_CODE_RPC(RPC_CALL_RAW_SESSION_DISCONNECT,
                     TASK_PRIORITY_COMMON,
                     ::dsn::THREAD_POOL_DEFAULT)

// Task code that proxy_stub::open_service() binds to proxy_stub::on_rpc_request
// (handler name "raw_message").
DEFINE_TASK_CODE_RPC(RPC_CALL_RAW_MESSAGE,
                     TASK_PRIORITY_COMMON,
                     ::dsn::THREAD_POOL_DEFAULT)
// Forward declaration: proxy_session stores and accepts proxy_stub pointers before
// proxy_stub itself is defined further down in this header.
class proxy_stub;
// Abstract base for one proxied client session. Subclasses must implement parse().
// Instances are handled through std::shared_ptr (see `factory` and
// std::enable_shared_from_this).
class proxy_session : public std::enable_shared_from_this<proxy_session>
{
public:
    // Callable that creates a session from the owning stub and the first message received.
    using factory =
        std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn::message_ex *first_msg)>;

    proxy_session(proxy_stub *p, dsn::message_ex *first_msg);
    virtual ~proxy_session();

    // Both on_recv_request and on_remove_session are invoked by proxy_stub when messages
    // arrive from the underlying rpc engine.
    //
    // The rpc engine guarantees that on_recv_request is never run concurrently for the same
    // proxy_session: a second on_recv_request can only start after the previous one has
    // returned.
    //
    // on_remove_session, however, may be called while on_recv_request is still running, so
    // proxy_session and its derived classes may need to synchronize between the two.
    void on_recv_request(dsn::message_ex *msg);
    void on_remove_session();

protected:
    // Parses an incoming message; returns true when parsing succeeded.
    virtual bool parse(dsn::message_ex *msg) = 0;

    dsn::message_ex *create_response();

    // Prefix for log lines: the string form of the remote peer.
    const char *log_prefix() const { return _session_remote_str.c_str(); }

    proxy_stub *_stub;
    std::atomic_bool _is_session_reset;

    // Messages produced by the raw parser do not come as matched request/response pairs of
    // "dsn::message_ex*", so one request is kept around to build a response structure from.
    dsn::message_ex *_backup_one_request;

    // The client this session serves.
    dsn::host_port _session_remote;
    std::string _session_remote_str;
};
// RPC-facing entry point of the proxy: registers the raw-message and raw-session-disconnect
// handlers and keeps the proxy_session objects in a map keyed by dsn::host_port.
class proxy_stub : public ::dsn::serverlet<proxy_stub>
{
public:
    // f:       factory stored in _factory.
    // cluster: returned by get_cluster().
    // app:     returned by get_app().
    // geo_app: returned by get_geo_app(); defaults to an empty string.
    proxy_stub(const proxy_session::factory &f,
               const char *cluster,
               const char *app,
               const char *geo_app = "");

    const char *get_cluster() const { return _cluster.c_str(); }

    const char *get_app() const { return _app.c_str(); }

    const char *get_geo_app() const { return _geo_app.c_str(); }

    // Registers the handlers for both raw task codes with the serverlet base class.
    void open_service()
    {
        register_rpc_handler(RPC_CALL_RAW_MESSAGE, "raw_message", &proxy_stub::on_rpc_request);
        register_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT,
                             "raw_session_disconnect",
                             &proxy_stub::on_recv_remove_session_request);
    }

    // Unregisters the handlers installed by open_service(), in the same order.
    void close_service()
    {
        unregister_rpc_handler(RPC_CALL_RAW_MESSAGE);
        unregister_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT);
    }

    void remove_session(dsn::host_port remote_address);

private:
    // Handler bound to RPC_CALL_RAW_MESSAGE.
    void on_rpc_request(dsn::message_ex *request);

    // Handler bound to RPC_CALL_RAW_SESSION_DISCONNECT.
    void on_recv_remove_session_request(dsn::message_ex *);

    // NOTE(review): presumably guards _sessions; confirm against the implementation file.
    ::dsn::zrwlock_nr _lock;
    std::unordered_map<::dsn::host_port, std::shared_ptr<proxy_session>> _sessions;

    proxy_session::factory _factory;
    ::dsn::host_port _uri_address;

    std::string _cluster;
    std::string _app;
    std::string _geo_app;
};
} // namespace proxy
} // namespace pegasus