/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.client.impl;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.samza.SamzaException;
import org.apache.samza.config.*;
import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.serializers.StringSerdeFactory;
import org.apache.samza.sql.client.exceptions.ExecutorException;
import org.apache.samza.sql.client.interfaces.*;
import org.apache.samza.sql.client.util.Pair;
import org.apache.samza.sql.client.util.RandomAccessQueue;
import org.apache.samza.sql.dsl.SamzaSqlDslConverter;
import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
import org.apache.samza.sql.interfaces.RelSchemaProviderFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
import org.apache.samza.sql.util.JsonUtil;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.kafka.KafkaSystemFactory;
import org.apache.samza.tools.avro.AvroSchemaGenRelConverterFactory;
import org.apache.samza.tools.avro.AvroSerDeFactory;
import org.apache.samza.tools.json.JsonRelConverterFactory;
import org.apache.samza.tools.schemas.ProfileChangeEvent;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Samza implementation of Executor for Samza SQL Shell.
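*
* <p>A minimal usage sketch (illustrative only; the topic name and the getExecutionId()
* accessor on QueryResult are assumptions, not taken from this file):
* <pre>{@code
* SamzaExecutor executor = new SamzaExecutor();
* executor.start(context);
* QueryResult result = executor.executeQuery(context, "select * from kafka.SomeTopic");
* List<String[]> rows = executor.retrieveQueryResult(context, 0, executor.getRowCount() - 1);
* executor.stopExecution(context, result.getExecutionId());
* }</pre>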
*/
public class SamzaExecutor implements SqlExecutor {
private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutor.class);
private static final String SAMZA_SYSTEM_LOG = "log";
private static final String SAMZA_SYSTEM_KAFKA = "kafka";
private static final String SAMZA_SQL_OUTPUT = "samza.sql.output";
private static final String SAMZA_SQL_SYSTEM_KAFKA_ADDRESS = "samza.sql.system.kafka.address";
private static final String DEFAULT_SERVER_ADDRESS = "localhost:2181";
// The maximum number of rows of data we keep when the user pauses the display view and data accumulates.
private static final int RANDOM_ACCESS_QUEUE_CAPACITY = 5000;
private static final int DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT = 20000;
private static final String ZOOKEEPER_BROKERS_TOPICS_PATH = "/brokers/topics";
private static final RandomAccessQueue<OutgoingMessageEnvelope> outputData =
new RandomAccessQueue<>(OutgoingMessageEnvelope.class, RANDOM_ACCESS_QUEUE_CAPACITY);
private static final AtomicInteger execIdSeq = new AtomicInteger(0);
private final Map<Integer, SamzaSqlApplicationRunner> executions = new HashMap<>();
private final SamzaExecutorEnvironmentVariableHandler environmentVariableHandler =
new SamzaExecutorEnvironmentVariableHandler();
// -- implementation of SqlExecutor ------------------------------------------
@Override
public void start(ExecutionContext context) {
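// No-op: per-statement runners are created lazily in executeQuery() and executeNonQuery().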
}
@Override
public void stop(ExecutionContext context) throws ExecutorException {
Iterator<Integer> iter = executions.keySet().iterator();
while (iter.hasNext()) {
stopExecution(context, iter.next());
iter.remove();
}
outputData.clear();
}
@Override
public EnvironmentVariableHandler getEnvironmentVariableHandler() {
return environmentVariableHandler;
}
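/**
* Lists Kafka topics (read from ZooKeeper) as "system.stream" names; a topic named
* ProfileChangeStream (a hypothetical example) would be reported as "kafka.ProfileChangeStream".
*/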
@Override
public List<String> listTables(ExecutionContext context) throws ExecutorException {
String address = environmentVariableHandler.getEnvironmentVariable(SAMZA_SQL_SYSTEM_KAFKA_ADDRESS);
if (address == null || address.isEmpty()) {
address = DEFAULT_SERVER_ADDRESS;
}
ZkClient zkClient = null;
try {
zkClient = new ZkClient(address, DEFAULT_ZOOKEEPER_CLIENT_TIMEOUT);
return zkClient.getChildren(ZOOKEEPER_BROKERS_TOPICS_PATH)
.stream()
.map(x -> SAMZA_SYSTEM_KAFKA + "." + x)
.collect(Collectors.toList());
} catch (ZkTimeoutException ex) {
throw new ExecutorException(ex);
} finally {
// Close the client so repeated listings do not leak ZooKeeper connections.
if (zkClient != null) {
zkClient.close();
}
}
}
@Override
public SqlSchema getTableSchema(ExecutionContext context, String tableName) throws ExecutorException {
/*
* Currently the Shell works only for systems that have Avro schemas.
*/
int execId = execIdSeq.incrementAndGet();
Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId);
Config samzaSqlConfig = new MapConfig(staticConfigs);
SqlSchema sqlSchema;
try {
SqlIOResolver ioResolver = SamzaSqlApplicationConfig.createIOResolver(samzaSqlConfig);
SqlIOConfig sourceInfo = ioResolver.fetchSourceInfo(tableName);
RelSchemaProvider schemaProvider =
SamzaSqlApplicationConfig.initializePlugin("RelSchemaProvider", sourceInfo.getRelSchemaProviderName(),
samzaSqlConfig, SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN,
(o, c) -> ((RelSchemaProviderFactory) o).create(sourceInfo.getSystemStream(), c));
sqlSchema = schemaProvider.getSqlSchema();
} catch (SamzaException ex) {
throw new ExecutorException(ex);
}
return sqlSchema;
}
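/**
* Submits a single SQL statement as a local Samza job. A bare SELECT is wrapped by
* formatSqlStmts() into "insert into log.outputStream ..." so that its rows are routed to the
* shell's in-memory buffer via saveOutputMessage().
*/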
@Override
public QueryResult executeQuery(ExecutionContext context, String statement) throws ExecutorException {
outputData.clear();
int execId = execIdSeq.incrementAndGet();
Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId);
List<String> sqlStmts = formatSqlStmts(Collections.singletonList(statement));
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
SamzaSqlApplicationRunner runner;
try {
runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
runner.run(null);
} catch (SamzaException ex) {
throw new ExecutorException(ex);
}
executions.put(execId, runner);
LOG.debug("Executing sql. Id ", execId);
SqlSchema resultSchema = null;
// TODO: after fixing the TODO in generateResultSchema function, we can uncomment the following piece of code.
// resultSchema = generateResultSchema(new MapConfig(staticConfigs));
return new QueryResult(execId, resultSchema);
}
@Override
public int getRowCount() {
return outputData.getSize();
}
@Override
public List<String[]> retrieveQueryResult(ExecutionContext context, int startRow, int endRow) {
List<String[]> results = new ArrayList<>();
for (OutgoingMessageEnvelope row : outputData.get(startRow, endRow)) {
results.add(getFormattedRow(row));
}
return results;
}
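/**
* Like retrieveQueryResult(), but backed by RandomAccessQueue.consume(), which presumably removes
* the returned rows from the buffer so repeated calls do not return the same rows.
*/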
@Override
public List<String[]> consumeQueryResult(ExecutionContext context, int startRow, int endRow) {
List<String[]> results = new ArrayList<>();
for (OutgoingMessageEnvelope row : outputData.consume(startRow, endRow)) {
results.add(getFormattedRow(row));
}
return results;
}
@Override
public NonQueryResult executeNonQuery(ExecutionContext context, File sqlFile) throws ExecutorException {
LOG.info("Sql file path: " + sqlFile.getPath());
List<String> executedStmts;
try (Stream<String> lines = Files.lines(Paths.get(sqlFile.getPath()))) {
executedStmts = lines.collect(Collectors.toList());
} catch (IOException e) {
throw new ExecutorException(e);
}
LOG.info("Sql statements in Sql file: " + executedStmts.toString());
List<String> submittedStmts = new ArrayList<>();
List<String> nonSubmittedStmts = new ArrayList<>();
validateExecutedStmts(executedStmts, submittedStmts, nonSubmittedStmts);
if (submittedStmts.isEmpty()) {
throw new ExecutorException("Nothing to execute. Note: SELECT statements are ignored.");
}
NonQueryResult result = executeNonQuery(context, submittedStmts);
return new NonQueryResult(result.getExecutionId(), submittedStmts, nonSubmittedStmts);
}
@Override
public NonQueryResult executeNonQuery(ExecutionContext context, List<String> statements) throws ExecutorException {
int execId = execIdSeq.incrementAndGet();
Map<String, String> staticConfigs = fetchSamzaSqlConfig(execId);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(formatSqlStmts(statements)));
SamzaSqlApplicationRunner runner;
try {
runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
runner.run(null);
} catch (SamzaException ex) {
throw new ExecutorException(ex);
}
executions.put(execId, runner);
LOG.debug("Executing sql. Id ", execId);
return new NonQueryResult(execId);
}
@Override
public void stopExecution(ExecutionContext context, int exeId) throws ExecutorException {
SamzaSqlApplicationRunner runner = executions.get(exeId);
if (runner != null) {
LOG.debug("Stopping execution ", exeId);
try {
runner.kill();
} catch (SamzaException ex) {
throw new ExecutorException(ex);
}
} else {
throw new ExecutorException("Trying to stop a non-existing SQL execution " + exeId);
}
}
@Override
public void removeExecution(ExecutionContext context, int exeId) throws ExecutorException {
SamzaSqlApplicationRunner runner = executions.get(exeId);
if (runner != null) {
if (runner.status().getStatusCode().equals(ApplicationStatus.StatusCode.Running)) {
throw new ExecutorException("Trying to remove an ongoing execution " + exeId);
}
executions.remove(exeId);
} else {
throw new ExecutorException("Trying to remove a non-existing SQL execution " + exeId);
}
}
@Override
public ExecutionStatus queryExecutionStatus(int execId) throws ExecutorException {
SamzaSqlApplicationRunner runner = executions.get(execId);
if (runner == null) {
throw new ExecutorException("Execution " + execId + " does not exist.");
}
return queryExecutionStatus(runner);
}
@Override
public List<SqlFunction> listFunctions(ExecutionContext context) {
/*
* TODO: currently the Shell only shows some UDFs supported by Samza internally. We may need to require UDFs
* to provide a function of getting their "SamzaSqlUdfDisplayInfo", then we can get the UDF information from
* SamzaSqlApplicationConfig.udfResolver(or SamzaSqlApplicationConfig.udfMetadata) instead of registering
* UDFs one by one as below.
*/
List<SqlFunction> udfs = new ArrayList<>();
udfs.add(new SamzaSqlUdfDisplayInfo("RegexMatch", "Matches the string to the regex",
Arrays.asList(SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false),
SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.STRING, false, false)),
SqlFieldSchema.createPrimitiveSchema(SamzaSqlFieldType.BOOLEAN, false, false)));
return udfs;
}
@Override
public String getVersion() {
return this.getClass().getPackage().getImplementationVersion();
}
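/**
* Buffers one output row. The "log" system factory (CliLoggingSystemFactory, wired up in
* fetchSamzaSqlConfig()) is expected to call this from its producer for every emitted message.
*/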
static void saveOutputMessage(OutgoingMessageEnvelope messageEnvelope) {
outputData.add(messageEnvelope);
}
private boolean processEnvironmentVariable(String name, String value) {
// Nothing to post-process: every accepted variable is copied verbatim into the job config
// by fetchSamzaSqlConfig().
return true;
}
Map<String, String> fetchSamzaSqlConfig(int execId) {
HashMap<String, String> staticConfigs = new HashMap<>();
staticConfigs.put(JobConfig.JOB_NAME, "sql-job-" + execId);
staticConfigs.put(JobConfig.PROCESSOR_ID, String.valueOf(execId));
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
staticConfigs.put(TaskConfig.GROUPER_FACTORY, SingleContainerGrouperFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
String configIOResolverDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SOURCE_RESOLVER_DOMAIN, "config");
staticConfigs.put(configIOResolverDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
ConfigBasedIOResolverFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_UDF_RESOLVER, "config");
staticConfigs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
staticConfigs.put("serializers.registry.avro.class", AvroSerDeFactory.class.getName());
staticConfigs.put(AvroSerDeFactory.CFG_AVRO_SCHEMA, ProfileChangeEvent.SCHEMA$.toString());
String kafkaSystemConfigPrefix =
String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_KAFKA);
String avroSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_KAFKA);
staticConfigs.put(kafkaSystemConfigPrefix + "samza.factory", KafkaSystemFactory.class.getName());
staticConfigs.put(kafkaSystemConfigPrefix + "samza.key.serde", "string");
staticConfigs.put(kafkaSystemConfigPrefix + "samza.msg.serde", "avro");
staticConfigs.put(kafkaSystemConfigPrefix + "consumer.zookeeper.connect", "localhost:2181");
staticConfigs.put(kafkaSystemConfigPrefix + "producer.bootstrap.servers", "localhost:9092");
staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.reset", "true");
staticConfigs.put(kafkaSystemConfigPrefix + "samza.offset.default", "oldest");
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
staticConfigs.put(avroSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
String logSystemConfigPrefix =
String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_LOG);
String logSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_LOG);
staticConfigs.put(logSystemConfigPrefix + "samza.factory", CliLoggingSystemFactory.class.getName());
staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "json");
staticConfigs.put(logSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
String avroSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
AvroSchemaGenRelConverterFactory.class.getName());
String jsonSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "json");
staticConfigs.put(jsonSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
JsonRelConverterFactory.class.getName());
String configAvroRelSchemaProviderDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_REL_SCHEMA_PROVIDER_DOMAIN, "config");
staticConfigs.put(configAvroRelSchemaProviderDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
FileSystemAvroRelSchemaProviderFactory.class.getName());
staticConfigs.put(
configAvroRelSchemaProviderDomain + FileSystemAvroRelSchemaProviderFactory.CFG_SCHEMA_DIR,
"/tmp/schemas/");
List<Pair<String, String>> allEnvironmentVariables = environmentVariableHandler.getAllEnvironmentVariables();
for (Pair<String, String> p : allEnvironmentVariables) {
staticConfigs.put(p.getL(), p.getR());
}
return staticConfigs;
}
private List<String> formatSqlStmts(List<String> statements) {
return statements.stream().map(sql -> {
if (!sql.toLowerCase().startsWith("insert")) {
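// e.g. "select * from kafka.SomeTopic" (an illustrative statement) becomes
// "insert into log.outputStream select * from kafka.SomeTopic"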
String formattedSql = String.format("insert into log.outputStream %s", sql);
LOG.debug("Sql formatted. ", sql, formattedSql);
return formattedSql;
} else {
return sql;
}
}).collect(Collectors.toList());
}
private void validateExecutedStmts(List<String> statements, List<String> submittedStmts,
List<String> nonSubmittedStmts) {
for (String sql : statements) {
if (sql.isEmpty()) {
continue;
}
if (!sql.toLowerCase().startsWith("insert")) {
nonSubmittedStmts.add(sql);
} else {
submittedStmts.add(sql);
}
}
}
SqlSchema generateResultSchema(Config config) {
SamzaSqlDslConverter converter = (SamzaSqlDslConverter) new SamzaSqlDslConverterFactory().create(config);
RelRoot relRoot = converter.convertDsl("").iterator().next();
List<String> colNames = new ArrayList<>();
List<String> colTypeNames = new ArrayList<>();
for (RelDataTypeField dataTypeField : relRoot.validatedRowType.getFieldList()) {
colNames.add(dataTypeField.getName());
colTypeNames.add(dataTypeField.getType().toString());
}
// TODO: Need to find a way to convert the relational row type to a SQL schema; colTypeNames is
// collected above but unused until then. After fixing this TODO, please resolve the TODOs in the
// QueryResult class and in executeQuery().
return new SqlSchema(colNames, Collections.emptyList());
}
private String[] getFormattedRow(OutgoingMessageEnvelope row) {
String[] formattedRow = new String[1];
String outputFormat = environmentVariableHandler.getEnvironmentVariable(SAMZA_SQL_OUTPUT);
if (outputFormat == null || !outputFormat.equalsIgnoreCase(MessageFormat.PRETTY.toString())) {
formattedRow[0] = getCompressedFormat(row);
} else {
formattedRow[0] = getPrettyFormat(row);
}
return formattedRow;
}
private ExecutionStatus queryExecutionStatus(SamzaSqlApplicationRunner runner) throws ExecutorException {
ApplicationStatus.StatusCode code = runner.status().getStatusCode();
switch (code) {
case New:
return ExecutionStatus.New;
case Running:
return ExecutionStatus.Running;
case SuccessfulFinish:
return ExecutionStatus.SuccessfulFinish;
case UnsuccessfulFinish:
return ExecutionStatus.UnsuccessfulFinish;
}
throw new ExecutorException("Unsupported status code: " + code);
}
private String getPrettyFormat(OutgoingMessageEnvelope envelope) {
String value = new String((byte[]) envelope.getMessage(), StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();
String formattedValue;
try {
Object json = mapper.readValue(value, Object.class);
formattedValue = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
} catch (IOException ex) {
formattedValue = value;
LOG.error("getPrettyFormat failed with exception while formatting json ", ex);
}
return formattedValue;
}
private String getCompressedFormat(OutgoingMessageEnvelope envelope) {
return new String((byte[]) envelope.getMessage(), StandardCharsets.UTF_8);
}
private enum MessageFormat {
PRETTY,
COMPACT
}
class SamzaExecutorEnvironmentVariableHandler extends EnvironmentVariableHandlerImpl {
protected EnvironmentVariableSpecs initializeEnvironmentVariableSpecs() {
HashMap<String, EnvironmentVariableSpecs.Spec> specMap = new HashMap<>();
specMap.put("samza.sql.output",
new EnvironmentVariableSpecs.Spec(new String[]{"pretty", "compact"}, "compact"));
return new EnvironmentVariableSpecs(specMap);
}
protected boolean processEnvironmentVariable(String name, String value) {
return SamzaExecutor.this.processEnvironmentVariable(name, value);
}
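// Accept environment variables whose names are not declared in the spec map above.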
protected boolean isAcceptUnknowName() {
return true;
}
}
}