// blob: 7bc20caceaa90106e18990b6a7b697e607f269df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.examples.streamlet.scala
import scala.util.Random
import org.apache.heron.examples.streamlet.scala.common.ScalaTopologyExampleUtils
import org.apache.heron.streamlet.scala.{Builder, Runner}
import org.apache.heron.streamlet.{Config, JoinType, WindowConfig}
/**
 * Record describing a single piece of classical music.
 *
 * @param composer name of the composer
 * @param title    title of the piece
 * @param year     year attached to the record (e.g. 1723)
 * @param keyword  short descriptive tag for the piece (e.g. "piano")
 */
case class ClassicalMusic(composer: String, title: String, year: Int, keyword: String) {

  /**
   * Zero-argument constructor yielding a record with empty strings and year 0.
   * NOTE(review): presumably present for a serialization framework that needs
   * a no-arg constructor -- confirm before removing.
   */
  def this() = this("", "", 0, "")
}
/**
 * A small example topology built from Scala Streamlet operations over two
 * sources of classical music records.
 *
 * Records from the first source are filtered so that only those dated 1850 or
 * earlier remain. The filtered stream is then inner-joined with the second
 * source, using the `year` field of each record as the join key. The joined
 * results are finally written out through the log sink.
 */
object ScalaClassicalMusicTopology {

  /** Sample records emitted (one at a time, at random) by the first source. */
  val classicalMusics: List[ClassicalMusic] = List(
    ClassicalMusic("Bach", "Bourrée In E Minor", 1717, "guitar"),
    ClassicalMusic("Vivaldi", "Four Seasons: Winter", 1723, "rousing"),
    ClassicalMusic("Bach", "Air On The G String", 1723, "light"),
    ClassicalMusic("Mozart", "Symphony No. 40: I", 1788, "seductive"),
    ClassicalMusic("Beethoven", "Symphony No. 9: Ode To Joy", 1824, "joyful"),
    ClassicalMusic("Bizet", "Carmen: Habanera", 1875, "seductive")
  )

  /** Sample records emitted (one at a time, at random) by the second source. */
  val classicalMusics2: List[ClassicalMusic] = List(
    ClassicalMusic("Handel", "Water Music: Alla Hornpipe", 1717, "formal"),
    ClassicalMusic("Vivaldi", "Four Seasons: Spring", 1723, "formal"),
    ClassicalMusic("Bach", "Cantata 147: Jesu, Joy Of Man's Desiring", 1723, "wedding"),
    ClassicalMusic("Mozart", "Piano Sonata No. 16", 1788, "piano"),
    ClassicalMusic("Beethoven", "Symphony No. 9: II", 1824, "powerful"),
    ClassicalMusic("Tchaikovsky", "Piano Concerto No. 1", 1875, "piano")
  )

  /**
   * Builds the processing graph and submits it through the Runner.
   *
   * @param args command-line arguments; the topology name is obtained from them
   *             via `ScalaTopologyExampleUtils.getTopologyName`
   */
  def main(args: Array[String]): Unit = {
    val streamletBuilder = Builder.newBuilder()

    // The second data set's source is registered before the first one.
    val secondSource = streamletBuilder
      .newSource[ClassicalMusic](() => getRandomClassicalMusic(classicalMusics2))
      .setName("classical-music-source-2")

    val firstSource = streamletBuilder
      .newSource[ClassicalMusic](() => getRandomClassicalMusic(classicalMusics))
      .setName("classical-music-source-1")

    // Both sides of the join are keyed by the record's year.
    val yearOf: ClassicalMusic => Int = _.year

    // Renders a matched pair as "COMPOSER - title - COMPOSER - title".
    val describePair: (ClassicalMusic, ClassicalMusic) => String =
      (left, right) =>
        s"${left.composer.toUpperCase} - ${left.title} - ${right.composer.toUpperCase} - ${right.title}"

    // Keep only records dated 1850 or earlier from the first source.
    val olderRecords = firstSource
      .filter(record => record.year <= 1850)
      .setName("classical-musics-older-than-1850")

    olderRecords
      .join[Int, ClassicalMusic, String](
        secondSource,
        yearOf,
        yearOf,
        WindowConfig.TumblingCountWindow(50),
        JoinType.INNER,
        describePair
      )
      .setName("joined-classical-musics-by-year")
      .log()
      .setName("log")

    val topologyConfig = Config.defaultConfig()

    // The topology name comes from the first command-line argument.
    val name = ScalaTopologyExampleUtils.getTopologyName(args)

    // Hand the processing graph and configuration to the Runner, which converts
    // the graph into a Heron topology that can be run in a Heron cluster.
    new Runner().run(name, topologyConfig, streamletBuilder)
  }

  /**
   * Picks one record uniformly at random from the given list.
   *
   * @param records candidate records to choose from
   * @return the record found at a randomly drawn index
   */
  private def getRandomClassicalMusic(records: List[ClassicalMusic]): ClassicalMusic = {
    val index = Random.nextInt(records.length)
    records(index)
  }
}