blob: 22a1b57027665778736fcec25d29bfb462e8125c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage.kv;
import java.io.File;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.StoreProperties;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestBaseKeyValueStorageEngineFactory {
private static final String STORE_NAME = "myStore";
private static final StorageEngineFactory.StoreMode STORE_MODE = StorageEngineFactory.StoreMode.ReadWrite;
private static final SystemStreamPartition CHANGELOG_SSP =
new SystemStreamPartition("system", "stream", new Partition(0));
private static final Map<String, String> BASE_CONFIG =
ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
MockKeyValueStorageEngineFactory.class.getName());
private static final Map<String, String> DISABLE_CACHE =
ImmutableMap.of(String.format("stores.%s.object.cache.size", STORE_NAME), "0");
private static final Map<String, String> DISALLOW_LARGE_MESSAGES =
ImmutableMap.of(String.format(StorageConfig.DISALLOW_LARGE_MESSAGES, STORE_NAME), "true");
private static final Map<String, String> DROP_LARGE_MESSAGES =
ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME), "true");
private static final Map<String, String> ACCESS_LOG_ENABLED =
ImmutableMap.of(String.format("stores.%s.accesslog.enabled", STORE_NAME), "true");
@Mock
private File storeDir;
@Mock
private Serde<String> keySerde;
@Mock
private Serde<String> msgSerde;
@Mock
private MessageCollector changelogCollector;
@Mock
private MetricsRegistry metricsRegistry;
@Mock
private JobContext jobContext;
@Mock
private ContainerContext containerContext;
@Mock
private KeyValueStore<byte[], byte[]> rawKeyValueStore;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
// some metrics objects need this for histogram metric instantiation
when(this.metricsRegistry.newGauge(any(), any())).thenReturn(mock(Gauge.class));
}
@Test(expected = SamzaException.class)
public void testMissingStoreFactory() {
Config config = new MapConfig();
callGetStorageEngine(config, null);
}
@Test(expected = SamzaException.class)
public void testInvalidCacheSize() {
Config config = new MapConfig(BASE_CONFIG,
ImmutableMap.of(String.format("stores.%s.write.cache.batch", STORE_NAME), "100",
String.format("stores.%s.object.cache.size", STORE_NAME), "50"));
callGetStorageEngine(config, null);
}
@Test(expected = SamzaException.class)
public void testMissingKeySerde() {
Config config = new MapConfig(BASE_CONFIG);
when(this.jobContext.getConfig()).thenReturn(config);
new MockKeyValueStorageEngineFactory(this.rawKeyValueStore).getStorageEngine(STORE_NAME, this.storeDir, null,
this.msgSerde, this.changelogCollector, this.metricsRegistry, null, this.jobContext, this.containerContext,
STORE_MODE);
}
@Test(expected = SamzaException.class)
public void testMissingValueSerde() {
Config config = new MapConfig(BASE_CONFIG);
when(this.jobContext.getConfig()).thenReturn(config);
new MockKeyValueStorageEngineFactory(this.rawKeyValueStore).getStorageEngine(STORE_NAME, this.storeDir,
this.keySerde, null, this.changelogCollector, this.metricsRegistry, null, this.jobContext,
this.containerContext, STORE_MODE);
}
@Test
public void testInMemoryKeyValueStore() {
Config config = new MapConfig(DISABLE_CACHE, ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME),
"org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"));
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), false, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
// config has the in-memory key-value factory, but still calling the test factory, so store will be the test store
assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
}
@Test
public void testRawStoreOnly() {
Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
assertEquals(this.rawKeyValueStore, serializedKeyValueStore.getStore());
}
@Test
public void testWithLoggedStore() {
Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE);
StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, true);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
LoggedStore<?, ?> loggedStore = assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
// type generics don't match due to wildcard type, but checking reference equality, so type generics don't matter
// noinspection AssertEqualsBetweenInconvertibleTypes
assertEquals(this.rawKeyValueStore, loggedStore.getStore());
}
@Test
public void testWithLoggedStoreAndCachedStore() {
Config config = new MapConfig(BASE_CONFIG);
StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, true);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
CachedStore<?, ?> cachedStore = assertAndCast(nullSafeKeyValueStore.getStore(), CachedStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(cachedStore.getStore(), SerializedKeyValueStore.class);
LoggedStore<?, ?> loggedStore = assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
// type generics don't match due to wildcard type, but checking reference equality, so type generics don't matter
// noinspection AssertEqualsBetweenInconvertibleTypes
assertEquals(this.rawKeyValueStore, loggedStore.getStore());
}
@Test
public void testDisallowLargeMessages() {
Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, DISALLOW_LARGE_MESSAGES);
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
LargeMessageSafeStore largeMessageSafeStore =
assertAndCast(serializedKeyValueStore.getStore(), LargeMessageSafeStore.class);
assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
}
@Test
public void testDisallowLargeMessagesWithCache() {
Config config = new MapConfig(BASE_CONFIG, DISALLOW_LARGE_MESSAGES);
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
LargeMessageSafeStore largeMessageSafeStore =
assertAndCast(serializedKeyValueStore.getStore(), LargeMessageSafeStore.class);
CachedStore<?, ?> cachedStore = assertAndCast(largeMessageSafeStore.getStore(), CachedStore.class);
// type generics don't match due to wildcard type, but checking reference equality, so type generics don't matter
// noinspection AssertEqualsBetweenInconvertibleTypes
assertEquals(this.rawKeyValueStore, cachedStore.getStore());
}
@Test
public void testDropLargeMessages() {
Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, DROP_LARGE_MESSAGES);
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(nullSafeKeyValueStore.getStore(), SerializedKeyValueStore.class);
LargeMessageSafeStore largeMessageSafeStore =
assertAndCast(serializedKeyValueStore.getStore(), LargeMessageSafeStore.class);
assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
}
@Test
public void testDropLargeMessagesWithCache() {
Config config = new MapConfig(BASE_CONFIG, DROP_LARGE_MESSAGES);
StorageEngine storageEngine = callGetStorageEngine(config, null);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, false);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
CachedStore<?, ?> cachedStore = assertAndCast(nullSafeKeyValueStore.getStore(), CachedStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(cachedStore.getStore(), SerializedKeyValueStore.class);
LargeMessageSafeStore largeMessageSafeStore =
assertAndCast(serializedKeyValueStore.getStore(), LargeMessageSafeStore.class);
assertEquals(this.rawKeyValueStore, largeMessageSafeStore.getStore());
}
@Test
public void testAccessLogStore() {
Config config = new MapConfig(BASE_CONFIG, DISABLE_CACHE, ACCESS_LOG_ENABLED);
// AccessLoggedStore requires a changelog SSP
StorageEngine storageEngine = callGetStorageEngine(config, CHANGELOG_SSP);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = baseStorageEngineValidation(storageEngine);
assertStoreProperties(keyValueStorageEngine.getStoreProperties(), true, true);
NullSafeKeyValueStore<?, ?> nullSafeKeyValueStore =
assertAndCast(keyValueStorageEngine.getWrapperStore(), NullSafeKeyValueStore.class);
AccessLoggedStore<?, ?> accessLoggedStore =
assertAndCast(nullSafeKeyValueStore.getStore(), AccessLoggedStore.class);
SerializedKeyValueStore<?, ?> serializedKeyValueStore =
assertAndCast(accessLoggedStore.getStore(), SerializedKeyValueStore.class);
LoggedStore<?, ?> loggedStore = assertAndCast(serializedKeyValueStore.getStore(), LoggedStore.class);
// type generics don't match due to wildcard type, but checking reference equality, so type generics don't matter
// noinspection AssertEqualsBetweenInconvertibleTypes
assertEquals(this.rawKeyValueStore, loggedStore.getStore());
}
private static <T extends KeyValueStore<?, ?>> T assertAndCast(KeyValueStore<?, ?> keyValueStore, Class<T> clazz) {
assertTrue("Expected type " + clazz.getName(), clazz.isInstance(keyValueStore));
return clazz.cast(keyValueStore);
}
private KeyValueStorageEngine<?, ?> baseStorageEngineValidation(StorageEngine storageEngine) {
assertTrue(storageEngine instanceof KeyValueStorageEngine);
KeyValueStorageEngine<?, ?> keyValueStorageEngine = (KeyValueStorageEngine<?, ?>) storageEngine;
assertEquals(this.rawKeyValueStore, keyValueStorageEngine.getRawStore());
return keyValueStorageEngine;
}
private static void assertStoreProperties(StoreProperties storeProperties, boolean expectedPersistedToDisk,
boolean expectedLoggedStore) {
assertEquals(expectedPersistedToDisk, storeProperties.isPersistedToDisk());
assertEquals(expectedLoggedStore, storeProperties.isLoggedStore());
}
/**
* @param changelogSSP if non-null, then enables logged store
*/
private StorageEngine callGetStorageEngine(Config config, SystemStreamPartition changelogSSP) {
when(this.jobContext.getConfig()).thenReturn(config);
return new MockKeyValueStorageEngineFactory(this.rawKeyValueStore).getStorageEngine(STORE_NAME, this.storeDir,
this.keySerde, this.msgSerde, this.changelogCollector, this.metricsRegistry, changelogSSP, this.jobContext,
this.containerContext, STORE_MODE);
}
}