blob: fe4d8da7a2a5cd6bea27232022ba6daffd4f2f82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.translator;
import java.util.Map;
import java.util.Optional;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.TableDescriptor;
import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.table.Table;
import org.apache.samza.task.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class populates the {@link StreamApplicationDescriptor} from SQL queries.
* It contains the core of the SamzaSQL control code, which converts SQL statements into a Calcite relational graph,
* walks that graph, and populates Samza's {@link StreamApplicationDescriptor} accordingly.
*/
public class QueryTranslator {
  private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);

  private final ScanTranslator scanTranslator;
  private final SamzaSqlApplicationConfig sqlConfig;
  private final Map<String, SamzaRelConverter> converters;

  /**
   * Map function applied to the final relational stream of a query. It converts each
   * {@link SamzaSqlRelMessage} into the key/value pair that is sent to the sink.
   */
  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
    // Not supplied through the constructor; looked up from the task's user context in init().
    private transient SamzaRelConverter samzaMsgConverter;
    private final String outputTopic;

    /**
     * @param outputTopic name used to look up the message converter in {@link #init(Config, TaskContext)}
     */
    OutputMapFunction(String outputTopic) {
      this.outputTopic = outputTopic;
    }

    /**
     * Resolves the converter for {@code outputTopic} from the {@link TranslatorContext} stored as the
     * task's user context.
     */
    @Override
    public void init(Config config, TaskContext taskContext) {
      TranslatorContext context = (TranslatorContext) taskContext.getUserContext();
      this.samzaMsgConverter = context.getMsgConverter(outputTopic);
    }

    /**
     * @param message relational message produced by the translated query
     * @return the result of {@link SamzaRelConverter#convertToSamzaMessage} for {@code message}
     */
    @Override
    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
      return this.samzaMsgConverter.convertToSamzaMessage(message);
    }
  }

  /**
   * Creates a translator backed by the given SQL application config.
   *
   * @param sqlConfig source of the rel converters, input/output stream configs, schema providers and UDF metadata
   */
  public QueryTranslator(SamzaSqlApplicationConfig sqlConfig) {
    this.sqlConfig = sqlConfig;
    // Fetch the converters once and share them between this translator and the scan translator.
    this.converters = sqlConfig.getSamzaRelConverters();
    this.scanTranslator = new ScanTranslator(this.converters, sqlConfig.getInputSystemStreamConfigBySource());
  }

  /**
   * Plans the select query of {@code queryInfo}, translates the resulting relational tree into operators on
   * {@code appDesc}, sends the output of the root node to the query's sink (an output stream or a table), and
   * registers a {@link ContextManager} that hands each task a clone of the translator context.
   *
   * @param queryInfo parsed query providing the select statement and the sink name
   * @param appDesc application descriptor to populate
   * @throws SamzaException if no output configuration exists for the sink, or if the sink is a table that
   *         {@code appDesc} cannot provide
   */
  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc) {
    QueryPlanner planner =
        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
            sqlConfig.getUdfMetadata());
    final SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(this.sqlConfig);
    final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
    final TranslatorContext context = new TranslatorContext(appDesc, relRoot, executionContext, this.converters);
    final RelNode node = relRoot.project();
    final SqlIOResolver ioResolver = context.getExecutionContext().getSamzaSqlApplicationConfig().getIoResolver();

    translateRelTree(node, context, ioResolver);

    String sink = queryInfo.getSink();
    SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sink);
    if (sinkConfig == null) {
      // Fail with a descriptive error instead of a NullPointerException further down.
      String msg = "Failed to find output configuration for sink " + sink;
      LOG.error(msg);
      throw new SamzaException(msg);
    }

    // The message stream registered under the root node's id carries the final result of the query.
    MessageStreamImpl<SamzaSqlRelMessage> stream =
        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(node.getId());
    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(sink));
    sendToSink(outputStream, sinkConfig, context, appDesc);

    appDesc.withContextManager(new ContextManager() {
      @Override
      public void init(Config config, TaskContext taskContext) {
        // Each task gets its own clone of the translator context as its user context.
        taskContext.setUserContext(context.clone());
      }

      @Override
      public void close() {
      }
    });
  }

  /**
   * Walks the relational tree rooted at {@code node}. Every visit method first delegates to the shuttle's
   * own traversal and only then invokes the translator for the visited node.
   */
  private void translateRelTree(RelNode node, final TranslatorContext context, final SqlIOResolver ioResolver) {
    node.accept(new RelShuttleImpl() {
      // Incremented before use, so the first aggregate/join translator receives id 1.
      int windowId = 0;
      int joinId = 0;

      @Override
      public RelNode visit(TableScan scan) {
        RelNode node = super.visit(scan);
        scanTranslator.translate(scan, context);
        return node;
      }

      @Override
      public RelNode visit(LogicalFilter filter) {
        RelNode node = visitChild(filter, 0, filter.getInput());
        new FilterTranslator().translate(filter, context);
        return node;
      }

      @Override
      public RelNode visit(LogicalProject project) {
        RelNode node = super.visit(project);
        new ProjectTranslator().translate(project, context);
        return node;
      }

      @Override
      public RelNode visit(LogicalJoin join) {
        RelNode node = super.visit(join);
        joinId++;
        new JoinTranslator(joinId, ioResolver).translate(join, context);
        return node;
      }

      @Override
      public RelNode visit(LogicalAggregate aggregate) {
        RelNode node = super.visit(aggregate);
        windowId++;
        new LogicalAggregateTranslator(windowId).translate(aggregate, context);
        return node;
      }
    });
  }

  /**
   * Sends {@code outputStream} to the destination described by {@code sinkConfig}: the table obtained from
   * {@code appDesc} when the config carries a table descriptor, otherwise an output stream on the sink's system.
   *
   * @throws SamzaException if the config carries a table descriptor but {@code appDesc} returns no table for it
   */
  private void sendToSink(MessageStream<KV<Object, Object>> outputStream, SqlIOConfig sinkConfig,
      TranslatorContext context, StreamApplicationDescriptor appDesc) {
    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
    if (!tableDescriptor.isPresent()) {
      KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
      String systemName = sinkConfig.getSystemName();
      // Reuse the system descriptor already registered under this system name, creating it on first use.
      DelegatingSystemDescriptor sd =
          context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
      GenericOutputDescriptor<KV<Object, Object>> osd =
          sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
      outputStream.sendTo(appDesc.getOutputStream(osd));
      return;
    }

    Table outputTable = appDesc.getTable(tableDescriptor.get());
    if (outputTable == null) {
      String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
      LOG.error(msg);
      throw new SamzaException(msg);
    }
    outputStream.sendTo(outputTable);
  }
}