blob: a3fc1783bf2d9f70d238a789a8aea5d83563b6bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.kv;
import java.io.File;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.StoreProperties;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.ScalaJavaUtil;
/**
 * Encapsulates all the steps needed to create a key-value storage engine.
 *
 * <p>Concrete key-value store factories extend this class and implement {@link #getKVStore} to return the raw
 * {@code byte[] -> byte[]} store. This class then wraps that raw store with changelog logging, large-message
 * handling, serialization, caching, access logging and null-safety layers as configured.
 */
public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
  /** Factory class name of the in-memory store; stores created by any other factory are marked persisted to disk. */
  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =
      "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
  /** Store-scoped config key (read under "stores.&lt;storeName&gt;.") for the write batch size. */
  private static final String WRITE_BATCH_SIZE = "write.batch.size";
  private static final int DEFAULT_WRITE_BATCH_SIZE = 500;
  /**
   * Store-scoped config key (read under "stores.&lt;storeName&gt;.") for the object cache size.
   * A value of 0 or less disables the cache layer.
   */
  private static final String OBJECT_CACHE_SIZE = "object.cache.size";
  private static final int DEFAULT_OBJECT_CACHE_SIZE = 1000;

  /**
   * Implement this to return a KeyValueStore instance for the given store name, which will be used as the underlying
   * raw store.
   *
   * @param storeName Name of the store
   * @param storeDir The directory of the store
   * @param registry MetricsRegistry to which to publish store specific metrics.
   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog; may be null
   *     when the store has no changelog.
   * @param jobContext Information about the job in which the task is executing.
   * @param containerContext Information about the container in which the task is executing.
   * @param storeMode The {@link StoreMode} requested by the caller of {@code getStorageEngine}, passed through
   *     unchanged.
   * @return A raw KeyValueStore instance
   */
  protected abstract KeyValueStore<byte[], byte[]> getKVStore(String storeName,
      File storeDir,
      MetricsRegistry registry,
      SystemStreamPartition changeLogSystemStreamPartition,
      JobContext jobContext,
      ContainerContext containerContext,
      StoreMode storeMode);

  /**
   * Constructs a key-value {@link StorageEngine} and returns it to the caller.
   *
   * <p>The raw store returned by {@link #getKVStore} is wrapped, innermost first, with:
   * <ol>
   *   <li>a {@link LoggedStore}, if {@code changelogSSP} is non-null;</li>
   *   <li>large-message handling, serialization and (if {@code object.cache.size} &gt; 0) caching layers;</li>
   *   <li>an {@link AccessLoggedStore}, if access logging is enabled for the store;</li>
   *   <li>a {@link NullSafeKeyValueStore}.</li>
   * </ol>
   *
   * @param storeName The name of the storage engine.
   * @param storeDir The directory of the storage engine.
   * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
   * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
   * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
   * @param changelogSSP Samza system stream partition from which to receive the changelog; when null, no
   *     {@link LoggedStore} layer is added.
   * @param jobContext Information about the job; its config supplies the store, storage and metrics settings.
   * @param containerContext Information about the container in which the task is executing.
   * @param storeMode The mode in which the store is created; forwarded to {@link #getKVStore}.
   * @return the fully wrapped key-value storage engine
   * @throws SamzaException if no store factory is configured for the store, if a positive object cache size is
   *     smaller than the write batch size, or if {@code keySerde} or {@code msgSerde} is null
   **/
  public StorageEngine getStorageEngine(String storeName,
      File storeDir,
      Serde<K> keySerde,
      Serde<V> msgSerde,
      MessageCollector changelogCollector,
      MetricsRegistry registry,
      SystemStreamPartition changelogSSP,
      JobContext jobContext,
      ContainerContext containerContext,
      StoreMode storeMode) {
    Config config = jobContext.getConfig();
    Config storageConfigSubset = config.subset("stores." + storeName + ".", true);
    StorageConfig storageConfig = new StorageConfig(config);

    Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
    if (!storeFactory.isPresent() || StringUtils.isBlank(storeFactory.get())) {
      throw new SamzaException(
          String.format("Store factory not defined for store %s. Cannot proceed with KV store creation!", storeName));
    }

    StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
    // Any factory other than the in-memory one is marked as persisted to disk.
    if (!INMEMORY_KV_STORAGE_ENGINE_FACTORY.equals(storeFactory.get())) {
      storePropertiesBuilder.setPersistedToDisk(true);
    }

    int batchSize = storageConfigSubset.getInt(WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
    // The default cache size is never below the batch size, so the check below only trips on explicit config.
    int cacheSize = storageConfigSubset.getInt(OBJECT_CACHE_SIZE, Math.max(batchSize, DEFAULT_OBJECT_CACHE_SIZE));
    if (cacheSize > 0 && cacheSize < batchSize) {
      throw new SamzaException(String.format(
          "Invalid config for store %s: %s (%d) cannot be less than %s (%d) as batched values reside in cache.",
          storeName, OBJECT_CACHE_SIZE, cacheSize, WRITE_BATCH_SIZE, batchSize));
    }

    if (keySerde == null) {
      throw new SamzaException(
          String.format("Must define a key serde when using key value storage for store %s.", storeName));
    }
    if (msgSerde == null) {
      throw new SamzaException(
          String.format("Must define a message serde when using key value storage for store %s.", storeName));
    }

    KeyValueStore<byte[], byte[]> rawStore =
        getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode);
    // May also mark the store properties as "logged" (side effect on storePropertiesBuilder).
    KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(changelogSSP,
        storeName, registry, storePropertiesBuilder, rawStore, changelogCollector);
    // This also applies the serialization and caching layers.
    KeyValueStore<K, V> toBeAccessLoggedStore = buildStoreWithLargeMessageHandling(storeName, registry,
        maybeLoggedStore, storageConfig, cacheSize, batchSize, keySerde, msgSerde);
    KeyValueStore<K, V> maybeAccessLoggedStore =
        buildMaybeAccessLoggedStore(storeName, toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig,
            keySerde);
    KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);

    KeyValueStorageEngineMetrics keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry);
    HighResolutionClock clock = buildClock(config);
    return new KeyValueStorageEngine<>(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
        changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize,
        ScalaJavaUtil.toScalaFunction(clock::nanoTime));
  }

  /**
   * Wraps {@code storeToWrap} into a {@link LoggedStore} if {@code changelogSSP} is non-null, and in that case also
   * marks {@code storePropertiesBuilder} as a logged store.
   * Otherwise, returns the original {@code storeToWrap} and leaves the builder untouched.
   */
  private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP,
      String storeName,
      MetricsRegistry registry,
      StoreProperties.StorePropertiesBuilder storePropertiesBuilder,
      KeyValueStore<byte[], byte[]> storeToWrap,
      MessageCollector changelogCollector) {
    if (changelogSSP == null) {
      return storeToWrap;
    } else {
      LoggedStoreMetrics loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry);
      storePropertiesBuilder.setLoggedStore(true);
      return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, loggedStoreMetrics);
    }
  }

  /**
   * Wraps {@code storeToWrap} with the proper layers to handle large messages, plus serialization and caching.
   * If "disallow.large.messages" is enabled, then the message will be serialized and its size checked before
   * the serialized message is stored in the cache.
   * If "disallow.large.messages" is disabled, then the deserialized message will be stored in the cache. If
   * "drop.large.messages" is enabled, then large messages will not be sent to the logged store.
   */
  private static <T, U> KeyValueStore<T, U> buildStoreWithLargeMessageHandling(String storeName,
      MetricsRegistry registry,
      KeyValueStore<byte[], byte[]> storeToWrap,
      StorageConfig storageConfig,
      int cacheSize,
      int batchSize,
      Serde<T> keySerde,
      Serde<U> msgSerde) {
    int maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName);
    if (storageConfig.getDisallowLargeMessages(storeName)) {
      /*
       * The store wrapping ordering is done this way so that a large message cannot end up in the cache. However, it
       * also means that serialized data is in the cache, so performance will be worse since the data needs to be
       * deserialized even when cached.
       */
      KeyValueStore<byte[], byte[]> maybeCachedStore =
          buildMaybeCachedStore(storeName, registry, storeToWrap, cacheSize, batchSize);
      // this will throw a RecordTooLargeException when a large message is encountered
      LargeMessageSafeStore largeMessageSafeKeyValueStore =
          new LargeMessageSafeStore(maybeCachedStore, storeName, false, maxMessageSize);
      return buildSerializedStore(storeName, registry, largeMessageSafeKeyValueStore, keySerde, msgSerde);
    } else {
      KeyValueStore<byte[], byte[]> toBeSerializedStore;
      if (storageConfig.getDropLargeMessages(storeName)) {
        toBeSerializedStore = new LargeMessageSafeStore(storeToWrap, storeName, true, maxMessageSize);
      } else {
        toBeSerializedStore = storeToWrap;
      }
      KeyValueStore<T, U> serializedStore =
          buildSerializedStore(storeName, registry, toBeSerializedStore, keySerde, msgSerde);
      /*
       * Allows deserialized entries to be stored in the cache, but it means that a large message may end up in the
       * cache even though it was not persisted to the logged store.
       */
      return buildMaybeCachedStore(storeName, registry, serializedStore, cacheSize, batchSize);
    }
  }

  /**
   * Wraps {@code storeToWrap} with a {@link CachedStore} if caching is enabled ({@code cacheSize > 0}).
   * Otherwise, returns the {@code storeToWrap}.
   */
  private static <T, U> KeyValueStore<T, U> buildMaybeCachedStore(String storeName, MetricsRegistry registry,
      KeyValueStore<T, U> storeToWrap, int cacheSize, int batchSize) {
    if (cacheSize > 0) {
      CachedStoreMetrics cachedStoreMetrics = new CachedStoreMetrics(storeName, registry);
      return new CachedStore<>(storeToWrap, cacheSize, batchSize, cachedStoreMetrics);
    } else {
      return storeToWrap;
    }
  }

  /**
   * Wraps {@code storeToWrap} with a {@link SerializedKeyValueStore} that uses {@code keySerde} and
   * {@code msgSerde}.
   */
  private static <T, U> KeyValueStore<T, U> buildSerializedStore(String storeName,
      MetricsRegistry registry,
      KeyValueStore<byte[], byte[]> storeToWrap,
      Serde<T> keySerde,
      Serde<U> msgSerde) {
    SerializedKeyValueStoreMetrics serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry);
    return new SerializedKeyValueStore<>(storeToWrap, keySerde, msgSerde, serializedMetrics);
  }

  /**
   * Wraps {@code storeToWrap} with an {@link AccessLoggedStore} if access logging is enabled for the store.
   * Otherwise, returns the {@code storeToWrap}.
   */
  private static <T, U> KeyValueStore<T, U> buildMaybeAccessLoggedStore(String storeName,
      KeyValueStore<T, U> storeToWrap,
      MessageCollector changelogCollector,
      SystemStreamPartition changelogSSP,
      StorageConfig storageConfig,
      Serde<T> keySerde) {
    if (storageConfig.getAccessLogEnabled(storeName)) {
      return new AccessLoggedStore<>(storeToWrap, changelogCollector, changelogSSP, storageConfig, storeName, keySerde);
    } else {
      return storeToWrap;
    }
  }

  /**
   * If "metrics.timer.enabled" is enabled, then returns a {@link HighResolutionClock} that uses
   * {@link System#nanoTime}.
   * Otherwise, returns a clock which always returns 0, so timing calls cost nothing.
   */
  private static HighResolutionClock buildClock(Config config) {
    MetricsConfig metricsConfig = new MetricsConfig(config);
    if (metricsConfig.getMetricsTimerEnabled()) {
      return System::nanoTime;
    } else {
      return () -> 0;
    }
  }
}