blob: 8318e8a2b00bcbbf813bfc1e40ce3e5215a8699f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.testutil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.NotImplementedException;
import org.apache.samza.config.Config;
import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlCompositeKey;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.storage.kv.RocksDbTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
import org.apache.samza.table.TableProvider;
import org.apache.samza.table.TableProviderFactory;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.utils.BaseTableProvider;
public class TestIOResolverFactory implements SqlIOResolverFactory {
public static final String TEST_DB_SYSTEM = "testDb";
public static final String TEST_TABLE_ID = "testDbId";
@Override
public SqlIOResolver create(Config config, Config fullConfig) {
return new TestIOResolver(config);
}
static class TestTableDescriptor extends BaseTableDescriptor {
protected TestTableDescriptor(String tableId) {
super(tableId);
}
@Override
public String getTableId() {
return tableId;
}
@Override
public TableSpec getTableSpec() {
return new TableSpec(tableId, KVSerde.of(new NoOpSerde(), new NoOpSerde()), TestTableProviderFactory.class.getName(), new HashMap<>());
}
}
public static class TestTable implements ReadWriteTable {
public static Map<Object, Object> records = new HashMap<>();
@Override
public Object get(Object key) {
throw new NotImplementedException();
}
@Override
public CompletableFuture getAsync(Object key) {
throw new NotImplementedException();
}
@Override
public Map getAll(List keys) {
throw new NotImplementedException();
}
@Override
public CompletableFuture<Map> getAllAsync(List keys) {
throw new NotImplementedException();
}
@Override
public void close() {
}
@Override
public void put(Object key, Object value) {
if (key == null) {
records.put(System.nanoTime(), value);
} else if (value != null) {
records.put(key, value);
} else {
delete(key);
}
}
@Override
public CompletableFuture<Void> putAsync(Object key, Object value) {
throw new NotImplementedException();
}
@Override
public CompletableFuture<Void> putAllAsync(List list) {
throw new NotImplementedException();
}
@Override
public void delete(Object key) {
records.remove(key);
}
@Override
public CompletableFuture<Void> deleteAsync(Object key) {
throw new NotImplementedException();
}
@Override
public void deleteAll(List keys) {
records.clear();
}
@Override
public CompletableFuture<Void> deleteAllAsync(List keys) {
throw new NotImplementedException();
}
@Override
public void flush() {
}
@Override
public void putAll(List entries) {
throw new NotImplementedException();
}
}
public static class TestTableProviderFactory implements TableProviderFactory {
@Override
public TableProvider getTableProvider(TableSpec tableSpec) {
return new TestTableProvider();
}
}
static class TestTableProvider extends BaseTableProvider {
public TestTableProvider() {
super(null);
}
@Override
public Table getTable() {
return new TestTable();
}
@Override
public Map<String, String> generateConfig(Config jobConfig, Map<String, String> generatedConfig) {
return new HashMap<>();
}
@Override
public void close() {
}
}
private class TestIOResolver implements SqlIOResolver {
private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
private final Config config;
private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
public TestIOResolver(Config config) {
this.config = config;
}
private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
String[] sourceComponents = ioName.split("\\.");
int systemIdx = 0;
int endIdx = sourceComponents.length - 1;
int streamIdx = endIdx;
TableDescriptor tableDescriptor = null;
if (sourceComponents[endIdx].equalsIgnoreCase(SAMZA_SQL_QUERY_TABLE_KEYWORD)) {
streamIdx = endIdx - 1;
tableDescriptor = tableDescMap.get(ioName);
if (tableDescriptor == null) {
if (isSink) {
tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
} else {
String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-");
tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
}
tableDescMap.put(ioName, tableDescriptor);
}
}
Config systemConfigs = config.subset(sourceComponents[systemIdx] + ".");
return new SqlIOConfig(sourceComponents[systemIdx], sourceComponents[streamIdx],
Arrays.asList(sourceComponents), systemConfigs, tableDescriptor);
}
@Override
public SqlIOConfig fetchSourceInfo(String sourceName) {
return fetchIOInfo(sourceName, false);
}
@Override
public SqlIOConfig fetchSinkInfo(String sinkName) {
return fetchIOInfo(sinkName, true);
}
}
}