// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Data Streaming
:javaFile: {javaCodeDir}/DataStreaming.java
== Overview
Ignite provides a Data Streaming API for ingesting large, continuous streams of data into an Ignite cluster.
The Data Streaming API is designed to be scalable and fault-tolerant, and provides _at-least-once_ delivery semantics for the data streamed into Ignite, meaning each entry is processed at least once.
Data is streamed into a cache via a <<Data Streamers, data streamer>> associated with the cache. Data streamers automatically buffer the data, group it into batches for better performance, and send the batches to multiple nodes in parallel.
The Data Streaming API provides the following features:
* The data that is added to a data streamer is automatically partitioned and distributed between the nodes.
* You can process the data concurrently in a colocated fashion.
* Clients can perform concurrent SQL queries on the data as it is being streamed in.
image:images/data_streaming.png[Data Streaming]
== Data Streamers
A data streamer is associated with a specific cache and provides an interface for streaming data into the cache.
In a typical scenario, you obtain a data streamer and use one of its methods to stream data into the cache. Ignite takes care of data partitioning and colocation: it batches data entries according to the partitioning rules to avoid unnecessary data movement.
You can obtain the data streamer for a specific cache as follows:
[tabs]
--
tab:Java[]
[source, java]
----
include::{javaFile}[tag=dataStreamer1,indent=0]
----
In the Java version of Ignite, a data streamer is an implementation of the `IgniteDataStreamer` interface. `IgniteDataStreamer` provides a number of `addData(...)` methods for adding key-value pairs to caches. Refer to the link:{javadoc_base_url}/org/apache/ignite/IgniteDataStreamer.html[IgniteDataStreamer] javadoc for the complete list of methods.
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer1,indent=0]
----
tab:C++[unsupported]
--
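The batching by partition described above can be pictured with a simplified model in plain Java. This sketch does not use the Ignite API: the `PartitionBatcher` class, the `PARTITIONS` value, and the hash-modulo `partition` function are hypothetical stand-ins for Ignite's affinity function and internal buffers, chosen only to show how entries that belong to the same partition end up in the same batch.

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionBatcher {
    // Hypothetical partition count, used for illustration only.
    static final int PARTITIONS = 4;

    // Stand-in for an affinity function: maps a key to a partition number.
    static int partition(Object key) {
        return Math.floorMod(key.hashCode(), PARTITIONS);
    }

    // Groups keys by partition so that each group can be sent as one batch.
    static Map<Integer, List<Integer>> batch(List<Integer> keys) {
        Map<Integer, List<Integer>> batches = new TreeMap<>();
        for (Integer key : keys)
            batches.computeIfAbsent(partition(key), p -> new ArrayList<>()).add(key);
        return batches;
    }

    public static void main(String[] args) {
        // Prints {0=[0, 4], 1=[1, 5], 2=[2, 6], 3=[3, 7]}
        System.out.println(batch(List.of(0, 1, 2, 3, 4, 5, 6, 7)));
    }
}
----

In this model each batch would travel to the node that owns its partition, which is why no entry needs to be moved again after it arrives.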
== Overwriting Existing Keys
By default, data streamers do not overwrite existing data and skip entries that are already in the cache. You can change that behavior by setting the `allowOverwrite` property of the data streamer to `true`.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=dataStreamer2,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DataStreaming.cs[tag=dataStreamer2,indent=0]
----
tab:C++[unsupported]
--
NOTE: When `allowOverwrite` is set to `false` (the default), updates are not propagated to the link:persistence/external-storage[external storage] (if it is used).
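The difference between the two modes can be modeled with an ordinary `java.util.Map`. This is only a sketch of the semantics described in this section, not Ignite code: the `OverwriteModel` class and its `stream` method are hypothetical names, and the map stands in for the cache.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

public class OverwriteModel {
    // Models one streamed entry arriving at a cache that may already hold the key.
    static void stream(Map<String, Integer> cache, String key, int value, boolean allowOverwrite) {
        if (allowOverwrite)
            cache.put(key, value);          // the existing value is replaced
        else
            cache.putIfAbsent(key, value);  // the existing value is kept, the new entry is skipped
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        cache.put("a", 1);

        stream(cache, "a", 2, false);
        System.out.println(cache.get("a")); // 1

        stream(cache, "a", 2, true);
        System.out.println(cache.get("a")); // 2
    }
}
----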
== Processing Data
When you need to execute custom logic before adding new data, you can use a stream receiver.
A stream receiver processes the data in a colocated manner before it is stored in the cache.
The logic implemented in a stream receiver is executed on the node where the data is to be stored.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=streamReceiver,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DataStreaming.cs[tag=streamReceiver,indent=0]
----
tab:C++[unsupported]
--
NOTE: A stream receiver does not put data into the cache automatically. You need to call one of the `put(...)` methods explicitly.
[IMPORTANT]
====
The class definitions of stream receivers that are executed on remote nodes must be available on those nodes. You can achieve this in one of two ways:
* Add the classes to the classpath of the nodes;
* Enable link:code-deployment/peer-class-loading[peer class loading].
====
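The point that a receiver must store data explicitly can be shown with a small model in plain Java. The `Receiver` interface below is a hypothetical stand-in, loosely shaped like a callback that is handed the target cache and a batch of streamed entries; a `java.util.Map` plays the role of the cache. It is a sketch of the idea, not the Ignite interface itself.

[source,java]
----
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReceiverModel {
    // Hypothetical stand-in for a stream receiver: it is handed the target
    // cache and a batch of streamed entries, and decides what to store.
    interface Receiver<K, V> {
        void receive(Map<K, V> cache, Collection<Map.Entry<K, V>> entries);
    }

    // Custom logic: store only positive values; everything else is dropped.
    static final Receiver<String, Integer> POSITIVE_ONLY = (cache, entries) -> {
        for (Map.Entry<String, Integer> e : entries)
            if (e.getValue() > 0)
                cache.put(e.getKey(), e.getValue()); // explicit put; nothing is stored otherwise
    };

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        POSITIVE_ONLY.receive(cache, List.of(Map.entry("a", 5), Map.entry("b", -1)));
        System.out.println(cache); // {a=5}
    }
}
----

An entry for which the receiver never calls `put` simply does not appear in the cache, which mirrors the note above.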
=== Stream Transformer
A stream transformer is a convenience implementation of a stream receiver that updates the data in the stream.
Stream transformers take advantage of the colocation feature and update the data on the node where it is going to be stored.
In the example below, we use a stream transformer to increment a counter for each distinct word found in the text stream.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=streamTransformer,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DataStreaming.cs[tag=streamTransformer,indent=0]
----
tab:C++[unsupported]
--
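The update that the transformer applies to each word ("create the counter at 1, or add 1 to the existing counter") can be tried out in isolation with plain Java. The `WordCountModel` class is a hypothetical illustration that uses a local `TreeMap` instead of a cache, so it shows only the per-key update logic, not the distributed execution.

[source,java]
----
import java.util.Map;
import java.util.TreeMap;

public class WordCountModel {
    // Applies the "increment the counter" update for every word in the text.
    static Map<String, Long> count(String text) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : text.toLowerCase().split("\\s+"))
            if (!word.isEmpty())
                counts.merge(word, 1L, Long::sum); // absent -> 1, present -> previous + 1
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count("to be or not to be")); // {be=2, not=1, or=1, to=2}
    }
}
----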
=== Stream Visitor
A stream visitor is another implementation of a stream receiver, which visits every key-value pair in the stream. The visitor does not update the cache. If a pair needs to be stored in the cache, one of the `put(...)` methods must be called explicitly.
In the example below, we have two caches: "marketData" and "instruments". We receive market data ticks and put them into the streamer for the "marketData" cache. The stream visitor for the "marketData" streamer is invoked on the cluster member mapped to the particular market symbol. Upon receiving an individual market tick, it updates the "instruments" cache with the latest market price.
Note that we do not update the "marketData" cache at all, leaving it empty. We use it only for colocated processing of the market data within the cluster, directly on the node where the data would be stored.
[tabs]
--
tab:Java[]
[source,java]
----
include::{javaFile}[tag=stream-visitor,indent=0]
----
tab:C#/.NET[]
[source,csharp]
----
include::code-snippets/dotnet/DataStreaming.cs[tag=streamVisitor,indent=0]
----
tab:C++[unsupported]
--
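The effect of the visitor on the two caches can be modeled with two ordinary maps. This sketch is plain Java with hypothetical names (`VisitorModel`, `visit`, and the symbols `AAA` and `BBB`); it leaves out distribution entirely and shows only that the streamed-to cache stays empty while the other cache receives the latest price.

[source,java]
----
import java.util.HashMap;
import java.util.Map;

public class VisitorModel {
    // Visits one streamed tick: records the latest price in the instruments
    // map and stores nothing in the map the tick was streamed to.
    static void visit(Map<String, Double> instruments, String symbol, double price) {
        instruments.put(symbol, price);
    }

    public static void main(String[] args) {
        Map<String, Double> marketData = new HashMap<>();  // never written to
        Map<String, Double> instruments = new HashMap<>();

        visit(instruments, "AAA", 100.0);
        visit(instruments, "AAA", 101.5);
        visit(instruments, "BBB", 50.0);

        System.out.println(instruments.get("AAA")); // 101.5
        System.out.println(marketData.isEmpty());   // true
    }
}
----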
== Configuring Data Streamer Thread Pool Size
The data streamer thread pool is dedicated to processing messages coming from the data streamers.
The default pool size is `max(8, total number of cores)`.
Use `IgniteConfiguration.setDataStreamerThreadPoolSize(...)` to change the pool size.
[tabs]
--
tab:XML[]
[source,xml]
----
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStreamerThreadPoolSize" value="10"/>
<!-- other properties -->
</bean>
----
tab:Java[]
[source,java]
----
include::{javaFile}[tag=pool-size,indent=0]
----
tab:C#/.NET[unsupported]
tab:C++[unsupported]
--
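To see what the default formula yields on a particular machine, you can evaluate it directly. The sketch below is plain Java and assumes that the core count reported by `Runtime.availableProcessors()` is the value the formula refers to; the `DefaultPoolSize` class and its method are hypothetical names.

[source,java]
----
public class DefaultPoolSize {
    // Evaluates the documented default: max(8, total number of cores).
    static int defaultPoolSize(int cores) {
        return Math.max(8, cores);
    }

    public static void main(String[] args) {
        // Assumption: the JVM-reported processor count stands in for "total number of cores".
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores + ", default pool size=" + defaultPoolSize(cores));
    }
}
----

On a machine with 4 cores the formula gives 8, and on a machine with 16 cores it gives 16, so an explicit setting matters mostly when you want fewer threads than cores or more than the default provides.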