blob: 804acfcf045d2236f730586c3bcb655bbbc996bd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AsyncDataStream => JavaAsyncDataStream}
import org.apache.flink.streaming.api.functions.async.{AsyncFunction => JavaAsyncFunction, ResultFuture => JavaResultFuture}
import org.apache.flink.streaming.api.scala.async._
import org.apache.flink.util.Preconditions
import scala.concurrent.duration.TimeUnit
/**
* A helper class to apply [[AsyncFunction]] to a data stream.
*
* Example:
* {{{
* val input: DataStream[String] = ...
* val asyncFunction: (String, ResultFuture[String]) => Unit = ...
*
* AsyncDataStream.orderedWait(input, asyncFunction, timeout, TimeUnit.MILLISECONDS, 100)
* }}}
*/
@PublicEvolving
object AsyncDataStream {
private val DEFAULT_QUEUE_CAPACITY = 100
/**
* Apply an asynchronous function on the input data stream. The output order is only maintained
* with respect to watermarks. Stream records which lie between the same two watermarks, can be
* re-ordered.
*
* @param input to apply the async function on
* @param asyncFunction to use
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param capacity of the operator which is equivalent to the number of concurrent asynchronous
* operations
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def unorderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int)
: DataStream[OUT] = {
val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction)
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
asScalaStream(JavaAsyncDataStream.unorderedWait[IN, OUT](
input.javaStream,
javaAsyncFunction,
timeout,
timeUnit,
capacity).returns(outType))
}
/**
* Apply an asynchronous function on the input data stream. The output order is only maintained
* with respect to watermarks. Stream records which lie between the same two watermarks, can be
* re-ordered.
*
* @param input to apply the async function on
* @param asyncFunction to use
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def unorderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit)
: DataStream[OUT] = {
unorderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
}
/**
* Apply an asynchronous function on the input data stream. The output order is only maintained
* with respect to watermarks. Stream records which lie between the same two watermarks, can be
* re-ordered.
*
* @param input to apply the async function on
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param capacity of the operator which is equivalent to the number of concurrent asynchronous
* operations
* @param asyncFunction to use
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def unorderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int) (
asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
Preconditions.checkNotNull(asyncFunction)
val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
val func = new JavaAsyncFunction[IN, OUT] {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
asScalaStream(JavaAsyncDataStream.unorderedWait[IN, OUT](
input.javaStream,
func,
timeout,
timeUnit,
capacity).returns(outType))
}
/**
* Apply an asynchronous function on the input data stream. The output order is only maintained
* with respect to watermarks. Stream records which lie between the same two watermarks, can be
* re-ordered.
*
* @param input to apply the async function on
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param asyncFunction to use
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def unorderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit) (
asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
unorderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
}
/**
* Apply an asynchronous function on the input data stream. The output order is the same as the
* input order of the elements.
*
* @param input to apply the async function on
* @param asyncFunction to use
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param capacity of the operator which is equivalent to the number of concurrent asynchronous
* operations
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int)
: DataStream[OUT] = {
val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction)
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
asScalaStream(JavaAsyncDataStream.orderedWait[IN, OUT](
input.javaStream,
javaAsyncFunction,
timeout,
timeUnit,
capacity).returns(outType))
}
/**
* Apply an asynchronous function on the input data stream. The output order is the same as the
* input order of the elements.
*
* @param input to apply the async function on
* @param asyncFunction to use
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit)
: DataStream[OUT] = {
orderedWait(input, asyncFunction, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)
}
/**
* Apply an asynchronous function on the input data stream. The output order is the same as the
* input order of the elements.
*
* @param input to apply the async function on
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param capacity of the operator which is equivalent to the number of concurrent asynchronous
* operations
* @param asyncFunction to use
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit,
capacity: Int) (
asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
Preconditions.checkNotNull(asyncFunction)
val cleanAsyncFunction = input.executionEnvironment.scalaClean(asyncFunction)
val func = new JavaAsyncFunction[IN, OUT] {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
cleanAsyncFunction(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
asScalaStream(JavaAsyncDataStream.orderedWait[IN, OUT](
input.javaStream,
func,
timeout,
timeUnit,
capacity).returns(outType))
}
/**
* Apply an asynchronous function on the input data stream. The output order is the same as the
* input order of the elements.
*
* @param input to apply the async function on
* @param timeout for the asynchronous operation to complete
* @param timeUnit of the timeout
* @param asyncFunction to use
* @tparam IN Type of the input record
* @tparam OUT Type of the output record
* @return the resulting stream containing the asynchronous results
*/
def orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
timeout: Long,
timeUnit: TimeUnit) (
asyncFunction: (IN, ResultFuture[OUT]) => Unit)
: DataStream[OUT] = {
orderedWait(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY)(asyncFunction)
}
private def wrapAsJavaAsyncFunction[IN, OUT: TypeInformation](
asyncFunction: AsyncFunction[IN, OUT])
: JavaAsyncFunction[IN, OUT] = asyncFunction match {
case richAsyncFunction: RichAsyncFunction[IN, OUT] =>
new ScalaRichAsyncFunctionWrapper[IN, OUT](richAsyncFunction)
case _ =>
new JavaAsyncFunction[IN, OUT] {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
}
}
}