blob: de01969dd4ec6103bea97db10a2fa4689afb3e54 [file] [log] [blame]
package samza.examples.cookbook;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import org.apache.avro.Schema;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.sql.runner.SamzaSqlApplication;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
/**
 * In this example, we demonstrate how to use SQL to create a Samza job.
*
 * <p>Concepts covered: Using SQL to perform stream processing.
*
* To run the below example:
*
* <ol>
* <li>
* Ensure that the topic "PageViewStream" is created <br/>
* ./kafka-topics.sh --zookeeper localhost:2181 --create --topic PageViewStream --partitions 1 --replication-factor 1
* </li>
* <li>
* Run the application using the ./bin/run-app.sh script <br/>
* ./deploy/samza/bin/run-app.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory <br/>
 * --config-path=file://$PWD/deploy/samza/config/pageview-filter-sql.properties
* </li>
* <li>
* Produce some messages to the "PageViewStream" topic <br/>
* Please follow instructions at https://github.com/srinipunuru/samzasqltools on how to produce events into PageViewStream<br/>
* </li>
* <li>
 * Consume messages from the "NewLinkedInEmployees" topic (e.g. bin/kafka-console-consumer.sh)
 * ./deploy/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic NewLinkedInEmployees <br/>
* --property print.key=true </li>
* </ol>
*
*/
public class PageViewFilterSqlApp extends SamzaSqlApplication {
public static final String CFG_SCHEMA_FILES = "schema.files";
private static final String CFG_SCHEMA_VALUE_FMT = "";
@Override
public void init(StreamGraph streamGraph, Config config) {
String sqlStmt = "insert into kafka.NewLinkedInEmployees select id, Name from ProfileChangeStream";
String schemaFiles = config.get(CFG_SCHEMA_FILES);
HashMap<String, String> newConfig = new HashMap<>();
newConfig.putAll(config);
populateSchemaConfigs(schemaFiles, newConfig);
newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sqlStmt);
super.init(streamGraph, new MapConfig(newConfig));
}
private void populateSchemaConfigs(String schemaFilesValue, HashMap<String, String> config) {
String[] schemaFiles = schemaFilesValue.split(",");
for (String schemaFileValue : schemaFiles) {
try {
File schemaFile = new File(schemaFileValue);
String schemaValue = Schema.parse(schemaFile).toString();
config.put(String.format(CFG_SCHEMA_VALUE_FMT, schemaFile.getName()), schemaValue);
} catch (IOException e) {
throw new SamzaException("Unable to parse the schemaFile " + schemaFileValue, e);
}
}
}
}