blob: c83b39a5fb58d533cdb3ea55f59fe29a06ad2735 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "RpcCommon.h"
#include <map>
#include <string>
#include <vector>
#include "ThriftConvert.h"
#include "client_types.h"
#include "common_types.h"
using namespace std;
RpcUtils::RpcUtils() {
SUCCESS_STATUS = std::make_shared<TSStatus>();
SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
}
void RpcUtils::verifySuccess(const TSStatus& status) {
if (status.code == TSStatusCode::MULTIPLE_ERROR) {
verifySuccess(status.subStatus);
return;
}
if (status.code != TSStatusCode::SUCCESS_STATUS &&
status.code != TSStatusCode::REDIRECTION_RECOMMEND) {
throw ExecutionException(to_string(status.code) + ": " + status.message,
statusFromThrift(status));
}
}
void RpcUtils::verifySuccessWithRedirection(const TSStatus& status) {
verifySuccess(status);
if (status.__isset.redirectNode) {
throw RedirectException(to_string(status.code) + ": " + status.message,
endpointFromThrift(status.redirectNode));
}
if (status.__isset.subStatus) {
auto statusSubStatus = status.subStatus;
vector<TEndPoint> endPointList(statusSubStatus.size());
int count = 0;
for (const TSStatus& subStatus : statusSubStatus) {
if (subStatus.__isset.redirectNode) {
endPointList[count++] = subStatus.redirectNode;
} else {
TEndPoint endPoint;
endPointList[count++] = endPoint;
}
}
if (!endPointList.empty()) {
throw RedirectException(to_string(status.code) + ": " + status.message,
endpointListFromThrift(endPointList));
}
}
}
void RpcUtils::verifySuccessWithRedirectionForMultiDevices(const TSStatus& status,
vector<string> devices) {
verifySuccess(status);
if (status.code == TSStatusCode::MULTIPLE_ERROR ||
status.code == TSStatusCode::REDIRECTION_RECOMMEND) {
map<string, TEndPoint> deviceEndPointMap;
const vector<TSStatus>& statusSubStatus = status.subStatus;
for (size_t i = 0; i < statusSubStatus.size() && i < devices.size(); i++) {
const TSStatus& subStatus = statusSubStatus[i];
if (subStatus.__isset.redirectNode) {
deviceEndPointMap.insert(make_pair(devices[i], subStatus.redirectNode));
}
}
throw RedirectException(to_string(status.code) + ": " + status.message,
endpointMapFromThrift(deviceEndPointMap));
}
if (status.__isset.redirectNode) {
throw RedirectException(to_string(status.code) + ": " + status.message,
endpointFromThrift(status.redirectNode));
}
if (status.__isset.subStatus) {
auto statusSubStatus = status.subStatus;
vector<TEndPoint> endPointList(statusSubStatus.size());
int count = 0;
for (const TSStatus& subStatus : statusSubStatus) {
if (subStatus.__isset.redirectNode) {
endPointList[count++] = subStatus.redirectNode;
} else {
TEndPoint endPoint;
endPointList[count++] = endPoint;
}
}
if (!endPointList.empty()) {
throw RedirectException(to_string(status.code) + ": " + status.message,
endpointListFromThrift(endPointList));
}
}
}
void RpcUtils::verifySuccess(const vector<TSStatus>& statuses) {
for (const TSStatus& status : statuses) {
if (status.code != TSStatusCode::SUCCESS_STATUS) {
vector<Status> publicStatuses;
publicStatuses.reserve(statuses.size());
for (const TSStatus& s : statuses) {
publicStatuses.push_back(statusFromThrift(s));
}
throw BatchExecutionException(status.message, publicStatuses);
}
}
}
TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status;
status.__set_code(tsStatusCode);
return status;
}
TSStatus RpcUtils::getStatus(int code, const string& message) {
TSStatus status;
status.__set_code(code);
status.__set_message(message);
return status;
}
shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSExecuteStatementResp(status);
}
shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode,
const string& message) {
TSStatus status = getStatus(tsStatusCode, message);
return getTSExecuteStatementResp(status);
}
shared_ptr<TSExecuteStatementResp> RpcUtils::getTSExecuteStatementResp(const TSStatus& status) {
shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
resp->__set_status(status);
return resp;
}
shared_ptr<TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSFetchResultsResp(status);
}
shared_ptr<TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode,
const string& appendMessage) {
TSStatus status = getStatus(tsStatusCode, appendMessage);
return getTSFetchResultsResp(status);
}
shared_ptr<TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(const TSStatus& status) {
shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
resp->__set_status(status);
return resp;
}
const string UrlUtils::PORT_SEPARATOR = ":";
const string UrlUtils::ABB_COLON = "[";
TEndPoint UrlUtils::parseTEndPointIpv4AndIpv6Url(const string& endPointUrl) {
TEndPoint endPoint;
if (endPointUrl.empty()) {
return endPoint;
}
size_t portSeparatorPos = endPointUrl.find_last_of(PORT_SEPARATOR);
if (portSeparatorPos == string::npos) {
endPoint.__set_ip(endPointUrl);
return endPoint;
}
string portStr = endPointUrl.substr(portSeparatorPos + 1);
string ip = endPointUrl.substr(0, portSeparatorPos);
if (ip.find(ABB_COLON) != string::npos) {
if (ip.size() >= 2 && ip.front() == '[' && ip.back() == ']') {
ip = ip.substr(1, ip.size() - 2);
}
}
try {
int port = stoi(portStr);
endPoint.__set_ip(ip);
endPoint.__set_port(port);
} catch (const exception&) {
endPoint.__set_ip(endPointUrl);
}
return endPoint;
}