---
title: "Features"
layout: features
---













{% highlight scala %}
// Iterative rank update. The step function passed to iterate(30) joins the
// current ranks with the adjacency data on "pageId", emits rank contributions,
// and then sums all contributions per page.
// NOTE(review): 30 is presumably the number of iterations — confirm against the API.
val result = initialRanks.iterate(30) { pages =>
  pages.join(adjacency).where("pageId").equalTo("pageId") {
    (page, adj, out: Collector[Page]) => {
      // Constant base share emitted for the joined page itself.
      out.collect(Page(page.id, 0.15 / numPages))

      // The remaining 0.85 of this page's rank is split evenly among its neighbors.
      val share = 0.85 * page.rank / adj.neighbors.length
      for (n <- adj.neighbors) {
        out.collect(Page(n, share))
      }
    }
  }
  .groupBy("pageId").sum("rank")
}
{% endhighlight %}


{% highlight scala %}
// Input stream of text lines (source elided in this example).
val texts: DataStream[String] = ...

// Windowed word frequencies: split each line on runs of non-word characters,
// wrap every token as Word(token, 1), group by the "word" field and sum "freq".
// NOTE(review): the window spec reads as a 5-second window evaluated every
// second — confirm against the streaming API.
val counts = texts
  .flatMap { line => line.split("\\W+") }
  .map { token => Word(token, 1) }
  .groupBy("word")
  .window(Time.of(5, SECONDS)).every(Time.of(1, SECONDS))
  .sum("freq")
{% endhighlight %}