blob: 98184d564c609a39335d06b2b5fbe5da463240b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.httpfn;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okio.Timeout;
import org.apache.flink.statefun.flink.core.backpressure.BoundedExponentialBackoff;
import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
final class RetryingCallback implements Callback {
private static final Duration INITIAL_BACKOFF_DURATION = Duration.ofMillis(10);
private static final Set<Integer> RETRYABLE_HTTP_CODES =
new HashSet<>(Arrays.asList(409, 420, 408, 429, 499, 500));
private static final Logger LOG = LoggerFactory.getLogger(RetryingCallback.class);
private final CompletableFuture<Response> resultFuture;
private final BoundedExponentialBackoff backoff;
private final ToFunctionRequestSummary requestSummary;
private final RemoteInvocationMetrics metrics;
private final BooleanSupplier isShutdown;
private long requestStarted;
RetryingCallback(
ToFunctionRequestSummary requestSummary,
RemoteInvocationMetrics metrics,
Timeout timeout,
BooleanSupplier isShutdown) {
this.resultFuture = new CompletableFuture<>();
this.backoff = new BoundedExponentialBackoff(INITIAL_BACKOFF_DURATION, duration(timeout));
this.requestSummary = requestSummary;
this.metrics = metrics;
this.isShutdown = Objects.requireNonNull(isShutdown);
}
CompletableFuture<Response> future() {
return resultFuture;
}
void attachToCall(Call call) {
this.requestStarted = System.nanoTime();
call.enqueue(this);
}
@Override
public void onFailure(Call call, IOException cause) {
tryWithFuture(() -> onFailureUnsafe(call, cause));
}
@Override
public void onResponse(Call call, Response response) {
tryWithFuture(() -> onResponseUnsafe(call, response));
}
private void onFailureUnsafe(Call call, IOException cause) {
LOG.warn(
"Retriable exception caught while trying to deliver a message: " + requestSummary, cause);
metrics.remoteInvocationFailures();
if (!retryAfterApplyingBackoff(call)) {
throw new IllegalStateException(
"Maximal request time has elapsed. Last cause is attached", cause);
}
}
private void onResponseUnsafe(Call call, Response response) {
if (response.isSuccessful()) {
resultFuture.complete(response);
return;
}
if (!RETRYABLE_HTTP_CODES.contains(response.code()) && response.code() < 500) {
throw new IllegalStateException("Non successful HTTP response code " + response.code());
}
if (!retryAfterApplyingBackoff(call)) {
throw new IllegalStateException(
"Maximal request time has elapsed. Last known error is: invalid HTTP response code "
+ response.code());
}
}
/**
* Retires the original call, after applying backoff.
*
* @return if the request was retried successfully, false otherwise.
*/
private boolean retryAfterApplyingBackoff(Call call) {
if (backoff.applyNow()) {
Call newCall = call.clone();
attachToCall(newCall);
return true;
}
return false;
}
/**
* Executes the runnable, and completes {@link #resultFuture} with any exceptions thrown, during
* its execution.
*/
private void tryWithFuture(RunnableWithException runnable) {
try {
endTimingRequest();
runnable.run();
} catch (Throwable t) {
resultFuture.completeExceptionally(t);
}
}
private static Duration duration(Timeout timeout) {
return Duration.ofNanos(timeout.timeoutNanos());
}
private void endTimingRequest() {
final long nanosecondsElapsed = System.nanoTime() - requestStarted;
final long millisecondsElapsed = TimeUnit.NANOSECONDS.toMillis(nanosecondsElapsed);
metrics.remoteInvocationLatency(millisecondsElapsed);
}
}