// blob: 4896e93efbec5e91a9cd050091a85c39200a911d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table.caching;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.hybrid.BaseHybridTableDescriptor;
import com.google.common.base.Preconditions;
/**
 * Table descriptor for {@link CachingTable}.
 *
 * <p>A caching table is described by the actual table (required, see
 * {@link #withTable(TableDescriptor)}) plus exactly one of two mutually exclusive
 * cache setups:
 * <ul>
 *   <li>an explicit cache table supplied via {@link #withCache(TableDescriptor)}; or</li>
 *   <li>cache settings ({@link #withReadTtl(Duration)}, {@link #withWriteTtl(Duration)},
 *       {@link #withCacheSize(long)}) that are written into the table spec configuration.
 *       In this mode a read TTL is mandatory.</li>
 * </ul>
 * Combining both setups is rejected by {@link #validate()}.
 *
 * @param <K> type of the key in the cache
 * @param <V> type of the value in the cache
 */
public class CachingTableDescriptor<K, V> extends BaseHybridTableDescriptor<K, V, CachingTableDescriptor<K, V>> {
  private Duration readTtl;
  private Duration writeTtl;
  // 0 means "not specified"; only positive values are written to the table spec.
  private long cacheSize;
  private TableDescriptor<K, V, ?> cache;
  private TableDescriptor<K, V, ?> table;
  private boolean isWriteAround;

  /**
   * Constructs a caching table descriptor.
   *
   * @param tableId id of the table, passed on to the base descriptor
   */
  public CachingTableDescriptor(String tableId) {
    super(tableId);
  }

  /**
   * Returns the descriptors of the tables this descriptor is composed of.
   *
   * @return the cache descriptor followed by the actual table descriptor when a cache
   *         table was specified, otherwise only the actual table descriptor
   */
  @Override
  public List<? extends TableDescriptor<K, V, ?>> getTableDescriptors() {
    return cache != null
        ? Arrays.asList(cache, table)
        : Arrays.asList(table);
  }

  /**
   * Validates this descriptor and generates the {@link TableSpec} for the caching table.
   *
   * <p>When a cache table is specified only its table id is recorded; otherwise the
   * read TTL, write TTL and cache size are recorded, each only if it was specified.
   * The id of the actual table and the write-around flag are always recorded.
   *
   * @return the table spec backed by {@link CachingTableProviderFactory}
   */
  @Override
  public TableSpec getTableSpec() {
    validate();

    Map<String, String> tableSpecConfig = new HashMap<>();
    generateTableSpecConfig(tableSpecConfig);

    // NOTE(review): the raw casts below assume every descriptor passed in is a
    // BaseTableDescriptor; any other TableDescriptor implementation fails here with
    // a ClassCastException.
    if (cache != null) {
      tableSpecConfig.put(CachingTableProvider.CACHE_TABLE_ID, ((BaseTableDescriptor) cache).getTableSpec().getId());
    } else {
      if (readTtl != null) {
        tableSpecConfig.put(CachingTableProvider.READ_TTL_MS, String.valueOf(readTtl.toMillis()));
      }
      if (writeTtl != null) {
        tableSpecConfig.put(CachingTableProvider.WRITE_TTL_MS, String.valueOf(writeTtl.toMillis()));
      }
      if (cacheSize > 0) {
        tableSpecConfig.put(CachingTableProvider.CACHE_SIZE, String.valueOf(cacheSize));
      }
    }

    // validate() has already ensured that the actual table is present.
    tableSpecConfig.put(CachingTableProvider.REAL_TABLE_ID, ((BaseTableDescriptor) table).getTableSpec().getId());
    tableSpecConfig.put(CachingTableProvider.WRITE_AROUND, String.valueOf(isWriteAround));

    return new TableSpec(tableId, serde, CachingTableProviderFactory.class.getName(), tableSpecConfig);
  }

  /**
   * Specify a cache (as Table descriptor) to be used for caching.
   * Cache get is not synchronized with put for better parallelism in the read path
   * of {@link CachingTable}. As such, cache table implementation is expected to be
   * thread-safe for concurrent accesses.
   *
   * <p>Mutually exclusive with the read TTL, write TTL and cache size settings.
   *
   * @param cache cache table descriptor
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withCache(TableDescriptor<K, V, ?> cache) {
    this.cache = cache;
    return this;
  }

  /**
   * Specify the target table descriptor for the actual table input/output.
   * This setting is required.
   *
   * @param table the target table descriptor
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withTable(TableDescriptor<K, V, ?> table) {
    this.table = table;
    return this;
  }

  /**
   * Specify the TTL for each read access, i.e. record is expired after
   * the TTL duration since last read access of each key.
   * Required when no cache table is specified.
   *
   * @param readTtl read TTL
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withReadTtl(Duration readTtl) {
    this.readTtl = readTtl;
    return this;
  }

  /**
   * Specify the TTL for each write access, i.e. record is expired after
   * the TTL duration since last write access of each key.
   *
   * @param writeTtl write TTL
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withWriteTtl(Duration writeTtl) {
    this.writeTtl = writeTtl;
    return this;
  }

  /**
   * Specify the max cache size for size-based eviction.
   * Only a positive value is written to the table spec.
   *
   * @param cacheSize max size of the cache
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withCacheSize(long cacheSize) {
    this.cacheSize = cacheSize;
    return this;
  }

  /**
   * Specify if write-around policy should be used to bypass writing
   * to cache for put operations. This is useful when put() is the
   * dominant operation and get() has no locality with recent puts.
   *
   * @return this descriptor
   */
  public CachingTableDescriptor<K, V> withWriteAround() {
    this.isWriteAround = true;
    return this;
  }

  /**
   * Validates this descriptor on top of the base class validation.
   *
   * <p>The actual table must be specified. Without a cache table a read TTL must be
   * specified; with a cache table none of read TTL, write TTL or cache size may be
   * specified.
   *
   * @throws NullPointerException if the actual table is missing, or if neither a cache
   *         table nor a read TTL is specified
   * @throws IllegalArgumentException if a cache table is combined with a read TTL,
   *         write TTL or cache size
   */
  @Override
  protected void validate() {
    super.validate();
    Preconditions.checkNotNull(table, "Actual table is required.");
    if (cache == null) {
      Preconditions.checkNotNull(readTtl, "readTtl must be specified.");
    } else {
      Preconditions.checkArgument(readTtl == null && writeTtl == null && cacheSize == 0,
          "Invalid to specify both {cache} and {readTtl|writeTtl|cacheSize} at the same time.");
    }
  }
}