blob: 34ffbd49fd32c349a25a1fa6e25cf2f08c2a56ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.table;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.ConfigRewriter;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.LongSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.apache.samza.storage.kv.RocksDbTableProviderFactory;
import org.apache.samza.table.TableConfigGenerator;
import org.apache.samza.table.TableDescriptorsProvider;
import org.apache.samza.table.remote.RemoteTableDescriptor;
import org.apache.samza.table.remote.RemoteTableProviderFactory;
import org.apache.samza.table.remote.TableReadFunction;
import org.apache.samza.util.RateLimiter;
import org.apache.samza.util.Util;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.*;
/**
 * Table descriptors provider tests for both remote and local tables.
 *
 * <p>Each test drives {@link MySampleTableConfigRewriter} with a different value of the
 * {@code tables.descriptors.provider.class} config and inspects the config it returns.
 */
public class TestTableDescriptorsProvider {
  /** Config key holding the class name that {@link MySampleTableConfigRewriter} instantiates. */
  private static final String PROVIDER_CLASS_CONFIG = "tables.descriptors.provider.class";

  /** Rewriter name handed to {@link ConfigRewriter#rewrite(String, Config)} in every test. */
  private static final String REWRITER_NAME = "tableRewriter";

  /** Id of the RocksDB-backed table declared by {@link MySampleTableDescriptorsProvider}. */
  private static final String LOCAL_TABLE_ID = "local-table-1";

  /** Id of the remote table declared by {@link MySampleTableDescriptorsProvider}. */
  private static final String REMOTE_TABLE_ID = "remote-table-1";

  /**
   * With no provider class configured, the rewriter hands back the (empty) input config.
   */
  @Test
  public void testWithNoConfiguredTableDescriptorProviderClass() throws Exception {
    Map<String, String> configs = new HashMap<>();

    Config resultConfig = new MySampleTableConfigRewriter().rewrite(REWRITER_NAME, new MapConfig(configs));

    Assert.assertEquals(0, resultConfig.size());
  }

  /**
   * When the configured class does not implement {@link TableDescriptorsProvider}, the rewriter
   * returns the input config as-is: only the provider-class entry is present and no table ids
   * are defined.
   */
  @Test
  public void testWithNonTableDescriptorsProviderClass() throws Exception {
    Map<String, String> configs = new HashMap<>();
    configs.put(PROVIDER_CLASS_CONFIG, MySampleNonTableDescriptorsProvider.class.getName());

    Config resultConfig = new MySampleTableConfigRewriter().rewrite(REWRITER_NAME, new MapConfig(configs));

    Assert.assertEquals(1, resultConfig.size());
    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
    Assert.assertEquals(0, tableConfig.getTableIds().size());
  }

  /**
   * With a real {@link TableDescriptorsProvider} configured, the rewritten config must carry the
   * store and table settings for the local and remote tables that
   * {@link MySampleTableDescriptorsProvider} declares.
   */
  @Test
  public void testWithTableDescriptorsProviderClass() throws Exception {
    Map<String, String> configs = new HashMap<>();
    configs.put(JobConfig.JOB_NAME(), "test-job");
    configs.put(PROVIDER_CLASS_CONFIG, MySampleTableDescriptorsProvider.class.getName());

    Config resultConfig = new MySampleTableConfigRewriter().rewrite(REWRITER_NAME, new MapConfig(configs));

    Assert.assertNotNull(resultConfig);
    Assert.assertFalse(resultConfig.isEmpty());

    // Store-level view: exactly one store is expected, named after the local table.
    JavaStorageConfig storageConfig = new JavaStorageConfig(resultConfig);
    Assert.assertEquals(1, storageConfig.getStoreNames().size());
    Assert.assertEquals(LOCAL_TABLE_ID, storageConfig.getStoreNames().get(0));
    Assert.assertEquals(RocksDbKeyValueStorageEngineFactory.class.getName(),
        storageConfig.getStorageFactoryClassName(LOCAL_TABLE_ID));
    Assert.assertTrue(storageConfig.getStorageKeySerde(LOCAL_TABLE_ID).startsWith("StringSerde"));
    Assert.assertTrue(storageConfig.getStorageMsgSerde(LOCAL_TABLE_ID).startsWith("StringSerde"));

    // Raw entries under "stores.<localTableId>."; the block size is the value the provider
    // passes to withBlockSize().
    Config storeConfig = resultConfig.subset("stores." + LOCAL_TABLE_ID + ".", true);
    Assert.assertEquals(4, storeConfig.size());
    Assert.assertEquals(4096, storeConfig.getInt("rocksdb.block.size.bytes"));

    // Table-level view: provider factories and serdes for both tables.
    JavaTableConfig tableConfig = new JavaTableConfig(resultConfig);
    Assert.assertEquals(RocksDbTableProviderFactory.class.getName(),
        tableConfig.getTableProviderFactory(LOCAL_TABLE_ID));
    Assert.assertEquals(RemoteTableProviderFactory.class.getName(),
        tableConfig.getTableProviderFactory(REMOTE_TABLE_ID));
    Assert.assertTrue(tableConfig.getKeySerde(LOCAL_TABLE_ID).startsWith("StringSerde"));
    Assert.assertTrue(tableConfig.getValueSerde(LOCAL_TABLE_ID).startsWith("StringSerde"));
    Assert.assertTrue(tableConfig.getKeySerde(REMOTE_TABLE_ID).startsWith("StringSerde"));
    Assert.assertTrue(tableConfig.getValueSerde(REMOTE_TABLE_ID).startsWith("LongSerde"));
  }

  /**
   * A class that deliberately does not implement {@link TableDescriptorsProvider}; used to check
   * that the rewriter leaves the config untouched for such a class.
   */
  public static class MySampleNonTableDescriptorsProvider {
  }

  /**
   * Stub read function for the remote table descriptor. It never produces a value and reports
   * no exception as retriable.
   *
   * <p>NOTE(review): this implements the raw {@code TableReadFunction} type. The descriptor
   * builder chain in {@link MySampleTableDescriptorsProvider} may depend on that for type
   * inference -- confirm before parameterizing.
   */
  static class MyReadFunction implements TableReadFunction {
    @Override
    public CompletableFuture getAsync(Object key) {
      return null;
    }

    @Override
    public boolean isRetriable(Throwable exception) {
      return false;
    }
  }

  /**
   * Sample provider that declares one remote table and one local RocksDB table.
   */
  public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider {
    /**
     * Builds the two table descriptors used by the tests.
     *
     * @param config the job config; not consulted by this implementation
     * @return a list holding the remote table descriptor followed by the local one
     */
    @Override
    public List<TableDescriptor> getTableDescriptors(Config config) {
      final RateLimiter readRateLimiter = mock(RateLimiter.class);
      final MyReadFunction readFn = new MyReadFunction();

      List<TableDescriptor> tableDescriptors = new ArrayList<>();
      // Remote table: String keys, Long values, stub read function and a mocked rate limiter.
      tableDescriptors.add(new RemoteTableDescriptor<>(REMOTE_TABLE_ID)
          .withReadFunction(readFn)
          .withRateLimiter(readRateLimiter, null, null)
          .withSerde(KVSerde.of(new StringSerde(), new LongSerde())));
      // Local table: String keys and values with an explicit block size of 4096.
      tableDescriptors.add(new RocksDbTableDescriptor(LOCAL_TABLE_ID)
          .withBlockSize(4096)
          .withSerde(KVSerde.of(new StringSerde(), new StringSerde())));
      return tableDescriptors;
    }
  }

  /**
   * A sample config rewriter to generate table configs. It instantiates the configured
   * tableDescriptorsProvider class which implements {@link TableDescriptorsProvider} and
   * generates the table configs.
   */
  public static class MySampleTableConfigRewriter implements ConfigRewriter {
    /**
     * Adds generated table configs to {@code config} when a valid provider class is configured.
     *
     * @param name the rewriter name; not used by this implementation
     * @param config the config to rewrite
     * @return {@code config} itself when the provider class is unset, empty, or does not
     *     implement {@link TableDescriptorsProvider}; otherwise a new {@link MapConfig} built
     *     from {@code config} and the generated table configs
     * @throws ConfigException if loading the class, instantiating it, or generating the table
     *     configs throws any exception
     */
    @Override
    public Config rewrite(String name, Config config) {
      String tableDescriptorsProviderClassName = config.get(PROVIDER_CLASS_CONFIG);
      if (tableDescriptorsProviderClassName == null || tableDescriptorsProviderClassName.isEmpty()) {
        // tableDescriptorsProviderClass is not configured
        return config;
      }

      try {
        Class<?> configuredClass = Class.forName(tableDescriptorsProviderClassName);
        if (!TableDescriptorsProvider.class.isAssignableFrom(configuredClass)) {
          // The configured class does not implement TableDescriptorsProvider.
          return config;
        }

        TableDescriptorsProvider tableDescriptorsProvider =
            Util.getObj(tableDescriptorsProviderClassName, TableDescriptorsProvider.class);
        List<TableDescriptor> tableDescs = tableDescriptorsProvider.getTableDescriptors(config);
        Config generatedTableConfig = TableConfigGenerator.generateConfigsForTableDescs(config, tableDescs);
        return new MapConfig(Arrays.asList(config, generatedTableConfig));
      } catch (Exception e) {
        throw new ConfigException(String.format("Invalid configuration for TableDescriptorsProvider class: %s",
            tableDescriptorsProviderClassName), e);
      }
    }
  }
}