/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.caching;
import com.google.common.base.Preconditions;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.table.BaseReadWriteTable;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.utils.TableMetricsUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
/**
* A hybrid table incorporating a cache with a Samza table. The cache is
* represented as a {@link ReadWriteTable}.
*
 * The intended use case is to optimize the latency of accessing the actual table, e.g.
 * remote tables, when eventual consistency between the cache and the table is acceptable.
* The cache is expected to support TTL such that the values can be refreshed at some
* point.
*
* {@link CachingTable} supports write-through and write-around (writes bypassing cache) policies.
 * For the write-through policy, it supports read-after-write semantics because the value is
 * cached after being written to the table.
*
* Note that there is no synchronization in {@link CachingTable} because it is impossible to
* implement a critical section between table read/write and cache update in the async
 * code paths without serializing all async operations for the same keys. Given that stale
 * data is a presumed trade-off for using a cache with a table, it should be acceptable
 * for the data in the table and the cache to be temporarily out of sync. Moreover, unsynchronized
 * operations in {@link CachingTable} also deliver higher performance when there is contention.
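 *
 * Example usage (illustrative only; {@code remoteTable}, {@code guavaCache} and {@code context}
 * are assumed to be pre-built {@link ReadWriteTable} and {@link Context} instances):
 * <pre>{@code
 *   CachingTable<String, String> cachingTable =
 *       new CachingTable<>("my-table", remoteTable, guavaCache, false); // false = write-through
 *   cachingTable.init(context);
 *   String value = cachingTable.get("key1"); // a miss reads from remoteTable and populates the cache
 *   cachingTable.put("key1", "updated");     // write-through also refreshes the cached value
 * }</pre>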
*
* @param <K> type of the table key
* @param <V> type of the table value
*/
public class CachingTable<K, V> extends BaseReadWriteTable<K, V>
implements ReadWriteTable<K, V> {
private final ReadWriteTable<K, V> table;
private final ReadWriteTable<K, V> cache;
private final boolean isWriteAround;
// Common caching stats
private AtomicLong hitCount = new AtomicLong();
private AtomicLong missCount = new AtomicLong();
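  /**
   * Constructs an instance of {@link CachingTable}.
   * @param tableId id of this table
   * @param table the actual table to be cached
   * @param cache the cache in front of the actual table
   * @param isWriteAround whether writes should bypass the cache (write-around) instead of updating it (write-through)
   */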
public CachingTable(String tableId, ReadWriteTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
super(tableId);
this.table = table;
this.cache = cache;
this.isWriteAround = isWriteAround;
}
@Override
public void init(Context context) {
super.init(context);
TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
tableMetricsUtil.newGauge("hit-rate", () -> hitRate());
tableMetricsUtil.newGauge("miss-rate", () -> missRate());
tableMetricsUtil.newGauge("req-count", () -> requestCount());
}
  /**
   * Looks up the cache and collects the values found, returning the keys missed in the cache
   * @param keys keys to be looked up
   * @param records result map to be populated with the values found in the cache
   * @param args additional arguments passed to the cache lookup
   * @return list of keys missed in the cache
   */
private List<K> lookupCache(List<K> keys, Map<K, V> records, Object ... args) {
List<K> missKeys = new ArrayList<>();
records.putAll(cache.getAll(keys, args));
keys.forEach(k -> {
if (!records.containsKey(k)) {
missKeys.add(k);
}
});
return missKeys;
}
@Override
public V get(K key, Object ... args) {
try {
return getAsync(key, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<V> getAsync(K key, Object ... args) {
incCounter(metrics.numGets);
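    // Look up the cache first; a hit is returned immediately without touching the table.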
V value = cache.get(key, args);
if (value != null) {
hitCount.incrementAndGet();
return CompletableFuture.completedFuture(value);
}
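    // Cache miss: read from the underlying table and back-fill the cache with any non-null result.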
long startNs = clock.nanoTime();
missCount.incrementAndGet();
return table.getAsync(key, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to get the record for " + key, e);
} else {
if (result != null) {
cache.put(key, result, args);
}
updateTimer(metrics.getNs, clock.nanoTime() - startNs);
return result;
}
});
}
@Override
public Map<K, V> getAll(List<K> keys, Object ... args) {
try {
return getAllAsync(keys, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys, Object ... args) {
incCounter(metrics.numGetAlls);
    // Collect results into a mutable map, since the map returned by the cache may be immutable
Map<K, V> getAllResult = new HashMap<>();
List<K> missingKeys = lookupCache(keys, getAllResult);
if (missingKeys.isEmpty()) {
return CompletableFuture.completedFuture(getAllResult);
}
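    // Fetch only the keys missed in the cache from the table, add them to the cache, and merge into the result.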
long startNs = clock.nanoTime();
return table.getAllAsync(missingKeys, args).handle((records, e) -> {
if (e != null) {
throw new SamzaException("Failed to get records for " + keys, e);
} else {
if (records != null) {
cache.putAll(records.entrySet().stream()
.map(r -> new Entry<>(r.getKey(), r.getValue()))
.collect(Collectors.toList()), args);
getAllResult.putAll(records);
}
updateTimer(metrics.getAllNs, clock.nanoTime() - startNs);
return getAllResult;
}
});
}
@Override
public void put(K key, V value, Object ... args) {
try {
putAsync(key, value, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> putAsync(K key, V value, Object ... args) {
incCounter(metrics.numPuts);
Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
long startNs = clock.nanoTime();
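    // Write to the table first; for write-through, propagate the change to the cache on success
    // (a null value deletes the cached entry).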
return table.putAsync(key, value, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e);
} else if (!isWriteAround) {
if (value == null) {
cache.delete(key, args);
} else {
cache.put(key, value, args);
}
}
updateTimer(metrics.putNs, clock.nanoTime() - startNs);
return result;
});
}
@Override
public void putAll(List<Entry<K, V>> records, Object ... args) {
try {
putAllAsync(records, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records, Object ... args) {
incCounter(metrics.numPutAlls);
long startNs = clock.nanoTime();
Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
return table.putAllAsync(records, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to put records " + records, e);
} else if (!isWriteAround) {
cache.putAll(records, args);
}
updateTimer(metrics.putAllNs, clock.nanoTime() - startNs);
return result;
});
}
@Override
public void delete(K key, Object ... args) {
try {
deleteAsync(key, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> deleteAsync(K key, Object ... args) {
incCounter(metrics.numDeletes);
long startNs = clock.nanoTime();
Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
return table.deleteAsync(key, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to delete the record for " + key, e);
} else if (!isWriteAround) {
cache.delete(key, args);
}
updateTimer(metrics.deleteNs, clock.nanoTime() - startNs);
return result;
});
}
@Override
public void deleteAll(List<K> keys, Object ... args) {
try {
deleteAllAsync(keys, args).get();
} catch (Exception e) {
throw new SamzaException(e);
}
}
@Override
public CompletableFuture<Void> deleteAllAsync(List<K> keys, Object ... args) {
incCounter(metrics.numDeleteAlls);
long startNs = clock.nanoTime();
Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
return table.deleteAllAsync(keys, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to delete the record for " + keys, e);
} else if (!isWriteAround) {
cache.deleteAll(keys, args);
}
updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs);
return result;
});
}
@Override
public <T> CompletableFuture<T> readAsync(int opId, Object... args) {
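    // Generic read operations are passed through to the underlying table and bypass the cache.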
incCounter(metrics.numReads);
long startNs = clock.nanoTime();
return table.readAsync(opId, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to read, opId=" + opId, e);
}
updateTimer(metrics.readNs, clock.nanoTime() - startNs);
return (T) result;
});
}
@Override
public <T> CompletableFuture<T> writeAsync(int opId, Object... args) {
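    // Generic write operations are passed through to the underlying table; the cache is not updated.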
incCounter(metrics.numWrites);
long startNs = clock.nanoTime();
return table.writeAsync(opId, args).handle((result, e) -> {
if (e != null) {
throw new SamzaException("Failed to write, opId=" + opId, e);
}
updateTimer(metrics.writeNs, clock.nanoTime() - startNs);
return (T) result;
});
}
@Override
public synchronized void flush() {
incCounter(metrics.numFlushes);
long startNs = clock.nanoTime();
Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + table);
table.flush();
updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
}
@Override
public void close() {
cache.close();
table.close();
}
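  // Caching statistics: before any request has been served, both hit-rate and miss-rate report 1.0.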
double hitRate() {
long reqs = requestCount();
return reqs == 0 ? 1.0 : (double) hitCount.get() / reqs;
}
double missRate() {
long reqs = requestCount();
return reqs == 0 ? 1.0 : (double) missCount.get() / reqs;
}
long requestCount() {
return hitCount.get() + missCount.get();
}
}