blob: 9fe65f31eac689ae871e59e3d5f3218aa399dce4 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>
#include "Client.h"
#include "RpcClient.h"
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
enum class StreamState : std::uint8_t
{
Created = 0,
Active = 1,
ReadDone = 2,
WriteDone = 3,
Closed = 4,
};
class TelemetryBidiReactor : public grpc::ClientBidiReactor<TelemetryCommand, TelemetryCommand>,
public std::enable_shared_from_this<TelemetryBidiReactor> {
public:
TelemetryBidiReactor(std::weak_ptr<Client> client, rmq::MessagingService::Stub* stub, std::string peer_address);
~TelemetryBidiReactor();
void OnWriteDone(bool ok) override;
void OnWritesDoneDone(bool ok) override;
void OnReadDone(bool ok) override;
void OnDone(const grpc::Status& status) override;
void fireRead();
void fireWrite();
void fireClose();
void write(TelemetryCommand command);
bool await();
private:
grpc::ClientContext context_;
/**
* @brief Command to read from server.
*/
TelemetryCommand read_;
/**
* @brief Buffered commands to write to server
*/
std::vector<TelemetryCommand> writes_ GUARDED_BY(writes_mtx_);
absl::Mutex writes_mtx_;
/**
* @brief The command that is currently being written back to server.
*/
TelemetryCommand write_;
/**
* @brief Each TelemetryBidiReactor belongs to a specific client as its owner.
*/
std::weak_ptr<Client> client_;
/**
* @brief Address of remote peer.
*/
std::string peer_address_;
/**
* @brief Indicate if there is a command being written to network.
*/
std::atomic_bool command_inflight_{false};
StreamState stream_state_ GUARDED_BY(stream_state_mtx_);
absl::Mutex stream_state_mtx_;
absl::CondVar stream_state_cv_;
bool server_setting_received_ GUARDED_BY(server_setting_received_mtx_){false};
absl::Mutex server_setting_received_mtx_;
absl::CondVar server_setting_received_cv_;
void onVerifyMessageResult(TelemetryCommand command);
void applySettings(const rmq::Settings& settings);
void applyBackoffPolicy(const rmq::Settings& settings, std::shared_ptr<Client>& client);
void applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client);
void applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client);
};
ROCKETMQ_NAMESPACE_END