blob: c561f4d7f280f550c59123596a7210e3e09dcec4 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ceresdb.http;
import java.util.Collection;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.ceresdb.Management;
import io.ceresdb.MetricParser;
import io.ceresdb.MetricParserFactory;
import io.ceresdb.MetricParserFactoryProvider;
import io.ceresdb.Route;
import io.ceresdb.RouterClient;
import io.ceresdb.common.Endpoint;
import io.ceresdb.common.Tenant;
import io.ceresdb.common.util.AuthUtil;
import io.ceresdb.common.util.Requires;
import io.ceresdb.common.util.Strings;
import io.ceresdb.common.util.internal.ThrowUtil;
import io.ceresdb.http.errors.ManagementException;
import io.ceresdb.models.SqlResult;
import io.ceresdb.options.ManagementOptions;
import io.ceresdb.rpc.Context;
import com.google.gson.Gson;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
/**
* A management API client that communicates with the server using HTTP protocol.
*
* @author jiachun.fjc
*/
public class HttpManagementClient implements Management {
private static final Logger LOG = LoggerFactory.getLogger(HttpManagementClient.class);
private static final String AFFECTED_ROWS = "affected_rows";
private static final String ROWS = "rows";
private final AtomicBoolean started = new AtomicBoolean(false);
private ManagementOptions opts;
private Tenant tenant;
private RouterClient routerClient;
@Override
public boolean init(final ManagementOptions opts) {
if (!this.started.compareAndSet(false, true)) {
throw new IllegalStateException("Http management client has started");
}
this.opts = Requires.requireNonNull(opts, "Null.opts");
this.tenant = Requires.requireNonNull(opts.getTenant(), "Null.Tenant");
this.routerClient = Requires.requireNonNull(opts.getRouterClient(), "Null.RouterClient");
return true;
}
@Override
public void shutdownGracefully() {
if (!this.started.compareAndSet(true, false)) {
return;
}
this.opts = null;
}
@Override
public void display(final Printer out) {
out.println("--- HttpManagementClient ---") //
.print("started=") //
.println(this.started) //
.print("tenant=") //
.println(this.tenant.getTenant());
}
@Override
public SqlResult executeSql(final boolean autoRouting, final Context ctx, final String fmtSql,
final Object... args) {
final String sql = getSql(fmtSql, args);
final Endpoint managementAddress = this.opts.getManagementAddress();
if (!autoRouting && !this.opts.isCheckSql()) {
return doExecuteSql(sql, managementAddress, ctx);
}
final MetricParser parser = parseSql(sql);
if (!autoRouting) {
return doExecuteSql(sql, managementAddress, ctx);
}
Requires.requireNonNull(parser, "Null.parser");
final List<String> tables = parser.metricNames();
Requires.requireNonNull(tables, "Null.tables");
Requires.requireTrue(!tables.isEmpty(), "Empty.tables");
try {
final Map<String, Route> routes = this.routerClient.routeFor(tables).get();
final Endpoint target = getTargetEndpoint(routes.values());
return doExecuteSql(sql, target, ctx);
} catch (final Exception e) {
ThrowUtil.throwException(e);
}
return null;
}
private SqlResult doExecuteSql(final String sql, final Endpoint endpoint, final Context ctx) {
final Request request = contextToHeaders(newBaseRequestBuilder(), ctx) //
.url(getUrl(endpoint)) //
.post(HttpUtil.requestBody(HttpUtil.params("query", sql))) //
.build();
LOG.info("Executing sql: {}, to: {}.", sql, endpoint);
try (Response resp = HttpUtil.httpClient().newCall(request).execute()) {
if (!resp.isSuccessful()) {
throw new ManagementException(
String.format("Execute sql [%s] error from server %s, err_code=%d, err_msg=%s, detail_msg=%s", //
sql, endpoint, resp.code(), resp.message(), getRespBody(resp)));
}
return toSqlResult(resp);
} catch (final Throwable t) {
LOG.error("Fail to execute sql: {}.", sql, t);
ThrowUtil.throwException(t);
}
return null; // never got here
}
private Endpoint getTargetEndpoint(final Collection<Route> routes) {
final Endpoint managementAddress = this.opts.getManagementAddress();
return routes == null ? managementAddress :
routes.stream().findFirst().map(r -> Endpoint.of(r.getEndpoint().getIp(), managementAddress.getPort()))
.orElse(managementAddress);
}
private String getSql(final String fmtSql, final Object... args) {
return String.format(fmtSql, args);
}
private MetricParser parseSql(final String sql) {
final MetricParser sqlParser = getSqlParser(sql);
if (sqlParser == null || !this.opts.isCheckSql()) {
return sqlParser;
}
return checkStatementType(sqlParser);
}
private Request.Builder contextToHeaders(final Request.Builder builder, final Context ctx) {
if (ctx == null) {
return builder;
}
ctx.entrySet().forEach(e -> builder.addHeader(e.getKey(), String.valueOf(e.getValue())));
return builder;
}
private Request.Builder newBaseRequestBuilder() {
return authHeaders(new Request.Builder()) //
.addHeader("Content-Type", "application/json");
}
private Request.Builder authHeaders(final Request.Builder builder) {
AuthUtil.authHeaders(this.tenant).forEach(builder::addHeader);
return builder;
}
private static String getUrl(final Endpoint endpoint) {
Requires.requireNonNull(endpoint, "endpoint");
return String.format("%s://%s:%d/sql", HttpUtil.PROTOCOL, endpoint.getIp(), endpoint.getPort());
}
@SuppressWarnings("unchecked")
private static SqlResult toSqlResult(final Response resp) throws IOException {
final String body = getRespBody(resp);
if (Strings.isBlank(body)) {
return SqlResult.EMPTY_RESULT;
}
final Map<String, Object> mapResult = new Gson().fromJson(body, Map.class);
final Number affectedRows = (Number) mapResult.get(AFFECTED_ROWS);
final List<Map<String, Object>> rows = (List<Map<String, Object>>) mapResult.get(ROWS);
return new SqlResult(affectedRows == null ? 0L : affectedRows.longValue(), rows);
}
private static String getRespBody(final Response resp) throws IOException {
final ResponseBody body = resp.body();
return body == null ? "" : body.string();
}
private static MetricParser getSqlParser(final String sql) {
final MetricParserFactory factory = MetricParserFactoryProvider.getMetricParserFactory();
if (MetricParserFactory.DEFAULT == factory) {
return null;
}
return factory.getParser(sql);
}
private static MetricParser checkStatementType(final MetricParser sqlParser) {
final MetricParser.StatementType statementType = sqlParser.statementType();
switch (statementType) {
case Create:
case Select:
case Alter:
case Describe:
case Show:
case Drop:
case Exists:
break;
case Insert: // We must use the SDK to write data, insert statements are not allowed
case Unknown:
default:
throw new UnsupportedOperationException("Unsupported statement: " + statementType);
}
return sqlParser;
}
}