// blob: 4c45dded2d2bf9edd04a9105460c0ec75c0462df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.descriptors;
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.remote.TablePart;
import org.apache.samza.table.remote.TableRateLimiter;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.table.remote.TableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.util.RateLimiter;
import com.google.common.base.Preconditions;
/**
 * Table descriptor for remote store backed tables.
 *
 * <p>A remote table is backed by a {@link TableReadFunction} and/or a {@link TableWriteFunction}; at least one
 * of them must be supplied. Client-side throttling can be configured either with a custom {@link RateLimiter}
 * (see {@link #withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)})
 * or with simple read/write limits (see {@link #withReadRateLimit(int)} and {@link #withWriteRateLimit(int)}),
 * but not both at the same time.
 *
 * @param <K> the type of the key
 * @param <V> the type of the value
 */
public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, RemoteTableDescriptor<K, V>> {

  public static final String PROVIDER_FACTORY_CLASS_NAME = "org.apache.samza.table.remote.RemoteTableProviderFactory";

  // Rate limiter class instantiated reflectively when read/write rate limits are given as plain numbers
  public static final String DEFAULT_RATE_LIMITER_CLASS_NAME = "org.apache.samza.util.EmbeddedTaggedRateLimiter";

  /**
   * Tag to be used for provision credits for rate limiting read operations from the remote table.
   * Caller must pre-populate the credits with this tag when specifying a custom rate limiter instance
   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
   * TableRateLimiter.CreditFunction)}
   */
  public static final String RL_READ_TAG = "readTag";

  /**
   * Tag to be used for provision credits for rate limiting write operations into the remote table.
   * Caller can optionally populate the credits with this tag when specifying a custom rate limiter instance
   * through {@link RemoteTableDescriptor#withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction,
   * TableRateLimiter.CreditFunction)} and it needs the write functionality.
   */
  public static final String RL_WRITE_TAG = "writeTag";

  public static final String READ_FN = "io.read.func";
  public static final String WRITE_FN = "io.write.func";
  public static final String RATE_LIMITER = "io.ratelimiter";
  // Key name for table api read rate limit
  public static final String READ_CREDITS = "io.read.credits";
  // Key name for table api write rate limit
  public static final String WRITE_CREDITS = "io.write.credits";
  public static final String READ_CREDIT_FN = "io.read.credit.func";
  public static final String WRITE_CREDIT_FN = "io.write.credit.func";
  public static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
  public static final String READ_RETRY_POLICY = "io.read.retry.policy";
  public static final String WRITE_RETRY_POLICY = "io.write.retry.policy";
  public static final String BATCH_PROVIDER = "io.batch.provider";

  // Input support for a specific remote store (optional)
  private TableReadFunction<K, V> readFn;

  // Output support for a specific remote store (optional)
  private TableWriteFunction<K, V> writeFn;

  // Rate limiter for client-side throttling; it is set by withRateLimiter()
  private RateLimiter rateLimiter;

  // Indicate whether read rate limiter is enabled or not
  private boolean enableReadRateLimiter = true;

  // Indicate whether write rate limiter is enabled or not
  private boolean enableWriteRateLimiter = true;

  // Batching support to reduce traffic volume sent to the remote store.
  private BatchProvider<K, V> batchProvider;

  // Rates (keyed by RL_READ_TAG / RL_WRITE_TAG) for constructing the default rate limiter when non-empty
  private final Map<String, Integer> tagCreditsMap = new HashMap<>();

  private TableRateLimiter.CreditFunction<K, V> readCreditFn;
  private TableRateLimiter.CreditFunction<K, V> writeCreditFn;

  private TableRetryPolicy readRetryPolicy;
  private TableRetryPolicy writeRetryPolicy;

  // By default execute future callbacks on the native client threads
  // ie. no additional thread pool for callbacks.
  private int asyncCallbackPoolSize = -1;

  /**
   * Constructs a table descriptor instance
   * @param tableId Id of the table, it must conform to pattern {@literal [\\d\\w-_]+}
   */
  public RemoteTableDescriptor(String tableId) {
    super(tableId);
  }

  /**
   * Use the specified {@link TableReadFunction} with the remote table.
   * @param readFn read function instance
   * @return this table descriptor instance
   * @throws NullPointerException if {@code readFn} is null
   */
  public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn) {
    Preconditions.checkNotNull(readFn, "null read function");
    this.readFn = readFn;
    return this;
  }

  /**
   * Use the specified {@link TableWriteFunction} with the remote table.
   * @param writeFn write function instance
   * @return this table descriptor instance
   * @throws NullPointerException if {@code writeFn} is null
   */
  public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn) {
    Preconditions.checkNotNull(writeFn, "null write function");
    this.writeFn = writeFn;
    return this;
  }

  /**
   * Use specified {@link TableRetryPolicy} with the {@link TableReadFunction}.
   * The read function must already have been set via {@link #withReadFunction(TableReadFunction)}.
   * @param retryPolicy retry policy for the read function
   * @return this table descriptor instance
   * @throws NullPointerException if no read function has been set yet or {@code retryPolicy} is null
   */
  public RemoteTableDescriptor<K, V> withReadRetryPolicy(TableRetryPolicy retryPolicy) {
    Preconditions.checkNotNull(readFn, "null read function");
    Preconditions.checkNotNull(retryPolicy, "null retry policy");
    this.readRetryPolicy = retryPolicy;
    return this;
  }

  /**
   * Use specified {@link TableRetryPolicy} with the {@link TableWriteFunction}.
   * The write function must already have been set via {@link #withWriteFunction(TableWriteFunction)}.
   * @param retryPolicy retry policy for the write function
   * @return this table descriptor instance
   * @throws NullPointerException if no write function has been set yet or {@code retryPolicy} is null
   */
  public RemoteTableDescriptor<K, V> withWriteRetryPolicy(TableRetryPolicy retryPolicy) {
    Preconditions.checkNotNull(writeFn, "null write function");
    Preconditions.checkNotNull(retryPolicy, "null retry policy");
    this.writeRetryPolicy = retryPolicy;
    return this;
  }

  /**
   * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
   * of credits to be charged from the rate limiter for table read and write operations.
   * This is an advanced API that provides greater flexibility to throttle each record in the table
   * with different number of credits. For most common use-cases eg: limit the number of read/write
   * operations, please instead use the {@link RemoteTableDescriptor#withReadRateLimit(int)} and
   * {@link RemoteTableDescriptor#withWriteRateLimit(int)}.
   *
   * <p>The credit functions are not null-checked here; however, {@link #validate()} requires a read
   * (resp. write) credit function when a read (resp. write) function is configured and the corresponding
   * rate limiter has not been disabled, unless a numeric limit was given for that direction.
   *
   * @param rateLimiter rate limiter instance to be used for throttling
   * @param readCreditFn credit function for rate limiting read operations
   * @param writeCreditFn credit function for rate limiting write operations
   * @return this table descriptor instance
   * @throws NullPointerException if {@code rateLimiter} is null
   */
  public RemoteTableDescriptor<K, V> withRateLimiter(RateLimiter rateLimiter,
      TableRateLimiter.CreditFunction<K, V> readCreditFn,
      TableRateLimiter.CreditFunction<K, V> writeCreditFn) {
    // The limiter is shared by reads and writes, so the message must not single out reads.
    Preconditions.checkNotNull(rateLimiter, "null rate limiter");
    this.rateLimiter = rateLimiter;
    this.readCreditFn = readCreditFn;
    this.writeCreditFn = writeCreditFn;
    return this;
  }

  /**
   * Disable both the read and write rate limiters. By default both are enabled, in which case the user must
   * provide a rate limiter for each configured function, either by calling
   * {@link #withRateLimiter(RateLimiter, TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
   * or {@link #withReadRateLimit(int)} / {@link #withWriteRateLimit(int)}.
   *
   * @return this table descriptor instance.
   */
  public RemoteTableDescriptor<K, V> withRateLimiterDisabled() {
    withReadRateLimiterDisabled();
    withWriteRateLimiterDisabled();
    return this;
  }

  /**
   * Disable the read rate limiter.
   *
   * @return this table descriptor instance.
   */
  public RemoteTableDescriptor<K, V> withReadRateLimiterDisabled() {
    this.enableReadRateLimiter = false;
    return this;
  }

  /**
   * Disable the write rate limiter.
   *
   * @return this table descriptor instance.
   */
  public RemoteTableDescriptor<K, V> withWriteRateLimiterDisabled() {
    this.enableWriteRateLimiter = false;
    return this;
  }

  /**
   * Specify the rate limit for table read operations. If the read rate limit is set with this method
   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
   * and vice versa.
   * Note that this is the total credit of rate limit for the entire job, each task will get a per task
   * credit of creditsPerSec/tasksCount. Hence creditsPerSec should be greater than total number of tasks.
   * @param creditsPerSec rate limit for read operations; must be positive and greater than total number tasks
   * @return this table descriptor instance
   * @throws IllegalArgumentException if {@code creditsPerSec} is not positive
   */
  public RemoteTableDescriptor<K, V> withReadRateLimit(int creditsPerSec) {
    Preconditions.checkArgument(creditsPerSec > 0, "Max read rate must be a positive number.");
    tagCreditsMap.put(RL_READ_TAG, creditsPerSec);
    return this;
  }

  /**
   * Specify the rate limit for table write operations. If the write rate limit is set with this method
   * it is invalid to call {@link RemoteTableDescriptor#withRateLimiter(RateLimiter,
   * TableRateLimiter.CreditFunction, TableRateLimiter.CreditFunction)}
   * and vice versa.
   * Note that this is the total credit of rate limit for the entire job, each task will get a per task
   * credit of creditsPerSec/tasksCount. Hence creditsPerSec should be greater than total number of tasks.
   * @param creditsPerSec rate limit for write operations; must be positive and greater than total number tasks
   * @return this table descriptor instance
   * @throws IllegalArgumentException if {@code creditsPerSec} is not positive
   */
  public RemoteTableDescriptor<K, V> withWriteRateLimit(int creditsPerSec) {
    Preconditions.checkArgument(creditsPerSec > 0, "Max write rate must be a positive number.");
    tagCreditsMap.put(RL_WRITE_TAG, creditsPerSec);
    return this;
  }

  /**
   * Specify the size of the thread pool for the executor used to execute
   * callbacks of CompletableFutures of async Table operations. By default, these
   * futures are completed (called) by the threads of the native store client. Depending
   * on the implementation of the native client, it may or may not allow executing long
   * running operations in the callbacks. This config can be used to execute the callbacks
   * from a separate executor to decouple from the native client. If configured, this
   * thread pool is shared by all read and write operations.
   *
   * <p>The default value is {@code -1}. {@link #validate()} rejects values greater than 20.
   *
   * @param poolSize max number of threads in the executor for async callbacks
   * @return this table descriptor instance
   */
  public RemoteTableDescriptor<K, V> withAsyncCallbackExecutorPoolSize(int poolSize) {
    this.asyncCallbackPoolSize = poolSize;
    return this;
  }

  /**
   * Use the specified {@link BatchProvider} to batch operations sent to the remote store.
   * Passing {@code null} leaves batching unconfigured.
   *
   * @param batchProvider batch provider instance, may be {@code null}
   * @return this table descriptor instance
   */
  public RemoteTableDescriptor<K, V> withBatchProvider(BatchProvider<K, V> batchProvider) {
    this.batchProvider = batchProvider;
    return this;
  }

  /**
   * @return the class name of the provider factory for remote tables
   */
  @Override
  public String getProviderFactoryClassName() {
    return PROVIDER_FACTORY_CLASS_NAME;
  }

  /**
   * Generates the table configuration. Each configured component (rate limiter, credit functions,
   * retry policies, read/write functions, batch provider) is serialized under its key, followed by any
   * extra config the component contributes as a {@link TablePart}.
   *
   * @param jobConfig job configuration
   * @return an unmodifiable map of the table configuration
   */
  @Override
  public Map<String, String> toConfig(Config jobConfig) {
    Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));

    writeRateLimiterConfig(jobConfig, tableConfig);

    // Handle readCredit functions
    if (readCreditFn != null) {
      addTableConfig(READ_CREDIT_FN, SerdeUtils.serialize("read credit function", readCreditFn), tableConfig);
      addTablePartConfig(READ_CREDIT_FN, readCreditFn, jobConfig, tableConfig);
    }

    // Handle writeCredit functions
    if (writeCreditFn != null) {
      addTableConfig(WRITE_CREDIT_FN, SerdeUtils.serialize("write credit function", writeCreditFn), tableConfig);
      addTablePartConfig(WRITE_CREDIT_FN, writeCreditFn, jobConfig, tableConfig);
    }

    // Handle read retry policy
    if (readRetryPolicy != null) {
      addTableConfig(READ_RETRY_POLICY, SerdeUtils.serialize("read retry policy", readRetryPolicy), tableConfig);
      addTablePartConfig(READ_RETRY_POLICY, readRetryPolicy, jobConfig, tableConfig);
    }

    // Handle write retry policy
    if (writeRetryPolicy != null) {
      addTableConfig(WRITE_RETRY_POLICY, SerdeUtils.serialize("write retry policy", writeRetryPolicy), tableConfig);
      addTablePartConfig(WRITE_RETRY_POLICY, writeRetryPolicy, jobConfig, tableConfig);
    }

    addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);

    // Handle table reader function
    if (readFn != null) {
      addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
      addTablePartConfig(READ_FN, readFn, jobConfig, tableConfig);
    }

    // Handle table write function
    if (writeFn != null) {
      addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
      addTablePartConfig(WRITE_FN, writeFn, jobConfig, tableConfig);
    }

    if (batchProvider != null) {
      addTableConfig(BATCH_PROVIDER, SerdeUtils.serialize("batch provider", batchProvider), tableConfig);
      addTablePartConfig(BATCH_PROVIDER, batchProvider, jobConfig, tableConfig);
    }

    return Collections.unmodifiableMap(tableConfig);
  }

  /**
   * Writes the rate limiter config. Numeric read/write limits take precedence and produce a default rate
   * limiter; otherwise a custom rate limiter, if any, is serialized. The numeric limits themselves are also
   * written for each direction whose rate limiter is enabled.
   */
  private void writeRateLimiterConfig(Config jobConfig, Map<String, String> tableConfig) {
    if (!tagCreditsMap.isEmpty()) {
      addRateLimiterConfig(createDefaultRateLimiter(), jobConfig, tableConfig);
    } else if (rateLimiter != null) {
      addRateLimiterConfig(rateLimiter, jobConfig, tableConfig);
    }

    // Write table api read/write rate limit
    if (this.enableReadRateLimiter && tagCreditsMap.containsKey(RL_READ_TAG)) {
      addTableConfig(READ_CREDITS, String.valueOf(tagCreditsMap.get(RL_READ_TAG)), tableConfig);
    }
    if (this.enableWriteRateLimiter && tagCreditsMap.containsKey(RL_WRITE_TAG)) {
      addTableConfig(WRITE_CREDITS, String.valueOf(tagCreditsMap.get(RL_WRITE_TAG)), tableConfig);
    }
  }

  /**
   * Reflectively instantiates {@link #DEFAULT_RATE_LIMITER_CLASS_NAME} with the configured tag credits.
   * The class is loaded by name, presumably to avoid a compile-time dependency on it from this module.
   *
   * @throws SamzaException if the class cannot be loaded or instantiated
   */
  private RateLimiter createDefaultRateLimiter() {
    try {
      @SuppressWarnings("unchecked")
      Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
      Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
      return ctor.newInstance(tagCreditsMap);
    } catch (Exception ex) {
      throw new SamzaException("Failed to create default rate limiter", ex);
    }
  }

  /**
   * Serializes the given rate limiter under {@link #RATE_LIMITER} and, if it is also a {@link TablePart},
   * appends its table part config.
   */
  private void addRateLimiterConfig(RateLimiter limiter, Config jobConfig, Map<String, String> tableConfig) {
    addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", limiter), tableConfig);
    if (limiter instanceof TablePart) {
      addTablePartConfig(RATE_LIMITER, (TablePart) limiter, jobConfig, tableConfig);
    }
  }

  /**
   * Validates the descriptor: at least one of read/write function must be set, a custom rate limiter and
   * numeric limits are mutually exclusive, the callback pool size must not exceed 20, and each configured
   * function whose rate limiter is enabled needs a credit function or numeric limit for its direction.
   *
   * @throws IllegalArgumentException if any of the above is violated
   */
  @Override
  protected void validate() {
    Preconditions.checkArgument(writeFn != null || readFn != null,
        "Must have one of TableReadFunction or TableWriteFunction");
    Preconditions.checkArgument(rateLimiter == null || tagCreditsMap.isEmpty(),
        "Only one of rateLimiter instance or read/write limits can be specified");
    // Assume callback executor pool should have no more than 20 threads
    Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
        "too many threads for async callback executor.");

    if (readFn != null && enableReadRateLimiter) {
      Preconditions.checkArgument(readCreditFn != null || tagCreditsMap.containsKey(RL_READ_TAG),
          "Read rate limiter is enabled, there is no read rate limiter configured.");
    }

    if (writeFn != null && enableWriteRateLimiter) {
      Preconditions.checkArgument(writeCreditFn != null || tagCreditsMap.containsKey(RL_WRITE_TAG),
          "Write rate limiter is enabled, there is no write rate limiter configured.");
    }
  }

  /**
   * Helper method to add table part config items to table configuration.
   * Each key produced by the table part is prefixed with {@code tablePartKey + "."}.
   * @param tablePartKey key of the table part
   * @param tablePart table part
   * @param jobConfig job configuration
   * @param tableConfig table configuration
   */
  protected void addTablePartConfig(String tablePartKey, TablePart tablePart, Config jobConfig,
      Map<String, String> tableConfig) {
    // The table part sees a snapshot of the table config accumulated so far.
    tablePart.toConfig(jobConfig, new MapConfig(tableConfig))
        .forEach((k, v) -> addTableConfig(String.format("%s.%s", tablePartKey, k), v, tableConfig));
  }
}