blob: 0221c5cda67b15848a5dab63a8f3ef6e81ce3516 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.flink.core.nettyclient;
import static java.util.Optional.ofNullable;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
public final class NettyRequestReplySpec {
// property names in the spec
public static final String CALL_TIMEOUT_PROPERTY = "call";
public static final String CONNECT_TIMEOUT_PROPERTY = "connect";
public static final String POOLED_CONNECTION_TTL_PROPERTY = "pool_ttl";
public static final String CONNECTION_POOL_MAX_SIZE_PROPERTY = "pool_size";
public static final String MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES_PROPERTY = "payload_max_bytes";
public static final String TRUST_CA_CERTS_PROPERTY = "trust_cacerts";
public static final String CLIENT_CERT_PROPERTY = "client_cert";
public static final String CLIENT_KEY_PROPERTY = "client_key";
public static final String CLIENT_KEY_PASSWORD_PROPERTY = "client_key_password";
public static final String TIMEOUTS_PROPERTY = "timeouts";
// spec default values
@VisibleForTesting public static final Duration DEFAULT_CALL_TIMEOUT = Duration.ofMinutes(2);
@VisibleForTesting public static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(20);
@VisibleForTesting
public static final Duration DEFAULT_POOLED_CONNECTION_TTL = Duration.ofSeconds(15);
@VisibleForTesting public static final int DEFAULT_CONNECTION_POOL_MAX_SIZE = 1024;
@VisibleForTesting
public static final int DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES = 32 * 1048576;
// spec values
public final Duration callTimeout;
public final Duration connectTimeout;
public final Duration pooledConnectionTTL;
public final int connectionPoolMaxSize;
public final int maxRequestOrResponseSizeInBytes;
private final String trustedCaCerts;
private final String clientCerts;
private final String clientKey;
private final String clientKeyPassword;
public NettyRequestReplySpec(
@JsonProperty(CALL_TIMEOUT_PROPERTY) Duration callTimeout,
@JsonProperty(CONNECT_TIMEOUT_PROPERTY) Duration connectTimeout,
@JsonProperty(POOLED_CONNECTION_TTL_PROPERTY) Duration pooledConnectionTTL,
@JsonProperty(CONNECTION_POOL_MAX_SIZE_PROPERTY) Integer connectionPoolMaxSize,
@JsonProperty(MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES_PROPERTY)
Integer maxRequestOrResponseSizeInBytes,
@JsonProperty(TRUST_CA_CERTS_PROPERTY) String trustedCaCerts,
@JsonProperty(CLIENT_CERT_PROPERTY) String clientCerts,
@JsonProperty(CLIENT_KEY_PROPERTY) String clientKey,
@JsonProperty(CLIENT_KEY_PASSWORD_PROPERTY) String clientKeyPassword,
@JsonProperty(TIMEOUTS_PROPERTY) Timeouts timeouts) {
this.trustedCaCerts = trustedCaCerts;
this.clientCerts = clientCerts;
this.clientKey = clientKey;
this.clientKeyPassword = clientKeyPassword;
this.callTimeout =
firstPresentOrDefault(
ofNullable(timeouts).map(Timeouts::getCallTimeout),
ofNullable(callTimeout),
() -> DEFAULT_CALL_TIMEOUT);
this.connectTimeout =
firstPresentOrDefault(
ofNullable(timeouts).map(Timeouts::getConnectTimeout),
ofNullable(connectTimeout),
() -> DEFAULT_CONNECT_TIMEOUT);
this.pooledConnectionTTL =
ofNullable(pooledConnectionTTL).orElse(DEFAULT_POOLED_CONNECTION_TTL);
this.connectionPoolMaxSize =
ofNullable(connectionPoolMaxSize).orElse(DEFAULT_CONNECTION_POOL_MAX_SIZE);
this.maxRequestOrResponseSizeInBytes =
ofNullable(maxRequestOrResponseSizeInBytes)
.orElse(DEFAULT_MAX_REQUEST_OR_RESPONSE_SIZE_IN_BYTES);
}
public Optional<String> getTrustedCaCerts() {
return Optional.ofNullable(trustedCaCerts);
}
public Optional<String> getClientCerts() {
return Optional.ofNullable(clientCerts);
}
public Optional<String> getClientKey() {
return Optional.ofNullable(clientKey);
}
public Optional<String> getClientKeyPassword() {
return Optional.ofNullable(clientKeyPassword);
}
/**
* This is a copy of {@linkplain
* org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec.Timeouts}, to
* ease the migration from the {@code DefaultHttpRequestReplyClientFactory}.
*/
public static final class Timeouts {
private static final Duration DEFAULT_HTTP_TIMEOUT = Duration.ofMinutes(1);
private static final Duration DEFAULT_HTTP_CONNECT_TIMEOUT = Duration.ofSeconds(10);
private Duration callTimeout = DEFAULT_HTTP_TIMEOUT;
private Duration connectTimeout = DEFAULT_HTTP_CONNECT_TIMEOUT;
@JsonSetter("call")
public void setCallTimeout(Duration callTimeout) {
this.callTimeout = requireNonZeroDuration(callTimeout);
}
@JsonSetter("connect")
public void setConnectTimeout(Duration connectTimeout) {
this.connectTimeout = requireNonZeroDuration(connectTimeout);
}
public Duration getCallTimeout() {
return callTimeout;
}
public Duration getConnectTimeout() {
return connectTimeout;
}
private static Duration requireNonZeroDuration(Duration duration) {
Objects.requireNonNull(duration);
if (duration.equals(Duration.ZERO)) {
throw new IllegalArgumentException("Timeout durations must be larger than 0.");
}
return duration;
}
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static <T> T firstPresentOrDefault(Optional<T> a, Optional<T> b, Supplier<T> orElse) {
return a.orElseGet(() -> b.orElseGet(orElse));
}
}