/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.relational.util.WebLogData
import org.apache.flink.util.Collector

/**
* This program processes web logs and relational data.
* It implements the following relational query:
*
* {{{
* SELECT
* r.pageURL,
* r.pageRank,
* r.avgDuration
* FROM documents d JOIN rankings r
* ON d.url = r.url
* WHERE CONTAINS(d.text, [keywords])
* AND r.rank > [rank]
* AND NOT EXISTS
* (
* SELECT * FROM Visits v
* WHERE v.destUrl = d.url
* AND v.visitDate < [date]
* );
* }}}
 *
* Input files are plain text CSV files using the pipe character ('|') as field separator.
 * The tables referenced in the query can be generated using the
 * [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and
 * have the following schemas:
*
* {{{
* CREATE TABLE Documents (
* url VARCHAR(100) PRIMARY KEY,
* contents TEXT );
*
* CREATE TABLE Rankings (
* pageRank INT,
* pageURL VARCHAR(100) PRIMARY KEY,
* avgDuration INT );
*
* CREATE TABLE Visits (
* sourceIP VARCHAR(16),
* destURL VARCHAR(100),
* visitDate DATE,
* adRevenue FLOAT,
* userAgent VARCHAR(64),
* countryCode VARCHAR(3),
* languageCode VARCHAR(6),
* searchWord VARCHAR(32),
* duration INT );
* }}}
 *
 * Usage:
* {{{
* WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
* }}}
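 *
 * For example, with hypothetical input paths:
 *
 * {{{
 *   WebLogAnalysis --documents /tmp/documents.csv --ranks /tmp/ranks.csv --visits /tmp/visits.csv --output /tmp/result.csv
 * }}}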
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.examples.java.relational.util.WebLogData]].
*
* This example shows how to use:
*
* - tuple data types
* - projection and join projection
 * - the CoGroup transformation for an anti-join (see the sketch below)
*
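 * As a minimal illustration of the anti-join idiom used below (with
 * hypothetical data sets `left` and `right`, both keyed on their first
 * field), the CoGroup emits the left group only when the right group is
 * empty:
 *
 * {{{
 *   left.coGroup(right).where(0).equalTo(0) {
 *     (leftElements, rightElements, out: Collector[(Int, String)]) =>
 *       // keep left elements that have no key match on the right
 *       if (rightElements.isEmpty) leftElements.foreach(out.collect)
 *   }
 * }}}
 *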
*/
object WebLogAnalysis {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

val documents = getDocumentsDataSet(env, params)
val ranks = getRanksDataSet(env, params)
val visits = getVisitsDataSet(env, params)
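
    // filter the documents: keep only those containing both keywords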
val filteredDocs = documents
.filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
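
    // filter the ranks: keep only pages with a pageRank greater than 40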
val filteredRanks = ranks
.filter(rank => rank._1 > 40)
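
    // filter the visits: keep only visits from the year 2007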
val filteredVisits = visits
.filter(visit => visit._2.substring(0, 4).toInt == 2007)
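
    // join filtered documents and ranks on the URL, forwarding only the rank tuple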
val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
(doc, rank) => rank
}.withForwardedFieldsSecond("*")
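
    // anti-join via coGroup: emit a rank only if no matching visit exists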
val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
(ranks, visits, out: Collector[(Int, String, Int)]) =>
if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
}.withForwardedFieldsFirst("*")
// emit result
if (params.has("output")) {
result.writeAsCsv(params.get("output"), "\n", "|")
env.execute("Scala WebLogAnalysis Example")
} else {
println("Printing result to stdout. Use --output to specify output path.")
result.print()
}
  }

private def getDocumentsDataSet(env: ExecutionEnvironment, params: ParameterTool):
DataSet[(String, String)] = {
if (params.has("documents")) {
env.readCsvFile[(String, String)](
params.get("documents"),
fieldDelimiter = "|",
includedFields = Array(0, 1))
} else {
println("Executing WebLogAnalysis example with default documents data set.")
println("Use --documents to specify file input.")
val documents = WebLogData.DOCUMENTS map {
case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
}
env.fromCollection(documents)
}
  }

private def getRanksDataSet(env: ExecutionEnvironment, params: ParameterTool):
DataSet[(Int, String, Int)] = {
if (params.has("ranks")) {
env.readCsvFile[(Int, String, Int)](
params.get("ranks"),
fieldDelimiter = "|",
includedFields = Array(0, 1, 2))
} else {
println("Executing WebLogAnalysis example with default ranks data set.")
println("Use --ranks to specify file input.")
val ranks = WebLogData.RANKS map {
case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
}
env.fromCollection(ranks)
}
  }

private def getVisitsDataSet(env: ExecutionEnvironment, params: ParameterTool):
DataSet[(String, String)] = {
if (params.has("visits")) {
env.readCsvFile[(String, String)](
params.get("visits"),
fieldDelimiter = "|",
includedFields = Array(1, 2))
} else {
println("Executing WebLogAnalysis example with default visits data set.")
println("Use --visits to specify file input.")
val visits = WebLogData.VISITS map {
case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
}
env.fromCollection(visits)
}
}
}