blob: 374d0b3b1324c079888caa3cf6cc9aad8361ab2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ceresdb;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import io.ceresdb.proto.internal.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.ceresdb.common.Display;
import io.ceresdb.common.Endpoint;
import io.ceresdb.common.Lifecycle;
import io.ceresdb.common.util.Clock;
import io.ceresdb.common.util.Cpus;
import io.ceresdb.common.util.MetricsUtil;
import io.ceresdb.common.util.Requires;
import io.ceresdb.common.util.SharedScheduledPool;
import io.ceresdb.common.util.Spines;
import io.ceresdb.common.util.TopKSelector;
import io.ceresdb.errors.RouteTableException;
import io.ceresdb.options.RouterOptions;
import io.ceresdb.rpc.Context;
import io.ceresdb.rpc.Observer;
import io.ceresdb.rpc.RpcClient;
import io.ceresdb.rpc.errors.RemotingException;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
/**
 * A route RPC client which caches the routing table information locally
 * and refreshes it when the server returns an error code of INVALID_ROUTE.
 *
 * @author jiachun.fjc
 */
public class RouterClient implements Lifecycle<RouterOptions>, Display, Iterable<Route> {

    private static final Logger LOG = LoggerFactory.getLogger(RouterClient.class);

    // Internal tuning knobs. Deliberately hard-coded rather than exposed through
    // user configuration.
    //
    // Cache-occupancy ratio (relative to opts.getMaxCachedSize()) above which
    // gc0() starts evicting entries.
    private static final float CLEAN_CACHE_THRESHOLD = 0.75f;
    // Fraction of the current cache evicted on each gc pass.
    private static final float CLEAN_THRESHOLD = 0.1f;
    // Upper bound on consecutive gc passes within a single gc() invocation.
    private static final int MAX_CONTINUOUS_GC_TIMES = 3;
    // Max number of metric keys refreshed per blocking batch in refresh().
    private static final int ITEM_COUNT_EACH_REFRESH = 512;
    // Wait bound for each synchronous route refresh (see blockingRouteRefreshFor).
    private static final long BLOCKING_ROUTE_TIMEOUT_MS = 3000;

    // Shared scheduler pools: instances are borrowed in init() and must be
    // returned in shutdownGracefully().
    private static final SharedScheduledPool CLEANER_POOL = Utils.getSharedScheduledPool("route_cache_cleaner", 1);
    private static final SharedScheduledPool REFRESHER_POOL = Utils.getSharedScheduledPool("route_cache_refresher",
            Math.min(4, Cpus.cpus()));

    private ScheduledExecutorService cleaner;   // periodic cache gc; null when gcPeriodSeconds <= 0
    private ScheduledExecutorService refresher; // periodic cache refresh; null when refreshPeriodSeconds <= 0
    private RouterOptions opts;
    private RpcClient rpcClient;
    private RouterByMetrics router;
    private InnerMetrics metrics;

    // Local route table cache: metric name -> route. Safe for concurrent access
    // by the refresher/cleaner tasks and caller threads.
    private final ConcurrentMap<String, Route> routeCache = new ConcurrentHashMap<>();

    /**
     * Histograms and timers tracking route-cache refresh and gc activity,
     * tagged with the cluster address as the metric-name suffix.
     */
    static final class InnerMetrics {
        final Histogram refreshedSize;
        final Histogram cachedSize;
        final Histogram gcTimes;
        final Histogram gcItems;
        final Timer gcTimer;
        final Timer refreshTimer;

        private InnerMetrics(final Endpoint name) {
            final String nameSuffix = name.toString();
            this.refreshedSize = MetricsUtil.histogram("route_for_metrics_refreshed_size", nameSuffix);
            this.cachedSize = MetricsUtil.histogram("route_for_metrics_cached_size", nameSuffix);
            this.gcTimes = MetricsUtil.histogram("route_for_metrics_gc_times", nameSuffix);
            this.gcItems = MetricsUtil.histogram("route_for_metrics_gc_items", nameSuffix);
            this.gcTimer = MetricsUtil.timer("route_for_metrics_gc_timer", nameSuffix);
            this.refreshTimer = MetricsUtil.timer("route_for_metrics_refresh_timer", nameSuffix);
        }

        /** Number of routes returned by each remote refresh. */
        Histogram refreshedSize() {
            return this.refreshedSize;
        }

        /** Cache size observed after each refresh. */
        Histogram cachedSize() {
            return this.cachedSize;
        }

        /** Consecutive gc passes performed per gc() call. */
        Histogram gcTimes() {
            return this.gcTimes;
        }

        /** Entries evicted per gc pass. */
        Histogram gcItems() {
            return this.gcItems;
        }

        /** Wall time of a whole gc() call. */
        Timer gcTimer() {
            return this.gcTimer;
        }

        /** Wall time of each remote route refresh. */
        Timer refreshTimer() {
            return this.refreshTimer;
        }
    }

    /**
     * Initializes the client from a defensive copy of {@code opts}, then starts
     * the optional background gc and refresh tasks (each enabled only when its
     * period is positive). Always returns {@code true}.
     *
     * @param opts router options; must be non-null and carry a cluster address
     * @return {@code true} (initialization cannot partially fail here)
     */
    @Override
    public boolean init(final RouterOptions opts) {
        this.opts = Requires.requireNonNull(opts, "RouterClient.opts").copy();
        this.rpcClient = this.opts.getRpcClient();
        final Endpoint address = Requires.requireNonNull(this.opts.getClusterAddress(), "Null.clusterAddress");
        this.router = new RouterByMetrics(address);
        this.metrics = new InnerMetrics(address);

        final long gcPeriod = this.opts.getGcPeriodSeconds();
        if (gcPeriod > 0) {
            this.cleaner = CLEANER_POOL.getObject();
            // Random initial delay spreads gc across client instances.
            this.cleaner.scheduleWithFixedDelay(this::gc, Utils.randomInitialDelay(300), gcPeriod, TimeUnit.SECONDS);
            LOG.info("Route table cache cleaner has been started.");
        }
        final long refreshPeriod = this.opts.getRefreshPeriodSeconds();
        if (refreshPeriod > 0) {
            this.refresher = REFRESHER_POOL.getObject();
            this.refresher.scheduleWithFixedDelay(this::refresh, Utils.randomInitialDelay(180), refreshPeriod,
                    TimeUnit.SECONDS);
            LOG.info("Route table cache refresher has been started.");
        }
        return true;
    }

    /**
     * Shuts down the underlying RPC client, returns any borrowed schedulers to
     * their shared pools, and clears the local route cache.
     */
    @Override
    public void shutdownGracefully() {
        if (this.rpcClient != null) {
            this.rpcClient.shutdownGracefully();
        }
        if (this.cleaner != null) {
            CLEANER_POOL.returnObject(this.cleaner);
            this.cleaner = null;
        }
        if (this.refresher != null) {
            REFRESHER_POOL.returnObject(this.refresher);
            this.refresher = null;
        }
        clearRouteCache();
    }

    /**
     * Iterates over the currently cached routes. Backed by a
     * {@link ConcurrentHashMap} view, so the iteration is weakly consistent
     * with concurrent refresh/gc activity.
     */
    @Override
    public Iterator<Route> iterator() {
        return this.routeCache.values().iterator();
    }

    /** @return a route pointing at the configured cluster address. */
    public Route clusterRoute() {
        return Route.of(this.opts.getClusterAddress());
    }

    /**
     * Resolves routes for the given metrics, serving from the local cache where
     * possible and fetching only the cache misses from the remote router.
     *
     * @param metrics metric names to resolve; null/empty yields an empty map
     * @return future of metric -> route, combining cached and freshly fetched
     *         entries; on key collisions the smaller side is overwritten by the
     *         larger side's entries (cached entries win when remote is larger)
     */
    public CompletableFuture<Map<String, Route>> routeFor(final Collection<String> metrics) {
        if (metrics == null || metrics.isEmpty()) {
            return Utils.completedCf(Collections.emptyMap());
        }

        final Map<String, Route> local = new HashMap<>();
        final List<String> misses = new ArrayList<>();

        metrics.forEach(metric -> {
            final Route r = this.routeCache.get(metric);
            if (r == null) {
                misses.add(metric);
            } else {
                local.put(metric, r);
            }
        });

        if (misses.isEmpty()) {
            return Utils.completedCf(local);
        }

        return routeRefreshFor(misses) // refresh from remote
                .thenApply(remote -> { // then merge result
                    // Merge into the larger map to minimize copying.
                    final Map<String, Route> ret;
                    if (remote.size() > local.size()) {
                        remote.putAll(local);
                        ret = remote;
                    } else {
                        local.putAll(remote);
                        ret = local;
                    }
                    return ret;
                }) //
                .thenApply(hits -> { // update cache hits
                    final long now = Clock.defaultClock().getTick();
                    // Record a last-hit tick on each returned route; appears to be a
                    // best-effort ("weak") set -- TODO confirm semantics in Route#tryWeekSetHit.
                    hits.values().forEach(route -> route.tryWeekSetHit(now));
                    return hits;
                });
    }

    /**
     * Always fetches routes for {@code metrics} from the remote router, then on
     * success merges them into the local cache and updates refresh metrics.
     * Failures are logged; the returned future still completes exceptionally.
     *
     * @param metrics metric names to refresh
     * @return future of the freshly fetched metric -> route map
     */
    public CompletableFuture<Map<String, Route>> routeRefreshFor(final Collection<String> metrics) {
        final long startCall = Clock.defaultClock().getTick();
        return this.router.routeFor(metrics).whenComplete((remote, err) -> {
            if (err == null) {
                this.routeCache.putAll(remote);
                this.metrics.refreshedSize().update(remote.size());
                this.metrics.cachedSize().update(this.routeCache.size());
                this.metrics.refreshTimer().update(Clock.defaultClock().duration(startCall), TimeUnit.MILLISECONDS);
                LOG.info("Route refreshed: {}, cached_size={}.", metrics, this.routeCache.size());
            } else {
                LOG.warn("Route refresh failed: {}.", metrics, err);
            }
        });
    }

    /**
     * Synchronous, best-effort wrapper around {@link #routeRefreshFor}: waits up
     * to {@link #BLOCKING_ROUTE_TIMEOUT_MS} and logs (does not propagate) any
     * failure or timeout. Used by the periodic refresher.
     */
    private void blockingRouteRefreshFor(final Collection<String> metrics) {
        try {
            routeRefreshFor(metrics).get(BLOCKING_ROUTE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException | ExecutionException | TimeoutException e) {
            LOG.error("Fail to blocking refresh route.", e);
        }
    }

    /** Evicts the given metric keys from the local route cache (no-op on null/empty). */
    public void clearRouteCacheBy(final Collection<String> metrics) {
        if (metrics == null || metrics.isEmpty()) {
            return;
        }
        metrics.forEach(this.routeCache::remove);
    }

    /**
     * Empties the whole route cache.
     *
     * @return the cache size observed just before clearing (approximate under
     *         concurrent mutation)
     */
    public int clearRouteCache() {
        final int size = this.routeCache.size();
        this.routeCache.clear();
        return size;
    }

    /**
     * Re-fetches every cached route from the remote router, in blocking batches
     * of at most {@link #ITEM_COUNT_EACH_REFRESH} keys. Iterates the live
     * key-set view, so keys added/removed concurrently may be skipped or seen.
     */
    public void refresh() {
        final Collection<String> cachedKeys = this.routeCache.keySet();
        if (cachedKeys.size() <= ITEM_COUNT_EACH_REFRESH) {
            blockingRouteRefreshFor(cachedKeys);
            return;
        }

        final Collection<String> keysToRefresh = Spines.newBuf(ITEM_COUNT_EACH_REFRESH);
        for (final String metric : cachedKeys) {
            keysToRefresh.add(metric);
            if (keysToRefresh.size() >= ITEM_COUNT_EACH_REFRESH) {
                blockingRouteRefreshFor(keysToRefresh);
                keysToRefresh.clear();
            }
        }
        // Flush the final partial batch, if any.
        if (!keysToRefresh.isEmpty()) {
            blockingRouteRefreshFor(keysToRefresh);
        }
    }

    /** Runs a timed gc cycle and records how many passes it took. */
    public void gc() {
        this.metrics.gcTimer().time(() -> this.metrics.gcTimes().update(gc0(0)));
    }

    /**
     * One gc pass: when the cache occupies more than
     * {@code maxCachedSize * CLEAN_CACHE_THRESHOLD}, evicts roughly
     * {@code CLEAN_THRESHOLD} of the entries chosen by last-hit tick, then
     * recurses (up to {@link #MAX_CONTINUOUS_GC_TIMES} extra passes) while the
     * cache remains above the threshold.
     *
     * @param times number of passes already performed
     * @return total number of passes performed
     */
    private int gc0(final int times) {
        if (this.routeCache.size() < this.opts.getMaxCachedSize() * CLEAN_CACHE_THRESHOLD) {
            LOG.info("Now that the number of cached entries is {}.", this.routeCache.size());
            return times;
        }

        LOG.warn("Now that the number of cached entries [{}] is about to exceed its limit [{}], we need to clean up.",
                //
                this.routeCache.size(), this.opts.getMaxCachedSize());

        final int itemsToGC = (int) (this.routeCache.size() * CLEAN_THRESHOLD);
        if (itemsToGC <= 0) {
            LOG.warn("No more need to be clean.");
            return times;
        }

        // Pick the eviction candidates by last-hit tick; the comparator is a
        // negated Long.compare, which presumably makes the selection favor the
        // least recently hit entries -- TODO confirm against TopKSelector's
        // ordering contract.
        final List<String> topK = TopKSelector.selectTopK( //
                this.routeCache.entrySet(), //
                itemsToGC, //
                (o1, o2) -> -Long.compare(o1.getValue().getLastHit(), o2.getValue().getLastHit()) //
        ) //
                .map(Map.Entry::getKey) //
                .collect(Collectors.toList());

        this.metrics.gcItems().update(topK.size());

        clearRouteCacheBy(topK);

        LOG.warn("Cleaned {} entries from route cache, now entries size {}.", itemsToGC, this.routeCache.size());

        if (this.routeCache.size() > this.opts.getMaxCachedSize() * CLEAN_CACHE_THRESHOLD
                && times < MAX_CONTINUOUS_GC_TIMES) {
            LOG.warn("Now we need to work continuously, this will be the {}th attempt.", times + 1);
            return gc0(times + 1);
        }
        return times;
    }

    /**
     * Async unary RPC to {@code endpoint} with the client's default timeout.
     *
     * @see #invoke(Endpoint, Object, Context, long)
     */
    public <Req, Resp> CompletableFuture<Resp> invoke(final Endpoint endpoint, //
                                                     final Req request, //
                                                     final Context ctx) {
        return invoke(endpoint, request, ctx, -1 /* use default rpc timeout */);
    }

    /**
     * Async unary RPC to {@code endpoint}, bridging the callback-style
     * {@link Observer} onto a {@link CompletableFuture}. A synchronous
     * {@link RemotingException} from the transport also completes the future
     * exceptionally rather than being thrown to the caller.
     *
     * @param timeoutMs call timeout in milliseconds; negative means the RPC
     *                  client's default timeout
     */
    public <Req, Resp> CompletableFuture<Resp> invoke(final Endpoint endpoint, //
                                                      final Req request, //
                                                      final Context ctx, //
                                                      final long timeoutMs) {
        final CompletableFuture<Resp> future = new CompletableFuture<>();

        try {
            this.rpcClient.invokeAsync(endpoint, request, ctx, new Observer<Resp>() {

                @Override
                public void onNext(final Resp value) {
                    future.complete(value);
                }

                @Override
                public void onError(final Throwable err) {
                    future.completeExceptionally(err);
                }
            }, timeoutMs);
        } catch (final RemotingException e) {
            future.completeExceptionally(e);
        }

        return future;
    }

    /**
     * Server-streaming RPC; transport-level failures are reported through
     * {@code observer.onError} instead of being thrown.
     */
    public <Req, Resp> void invokeServerStreaming(final Endpoint endpoint, //
                                                  final Req request, //
                                                  final Context ctx, //
                                                  final Observer<Resp> observer) {
        try {
            this.rpcClient.invokeServerStreaming(endpoint, request, ctx, observer);
        } catch (final RemotingException e) {
            observer.onError(e);
        }
    }

    /**
     * Client-streaming RPC. On a transport-level failure, notifies
     * {@code respObserver} and returns a rejecting request observer so the
     * caller always gets a usable (if failed) handle.
     */
    public <Req, Resp> Observer<Req> invokeClientStreaming(final Endpoint endpoint, //
                                                           final Req defaultReqIns, //
                                                           final Context ctx, //
                                                           final Observer<Resp> respObserver) {
        try {
            return this.rpcClient.invokeClientStreaming(endpoint, defaultReqIns, ctx, respObserver);
        } catch (final RemotingException e) {
            respObserver.onError(e);
            return new Observer.RejectedObserver<>(e);
        }
    }

    /** Distinct endpoints currently present in the route cache, used as fallbacks. */
    private Collection<Endpoint> reserveAddresses() {
        return this.routeCache.values().stream().map(Route::getEndpoint).collect(Collectors.toSet());
    }

    /**
     * Checks connectivity to {@code endpoint}; when {@code create} is true the
     * RPC client may establish a new connection.
     */
    private boolean checkConn(final Endpoint endpoint, final boolean create) {
        return this.rpcClient.checkConnection(endpoint, create);
    }

    @Override
    public void display(final Printer out) {
        out.println("--- RouterClient ---") //
                .print("opts=") //
                .println(this.opts) //
                .print("routeCache.size=") //
                .println(this.routeCache.size());

        if (this.rpcClient != null) {
            out.println("");
            this.rpcClient.display(out);
        }
    }

    @Override
    public String toString() {
        return "RouterClient{" + //
                "opts=" + opts + //
                ", rpcClient=" + rpcClient + //
                ", router=" + router + //
                '}';
    }

    /**
     * Router implementation that asks the configured cluster endpoint for the
     * routes of a set of metrics, falling back to any endpoint already known
     * from the cache when the cluster address is unreachable.
     */
    private class RouterByMetrics implements Router<Collection<String>, Map<String, Route>> {

        private final Endpoint endpoint;

        private RouterByMetrics(Endpoint endpoint) {
            this.endpoint = endpoint;
        }

        /**
         * Fetches routes for {@code request} via the route RPC.
         *
         * @param request metric names; null/empty yields an empty map
         * @return future of metric -> route; fails with
         *         {@link RouteTableException} on a non-success response header
         */
        @Override
        public CompletableFuture<Map<String, Route>> routeFor(final Collection<String> request) {
            if (request == null || request.isEmpty()) {
                return Utils.completedCf(Collections.emptyMap());
            }

            final Storage.RouteRequest req = Storage.RouteRequest.newBuilder().addAllMetrics(request).build();
            // NOTE(review): "call_priority" appears to hint the server to prioritize
            // this call; exact server-side semantics are not visible here -- confirm.
            final Context ctx = Context.of("call_priority", "100");

            final CompletableFuture<Storage.RouteResponse> f = invokeRpc(req, ctx);

            return f.thenCompose(resp -> {
                if (Utils.isSuccess(resp.getHeader())) {
                    final Map<String, Route> ret = resp.getRoutesList().stream()
                            .collect(Collectors.toMap(Storage.Route::getMetric, this::toRouteObj));
                    return Utils.completedCf(ret);
                }
                return Utils.errorCf(new RouteTableException("Fail to get route table: " + resp.getHeader()));
            });
        }

        /**
         * Sends the route request to the cluster endpoint, or, when it is not
         * connectable, to the first already-connected endpoint among the cached
         * reserve addresses (no new connections are created for fallbacks).
         */
        private CompletableFuture<Storage.RouteResponse> invokeRpc(final Storage.RouteRequest req, final Context ctx) {
            if (checkConn(this.endpoint, true)) {
                return invoke(this.endpoint, req, ctx);
            }

            LOG.warn("Fail to connect to the cluster address: {}.", this.endpoint);

            final Collection<Endpoint> reserves = reserveAddresses();
            // Fallback scan over reserved addresses in iteration order; the set has
            // no fixed order, so this is only loosely "round-robin".
            int i = 0;
            for (final Endpoint ep : reserves) {
                LOG.warn("Try to invoke to the {}th server {}.", ++i, ep);
                if (checkConn(ep, false)) {
                    return invoke(ep, req, ctx);
                }
            }
            return Utils.errorCf(new RouteTableException("Fail to connect to: " + this.endpoint));
        }

        /** Converts a protobuf route entry into the client-side {@link Route}. */
        private Route toRouteObj(final Storage.Route r) {
            final Storage.Endpoint ep = Requires.requireNonNull(r.getEndpoint(), "CeresDB.Endpoint");
            return Route.of(r.getMetric(), Endpoint.of(ep.getIp(), ep.getPort()), r.getExt());
        }
    }
}