blob: 7f3c250fa3a3f61f0764cec3ade10faba3cc6f2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.dsl.javaapi
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
import org.apache.gearpump.streaming.dsl.window.api.Window
import org.apache.gearpump.streaming.task.Task
/**
* Java DSL
*/
class JavaStream[T](val stream: Stream[T]) {
/** FlatMap on stream */
def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap"))
}
/** Map on stream */
def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
}
/** Only keep the messages that FilterFunction returns true. */
def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
}
/** Does aggregation on the stream */
def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
new JavaStream[T](stream.reduce(fn, description))
}
def log(): Unit = {
stream.log()
}
/** Merges streams of same type together */
def merge(other: JavaStream[T], description: String): JavaStream[T] = {
new JavaStream[T](stream.merge(other.stream, description))
}
/**
* Group by a stream and turns it to a list of sub-streams. Operations chained after
* groupBy applies to sub-streams.
*/
def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
parallelism: Int, description: String): JavaStream[T] = {
new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
}
def window(win: Window, description: String): JavaWindowStream[T] = {
new JavaWindowStream[T](stream.window(win, description))
}
/** Add a low level Processor to process messages */
def process[R](
processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
: JavaStream[R] = {
new JavaStream[R](stream.process(processor, parallelism, conf, description))
}
}
class JavaWindowStream[T](stream: WindowStream[T]) {
def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
description: String): JavaStream[T] = {
new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
}
}