// PageRank-style rank propagation, run as a bulk iteration of 30 rounds.
// Each round joins the current ranks with the adjacency lists, emits rank
// contributions, and sums them per page.
// NOTE(review): the lambda reads `page.id` while the keys use "pageId";
// confirm both names exist on `Page` (its definition is not shown here).
val result = initialRanks.iterate(30) { pages =>
  pages.join(adjacency).where("pageId").equalTo("pageId") {
    (page, adj, out: Collector[Page]) => {
      // Each joined page emits a constant base share of 0.15 / numPages.
      out.collect(Page(page.id, 0.15 / numPages))

      // 0.85 of the page's current rank is split evenly among its neighbors.
      val neighborCount = adj.neighbors.length
      for (n <- adj.neighbors) {
        out.collect(Page(n, 0.85 * page.rank / neighborCount))
      }
    }
  }
  .groupBy("pageId").sum("rank")
}
{% endhighlight %}
// Input stream of text lines; the source is elided in this example.
val texts: DataStream[String] = ...

// Word count over a window: split each line on runs of non-word characters,
// wrap every token as Word(token, 1), group by the "word" field, and sum
// "freq" over a 5-second window that is evaluated every second.
val counts = texts
  .flatMap { line => line.split("\\W+") }
  .map { token => Word(token, 1) }
  .groupBy("word")
  .window(Time.of(5, SECONDS)).every(Time.of(1, SECONDS))
  .sum("freq")
{% endhighlight %}