/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.skywalking.library.elasticsearch.bulk;

import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;

import static java.util.Objects.requireNonNull;
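
/**
 * Buffers {@link IndexRequest} and {@link UpdateRequest} items and sends them to
 * ElasticSearch in batches, either when the number of buffered requests reaches
 * {@code bulkActions} or when the periodic flush interval elapses.
 */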
@Slf4j
public final class BulkProcessor {
    private final ArrayBlockingQueue<Holder> requests;
    private final AtomicReference<ElasticSearch> es;
    private final int bulkActions;
    private final Semaphore semaphore;
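
    /**
     * Returns a builder for configuring and creating a {@link BulkProcessor}.
     */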
    public static BulkProcessorBuilder builder() {
        return new BulkProcessorBuilder();
    }
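
    /**
     * @param es                 holder of the {@link ElasticSearch} client used to execute bulks
     * @param bulkActions        number of buffered requests that triggers an immediate flush
     * @param flushInterval      how often buffered requests are flushed regardless of count
     * @param concurrentRequests maximum number of bulk requests allowed to be in flight at once
     */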
    BulkProcessor(
        final AtomicReference<ElasticSearch> es, final int bulkActions,
        final Duration flushInterval, final int concurrentRequests) {
        requireNonNull(flushInterval, "flushInterval");
        this.es = requireNonNull(es, "es");
        this.bulkActions = bulkActions;
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
        this.requests = new ArrayBlockingQueue<>(bulkActions + 1);
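
        // Flush on a fixed schedule so that buffered requests are still persisted
        // when traffic is too low to reach the bulkActions threshold.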
        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
            1, r -> {
                final Thread thread = new Thread(r);
                thread.setName("ElasticSearch BulkProcessor");
                return thread;
            });
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        scheduler.scheduleWithFixedDelay(
            new RunnableWithExceptionProtection(
                this::flush,
                t -> log.error("flush data to ES failure:", t)),
            0, flushInterval.getSeconds(), TimeUnit.SECONDS);
    }
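
    /**
     * Queues an index request. The returned future completes when the bulk
     * containing this request has been executed, or fails if that bulk fails.
     */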
    public CompletableFuture<Void> add(IndexRequest request) {
        return internalAdd(request);
    }
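
    /**
     * Queues an update request. The returned future completes when the bulk
     * containing this request has been executed, or fails if that bulk fails.
     */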
    public CompletableFuture<Void> add(UpdateRequest request) {
        return internalAdd(request);
    }
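
    // put() blocks when the queue is full, applying back-pressure to callers
    // until a flush drains the buffered requests.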
    @SneakyThrows
    private CompletableFuture<Void> internalAdd(Object request) {
        requireNonNull(request, "request");
        final CompletableFuture<Void> f = new CompletableFuture<>();
        requests.put(new Holder(f, request));
        flushIfNeeded();
        return f;
    }
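
    // Flushes synchronously once the number of buffered requests reaches the bulk threshold.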
    @SneakyThrows
    private void flushIfNeeded() {
        if (requests.size() >= bulkActions) {
            flush();
        }
    }
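
    /**
     * Drains all currently buffered requests and executes them as a single bulk,
     * blocking until the bulk completes. The semaphore limits how many bulk
     * requests may be in flight concurrently.
     */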
    public void flush() {
        if (requests.isEmpty()) {
            return;
        }

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
            return;
        }

        final List<Holder> batch = new ArrayList<>(requests.size());
        requests.drainTo(batch);
        final CompletableFuture<Void> flush = doFlush(batch);
        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
        flush.join();
    }
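
    /**
     * Encodes every request in the batch, separated by newlines as required by the
     * ElasticSearch bulk API, sends the bulk request, and then completes (or fails)
     * the future of every request in the batch according to the outcome.
     */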
    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
        log.debug("Executing bulk with {} requests", batch.size());

        if (batch.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
            try {
                final RequestFactory rf = v.requestFactory();
                final List<byte[]> bs = new ArrayList<>();
                for (final Holder holder : batch) {
                    bs.add(v.codec().encode(holder.request));
                    bs.add("\n".getBytes(StandardCharsets.UTF_8));
                }
                final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
                return es.get().client().execute(rf.bulk().bulk(content))
                         .aggregate().thenAccept(response -> {
                             final HttpStatus status = response.status();
                             if (status != HttpStatus.OK) {
                                 throw new RuntimeException(response.contentUtf8());
                             }
                         });
            } catch (Exception e) {
                return Exceptions.throwUnsafely(e);
            }
        });
        future.whenComplete((ignored, exception) -> {
            if (exception != null) {
                batch.stream().map(it -> it.future)
                     .forEach(it -> it.completeExceptionally(exception));
                log.error("Failed to execute requests in bulk", exception);
            } else {
                log.debug("Succeeded to execute {} requests in bulk", batch.size());
                batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
            }
        });
        return future;
    }
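
    /**
     * Pairs a queued request with the future that is completed once the bulk
     * containing the request has been executed.
     */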
    @RequiredArgsConstructor
    static class Holder {
        private final CompletableFuture<Void> future;
        private final Object request;
    }
}