SQE currently supports the following commands: CreateStream, Set, and Query.
Every command is a JSON map with a single entry: the key is the command name, and the value is a map whose format depends on the command type. This map can contain multiple fields, sections, or clauses. Commands are typically loaded from one or more JSON files, each containing an array of commands:
[ {"Set": {...}}, {"Set": {...}}, {"CreateStream": {...}}, {"Query": {...}}, {"Query": {...}} ]
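To make the file format concrete, here is a minimal Python sketch that reads a command file and splits each entry into its command name and body. parse_commands is a hypothetical helper, not part of SQE, and the short Query body is only a placeholder. It illustrates the single-entry-map rule and does not reflect how SQE parses commands internally.

```python
import json

# A small command file: a JSON array whose elements are single-entry maps.
COMMAND_FILE = """
[
  {"Set": {"key": "jw.sqe.state.redis.datatype", "value": "HASH"}},
  {"Query": {"from": {"objectName": "com.jwplayer.analytics.avro.Ping"}}}
]
"""


def parse_commands(text):
    """Return the commands in a command file as (name, body) pairs, in file order."""
    commands = json.loads(text)
    if not isinstance(commands, list):
        raise ValueError("a command file must contain a JSON array of commands")
    parsed = []
    for entry in commands:
        # Each command is a map with exactly one entry: command name -> command body.
        if not isinstance(entry, dict) or len(entry) != 1:
            raise ValueError(f"expected a single-entry map, got {entry!r}")
        name, body = next(iter(entry.items()))
        parsed.append((name, body))
    return parsed


for name, body in parse_commands(COMMAND_FILE):
    print(name, body)
```

Rejecting maps with zero or several entries early makes a malformed command file fail loudly instead of silently dropping commands.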
CreateStream creates input streams for the topology; for example, you can create a stream from a topic in a Kafka cluster. The JSON representation is:
{ "CreateStream": { "streamName": "<STREAM_NAME>", "objectName": "<OBJECT_NAME>", "spoutName": "<SPOUT_NAME>", "spoutType": "<SPOUT_TYPE>", "deserializer": "<DESERIALIZER>", "options": { "option1": value1, "optionN": valueN } } }
For example, the following command creates a stream named big.query.data from a fixed list of tuples:
{ "CreateStream": { "streamName": "big.query.data", "objectName": "big.query.data", "spoutName": "FIXED", "spoutType": "NON_TRANSACTIONAL", "options": { "jw.sqe.spout.fixed.fields": ["DateString", "AccountToken", "UserName", "HappyDanceCount"], "jw.sqe.spout.fixed.values": [ ["2015-05-01 00:00", "Account1", "Joe", 1], ["2015-05-01 01:00", "Account1", "Bob", -1], ["2015-05-01 02:00", "Account1", "Susy", 1], ["2015-05-01 03:00", "Account1", "Mr. Fancy Pants", 1], ["2015-05-01 04:00", "Account1", "Joe", -1], ["2015-05-01 05:00", "Account1", "Bob", 1], ["2015-05-01 06:00", "Account1", "Susy", 1], ["2015-05-01 07:00", "Account1", "Mr. Fancy Pants", 1], ["2015-05-01 08:00", "Account2", "Joe", 1], ["2015-05-01 09:00", "Account2", "Bob", 1], ["2015-05-01 10:00", "Account2", "Susy", -1], ["2015-05-01 11:00", "Account2", "Mr. Fancy Pants", 1], ["2015-05-01 12:00", "Account2", "Joe", 1], ["2015-05-01 13:00", "Account2", "Bob", 1], ["2015-05-01 14:00", "Account2", "Susy", 1], ["2015-05-01 15:00", "Account2", "Mr. Fancy Pants", -1] ] } } }
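Judging by its option names, the FIXED spout appears to emit one tuple per row of jw.sqe.spout.fixed.values, with each value named by the matching entry in jw.sqe.spout.fixed.fields. The sketch below models that pairing in Python under the assumption that values are matched to field names by position; fixed_tuples is a hypothetical helper, not SQE's spout implementation.

```python
def fixed_tuples(options):
    """Hypothetical model of a fixed spout: one dict per row, keyed by field name."""
    fields = options["jw.sqe.spout.fixed.fields"]
    rows = options["jw.sqe.spout.fixed.values"]
    tuples = []
    for row in rows:
        # Assumption: values are matched to field names by position.
        if len(row) != len(fields):
            raise ValueError(f"row {row!r} does not match fields {fields!r}")
        tuples.append(dict(zip(fields, row)))
    return tuples


# The first two rows of the big.query.data example.
options = {
    "jw.sqe.spout.fixed.fields": ["DateString", "AccountToken", "UserName", "HappyDanceCount"],
    "jw.sqe.spout.fixed.values": [
        ["2015-05-01 00:00", "Account1", "Joe", 1],
        ["2015-05-01 01:00", "Account1", "Bob", -1],
    ],
}

for t in fixed_tuples(options):
    print(t)
```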
Set adds or overwrites entries in the global config map, which lets you set config options from within a command file. The JSON representation is:
{"Set": {"key":"<KEY>","value":"<VALUE>"}}
For example:
{"Set": {"key":"jw.sqe.state.redis.datatype","value":"HASH"}}
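The add-or-overwrite behavior of Set can be sketched with a plain dictionary standing in for the global config map. This assumes Set commands are applied in file order, so a later Set for the same key wins; apply_set_commands and example.other.key are hypothetical names used only for illustration.

```python
def apply_set_commands(config, commands):
    """Apply Set commands in order: new keys are added, existing keys are overwritten."""
    for entry in commands:
        if "Set" in entry:
            body = entry["Set"]
            config[body["key"]] = body["value"]
    return config


config = {}
commands = [
    {"Set": {"key": "jw.sqe.state.redis.datatype", "value": "HASH"}},
    {"Set": {"key": "example.other.key", "value": "1"}},  # hypothetical key
    {"Set": {"key": "example.other.key", "value": "2"}},  # overwrites the value above
]
apply_set_commands(config, commands)
print(config)
# {'jw.sqe.state.redis.datatype': 'HASH', 'example.other.key': '2'}
```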
Queries let you run SQL-like queries against an input stream and aggregate and persist the results to a Trident state. The JSON representation of a query is:
{ "Query": { "insertInto":{ "objectName":"<OBJECT_NAME>", "stateName": "<STATE_NAME>", "stateType": "<STATE_TYPE>", "fields":[<FIELD_LIST>], "options": { "option1": value1, "optionN": valueN } }, "select":{"expressions":[<EXPRESSION_LIST>]}, "from":{"objectName":"<OBJECT_NAME>"}, "where":<PREDICATE_EXPRESSION> } }
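Because a Query is just nested JSON, it can be generated programmatically. The Python sketch below assembles the clauses shown in the template; make_query is a hypothetical helper, and it assumes that stateName, stateType, and options may be left out when they are not needed.

```python
import json


def make_query(target, fields, expressions, source, where=None,
               state_name=None, state_type=None, options=None):
    """Build a Query command map; optional parts are left out when not given."""
    insert_into = {"objectName": target, "fields": list(fields)}
    # Assumption: stateName, stateType and options may be omitted when no state is needed.
    if state_name is not None:
        insert_into["stateName"] = state_name
    if state_type is not None:
        insert_into["stateType"] = state_type
    if options:
        insert_into["options"] = dict(options)
    query = {
        "insertInto": insert_into,
        "select": {"expressions": list(expressions)},
        "from": {"objectName": source},
    }
    if where is not None:  # the Where clause is optional
        query["where"] = where
    return {"Query": query}


command = make_query(
    target="MinuteDeviceMeasures",
    fields=["Device", "Plays"],
    expressions=["Device", {"Sum": ["Plays"]}],
    source="com.jwplayer.analytics.avro.Ping",
    where={">": ["Plays", 0]},
    state_name="REDIS",
    state_type="TRANSACTIONAL",
)
print(json.dumps(command, indent=2))
```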
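The InsertInto clause tells the query engine which object (table, view, stream, etc.) receives the results, what the output fields are named, and whether the data needs to be persisted to a particular state. As in SQL, the order of the fields in this clause matches the order of the expressions in the Select clause. InsertInto contains the following fields:
- objectName: the object that receives the results.
- stateName: the state to persist results to, such as REDIS.
- stateType: the type of that state, such as TRANSACTIONAL.
- fields: the output field names, in the same order as the Select expressions.
- options: a map of additional options.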
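Since the InsertInto fields line up positionally with the Select expressions, each output field is defined by the expression at the same index. A small Python sketch of that pairing, assuming the two lists must have the same length; output_columns is a hypothetical helper.

```python
def output_columns(query):
    """Pair each InsertInto field with the Select expression at the same position."""
    body = query["Query"]
    fields = body["insertInto"]["fields"]
    expressions = body["select"]["expressions"]
    # Assumption: every output field needs exactly one expression.
    if len(fields) != len(expressions):
        raise ValueError(f"{len(fields)} fields but {len(expressions)} expressions")
    return list(zip(fields, expressions))


query = {"Query": {
    "insertInto": {"objectName": "MinuteDeviceMeasures", "fields": ["Device", "Plays"]},
    "select": {"expressions": ["Device", {"Sum": ["Plays"]}]},
    "from": {"objectName": "com.jwplayer.analytics.avro.Ping"},
}}
for field, expr in output_columns(query):
    print(field, "<-", expr)
```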
The Select clause specifies the fields to select from the input stream, along with any transformations and aggregations applied to them. Its only field is a list of expressions. Each expression is one of three types:
- Field: the name of an input field, written as a plain JSON string, such as "Device".
- Constant: {"C":<CONSTANT>}. Non-string constants can also be written as just the literal. Internally, numeric constants are represented as Integer, Long, or Double values.
- Function: {"<FUNCTION_NAME>":[<EXPRESSION_LIST>]}, which applies the named function to a list of argument expressions, such as {"Sum":["Plays"]}.
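The three expression types can be told apart from their JSON shape alone. The following Python sketch classifies an expression that way; expression_kind is a hypothetical helper, and treating null as a bare constant is an assumption based on its use as an argument in the example query.

```python
def expression_kind(expr):
    """Classify a Select expression as 'field', 'constant' or 'function'."""
    if isinstance(expr, str):
        return "field"                    # plain string: an input field name
    if isinstance(expr, dict) and len(expr) == 1:
        name, args = next(iter(expr.items()))
        if name == "C":
            return "constant"             # explicit constant, e.g. {"C": "yyyy-MM-dd"}
        if isinstance(args, list):
            return "function"             # function name -> list of argument expressions
    if expr is None or isinstance(expr, (bool, int, float)):
        return "constant"                 # bare non-string literal (assumption: null included)
    raise ValueError(f"not a recognized expression: {expr!r}")


examples = ["Device", {"C": "yyyy-MM-dd'T'HH"}, 0, {"Sum": ["Plays"]}]
for e in examples:
    print(expression_kind(e), e)
```

Because a plain string always means a field, a string constant has to be wrapped as {"C": ...}; only non-string constants can appear as bare literals.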
The From clause tells SQE which input stream the query references. Its only field is "objectName". Typically, this is the name of the Avro schema of the input data, but in the future it may also name an input stream of tuples.
The Where clause is optional and filters the query's data by specifying which data to keep. It accepts a single predicate expression, but you can combine predicates with logical expressions such as AND and OR, as in other languages.
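To show how a nested predicate is read, here is a tiny Python evaluator for a Where expression against one input row. It is a sketch, not SQE's engine: it supports only Or, >, and GreaterThan (names taken from the example query in this document) plus And, which is assumed by analogy with Or; evaluate is a hypothetical helper and the rows are made-up data.

```python
def evaluate(expr, row):
    """Evaluate an expression against one input row (a dict of field values)."""
    if isinstance(expr, str):
        return row[expr]                      # field reference
    if not isinstance(expr, dict):
        return expr                           # bare non-string literal, e.g. 0
    name, args = next(iter(expr.items()))
    if name == "C":
        return args                           # explicit constant
    values = [evaluate(arg, row) for arg in args]
    if name == "And":                         # assumed operator name
        return all(values)
    if name == "Or":
        return any(values)
    if name in (">", "GreaterThan"):
        return values[0] > values[1]
    raise ValueError(f"operator not supported by this sketch: {name}")


# The Where clause from the example query at the end of this section.
where = {"Or": [{"Or": [{">": ["Embeds", 0]}, {">": ["Plays", 0]}]}, {">": ["Completes", 0]}]}

rows = [  # hypothetical input rows
    {"Embeds": 0, "Plays": 2, "Completes": 0},
    {"Embeds": 0, "Plays": 0, "Completes": 0},
    {"Embeds": 0, "Plays": 0, "Completes": 1},
]
kept = [r for r in rows if evaluate(where, r)]
print(kept)
```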
SQE automatically determines which expressions are key fields and which are aggregate (value) fields. If a top-level expression in the Select clause is a field, constant, or transform expression, it is a key field for the purposes of aggregation and state persistence. If a top-level expression is an aggregation expression, it is a value field.
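The key/value rule looks only at the top-level expression, so a nested transform inside an aggregation still yields a value field. A Python sketch of the rule, assuming Sum and CreateHll are aggregation functions (they are the only ones used in this document; SQE's full list is not given here). is_aggregation and split_key_value are hypothetical helpers.

```python
# Assumption: which function names count as aggregations. Only Sum and CreateHll
# appear in this document.
AGGREGATIONS = {"Sum", "CreateHll"}


def is_aggregation(expr):
    return isinstance(expr, dict) and len(expr) == 1 and next(iter(expr)) in AGGREGATIONS


def split_key_value(fields, expressions):
    """Split output fields into key fields and value fields by their top-level expression."""
    keys, values = [], []
    for field, expr in zip(fields, expressions):
        (values if is_aggregation(expr) else keys).append(field)
    return keys, values


fields = ["DateTime", "Device", "Plays", "HllViewers"]
expressions = [
    {"FormatDate": [{"ParseDate": ["DateGMTISO", {"C": "yyyy-MM-dd HH:mm:ss Z"}]}, {"C": "yyyy-MM-dd'T'HH"}]},
    "Device",
    {"Sum": ["Plays"]},
    {"CreateHll": [{"If": [{"GreaterThan": ["Plays", 0]}, "UserToken", None]}]},
]
print(split_key_value(fields, expressions))
# (['DateTime', 'Device'], ['Plays', 'HllViewers'])
```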
This example query computes top-level device analytics by the minute. It sums embeds, plays, completes, time watched, and ad impressions, and it builds a HyperLogLog (HLL) sketch of the UserTokens of play events. The input stream is filtered to include only data with positive embeds, plays, or completes. The results are persisted with the transactional RedisMapState.
{ "Query": { "insertInto":{ "objectName":"MinuteDeviceMeasures", "stateName": "REDIS", "stateType": "TRANSACTIONAL", "fields":["DateTime","Device","Embeds","Plays","Completes","TimeWatched","AdImpressions","HllViewers"] }, "select":{ "expressions":[ {"FormatDate":[{"ParseDate":["DateGMTISO",{"C":"yyyy-MM-dd HH:mm:ss Z"}]},{"C":"yyyy-MM-dd'T'HH"}]}, "Device", {"Sum":["Embeds"]},{"Sum":["Plays"]},{"Sum":["Completes"]},{"Sum":["TimeWatched"]},{"Sum":["AdImpressions"]}, {"CreateHll":[{"If": [{"GreaterThan":["Plays",0]}, "UserToken",null]}]} ] }, "from":{"objectName":"com.jwplayer.analytics.avro.Ping"}, "where":{"Or":[{"Or":[{">":["Embeds",0]},{">":["Plays",0]}]},{">":["Completes",0]}]} } }
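To spell out what the example query computes, the following plain-Python sketch applies the same steps to a few hypothetical rows: the Where filter, the date transformation, grouping on the key fields DateTime and Device, the sums, and the set of play-event viewers. It is not how SQE executes queries. It substitutes an exact distinct count for the HyperLogLog estimate, and it assumes the Java date patterns correspond to the Python strptime/strftime codes shown. As written, the output pattern yyyy-MM-dd'T'HH keeps the date and the hour.

```python
from collections import defaultdict
from datetime import datetime

MEASURES = ["Embeds", "Plays", "Completes", "TimeWatched", "AdImpressions"]


def run_example_query(rows):
    """Plain-Python model of the example query: filter, group by (DateTime, Device), aggregate."""
    groups = defaultdict(lambda: {**{m: 0 for m in MEASURES}, "viewers": set()})
    for row in rows:
        # Where: Embeds > 0 OR Plays > 0 OR Completes > 0
        if not (row["Embeds"] > 0 or row["Plays"] > 0 or row["Completes"] > 0):
            continue
        # ParseDate with "yyyy-MM-dd HH:mm:ss Z", then FormatDate with "yyyy-MM-dd'T'HH".
        # Assumption: the parsed offset is kept; SQE may format in another time zone.
        parsed = datetime.strptime(row["DateGMTISO"], "%Y-%m-%d %H:%M:%S %z")
        key = (parsed.strftime("%Y-%m-%dT%H"), row["Device"])
        group = groups[key]
        for m in MEASURES:
            group[m] += row[m]               # Sum(...) for each measure
        if row["Plays"] > 0:                 # If(GreaterThan(Plays, 0), UserToken, null)
            group["viewers"].add(row["UserToken"])
    # An exact distinct count stands in for the HyperLogLog estimate.
    return {
        key: {**{m: g[m] for m in MEASURES}, "DistinctViewers": len(g["viewers"])}
        for key, g in groups.items()
    }


rows = [  # hypothetical input rows
    {"DateGMTISO": "2015-05-01 13:05:00 +0000", "Device": "Desktop", "Embeds": 1, "Plays": 1,
     "Completes": 0, "TimeWatched": 30, "AdImpressions": 0, "UserToken": "u1"},
    {"DateGMTISO": "2015-05-01 13:40:00 +0000", "Device": "Desktop", "Embeds": 0, "Plays": 2,
     "Completes": 1, "TimeWatched": 90, "AdImpressions": 1, "UserToken": "u2"},
    {"DateGMTISO": "2015-05-01 13:50:00 +0000", "Device": "Desktop", "Embeds": 0, "Plays": 0,
     "Completes": 0, "TimeWatched": 5, "AdImpressions": 0, "UserToken": "u3"},
]
print(run_example_query(rows))
```

The third row is dropped by the Where clause, so its TimeWatched does not contribute to the sums.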