blob: c911d516c8162931f18e2736e5b08356c0d4d5a9 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-61232409-1"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments);}
gtag('js', new Date());
gtag('config', 'UA-61232409-1');
</script>
<meta charset="UTF-8">
<meta name="ignite-version" content="2.9.0" />
<title>Data Streaming | Ignite Documentation</title>
<link rel="canonical" href="/docs/latest/data-streaming" />
<link rel="stylesheet" href="/assets/css/styles.css?1600975886">
<link rel="stylesheet" href="/assets/css/asciidoc-pygments.css">
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<script type="text/javascript" src="/assets/js/anchor.min.js?1600975886"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.css" />
</head>
<body>
<header>
<!--#include virtual="/includes/promotion_banner.html" -->
<div class="container">
<button type='button' class='menu' title='Docs menu'>
<img src="/assets/images/menu-icon.svg"/>
</button>
<div class='home'>
<a href="/" class='home' title='Apache Ignite home'>
<img src="/assets/images/apache_ignite_logo.svg" alt="Apache Ignite logo" width="103" >
</a>
</div>
<nav>
</nav>
<select id="version-selector">
<option value="2.9.0">2.9.0</option>
</select>
<a href="https://github.com/apache/ignite" title='GitHub' class='github' target="_blank">
<img src="/assets/images/github-gray.svg" alt="GitHub logo">
</a>
<form class='search'>
<button class="search-close" type='button'><img src='/assets/images/cancel.svg'></button>
<input type="search" placeholder="Search…" id="search-input">
</form>
<button type='button' class='search-toggle'><img src='/assets/images/search.svg'></button>
<button type='button' class='top-nav-toggle'></button>
</div>
</header>
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/latest/preface" class='' >Preface</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Quick Start Guides<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/quick-start/java"
class=''
>Java</a>
</li>
<li>
<a href="/docs/latest/quick-start/dotnet"
class=''
>.NET/C#</a>
</li>
<li>
<a href="/docs/latest/quick-start/cpp"
class=''
>C++</a>
</li>
<li>
<a href="/docs/latest/quick-start/python"
class=''
>Python</a>
</li>
<li>
<a href="/docs/latest/quick-start/nodejs"
class=''
>Node.JS</a>
</li>
<li>
<a href="/docs/latest/quick-start/sql"
class=''
>SQL</a>
</li>
<li>
<a href="/docs/latest/quick-start/php"
class=''
>PHP</a>
</li>
<li>
<a href="/docs/latest/quick-start/restapi"
class=''
>REST API</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="/installation" class='group-toggle collapsed '>Installation<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/installation/installing-using-zip"
class=''
>Installing Using ZIP Archive</a>
</li>
<li>
<a href="/docs/latest/installation/installing-using-docker"
class=''
>Installing Using Docker</a>
</li>
<li>
<a href="/docs/latest/installation/deb-rpm"
class=''
>Installing DEB or RPM package</a>
</li>
<li>
<button
type='button'
class='collapsed '>Kubernetes<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//installation/kubernetes/amazon-eks-deployment" class=''>Amazon EKS</a></li>
<li><a href="/docs/latest//installation/kubernetes/azure-deployment" class=''>Azure Kubernetes Service</a></li>
<li><a href="/docs/latest//installation/kubernetes/gke-deployment" class=''>Google Kubernetes Engine</a></li>
</nav>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Setting Up<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/setup"
class=''
>Setting Up Ignite for Java</a>
</li>
<li>
<a href="/docs/latest/setup-dotnet"
class=''
>Setting Up Ignite for .NET/C#</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/understanding-configuration" class='' >Understanding Configuration</a>
</li>
<li>
<a href="/docs/latest/logging" class='' >Configuring Logging</a>
</li>
<li>
<a href="/docs/latest/starting-nodes" class='' >Starting and Stopping Nodes</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Clustering<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/clustering/clustering"
class=''
>Overview</a>
</li>
<li>
<a href="/docs/latest/clustering/tcp-ip-discovery"
class=''
>TCP/IP Discovery</a>
</li>
<li>
<a href="/docs/latest/clustering/zookeeper-discovery"
class=''
>ZooKeeper Discovery</a>
</li>
<li>
<a href="/docs/latest/clustering/discovery-in-the-cloud"
class=''
>Discovery in the Cloud</a>
</li>
<li>
<a href="/docs/latest/clustering/network-configuration"
class=''
>Network Configuration</a>
</li>
<li>
<a href="/docs/latest/clustering/connect-client-nodes"
class=''
>Connecting Client Nodes</a>
</li>
<li>
<a href="/docs/latest/clustering/running-client-nodes-behind-nat"
class=''
>Running Client Nodes Behind NAT</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Data Modeling<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/data-modeling/data-modeling"
class=''
>Introduction</a>
</li>
<li>
<a href="/docs/latest/data-modeling/data-partitioning"
class=''
>Data Partitioning</a>
</li>
<li>
<a href="/docs/latest/data-modeling/affinity-collocation"
class=''
>Affinity Colocation</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Configuring Memory<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/memory-architecture"
class=''
>Memory Architecture</a>
</li>
<li>
<a href="/docs/latest/memory-configuration/data-regions"
class=''
>Configuring Data Regions</a>
</li>
<li>
<a href="/docs/latest/memory-configuration/eviction-policies"
class=''
>Eviction Policies</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Configuring Caches<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/configuring-caches/configuration-overview"
class=''
>Cache Configuration</a>
</li>
<li>
<a href="/docs/latest/configuring-caches/configuring-backups"
class=''
>Configuring Partition Backups</a>
</li>
<li>
<a href="/docs/latest/configuring-caches/atomicity-modes"
class=''
>Atomicity Modes</a>
</li>
<li>
<a href="/docs/latest/configuring-caches/expiry-policies"
class=''
>Expiry Policy</a>
</li>
<li>
<a href="/docs/latest/configuring-caches/on-heap-caching"
class=''
>On-Heap Caching</a>
</li>
<li>
<a href="/docs/latest/configuring-caches/cache-groups"
class=''
>Cache Groups</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Persistence<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/persistence/native-persistence"
class=''
>Ignite Persistence</a>
</li>
<li>
<a href="/docs/latest/persistence/external-storage"
class=''
>External Storage</a>
</li>
<li>
<a href="/docs/latest/persistence/swap"
class=''
>Swapping</a>
</li>
<li>
<a href="/docs/latest/persistence/custom-cache-store"
class=''
>Implementing Custom Cache Store</a>
</li>
<li>
<a href="/docs/latest/persistence/snapshots"
class=''
>Cluster Snapshots</a>
</li>
<li>
<a href="/docs/latest/persistence/disk-compression"
class=''
>Disk Compression</a>
</li>
<li>
<a href="/docs/latest/persistence/persistence-tuning"
class=''
>Tuning Persistence</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/baseline-topology" class='' >Baseline Topology</a>
</li>
<li>
<a href="/docs/latest/cluster-states" class='' >Cluster States</a>
</li>
<li>
<a href="/docs/latest/data-rebalancing" class='' >Data Rebalancing</a>
</li>
<li>
<a href="/docs/latest/partition-loss-policy" class='' >Partition Loss Policy</a>
</li>
<li>
<a href="/docs/latest/deploying-user-code" class='' >Deploying User Code</a>
</li>
<li>
<a href="/docs/latest/peer-class-loading" class='' >Peer Class Loading</a>
</li>
<li>
<a href="/docs/latest/data-streaming" class='' >Data Streaming</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Using Key-Value Cache API<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/key-value-api/basic-cache-operations"
class=''
>Basic Cache Operations</a>
</li>
<li>
<a href="/docs/latest/key-value-api/binary-objects"
class=''
>Working with Binary Objects</a>
</li>
<li>
<a href="/docs/latest/key-value-api/using-scan-queries"
class=''
>Using Scan Queries</a>
</li>
<li>
<a href="/docs/latest/read-repair"
class=''
>Read Repair</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/key-value-api/continuous-queries" class='' >Using Continuous Queries</a>
</li>
<li>
<a href="/docs/latest/key-value-api/transactions" class='' >Performing Transactions</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Working with SQL<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/SQL/sql-introduction"
class=''
>Introduction</a>
</li>
<li>
<a href="/docs/latest/SQL/schemas"
class=''
>Understanding Schemas</a>
</li>
<li>
<a href="/docs/latest/SQL/indexes"
class=''
>Defining Indexes</a>
</li>
<li>
<a href="/docs/latest/SQL/sql-api"
class=''
>Using SQL API</a>
</li>
<li>
<a href="/docs/latest/SQL/distributed-joins"
class=''
>Distributed Joins</a>
</li>
<li>
<a href="/docs/latest/SQL/sql-transactions"
class=''
>SQL Transactions</a>
</li>
<li>
<a href="/docs/latest/SQL/custom-sql-func"
class=''
>Custom SQL Functions</a>
</li>
<li>
<a href="/docs/latest/SQL/JDBC/jdbc-driver"
class=''
>JDBC Driver</a>
</li>
<li>
<a href="/docs/latest/SQL/JDBC/jdbc-client-driver"
class=''
>JDBC Client Driver</a>
</li>
<li>
<a href="/docs/latest/transactions/mvcc"
class=''
>Multiversion Concurrency Control</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Distributed Computing<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/distributed-computing/distributed-computing"
class=''
>Distributed Computing API</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/cluster-groups"
class=''
>Cluster Groups</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/executor-service"
class=''
>Executor Service</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/map-reduce"
class=''
>MapReduce API</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/load-balancing"
class=''
>Load Balancing</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/fault-tolerance"
class=''
>Fault Tolerance</a>
</li>
<li>
<a href="/docs/latest/distributed-computing/job-scheduling"
class=''
>Job Scheduling</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/collocated-computations" class='' >Colocating Computations with Data</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Working with Events<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/events/listening-to-events"
class=''
>Enabling and Listenting to Events</a>
</li>
<li>
<a href="/docs/latest/events/events"
class=''
>Events</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/near-cache" class='' >Near Caches</a>
</li>
<li>
<a href="/docs/latest/platform-cache" class='' >.NET Platform Cache</a>
</li>
<li>
<a href="/docs/latest/services/services" class='' >Services</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Distributed Data Structures<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/data-structures/queue-and-set"
class=''
>Queue and Set</a>
</li>
<li>
<a href="/docs/latest/data-structures/atomic-types"
class=''
>Atomic Types</a>
</li>
<li>
<a href="/docs/latest/data-structures/countdownlatch"
class=''
>CountDownLatch</a>
</li>
<li>
<a href="/docs/latest/data-structures/atomic-sequence"
class=''
>Atomic Sequence</a>
</li>
<li>
<a href="/docs/latest/data-structures/semaphore"
class=''
>Semaphore</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Machine Learning<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/machine-learning/machine-learning"
class=''
>Machine Learning</a>
</li>
<li>
<a href="/docs/latest/machine-learning/partition-based-dataset"
class=''
>Partition Based Dataset</a>
</li>
<li>
<a href="/docs/latest/machine-learning/updating-trained-models"
class=''
>Updating Trained Models</a>
</li>
<li>
<button
type='button'
class='collapsed '>Binary Classification<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/binary-classification/introduction" class=''>Introduction</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/linear-svm" class=''>Linear SVM (Support Vector Machine)</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/decision-trees" class=''>Decision Trees</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/multilayer-perceptron" class=''>Multilayer Perceptron</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/logistic-regression" class=''>Logistic Regression</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/knn-classification" class=''>k-NN Classification</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/ann" class=''>ANN (Approximate Nearest Neighbor)</a></li>
<li><a href="/docs/latest//machine-learning/binary-classification/naive-bayes" class=''>Naive Bayes</a></li>
</nav>
</li>
<li>
<button
type='button'
class='collapsed '>Regression<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/regression/introduction" class=''>Introduction</a></li>
<li><a href="/docs/latest//machine-learning/regression/linear-regression" class=''>Linear Regression</a></li>
<li><a href="/docs/latest//machine-learning/regression/decision-trees-regression" class=''>Decision Trees Regression</a></li>
<li><a href="/docs/latest//machine-learning/regression/knn-regression" class=''>k-NN Regression</a></li>
</nav>
</li>
<li>
<button
type='button'
class='collapsed '>Clustering<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/clustering/introduction" class=''>Introduction</a></li>
<li><a href="/docs/latest//machine-learning/clustering/k-means-clustering" class=''>K-Means Clustering</a></li>
<li><a href="/docs/latest//machine-learning/clustering/gaussian-mixture" class=''>Gaussian mixture (GMM)</a></li>
</nav>
</li>
<li>
<a href="/docs/latest/machine-learning/preprocessing"
class=''
>Preprocessing</a>
</li>
<li>
<button
type='button'
class='collapsed '>Model Selection<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/model-selection/introduction" class=''>Introduction</a></li>
<li><a href="/docs/latest//machine-learning/model-selection/evaluator" class=''>Evaluator</a></li>
<li><a href="/docs/latest//machine-learning/model-selection/split-the-dataset-on-test-and-train-datasets" class=''>Split the dataset on test and train datasets</a></li>
<li><a href="/docs/latest//machine-learning/model-selection/hyper-parameter-tuning" class=''>Hyper-parameter tuning</a></li>
<li><a href="/docs/latest//machine-learning/model-selection/pipeline-api" class=''>Pipeline API</a></li>
</nav>
</li>
<li>
<a href="/docs/latest/machine-learning/multiclass-classification"
class=''
>Multiclass Classification</a>
</li>
<li>
<button
type='button'
class='collapsed '>Ensemble Methods<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/ensemble-methods/introduction" class=''></a></li>
<li><a href="/docs/latest//machine-learning/ensemble-methods/stacking" class=''>Stacking</a></li>
<li><a href="/docs/latest//machine-learning/ensemble-methods/baggin" class=''>Bagging</a></li>
<li><a href="/docs/latest//machine-learning/ensemble-methods/random-forest" class=''>Random Forest</a></li>
<li><a href="/docs/latest//machine-learning/ensemble-methods/gradient-boosting" class=''>Gradient Boosting</a></li>
</nav>
</li>
<li>
<a href="/docs/latest/machine-learning/recommendation-systems"
class=''
>Recommendation Systems</a>
</li>
<li>
<button
type='button'
class='collapsed '>Importing Model<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//machine-learning/importing-model/introduction" class=''>Introduction</a></li>
<li><a href="/docs/latest//machine-learning/importing-model/model-import-from-gxboost" class=''>Import Model from XGBoost</a></li>
<li><a href="/docs/latest//machine-learning/importing-model/model-import-from-apache-spark" class=''>Import Model from Apache Spark</a></li>
</nav>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Monitoring<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/monitoring-metrics/intro"
class=''
>Introduction</a>
</li>
<li>
<a href="/docs/latest/monitoring-metrics/cluster-id"
class=''
>Cluster ID and Tag</a>
</li>
<li>
<button
type='button'
class='collapsed '>Metrics<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//monitoring-metrics/configuring-metrics" class=''>Configuring Metrics</a></li>
<li><a href="/docs/latest//monitoring-metrics/metrics" class=''>JMX Metrics</a></li>
</nav>
</li>
<li>
<button
type='button'
class='collapsed '>New Metrics System<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//monitoring-metrics/new-metrics-system" class=''>Introduction</a></li>
<li><a href="/docs/latest//monitoring-metrics/new-metrics" class=''>Metrics</a></li>
</nav>
</li>
<li>
<a href="/docs/latest/monitoring-metrics/system-views"
class=''
>System Views</a>
</li>
<li>
<a href="/docs/latest/monitoring-metrics/tracing"
class=''
>Tracing</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="/security" class='group-toggle collapsed '>Security<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/security/authentication"
class=''
>Authentication</a>
</li>
<li>
<a href="/docs/latest/security/ssl-tls"
class=''
>SSL/TLS</a>
</li>
<li>
<button
type='button'
class='collapsed '>Transparent Data Encryption<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//security/tde" class=''>Introduction</a></li>
<li><a href="/docs/latest//security/master-key-rotation" class=''>Master key rotation</a></li>
</nav>
</li>
<li>
<a href="/docs/latest/security/sandbox"
class=''
>Sandbox</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Thin Clients<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/thin-clients/getting-started-with-thin-clients"
class=''
>Thin Clients Overview</a>
</li>
<li>
<a href="/docs/latest/thin-clients/java-thin-client"
class=''
>Java Thin Client</a>
</li>
<li>
<a href="/docs/latest/thin-clients/dotnet-thin-client"
class=''
>.NET Thin Client</a>
</li>
<li>
<a href="/docs/latest/thin-clients/cpp-thin-client"
class=''
>C++ Thin Client</a>
</li>
<li>
<a href="/docs/latest/thin-clients/python-thin-client"
class=''
>Python Thin Client</a>
</li>
<li>
<a href="/docs/latest/thin-clients/php-thin-client"
class=''
>PHP Thin Client</a>
</li>
<li>
<a href="/docs/latest/thin-clients/nodejs-thin-client"
class=''
>Node.js Thin Client</a>
</li>
<li>
<button
type='button'
class='collapsed '>Binary Client Protocol<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class="sub_pages collapsed">
<li><a href="/docs/latest//binary-client-protocol/binary-client-protocol" class=''>Binary Client Protocol</a></li>
<li><a href="/docs/latest//binary-client-protocol/data-format" class=''>Data Format</a></li>
<li><a href="/docs/latest//binary-client-protocol/key-value-queries" class=''>Key-Value Queries</a></li>
<li><a href="/docs/latest//binary-client-protocol/sql-and-scan-queries" class=''>SQL and Scan Queries</a></li>
<li><a href="/docs/latest//binary-client-protocol/binary-type-metadata" class=''>Binary Types Metadata</a></li>
<li><a href="/docs/latest//binary-client-protocol/cache-configuration" class=''>Cache Configuration</a></li>
</nav>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>ODBC Driver<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/SQL/ODBC/odbc-driver"
class=''
>ODBC Driver</a>
</li>
<li>
<a href="/docs/latest/SQL/ODBC/connection-string-dsn"
class=''
>Connection String and DSN</a>
</li>
<li>
<a href="/docs/latest/SQL/ODBC/querying-modifying-data"
class=''
>Querying and Modifying Data</a>
</li>
<li>
<a href="/docs/latest/SQL/ODBC/specification"
class=''
>Specification</a>
</li>
<li>
<a href="/docs/latest/SQL/ODBC/data-types"
class=''
>Data Types</a>
</li>
<li>
<a href="/docs/latest/SQL/ODBC/error-codes"
class=''
>Error Codes</a>
</li>
</nav>
</li>
<li>
<a href="/docs/latest/restapi" class='' >REST API</a>
</li>
<li>
<a href="/docs/latest/control-script" class='' >Control Script</a>
</li>
<li>
<a href="/docs/latest/plugins" class='' >Plugins</a>
</li>
<li>
<a href="/docs/latest/sqlline" class='' >SQLLine</a>
</li>
<li>
<button type='button' data-guide-url="" class='group-toggle collapsed '>Ignite for Spark<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/ignite-for-spark/overview"
class=''
>Overview</a>
</li>
<li>
<a href="/docs/latest/ignite-for-spark/ignitecontext-and-rdd"
class=''
>IgniteContext and IgniteRDD</a>
</li>
<li>
<a href="/docs/latest/ignite-for-spark/ignite-dataframe"
class=''
>Ignite DataFrame</a>
</li>
<li>
<a href="/docs/latest/ignite-for-spark/installation"
class=''
>Installation</a>
</li>
<li>
<a href="/docs/latest/ignite-for-spark/spark-shell"
class=''
>Test Ignite with Spark-shell</a>
</li>
<li>
<a href="/docs/latest/ignite-for-spark/troubleshooting"
class=''
>Troubleshooting</a>
</li>
</nav>
</li>
<li>
<button type='button' data-guide-url="/sql-reference/sql-reference-overview" class='group-toggle collapsed '>SQL Reference<img class="state-indicator" src="/assets/images/left-nav-arrow.svg"></button>
<nav class='nav-group collapsed'>
<li>
<a href="/docs/latest/sql-reference/sql-conformance"
class=''
>SQL Conformance</a>
</li>
<li>
<a href="/docs/latest/sql-reference/ddl"
class=''
>Data Definition Language (DDL)</a>
</li>
<li>
<a href="/docs/latest/sql-reference/dml"
class=''
>Data Manipulation Language (DML)</a>
</li>
<li>
<a href="/docs/latest/sql-reference/transactions"
class=''
>Transactions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/operational-commands"
class=''
>Operational Commands</a>
</li>
<li>
<a href="/docs/latest/sql-reference/aggregate-functions"
class=''
>Aggregate functions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/numeric-functions"
class=''
>Numeric Functions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/string-functions"
class=''
>String Functions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/date-time-functions"
class=''
>Data and Time Functions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/system-functions"
class=''
>System Functions</a>
</li>
<li>
<a href="/docs/latest/sql-reference/data-types"
class=''
>Data Types</a>
</li>
</nav>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="https://github.com/apache/ignite/tree/IGNITE-7595/docs/_docs/data-streaming.adoc" target="_blank">Edit</a>
<h1>Data Streaming</h1>
<div class="sect1">
<h2 id="overview">Overview</h2>
<div class="sectionbody">
<div class="paragraph">
<p>Ignite provides a Data Streaming API that can be used to inject large amounts of continuous streams of data into an Ignite cluster.
The Data Streaming API is designed to be scalable and fault-tolerant, and provides <em>at-least-once</em> delivery semantics for the data streamed into Ignite, meaning each entry is processed at least once.</p>
</div>
<div class="paragraph">
<p>Data is streamed into a cache via a <a href="#data-streamers">data streamer</a> associated with the cache. Data streamers automatically buffer the data and group it into batches for better performance and send it in parallel to multiple nodes.</p>
</div>
<div class="paragraph">
<p>The Data Streaming API provides the following features:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>The data that is added to a data streamer is automatically partitioned and distributed between the nodes.</p>
</li>
<li>
<p>You can process the data concurrently in a colocated fashion.</p>
</li>
<li>
<p>Clients can perform concurrent SQL queries on the data as it is being streamed in.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p><span class="image"><img src="/docs/2.9.0/images/data_streaming.png" alt="Data Streaming"></span></p>
</div>
</div>
</div>
<div class="sect1">
<h2 id="data-streamers">Data Streamers</h2>
<div class="sectionbody">
<div class="paragraph">
<p>A data streamer is associated with a specific cache and provides an interface for streaming data into the cache.</p>
</div>
<div class="paragraph">
<p>In a typical scenario, you obtain a data streamer and use one of its methods to stream data into the cache, and Ignite takes care of data partitioning and colocation by batching data entries according to partitioning rules to avoid unnecessary data movement.</p>
</div>
<div class="paragraph">
<p>You can obtain the data streamer for a specific cache as follows:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// Get the data streamer reference and stream data.</span>
<span class="k">try</span> <span class="o">(</span><span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">stmr</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"myCache"</span><span class="o">))</span> <span class="o">{</span>
<span class="c1">// Stream entries.</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="mi">100000</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">addData</span><span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">toString</span><span class="o">(</span><span class="n">i</span><span class="o">));</span>
<span class="o">}</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"dataStreamerExample output:"</span> <span class="o">+</span> <span class="n">cache</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">99999</span><span class="o">));</span></code></pre>
</div>
</div>
<div class="paragraph">
<p>In the Java version of Ignite, a data streamer is an implementation of the <code>IgniteDataStreamer</code> interface. <code>IgniteDataStreamer</code> provides a number of <code>addData(&#8230;&#8203;)</code> methods for adding key-value pairs to caches. Refer to the <a href="https://ignite.apache.org/releases/2.9.0/javadoc/org/apache/ignite/IgniteDataStreamer.html">IgniteDataStreamer</a> javadoc for the complete list of methods.</p>
</div></code-tab><code-tab data-tab='C#/.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">stmr</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetDataStreamer</span><span class="p">&lt;</span><span class="kt">int</span><span class="p">,</span> <span class="kt">string</span><span class="p">&gt;(</span><span class="s">"myCache"</span><span class="p">))</span>
<span class="p">{</span>
<span class="k">for</span> <span class="p">(</span><span class="kt">var</span> <span class="n">i</span> <span class="p">=</span> <span class="m">0</span><span class="p">;</span> <span class="n">i</span> <span class="p">&lt;</span> <span class="m">1000</span><span class="p">;</span> <span class="n">i</span><span class="p">++)</span>
<span class="n">stmr</span><span class="p">.</span><span class="nf">AddData</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">i</span><span class="p">.</span><span class="nf">ToString</span><span class="p">());</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++.</code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
<div class="sect1">
<h2 id="overwriting-existing-keys">Overwriting Existing Keys</h2>
<div class="sectionbody">
<div class="paragraph">
<p>By default, data streamers do not overwrite existing data and skip entries that are already in the cache. You can change that behavior by setting the <code>allowOverwrite</code> property of the data streamer to <code>true</code>.</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="n">stmr</span><span class="o">.</span><span class="na">allowOverwrite</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C#/.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">stmr</span><span class="p">.</span><span class="n">AllowOverwrite</span> <span class="p">=</span> <span class="k">true</span><span class="p">;</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++.</code></pre>
</div>
</div></code-tab></code-tabs>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
When <code>allowOverwrite</code> is set to <code>false</code> (default), the updates are not propagated to the <a href="/docs/latest/persistence/external-storage">external storage</a> (if it is used).
</td>
</tr>
</table>
</div>
</div>
</div>
<div class="sect1">
<h2 id="processing-data">Processing Data</h2>
<div class="sectionbody">
<div class="paragraph">
<p>In cases when you need to execute custom logic before adding new data, you can use a stream receiver.
A stream receiver is used to process the data in a colocated manner before it is stored into the cache.
The logic implemented in a stream receiver is executed on the node where data is to be stored.</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="k">try</span> <span class="o">(</span><span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span> <span class="n">stmr</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"myCache"</span><span class="o">))</span> <span class="o">{</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">allowOverwrite</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">receiver</span><span class="o">((</span><span class="nc">StreamReceiver</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;)</span> <span class="o">(</span><span class="n">cache</span><span class="o">,</span> <span class="n">entries</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">entries</span><span class="o">.</span><span class="na">forEach</span><span class="o">(</span><span class="n">entry</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// do something with the entry</span>
<span class="n">cache</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">entry</span><span class="o">.</span><span class="na">getKey</span><span class="o">(),</span> <span class="n">entry</span><span class="o">.</span><span class="na">getValue</span><span class="o">());</span>
<span class="o">}));</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C#/.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">private</span> <span class="k">class</span> <span class="nc">MyStreamReceiver</span> <span class="p">:</span> <span class="n">IStreamReceiver</span><span class="p">&lt;</span><span class="kt">int</span><span class="p">,</span> <span class="kt">string</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="k">public</span> <span class="k">void</span> <span class="nf">Receive</span><span class="p">(</span><span class="n">ICache</span><span class="p">&lt;</span><span class="kt">int</span><span class="p">,</span> <span class="kt">string</span><span class="p">&gt;</span> <span class="n">cache</span><span class="p">,</span> <span class="n">ICollection</span><span class="p">&lt;</span><span class="n">ICacheEntry</span><span class="p">&lt;</span><span class="kt">int</span><span class="p">,</span> <span class="kt">string</span><span class="p">&gt;&gt;</span> <span class="n">entries</span><span class="p">)</span>
<span class="p">{</span>
<span class="k">foreach</span> <span class="p">(</span><span class="kt">var</span> <span class="n">entry</span> <span class="k">in</span> <span class="n">entries</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">// do something with the entry</span>
<span class="n">cache</span><span class="p">.</span><span class="nf">Put</span><span class="p">(</span><span class="n">entry</span><span class="p">.</span><span class="n">Key</span><span class="p">,</span> <span class="n">entry</span><span class="p">.</span><span class="n">Value</span><span class="p">);</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">public</span> <span class="k">static</span> <span class="k">void</span> <span class="nf">StreamReceiverDemo</span><span class="p">()</span>
<span class="p">{</span>
<span class="kt">var</span> <span class="n">ignite</span> <span class="p">=</span> <span class="n">Ignition</span><span class="p">.</span><span class="nf">Start</span><span class="p">();</span>
<span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">stmr</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetDataStreamer</span><span class="p">&lt;</span><span class="kt">int</span><span class="p">,</span> <span class="kt">string</span><span class="p">&gt;(</span><span class="s">"myCache"</span><span class="p">))</span>
<span class="p">{</span>
<span class="n">stmr</span><span class="p">.</span><span class="n">AllowOverwrite</span> <span class="p">=</span> <span class="k">true</span><span class="p">;</span>
<span class="n">stmr</span><span class="p">.</span><span class="n">Receiver</span> <span class="p">=</span> <span class="k">new</span> <span class="nf">MyStreamReceiver</span><span class="p">();</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++.</code></pre>
</div>
</div></code-tab></code-tabs>
<div class="admonitionblock note">
<table>
<tr>
<td class="icon">
<div class="title">Note</div>
</td>
<td class="content">
Note that a stream receiver does not put data into the cache automatically. You need to call one of the <code>put(&#8230;&#8203;)</code> methods explicitly.
</td>
</tr>
</table>
</div>
<div class="admonitionblock important">
<table>
<tr>
<td class="icon">
<div class="title">Important</div>
</td>
<td class="content">
<div class="paragraph">
<p>The class definitions of the stream receivers to be executed on remote nodes must be available on the nodes. This can be achieved in two ways:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Add the classes to the classpath of the nodes;</p>
</li>
<li>
<p>Enable <a href="/docs/latest/peer-class-loading">peer class loading</a>.</p>
</li>
</ul>
</div>
</td>
</tr>
</table>
</div>
<div class="sect2">
<h3 id="stream-transformer">Stream Transformer</h3>
<div class="paragraph">
<p>A stream transformer is a convenient implementation of a stream receiver, that updates the data in the stream.
Stream transformers take advantage of the colocation feature and update the data on the node where it is going to be stored.</p>
</div>
<div class="paragraph">
<p>In the example below, we use a stream transformer to increment a counter for each distinct word found in the text stream.</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">String</span><span class="o">[]</span> <span class="n">text</span> <span class="o">=</span> <span class="o">{</span> <span class="s">"hello"</span><span class="o">,</span> <span class="s">"world"</span><span class="o">,</span> <span class="s">"hello"</span><span class="o">,</span> <span class="s">"Ignite"</span> <span class="o">};</span>
<span class="nc">CacheConfiguration</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Long</span><span class="o">&gt;</span> <span class="n">cfg</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CacheConfiguration</span><span class="o">&lt;&gt;(</span><span class="s">"wordCountCache"</span><span class="o">);</span>
<span class="nc">IgniteCache</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Long</span><span class="o">&gt;</span> <span class="n">stmCache</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">getOrCreateCache</span><span class="o">(</span><span class="n">cfg</span><span class="o">);</span>
<span class="k">try</span> <span class="o">(</span><span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Long</span><span class="o">&gt;</span> <span class="n">stmr</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="n">stmCache</span><span class="o">.</span><span class="na">getName</span><span class="o">()))</span> <span class="o">{</span>
<span class="c1">// Allow data updates.</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">allowOverwrite</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="c1">// Configure data transformation to count instances of the same word.</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">receiver</span><span class="o">(</span><span class="nc">StreamTransformer</span><span class="o">.</span><span class="na">from</span><span class="o">((</span><span class="n">e</span><span class="o">,</span> <span class="n">arg</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// Get current count.</span>
<span class="nc">Long</span> <span class="n">val</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">();</span>
<span class="c1">// Increment count by 1.</span>
<span class="n">e</span><span class="o">.</span><span class="na">setValue</span><span class="o">(</span><span class="n">val</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">?</span> <span class="mi">1L</span> <span class="o">:</span> <span class="n">val</span> <span class="o">+</span> <span class="mi">1</span><span class="o">);</span>
<span class="k">return</span> <span class="kc">null</span><span class="o">;</span>
<span class="o">}));</span>
<span class="c1">// Stream words into the streamer cache.</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">text</span><span class="o">)</span>
<span class="n">stmr</span><span class="o">.</span><span class="na">addData</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1L</span><span class="o">);</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C#/.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">class</span> <span class="nc">MyEntryProcessor</span> <span class="p">:</span> <span class="n">ICacheEntryProcessor</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">long</span><span class="p">,</span> <span class="kt">object</span><span class="p">,</span> <span class="kt">object</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="k">public</span> <span class="kt">object</span> <span class="nf">Process</span><span class="p">(</span><span class="n">IMutableCacheEntry</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">long</span><span class="p">&gt;</span> <span class="n">e</span><span class="p">,</span> <span class="kt">object</span> <span class="n">arg</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">//get current count</span>
<span class="kt">var</span> <span class="n">val</span> <span class="p">=</span> <span class="n">e</span><span class="p">.</span><span class="n">Value</span><span class="p">;</span>
<span class="c1">//increment count by 1</span>
<span class="n">e</span><span class="p">.</span><span class="n">Value</span> <span class="p">=</span> <span class="n">val</span> <span class="p">==</span> <span class="m">0</span> <span class="p">?</span> <span class="m">1L</span> <span class="p">:</span> <span class="n">val</span> <span class="p">+</span> <span class="m">1</span><span class="p">;</span>
<span class="k">return</span> <span class="k">null</span><span class="p">;</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">public</span> <span class="k">static</span> <span class="k">void</span> <span class="nf">StreamTransformerDemo</span><span class="p">()</span>
<span class="p">{</span>
<span class="kt">var</span> <span class="n">ignite</span> <span class="p">=</span> <span class="n">Ignition</span><span class="p">.</span><span class="nf">Start</span><span class="p">(</span><span class="k">new</span> <span class="n">IgniteConfiguration</span>
<span class="p">{</span>
<span class="n">DiscoverySpi</span> <span class="p">=</span> <span class="k">new</span> <span class="n">TcpDiscoverySpi</span>
<span class="p">{</span>
<span class="n">LocalPort</span> <span class="p">=</span> <span class="m">48500</span><span class="p">,</span>
<span class="n">LocalPortRange</span> <span class="p">=</span> <span class="m">20</span><span class="p">,</span>
<span class="n">IpFinder</span> <span class="p">=</span> <span class="k">new</span> <span class="n">TcpDiscoveryStaticIpFinder</span>
<span class="p">{</span>
<span class="n">Endpoints</span> <span class="p">=</span> <span class="k">new</span><span class="p">[]</span>
<span class="p">{</span>
<span class="s">"127.0.0.1:48500..48520"</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">});</span>
<span class="kt">var</span> <span class="n">cfg</span> <span class="p">=</span> <span class="k">new</span> <span class="nf">CacheConfiguration</span><span class="p">(</span><span class="s">"wordCountCache"</span><span class="p">);</span>
<span class="kt">var</span> <span class="n">stmCache</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetOrCreateCache</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">long</span><span class="p">&gt;(</span><span class="n">cfg</span><span class="p">);</span>
<span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">stmr</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetDataStreamer</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">long</span><span class="p">&gt;(</span><span class="n">stmCache</span><span class="p">.</span><span class="n">Name</span><span class="p">))</span>
<span class="p">{</span>
<span class="c1">//Allow data updates</span>
<span class="n">stmr</span><span class="p">.</span><span class="n">AllowOverwrite</span> <span class="p">=</span> <span class="k">true</span><span class="p">;</span>
<span class="c1">//Configure data transformation to count instances of the same word</span>
<span class="n">stmr</span><span class="p">.</span><span class="n">Receiver</span> <span class="p">=</span> <span class="k">new</span> <span class="n">StreamTransformer</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">long</span><span class="p">,</span> <span class="kt">object</span><span class="p">,</span> <span class="kt">object</span><span class="p">&gt;(</span><span class="k">new</span> <span class="nf">MyEntryProcessor</span><span class="p">());</span>
<span class="c1">//stream words into the streamer cache</span>
<span class="k">foreach</span> <span class="p">(</span><span class="kt">var</span> <span class="n">word</span> <span class="k">in</span> <span class="nf">GetWords</span><span class="p">())</span>
<span class="p">{</span>
<span class="n">stmr</span><span class="p">.</span><span class="nf">AddData</span><span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="m">1L</span><span class="p">);</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">WriteLine</span><span class="p">(</span><span class="n">stmCache</span><span class="p">.</span><span class="nf">Get</span><span class="p">(</span><span class="s">"a"</span><span class="p">));</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">WriteLine</span><span class="p">(</span><span class="n">stmCache</span><span class="p">.</span><span class="nf">Get</span><span class="p">(</span><span class="s">"b"</span><span class="p">));</span>
<span class="p">}</span>
<span class="k">static</span> <span class="n">IEnumerable</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">&gt;</span> <span class="nf">GetWords</span><span class="p">()</span>
<span class="p">{</span>
<span class="c1">//populate words list somehow</span>
<span class="k">return</span> <span class="n">Enumerable</span><span class="p">.</span><span class="nf">Repeat</span><span class="p">(</span><span class="s">"a"</span><span class="p">,</span> <span class="m">3</span><span class="p">).</span><span class="nf">Concat</span><span class="p">(</span><span class="n">Enumerable</span><span class="p">.</span><span class="nf">Repeat</span><span class="p">(</span><span class="s">"b"</span><span class="p">,</span> <span class="m">2</span><span class="p">));</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++.</code></pre>
</div>
</div></code-tab></code-tabs>
</div>
<div class="sect2">
<h3 id="stream-visitor">Stream Visitor</h3>
<div class="paragraph">
<p>A stream visitor is another implementation of a stream receiver, which visits every key-value pair in the stream. The visitor does not update the cache. If a pair needs to be stored in the cache, one of the <code>put(&#8230;&#8203;)</code> methods must be called explicitly.</p>
</div>
<div class="paragraph">
<p>In the example below, we have 2 caches: "marketData", and "instruments". We receive market data ticks and put them into the streamer for the "marketData" cache. The stream visitor for the "marketData" streamer is invoked on the cluster member mapped to the particular market symbol. Upon receiving individual market ticks it updates the "instrument" cache with the latest market price.</p>
</div>
<div class="paragraph">
<p>Note, that we do not update the "marketData" cache at all, leaving it empty. We simply use it for colocated processing of the market data within the cluster directly on the node where the data is stored.</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="kd">static</span> <span class="kd">class</span> <span class="nc">Instrument</span> <span class="o">{</span>
<span class="kd">final</span> <span class="nc">String</span> <span class="n">symbol</span><span class="o">;</span>
<span class="nc">Double</span> <span class="n">latest</span><span class="o">;</span>
<span class="nc">Double</span> <span class="n">high</span><span class="o">;</span>
<span class="nc">Double</span> <span class="n">low</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">Instrument</span><span class="o">(</span><span class="nc">String</span> <span class="n">symbol</span><span class="o">)</span> <span class="o">{</span>
<span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="kd">static</span> <span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="nf">getMarketData</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">//populate market data somehow</span>
<span class="k">return</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;&gt;();</span>
<span class="o">}</span>
<span class="nd">@Test</span>
<span class="kt">void</span> <span class="nf">streamVisitorExample</span><span class="o">()</span> <span class="o">{</span>
<span class="k">try</span> <span class="o">(</span><span class="nc">Ignite</span> <span class="n">ignite</span> <span class="o">=</span> <span class="nc">Ignition</span><span class="o">.</span><span class="na">start</span><span class="o">())</span> <span class="o">{</span>
<span class="nc">CacheConfiguration</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="n">mrktDataCfg</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CacheConfiguration</span><span class="o">&lt;&gt;(</span><span class="s">"marketData"</span><span class="o">);</span>
<span class="nc">CacheConfiguration</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Instrument</span><span class="o">&gt;</span> <span class="n">instCfg</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CacheConfiguration</span><span class="o">&lt;&gt;(</span><span class="s">"instruments"</span><span class="o">);</span>
<span class="c1">// Cache for market data ticks streamed into the system.</span>
<span class="nc">IgniteCache</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="n">mrktData</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">getOrCreateCache</span><span class="o">(</span><span class="n">mrktDataCfg</span><span class="o">);</span>
<span class="c1">// Cache for financial instruments.</span>
<span class="nc">IgniteCache</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Instrument</span><span class="o">&gt;</span> <span class="n">instCache</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">getOrCreateCache</span><span class="o">(</span><span class="n">instCfg</span><span class="o">);</span>
<span class="k">try</span> <span class="o">(</span><span class="nc">IgniteDataStreamer</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="n">mktStmr</span> <span class="o">=</span> <span class="n">ignite</span><span class="o">.</span><span class="na">dataStreamer</span><span class="o">(</span><span class="s">"marketData"</span><span class="o">))</span> <span class="o">{</span>
<span class="c1">// Note that we do not populate the 'marketData' cache (it remains empty).</span>
<span class="c1">// Instead we update the 'instruments' cache based on the latest market price.</span>
<span class="n">mktStmr</span><span class="o">.</span><span class="na">receiver</span><span class="o">(</span><span class="nc">StreamVisitor</span><span class="o">.</span><span class="na">from</span><span class="o">((</span><span class="n">cache</span><span class="o">,</span> <span class="n">e</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
<span class="nc">Double</span> <span class="n">tick</span> <span class="o">=</span> <span class="n">e</span><span class="o">.</span><span class="na">getValue</span><span class="o">();</span>
<span class="nc">Instrument</span> <span class="n">inst</span> <span class="o">=</span> <span class="n">instCache</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">symbol</span><span class="o">);</span>
<span class="k">if</span> <span class="o">(</span><span class="n">inst</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span>
<span class="n">inst</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Instrument</span><span class="o">(</span><span class="n">symbol</span><span class="o">);</span>
<span class="c1">// Update instrument price based on the latest market tick.</span>
<span class="n">inst</span><span class="o">.</span><span class="na">high</span> <span class="o">=</span> <span class="nc">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">inst</span><span class="o">.</span><span class="na">high</span><span class="o">,</span> <span class="n">tick</span><span class="o">);</span>
<span class="n">inst</span><span class="o">.</span><span class="na">low</span> <span class="o">=</span> <span class="nc">Math</span><span class="o">.</span><span class="na">min</span><span class="o">(</span><span class="n">inst</span><span class="o">.</span><span class="na">low</span><span class="o">,</span> <span class="n">tick</span><span class="o">);</span>
<span class="n">inst</span><span class="o">.</span><span class="na">latest</span> <span class="o">=</span> <span class="n">tick</span><span class="o">;</span>
<span class="c1">// Update the instrument cache.</span>
<span class="n">instCache</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">inst</span><span class="o">);</span>
<span class="o">}));</span>
<span class="c1">// Stream market data into the cluster.</span>
<span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="n">marketData</span> <span class="o">=</span> <span class="n">getMarketData</span><span class="o">();</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">Map</span><span class="o">.</span><span class="na">Entry</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Double</span><span class="o">&gt;</span> <span class="n">tick</span> <span class="o">:</span> <span class="n">marketData</span><span class="o">.</span><span class="na">entrySet</span><span class="o">())</span>
<span class="n">mktStmr</span><span class="o">.</span><span class="na">addData</span><span class="o">(</span><span class="n">tick</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C#/.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">class</span> <span class="nc">Instrument</span>
<span class="p">{</span>
<span class="k">public</span> <span class="k">readonly</span> <span class="kt">string</span> <span class="n">Symbol</span><span class="p">;</span>
<span class="k">public</span> <span class="kt">double</span> <span class="n">Latest</span> <span class="p">{</span> <span class="k">get</span><span class="p">;</span> <span class="k">set</span><span class="p">;</span> <span class="p">}</span>
<span class="k">public</span> <span class="kt">double</span> <span class="n">High</span> <span class="p">{</span> <span class="k">get</span><span class="p">;</span> <span class="k">set</span><span class="p">;</span> <span class="p">}</span>
<span class="k">public</span> <span class="kt">double</span> <span class="n">Low</span> <span class="p">{</span> <span class="k">get</span><span class="p">;</span> <span class="k">set</span><span class="p">;</span> <span class="p">}</span>
<span class="k">public</span> <span class="nf">Instrument</span><span class="p">(</span><span class="kt">string</span> <span class="n">symbol</span><span class="p">)</span>
<span class="p">{</span>
<span class="k">this</span><span class="p">.</span><span class="n">Symbol</span> <span class="p">=</span> <span class="n">symbol</span><span class="p">;</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">private</span> <span class="k">static</span> <span class="n">Dictionary</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">double</span><span class="p">&gt;</span> <span class="nf">getMarketData</span><span class="p">()</span>
<span class="p">{</span>
<span class="c1">//populate market data somehow</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">Dictionary</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">double</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="p">[</span><span class="s">"foo"</span><span class="p">]</span> <span class="p">=</span> <span class="m">1.0</span><span class="p">,</span>
<span class="p">[</span><span class="s">"foo"</span><span class="p">]</span> <span class="p">=</span> <span class="m">2.0</span><span class="p">,</span>
<span class="p">[</span><span class="s">"foo"</span><span class="p">]</span> <span class="p">=</span> <span class="m">3.0</span>
<span class="p">};</span>
<span class="p">}</span>
<span class="k">public</span> <span class="k">static</span> <span class="k">void</span> <span class="nf">StreamVisitorDemo</span><span class="p">()</span>
<span class="p">{</span>
<span class="kt">var</span> <span class="n">ignite</span> <span class="p">=</span> <span class="n">Ignition</span><span class="p">.</span><span class="nf">Start</span><span class="p">(</span><span class="k">new</span> <span class="n">IgniteConfiguration</span>
<span class="p">{</span>
<span class="n">DiscoverySpi</span> <span class="p">=</span> <span class="k">new</span> <span class="n">TcpDiscoverySpi</span>
<span class="p">{</span>
<span class="n">LocalPort</span> <span class="p">=</span> <span class="m">48500</span><span class="p">,</span>
<span class="n">LocalPortRange</span> <span class="p">=</span> <span class="m">20</span><span class="p">,</span>
<span class="n">IpFinder</span> <span class="p">=</span> <span class="k">new</span> <span class="n">TcpDiscoveryStaticIpFinder</span>
<span class="p">{</span>
<span class="n">Endpoints</span> <span class="p">=</span> <span class="k">new</span><span class="p">[]</span>
<span class="p">{</span>
<span class="s">"127.0.0.1:48500..48520"</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="p">});</span>
<span class="kt">var</span> <span class="n">mrktDataCfg</span> <span class="p">=</span> <span class="k">new</span> <span class="nf">CacheConfiguration</span><span class="p">(</span><span class="s">"marketData"</span><span class="p">);</span>
<span class="kt">var</span> <span class="n">instCfg</span> <span class="p">=</span> <span class="k">new</span> <span class="nf">CacheConfiguration</span><span class="p">(</span><span class="s">"instruments"</span><span class="p">);</span>
<span class="c1">// Cache for market data ticks streamed into the system</span>
<span class="kt">var</span> <span class="n">mrktData</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetOrCreateCache</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">double</span><span class="p">&gt;(</span><span class="n">mrktDataCfg</span><span class="p">);</span>
<span class="c1">// Cache for financial instruments</span>
<span class="kt">var</span> <span class="n">instCache</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetOrCreateCache</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="n">Instrument</span><span class="p">&gt;(</span><span class="n">instCfg</span><span class="p">);</span>
<span class="k">using</span> <span class="p">(</span><span class="kt">var</span> <span class="n">mktStmr</span> <span class="p">=</span> <span class="n">ignite</span><span class="p">.</span><span class="n">GetDataStreamer</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">double</span><span class="p">&gt;(</span><span class="s">"marketData"</span><span class="p">))</span>
<span class="p">{</span>
<span class="c1">// Note that we do not populate 'marketData' cache (it remains empty).</span>
<span class="c1">// Instead we update the 'instruments' cache based on the latest market price.</span>
<span class="n">mktStmr</span><span class="p">.</span><span class="n">Receiver</span> <span class="p">=</span> <span class="k">new</span> <span class="n">StreamVisitor</span><span class="p">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">double</span><span class="p">&gt;((</span><span class="n">cache</span><span class="p">,</span> <span class="n">e</span><span class="p">)</span> <span class="p">=&gt;</span>
<span class="p">{</span>
<span class="kt">var</span> <span class="n">symbol</span> <span class="p">=</span> <span class="n">e</span><span class="p">.</span><span class="n">Key</span><span class="p">;</span>
<span class="kt">var</span> <span class="n">tick</span> <span class="p">=</span> <span class="n">e</span><span class="p">.</span><span class="n">Value</span><span class="p">;</span>
<span class="n">Instrument</span> <span class="n">inst</span> <span class="p">=</span> <span class="n">instCache</span><span class="p">.</span><span class="nf">Get</span><span class="p">(</span><span class="n">symbol</span><span class="p">);</span>
<span class="k">if</span> <span class="p">(</span><span class="n">inst</span> <span class="p">==</span> <span class="k">null</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">inst</span> <span class="p">=</span> <span class="k">new</span> <span class="nf">Instrument</span><span class="p">(</span><span class="n">symbol</span><span class="p">);</span>
<span class="p">}</span>
<span class="c1">// Update instrument price based on the latest market tick.</span>
<span class="n">inst</span><span class="p">.</span><span class="n">High</span> <span class="p">=</span> <span class="n">Math</span><span class="p">.</span><span class="nf">Max</span><span class="p">(</span><span class="n">inst</span><span class="p">.</span><span class="n">High</span><span class="p">,</span> <span class="n">tick</span><span class="p">);</span>
<span class="n">inst</span><span class="p">.</span><span class="n">Low</span> <span class="p">=</span> <span class="n">Math</span><span class="p">.</span><span class="nf">Min</span><span class="p">(</span><span class="n">inst</span><span class="p">.</span><span class="n">Low</span><span class="p">,</span> <span class="n">tick</span><span class="p">);</span>
<span class="n">inst</span><span class="p">.</span><span class="n">Latest</span> <span class="p">=</span> <span class="n">tick</span><span class="p">;</span>
<span class="p">});</span>
<span class="kt">var</span> <span class="n">marketData</span> <span class="p">=</span> <span class="nf">getMarketData</span><span class="p">();</span>
<span class="k">foreach</span> <span class="p">(</span><span class="kt">var</span> <span class="n">tick</span> <span class="k">in</span> <span class="n">marketData</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">mktStmr</span><span class="p">.</span><span class="nf">AddData</span><span class="p">(</span><span class="n">tick</span><span class="p">);</span>
<span class="p">}</span>
<span class="n">mktStmr</span><span class="p">.</span><span class="nf">Flush</span><span class="p">();</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">Write</span><span class="p">(</span><span class="n">instCache</span><span class="p">.</span><span class="nf">Get</span><span class="p">(</span><span class="s">"foo"</span><span class="p">));</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++.</code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
</div>
<div class="sect1">
<h2 id="configuring-data-streamer-thread-pool-size">Configuring Data Streamer Thread Pool Size</h2>
<div class="sectionbody">
<div class="paragraph">
<p>The data streamer thread pool is dedicated to process messages coming from the data streamers.</p>
</div>
<div class="paragraph">
<p>The default pool size is <code>max(8, total number of cores)</code>.
Use <code>IgniteConfiguration.setDataStreamerThreadPoolSize(&#8230;&#8203;)</code> to change the pool size.</p>
</div>
<code-tabs><code-tab data-tab='XML'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="xml"><span class="nt">&lt;bean</span> <span class="na">class=</span><span class="s">"org.apache.ignite.configuration.IgniteConfiguration"</span><span class="nt">&gt;</span>
<span class="nt">&lt;property</span> <span class="na">name=</span><span class="s">"dataStreamerThreadPoolSize"</span> <span class="na">value=</span><span class="s">"10"</span><span class="nt">/&gt;</span>
<span class="c">&lt;!-- other properties --&gt;</span>
<span class="nt">&lt;/bean&gt;</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">IgniteConfiguration</span> <span class="n">cfg</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">IgniteConfiguration</span><span class="o">();</span>
<span class="n">cfg</span><span class="o">.</span><span class="na">setDataStreamerThreadPoolSize</span><span class="o">(</span><span class="mi">10</span><span class="o">);</span>
<span class="nc">Ignite</span> <span class="n">ignite</span> <span class="o">=</span> <span class="nc">Ignition</span><span class="o">.</span><span class="na">start</span><span class="o">(</span><span class="n">cfg</span><span class="o">);</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='C#/.NET' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C#/.NET. You can use XML configuration.</code></pre>
</div>
</div></code-tab><code-tab data-tab='C++' data-unavailable='true'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code>This API is not presently available for C++. You can use XML configuration.</code></pre>
</div>
</div></code-tab></code-tabs>
</div>
</div>
<div class="copyright">
© 2020 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<nav class="right-nav" data-swiftype-index='false'>
<ul class="sectlevel1">
<li><a href="#overview">Overview</a></li>
<li><a href="#data-streamers">Data Streamers</a></li>
<li><a href="#overwriting-existing-keys">Overwriting Existing Keys</a></li>
<li><a href="#processing-data">Processing Data</a>
<ul class="sectlevel2">
<li><a href="#stream-transformer">Stream Transformer</a></li>
<li><a href="#stream-visitor">Stream Visitor</a></li>
</ul>
</li>
<li><a href="#configuring-data-streamer-thread-pool-size">Configuring Data Streamer Thread Pool Size</a></li>
</ul>
<footer>
</footer>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async crossorigin></script>
<script>
// inits deep anchors -- needs to be done here because of https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
anchors.options = {
placement: 'right',
visible: 'always'
};
</script>
<!-- load google fonts async -->
<script type="text/javascript">
WebFontConfig = {
google: { families: [ 'Open+Sans:300,400,600,700&display=swap' ] }
};
(function() {
var wf = document.createElement('script');
wf.src = 'https://ajax.googleapis.com/ajax/libs/webfont/1/webfont.js';
wf.type = 'text/javascript';
wf.async = 'true';
var s = document.getElementsByTagName('script')[0];
s.parentNode.insertBefore(wf, s);
})(); </script>
<script src="https://cdn.jsdelivr.net/npm/docsearch.js@2/dist/cdn/docsearch.min.js"></script>
<script>
docsearch({
// Your apiKey and indexName will be given to you once
// we create your config
apiKey: '3eee686c0ebe39eff3baeb18c56fa5f8',
indexName: 'apache_ignite',
// Replace inputSelector with a CSS selector
// matching your search input
inputSelector: '#search-input',
// algoliaOptions: { 'facetFilters': ["version:$VERSION"] },
// Set debug to true to inspect the dropdown
debug: false,
});
</script>
<script type='module' src='/assets/js/index.js?1600975886' async crossorigin></script>
<script type='module' src='/assets/js/versioning.js?1600975886' async crossorigin></script>
</body>
</html>