blob: 6d6c23adcaa91aa8dbf14abc40a2b73153a5def9 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.table.remote;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.table.AsyncReadWriteTable;
import org.apache.samza.table.BaseReadWriteTable;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.batching.BatchProvider;
import org.apache.samza.table.batching.AsyncBatchingTable;
import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
import org.apache.samza.table.retry.AsyncRetriableTable;
import org.apache.samza.table.retry.TableRetryPolicy;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
* A Samza {@link ReadWriteTable} backed by a remote data-store or service.
* <p>
* Many stream-processing applications require to look-up data from remote data sources eg: databases,
* web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be
* naturally modeled as a join between the incoming stream and a table.
* <p>
* Example use-cases include:
* <ul>
* <li> Augmenting a stream of "page-views" with information from a database of user-profiles; </li>
* <li> Scoring page views with impressions services. </li>
* <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
* </ul>
* <p>
* A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
* which encapsulate the functionality of reading and writing data to the remote service. These provide a
* pluggable means to specify I/O operations on the table.
* For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
* Optionally, an executor can be specified for invoking the future callbacks which otherwise are
* executed on the threads of the underlying native data store client. This could be useful when
* application might execute long-running operations upon future completions; another use case is to increase
* throughput with more parallelism in the callback executions.
* @param <K> the type of the key in this table
* @param <V> the type of the value in this table
public final class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
implements ReadWriteTable<K, V>, AsyncReadWriteTable<K, V> {
// Read/write functions
protected final TableReadFunction<K, V> readFn;
protected final TableWriteFunction<K, V> writeFn;
// Rate limiting
protected final TableRateLimiter<K, V> readRateLimiter;
protected final TableRateLimiter<K, V> writeRateLimiter;
protected final ExecutorService rateLimitingExecutor;
// Retries
protected final TableRetryPolicy readRetryPolicy;
protected final TableRetryPolicy writeRetryPolicy;
protected final ScheduledExecutorService retryExecutor;
// batch
protected final BatchProvider<K, V> batchProvider;
protected final ScheduledExecutorService batchExecutor;
// Other
protected final ExecutorService callbackExecutor;
// The async table to delegate to
protected final AsyncReadWriteTable<K, V> asyncTable;
* Construct a RemoteTable instance
* @param tableId table id
* @param readFn {@link TableReadFunction} for read operations
* @param writeFn {@link TableWriteFunction} for read operations
* @param readRateLimiter helper for read rate limiting
* @param writeRateLimiter helper for write rate limiting
* @param rateLimitingExecutor executor for executing rate limiting
* @param readRetryPolicy read retry policy
* @param writeRetryPolicy write retry policy
* @param retryExecutor executor for invoking retries
* @param batchProvider batch provider to create a batch instance
* @param batchExecutor scheduled executor for batch
* @param callbackExecutor executor for invoking async callbacks
public RemoteTable(
String tableId,
TableReadFunction readFn,
TableWriteFunction writeFn,
TableRateLimiter<K, V> readRateLimiter,
TableRateLimiter<K, V> writeRateLimiter,
ExecutorService rateLimitingExecutor,
TableRetryPolicy readRetryPolicy,
TableRetryPolicy writeRetryPolicy,
ScheduledExecutorService retryExecutor,
BatchProvider<K, V> batchProvider,
ScheduledExecutorService batchExecutor,
ExecutorService callbackExecutor) {
Preconditions.checkArgument(writeFn != null || readFn != null,
"Must have one of TableReadFunction or TableWriteFunction");
this.readFn = readFn;
this.writeFn = writeFn;
this.readRateLimiter = readRateLimiter;
this.writeRateLimiter = writeRateLimiter;
this.rateLimitingExecutor = rateLimitingExecutor;
this.readRetryPolicy = readRetryPolicy;
this.writeRetryPolicy = writeRetryPolicy;
this.callbackExecutor = callbackExecutor;
this.retryExecutor = retryExecutor;
this.batchProvider = batchProvider;
this.batchExecutor = batchExecutor;
AsyncReadWriteTable table = new AsyncRemoteTable(readFn, writeFn);
if (readRateLimiter != null || writeRateLimiter != null) {
table = new AsyncRateLimitedTable(tableId, table, readRateLimiter, writeRateLimiter, rateLimitingExecutor);
if (readRetryPolicy != null || writeRetryPolicy != null) {
table = new AsyncRetriableTable(tableId, table, readRetryPolicy, writeRetryPolicy, retryExecutor, readFn, writeFn);
if (batchProvider != null) {
table = new AsyncBatchingTable(tableId, table, batchProvider, batchExecutor);
asyncTable = table;
public V get(K key, Object ... args) {
try {
return getAsync(key, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<V> getAsync(K key, Object ... args) {
Preconditions.checkNotNull(key, "null key");
return instrument(() -> asyncTable.getAsync(key, args), metrics.numGets, metrics.getNs)
.handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the records for " + key, e);
if (result == null) {
return result;
public Map<K, V> getAll(List<K> keys, Object ... args) {
try {
return getAllAsync(keys, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object ... args) {
Preconditions.checkNotNull(keys, "null keys");
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
return instrument(() -> asyncTable.getAllAsync(keys, args), metrics.numGetAlls, metrics.getAllNs)
.handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the records for " + keys, e);
result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
return result;
public <T> T read(int opId, Object ... args) {
try {
return (T) readAsync(opId, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public <T> CompletableFuture<T> readAsync(int opId, Object ... args) {
return (CompletableFuture<T>) instrument(() -> asyncTable.readAsync(opId, args), metrics.numReads, metrics.readNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to read, opId=%d", opId), e);
public void put(K key, V value, Object ... args) {
try {
putAsync(key, value, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<Void> putAsync(K key, V value, Object ... args) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(key, "null key");
if (value == null) {
return deleteAsync(key, args);
return instrument(() -> asyncTable.putAsync(key, value, args), metrics.numPuts, metrics.putNs)
.exceptionally(e -> {
throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
public void putAll(List<Entry<K, V>> entries, Object ... args) {
try {
putAllAsync(entries, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records, Object ... args) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(records, "null records");
if (records.isEmpty()) {
return CompletableFuture.completedFuture(null);
List<K> deleteKeys =
.filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
List<Entry<K, V>> putRecords =
.filter(e -> e.getValue() != null).collect(Collectors.toList());
CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
? CompletableFuture.completedFuture(null)
: deleteAllAsync(deleteKeys, args);
// Return the combined future
return CompletableFuture.allOf(
instrument(() -> asyncTable.putAllAsync(putRecords, args), metrics.numPutAlls, metrics.putAllNs))
.exceptionally(e -> {
String strKeys = -> r.getKey().toString()).collect(Collectors.joining(","));
throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
public void delete(K key, Object ... args) {
try {
deleteAsync(key, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<Void> deleteAsync(K key, Object ... args) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(key, "null key");
return instrument(() -> asyncTable.deleteAsync(key, args), metrics.numDeletes, metrics.deleteNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
public void deleteAll(List<K> keys, Object ... args) {
try {
deleteAllAsync(keys, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object ... args) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(keys, "null keys");
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
return instrument(() -> asyncTable.deleteAllAsync(keys, args), metrics.numDeleteAlls, metrics.deleteAllNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to delete records for " + keys), e);
public <T> T write(int opId, Object ... args) {
try {
return (T) writeAsync(opId, args).get();
} catch (Exception e) {
throw new SamzaException(e);
public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
return (CompletableFuture<T>) instrument(() -> asyncTable.writeAsync(opId, args), metrics.numWrites, metrics.writeNs)
.exceptionally(e -> {
throw new SamzaException(String.format("Failed to write, opId=%d", opId), e);
public void init(Context context) {
if (readFn != null) {
readFn.init(context, this);
if (writeFn != null) {
writeFn.init(context, this);
public void flush() {
try {
long startNs = clock.nanoTime();
updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
} catch (Exception e) {
String errMsg = "Failed to flush remote store";
logger.error(errMsg, e);
throw new SamzaException(errMsg, e);
public void close() {
public TableReadFunction<K, V> getReadFunction() {
return readFn;
public TableWriteFunction<K, V> getWriteFunction() {
return writeFn;
protected <T> CompletableFuture<T> instrument(Func1<T> func, Counter counter, Timer timer) {
final long startNs = clock.nanoTime();
CompletableFuture<T> ioFuture = func.apply();
if (callbackExecutor != null) {
ioFuture.thenApplyAsync(r -> {
updateTimer(timer, clock.nanoTime() - startNs);
return r;
}, callbackExecutor);
} else {
ioFuture.thenApply(r -> {
updateTimer(timer, clock.nanoTime() - startNs);
return r;
return ioFuture;