blob: 5b40f7f343807554f7b9afa4503ca30b411b3b3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
#include "absl/container/flat_hash_map.h"
#include "absl/memory/memory.h"
#include "absl/synchronization/mutex.h"
#include "opencensus/trace/exporter/span_data.h"
#include "opencensus/trace/exporter/span_exporter.h"
#include "opencensus/trace/sampler.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
#include "ClientConfig.h"
#include "ClientManager.h"
#include "InvocationContext.h"
#include "rocketmq/RocketMQ.h"
ROCKETMQ_NAMESPACE_BEGIN
namespace collector = opentelemetry::proto::collector;
namespace collector_trace = collector::trace::v1;
enum class TraceMode : std::uint8_t
{
Off = 0,
Develop = 1,
Grpc = 2
};
class Samplers {
public:
static opencensus::trace::Sampler& always();
};
class OtlpExporter : public std::enable_shared_from_this<OtlpExporter> {
public:
OtlpExporter(std::weak_ptr<ClientManager> client_manager, ClientConfig* client_config)
: client_manager_(std::move(client_manager)), client_config_(client_config) {
}
void updateHosts(std::vector<std::string> hosts) LOCKS_EXCLUDED(hosts_mtx_) {
absl::MutexLock lk(&hosts_mtx_);
hosts_ = std::move(hosts);
}
void start();
void shutdown();
std::vector<std::string> hosts() LOCKS_EXCLUDED(hosts_mtx_) {
absl::MutexLock lk(&hosts_mtx_);
return hosts_;
}
std::weak_ptr<ClientManager>& clientManager() {
return client_manager_;
}
ClientConfig* clientConfig() {
return client_config_;
}
void traceMode(TraceMode mode) {
mode_ = mode;
}
TraceMode traceMode() const {
return mode_;
}
private:
std::weak_ptr<ClientManager> client_manager_;
ClientConfig* client_config_;
std::vector<std::string> hosts_;
absl::Mutex hosts_mtx_;
TraceMode mode_{TraceMode::Off};
};
class ExportClient {
public:
ExportClient(std::shared_ptr<CompletionQueue> completion_queue, std::shared_ptr<grpc::Channel> channel)
: completion_queue_(std::move(completion_queue)), stub_(collector_trace::TraceService::NewStub(channel)) {
}
void asyncExport(const collector_trace::ExportTraceServiceRequest& request,
InvocationContext<collector_trace::ExportTraceServiceResponse>* invocation_context);
private:
std::weak_ptr<CompletionQueue> completion_queue_;
std::unique_ptr<collector_trace::TraceService::Stub> stub_;
};
class OtlpExporterHandler : public ::opencensus::trace::exporter::SpanExporter::Handler {
public:
OtlpExporterHandler(std::weak_ptr<OtlpExporter> exporter);
void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans) override;
void start();
void shutdown();
private:
std::weak_ptr<OtlpExporter> exporter_;
std::shared_ptr<CompletionQueue> completion_queue_;
std::thread poll_thread_;
absl::Mutex start_mtx_;
absl::CondVar start_cv_;
absl::flat_hash_map<std::string, std::unique_ptr<ExportClient>> clients_map_ GUARDED_BY(clients_map_mtx_);
absl::Mutex clients_map_mtx_;
thread_local static std::uint32_t round_robin_;
std::atomic_bool stopped_{false};
void poll();
void syncExportClients() LOCKS_EXCLUDED(clients_map_mtx_);
};
ROCKETMQ_NAMESPACE_END