blob: 4e9977bbe22f8d96a64b76d7c3565013f53ab8fe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Streaming Data
To stream a large amount of data, use the data streamer. Data streaming provides a quicker and more efficient way to load, organize and optimally distribute your data. Data streamer accepts a stream of data and distributes data entries across the cluster, where the processing takes place. Data streaming is available in all table views.
image::images/data_streaming.png[]
Data streaming provides at-least-once delivery guarantee.
== Using Data Streamer API
[tabs]
--
tab:Java[]
[source, java]
----
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;
try (var publisher = new SubmissionPublisher<Tuple>()) {
var options = DataStreamerOptions.builder().batchSize(batchSize).build();
streamerFut = view.streamData(publisher, options);
var tuple1 = Tuple.create().set("id", 1L).set("name", "foo");
var tuple2 = Tuple.create().set("id", 2L).set("name", "bar");
publisher.submit(tuple1);
publisher.submit(tuple2);
}
streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
----
tab:.NET[]
[source, csharp]
----
public async Task TestBasicStreamingRecordBinaryView()
{
var options = DataStreamerOptions.Default with { BatchSize = 10 };
var data = Enumerable.Range(0, Count).Select(x => new IgniteTuple { ["id"] = 1L, ["name"] = "foo" }).ToList();
await TupleView.StreamDataAsync(data.ToAsyncEnumerable(), options);
}
----
--
== Configuring Data Streamer Properties
All data streamer parameters can be configured by using the `DataStreamerOptions` object. For example, the code snippet below sets the data streamer to have 3 retries:
[tabs]
--
tab:Java[]
[source,java]
----
RecordView<Tuple> view = defaultTable().recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.retryLimit(3)
.build();
CompletableFuture<Void> streamerFut = view.streamData(publisher, options);
----
--
=== Tuning Memory Usage
Data streamer may require significant amount of memory to handle the requests in orderly manner. Depending on your environment, you may want to increase or reduce the amount of memory reserved by the data streamer.
For every node in the cluster, the streamer reserves an amount of memory equal to `batchSize` (1000 entries by default) multiplied by `perNodeParallelOperations` (4 by default) setting. For example, a 10-node cluster with default parameters and average entry size of 1KB will reserve 40MB for operations.
You can change these options the same way you would work with any other options:
[tabs]
--
tab:Java[]
[source,java]
----
RecordView<Tuple> view = defaultTable().recordView();
var publisher = new SubmissionPublisher<Tuple>();
var options = DataStreamerOptions.builder()
.batchSize(10000)
.perNodeParallelOperations(10)
.build();
CompletableFuture<Void> streamerFut = view.streamData(publisher, options);
----
--
Additionally, the data streamer periodically flushes incomplete buffers to avoid messages being stuck for a long time (a specific buffer can fill up slowly or never fill completely at all, depending on the data distribution). This is configured with the `autoFlushFrequency` (5000ms by default) property.