blob: c6cfd1fdd4e2657400f15eead9ebeedd174ad7e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import java.net.URI
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.SplittableIterator
import _root_.scala.language.implicitConversions
import scala.collection.JavaConverters._
@Public
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
* @return the wrapped Java environment
*/
def getJavaEnv: JavaEnv = javaEnv
/**
* Gets the config object.
*/
def getConfig = javaEnv.getConfig
/**
* Gets cache files.
*/
def getCachedFiles = javaEnv.getCachedFiles
/**
* Gets the config JobListeners.
*/
@PublicEvolving
def getJobListeners = javaEnv.getJobListeners
/**
* Sets the parallelism for operations executed through this environment.
* Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
* with x parallel instances. This value can be overridden by specific operations using
* [[DataStream#setParallelism(int)]].
*/
def setParallelism(parallelism: Int): Unit = {
javaEnv.setParallelism(parallelism)
}
/**
* Sets the runtime execution mode for the application (see [[RuntimeExecutionMode]]).
* This is equivalent to setting the "execution.runtime-mode" in your application's
* configuration file.
*
* We recommend users to NOT use this method but set the "execution.runtime-mode"
* using the command-line when submitting the application. Keeping the application code
* configuration-free allows for more flexibility as the same application will be able to
* be executed in any execution mode.
*
* @param executionMode the desired execution mode.
* @return The execution environment of your application.
*/
@PublicEvolving
def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment = {
javaEnv.setRuntimeMode(executionMode)
this
}
/**
* Sets the maximum degree of parallelism defined for the program.
* The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
* defines the number of key groups used for partitioned state.
**/
def setMaxParallelism(maxParallelism: Int): Unit = {
javaEnv.setMaxParallelism(maxParallelism)
}
/**
* Returns the default parallelism for this execution environment. Note that this
* value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
*/
def getParallelism = javaEnv.getParallelism
/**
* Returns the maximum degree of parallelism defined for the program.
*
* The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
* defines the number of key groups used for partitioned state.
*
*/
def getMaxParallelism = javaEnv.getMaxParallelism
/**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
* can result in three logical modes:
*
* <ul>
* <li>A positive integer triggers flushing periodically by that integer</li>
* <li>0 triggers flushing after every record thus minimizing latency</li>
* <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
* </ul>
*/
def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
javaEnv.setBufferTimeout(timeoutMillis)
this
}
/**
* Gets the default buffer timeout set for this environment
*/
def getBufferTimeout = javaEnv.getBufferTimeout
/**
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
* avoiding serialization and de-serialization.
*
*/
@PublicEvolving
def disableOperatorChaining(): StreamExecutionEnvironment = {
javaEnv.disableOperatorChaining()
this
}
// ------------------------------------------------------------------------
// Checkpointing Settings
// ------------------------------------------------------------------------
/**
* Gets the checkpoint config, which defines values like checkpoint interval, delay between
* checkpoints, etc.
*/
def getCheckpointConfig = javaEnv.getCheckpointConfig()
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* The job draws checkpoints periodically, in the given interval. The state will be
* stored in the configured state backend.
*
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. If the "force" parameter is set to true, the system will execute the
* job nonetheless.
*
* @param interval
* Time interval between state checkpoints in millis.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
* @param force
* If true checkpointing will be enabled for iterative jobs as well.
*/
@deprecated
@PublicEvolving
def enableCheckpointing(interval : Long,
mode: CheckpointingMode,
force: Boolean) : StreamExecutionEnvironment = {
javaEnv.enableCheckpointing(interval, mode, force)
this
}
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* The job draws checkpoints periodically, in the given interval. The system uses the
* given [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once").
* The state will be stored in the configured state backend.
*
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
*
* @param interval
* Time interval between state checkpoints in milliseconds.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
*/
def enableCheckpointing(interval : Long,
mode: CheckpointingMode) : StreamExecutionEnvironment = {
javaEnv.enableCheckpointing(interval, mode)
this
}
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* The job draws checkpoints periodically, in the given interval. The program will use
* [[CheckpointingMode.EXACTLY_ONCE]] mode. The state will be stored in the
* configured state backend.
*
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
*
* @param interval
* Time interval between state checkpoints in milliseconds.
*/
def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
}
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of streaming
* operator states. Time interval between state checkpoints is specified in in millis.
*
* Setting this option assumes that the job is used in production and thus if not stated
* explicitly otherwise with calling the [[setRestartStrategy]] method in case of
* failure the job will be resubmitted to the cluster indefinitely.
*/
@deprecated
@PublicEvolving
def enableCheckpointing() : StreamExecutionEnvironment = {
javaEnv.enableCheckpointing()
this
}
def getCheckpointingMode = javaEnv.getCheckpointingMode()
/**
* Sets the state backend that describes how to store operator. It defines the data structures
* that hold state during execution (for example hash tables, RocksDB, or other data stores).
*
* State managed by the state backend includes both keyed state that is accessible on
* [[org.apache.flink.streaming.api.scala.KeyedStream]], as well as state
* maintained directly by the user code that implements
* [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]].
*
* The [[org.apache.flink.runtime.state.hashmap.HashMapStateBackend]] maintains state in
* heap memory, as objects. It is lightweight without extra dependencies, but is limited to JVM
* heap memory.
*
* In contrast, the '''EmbeddedRocksDBStateBackend''' stores its state in an embedded
* '''RocksDB''' instance. This state backend can store very large state that exceeds memory
* and spills to local disk. All key/value state (including windows) is stored in the key/value
* index of RocksDB.
*
* In both cases, fault tolerance is managed via the jobs
* [[org.apache.flink.runtime.state.CheckpointStorage]] which configures how and where state
* backends persist during a checkpoint.
*
* @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
* @see #getStateBackend()
*/
@PublicEvolving
def setStateBackend(backend: StateBackend): StreamExecutionEnvironment = {
javaEnv.setStateBackend(backend)
this
}
/**
* Returns the state backend that defines how to store and checkpoint state.
*/
@PublicEvolving
def getStateBackend: StateBackend = javaEnv.getStateBackend()
/**
* Sets the default savepoint directory, where savepoints will be written to
* if no is explicitly provided when triggered.
*
* @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
* @see #getDefaultSavepointDirectory()
*/
@PublicEvolving
def setDefaultSavepointDirectory(savepointDirectory: String): StreamExecutionEnvironment = {
javaEnv.setDefaultSavepointDirectory(savepointDirectory)
this
}
/**
* Sets the default savepoint directory, where savepoints will be written to
* if no is explicitly provided when triggered.
*
* @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
* @see #getDefaultSavepointDirectory()
*/
@PublicEvolving
def setDefaultSavepointDirectory(savepointDirectory: URI): StreamExecutionEnvironment = {
javaEnv.setDefaultSavepointDirectory(savepointDirectory)
this
}
/**
* Sets the default savepoint directory, where savepoints will be written to
* if no is explicitly provided when triggered.
*
* @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
* @see #getDefaultSavepointDirectory()
*/
@PublicEvolving
def setDefaultSavepointDirectory(savepointDirectory: Path): StreamExecutionEnvironment = {
javaEnv.setDefaultSavepointDirectory(savepointDirectory)
this
}
/**
* Gets the default savepoint directory for this Job.
*
* @see #setDefaultSavepointDirectory(Path)
*/
@PublicEvolving
def getDefaultSavepointDirectory: Path = javaEnv.getDefaultSavepointDirectory
/**
* Sets the restart strategy configuration. The configuration specifies which restart strategy
* will be used for the execution graph in case of a restart.
*
* @param restartStrategyConfiguration Restart strategy configuration to be set
*/
@PublicEvolving
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit = {
javaEnv.setRestartStrategy(restartStrategyConfiguration)
}
/**
* Returns the specified restart strategy configuration.
*
* @return The restart strategy configuration to be used
*/
@PublicEvolving
def getRestartStrategy: RestartStrategyConfiguration = {
javaEnv.getRestartStrategy()
}
/**
* Sets the number of times that failed tasks are re-executed. A value of zero
* effectively disables fault tolerance. A value of "-1" indicates that the system
* default value (as defined in the configuration) should be used.
*
* @deprecated This method will be replaced by [[setRestartStrategy()]]. The
* FixedDelayRestartStrategyConfiguration contains the number of execution retries.
*/
@PublicEvolving
def setNumberOfExecutionRetries(numRetries: Int): Unit = {
javaEnv.setNumberOfExecutionRetries(numRetries)
}
/**
* Gets the number of times the system will try to re-execute failed tasks. A value
* of "-1" indicates that the system default value (as defined in the configuration)
* should be used.
*
* @deprecated This method will be replaced by [[getRestartStrategy]]. The
* FixedDelayRestartStrategyConfiguration contains the number of execution retries.
*/
@PublicEvolving
def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
// --------------------------------------------------------------------------------------------
// Registry for types and serializers
// --------------------------------------------------------------------------------------------
/**
* Adds a new Kryo default serializer to the Runtime.
* <p/>
* Note that the serializer instance must be serializable (as defined by
* java.io.Serializable), because it may be distributed to the worker nodes
* by java serialization.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializer
* The serializer to use.
*/
def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
`type`: Class[_],
serializer: T)
: Unit = {
javaEnv.addDefaultKryoSerializer(`type`, serializer)
}
/**
* Adds a new Kryo default serializer to the Runtime.
*
* @param type
* The class of the types serialized with the given serializer.
* @param serializerClass
* The class of the serializer to use.
*/
def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]) {
javaEnv.addDefaultKryoSerializer(`type`, serializerClass)
}
/**
* Registers the given type with the serializer at the [[KryoSerializer]].
*
* Note that the serializer instance must be serializable (as defined by java.io.Serializable),
* because it may be distributed to the worker nodes by java serialization.
*/
def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
clazz: Class[_],
serializer: T)
: Unit = {
javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
}
/**
* Registers the given type with the serializer at the [[KryoSerializer]].
*/
def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
}
/**
* Registers the given type with the serialization stack. If the type is eventually
* serialized as a POJO, then the type is registered with the POJO serializer. If the
* type ends up being serialized with Kryo, then it will be registered at Kryo to make
* sure that only tags are written.
*
*/
def registerType(typeClass: Class[_]) {
javaEnv.registerType(typeClass)
}
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
* If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using
* [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
*
* @param characteristic The time characteristic.
* @deprecated In Flink 1.12 the default stream time characteristic has been changed to
* [[TimeCharacteristic.EventTime]], thus you don't need to call this method for
* enabling event-time support anymore. Explicitly using processing-time windows and
* timers works in event-time mode. If you need to disable watermarks, please use
* [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long]]. If
* you are using [[TimeCharacteristic.IngestionTime]], please manually set an
* appropriate [[WatermarkStrategy]]. If you are using generic "time window"
* operations (for example [[KeyedStream.timeWindow()]] that change behaviour based
* on the time characteristic, please use equivalent operations that explicitly
* specify processing time or event time.
*/
@deprecated
@PublicEvolving
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
javaEnv.setStreamTimeCharacteristic(characteristic)
}
/**
* Gets the time characteristic/
*
* @see #setStreamTimeCharacteristic
* @return The time characteristic.
*/
@PublicEvolving
def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic()
/**
* Sets all relevant options contained in the [[ReadableConfig]] such as e.g.
* [[org.apache.flink.streaming.api.environment.StreamPipelineOptions#TIME_CHARACTERISTIC]].
* It will reconfigure [[StreamExecutionEnvironment]],
* [[org.apache.flink.api.common.ExecutionConfig]] and
* [[org.apache.flink.streaming.api.environment.CheckpointConfig]].
*
* It will change the value of a setting only if a corresponding option was set in the
* `configuration`. If a key is not present, the current value of a field will remain
* untouched.
*
* @param configuration a configuration to read the values from
* @param classLoader a class loader to use when loading classes
*/
@PublicEvolving
def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit = {
javaEnv.configure(configuration, classLoader)
}
// --------------------------------------------------------------------------------------------
// Data stream creations
// --------------------------------------------------------------------------------------------
/**
* Creates a new DataStream that contains a sequence of numbers. This source is a parallel source.
* If you manually set the parallelism to `1` the emitted elements are in order.
*
* @deprecated Use [[fromSequence(long, long)]] instead to create a new data stream
* that contains [[NumberSequenceSource]].
*/
@deprecated
def generateSequence(from: Long, to: Long): DataStream[Long] = {
new DataStream[java.lang.Long](javaEnv.generateSequence(from, to))
.asInstanceOf[DataStream[Long]]
}
/**
* Creates a new data stream that contains a sequence of numbers (longs) and is useful for
* testing and for cases that just need a stream of N events of any kind.
*
* The generated source splits the sequence into as many parallel sub-sequences as there are
* parallel source readers. Each sub-sequence will be produced in order. If the parallelism is
* limited to one, the source will produce one sequence in order.
*
* This source is always bounded. For very long sequences (for example over the entire domain
* of long integer values), you may consider executing the application in a streaming manner
* because of the end bound that is pretty far away.
*
* Use [[fromSource(Source,WatermarkStrategy, String)]] together with
* [[NumberSequenceSource]] if you required more control over the created sources. For
* example, if you want to set a [[WatermarkStrategy]].
*/
def fromSequence(from: Long, to: Long): DataStream[Long] = {
new DataStream[java.lang.Long](javaEnv.fromSequence(from, to))
.asInstanceOf[DataStream[Long]]
}
/**
* Creates a DataStream that contains the given elements. The elements must all be of the
* same type.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
def fromElements[T: TypeInformation](data: T*): DataStream[T] = {
fromCollection(data)
}
/**
* Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
* because the framework may move the elements into the cluster if needed.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] = {
require(data != null, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
val collection = scala.collection.JavaConversions.asJavaCollection(data)
asScalaStream(javaEnv.fromCollection(collection, typeInfo))
}
/**
* Creates a DataStream from the given [[Iterator]].
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
}
/**
* Creates a DataStream from the given [[SplittableIterator]].
*/
def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]):
DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
}
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*/
def readTextFile(filePath: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath))
/**
* Creates a data stream that represents the Strings produced by reading the given file
* line wise. The character set with the given name will be used to read the files.
*/
def readTextFile(filePath: String, charsetName: String): DataStream[String] =
asScalaStream(javaEnv.readTextFile(filePath, charsetName))
/**
* Reads the given file with the given input format. The file path should be passed
* as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
*/
def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
DataStream[T] =
asScalaStream(javaEnv.readFile(inputFormat, filePath))
/**
* Creates a DataStream that contains the contents of file created while
* system watches the given path. The file will be read with the system's
* default character set. The user can check the monitoring interval in milliseconds,
* and the way file modifications are handled. By default it checks for only new files
* every 100 milliseconds.
*
*/
@Deprecated
def readFileStream(StreamPath: String, intervalMillis: Long = 100,
watchType: FileMonitoringFunction.WatchType =
FileMonitoringFunction.WatchType.ONLY_NEW_FILES): DataStream[String] =
asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
/**
* Reads the contents of the user-specified path based on the given [[FileInputFormat]].
* Depending on the provided [[FileProcessingMode]].
*
* @param inputFormat
* The input format used to create the data stream
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or
* "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react
* to new data, or process once and exit
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis)
* between consecutive path scans
* @param filter
* The files to be excluded from the processing
* @return The data stream that represents the data read from the given file
* @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and
* [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]]
*/
@PublicEvolving
@Deprecated
def readFile[T: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode,
interval: Long,
filter: FilePathFilter): DataStream[T] = {
asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter))
}
/**
* Reads the contents of the user-specified path based on the given [[FileInputFormat]].
* Depending on the provided [[FileProcessingMode]], the source
* may periodically monitor (every `interval` ms) the path for new data
* ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
* once the data currently in the path and exit
* ([[FileProcessingMode.PROCESS_ONCE]]). In addition,
* if the path contains files not to be processed, the user can specify a custom
* [[FilePathFilter]]. As a default implementation you can use
* [[FilePathFilter.createDefaultFilter()]].
*
* ** NOTES ON CHECKPOINTING: ** If the `watchType` is set to
* [[FileProcessingMode#PROCESS_ONCE]], the source monitors the path ** once **,
* creates the [[org.apache.flink.core.fs.FileInputSplit FileInputSplits]]
* to be processed, forwards them to the downstream
* [[ContinuousFileReaderOperator readers]] to read the actual data,
* and exits, without waiting for the readers to finish reading. This
* implies that no more checkpoint barriers are going to be forwarded
* after the source exits, thus having no checkpoints after that point.
*
* @param inputFormat
* The input format used to create the data stream
* @param filePath
* The path of the file, as a URI (e.g., "file:///some/local/file" or
* "hdfs://host:port/file/path")
* @param watchType
* The mode in which the source should operate, i.e. monitor path and react
* to new data, or process once and exit
* @param interval
* In the case of periodic path monitoring, this specifies the interval (in millis)
* between consecutive path scans
* @return The data stream that represents the data read from the given file
*/
@PublicEvolving
def readFile[T: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode,
interval: Long): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
}
/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
* character set. The maximum retry interval is specified in seconds, in case
* of temporary service outage reconnection is initiated every second.
*/
@PublicEvolving
def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
DataStream[String] =
asScalaStream(javaEnv.socketTextStream(hostname, port))
/**
* Generic method to create an input data stream with a specific input format.
* Since all data streams need specific information about their types, this method needs to
* determine the type of the data produced by the input format. It will attempt to determine the
* data type by reflection, unless the input format implements the ResultTypeQueryable interface.
*/
@PublicEvolving
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
if (inputFormat.isInstanceOf[ResultTypeQueryable[_]]) {
asScalaStream(javaEnv.createInput(inputFormat))
} else {
asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
}
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality. By default sources have a parallelism of 1.
* To enable parallel execution, the user defined source should implement
* ParallelSourceFunction or extend RichParallelSourceFunction.
* In these cases the resulting source will have the parallelism of the environment.
* To change this afterwards call DataStreamSource.setParallelism(int)
*
*/
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")
val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.addSource(cleanFun, typeInfo))
}
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
*/
def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
require(function != null, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
val cleanFun = scalaClean(function)
override def run(ctx: SourceContext[T]) {
cleanFun(ctx)
}
override def cancel() = {}
}
addSource(sourceFunction)
}
/**
* Create a DataStream using a [[Source]].
*/
@Experimental
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
}
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* The program execution will be logged and displayed with a generated
* default name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
*/
def execute() = javaEnv.execute()
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* The program execution will be logged and displayed with the provided name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
*/
def execute(jobName: String) = javaEnv.execute(jobName)
/**
* Register a [[JobListener]] in this environment. The [[JobListener]] will be
* notified on specific job status changed.
*/
@PublicEvolving
def registerJobListener(jobListener: JobListener): Unit = {
javaEnv.registerJobListener(jobListener)
}
/**
* Clear all registered [[JobListener]]s.
*/
@PublicEvolving def clearJobListeners(): Unit = {
javaEnv.clearJobListeners()
}
/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* The program execution will be logged and displayed with a generated
* default name.
*
* <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
* of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient
* implementation.
*
* @return A [[JobClient]] that can be used to communicate with the submitted job,
* completed on submission succeeded.
*/
@PublicEvolving
def executeAsync(): JobClient = javaEnv.executeAsync()
/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* The program execution will be logged and displayed with the provided name.
*
* <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
* of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
* its usage. In other case, there may be resource leaks depending on the JobClient
* implementation.
*
* @return A [[JobClient]] that can be used to communicate with the submitted job,
* completed on submission succeeded.
*/
@PublicEvolving
def executeAsync(jobName: String): JobClient = javaEnv.executeAsync(jobName)
/**
* Creates the plan with which the system will execute the program, and
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
*/
def getExecutionPlan = javaEnv.getExecutionPlan
/**
* Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
* This call clears previously registered
* [[org.apache.flink.api.dag.Transformation transformations]].
*
* @return The StreamGraph representing the transformations
*/
@Internal
def getStreamGraph = javaEnv.getStreamGraph
/**
* Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
* This call clears previously registered
* [[org.apache.flink.api.dag.Transformation transformations]].
*
* @param jobName Desired name of the job
* @return The StreamGraph representing the transformations
*/
@Internal
def getStreamGraph(jobName: String) = javaEnv.getStreamGraph(jobName)
/**
* Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job
* with the option to clear previously registered
* [[org.apache.flink.api.dag.Transformation transformations]]. Clearing the transformations
* allows, for example, to not re-execute the same operations when calling
* [[execute()]] multiple times.
*
* @param jobName Desired name of the job
* @param clearTransformations Whether or not to clear previously registered transformations
* @return The StreamGraph representing the transformations
*/
@Internal
def getStreamGraph(jobName: String, clearTransformations: Boolean) =
javaEnv.getStreamGraph(jobName, clearTransformations)
/**
* Getter of the wrapped [[org.apache.flink.streaming.api.environment.StreamExecutionEnvironment]]
*
* @return The encased ExecutionEnvironment
*/
@Internal
def getWrappedStreamExecutionEnvironment = javaEnv
/**
* Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
* is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
*/
private[flink] def scalaClean[F <: AnyRef](f: F): F = {
if (getConfig.isClosureCleanerEnabled) {
ClosureCleaner.clean(f, true, getConfig.getClosureCleanerLevel)
} else {
ClosureCleaner.ensureSerializable(f)
}
f
}
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (which will be distributed via BlobServer), or files in a distributed file
* system. The runtime will copy the files temporarily to a local cache, if needed.
*
* The [[org.apache.flink.api.common.functions.RuntimeContext]] can be obtained inside UDFs
* via [[org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()]] and
* provides access [[org.apache.flink.api.common.cache.DistributedCache]] via
* [[org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()]].
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
* "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
*/
def registerCachedFile(filePath: String, name: String): Unit = {
javaEnv.registerCachedFile(filePath, name)
}
/**
* Registers a file at the distributed cache under the given name. The file will be accessible
* from any user-defined function in the (distributed) runtime under a local path. Files
* may be local files (which will be distributed via BlobServer), or files in a distributed file
* system. The runtime will copy the files temporarily to a local cache, if needed.
*
* The [[org.apache.flink.api.common.functions.RuntimeContext]] can be obtained inside UDFs
* via [[org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()]] and
* provides access [[org.apache.flink.api.common.cache.DistributedCache]] via
* [[org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()]].
*
* @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
* "hdfs://host:port/and/path")
* @param name The name under which the file is registered.
* @param executable flag indicating whether the file should be executable
*/
def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit = {
javaEnv.registerCachedFile(filePath, name, executable)
}
/**
* Returns whether Unaligned Checkpoints are enabled.
*/
def isUnalignedCheckpointsEnabled: Boolean = javaEnv.isUnalignedCheckpointsEnabled
/**
* Returns whether Unaligned Checkpoints are force-enabled.
*/
def isForceUnalignedCheckpoints: Boolean = javaEnv.isForceUnalignedCheckpoints
}
object StreamExecutionEnvironment {
/**
* Sets the default parallelism that will be used for the local execution
* environment created by [[createLocalEnvironment()]].
*
* @param parallelism The default parallelism to use for local execution.
*/
@PublicEvolving
def setDefaultLocalParallelism(parallelism: Int) : Unit =
JavaEnv.setDefaultLocalParallelism(parallelism)
/**
* Gets the default parallelism that will be used for the local execution environment created by
* [[createLocalEnvironment()]].
*/
@PublicEvolving
def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
// --------------------------------------------------------------------------
// context environment
// --------------------------------------------------------------------------
/**
* Creates an execution environment that represents the context in which the program is
* currently executed. If the program is invoked standalone, this method returns a local
* execution environment. If the program is invoked from within the command line client
* to be submitted to a cluster, this method returns the execution environment of this cluster.
*/
def getExecutionEnvironment: StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment)
}
// --------------------------------------------------------------------------
// local environment
// --------------------------------------------------------------------------
/**
* Creates a local execution environment. The local execution environment will run the
* program in a multi-threaded fashion in the same JVM as the environment was created in.
*
* This method sets the environment's default parallelism to given parameter, which
* defaults to the value set via [[setDefaultLocalParallelism(Int)]].
*/
def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}
/**
* Creates a local execution environment. The local execution environment will run the
* program in a multi-threaded fashion in the same JVM as the environment was created in.
*
* @param parallelism The parallelism for the local environment.
* @param configuration Pass a custom configuration into the cluster.
*/
def createLocalEnvironment(parallelism: Int, configuration: Configuration):
StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism, configuration))
}
/**
* Creates a [[StreamExecutionEnvironment]] for local program execution that also starts the
* web monitoring UI.
*
* The local execution environment will run the program in a multi-threaded fashion in
* the same JVM as the environment was created in. It will use the parallelism specified in the
* parameter.
*
* If the configuration key 'rest.port' was set in the configuration, that particular
* port will be used for the web UI. Otherwise, the default port (8081) will be used.
*
* @param config optional config for the local execution
* @return The created StreamExecutionEnvironment
*/
@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
val conf: Configuration = if (config == null) new Configuration() else config
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}
// --------------------------------------------------------------------------
// remote environment
// --------------------------------------------------------------------------
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program to
* a cluster for execution. Note that all file paths used in the program must be accessible from
* the cluster. The execution will use the cluster's default parallelism, unless the
* parallelism is set explicitly via [[StreamExecutionEnvironment.setParallelism()]].
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
* program uses
* user-defined functions, user-defined input formats, or any libraries,
* those must be
* provided in the JAR files.
*/
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
}
/**
* Creates a remote execution environment. The remote environment sends (parts of) the program
* to a cluster for execution. Note that all file paths used in the program must be accessible
* from the cluster. The execution will use the specified parallelism.
*
* @param host The host name or address of the master (JobManager),
* where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param parallelism The parallelism to use during the execution.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
* program uses
* user-defined functions, user-defined input formats, or any libraries,
* those must be
* provided in the JAR files.
*/
def createRemoteEnvironment(
host: String,
port: Int,
parallelism: Int,
jarFiles: String*): StreamExecutionEnvironment = {
val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
javaEnv.setParallelism(parallelism)
new StreamExecutionEnvironment(javaEnv)
}
}