blob: 0861da0e0835e2450da8627e15d5d61576f2e844 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.ceresdb;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.ceresdb.common.Display;
import io.ceresdb.common.Endpoint;
import io.ceresdb.common.Lifecycle;
import io.ceresdb.common.VisibleForTest;
import io.ceresdb.common.util.Clock;
import io.ceresdb.common.util.MetricsUtil;
import io.ceresdb.common.util.Requires;
import io.ceresdb.common.util.SerializingExecutor;
import io.ceresdb.common.util.Spines;
import io.ceresdb.common.util.Strings;
import io.ceresdb.errors.StreamException;
import io.ceresdb.models.Err;
import io.ceresdb.models.Result;
import io.ceresdb.models.Rows;
import io.ceresdb.models.Value;
import io.ceresdb.models.WriteOk;
import io.ceresdb.options.WriteOptions;
import io.ceresdb.proto.internal.Storage;
import io.ceresdb.rpc.Context;
import io.ceresdb.rpc.Observer;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
/**
* Default Write API impl.
*
* @author jiachun.fjc
*/
public class WriteClient implements Write, Lifecycle<WriteOptions>, Display {
private static final Logger LOG = LoggerFactory.getLogger(WriteClient.class);
private WriteOptions opts;
private RouterClient routerClient;
private Executor asyncPool;
private WriteLimiter writeLimiter;
    /**
     * Holder for the write-path metrics reported via {@link MetricsUtil}.
     * All members are static so every {@code WriteClient} instance shares the
     * same registry entries.
     */
    static final class InnerMetrics {
        // Number of rows the server acknowledged as written, per write call.
        static final Histogram WRITE_ROWS_SUCCESS  = MetricsUtil.histogram("write_rows_success_num");
        // Number of rows the server reported as failed, per write call.
        static final Histogram WRITE_ROWS_FAILED   = MetricsUtil.histogram("write_rows_failed_num");
        // How many distinct metrics (tables) one write call touches.
        static final Histogram METRICS_NUM_PER_WRITE = MetricsUtil.histogram("metrics_num_per_write");
        // Marked once per write call that ends exceptionally or with an Err result.
        static final Meter WRITE_FAILED = MetricsUtil.meter("write_failed");
        // Marked once per write call, success or failure.
        static final Meter WRITE_QPS    = MetricsUtil.meter("write_qps");

        static Histogram writeRowsSuccess() {
            return WRITE_ROWS_SUCCESS;
        }

        static Histogram writeRowsFailed() {
            return WRITE_ROWS_FAILED;
        }

        static Histogram metricsNumPerWrite() {
            return METRICS_NUM_PER_WRITE;
        }

        static Meter writeFailed() {
            return WRITE_FAILED;
        }

        static Meter writeQps() {
            return WRITE_QPS;
        }

        /**
         * Meter keyed by retry attempt number.
         *
         * @param retries current retry count (0 = first attempt)
         * @return the meter for this retry bucket
         */
        static Meter writeByRetries(final int retries) {
            // more than 3 retries are classified as the same metric
            return MetricsUtil.meter("write_by_retries", Math.min(3, retries));
        }
    }
@Override
public boolean init(final WriteOptions opts) {
this.opts = Requires.requireNonNull(opts, "WriteClient.opts");
this.routerClient = this.opts.getRoutedClient();
final Executor pool = this.opts.getAsyncPool();
this.asyncPool = pool != null ? pool : new SerializingExecutor("write_client");
this.writeLimiter = new DefaultWriteLimiter(this.opts.getMaxInFlightWriteRows(), this.opts.getLimitedPolicy());
return true;
}
    @Override
    public void shutdownGracefully() {
        // NO-OP: this client holds no resources that require explicit release here;
        // the router client and executor lifecycles are managed by their owners.
    }
    /**
     * Writes a batch of rows, subject to the client-side write limiter.
     * Delegates the actual write (with routing and retries) to {@code write0}
     * and records QPS / success / failure metrics when the call completes.
     *
     * @param data rows to write, must not be null
     * @param ctx  per-call RPC context
     * @return future holding the combined write result
     */
    @Override
    public CompletableFuture<Result<WriteOk, Err>> write(final Collection<Rows> data, final Context ctx) {
        Requires.requireNonNull(data, "Null.data");
        final long startCall = Clock.defaultClock().getTick();
        // The limiter charges one permit per row (see DefaultWriteLimiter) and applies
        // the configured LimitedPolicy before the write is started.
        return this.writeLimiter.acquireAndDo(data, () -> write0(data, ctx, 0).whenCompleteAsync((r, e) -> {
            InnerMetrics.writeQps().mark();
            if (r != null) {
                if (Utils.isRwLogging()) {
                    LOG.info("Write to {}, duration={} ms, result={}.", Utils.DB_NAME,
                            Clock.defaultClock().duration(startCall), r);
                }
                if (r.isOk()) {
                    final WriteOk ok = r.getOk();
                    InnerMetrics.writeRowsSuccess().update(ok.getSuccess());
                    InnerMetrics.writeRowsFailed().update(ok.getFailed());
                    return;
                }
            }
            // Either completed exceptionally (r == null) or with an Err result.
            InnerMetrics.writeFailed().mark();
        }, this.asyncPool));
    }
    /**
     * Opens a client-streaming write for a single metric and returns a buffer
     * the caller uses to push rows and eventually complete the stream.
     *
     * NOTE: this method blocks the calling thread (via {@code join()}) until the
     * route lookup and stream setup finish.
     *
     * @param metric the only metric this stream may write to (validated per batch)
     * @param ctx    per-call RPC context
     * @return a buffered stream-write handle
     */
    @Override
    public StreamWriteBuf<Rows, WriteOk> streamWrite(final String metric, final Context ctx) {
        Requires.requireTrue(Strings.isNotBlank(metric), "Blank.metric");
        // Completed by the server-side response observer (see streamWriteTo).
        final CompletableFuture<WriteOk> respFuture = new CompletableFuture<>();
        return this.routerClient.routeFor(Collections.singleton(metric))
                // Fall back to an invalid route marker when no route was returned.
                .thenApply(routes -> routes.values().stream().findFirst().orElseGet(() -> Route.invalid(metric)))
                .thenApply(route -> streamWriteTo(route, ctx, Utils.toUnaryObserver(respFuture)))
                .thenApply(reqObserver -> new StreamWriteBuf<Rows, WriteOk>() {

                    // Rows buffered locally until the next flush.
                    private final Collection<Rows> buf = Spines.newBuf();

                    @Override
                    public StreamWriteBuf<Rows, WriteOk> write(final Rows val) {
                        this.buf.add(val);
                        return this;
                    }

                    @Override
                    public StreamWriteBuf<Rows, WriteOk> write(final Collection<Rows> c) {
                        this.buf.addAll(c);
                        return this;
                    }

                    @Override
                    public StreamWriteBuf<Rows, WriteOk> flush() {
                        // Surface a stream failure to the caller as early as possible.
                        if (respFuture.isCompletedExceptionally()) {
                            respFuture.getNow(null); // throw the exception now
                        }
                        if (!this.buf.isEmpty()) {
                            reqObserver.onNext(this.buf.stream());
                            this.buf.clear();
                        }
                        return this;
                    }

                    @Override
                    public StreamWriteBuf<Rows, WriteOk> writeAndFlush(final Collection<Rows> c) {
                        flush(); // flush the previous write
                        reqObserver.onNext(c.stream());
                        return this;
                    }

                    @Override
                    public CompletableFuture<WriteOk> completed() {
                        // Push any remaining buffered rows, then half-close the stream.
                        flush();
                        reqObserver.onCompleted();
                        return respFuture;
                    }
                }).join();
    }
    /**
     * Core write path: routes the data, fans out per-endpoint writes, combines
     * the results, and retries failed sub-writes (refreshing the route table
     * first) up to {@code opts.getMaxRetries()} times.
     *
     * @param data    rows to write
     * @param ctx     per-call RPC context (copied per endpoint)
     * @param retries current retry attempt, 0 on first call
     * @return future holding the combined result of all sub-writes
     */
    private CompletableFuture<Result<WriteOk, Err>> write0(final Collection<Rows> data, //
                                                           final Context ctx, //
                                                           final int retries) {
        InnerMetrics.writeByRetries(retries).mark();

        final Set<String> metrics = data.stream() //
                .map(Rows::getMetric) //
                .collect(Collectors.toSet());

        InnerMetrics.metricsNumPerWrite().update(metrics.size());

        // 1. Get routes
        return this.routerClient.routeFor(metrics)
                // 2. Split data by route info and write to DB
                .thenComposeAsync(routes -> Utils.splitDataByRoute(data, routes).entrySet().stream()
                        // Write to database
                        .map(e -> writeTo(e.getKey(), e.getValue(), ctx.copy(), retries))
                        // Reduce and combine write result
                        .reduce((f1, f2) -> f1.thenCombineAsync(f2, Utils::combineResult, this.asyncPool))
                        .orElse(Utils.completedCf(WriteOk.emptyOk().mapToResult())), this.asyncPool)
                // 3. If failed, refresh route info and retry on INVALID_ROUTE
                .thenComposeAsync(r -> {
                    if (r.isOk()) {
                        LOG.debug("Success to write to {}, ok={}.", Utils.DB_NAME, r.getOk());
                        return Utils.completedCf(r);
                    }

                    final Err err = r.getErr();
                    LOG.warn("Failed to write to {}, retries={}, err={}.", Utils.DB_NAME, retries, err);

                    // Retry budget exhausted: return the error as-is.
                    if (retries + 1 > this.opts.getMaxRetries()) {
                        LOG.error("Retried {} times still failed.", retries);
                        return Utils.completedCf(r);
                    }

                    // Partition the failure into three (possibly overlapping) groups:
                    // Should refresh route table
                    final Set<String> toRefresh = err.stream() //
                            .filter(Utils::shouldRefreshRouteTable) //
                            .flatMap(e -> e.getFailedWrites().stream()) //
                            .map(Rows::getMetric) //
                            .collect(Collectors.toSet());

                    // Should retries
                    final List<Rows> rowsToRetry = err.stream() //
                            .filter(Utils::shouldRetry) //
                            .flatMap(e -> e.getFailedWrites().stream()) //
                            .collect(Collectors.toList());

                    // Should not retries
                    final Optional<Err> noRetryErr = err.stream() //
                            .filter(Utils::shouldNotRetry) //
                            .reduce(Err::combine);

                    // Async refresh route info
                    final CompletableFuture<Result<WriteOk, Err>> rwf = this.routerClient.routeRefreshFor(toRefresh)
                            // Even for some data that does not require a refresh of the routing table,
                            // we still wait until the routing table is flushed successfully before
                            // retrying it, in order to give the server a break.
                            .thenComposeAsync(routes -> write0(rowsToRetry, ctx, retries + 1), this.asyncPool);

                    // Merge the retry outcome with either the non-retryable error or the
                    // partial success from this attempt.
                    return noRetryErr.isPresent() ?
                            rwf.thenApplyAsync(ret -> Utils.combineResult(noRetryErr.get().mapToResult(), ret),
                                    this.asyncPool) :
                            rwf.thenApplyAsync(ret -> Utils.combineResult(err.getSubOk().mapToResult(), ret),
                                    this.asyncPool);
                }, this.asyncPool);
    }
private CompletableFuture<Result<WriteOk, Err>> writeTo(final Endpoint endpoint, //
final Collection<Rows> data, //
final Context ctx, //
final int retries) {
// The cost is worth it
final int rowCount = data.stream() //
.map(Rows::getRowCount) //
.reduce(0, Integer::sum);
final int maxWriteSize = this.opts.getMaxWriteSize();
if (rowCount <= maxWriteSize) {
return writeTo0(endpoint, data, ctx, retries);
}
final Stream.Builder<CompletableFuture<Result<WriteOk, Err>>> fs = Stream.builder();
final PartBuf partBuf = new PartBuf();
for (final Rows rs : data) {
final int rc = rs.getRowCount();
if (partBuf.isNotEmpty() && partBuf.preAdd(rc) > maxWriteSize) {
fs.add(writeTo0(endpoint, partBuf.collectAndReset(), ctx.copy(), retries));
}
partBuf.add(rs);
}
if (partBuf.isNotEmpty()) {
fs.add(writeTo0(endpoint, partBuf.collectAndReset(), ctx.copy(), retries));
}
return fs.build() //
.reduce((f1, f2) -> f1.thenCombineAsync(f2, Utils::combineResult, this.asyncPool)) //
.orElse(Utils.completedCf(WriteOk.emptyOk().mapToResult()));
}
private static class PartBuf {
private Collection<Rows> buf;
private int count;
public void add(final Rows rs) {
if (this.buf == null) {
this.buf = Spines.newBuf();
}
this.buf.add(rs);
this.count += rs.getRowCount();
}
public int preAdd(final int c) {
return this.count + c;
}
public boolean isNotEmpty() {
return this.count > 0;
}
public Collection<Rows> collectAndReset() {
final Collection<Rows> ret = this.buf;
// Cannot reuse the buf, outside will reference it until the response arrives.
this.buf = null;
this.count = 0;
return ret;
}
}
    /**
     * Sends one write RPC to the given endpoint and converts the proto response
     * into a {@code Result}.
     *
     * @param endpoint target endpoint
     * @param data     rows for this single RPC
     * @param ctx      per-call RPC context
     * @param retries  current retry attempt, attached to the context
     * @return future holding the write result for this RPC
     */
    private CompletableFuture<Result<WriteOk, Err>> writeTo0(final Endpoint endpoint, //
                                                             final Collection<Rows> data, //
                                                             final Context ctx, //
                                                             final int retries) {
        final CompletableFuture<Storage.WriteResponse> wrf = this.routerClient.invoke(endpoint, //
                toWriteRequestObj(data.stream()), //
                ctx.with("retries", retries) // server can use this in metrics
        );

        return wrf.thenApplyAsync(resp -> Utils.toResult(resp, endpoint, data), this.asyncPool);
    }
    /**
     * Opens a client-streaming RPC to the route's endpoint and returns the
     * request-side observer. Responses are translated to {@code WriteOk} (or a
     * {@link StreamException}) and forwarded to {@code respObserver}.
     *
     * @param route        resolved route; its metric is the only one accepted
     * @param ctx          per-call RPC context
     * @param respObserver receives translated responses / errors / completion
     * @return observer the caller pushes row batches into
     */
    private Observer<Stream<Rows>> streamWriteTo(final Route route, //
                                                 final Context ctx, //
                                                 final Observer<WriteOk> respObserver) {
        final Observer<Storage.WriteRequest> rpcObs = this.routerClient.invokeClientStreaming(route.getEndpoint(), //
                Storage.WriteRequest.getDefaultInstance(), //
                ctx, //
                new Observer<Storage.WriteResponse>() {

                    @Override
                    public void onNext(final Storage.WriteResponse value) {
                        // Err results are surfaced as stream errors, not as onNext values.
                        final Result<WriteOk, Err> ret = Utils.toResult(value, route.getEndpoint(), null);
                        if (ret.isOk()) {
                            respObserver.onNext(ret.getOk());
                        } else {
                            respObserver.onError(new StreamException("Failed to do stream write: " + ret.getErr()));
                        }
                    }

                    @Override
                    public void onError(final Throwable err) {
                        respObserver.onError(err);
                    }

                    @Override
                    public void onCompleted() {
                        respObserver.onCompleted();
                    }
                });

        return new Observer<Stream<Rows>>() {

            // The single metric this stream was routed for.
            private final String metric = route.getMetric();

            @Override
            public void onNext(final Stream<Rows> value) {
                // Reject rows for any other metric; the exception is thrown lazily
                // when the stream is consumed inside toWriteRequestObj.
                final Stream<Rows> data = value.filter(rs -> {
                    if (this.metric.equals(rs.getMetric())) {
                        return true;
                    }
                    throw new StreamException(
                            String.format("Invalid metric %s, only can write %s.", rs.getMetric(), this.metric));
                });
                rpcObs.onNext(toWriteRequestObj(data));
            }

            @Override
            public void onError(final Throwable err) {
                rpcObs.onError(err);
            }

            @Override
            public void onCompleted() {
                rpcObs.onCompleted();
            }
        };
    }
private static class NameDict {
private final Map<String, Integer> nameIndexes = new HashMap<>();
private int index = 0;
public int insert(final String name) {
return this.nameIndexes.computeIfAbsent(name, s -> this.index++);
}
public Iterable<String> toOrdered() {
final String[] arr = new String[this.index];
this.nameIndexes.forEach((name, i) -> arr[i] = name);
return () -> new Iterator<String>() {
private int index = 0;
@Override
public boolean hasNext() {
return this.index < arr.length;
}
@Override
public String next() {
return arr[this.index++];
}
};
}
}
private static class WriteTuple3 {
Storage.WriteMetric.Builder wmcBui;
NameDict tagDict;
NameDict fieldDict;
public WriteTuple3(String metric) {
this.wmcBui = Storage.WriteMetric.newBuilder().setMetric(metric);
this.tagDict = new NameDict();
this.fieldDict = new NameDict();
}
public Storage.WriteMetric.Builder getWmcBui() {
return wmcBui;
}
public NameDict getTagDict() {
return tagDict;
}
public NameDict getFieldDict() {
return fieldDict;
}
public Storage.WriteMetric build() {
return this.wmcBui //
.addAllTagNames(this.tagDict.toOrdered()) //
.addAllFieldNames(this.fieldDict.toOrdered()) //
.build();
}
}
    /**
     * Builds a proto {@code WriteRequest} from a stream of rows: groups rows by
     * metric, dictionary-encodes tag and field names per metric (see
     * {@code NameDict}), and skips null tag/field values.
     *
     * NOTE: this consumes the stream, so any filter attached upstream (e.g. the
     * metric check in streamWriteTo) is evaluated here.
     *
     * @param data rows to encode; may span multiple metrics
     * @return the assembled write request
     */
    @VisibleForTest
    public Storage.WriteRequest toWriteRequestObj(final Stream<Rows> data) {
        final Storage.WriteRequest.Builder wrBui = Storage.WriteRequest.newBuilder();
        // One accumulation tuple per metric name.
        final Map<String, WriteTuple3> tuple3s = new HashMap<>();

        data.forEach(rs -> {
            final String metric = rs.getMetric();
            final WriteTuple3 tp3 = tuple3s.computeIfAbsent(metric, WriteTuple3::new);
            final Storage.WriteEntry.Builder weyBui = Storage.WriteEntry.newBuilder();

            final NameDict tagDict = tp3.getTagDict();
            rs.getSeries().getTags().forEach((tagK, tagV) -> {
                // Null tag values are simply omitted from the entry.
                if (Value.isNull(tagV)) {
                    return;
                }
                final Storage.Tag.Builder tBui = Storage.Tag.newBuilder().setNameIndex(tagDict.insert(tagK))
                        .setValue(Utils.toProtoValue(tagV));
                weyBui.addTags(tBui.build());
            });

            final NameDict fieldDict = tp3.getFieldDict();
            // One field group per timestamp.
            rs.getFields().forEach((ts, fields) -> {
                final Storage.FieldGroup.Builder fgBui = Storage.FieldGroup.newBuilder().setTimestamp(ts);
                fields.forEach((name, field) -> {
                    // Null field values are simply omitted from the group.
                    if (Value.isNull(field)) {
                        return;
                    }
                    final Storage.Field.Builder fBui = Storage.Field.newBuilder().setNameIndex(fieldDict.insert(name))
                            .setValue(Utils.toProtoValue(field));
                    fgBui.addFields(fBui.build());
                });
                weyBui.addFieldGroups(fgBui.build());
            });

            tp3.getWmcBui().addEntries(weyBui.build());
        });

        tuple3s.values().forEach(tp3 -> wrBui.addMetrics(tp3.build()));

        return wrBui.build();
    }
@Override
public void display(final Printer out) {
out.println("--- WriteClient ---") //
.print("maxRetries=") //
.println(this.opts.getMaxRetries()) //
.print("maxWriteSize=") //
.println(this.opts.getMaxWriteSize()) //
.print("asyncPool=") //
.println(this.asyncPool);
}
@Override
public String toString() {
return "WriteClient{" + //
"opts=" + opts + //
", routerClient=" + routerClient + //
", asyncPool=" + asyncPool + //
'}';
}
@VisibleForTest
static class DefaultWriteLimiter extends WriteLimiter {
public DefaultWriteLimiter(int maxInFlight, LimitedPolicy policy) {
super(maxInFlight, policy, "write_limiter_acquire");
}
@Override
public int calculatePermits(final Collection<Rows> in) {
return in == null ? 0 : in.stream().map(Rows::getRowCount).reduce(0, Integer::sum);
}
@Override
public Result<WriteOk, Err> rejected(final Collection<Rows> in, final RejectedState state) {
final String errMsg = String.format(
"Write limited by client, acquirePermits=%d, maxPermits=%d, availablePermits=%d.", //
state.acquirePermits(), //
state.maxPermits(), //
state.availablePermits());
return Result.err(Err.writeErr(Result.FLOW_CONTROL, errMsg, null, in));
}
}
}