blob: 4e281876b0767f384f272837076b10a47786286b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import java.nio.ByteBuffer;
/**
* Factory for creating state stores in Kafka Streams.
*/
public class Stores {
/**
* Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
*
* @param name the name of the store
* @return the factory that can be used to specify other options or configurations for the store; never null
*/
public static StoreFactory create(final String name) {
return new StoreFactory() {
@Override
public <K> ValueFactory<K> withKeys(final Serde<K> keySerde) {
return new ValueFactory<K>() {
@Override
public <V> KeyValueFactory<K, V> withValues(final Serde<V> valueSerde) {
return new KeyValueFactory<K, V>() {
@Override
public InMemoryKeyValueFactory<K, V> inMemory() {
return new InMemoryKeyValueFactory<K, V>() {
private int capacity = Integer.MAX_VALUE;
@Override
public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
this.capacity = capacity;
return this;
}
@Override
public StateStoreSupplier build() {
if (capacity < Integer.MAX_VALUE) {
return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde);
}
return new InMemoryKeyValueStoreSupplier<>(name, keySerde, valueSerde);
}
};
}
@Override
public PersistentKeyValueFactory<K, V> persistent() {
return new PersistentKeyValueFactory<K, V>() {
private int numSegments = 0;
private long retentionPeriod = 0L;
private boolean retainDuplicates = false;
@Override
public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
this.numSegments = numSegments;
this.retentionPeriod = retentionPeriod;
this.retainDuplicates = retainDuplicates;
return this;
}
@Override
public StateStoreSupplier build() {
if (numSegments > 0) {
return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde);
}
return new RocksDBKeyValueStoreSupplier<>(name, keySerde, valueSerde);
}
};
}
};
}
};
}
};
}
public static abstract class StoreFactory {
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link String}s.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<String> withStringKeys() {
return withKeys(Serdes.String());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Integer}s.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<Integer> withIntegerKeys() {
return withKeys(Serdes.Integer());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Long}s.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<Long> withLongKeys() {
return withKeys(Serdes.Long());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link Double}s.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<Double> withDoubleKeys() {
return withKeys(Serdes.Double());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be {@link ByteBuffer}.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<ByteBuffer> withByteBufferKeys() {
return withKeys(Serdes.ByteBuffer());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys will be byte arrays.
*
* @return the interface used to specify the type of values; never null
*/
public ValueFactory<byte[]> withByteArrayKeys() {
return withKeys(Serdes.ByteArray());
}
/**
* Begin to create a {@link KeyValueStore} by specifying the keys.
*
* @param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes
* @return the interface used to specify the type of values; never null
*/
public <K> ValueFactory<K> withKeys(Class<K> keyClass) {
return withKeys(Serdes.serdeFrom(keyClass));
}
/**
* Begin to create a {@link KeyValueStore} by specifying the serializer and deserializer for the keys.
*
* @param keySerde the serialization factory for keys; may be null
* @return the interface used to specify the type of values; never null
*/
public abstract <K> ValueFactory<K> withKeys(Serde<K> keySerde);
}
/**
* The factory for creating off-heap key-value stores.
*
* @param <K> the type of keys
*/
public static abstract class ValueFactory<K> {
/**
* Use {@link String} values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, String> withStringValues() {
return withValues(Serdes.String());
}
/**
* Use {@link Integer} values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, Integer> withIntegerValues() {
return withValues(Serdes.Integer());
}
/**
* Use {@link Long} values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, Long> withLongValues() {
return withValues(Serdes.Long());
}
/**
* Use {@link Double} values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, Double> withDoubleValues() {
return withValues(Serdes.Double());
}
/**
* Use {@link ByteBuffer} for values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, ByteBuffer> withByteBufferValues() {
return withValues(Serdes.ByteBuffer());
}
/**
* Use byte arrays for values.
*
* @return the interface used to specify the remaining key-value store options; never null
*/
public KeyValueFactory<K, byte[]> withByteArrayValues() {
return withValues(Serdes.ByteArray());
}
/**
* Use values of the specified type.
*
* @param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
* @return the interface used to specify the remaining key-value store options; never null
*/
public <V> KeyValueFactory<K, V> withValues(Class<V> valueClass) {
return withValues(Serdes.serdeFrom(valueClass));
}
/**
* Use the specified serializer and deserializer for the values.
*
* @param valueSerde the serialization factory for values; may be null
* @return the interface used to specify the remaining key-value store options; never null
*/
public abstract <V> KeyValueFactory<K, V> withValues(Serde<V> valueSerde);
}
/**
* The interface used to specify the different kinds of key-value stores.
*
* @param <K> the type of keys
* @param <V> the type of values
*/
public interface KeyValueFactory<K, V> {
/**
* Keep all key-value entries in-memory, although for durability all entries are recorded in a Kafka topic that can be
* read to restore the entries if they are lost.
*
* @return the factory to create in-memory key-value stores; never null
*/
InMemoryKeyValueFactory<K, V> inMemory();
/**
* Keep all key-value entries off-heap in a local database, although for durability all entries are recorded in a Kafka
* topic that can be read to restore the entries if they are lost.
*
* @return the factory to create in-memory key-value stores; never null
*/
PersistentKeyValueFactory<K, V> persistent();
}
/**
* The interface used to create in-memory key-value stores.
*
* @param <K> the type of keys
* @param <V> the type of values
*/
public interface InMemoryKeyValueFactory<K, V> {
/**
* Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
* equivalent to not placing a limit on the number of entries.
*
* @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
* @return this factory
* @throws IllegalArgumentException if the capacity is not positive
*/
InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
/**
* Return the instance of StateStoreSupplier of new key-value store.
* @return the state store supplier; never null
*/
StateStoreSupplier build();
}
/**
* The interface used to create off-heap key-value stores that use a local database.
*
* @param <K> the type of keys
* @param <V> the type of values
*/
public interface PersistentKeyValueFactory<K, V> {
/**
* Set the persistent store as a windowed key-value store
*
* @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
* @param numSegments the maximum number of segments for rolling the windowed store
* @param retainDuplicates whether or not to retain duplicate data within the window
*/
PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
/**
* Return the instance of StateStoreSupplier of new key-value store.
* @return the key-value store; never null
*/
StateStoreSupplier build();
}
}