blob: aad62db8f74f97c08775a35faa2a0b559696c4cf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils.config;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.spi.config.table.CompletionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.TunerConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class TableConfigSerDeTest {
@Test
public void testSerDe()
throws IOException {
TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable");
{
// Default table config
TableConfig tableConfig = tableConfigBuilder.build();
checkDefaultTableConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkDefaultTableConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkDefaultTableConfig(tableConfigToCompare);
// Backward-compatible for raw table name and lower case table type
ObjectNode tableConfigJson = (ObjectNode) tableConfigBuilder.build().toJsonNode();
tableConfigJson.put(TableConfig.TABLE_NAME_KEY, "testTable");
tableConfigJson.put(TableConfig.TABLE_TYPE_KEY, "offline");
tableConfigToCompare = JsonUtils.jsonNodeToObject(tableConfigJson, TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkDefaultTableConfig(tableConfigToCompare);
}
{
// With quota config
QuotaConfig quotaConfig = new QuotaConfig("30g", "100.00");
TableConfig tableConfig = tableConfigBuilder.setQuotaConfig(quotaConfig).build();
checkQuotaConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkQuotaConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkQuotaConfig(tableConfigToCompare);
}
{
// With tenant config
TableConfig tableConfig =
tableConfigBuilder.setServerTenant("aServerTenant").setBrokerTenant("aBrokerTenant").build();
checkTenantConfigWithoutTagOverride(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkTenantConfigWithoutTagOverride(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkTenantConfigWithoutTagOverride(tableConfigToCompare);
// With tag override config
TagOverrideConfig tagOverrideConfig = new TagOverrideConfig("aRTConsumingTag_REALTIME", null);
tableConfig = tableConfigBuilder.setTagOverrideConfig(tagOverrideConfig).build();
checkTenantConfigWithTagOverride(tableConfig);
// Serialize then de-serialize
tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkTenantConfigWithTagOverride(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkTenantConfigWithTagOverride(tableConfigToCompare);
}
{
// With SegmentAssignmentStrategyConfig
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = new ReplicaGroupStrategyConfig("memberId", 5);
TableConfig tableConfig = tableConfigBuilder.setSegmentAssignmentStrategy("ReplicaGroupSegmentAssignmentStrategy")
.setReplicaGroupStrategyConfig(replicaGroupStrategyConfig).build();
checkSegmentAssignmentStrategyConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkSegmentAssignmentStrategyConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkSegmentAssignmentStrategyConfig(tableConfigToCompare);
}
{
// With completion config
CompletionConfig completionConfig = new CompletionConfig("DEFAULT");
TableConfig tableConfig = tableConfigBuilder.setCompletionConfig(completionConfig).build();
checkCompletionConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkCompletionConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkCompletionConfig(tableConfigToCompare);
}
{
// With routing config
RoutingConfig routingConfig =
new RoutingConfig("builder", Arrays.asList("pruner0", "pruner1", "pruner2"), "selector");
TableConfig tableConfig = tableConfigBuilder.setRoutingConfig(routingConfig).build();
checkRoutingConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkRoutingConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkRoutingConfig(tableConfigToCompare);
}
{
// With query config
QueryConfig queryConfig = new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"));
TableConfig tableConfig = tableConfigBuilder.setQueryConfig(queryConfig).build();
checkQueryConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkQueryConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkQueryConfig(tableConfigToCompare);
}
{
// With instance assignment config
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false));
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE, instanceAssignmentConfig)).build();
checkInstanceAssignmentConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkInstanceAssignmentConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkInstanceAssignmentConfig(tableConfigToCompare);
}
{
// With field config
Map<String, String> properties = new HashMap<>();
properties.put("foo", "bar");
properties.put("foobar", "potato");
List<FieldConfig> fieldConfigList = Arrays.asList(new FieldConfig("column1", FieldConfig.EncodingType.DICTIONARY,
Lists.newArrayList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE), null, properties),
new FieldConfig("column2", null, Collections.emptyList(), null, null),
new FieldConfig("column3", FieldConfig.EncodingType.RAW, Collections.emptyList(),
FieldConfig.CompressionCodec.SNAPPY, null));
TableConfig tableConfig = tableConfigBuilder.setFieldConfigList(fieldConfigList).build();
checkFieldConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkFieldConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkFieldConfig(tableConfigToCompare);
}
{
// with upsert config
TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
// Serialize then de-serialize
checkTableConfigWithUpsertConfig(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
checkTableConfigWithUpsertConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// with dedup config
DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5);
TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
// Serialize then de-serialize
checkTableConfigWithDedupConfig(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
checkTableConfigWithDedupConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// with SegmentsValidationAndRetentionConfig
TableConfig tableConfig = tableConfigBuilder.setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL).build();
checkSegmentsValidationAndRetentionConfig(
JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
checkSegmentsValidationAndRetentionConfig(
TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// With ingestion config
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setBatchIngestionConfig(
new BatchIngestionConfig(Collections.singletonList(Collections.singletonMap("batchType", "s3")), "APPEND",
"HOURLY"));
ingestionConfig.setStreamIngestionConfig(
new StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType", "kafka"))));
ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
ingestionConfig.setTransformConfigs(
Arrays.asList(new TransformConfig("bar", "func(moo)"), new TransformConfig("zoo", "myfunc()")));
ingestionConfig.setComplexTypeConfig(new ComplexTypeConfig(Arrays.asList("c1", "c2"), ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, Collections.emptyMap()));
ingestionConfig.setAggregationConfigs(
Arrays.asList(new AggregationConfig("SUM__bar", "SUM(bar)"), new AggregationConfig("MIN_foo", "MIN(foo)")));
TableConfig tableConfig = tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
checkIngestionConfig(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkIngestionConfig(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkIngestionConfig(tableConfigToCompare);
}
{
// With tier config
List<TierConfig> tierConfigList = Lists.newArrayList(
new TierConfig("tierA", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "10d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tierA_tag_OFFLINE", null, null),
new TierConfig("tierB", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tierB_tag_OFFLINE", null, null),
new TierConfig("tier0", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null, Lists.newArrayList("seg0"),
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tierB_tag_OFFLINE", null, null));
TableConfig tableConfig = tableConfigBuilder.setTierConfigList(tierConfigList).build();
checkTierConfigList(tableConfig);
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
checkTierConfigList(tableConfigToCompare);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
checkTierConfigList(tableConfigToCompare);
}
{
// With tuner config
String name = "testTuner";
Map<String, String> props = new HashMap<>();
props.put("key", "value");
TunerConfig tunerConfig = new TunerConfig(name, props);
TableConfig tableConfig = tableConfigBuilder.setTunerConfigList(Lists.newArrayList(tunerConfig)).build();
// Serialize then de-serialize
TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class);
assertEquals(tableConfigToCompare, tableConfig);
TunerConfig tunerConfigToCompare = tableConfigToCompare.getTunerConfigsList().get(0);
assertEquals(tunerConfigToCompare.getName(), name);
assertEquals(tunerConfigToCompare.getTunerProperties(), props);
tableConfigToCompare = TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig));
assertEquals(tableConfigToCompare, tableConfig);
tunerConfigToCompare = tableConfigToCompare.getTunerConfigsList().get(0);
assertEquals(tunerConfigToCompare.getName(), name);
assertEquals(tunerConfigToCompare.getTunerProperties(), props);
}
}
private void checkSegmentsValidationAndRetentionConfig(TableConfig tableConfig) {
// TODO validate other fields of SegmentsValidationAndRetentionConfig.
assertEquals(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(), CommonConstants.HTTP_PROTOCOL);
}
private void checkDefaultTableConfig(TableConfig tableConfig) {
// Check mandatory fields
assertEquals(tableConfig.getTableName(), "testTable_OFFLINE");
assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
assertNotNull(tableConfig.getValidationConfig());
assertNotNull(tableConfig.getTenantConfig());
assertNotNull(tableConfig.getIndexingConfig());
assertNotNull(tableConfig.getCustomConfig());
// Check optional fields
assertNull(tableConfig.getQuotaConfig());
assertNull(tableConfig.getRoutingConfig());
assertNull(tableConfig.getQueryConfig());
assertNull(tableConfig.getInstanceAssignmentConfigMap());
assertNull(tableConfig.getSegmentAssignmentConfigMap());
assertNull(tableConfig.getFieldConfigList());
// Serialize
ObjectNode tableConfigJson = (ObjectNode) tableConfig.toJsonNode();
assertEquals(tableConfigJson.get(TableConfig.TABLE_NAME_KEY).asText(), "testTable_OFFLINE");
assertEquals(tableConfigJson.get(TableConfig.TABLE_TYPE_KEY).asText(), "OFFLINE");
assertTrue(tableConfigJson.get(TableConfig.VALIDATION_CONFIG_KEY) instanceof ObjectNode);
assertTrue(tableConfigJson.get(TableConfig.TENANT_CONFIG_KEY) instanceof ObjectNode);
assertTrue(tableConfigJson.get(TableConfig.INDEXING_CONFIG_KEY) instanceof ObjectNode);
assertTrue(tableConfigJson.get(TableConfig.CUSTOM_CONFIG_KEY) instanceof ObjectNode);
assertFalse(tableConfigJson.has(TableConfig.QUOTA_CONFIG_KEY));
assertFalse(tableConfigJson.has(TableConfig.TASK_CONFIG_KEY));
assertFalse(tableConfigJson.has(TableConfig.ROUTING_CONFIG_KEY));
assertFalse(tableConfigJson.has(TableConfig.QUERY_CONFIG_KEY));
assertFalse(tableConfigJson.has(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY));
assertFalse(tableConfigJson.has(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY));
assertFalse(tableConfigJson.has(TableConfig.FIELD_CONFIG_LIST_KEY));
}
private void checkQuotaConfig(TableConfig tableConfig) {
QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
assertNotNull(quotaConfig);
assertEquals(quotaConfig.getStorage(), "30G");
assertEquals(quotaConfig.getMaxQueriesPerSecond(), "100.0");
}
private void checkTenantConfigWithoutTagOverride(TableConfig tableConfig) {
TenantConfig tenantConfig = tableConfig.getTenantConfig();
assertNotNull(tenantConfig);
assertEquals(tenantConfig.getServer(), "aServerTenant");
assertEquals(tenantConfig.getBroker(), "aBrokerTenant");
assertNull(tenantConfig.getTagOverrideConfig());
}
private void checkTenantConfigWithTagOverride(TableConfig tableConfig) {
TenantConfig tenantConfig = tableConfig.getTenantConfig();
assertNotNull(tenantConfig);
assertEquals(tenantConfig.getServer(), "aServerTenant");
assertEquals(tenantConfig.getBroker(), "aBrokerTenant");
assertNotNull(tenantConfig.getTagOverrideConfig());
assertEquals(tenantConfig.getTagOverrideConfig().getRealtimeConsuming(), "aRTConsumingTag_REALTIME");
assertNull(tenantConfig.getTagOverrideConfig().getRealtimeCompleted());
}
private void checkSegmentAssignmentStrategyConfig(TableConfig tableConfig) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
assertEquals(validationConfig.getSegmentAssignmentStrategy(), "ReplicaGroupSegmentAssignmentStrategy");
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = validationConfig.getReplicaGroupStrategyConfig();
assertNotNull(replicaGroupStrategyConfig);
assertEquals(replicaGroupStrategyConfig.getPartitionColumn(), "memberId");
assertEquals(replicaGroupStrategyConfig.getNumInstancesPerPartition(), 5);
}
private void checkCompletionConfig(TableConfig tableConfig) {
CompletionConfig completionConfig = tableConfig.getValidationConfig().getCompletionConfig();
assertNotNull(completionConfig);
assertEquals(completionConfig.getCompletionMode(), "DEFAULT");
}
private void checkRoutingConfig(TableConfig tableConfig) {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
assertNotNull(routingConfig);
assertEquals(routingConfig.getRoutingTableBuilderName(), "builder");
assertEquals(routingConfig.getSegmentPrunerTypes(), Arrays.asList("pruner0", "pruner1", "pruner2"));
assertEquals(routingConfig.getInstanceSelectorType(), "selector");
}
private void checkQueryConfig(TableConfig tableConfig) {
QueryConfig queryConfig = tableConfig.getQueryConfig();
assertNotNull(queryConfig);
assertEquals(queryConfig.getTimeoutMs(), Long.valueOf(1000L));
assertEquals(queryConfig.getDisableGroovy(), Boolean.TRUE);
assertEquals(queryConfig.getExpressionOverrideMap(), Collections.singletonMap("func(a)", "b"));
}
private void checkIngestionConfig(TableConfig tableConfig) {
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
assertNotNull(ingestionConfig);
assertNotNull(ingestionConfig.getFilterConfig());
assertEquals(ingestionConfig.getFilterConfig().getFilterFunction(), "filterFunc(foo)");
List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
assertNotNull(transformConfigs);
assertEquals(transformConfigs.size(), 2);
assertEquals(transformConfigs.get(0).getColumnName(), "bar");
assertEquals(transformConfigs.get(0).getTransformFunction(), "func(moo)");
assertEquals(transformConfigs.get(1).getColumnName(), "zoo");
assertEquals(transformConfigs.get(1).getTransformFunction(), "myfunc()");
assertNotNull(ingestionConfig.getBatchIngestionConfig());
assertNotNull(ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps());
assertEquals(ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().size(), 1);
assertEquals(ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().get(0).get("batchType"), "s3");
assertEquals(ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(), "APPEND");
assertEquals(ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency(), "HOURLY");
assertNotNull(ingestionConfig.getStreamIngestionConfig());
assertNotNull(ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps());
assertEquals(ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size(), 1);
assertEquals(ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().get(0).get("streamType"), "kafka");
}
private void checkTierConfigList(TableConfig tableConfig) {
List<TierConfig> tierConfigsList = tableConfig.getTierConfigsList();
assertNotNull(tierConfigsList);
assertEquals(tierConfigsList.size(), 3);
assertEquals(tierConfigsList.get(0).getName(), "tierA");
assertEquals(tierConfigsList.get(0).getSegmentSelectorType(), TierFactory.TIME_SEGMENT_SELECTOR_TYPE);
assertEquals(tierConfigsList.get(0).getSegmentAge(), "10d");
assertEquals(tierConfigsList.get(0).getStorageType(), TierFactory.PINOT_SERVER_STORAGE_TYPE);
assertEquals(tierConfigsList.get(0).getServerTag(), "tierA_tag_OFFLINE");
assertEquals(tierConfigsList.get(1).getName(), "tierB");
assertEquals(tierConfigsList.get(1).getSegmentSelectorType(), TierFactory.TIME_SEGMENT_SELECTOR_TYPE);
assertEquals(tierConfigsList.get(1).getSegmentAge(), "30d");
assertEquals(tierConfigsList.get(1).getStorageType(), TierFactory.PINOT_SERVER_STORAGE_TYPE);
assertEquals(tierConfigsList.get(1).getServerTag(), "tierB_tag_OFFLINE");
assertEquals(tierConfigsList.get(2).getName(), "tier0");
assertEquals(tierConfigsList.get(2).getSegmentSelectorType(), TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
assertNull(tierConfigsList.get(2).getSegmentAge());
assertEquals(tierConfigsList.get(2).getSegmentList(), Lists.newArrayList("seg0"));
assertEquals(tierConfigsList.get(2).getStorageType(), TierFactory.PINOT_SERVER_STORAGE_TYPE);
assertEquals(tierConfigsList.get(2).getServerTag(), "tierB_tag_OFFLINE");
}
private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
assertNotNull(instanceAssignmentConfigMap);
assertEquals(instanceAssignmentConfigMap.size(), 1);
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
InstanceAssignmentConfig instanceAssignmentConfig = instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
InstanceTagPoolConfig tagPoolConfig = instanceAssignmentConfig.getTagPoolConfig();
assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");
assertTrue(tagPoolConfig.isPoolBased());
assertEquals(tagPoolConfig.getNumPools(), 3);
assertNull(tagPoolConfig.getPools());
InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
assertNotNull(constraintConfig);
assertEquals(constraintConfig.getConstraints(), Arrays.asList("constraint1", "constraint2"));
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
instanceAssignmentConfig.getReplicaGroupPartitionConfig();
assertTrue(replicaGroupPartitionConfig.isReplicaGroupBased());
assertEquals(replicaGroupPartitionConfig.getNumInstances(), 0);
assertEquals(replicaGroupPartitionConfig.getNumReplicaGroups(), 3);
assertEquals(replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(), 5);
assertEquals(replicaGroupPartitionConfig.getNumPartitions(), 0);
assertEquals(replicaGroupPartitionConfig.getNumInstancesPerPartition(), 0);
}
private void checkFieldConfig(TableConfig tableConfig) {
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
assertNotNull(fieldConfigList);
assertEquals(fieldConfigList.size(), 3);
FieldConfig firstFieldConfig = fieldConfigList.get(0);
assertEquals(firstFieldConfig.getName(), "column1");
assertEquals(firstFieldConfig.getEncodingType(), FieldConfig.EncodingType.DICTIONARY);
assertEquals(firstFieldConfig.getIndexTypes().get(0), FieldConfig.IndexType.INVERTED);
assertEquals(firstFieldConfig.getIndexTypes().get(1), FieldConfig.IndexType.RANGE);
Map<String, String> expectedProperties = new HashMap<>();
expectedProperties.put("foo", "bar");
expectedProperties.put("foobar", "potato");
assertEquals(firstFieldConfig.getProperties(), expectedProperties);
FieldConfig secondFieldConfig = fieldConfigList.get(1);
assertEquals(secondFieldConfig.getName(), "column2");
assertNull(secondFieldConfig.getEncodingType());
assertNull(secondFieldConfig.getIndexType());
assertEquals(secondFieldConfig.getIndexTypes().size(), 0);
assertNull(secondFieldConfig.getProperties());
FieldConfig thirdFieldConfig = fieldConfigList.get(2);
assertEquals(thirdFieldConfig.getName(), "column3");
assertEquals(thirdFieldConfig.getEncodingType(), FieldConfig.EncodingType.RAW);
assertNull(thirdFieldConfig.getIndexType());
assertEquals(thirdFieldConfig.getCompressionCodec(), FieldConfig.CompressionCodec.SNAPPY);
assertNull(thirdFieldConfig.getProperties());
}
private void checkTableConfigWithUpsertConfig(TableConfig tableConfig) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
assertNotNull(upsertConfig);
assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
}
private void checkTableConfigWithDedupConfig(TableConfig tableConfig) {
DedupConfig dedupConfig = tableConfig.getDedupConfig();
assertNotNull(dedupConfig);
assertTrue(dedupConfig.isDedupEnabled());
assertEquals(dedupConfig.getHashFunction(), HashFunction.MD5);
}
}