#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""
Counts words in UTF8 encoded, '\n' delimited text received from the network over a
sliding window of configurable duration.
Usage: structured_sessionization.py <hostname> <port>
<hostname> and <port> describe the TCP server that Structured Streaming
would connect to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit
examples/src/main/python/sql/streaming/structured_sessionization.py
localhost 9999`
"""
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import count, session_window

if __name__ == "__main__":
    if len(sys.argv) != 3:
        msg = "Usage: structured_sessionization.py <hostname> <port>"
        print(msg, file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    spark = SparkSession\
        .builder\
        .appName("StructuredSessionization")\
        .getOrCreate()
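    # Optional: lower the log level so the console sink output below is easier to read.
    spark.sparkContext.setLogLevel("WARN")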
    # Create DataFrame representing the stream of input lines from connection to host:port
    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .option('includeTimestamp', 'true')\
        .load()
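    # With 'includeTimestamp' enabled, the socket source produces two columns:
    # 'value' (string, one line per row) and 'timestamp' (when the line was received).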
    # Split the lines into words, retaining timestamps.
    # split() splits each line into an array, and explode() turns the array into multiple rows.
    # Each word is treated as the sessionId of its event.
    events = lines.select(
        explode(split(lines.value, ' ')).alias('sessionId'),
        lines.timestamp.alias('eventTime')
    )
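    # 'events' has one row per word occurrence, with columns sessionId (string) and
    # eventTime (timestamp).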
    # Group the data by session window and word, and compute the count of each group
    windowedCounts = events \
        .groupBy(session_window(events.eventTime, "10 seconds").alias('session'),
                 events.sessionId) \
        .agg(count("*").alias("numEvents")) \
        .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
                    "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationSec",
                    "numEvents")
    # Start running the query that prints the session updates to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('update')\
        .format('console')\
        .start()
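    # 'update' mode emits only the sessions whose aggregate changed in each micro-batch.
    # With a watermark defined on eventTime, 'append' mode could be used instead to emit
    # each session once it can no longer change.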
    query.awaitTermination()