blob: dae436b8a1ef6a9b9552488a8b71a538868934d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MQ_UTILALL_H
#define MQ_UTILALL_H
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
#include "Message.h"
#include "ONSFactory.h"
#include "ONSChannel.h"
#include "Common.h"
#include "graal_isolate.h"
#include "Logger.h"
#include "rocketmq.h"
#include "ONSClientException.h"
#include <rocketmq.h>
#include <pthread.h>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
namespace ons {
/**
 * Stateless helpers that bridge the C++ ONS API types (Message,
 * ONSFactoryProperty) and the plain C structures (message, factory_property,
 * send_result) handed to the graal isolate.
 *
 * Every init_* function has a matching release_* function; the RAII wrappers
 * declared later in this header pair them automatically.
 */
class UtilAll {
public:
    /**
     * Formats any value that supports operator<< on an output stream.
     *
     * @param n value to format.
     * @return the streamed text.
     */
    template<typename T>
    static std::string to_string(const T &n) {
        std::ostringstream stm;
        stm << n;
        return stm.str();
    }

    /**
     * Parses a decimal number using atoll.
     *
     * @param str NUL-terminated text; a null pointer is treated as "0"
     *            because passing it to atoll would be undefined behaviour.
     * @return the parsed value.
     */
    static long long str2ll(const char *str) {
        return nullptr == str ? 0LL : atoll(str);
    }

    /**
     * Serialises a property map to a string. Defined out of line.
     * NOTE(review): presumably JSON, since fill_message_props() takes a
     * `json` argument -- confirm against the implementation.
     */
    static std::string to_string(const std::map<std::string, std::string> &prop);

    /**
     * Returns the process-wide graal isolate, creating it on first use.
     *
     * Creation is serialised with a mutex because ThreadAttachment calls this
     * from whichever native thread needs the isolate. The handle is published
     * only after graal_create_isolate reports success, so a failed attempt
     * leaves nothing behind and the next call simply retries.
     *
     * @return the isolate, or nullptr if it could not be created.
     */
    static graal_isolate_t *get_isolate() {
        static std::mutex guard;
        static graal_isolate_t *singleton = nullptr;

        std::lock_guard<std::mutex> lock(guard);
        if (nullptr != singleton) {
            return singleton;
        }

        graal_isolate_t *created = nullptr;
        if (graal_create_isolate(nullptr, &created, nullptr)) {
            rocketmq::spd_log::error("Failed to create graal isolate");
            return nullptr;
        }
        singleton = created;
        return singleton;
    }

    /**
     * Fills the C message structure from a Message.
     *
     * topic, tags, body and key point straight into storage owned by msg
     * (constness is cast away only to satisfy the C struct), so m must not be
     * used after msg is gone. user_prop and system_prop are heap copies that
     * must be freed with release_message().
     *
     * @param m   structure to fill.
     * @param msg source message.
     */
    static void init_message(message &m, const Message &msg) {
        m.topic = const_cast<char *>(msg.getTopic());
        m.tags = const_cast<char *>(msg.getTag());
        m.body = const_cast<char *>(msg.getBody());
        m.body_size = msg.getBodySize();
        m.key = const_cast<char *>(msg.getKey());
        m.user_prop = dup_c_str(to_string(msg.getUserProperties()));
        m.system_prop = dup_c_str(to_string(msg.getSystemProperties()));
    }

    /**
     * Frees the buffers allocated by init_message().
     *
     * The pointers are reset afterwards so that a repeated release is a
     * harmless no-op instead of a double free.
     */
    static void release_message(message &m) {
        delete[] m.user_prop;
        m.user_prop = nullptr;
        delete[] m.system_prop;
        m.system_prop = nullptr;
    }

    /**
     * Applies serialised properties to msg. Defined out of line.
     * NOTE(review): `sys` presumably selects system vs. user properties --
     * confirm against the implementation.
     */
    static void fill_message_props(Message &msg, const std::string &json, bool sys);

    /**
     * Populates fp from factoryProperty. Defined out of line.
     * NOTE(review): the fields freed by release_factory_property() are
     * presumably allocated there with new[] -- confirm.
     */
    static void
    init_factory_property(const ONSFactoryProperty &factoryProperty, factory_property &fp, bool is_producer);

    /**
     * Frees the array fields of fp and resets them to nullptr so that a
     * repeated release cannot double free.
     */
    static void release_factory_property(factory_property &fp) {
        delete[] fp.send_msg_timeout_millis;
        fp.send_msg_timeout_millis = nullptr;
        delete[] fp.consume_thread_nums;
        fp.consume_thread_nums = nullptr;
        delete[] fp.ons_channel;
        fp.ons_channel = nullptr;
        delete[] fp.max_msg_cache_size;
        fp.max_msg_cache_size = nullptr;
        delete[] fp.ons_trace_switch;
        fp.ons_trace_switch = nullptr;
        delete[] fp.language_identifier;
        fp.language_identifier = nullptr;
        delete[] fp.instance_id;
        fp.instance_id = nullptr;
    }

    /**
     * Zeroes sendResult and gives it zero-filled message_id / error_msg
     * buffers of ONS_SEND_RESULT_MSG_ID_LEN_MAX + 1 and
     * ONS_SEND_RESULT_ERR_MSG_LEN_MAX + 1 bytes respectively.
     *
     * Free the buffers with release_send_result().
     */
    static void init_send_result(send_result &sendResult) {
        std::memset(&sendResult, 0, sizeof(send_result));
        // The trailing () value-initialises the arrays, i.e. fills them with '\0'.
        sendResult.message_id = new char[ONS_SEND_RESULT_MSG_ID_LEN_MAX + 1]();
        sendResult.error_msg = new char[ONS_SEND_RESULT_ERR_MSG_LEN_MAX + 1]();
    }

    /**
     * Frees the buffers allocated by init_send_result() and resets the
     * pointers so that a repeated release cannot double free.
     */
    static void release_send_result(send_result &sendResult) {
        delete[] sendResult.message_id;
        sendResult.message_id = nullptr;
        delete[] sendResult.error_msg;
        sendResult.error_msg = nullptr;
    }

private:
    /** Returns a new[]-allocated, NUL-terminated copy of text; free with delete[]. */
    static char *dup_c_str(const std::string &text) {
        const std::size_t bytes = text.size() + 1;  // include the terminator
        char *copy = new char[bytes];
        std::memcpy(copy, text.c_str(), bytes);
        return copy;
    }
};
/**
 * Scope guard around UtilAll::init_message / UtilAll::release_message.
 *
 * The constructor fills the C structure `m` from `msg`; the destructor frees
 * whatever init_message allocated. Both arguments are held by reference and
 * must outlive the guard.
 */
class MessageConverter {
public:
    /**
     * @param m   C structure to populate; released when the guard dies.
     * @param msg source message.
     */
    MessageConverter(message &m, const Message &msg) : m_(m), msg_(msg) {
        UtilAll::init_message(m_, msg_);
    }

    ~MessageConverter() {
        UtilAll::release_message(m_);
    }

    // A copy would share m_ with the original and release it a second time.
    MessageConverter(const MessageConverter &) = delete;
    MessageConverter &operator=(const MessageConverter &) = delete;

private:
    message &m_;
    const Message &msg_;
};
class FactoryPropertyConverter {
public:
FactoryPropertyConverter(const ONSFactoryProperty &factoryProperty, factory_property &fp,
bool is_producer = true) : factoryProperty_(
factoryProperty), fp_(fp) {
UtilAll::init_factory_property(factoryProperty, fp, is_producer);
}
~FactoryPropertyConverter() {
UtilAll::release_factory_property(fp_);
}
private:
const ONSFactoryProperty &factoryProperty_;
factory_property &fp_;
};
/**
 * Ensures the calling native thread is attached to the graal isolate and
 * hands back its isolate-thread handle.
 *
 * The first construction on a thread attaches it and registers detach() as a
 * pthread thread-specific-data destructor, so the thread is detached when it
 * exits. Later constructions on the same thread reuse the existing
 * attachment. The destructor of this class deliberately does nothing.
 */
class ThreadAttachment {
public:
    /**
     * @param thread out-parameter that receives the isolate-thread handle.
     * @throws ons::ONSClientException if the isolate is unavailable or the
     *         thread cannot be attached.
     */
    ThreadAttachment(graal_isolatethread_t **thread) {
        graal_isolate_t *isolate = UtilAll::get_isolate();

        if (isolate_thread_ref_cnt) {
            // Already attached on this thread: reuse the existing handle.
            graal_isolatethread_t *prev = graal_get_current_thread(isolate);
            assert(nullptr != prev);
            *thread = prev;
            return;
        }

        const std::string tid = current_thread_id();
        if (nullptr == isolate || graal_attach_thread(isolate, thread)) {
            rocketmq::spd_log::error("Failed to attach native thread {} to graal isolate", tid);
            throw ons::ONSClientException("attach thread to isolate failed", -1);
        }
        // Mark the thread as attached only once the attach really succeeded;
        // otherwise a failed attempt would send later calls down the
        // "already attached" path with no handle to return.
        ++isolate_thread_ref_cnt;

        // Any non-null value makes pthread run detach() at thread exit.
        pthread_key_t key;
        if (!detach_key(key) || pthread_setspecific(key, &ThreadAttachment::isolate_thread_ref_cnt)) {
            rocketmq::spd_log::error("Failed to register isolate detach hook for thread {}", tid);
        }
        rocketmq::spd_log::debug("Attach thread {} to isolate OK", tid);
    }

    ~ThreadAttachment() {
    }

    /**
     * pthread thread-specific-data destructor: detaches the exiting thread
     * from the isolate if it is currently attached. The argument is unused.
     */
    static void detach(void *) {
        graal_isolatethread_t *attached = graal_get_current_thread(UtilAll::get_isolate());
        if (attached) {
            graal_detach_thread(attached);
            rocketmq::spd_log::debug("Detach thread {} from isolate OK", current_thread_id());
        }
    }

private:
    /** Formats the calling thread's id for log messages. */
    static std::string current_thread_id() {
        std::stringstream ss;
        ss << std::this_thread::get_id();
        return ss.str();
    }

    /**
     * Yields the single process-wide key whose destructor is detach().
     *
     * The key is created exactly once: pthread keys are a limited resource
     * (PTHREAD_KEYS_MAX), so creating a fresh one per attached thread would
     * eventually exhaust them.
     *
     * @param key receives the key.
     * @return false if pthread_key_create failed.
     */
    static bool detach_key(pthread_key_t &key) {
        struct KeyHolder {
            pthread_key_t key;
            bool valid;

            KeyHolder() : key(), valid(0 == pthread_key_create(&key, &ThreadAttachment::detach)) {}
        };
        static const KeyHolder holder;
        key = holder.key;
        return holder.valid;
    }

    static thread_local int isolate_thread_ref_cnt;
};
/**
 * Scope guard around UtilAll::init_send_result /
 * UtilAll::release_send_result.
 *
 * The constructor prepares `sr` for use; the destructor frees the buffers
 * that release_send_result deletes. `sr` is held by reference and must
 * outlive the guard.
 */
class SendResultWrapper {
public:
    /** @param sr structure to initialise; released when the guard dies. */
    SendResultWrapper(send_result &sr) : sr_(sr) {
        UtilAll::init_send_result(sr_);
    }

    ~SendResultWrapper() {
        UtilAll::release_send_result(sr_);
    }

    // A copy would share sr_ with the original and release it a second time.
    SendResultWrapper(const SendResultWrapper &) = delete;
    SendResultWrapper &operator=(const SendResultWrapper &) = delete;

private:
    send_result &sr_;
};
}
#endif //MQ_UTILALL_H