blob: f50c66d726c928c086352055d0d42f602a066270 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ceresdb;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.ceresdb.common.Display;
import io.ceresdb.common.Endpoint;
import io.ceresdb.common.Lifecycle;
import io.ceresdb.common.VisibleForTest;
import io.ceresdb.common.util.Clock;
import io.ceresdb.common.util.MetricsUtil;
import io.ceresdb.common.util.Requires;
import io.ceresdb.common.util.SerializingExecutor;
import io.ceresdb.errors.StreamException;
import io.ceresdb.models.Err;
import io.ceresdb.models.QueryOk;
import io.ceresdb.models.QueryRequest;
import io.ceresdb.models.Result;
import io.ceresdb.options.QueryOptions;
import io.ceresdb.proto.Storage;
import io.ceresdb.rpc.Context;
import io.ceresdb.rpc.Observer;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
/**
 * Default {@link Query} API implementation.
 *
 * <p>For every request the client asks the {@link RouterClient} for a route
 * covering the request's metrics (falling back to the cluster route when none
 * is returned), sends the query to that endpoint and converts the response via
 * {@code Utils.toResult}. Unary queries go through a {@link QueryLimiter} and
 * are retried after a route refresh when {@code Utils.shouldRefreshRouteTable}
 * flags the returned error.
 *
 * <p>{@link #init(QueryOptions)} must be called before any query method; the
 * fields below are {@code null} until then.
 *
 * @author jiachun.fjc
 */
public class QueryClient implements Query, Lifecycle<QueryOptions>, Display {

    private static final Logger LOG = LoggerFactory.getLogger(QueryClient.class);

    /** Options passed to {@link #init(QueryOptions)}. */
    private QueryOptions opts;
    /** Used for route lookup/refresh and for the actual RPC invocations. */
    private RouterClient routerClient;
    /** Executor on which the asynchronous continuation stages are run. */
    private Executor     asyncPool;
    /** Limits the unary queries issued through {@link #query(QueryRequest, Context)}. */
    private QueryLimiter queryLimiter;

    /**
     * Holder of the read-side metrics reported by this client.
     */
    static final class InnerMetrics {
        static final Histogram READ_ROW_COUNT = MetricsUtil.histogram("read_row_count");
        static final Meter     READ_FAILED    = MetricsUtil.meter("read_failed");
        static final Meter     READ_QPS       = MetricsUtil.meter("read_qps");

        /** @return histogram of the row count carried by each query result */
        static Histogram readRowCount() {
            return READ_ROW_COUNT;
        }

        /** @return meter marked once for every query that did not end with an OK result */
        static Meter readFailed() {
            return READ_FAILED;
        }

        /** @return meter marked once for every completed query, successful or not */
        static Meter readQps() {
            return READ_QPS;
        }

        /**
         * Returns the meter that counts query attempts by retry number.
         *
         * @param retries number of retries already performed for the attempt
         * @return the meter keyed by {@code min(3, retries)}
         */
        static Meter readByRetries(final int retries) {
            // 3 or more retries are folded into the same meter
            return MetricsUtil.meter("read_by_retries", Math.min(3, retries));
        }
    }

    /**
     * Initializes the client from the given options.
     *
     * <p>When the options carry no async pool, a {@link SerializingExecutor}
     * named {@code "query_client"} is used instead.
     *
     * @param opts query options, must not be {@code null}
     * @return always {@code true}
     */
    @Override
    public boolean init(final QueryOptions opts) {
        this.opts = Requires.requireNonNull(opts, "QueryOptions.opts");
        this.routerClient = this.opts.getRouterClient();
        final Executor pool = this.opts.getAsyncPool();
        this.asyncPool = pool != null ? pool : new SerializingExecutor("query_client");
        this.queryLimiter = new DefaultQueryLimiter(this.opts.getMaxInFlightQueryRequests(),
            this.opts.getLimitedPolicy());
        return true;
    }

    /**
     * Does nothing; this client releases no resources on shutdown.
     */
    @Override
    public void shutdownGracefully() {
        // NO-OP
    }

    /**
     * Executes a query asynchronously.
     *
     * <p>If the request carries no metric names they are parsed from its query
     * text first (the request object is mutated). The call is then handed to
     * the query limiter. On completion {@code read_qps} is always marked,
     * {@code read_row_count} is updated whenever a result object was produced
     * (0 rows for an error result), and {@code read_failed} is marked when the
     * future failed exceptionally or the result is not OK.
     *
     * @param req the query request, must not be {@code null}
     * @param ctx invocation context forwarded to the RPC layer
     * @return a future holding either the query result or the error
     */
    @Override
    public CompletableFuture<Result<QueryOk, Err>> query(final QueryRequest req, final Context ctx) {
        Requires.requireNonNull(req, "Null.request");

        final long startCall = Clock.defaultClock().getTick();

        setMetricsIfAbsent(req);

        return this.queryLimiter.acquireAndDo(req, () -> query0(req, ctx, 0).whenCompleteAsync((r, e) -> {
            InnerMetrics.readQps().mark();
            if (r != null) {
                final int rowCount = r.mapOr(0, QueryOk::getRowCount);
                InnerMetrics.readRowCount().update(rowCount);
                if (Utils.isRwLogging()) {
                    LOG.info("Read from {}, duration={} ms, rowCount={}.", Utils.DB_NAME,
                        Clock.defaultClock().duration(startCall), rowCount);
                }
                if (r.isOk()) {
                    return;
                }
            }
            // reached on an exceptional completion (r == null) or an error result
            InnerMetrics.readFailed().mark();
        }, this.asyncPool));
    }

    /**
     * Executes a query whose results are delivered to {@code observer} as a stream.
     *
     * <p>If the request carries no metric names they are parsed from its query
     * text first (the request object is mutated). A failure while resolving the
     * route or while starting the streaming call is reported through
     * {@link Observer#onError(Throwable)} instead of being dropped with the
     * discarded future.
     *
     * @param req      the query request, must not be {@code null}
     * @param ctx      invocation context forwarded to the RPC layer
     * @param observer receiver of the streamed results, must not be {@code null}
     */
    @Override
    public void streamQuery(final QueryRequest req, final Context ctx, final Observer<QueryOk> observer) {
        Requires.requireNonNull(req, "Null.request");
        Requires.requireNonNull(observer, "Null.observer");

        setMetricsIfAbsent(req);

        this.routerClient.routeFor(req.getMetrics())
            // any returned route will do; the cluster route is only resolved when none was returned
            .thenApply(routes -> routes.values().stream().findAny()
                .orElseGet(() -> this.routerClient.clusterRoute()))
            .thenAccept(route -> streamQueryFrom(route.getEndpoint(), req, ctx, observer))
            // Nobody holds the resulting future, so without this stage a routing
            // failure would leave the observer waiting forever.
            .exceptionally(t -> {
                final Throwable cause = unwrapCompletion(t);
                LOG.error("Failed to start stream query from {}, request={}.", Utils.DB_NAME, req, cause);
                observer.onError(cause);
                return null;
            });
    }

    /**
     * Strips the {@link CompletionException} wrapper that dependent
     * {@link CompletableFuture} stages put around an upstream failure.
     *
     * @param t the throwable handed to a completion callback
     * @return the wrapped cause when {@code t} is a {@code CompletionException}
     *         with a cause, otherwise {@code t} itself
     */
    private static Throwable unwrapCompletion(final Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    /**
     * Performs one query attempt and, if needed, schedules the next one.
     *
     * <p>An error result is returned to the caller as-is when the retry budget
     * is exhausted or when none of the errors asks for a route refresh.
     * Otherwise the routes of the failed metrics are refreshed and the query is
     * re-issued with {@code retries + 1}.
     *
     * @param req     the query request
     * @param ctx     invocation context
     * @param retries number of retries already performed, 0 for the first attempt
     * @return a future holding the final result of this attempt chain
     */
    private CompletableFuture<Result<QueryOk, Err>> query0(final QueryRequest req, //
                                                           final Context ctx, //
                                                           final int retries) {
        InnerMetrics.readByRetries(retries).mark();

        return this.routerClient.routeFor(req.getMetrics()) //
            .thenApplyAsync(routes -> routes.values() //
                .stream() //
                .findAny() // everyone is OK
                // resolved lazily: only needed when no route was returned
                .orElseGet(() -> this.routerClient.clusterRoute()), this.asyncPool) //
            .thenComposeAsync(route -> queryFrom(route.getEndpoint(), req, ctx, retries), this.asyncPool)
            .thenComposeAsync(r -> {
                if (r.isOk()) {
                    LOG.debug("Success to read from {}, ok={}.", Utils.DB_NAME, r.getOk());
                    return Utils.completedCf(r);
                }

                final Err err = r.getErr();
                LOG.warn("Failed to read from {}, retries={}, err={}.", Utils.DB_NAME, retries, err);
                // NOTE(review): retries is 0 on the first attempt, so with '>' the query can be
                // re-sent up to maxRetries + 1 times - confirm this is the intended meaning.
                if (retries > this.opts.getMaxRetries()) {
                    LOG.error("Retried {} times still failed.", retries);
                    return Utils.completedCf(r);
                }

                // Should refresh route table
                final Set<String> toRefresh = err.stream() //
                    .filter(Utils::shouldRefreshRouteTable) //
                    .flatMap(e -> e.getFailedMetrics().stream()) //
                    .collect(Collectors.toSet());

                if (toRefresh.isEmpty()) {
                    // nothing to refresh, so the error is returned without retrying
                    return Utils.completedCf(r);
                }

                // Async to refresh route info
                return this.routerClient.routeRefreshFor(toRefresh)
                    .thenComposeAsync(routes -> query0(req, ctx, retries + 1), this.asyncPool);
            }, this.asyncPool);
    }

    /**
     * Fills in the request's metric names from its query text when the request
     * has none ({@code null} or empty). Mutates {@code req}.
     *
     * @param req the query request to complete
     */
    private void setMetricsIfAbsent(final QueryRequest req) {
        if (req.getMetrics() != null && !req.getMetrics().isEmpty()) {
            return;
        }
        final MetricParser parser = MetricParserFactoryProvider.getMetricParserFactory().getParser(req.getQl());
        req.setMetrics(parser.metricNames());
    }

    /**
     * Callback handed to {@code Utils.toResult}; when run, it logs the request
     * that could not be served.
     */
    private static final class ErrHandler implements Runnable {

        private final QueryRequest req;

        private ErrHandler(QueryRequest req) {
            this.req = req;
        }

        @Override
        public void run() {
            LOG.error("Fail to query by request: {}.", this.req);
        }
    }

    /**
     * Sends the query to one endpoint and converts the response.
     *
     * @param endpoint the endpoint to query
     * @param req      the query request
     * @param ctx      invocation context; a {@code "retries"} entry is added to it
     * @param retries  number of retries already performed
     * @return a future holding the converted result
     */
    private CompletableFuture<Result<QueryOk, Err>> queryFrom(final Endpoint endpoint, //
                                                              final QueryRequest req, //
                                                              final Context ctx, //
                                                              final int retries) {
        final Storage.QueryRequest request = Storage.QueryRequest.newBuilder() //
            .addAllMetrics(req.getMetrics()) //
            .setQl(req.getQl()) //
            .build();

        final CompletableFuture<Storage.QueryResponse> qrf = this.routerClient.invoke(endpoint, //
            request, //
            ctx.with("retries", retries) // server can use this in metrics
        );

        return qrf.thenApplyAsync(
            resp -> Utils.toResult(resp, req.getQl(), endpoint, req.getMetrics(), new ErrHandler(req)),
            this.asyncPool);
    }

    /**
     * Starts a server-streaming query against one endpoint and adapts every
     * streamed response for the caller's observer.
     *
     * @param endpoint the endpoint to query
     * @param req      the query request
     * @param ctx      invocation context
     * @param observer receiver of the converted results
     */
    private void streamQueryFrom(final Endpoint endpoint, //
                                 final QueryRequest req, //
                                 final Context ctx, //
                                 final Observer<QueryOk> observer) {
        final Storage.QueryRequest request = Storage.QueryRequest.newBuilder() //
            .addAllMetrics(req.getMetrics()) //
            .setQl(req.getQl()) //
            .build();

        this.routerClient.invokeServerStreaming(endpoint, request, ctx, new Observer<Storage.QueryResponse>() {

            @Override
            public void onNext(final Storage.QueryResponse value) {
                final Result<QueryOk, Err> ret = Utils.toResult(value, req.getQl(), endpoint, req.getMetrics(),
                    new ErrHandler(req));
                if (ret.isOk()) {
                    observer.onNext(ret.getOk());
                } else {
                    // NOTE(review): the stream is not cancelled here, so later messages are still
                    // forwarded after this onError - confirm observers tolerate that.
                    observer.onError(new StreamException("Failed to do stream query: " + ret.getErr()));
                }
            }

            @Override
            public void onError(final Throwable err) {
                observer.onError(err);
            }

            @Override
            public void onCompleted() {
                observer.onCompleted();
            }

            @Override
            public Executor executor() {
                // callbacks run on whatever executor the caller's observer asks for
                return observer.executor();
            }
        });
    }

    /**
     * Prints the max-retries setting and the async pool of this client.
     *
     * @param out the printer to write to
     */
    @Override
    public void display(final Printer out) {
        out.println("--- QueryClient ---") //
            .print("maxRetries=") //
            .println(this.opts.getMaxRetries()) //
            .print("asyncPool=") //
            .println(this.asyncPool);
    }

    @Override
    public String toString() {
        return "QueryClient{" + //
               "opts=" + opts + //
               ", routerClient=" + routerClient + //
               ", asyncPool=" + asyncPool + //
               '}';
    }

    /**
     * Query limiter that charges one permit per request and answers rejected
     * requests with a {@code FLOW_CONTROL} error result.
     */
    @VisibleForTest
    static class DefaultQueryLimiter extends QueryLimiter {

        /**
         * @param maxInFlight maximum number of permits, passed to {@link QueryLimiter}
         * @param policy      policy passed to {@link QueryLimiter}
         */
        public DefaultQueryLimiter(int maxInFlight, LimitedPolicy policy) {
            super(maxInFlight, policy, "query_limiter_acquire");
        }

        /**
         * @param request the query about to be executed
         * @return always 1: every query costs a single permit
         */
        @Override
        public int calculatePermits(final QueryRequest request) {
            return 1;
        }

        /**
         * Builds the error result returned for a rejected request.
         *
         * @param request the rejected query
         * @param state   permit numbers at the time of rejection
         * @return an error result with code {@code Result.FLOW_CONTROL}
         */
        @Override
        public Result<QueryOk, Err> rejected(final QueryRequest request, final RejectedState state) {
            final String errMsg = String.format(
                "Query limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.", //
                state.acquirePermits(), //
                state.maxPermits(), //
                state.availablePermits());
            return Result.err(Err.queryErr(Result.FLOW_CONTROL, errMsg, null, request.getQl(), request.getMetrics()));
        }
    }
}