blob: c31348fb3b14fbc14bdd37b5a18612c9bdc643db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.remote;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
import com.google.common.base.Preconditions;
/**
* Table descriptor for remote store backed tables
*
* @param <K> the type of the key
* @param <V> the type of the value
*/
public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {
/**
* Tag to be used for provision credits for rate limiting read operations from the remote table.
* Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
* through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
* TableRateLimiter.CreditFunction)}
*/
public static final String RL_READ_TAG = "readTag";
/**
* Tag to be used for provision credits for rate limiting write operations into the remote table.
* Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
* through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
* TableRateLimiter.CreditFunction)} and it needs the write functionality.
*/
public static final String RL_WRITE_TAG = "writeTag";
// Input support for a specific remote store (required)
private TableReadFunction<K, V> readFn;
// Output support for a specific remote store (optional)
private TableWriteFunction<K, V> writeFn;
// Rate limiter for client-side throttling;
// can either be constructed indirectly from rates or overridden by withRateLimiter()
private RateLimiter rateLimiter;
// Rates for constructing the default rate limiter when they are non-zero
private Map<String, Integer> tagCreditsMap = new HashMap<>();
private TableRateLimiter.CreditFunction<K, V> readCreditFn;
private TableRateLimiter.CreditFunction<K, V> writeCreditFn;
private TableRetryPolicy readRetryPolicy;
private TableRetryPolicy writeRetryPolicy;
// By default execute future callbacks on the native client threads
// ie. no additional thread pool for callbacks.
private int asyncCallbackPoolSize = -1;
/**
* Constructs a table descriptor instance
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
*/
public RemoteTableDescriptor(String tableId) {
super(tableId);
}
/**
* Constructs a table descriptor instance
* @param tableId Id of the table, it must confirm to pattern { @literal [\\d\\w-_]+ }
* @param serde the serde for key and value
*/
public RemoteTableDescriptor(String tableId, KVSerde<K, V> serde) {
super(tableId, serde);
}
@Override
public TableSpec getTableSpec() {
validate();
Map<String, String> tableSpecConfig = new HashMap<>();
generateTableSpecConfig(tableSpecConfig);
// Serialize and store reader/writer functions
tableSpecConfig.put(RemoteTableProvider.READ_FN, SerdeUtils.serialize("read function", readFn));
if (writeFn != null) {
tableSpecConfig.put(RemoteTableProvider.WRITE_FN, SerdeUtils.serialize("write function", writeFn));
}
// Serialize the rate limiter if specified
if (!tagCreditsMap.isEmpty()) {
rateLimiter = new EmbeddedTaggedRateLimiter(tagCreditsMap);
}
if (rateLimiter != null) {
tableSpecConfig.put(RemoteTableProvider.RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter));
}
// Serialize the readCredit and writeCredit functions
if (readCreditFn != null) {
tableSpecConfig.put(RemoteTableProvider.READ_CREDIT_FN, SerdeUtils.serialize(
"read credit function", readCreditFn));
}
if (writeCreditFn != null) {
tableSpecConfig.put(RemoteTableProvider.WRITE_CREDIT_FN, SerdeUtils.serialize(
"write credit function", writeCreditFn));
}
if (readRetryPolicy != null) {
tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
"read retry policy", readRetryPolicy));
}
if (writeRetryPolicy != null) {
tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
"write retry policy", writeRetryPolicy));
}
tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));
return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
}
/**
* Use specified TableReadFunction with remote table and a retry policy.
* @param readFn read function instance
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
Preconditions.checkNotNull(readFn, "null read function");
this.readFn = readFn;
return this;
}
/**
* Use specified TableWriteFunction with remote table and a retry policy.
* @param writeFn write function instance
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
Preconditions.checkNotNull(writeFn, "null write function");
this.writeFn = writeFn;
return this;
}
/**
* Use specified TableReadFunction with remote table.
* @param readFn read function instance
* @param retryPolicy retry policy for the read function
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
Preconditions.checkNotNull(readFn, "null read function");
Preconditions.checkNotNull(retryPolicy, "null retry policy");
this.readFn = readFn;
this.readRetryPolicy = retryPolicy;
return this;
}
/**
* Use specified TableWriteFunction with remote table.
* @param writeFn write function instance
* @param retryPolicy retry policy for the write function
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(retryPolicy, "null retry policy");
this.writeFn = writeFn;
this.writeRetryPolicy = retryPolicy;
return this;
}
/**
* Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
* of credits to be charged from the rate limiter for table read and write operations.
* This is an advanced API that provides greater flexibility to throttle each record in the table
* with different number of credits. For most common use-cases eg: limit the number of read/write
* operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
* {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
*
* @param rateLimiter rate limiter instance to be used for throttling
* @param readCreditFn credit function for rate limiting read operations
* @param writeCreditFn credit function for rate limiting write operations
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
TableRateLimiter.CreditFunction<K, V> readCreditFn,
TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
Preconditions.checkNotNull(rateLimiter, "null read rate limiter");
this.rateLimiter = rateLimiter;
this.readCreditFn = readCreditFn;
this.writeCreditFn = writeCreditFn;
return this;
}
/**
* Specify the rate limit for table read operations. If the read rate limit is set with this method
* it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
* TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
* and vice versa.
* @param creditsPerSec rate limit for read operations; must be positive
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
return this;
}
/**
* Specify the rate limit for table write operations. If the write rate limit is set with this method
* it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
* TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
* and vice versa.
* @param creditsPerSec rate limit for write operations; must be positive
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
return this;
}
/**
* Specify the size of the thread pool for the executor used to execute
* callbacks of CompletableFutures of async Table operations. By default, these
* futures are completed (called) by the threads of the native store client. Depending
* on the implementation of the native client, it may or may not allow executing long
* running operations in the callbacks. This config can be used to execute the callbacks
* from a separate executor to decouple from the native client. If configured, this
* thread pool is shared by all read and write operations.
* @param poolSize max number of threads in the executor for async callbacks
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
this.asyncCallbackPoolSize = poolSize;
return this;
}
@Override
protected void validate() {
super.validate();
Preconditions.checkNotNull(readFn, "TableReadFunction is required.");
Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
"Only one of rateLimiter instance or read/write limits can be specified");
// Assume callback executor pool should have no more than 20 threads
Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
"too many threads for async callback executor.");
}
}