// blob: 8b647ed8e9a5b65f3e255f9f68276ebbcf529bdc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.table;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.table.descriptors.LocalTableDescriptor;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Static helpers that derive job configuration entries from table descriptors.
 */
public class TableConfigGenerator {

  private static final Logger LOG = LoggerFactory.getLogger(TableConfigGenerator.class);

  /**
   * Builds the table configuration by merging the config produced by each descriptor.
   *
   * @param jobConfig existing job config, passed to every descriptor's {@code toConfig}
   * @param tableDescriptors descriptors of the tables to generate configuration for
   * @return the merged table configuration; when two descriptors emit the same key,
   *         the value of the descriptor that comes later in the list is kept
   */
  public static Map<String, String> generate(Config jobConfig, List<TableDescriptor> tableDescriptors) {
    Map<String, String> mergedConfig = new HashMap<>();
    for (TableDescriptor descriptor : tableDescriptors) {
      mergedConfig.putAll(descriptor.toConfig(jobConfig));
    }
    LOG.info("TableConfigGenerator has generated configs {}", mergedConfig);
    return mergedConfig;
  }

  /**
   * Builds the serde configuration for the local tables among the given descriptors.
   * Descriptors that are not {@link LocalTableDescriptor} instances are skipped.
   *
   * <p>Each distinct serde (as decided by {@code equals}/{@code hashCode}) gets one generated
   * name of the form {@code <SimpleClassName>-<random UUID>} and one entry holding the
   * Base64-encoded output of {@link SerializableSerde#toBytes}. Each local table then gets
   * one key-serde entry and one msg-serde entry whose values are those generated names.
   *
   * @param tableDescriptors descriptors of the tables to generate serde configuration for
   * @return serde configuration for the tables
   */
  public static Map<String, String> generateSerdeConfig(List<TableDescriptor> tableDescriptors) {
    // Step 1: remember the key and msg serde instance of every local table, by table id.
    Map<String, Serde> keySerdeByTable = new HashMap<>();
    Map<String, Serde> msgSerdeByTable = new HashMap<>();
    for (TableDescriptor descriptor : tableDescriptors) {
      if (!(descriptor instanceof LocalTableDescriptor)) {
        continue;
      }
      LocalTableDescriptor localDescriptor = (LocalTableDescriptor) descriptor;
      keySerdeByTable.put(localDescriptor.getTableId(), localDescriptor.getSerde().getKeySerde());
      msgSerdeByTable.put(localDescriptor.getTableId(), localDescriptor.getSerde().getValueSerde());
    }

    // Step 2: collapse equal serdes so that each one is named and serialized only once.
    // Key serdes are added before msg serdes, so for a group of equal serdes the
    // first-seen instance is the one that gets serialized.
    HashSet<Serde> distinctSerdes = new HashSet<>();
    distinctSerdes.addAll(keySerdeByTable.values());
    distinctSerdes.addAll(msgSerdeByTable.values());

    // Step 3: give every distinct serde a generated name and publish its serialized form.
    Map<String, String> serdeConfigs = new HashMap<>();
    Map<Serde, String> nameBySerde = new HashMap<>();
    SerializableSerde<Serde> serdeSerializer = new SerializableSerde<>();
    Base64.Encoder encoder = Base64.getEncoder();
    for (Serde serde : distinctSerdes) {
      String serdeName = nameBySerde.computeIfAbsent(serde,
          s -> s.getClass().getSimpleName() + "-" + UUID.randomUUID());
      String instanceConfigKey = String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE, serdeName);
      String encodedInstance = encoder.encodeToString(serdeSerializer.toBytes(serde));
      serdeConfigs.putIfAbsent(instanceConfigKey, encodedInstance);
    }

    // Step 4: point each table's key and msg serde config at the names generated above.
    for (Map.Entry<String, Serde> keyEntry : keySerdeByTable.entrySet()) {
      String keySerdeConfigKey = String.format(JavaTableConfig.STORE_KEY_SERDE, keyEntry.getKey());
      serdeConfigs.put(keySerdeConfigKey, nameBySerde.get(keyEntry.getValue()));
    }
    for (Map.Entry<String, Serde> msgEntry : msgSerdeByTable.entrySet()) {
      String msgSerdeConfigKey = String.format(JavaTableConfig.STORE_MSG_SERDE, msgEntry.getKey());
      serdeConfigs.put(msgSerdeConfigKey, nameBySerde.get(msgEntry.getValue()));
    }

    return serdeConfigs;
  }
}