// blob: 405fb2c148882b895f161f5d052b3062caac7aa0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.translator;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.ApplicationContainerContext;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.Context;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.SamzaHistogram;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.SamzaSqlInputMessage;
import org.apache.samza.sql.data.SamzaSqlExecutionContext;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.data.SamzaSqlRelMsgMetadata;
import org.apache.samza.sql.interfaces.SamzaRelConverter;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.planner.QueryPlanner;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
import org.apache.samza.sql.util.SamzaSqlQueryParser;
import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
import org.apache.samza.system.descriptors.GenericOutputDescriptor;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Populates a {@link StreamApplicationDescriptor} from SQL queries.
 * This class contains the core of the SamzaSQL control code that converts the SQL statements to a calcite relational
 * graph. It then walks the relational graph and populates Samza's {@link StreamApplicationDescriptor} accordingly.
 */
public class QueryTranslator {
  private static final Logger LOG = LoggerFactory.getLogger(QueryTranslator.class);

  /**
   * Index of the next operator of the query that is currently being translated. It is reset to 0 at the start of
   * every {@link #translate(RelRoot, String, TranslatorContext, int)} call and incremented for each visited operator.
   * The field is static and mutable, so translations running concurrently would share (and corrupt) this counter.
   */
  static int opId = 0;

  private final SamzaSqlApplicationConfig sqlConfig;
  private final StreamApplicationDescriptor streamAppDescriptor;
  /* system descriptors created so far, keyed by system name */
  private final Map<String, DelegatingSystemDescriptor> systemDescriptors;
  /* input streams registered by the scan translator; iterated when forwarding system events */
  private final Map<String, MessageStream<SamzaSqlInputMessage>> inputMsgStreams;
  /* output streams created so far, keyed by the sink's source name */
  private final Map<String, OutputStream> outputMsgStreams;

  /**
   * Map function used by sendToOutputStream to convert a {@link SamzaSqlRelMessage} to a {@link KV}.
   * It also maintains the insert (SendToOutputStream) metrics and most of the query metrics.
   */
  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
    private transient SamzaRelConverter samzaMsgConverter;
    private transient MetricsRegistry metricsRegistry;

    /**
     * TODO: [SAMZA-2031]: the time-based metrics here for insert and query are
     * currently not accurate because they don't include the time of sendTo() call
     * It is not feasible to include it because sendTo operator does not return
     * a stream to process its messages to update the metrics.
     */
    /* insert (SendToOutputStream) metrics */
    private transient SamzaHistogram insertProcessingTime; // = end of conversion - start of conversion (nsec)

    /* query metrics */
    private transient SamzaHistogram totalLatencyMs; // (if event time exists) = output time - event time (msec)
    private transient SamzaHistogram queryLatencyNs; // = output time - scan time (nsec)
    private transient SamzaHistogram queuingLatencyMs; // = scan time - arrival time (msec)
    private transient Counter queryOutputEvents;

    private final String outputTopic;
    private final int queryId;
    private final String queryLogicalId;
    private final String insertLogicalId;

    /**
     * @param queryLogicalId metrics group used for the query level metrics
     * @param insertLogicalId metrics group used for the insert (SendToOutputStream) metrics
     * @param outputTopic output topic whose message converter is looked up in {@link #init(Context)}
     * @param queryId index of the query, used to look up its {@link TranslatorContext}
     */
    OutputMapFunction(String queryLogicalId, String insertLogicalId, String outputTopic, int queryId) {
      this.outputTopic = outputTopic;
      this.queryId = queryId;
      this.queryLogicalId = queryLogicalId;
      this.insertLogicalId = insertLogicalId;
    }

    /**
     * Resolves the message converter of the output topic from the query's {@link TranslatorContext} and registers
     * the insert and query metrics in the container metrics registry.
     * @param context the context handed to this function by the framework
     */
    @Override
    public void init(Context context) {
      SamzaSqlApplicationContext sqlAppContext = (SamzaSqlApplicationContext) context.getApplicationTaskContext();
      TranslatorContext translatorContext = sqlAppContext.getTranslatorContexts().get(queryId);
      this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);

      ContainerContext containerContext = context.getContainerContext();
      metricsRegistry = containerContext.getContainerMetricsRegistry();

      /* insert (SendToOutputStream) metrics */
      // NOTE(review): this histogram is updated with System.nanoTime() deltas but is registered under
      // TOTAL_LATENCY_MILLIS_NAME; a dedicated processing-time metric name is presumably intended -- confirm
      // against TranslatorConstants before changing it.
      insertProcessingTime =
          new SamzaHistogram(metricsRegistry, insertLogicalId, TranslatorConstants.TOTAL_LATENCY_MILLIS_NAME);

      /* query metrics */
      totalLatencyMs =
          new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.TOTAL_LATENCY_MILLIS_NAME);
      queryLatencyNs =
          new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.QUERY_LATENCY_NANOS_NAME);
      queuingLatencyMs =
          new SamzaHistogram(metricsRegistry, queryLogicalId, TranslatorConstants.QUEUEING_LATENCY_MILLIS_NAME);
      queryOutputEvents = metricsRegistry.newCounter(queryLogicalId, TranslatorConstants.OUTPUT_EVENTS_NAME);
      queryOutputEvents.clear();
    }

    /**
     * Converts the relational message to a key/value pair and updates the metrics.
     * @param message the relational message to convert
     * @return the converted key/value pair; the value is null when the message is a delete operation
     */
    @Override
    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
      final long beginProcessing = System.nanoTime();
      KV<Object, Object> retKV = this.samzaMsgConverter.convertToSamzaMessage(message);
      if (isDeleteOp(message)) {
        // If it is a delete op. Set the payload to null so that the record gets deleted.
        retKV = new KV<>(retKV.key, null);
      }
      updateMetrics(beginProcessing, System.nanoTime(), message.getSamzaSqlRelMsgMetadata());
      return retKV;
    }

    /**
     * Tells whether the record carries an operation-name field whose value equals the delete operation
     * (compared ignoring case).
     */
    private static boolean isDeleteOp(SamzaSqlRelMessage message) {
      return message.getSamzaSqlRelRecord().containsField(SamzaSqlRelMessage.OP_NAME)
          && ((String) message.getSamzaSqlRelRecord().getField(SamzaSqlRelMessage.OP_NAME).get())
              .equalsIgnoreCase(SamzaSqlRelMessage.DELETE_OP);
    }

    /**
     * Updates the Diagnostics Metrics (processing time and number of events)
     * @param beginProcessing when sendOutput Started processing this message (System.nanoTime() value)
     * @param endProcessing when sendOutput finished processing this message (System.nanoTime() value)
     * @param metadata the event's message metadata
     */
    private void updateMetrics(long beginProcessing, long endProcessing, SamzaSqlRelMsgMetadata metadata) {
      /* insert (SendToOutputStream) metrics */
      insertProcessingTime.update(endProcessing - beginProcessing);

      /* query metrics */
      queryOutputEvents.inc();

      /* TODO: remove scanTime validation once code to assign it is stable */
      Validate.isTrue(metadata.hasScanTime());
      // Use the same "output time" as the insert metric rather than reading the clock a third time.
      queryLatencyNs.update(endProcessing - metadata.getScanTimeNanos());

      /* TODO: change if hasArrivalTime to validation once arrivalTime is assigned,
         and later remove the check once code is stable */
      if (metadata.hasArrivalTime()) {
        queuingLatencyMs.update(metadata.getScanTimeMillis() - metadata.getArrivalTime());
      }

      /* since availability of eventTime depends on source, we need the following check */
      if (metadata.hasEventTime()) {
        totalLatencyMs.update(System.currentTimeMillis() - metadata.getEventTime());
      }
    }
  } // OutputMapFunction

  /**
   * @param appDesc application descriptor that is populated by {@link #translate(RelRoot, String, TranslatorContext, int)}
   * @param sqlConfig SQL application config that provides converters, stream configs and feature flags
   */
  public QueryTranslator(StreamApplicationDescriptor appDesc, SamzaSqlApplicationConfig sqlConfig) {
    this.sqlConfig = sqlConfig;
    this.streamAppDescriptor = appDesc;
    this.systemDescriptors = new HashMap<>();
    this.outputMsgStreams = new HashMap<>();
    this.inputMsgStreams = new HashMap<>();
  }

  /**
   * For unit testing only.
   * Plans the select query of {@code queryInfo}, translates the resulting plan and registers on {@code appDesc} an
   * application task context factory that exposes a clone of the translator context under {@code queryId}.
   * @param queryInfo parsed query whose select query is planned
   * @param appDesc application descriptor used for the translator context and the task context factory
   * @param queryId index of the query; also used to pick the output system stream from the SQL config
   */
  @VisibleForTesting
  void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
    QueryPlanner planner =
        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
            sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
    final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());

    SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
    TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
    translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);

    // The clone is taken after translation, so it reflects what the translators registered in the context.
    final Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
    translatorContexts.put(queryId, translatorContext.clone());
    appDesc.withApplicationTaskContextFactory(new ApplicationTaskContextFactory<SamzaSqlApplicationContext>() {
      @Override
      public SamzaSqlApplicationContext create(ExternalContext externalContext, JobContext jobContext,
          ContainerContext containerContext, TaskContext taskContext,
          ApplicationContainerContext applicationContainerContext) {
        return new SamzaSqlApplicationContext(translatorContexts);
      }
    });
  }

  /**
   * Translate Calcite plan to Samza stream operators.
   * @param relRoot Calcite plan in the form of {@link RelRoot}. RelRoot should not include the sink ({@link TableModify})
   * @param outputSystemStream Sink associated with the Calcite plan.
   * @param translatorContext Context maintained across translations.
   * @param queryId query index of the sql statement corresponding to the Calcite plan in multi SQL statement scenario
   *                starting with index 0.
   */
  public void translate(RelRoot relRoot, String outputSystemStream, TranslatorContext translatorContext, int queryId) {
    final RelNode planRoot = relRoot.project();
    final ScanTranslator scanTranslator =
        new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);

    /* update input metrics */
    final String queryLogicalId = String.format(TranslatorConstants.LOGSQLID_TEMPLATE, queryId);

    // Operator numbering restarts for every translated query.
    opId = 0;

    // Each visit method first descends into the operator's inputs and only then translates the operator itself,
    // so the logical operator ids are assigned in that order.
    planRoot.accept(new RelShuttleImpl() {
      @Override
      public RelNode visit(RelNode relNode) {
        // There should never be a TableModify in the calcite plan.
        Validate.isTrue(!(relNode instanceof TableModify));
        return super.visit(relNode);
      }

      @Override
      public RelNode visit(TableScan scan) {
        RelNode visited = super.visit(scan);
        String logicalOpId = logicalOpId(queryId, "scan", opId++);
        scanTranslator.translate(scan, queryLogicalId, logicalOpId, translatorContext, systemDescriptors,
            inputMsgStreams);
        return visited;
      }

      @Override
      public RelNode visit(LogicalFilter filter) {
        RelNode visited = visitChild(filter, 0, filter.getInput());
        String logicalOpId = logicalOpId(queryId, "filter", opId++);
        new FilterTranslator(queryId).translate(filter, logicalOpId, translatorContext);
        return visited;
      }

      @Override
      public RelNode visit(LogicalProject project) {
        RelNode visited = super.visit(project);
        String logicalOpId = logicalOpId(queryId, "project", opId++);
        new ProjectTranslator(queryId).translate(project, logicalOpId, translatorContext);
        return visited;
      }

      @Override
      public RelNode visit(LogicalJoin join) {
        RelNode visited = super.visit(join);
        String logicalOpId = logicalOpId(queryId, "join", opId++);
        new JoinTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix(), queryId).translate(join, translatorContext);
        return visited;
      }

      @Override
      public RelNode visit(LogicalAggregate aggregate) {
        RelNode visited = super.visit(aggregate);
        String logicalOpId = logicalOpId(queryId, "window", opId++);
        new LogicalAggregateTranslator(logicalOpId, sqlConfig.getMetadataTopicPrefix()).translate(aggregate,
            translatorContext);
        return visited;
      }
    });

    // The insert operator takes the next free index; the counter is not advanced because it is the last operator.
    String insertLogicalOpId = logicalOpId(queryId, "insert", opId);
    sendToOutputStream(queryLogicalId, insertLogicalOpId, outputSystemStream, streamAppDescriptor, translatorContext,
        planRoot, queryId);
  }

  /**
   * Formats the logical operator id of an operator from the query index, the operator name and the operator index.
   */
  private static String logicalOpId(int queryId, String opName, int opIndex) {
    return String.format(TranslatorConstants.LOGOPID_TEMPLATE, queryId, opName, opIndex);
  }

  /**
   * Attaches the sink to the message stream produced for {@code node}: the stream is mapped through
   * {@link OutputMapFunction} and sent either to the sink's table (when the sink config has a table descriptor)
   * or to an output stream. For stream sinks, system messages of all registered input streams are forwarded to the
   * same output stream when the SQL config enables system event processing.
   * @param queryLogicalId metrics group of the query
   * @param logicalOpId metrics group of the insert operator
   * @param sinkStream source name of the sink, used to look up its {@link SqlIOConfig}
   * @param appDesc application descriptor used to obtain the output stream or table
   * @param translatorContext context holding the message stream registered for {@code node}
   * @param node root of the translated plan
   * @param queryId index of the query
   * @throws NullPointerException if no output config is registered for {@code sinkStream}
   * @throws SamzaException if the application descriptor returns no table for the sink's table descriptor
   */
  private void sendToOutputStream(String queryLogicalId, String logicalOpId, String sinkStream,
      StreamApplicationDescriptor appDesc, TranslatorContext translatorContext, RelNode node, int queryId) {
    SqlIOConfig sinkConfig = sqlConfig.getOutputSystemStreamConfigsBySource().get(sinkStream);
    // Fail fast with a descriptive message instead of a bare NPE after the map operator has been attached.
    Validate.notNull(sinkConfig, "No output system stream config found for sink %s", sinkStream);

    MessageStream<SamzaSqlRelMessage> stream = translatorContext.getMessageStream(node.getId());
    MessageStream<KV<Object, Object>> outputStream =
        stream.map(new OutputMapFunction(queryLogicalId, logicalOpId, sinkStream, queryId));

    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
    if (tableDescriptor.isPresent()) {
      Table outputTable = appDesc.getTable(tableDescriptor.get());
      if (outputTable == null) {
        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
        throw new SamzaException(msg);
      }
      outputStream.sendTo(outputTable);
      return;
    }

    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
    String systemName = sinkConfig.getSystemName();
    // System descriptors and output streams are cached so that each is created at most once per name/source.
    DelegatingSystemDescriptor sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
    GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
    OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
    outputStream.sendTo(stm);

    // Process system events only if the output is a stream.
    if (sqlConfig.isProcessSystemEvents()) {
      for (MessageStream<SamzaSqlInputMessage> inputStream : inputMsgStreams.values()) {
        MessageStream<KV<Object, Object>> systemEventStream =
            inputStream.filter(message -> message.getMetadata().isSystemMessage())
                .map(SamzaSqlInputMessage::getKeyAndMessageKV);
        systemEventStream.sendTo(stm);
      }
    }
  }
}