<!-- blob: f0d3bd3a2c50c6a36fac74ef746eca7017f0c44a [file] [log] [blame] -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<!-- Bugyard widget embed -->
<script type="text/javascript">
(function () {
  // Install the stub and loader only once; a real or stub bugyard function may already exist.
  if (typeof window.bugyard === "function") {
    return;
  }

  // Stub that records every call's arguments in stub.q.
  var stub = function () { stub.c(arguments); };
  stub.q = [];
  stub.c = function (callArgs) { stub.q.push(callArgs); };
  window.bugyard = stub;

  // Inject the widget script into <head>.
  var loader = document.createElement("script");
  loader.setAttribute("data-bugyard", "610961912c35ff001493163a");
  loader.setAttribute("async", "async");
  loader.setAttribute("defer", "defer");
  loader.setAttribute("src", "/assets/js/bugyard.min.js");
  document.getElementsByTagName("head")[0].appendChild(loader);
})();
// Sends the "hideButton" command through whichever bugyard function is installed.
window.bugyard("hideButton");
</script>
<!-- End Bugyard widget embed -->
<link rel="preload" href='/assets/js/code-tabs.js?1' as="script" crossorigin>
<link rel="preload" href='/assets/js/page-nav.js' as="script" crossorigin>
<link rel="preload" href='/assets/js/docs-menu.js?20201005' as="script" crossorigin>
<style>:root{--gg-red:#ec1c24;--gg-orange:#ec1c24;--gg-orange-dark:#bc440b;--gg-orange-filter:invert(47%) sepia(61%) saturate(1950%) hue-rotate(345deg) brightness(100%) contrast(95%);--gg-dark-gray:#333333;--orange-line-thickness:3px;--block-code-background:rgba(241, 241, 241, 20%);--inline-code-background:rgba(241, 241, 241, 90%);--padding-top:25px;--link-color:#ec1c24;--body-background:#fcfcfc}header{min-height:var(--header-height);background:#fff;box-shadow:0 4px 10px 0 #eee,0 0 4px 0 #d5d5d5;z-index:1}header>.container{display:grid;grid-template-columns:auto auto 1fr auto auto auto;grid-template-areas:'left-toggle home nav ver api search lang';grid-template-rows:40px;flex-direction:row;align-items:center;justify-content:flex-start;padding:12px 20px;max-width:1400px;margin:0 auto}header nav>ul{padding:0;margin:0;list-style:none;display:inherit}header .dropdown{display:none;position:fixed;top:calc(var(--header-height) - 12px);width:auto;background:#fff;box-shadow:0 4px 4px 0 rgba(0,0,0,.24),0 0 4px 0 rgba(0,0,0,.12);border-radius:4px;padding-top:10px;padding-bottom:12px;z-index:2}header .dropdown li{display:flex}header .dropdown a{color:grey!important;font-size:16px;padding-top:5px;padding-bottom:4px}header .menu{border:none;background:0 0;width:40px;height:40px;margin-right:12px;grid-area:left-toggle}header .menu img{width:18px;height:12px}header .search-close,header .top-nav-toggle{background:0 0;border:none;padding:0;width:36px;height:36px;display:inline-flex;align-items:center;justify-content:center;color:var(--gg-dark-gray);font-size:26px}header .search-toggle{grid-area:search}header .top-nav-toggle{grid-area:top-toggle}header .home{grid-area:home;margin-right:auto}header .home img{height:36px}header #api-docs{grid-area:api;margin:0;display:flex}header #api-docs .dropdown{padding:.5em 0}header #api-docs a{padding:9px 14px;color:var(--gg-dark-gray)!important;text-decoration:none;white-space:nowrap}header #api-docs .dropdown-item 
a{font-weight:400;display:block;width:100%;min-width:150px}header #lang-selector li{list-style:none;display:flex;padding:9px 14px}header #lang-selector li a{display:flex;color:#000;align-items:center}header #lang-selector li a span{font-size:10px;margin-left:5px}header #lang-selector li a img{width:25px}header #lang-selector li .dropdown{margin-left:-70px}header #lang-selector li .dropdown .dropdown-item{padding:0 1em;margin-bottom:8px}header #lang-selector li .dropdown .dropdown-item a span{font-size:14px}header .search{margin-left:auto;margin-right:20px;grid-area:search}header .search input[type=search]{color:var(--gg-dark-gray);background:rgba(255,255,255,.8);border:1px solid #ccc;padding:10px 15px;font-family:inherit;max-width:148px;height:37px;font-size:14px;-webkit-appearance:unset;appearance:unset}header #version-selector{list-style:none;grid-area:ver;line-height:28px;border-radius:0;margin-right:10px;border:none;color:var(--gg-dark-gray);padding:5px 16px 5px 10px;white-space:nowrap;font-size:14px;width:auto;text-align:right;box-sizing:border-box;text-align-last:right;-moz-appearance:none;-webkit-appearance:none;appearance:none;direction:rtl}header #version-selector option{direction:ltr}header>nav{grid-area:nav;font-size:18px;display:flex;flex-direction:row;margin:0 20px}header #lang-selector{grid-area:lang}header .search-close{margin-right:10px}@media (max-width:600px){header .search{margin-right:5px}header .search input[type=search]{max-width:110px}}header:not(.narrow-header) .search-close,header:not(.narrow-header) .top-nav-toggle{display:none}@media (max-width:670px){header>.container{grid-template-columns:auto 1fr auto;grid-template-areas:'left-toggle home search' 'ver api lang'}header #lang-selector li{justify-content:flex-end}}pre,pre.rouge{padding:8px 15px;background:var(--block-code-background)!important;border-radius:5px;border:1px solid #e5e5e5;overflow-x:auto;min-height:36px;line-height:18px;color:#545454}code{color:#545454}pre.rouge 
code{background:0 0!important}:not(pre)>code{background:var(--inline-code-background);padding:.1em .5em;background-clip:padding-box;border-radius:3px;color:#545454;font-size:90%}.listingblock .content{position:relative}.highlight{color:#586e75}.highlight .c1{color:#657b83}.highlight .nt{color:#b58900}.highlight .o{color:#93a1a1}.highlight .k{color:#6c71c4}.highlight .kt{color:#cb4b16}.highlight .s,.highlight .s1{color:#859900}.highlight .nc{color:#b58900}.highlight .na{color:#268bd2}body{font-family:'Open Sans',sans-serif}h1,h2{color:#000;font-weight:400;font-family:'Open Sans'}h1{font-size:36px;line-height:40px}a{text-decoration:none;color:var(--link-color)}section{color:#545454}.admonitionblock .icon .title{display:none}body{--header-height:64px;--promotion-bar-height:35px;--footer-height:104px;--footer-gap:60px;padding:0;margin:0;display:flex;flex-direction:column;min-height:100vh;background-color:var(--body-background);font-family:'Open Sans',sans-serif}body>section{flex:1}header{position:-webkit-sticky;position:sticky;top:0;z-index:2}*{box-sizing:border-box}@media (max-width:670px){body{--header-height:97px}}.left-nav{padding:10px 20px;width:289px;overflow-y:auto;top:calc(var(--header-height) + var(--promotion-bar-height));height:calc(100vh - var(--header-height) - var(--promotion-bar-height));font-family:'Open Sans';padding-top:var(--padding-top);background-color:var(--body-background)}.left-nav li{list-style:none}.left-nav a,.left-nav button{text-decoration:none;color:#757575;font-size:16px;display:inline-flex;width:100%;margin:2px 0;padding:.25em .375em;background:0 0;border:none;font:inherit;text-align:left}.left-nav a.active{color:var(--link-color)}.left-nav .nav-group{margin-left:6px;font-size:14px}.left-nav nav{border-left:2px solid #ddd;margin-bottom:5px}.left-nav nav.collapsed{display:none}.left-nav nav>li>a,.left-nav nav>li>button{padding-left:20px;text-align:left}.left-nav nav>li>a.active{border-left:var(--orange-line-thickness) solid 
var(--active-color);padding-left:calc(20px - var(--orange-line-thickness))}.left-nav nav.sub_pages{border:none}.left-nav nav.sub_pages a{padding-left:32px}.left-nav .state-indicator{margin-left:auto;margin-top:5px;width:6.2px;height:10px;flex:0 0 auto;filter:invert(49%) sepia(4%) saturate(5%) hue-rotate(23deg) brightness(92%) contrast(90%)}.left-nav button.expanded .state-indicator{transform:rotate(90deg)}.right-nav{width:289px;padding:12px 26px;overflow-y:auto;height:calc(100vh - var(--header-height));top:0;position:-webkit-sticky;position:sticky;display:flex;flex-direction:column;font-family:'Open sans';padding-top:var(--padding-top);background-color:#fff}.right-nav ul{list-style:none;padding:0;margin:0}.right-nav li{padding:0}.right-nav a{--border-width:0px;font-size:14px;color:#757575;padding-left:calc(15px * var(--nesting-level) + 8px - var(--border-width));margin:.3em 0;display:inline-block}.right-nav .sectlevel1{border-left:2px solid #ddd}.right-nav .sectlevel1{--nesting-level:0}.right-nav .sectlevel2{--nesting-level:1}.right-nav .sectlevel3{--nesting-level:2}@media (max-width:1200px){.right-nav{width:230px}}.right-nav footer{font-size:12px;padding:calc(var(--footer-gap) * .3) 0 5px;text-align:left;margin:auto 0 0}section.page-docs{display:grid;grid-template-columns:auto 1fr auto;grid-template-rows:100%;grid-template-areas:'left-nav content right-nav';line-height:20px;max-width:1440px;margin:auto;width:100%}section.page-docs>article{border-left:1px solid #eee;background-color:#fff;padding:0 50px 30px;grid-area:content;overflow:hidden;font-family:sans-serif;font-size:16px;color:#545454;line-height:1.6em}section.page-docs>article h1,section.page-docs>article h2{font-family:'Open Sans'}@media (max-width:800px){section.page-docs>article{padding-left:15px;padding-right:15px}}section.page-docs .edit-link{position:relative;top:10px;right:10px;float:right;padding-top:calc(var(--header-height) + var(--padding-top));margin-top:calc((-1 * 
var(--header-height)))}section.page-docs h1,section.page-docs h2{margin-bottom:0}section.page-docs h2[id]{margin-top:var(--margin-top);margin-bottom:calc(var(--margin-top) * .5);z-index:-1}section.page-docs .title{font-style:italic}section.page-docs h2[id]{--margin-top:1.2em}.left-nav{bottom:0;position:-webkit-sticky;position:sticky}.left-nav{grid-area:left-nav}.right-nav{grid-area:right-nav}.left-nav__overlay{display:none;background:rgba(0,0,0,.5);z-index:1;position:fixed;top:var(--header-height);bottom:0;left:0;right:0}@media (max-width:990px){body:not(.hide-left-nav) .left-nav__overlay{display:block}nav.left-nav{background:#fafafa;grid-area:left-nav;box-shadow:0 4px 4px 0 rgba(0,0,0,.24),0 0 4px 0 rgba(0,0,0,.12);min-height:calc(100vh - var(--header-height));max-height:calc(100vh - var(--header-height));position:fixed;bottom:0;top:var(--header-height);z-index:2}section.page-docs>article{grid-column-start:left-nav;grid-column-end:content;grid-row:content}}@media (max-width:800px){nav.right-nav{display:none}}:target:before{content:"";display:block;margin-top:calc(var(--header-height) * -1);height:var(--header-height);width:1px}@media (min-width:600px) and (max-width:900px){:target:before{content:"";display:block;width:1px;margin-top:-150px;height:150px}}
/* Promotion bar: dark background strip with centered, light, semi-bold text and amber links.
   NOTE(review): every selector below requires an ancestor with id="header". The header
   element in this page's body carries no id attribute, so these rules only match if the
   promotion banner include (or another stylesheet/script) supplies that id. Confirm. */
#header #promotion-bar { background-color: #333333; padding: 8px; }
#header #promotion-bar p { font-size: 14px; line-height: 1.4em; font-weight: 600; padding: 0; margin: 0; color: #f0f0f0; text-align: center;}
#header #promotion-bar p a { color: #FCB903; } </style>
<meta name="ignite-version" content="3.1.0" />
<title>Streaming Data | Ignite Documentation</title>
<link rel="canonical" href="/docs/latest/developers-guide/data-streamer" />
<link rel="shortcut icon" href="/favicon.ico">
<meta name='viewport' content='width=device-width, height=device-height, initial-scale=1.0, minimum-scale=1.0'>
<link rel="preload" as="style" href="/assets/css/fonts.css" />
<link rel="stylesheet" href="/assets/css/fonts.css" media="print" onload="this.media='all'">
<noscript>
<link rel="stylesheet" href="/assets/css/fonts.css">
</noscript>
<link href="/docs/pagefind/pagefind-ui.css" rel="stylesheet">
<!-- Pagefind search UI. `defer` keeps the download from blocking HTML parsing; deferred
     scripts still execute before DOMContentLoaded fires, so PagefindUI is defined by the
     time the handler below runs. -->
<script src="/docs/pagefind/pagefind-ui.js" defer></script>
<script>
  window.addEventListener('DOMContentLoaded', () => {
    // If the library failed to load, leave the page usable instead of throwing.
    if (typeof PagefindUI !== 'function') {
      return;
    }
    // Mount the search widget into the element with id="search".
    new PagefindUI({
      element: "#search",
      showSubResults: true,
      showImages: false,
    });
  });
</script>
<!-- Mermaid diagram renderer. `defer` stops the library from blocking HTML parsing;
     deferred scripts run before DOMContentLoaded, so `mermaid` is available to the
     handler below. -->
<script src="/assets/js/mermaid.min.js" defer></script>
<script>
  document.addEventListener('DOMContentLoaded', function () {
    const blocks = document.querySelectorAll('.mermaid');
    // Skip rendering when the page has no diagrams or the library failed to load.
    if (!blocks.length || typeof mermaid === 'undefined') {
      return;
    }
    // Disable Mermaid's automatic start and trigger rendering explicitly.
    mermaid.initialize({ startOnLoad: false });
    mermaid.run();
  });
</script>
<script>
// AnchorJS - v4.2.0 - 2019-01-01
// https://github.com/bryanbraun/anchorjs
// Copyright (c) 2019 Bryan Braun; Licensed MIT
!function(A,e){"use strict";"function"==typeof define&&define.amd?define([],e):"object"==typeof module&&module.exports?module.exports=e():(A.AnchorJS=e(),A.anchors=new A.AnchorJS)}(this,function(){"use strict";return function(A){function f(A){A.icon=A.hasOwnProperty("icon")?A.icon:"",A.visible=A.hasOwnProperty("visible")?A.visible:"hover",A.placement=A.hasOwnProperty("placement")?A.placement:"right",A.ariaLabel=A.hasOwnProperty("ariaLabel")?A.ariaLabel:"Anchor",A.class=A.hasOwnProperty("class")?A.class:"",A.base=A.hasOwnProperty("base")?A.base:"",A.truncate=A.hasOwnProperty("truncate")?Math.floor(A.truncate):64,A.titleText=A.hasOwnProperty("titleText")?A.titleText:""}function p(A){var e;if("string"==typeof A||A instanceof String)e=[].slice.call(document.querySelectorAll(A));else{if(!(Array.isArray(A)||A instanceof NodeList))throw new Error("The selector provided to AnchorJS was invalid.");e=[].slice.call(A)}return e}this.options=A||{},this.elements=[],f(this.options),this.isTouchDevice=function(){return!!("ontouchstart"in window||window.DocumentTouch&&document instanceof DocumentTouch)},this.add=function(A){var e,t,i,n,o,s,a,r,c,h,l,u,d=[];if(f(this.options),"touch"===(l=this.options.visible)&&(l=this.isTouchDevice()?"always":"hover"),A||(A="h2, h3, h4, h5, h6"),0===(e=p(A)).length)return this;for(function(){if(null===document.head.querySelector("style.anchorjs")){var A,e=document.createElement("style");e.className="anchorjs",e.appendChild(document.createTextNode("")),void 0===(A=document.head.querySelector('[rel="stylesheet"], style'))?document.head.appendChild(e):document.head.insertBefore(e,A),e.sheet.insertRule(" .anchorjs-link { opacity: 0; text-decoration: none; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale; }",e.sheet.cssRules.length),e.sheet.insertRule(" *:hover > .anchorjs-link, .anchorjs-link:focus { opacity: 1; }",e.sheet.cssRules.length),e.sheet.insertRule(" [data-anchorjs-icon]::after { content: attr(data-anchorjs-icon); 
}",e.sheet.cssRules.length),e.sheet.insertRule(' @font-face { font-family: "anchorjs-icons"; src: url(data:n/a;base64,AAEAAAALAIAAAwAwT1MvMg8yG2cAAAE4AAAAYGNtYXDp3gC3AAABpAAAAExnYXNwAAAAEAAAA9wAAAAIZ2x5ZlQCcfwAAAH4AAABCGhlYWQHFvHyAAAAvAAAADZoaGVhBnACFwAAAPQAAAAkaG10eASAADEAAAGYAAAADGxvY2EACACEAAAB8AAAAAhtYXhwAAYAVwAAARgAAAAgbmFtZQGOH9cAAAMAAAAAunBvc3QAAwAAAAADvAAAACAAAQAAAAEAAHzE2p9fDzz1AAkEAAAAAADRecUWAAAAANQA6R8AAAAAAoACwAAAAAgAAgAAAAAAAAABAAADwP/AAAACgAAA/9MCrQABAAAAAAAAAAAAAAAAAAAAAwABAAAAAwBVAAIAAAAAAAIAAAAAAAAAAAAAAAAAAAAAAAMCQAGQAAUAAAKZAswAAACPApkCzAAAAesAMwEJAAAAAAAAAAAAAAAAAAAAARAAAAAAAAAAAAAAAAAAAAAAQAAg//0DwP/AAEADwABAAAAAAQAAAAAAAAAAAAAAIAAAAAAAAAIAAAACgAAxAAAAAwAAAAMAAAAcAAEAAwAAABwAAwABAAAAHAAEADAAAAAIAAgAAgAAACDpy//9//8AAAAg6cv//f///+EWNwADAAEAAAAAAAAAAAAAAAAACACEAAEAAAAAAAAAAAAAAAAxAAACAAQARAKAAsAAKwBUAAABIiYnJjQ3NzY2MzIWFxYUBwcGIicmNDc3NjQnJiYjIgYHBwYUFxYUBwYGIwciJicmNDc3NjIXFhQHBwYUFxYWMzI2Nzc2NCcmNDc2MhcWFAcHBgYjARQGDAUtLXoWOR8fORYtLTgKGwoKCjgaGg0gEhIgDXoaGgkJBQwHdR85Fi0tOAobCgoKOBoaDSASEiANehoaCQkKGwotLXoWOR8BMwUFLYEuehYXFxYugC44CQkKGwo4GkoaDQ0NDXoaShoKGwoFBe8XFi6ALjgJCQobCjgaShoNDQ0NehpKGgobCgoKLYEuehYXAAAADACWAAEAAAAAAAEACAAAAAEAAAAAAAIAAwAIAAEAAAAAAAMACAAAAAEAAAAAAAQACAAAAAEAAAAAAAUAAQALAAEAAAAAAAYACAAAAAMAAQQJAAEAEAAMAAMAAQQJAAIABgAcAAMAAQQJAAMAEAAMAAMAAQQJAAQAEAAMAAMAAQQJAAUAAgAiAAMAAQQJAAYAEAAMYW5jaG9yanM0MDBAAGEAbgBjAGgAbwByAGoAcwA0ADAAMABAAAAAAwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAH//wAP) format("truetype"); }',e.sheet.cssRules.length)}}(),t=document.querySelectorAll("[id]"),i=[].map.call(t,function(A){return A.id}),o=0;o<e.length;o++)if(this.hasAnchorJSLink(e[o]))d.push(o);else{if(e[o].hasAttribute("id"))n=e[o].getAttribute("id");else if(e[o].hasAttribute("data-anchor-id"))n=e[o].getAttribute("data-anchor-id");else{for(c=r=this.urlify(e[o].textContent),a=0;void 0!==s&&(c=r+"-"+a),a+=1,-1!==(s=i.indexOf(c)););s=void 0,i.push(c),e[o].setAttribute("id",c),n=c}n.replace(/-/g," 
"),(h=document.createElement("a")).className="anchorjs-link "+this.options.class,h.setAttribute("aria-label",this.options.ariaLabel),h.setAttribute("data-anchorjs-icon",this.options.icon),this.options.titleText&&(h.title=this.options.titleText),u=document.querySelector("base")?window.location.pathname+window.location.search:"",u=this.options.base||u,h.href=u+"#"+n,"always"===l&&(h.style.opacity="1"),""===this.options.icon&&(h.style.font="1em/1 anchorjs-icons","left"===this.options.placement&&(h.style.lineHeight="inherit")),"left"===this.options.placement?(h.style.position="absolute",h.style.marginLeft="-1em",h.style.paddingRight="0.5em",e[o].insertBefore(h,e[o].firstChild)):(h.style.paddingLeft="0.375em",e[o].appendChild(h))}for(o=0;o<d.length;o++)e.splice(d[o]-o,1);return this.elements=this.elements.concat(e),this},this.remove=function(A){for(var e,t,i=p(A),n=0;n<i.length;n++)(t=i[n].querySelector(".anchorjs-link"))&&(-1!==(e=this.elements.indexOf(i[n]))&&this.elements.splice(e,1),i[n].removeChild(t));return this},this.removeAll=function(){this.remove(this.elements)},this.urlify=function(A){return this.options.truncate||f(this.options),A.trim().replace(/\'/gi,"").replace(/[& +$,:;=?@"#{}|^~[`%!'<>\]\.\/\(\)\*\\\n\t\b\v]/g,"-").replace(/-{2,}/g,"-").substring(0,this.options.truncate).replace(/^-+|-+$/gm,"").toLowerCase()},this.hasAnchorJSLink=function(A){var e=A.firstChild&&-1<(" "+A.firstChild.className+" ").indexOf(" anchorjs-link "),t=A.lastChild&&-1<(" "+A.lastChild.className+" ").indexOf(" anchorjs-link ");return e||t||!1}}});
</script>
</head>
<body>
<!-- Site header: docs-menu toggle, logo, product and version selectors, API and example
     dropdowns, language selector, and the search mount point (#search). -->
<header>
  <!--#include virtual="/includes/promotion_banner.html" -->
  <div class="container">
    <!-- The button's accessible name comes from aria-label; the icon itself is decorative. -->
    <button type="button" class="menu" title="Docs menu" aria-label="Docs menu">
      <img src="/assets/images/menu-icon.svg" width="18" height="12" alt="">
    </button>
    <div class="home">
      <a href="/" class="home" title="Apache Ignite home">
        <img src="/assets/images/apache_ignite_logo.svg" alt="Apache Ignite logo" width="103" height="36">
      </a>
    </div>
    <!-- The selects have no visible label, so aria-label supplies their accessible names. -->
    <select id="product-selector" aria-label="Product">
      <option value="/docs/ignite2/latest">Ignite 2</option>
      <option value="/docs/ignite3/latest" selected>Ignite 3</option>
    </select>
    <select id="version-selector" aria-label="Version">
      <option value="3.1.0">3.1.0</option>
    </select>
    <nav id="api-docs" aria-label="API references and examples">
      <ul>
        <li>
          <a href="#">APIs</a>
          <nav class="dropdown">
            <ul>
              <li class="dropdown-item"><a href="/releases/ignite3/3.1.0/javadoc/">Java</a></li>
              <li class="dropdown-item"><a href="/releases/ignite3/3.1.0/dotnetdoc/">C#/.NET</a></li>
              <li class="dropdown-item"><a href="/releases/ignite3/3.1.0/cppdoc/">C++</a></li>
              <li class="dropdown-item"><a href="/releases/ignite3/3.1.0/openapi.yaml">OpenAPI</a></li>
            </ul>
          </nav>
        </li>
        <li>
          <a href="#">Examples</a>
          <nav class="dropdown">
            <ul>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/examples" target="_blank" rel="noopener" title="Apache Ignite Java examples">Java</a></li>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/modules/platforms/dotnet/examples" target="_blank" rel="noopener" title="Apache Ignite C#/.NET examples">C#/.NET</a></li>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/modules/platforms/cpp/examples" target="_blank" rel="noopener" title="Apache Ignite C++ examples">C++</a></li>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/modules/platforms/python/examples" target="_blank" rel="noopener" title="Apache Ignite Python examples">Python</a></li>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/modules/platforms/nodejs/examples" target="_blank" rel="noopener" title="Apache Ignite NodeJS examples">NodeJS</a></li>
              <li class="dropdown-item"><a href="https://github.com/apache/ignite/tree/master/modules/platforms/php/examples" target="_blank" rel="noopener" title="Apache Ignite PHP examples">PHP</a></li>
            </ul>
          </nav>
        </li>
      </ul>
    </nav>
    <div id="search-button"></div>
    <nav id="lang-selector" aria-label="Language">
      <ul>
        <li>
          <a href="#"><img src="/assets/images/icon_lang_en_75x75.jpg" alt="English language icon" width="25" height="25"><span></span></a>
          <!-- List items are wrapped in a ul (li is not valid directly inside nav),
               matching the structure of the API and Examples dropdowns. -->
          <nav class="dropdown">
            <ul>
              <li class="dropdown-item"><a href="/docs/latest/"><img src="/assets/images/icon_lang_en_75x75.jpg" alt="English language icon" width="25" height="25"><span>English</span></a></li>
              <li class="dropdown-item"><a href="https://www.ignite-service.cn/doc/java/" target="_blank" rel="noopener"><img src="/assets/images/icon_lang_cn_75x75.jpg" width="25" height="25" alt="Chinese language icon"><span>Chinese</span></a></li>
            </ul>
          </nav>
        </li>
      </ul>
    </nav>
    <!-- The button has no text content, so aria-label gives it an accessible name. -->
    <button type="button" class="top-nav-toggle" aria-label="Toggle top navigation"></button>
  </div>
  <div id="search-wrapper">
    <div id="search"></div>
  </div>
</header>
<link rel="stylesheet" href="/assets/css/docs.css">
<section class='page-docs'>
<nav class='left-nav' data-swiftype-index='false'>
<li>
<a href="/docs/ignite3/latest/index" class='' >About Apache Ignite 3</a>
</li>
<!-- "Installation" nav group (rendered collapsed). Arrow icons are decorative, hence alt="". -->
<li>
  <button type="button" class="group-toggle collapsed ">Installation<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group collapsed">
    <li>
      <a href="/docs/ignite3/latest/installation/installing-using-zip" class="">Installing Using ZIP Archive</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/installation/deb-rpm" class="">Installing DEB or RPM package</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/installation/installing-using-docker" class="">Installing Docker</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/installation/migration-from-ai3-1" class="">Migration From Ignite 3.0</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Migration From Ignite 2<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/installation/migration-from-ai2/config" class="">Configuration Migration</a></li>
        <li><a href="/docs/ignite3/latest/installation/migration-from-ai2/persistent-migration" class="">Persistent Data Migration</a></li>
        <li><a href="/docs/ignite3/latest/installation/migration-from-ai2/ai2-functions" class="">SQL Function Comparison</a></li>
      </nav>
    </li>
  </nav>
</li>
<!-- "Getting Started" nav group (rendered collapsed). The arrow icon is decorative, hence alt="". -->
<li>
  <button type="button" class="group-toggle collapsed ">Getting Started<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group collapsed">
    <li>
      <a href="/docs/ignite3/latest/quick-start/getting-started-guide" class="">Quick Start</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/quick-start/start-cluster" class="">Start Ignite 3 Cluster</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/quick-start/explore-sql" class="">Explore SQL Capabilities</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/quick-start/persist-data" class="">Persist Your Data</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/quick-start/java-api" class="">Use Java API</a>
    </li>
  </nav>
</li>
<li>
<a href="/docs/ignite3/latest/quick-start/embedded-mode" class='' >Embedded Mode</a>
</li>
<li>
<a href="/docs/ignite3/latest/ignite-cli-tool" class='' >Ignite CLI Tool</a>
</li>
<!-- "Developers Guide" nav group (rendered expanded; it contains the current page).
     Arrow icons are decorative, hence alt=""; the active link also carries
     aria-current="page" so assistive technology announces it as the current page. -->
<li>
  <button type="button" class="group-toggle expanded ">Developers Guide<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group expanded">
    <li>
      <a href="/docs/ignite3/latest/developers-guide/table-api" class="">Table API</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Clients<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/developers-guide/clients/overview" class="">Overview</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/java" class="">Java Clients</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/dotnet" class="">.NET Clients</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/cpp" class="">C++ Clients</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/python" class="">Python Clients</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/ado" class="">ADO.NET Integration</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/linq" class="">.NET LINQ Queries</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/clients/jdbc-driver" class="">JDBC Driver</a></li>
      </nav>
    </li>
    <li>
      <button type="button" class="collapsed ">Working with SQL<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/developers-guide/sql/calcite-based-sql-engine" class="">Introduction</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/sql-api" class="">SQL API</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/jdbc-driver" class="">JDBC Driver</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/odbc/odbc-driver" class="">ODBC Driver</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/odbc/connection-string" class="">ODBC Connection String</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/odbc/querying-modifying-data" class="">Querying and Modifying Data</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/sql/odbc/specification" class="">Standard Conformance</a></li>
      </nav>
    </li>
    <li>
      <a href="/docs/ignite3/latest/developers-guide/java-to-tables" class="">Tables from Java Classes</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Distributed Computing<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/developers-guide/compute/compute" class="">Compute API</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/compute/serialization" class="">Serialization</a></li>
      </nav>
    </li>
    <li>
      <a href="/docs/ignite3/latest/developers-guide/transactions" class="">Transactions</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/developers-guide/data-streamer" class="active" aria-current="page">Data Streaming</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/developers-guide/code-deployment/code-deployment" class="">Code Deployment</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Events<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/developers-guide/events/overview" class="">Overview</a></li>
        <li><a href="/docs/ignite3/latest/developers-guide/events/events-list" class="">Events List</a></li>
      </nav>
    </li>
    <li>
      <a href="/docs/ignite3/latest/developers-guide/rest/rest-api" class="">REST API</a>
    </li>
  </nav>
</li>
<!-- "Administrator's Guide" nav group (rendered collapsed). Arrow icons are decorative, hence alt="". -->
<li>
  <button type="button" class="group-toggle collapsed ">Administrator's Guide<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group collapsed">
    <li>
      <button type="button" class="collapsed ">Ignite Configuration<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/administrators-guide/config/node-config" class="">Node Configuration Parameters</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/config/cluster-config" class="">Cluster Configuration Parameters</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/config/cli-config" class="">CLI Configuration</a></li>
      </nav>
    </li>
    <li>
      <button type="button" class="collapsed ">Distributed Storage<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/storage-overview" class="">Storage Overview</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/storage-profiles" class="">Storage Profiles</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/data-partitions" class="">Data Partitions</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/distribution-zones" class="">Distribution Zones</a></li>
      </nav>
    </li>
    <li>
      <button type="button" class="collapsed ">Storage Engines<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/engines/storage-engines" class="">Storage Engines Overview</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/engines/aipersist" class="">AIPersist Engine</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/engines/rocksdb" class="">RocksDB Engine</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/storage/engines/aimem" class="">AIMem Engine</a></li>
      </nav>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/lifecycle" class="">Cluster Lifecycle</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/colocation" class="">Data Colocation</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/disaster-recovery" class="">Disaster Recovery for Partitions</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/system-groups-recovery" class="">Disaster Recovery for System Groups</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/cluster-security" class="">Cluster Security</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Security and Authentication<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/administrators-guide/security/ssl-tls" class="">SSL/TLS</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/security/authentication" class="">Authentication</a></li>
      </nav>
    </li>
    <li>
      <button type="button" class="collapsed ">Metrics and Monitoring<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/administrators-guide/metrics/configuring-metrics" class="">Configuring Metrics</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/metrics/metrics-list" class="">Metrics List</a></li>
        <li><a href="/docs/ignite3/latest/administrators-guide/metrics/system-views" class="">System Views</a></li>
      </nav>
    </li>
    <li>
      <a href="/docs/ignite3/latest/administrators-guide/handling-exceptions" class="">Handling Exceptions</a>
    </li>
  </nav>
</li>
<!-- "SQL Reference" nav group (rendered collapsed). Arrow icons are decorative, hence alt="". -->
<li>
  <button type="button" class="group-toggle collapsed ">SQL Reference<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group collapsed">
    <li>
      <a href="/docs/ignite3/latest/sql-reference/ddl" class="">Data Definition Language (DDL)</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/dml" class="">Data Manipulation Language (DML)</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/transactions" class="">Transactions</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/distribution-zones" class="">Distribution Zones</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/data-types" class="">Data Types</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/operators-and-functions" class="">Supported Operators and Functions</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/operational-commands" class="">Operational Commands</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/grammar-reference" class="">Grammar Reference</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/keywords" class="">Keywords</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-reference/sql-conformance" class="">SQL Conformance</a>
    </li>
    <li>
      <button type="button" class="collapsed ">Explain Statement<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
      <nav class="sub_pages collapsed">
        <li><a href="/docs/ignite3/latest/sql-reference/explain-statement" class="">Explain Statement</a></li>
        <li><a href="/docs/ignite3/latest/sql-reference/explain-operators-list" class="">List Of Operators</a></li>
      </nav>
    </li>
  </nav>
</li>
<!-- "SQL Performance Tuning" nav group (rendered collapsed). The arrow icon is decorative, hence alt="". -->
<li>
  <button type="button" class="group-toggle collapsed ">SQL Performance Tuning<img class="state-indicator" src="/assets/images/left-nav-arrow.svg" width="6" height="10" alt=""></button>
  <nav class="nav-group collapsed">
    <li>
      <a href="/docs/ignite3/latest/sql-tuning/sql-tuning" class="">SQL Performance Tuning</a>
    </li>
    <li>
      <a href="/docs/ignite3/latest/sql-tuning/using-explain" class="">Improving Queries</a>
    </li>
  </nav>
</li>
<li>
<a href="/docs/ignite3/latest/general-tips" class='' >General Configuration Tips</a>
</li>
<li>
<a href="/docs/ignite3/latest/glossary/glossary" class='' >Glossary</a>
</li>
</nav>
<div class="left-nav__overlay"></div>
<article data-swiftype-index='true'>
<a class='edit-link' href="https://github.com/apache/ignite-3/tree/main/docs/_docs/developers-guide/data-streamer.adoc" target="_blank" rel="noopener noreferrer">Edit</a>
<h1>Streaming Data</h1>
<div id="preamble">
<div class="sectionbody">
<div class="paragraph">
<p>Data streaming provides a fast, efficient method for loading, organizing, and distributing large volumes of data across your cluster.
The data streamer accepts a stream of data and distributes data entries across the cluster, where the processing takes place. Data streaming is available in all table views.</p>
</div>
<div class="imageblock">
<div class="content">
<img src="/docs/ignite3/3.1.0/images/data_streaming.png" alt="data streaming">
</div>
</div>
<div class="paragraph">
<p>Data streaming provides an at-least-once delivery guarantee.</p>
</div>
</div>
</div>
<div class="sect1">
<h2 id="using-data-streamer-api">Using Data Streamer API</h2>
<div class="sectionbody">
<div class="paragraph">
<p>The <a href="https://ignite.apache.org/releases/3.1.0/javadoc/org/apache/ignite/table/DataStreamerTarget.html">Data Streamer API</a> lets you load large amounts of data into your cluster quickly and reliably using a publisher–subscriber model, where you create a publisher that streams your data entries to a table view, and the system distributes these entries across the cluster. You can configure how the data is processed via a <code>DataStreamerOptions</code> object that lets you set batch sizes, auto-flush intervals, and retry limits.</p>
</div>
<div class="sect2">
<h3 id="configuring-data-streamer">Configuring Data Streamer</h3>
<div class="paragraph">
<p><code>DataStreamerOptions</code> lets you fine-tune how data is streamed into your cluster by setting parameters for batching, retries, parallelism, and auto-flush timing:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">DataStreamerOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="nc">DataStreamerOptions</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">pageSize</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">perPartitionParallelOperations</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">autoFlushInterval</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">retryLimit</span><span class="o">(</span><span class="mi">16</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="kt">var</span> <span class="n">options</span> <span class="p">=</span> <span class="k">new</span> <span class="n">DataStreamerOptions</span>
<span class="p">{</span>
<span class="n">PageSize</span> <span class="p">=</span> <span class="m">1000</span><span class="p">,</span>
<span class="n">RetryLimit</span> <span class="p">=</span> <span class="m">8</span><span class="p">,</span>
<span class="n">AutoFlushInterval</span> <span class="p">=</span> <span class="n">TimeSpan</span><span class="p">.</span><span class="nf">FromSeconds</span><span class="p">(</span><span class="m">3</span><span class="p">)</span>
<span class="p">};</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="ulist">
<ul>
<li>
<p><code>pageSize</code>: Specifies the number of entries to process in each page or chunk of data.</p>
</li>
<li>
<p><code>perPartitionParallelOperations</code>: Determines the number of parallel operations allowed on each partition.</p>
</li>
<li>
<p><code>autoFlushInterval</code>: Defines the time interval (in milliseconds) after which the system automatically flushes any incomplete buffers.</p>
</li>
<li>
<p><code>retryLimit</code>: Specifies the maximum number of retry attempts for a failed data submission before giving up.</p>
</li>
</ul>
</div>
</div>
<div class="sect2">
<h3 id="streaming-data">Streaming Data</h3>
<div class="paragraph">
<p>Before data is streamed to the cluster, each entry must be wrapped in an instance of the <code>DataStreamerItem&lt;T&gt;</code> class. This wrapper allows you to perform <code>PUT</code> and <code>REMOVE</code> operations with data:</p>
</div>
<div class="ulist">
<ul>
<li>
<p>Use <code>DataStreamerItem.of(entry)</code> to insert new entries into the table.</p>
</li>
<li>
<p>Use <code>DataStreamerItem.removed(entry)</code> to delete existing ones.</p>
</li>
</ul>
</div>
<div class="paragraph">
<p>The wrapped data can then be passed to a publisher and streamed to the table.</p>
</div>
<div class="paragraph">
<p>The example below demonstrates how to use <a href="/docs/ignite3/latest/developers-guide/table-api#record-view"><code>RecordView</code></a>, create a publisher, configure the data streamer, insert account records into the existing <code>accounts</code> table and then delete them:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">RecordViewPojoDataStreamerExample</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kt">int</span> <span class="no">ACCOUNTS_COUNT</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="nc">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
<span class="cm">/**
* Assuming the 'accounts' table already exists.
*/</span>
<span class="k">try</span> <span class="o">(</span><span class="nc">IgniteClient</span> <span class="n">client</span> <span class="o">=</span> <span class="nc">IgniteClient</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">addresses</span><span class="o">(</span><span class="s">"127.0.0.1:10800"</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">())</span> <span class="o">{</span>
<span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;</span> <span class="n">view</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"accounts"</span><span class="o">).</span><span class="na">recordView</span><span class="o">(</span><span class="nc">Account</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">streamAccountDataPut</span><span class="o">(</span><span class="n">view</span><span class="o">);</span>
<span class="n">streamAccountDataRemove</span><span class="o">(</span><span class="n">view</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="cm">/**
* Streaming data using DataStreamerOperationType#PUT operation type.
*/</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">streamAccountDataPut</span><span class="o">(</span><span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;</span> <span class="n">view</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">DataStreamerOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="nc">DataStreamerOptions</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">pageSize</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">perPartitionParallelOperations</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">autoFlushInterval</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">retryLimit</span><span class="o">(</span><span class="mi">16</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;</span> <span class="n">streamerFut</span><span class="o">;</span>
<span class="k">try</span> <span class="o">(</span><span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">DataStreamerItem</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;&gt;())</span> <span class="o">{</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">view</span><span class="o">.</span><span class="na">streamData</span><span class="o">(</span><span class="n">publisher</span><span class="o">,</span> <span class="n">options</span><span class="o">);</span>
<span class="nc">ThreadLocalRandom</span> <span class="n">rnd</span> <span class="o">=</span> <span class="nc">ThreadLocalRandom</span><span class="o">.</span><span class="na">current</span><span class="o">();</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="no">ACCOUNTS_COUNT</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
<span class="nc">Account</span> <span class="n">entry</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Account</span><span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="s">"name"</span> <span class="o">+</span> <span class="n">i</span><span class="o">,</span> <span class="n">rnd</span><span class="o">.</span><span class="na">nextLong</span><span class="o">(</span><span class="mi">100_000</span><span class="o">),</span> <span class="n">rnd</span><span class="o">.</span><span class="na">nextBoolean</span><span class="o">());</span>
<span class="n">publisher</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="nc">DataStreamerItem</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">entry</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">streamerFut</span><span class="o">.</span><span class="na">join</span><span class="o">();</span>
<span class="o">}</span>
<span class="cm">/**
* Streaming data using DataStreamerOperationType#REMOVE operation type
*/</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">streamAccountDataRemove</span><span class="o">(</span><span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;</span> <span class="n">view</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">DataStreamerOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="nc">DataStreamerOptions</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">pageSize</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">perPartitionParallelOperations</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">.</span><span class="na">autoFlushInterval</span><span class="o">(</span><span class="mi">1000</span><span class="o">)</span>
<span class="o">.</span><span class="na">retryLimit</span><span class="o">(</span><span class="mi">16</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;</span> <span class="n">streamerFut</span><span class="o">;</span>
<span class="k">try</span> <span class="o">(</span><span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">DataStreamerItem</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;&gt;())</span> <span class="o">{</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">view</span><span class="o">.</span><span class="na">streamData</span><span class="o">(</span><span class="n">publisher</span><span class="o">,</span> <span class="n">options</span><span class="o">);</span>
<span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="no">ACCOUNTS_COUNT</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
<span class="nc">Account</span> <span class="n">entry</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Account</span><span class="o">(</span><span class="n">i</span><span class="o">);</span>
<span class="n">publisher</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="nc">DataStreamerItem</span><span class="o">.</span><span class="na">removed</span><span class="o">(</span><span class="n">entry</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">streamerFut</span><span class="o">.</span><span class="na">join</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">using</span> <span class="nn">Apache.Ignite</span><span class="p">;</span>
<span class="k">using</span> <span class="nn">Apache.Ignite.Table</span><span class="p">;</span>
<span class="k">using</span> <span class="nn">var</span> <span class="n">client</span> <span class="p">=</span> <span class="k">await</span> <span class="n">IgniteClient</span><span class="p">.</span><span class="nf">StartAsync</span><span class="p">(</span><span class="k">new</span><span class="p">(</span><span class="s">"localhost"</span><span class="p">));</span>
<span class="n">ITable</span><span class="p">?</span> <span class="n">table</span> <span class="p">=</span> <span class="k">await</span> <span class="n">client</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"accounts"</span><span class="p">);</span>
<span class="n">IRecordView</span><span class="p">&lt;</span><span class="n">Account</span><span class="p">&gt;</span> <span class="n">view</span> <span class="p">=</span> <span class="n">table</span><span class="p">!.</span><span class="n">GetRecordView</span><span class="p">&lt;</span><span class="n">Account</span><span class="p">&gt;();</span>
<span class="kt">var</span> <span class="n">options</span> <span class="p">=</span> <span class="k">new</span> <span class="n">DataStreamerOptions</span>
<span class="p">{</span>
<span class="n">PageSize</span> <span class="p">=</span> <span class="m">10_000</span><span class="p">,</span>
<span class="n">AutoFlushInterval</span> <span class="p">=</span> <span class="n">TimeSpan</span><span class="p">.</span><span class="nf">FromSeconds</span><span class="p">(</span><span class="m">1</span><span class="p">),</span>
<span class="n">RetryLimit</span> <span class="p">=</span> <span class="m">32</span>
<span class="p">};</span>
<span class="k">await</span> <span class="n">view</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span><span class="nf">GetAccountsToAdd</span><span class="p">(</span><span class="m">5_000</span><span class="p">),</span> <span class="n">options</span><span class="p">);</span>
<span class="k">await</span> <span class="n">view</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span><span class="nf">GetAccountsToRemove</span><span class="p">(</span><span class="m">1_000</span><span class="p">),</span> <span class="n">options</span><span class="p">);</span>
<span class="k">async</span> <span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">DataStreamerItem</span><span class="p">&lt;</span><span class="n">Account</span><span class="p">&gt;&gt;</span> <span class="nf">GetAccountsToAdd</span><span class="p">(</span><span class="kt">int</span> <span class="n">count</span><span class="p">)</span>
<span class="p">{</span>
<span class="k">for</span> <span class="p">(</span><span class="kt">int</span> <span class="n">i</span> <span class="p">=</span> <span class="m">0</span><span class="p">;</span> <span class="n">i</span> <span class="p">&lt;</span> <span class="n">count</span><span class="p">;</span> <span class="n">i</span><span class="p">++)</span>
<span class="p">{</span>
<span class="k">yield</span> <span class="k">return</span> <span class="n">DataStreamerItem</span><span class="p">.</span><span class="nf">Create</span><span class="p">(</span>
<span class="k">new</span> <span class="nf">Account</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="s">$"Account </span><span class="p">{</span><span class="n">i</span><span class="p">}</span><span class="s">"</span><span class="p">));</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">async</span> <span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">DataStreamerItem</span><span class="p">&lt;</span><span class="n">Account</span><span class="p">&gt;&gt;</span> <span class="nf">GetAccountsToRemove</span><span class="p">(</span><span class="kt">int</span> <span class="n">count</span><span class="p">)</span>
<span class="p">{</span>
<span class="k">for</span> <span class="p">(</span><span class="kt">int</span> <span class="n">i</span> <span class="p">=</span> <span class="m">0</span><span class="p">;</span> <span class="n">i</span> <span class="p">&lt;</span> <span class="n">count</span><span class="p">;</span> <span class="n">i</span><span class="p">++)</span>
<span class="p">{</span>
<span class="k">yield</span> <span class="k">return</span> <span class="n">DataStreamerItem</span><span class="p">.</span><span class="nf">Create</span><span class="p">(</span>
<span class="k">new</span> <span class="nf">Account</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="kt">string</span><span class="p">.</span><span class="n">Empty</span><span class="p">),</span> <span class="n">DataStreamerOperationType</span><span class="p">.</span><span class="n">Remove</span><span class="p">);</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">public</span> <span class="k">record</span> <span class="nc">Account</span><span class="p">(</span><span class="kt">int</span> <span class="n">Id</span><span class="p">,</span> <span class="kt">string</span> <span class="n">Name</span><span class="p">);</span></code></pre>
</div>
</div></code-tab></code-tabs>
</div>
<div class="sect2">
<h3 id="streaming-with-receiver">Streaming with Receiver</h3>
<div class="paragraph">
<p>The Apache Ignite 3 streaming API supports advanced streaming scenarios by allowing you to create a custom receiver that defines server-side processing logic. Use a receiver when you need to process or transform data on the server, update multiple tables from a single data stream, or work with incoming data that does not match a table schema.</p>
</div>
<div class="paragraph">
<p>With a receiver, you can stream data in any format, as it is schema-agnostic.
The receiver also has access to the full Ignite 3 API through the <a href="https://ignite.apache.org/releases/3.1.0/javadoc/org/apache/ignite/table/DataStreamerReceiverContext.html"><code>DataStreamerReceiverContext</code></a>.</p>
</div>
<div class="paragraph">
<p>The data streamer controls data flow by requesting items only when partition buffers have space. <code>DataStreamerOptions.perPartitionParallelOperations</code> controls how many buffers can be allocated per partition. When buffers are full, the streamer stops requesting more data until some items are processed.
Additionally, if a <code>resultSubscriber</code> is specified, it also applies backpressure on the streamer. If the subscriber is slow at consuming results, the streamer reduces its request rate from the publisher accordingly.</p>
</div>
<div class="paragraph">
<p>To use a receiver, you need to implement the <a href="https://ignite.apache.org/releases/3.1.0/javadoc/org/apache/ignite/table/DataStreamerReceiver.html"><code>DataStreamerReceiver</code></a> interface. The receiver&#8217;s <code>receive</code> method processes each batch of items streamed to the server, so you can apply custom logic and return results for each item as needed:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nd">@Nullable</span> <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">List</span><span class="o">&lt;</span><span class="no">R</span><span class="o">&gt;&gt;</span> <span class="nf">receive</span><span class="o">(</span>
<span class="nc">List</span><span class="o">&lt;</span><span class="no">T</span><span class="o">&gt;</span> <span class="n">page</span><span class="o">,</span>
<span class="nc">DataStreamerReceiverContext</span> <span class="n">ctx</span><span class="o">,</span>
<span class="nd">@Nullable</span> <span class="no">A</span> <span class="n">arg</span><span class="o">);</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">ValueTask</span><span class="p">&lt;</span><span class="n">IList</span><span class="p">&lt;</span><span class="n">TResult</span><span class="p">&gt;?&gt;</span> <span class="nf">ReceiveAsync</span><span class="p">(</span>
<span class="n">IList</span><span class="p">&lt;</span><span class="n">TItem</span><span class="p">&gt;</span> <span class="n">page</span><span class="p">,</span>
<span class="n">TArg</span> <span class="n">arg</span><span class="p">,</span>
<span class="n">IDataStreamerReceiverContext</span> <span class="n">context</span><span class="p">,</span>
<span class="n">CancellationToken</span> <span class="n">cancellationToken</span><span class="p">);</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="ulist">
<ul>
<li>
<p><code>page</code>: The current batch of data items to process.</p>
</li>
<li>
<p><code>ctx</code> (<code>context</code> in .NET): The receiver context, which lets you interact with the Ignite 3 API.</p>
</li>
<li>
<p><code>arg</code>: An optional argument that can be used to pass custom parameters to your receiver logic.</p>
</li>
</ul>
</div>
</div>
<div class="sect2">
<h3 id="examples">Examples</h3>
<div class="sect3">
<h4 id="updating-multiple-tables">Updating Multiple Tables</h4>
<div class="paragraph">
<p>The following example demonstrates how to implement a receiver that processes data containing customer and address information, and updates two separate tables on the server:</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>First, create the custom receiver that will extract data from the provided source and write it into two separate tables: <code>customers</code> and <code>addresses</code>.</p>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">TwoTableReceiver</span> <span class="kd">implements</span> <span class="nc">DataStreamerReceiver</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Void</span><span class="o">,</span> <span class="nc">Void</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="nd">@Nullable</span> <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">List</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;&gt;</span> <span class="nf">receive</span><span class="o">(</span><span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">page</span><span class="o">,</span> <span class="nc">DataStreamerReceiverContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="nd">@Nullable</span> <span class="nc">Void</span> <span class="n">arg</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// List&lt;Tuple&gt; is the source data. Those tuples do not conform to any table and can have arbitrary data.</span>
<span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">customersTable</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">ignite</span><span class="o">().</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"customers"</span><span class="o">).</span><span class="na">recordView</span><span class="o">();</span>
<span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">addressesTable</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">ignite</span><span class="o">().</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"addresses"</span><span class="o">).</span><span class="na">recordView</span><span class="o">();</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">Tuple</span> <span class="n">sourceItem</span> <span class="o">:</span> <span class="n">page</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// For each source item, receiver extracts customer and address data and upserts it into respective tables.</span>
<span class="nc">Tuple</span> <span class="n">customer</span> <span class="o">=</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">()</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">intValue</span><span class="o">(</span><span class="s">"customerId"</span><span class="o">))</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">stringValue</span><span class="o">(</span><span class="s">"customerName"</span><span class="o">))</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"addressId"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">intValue</span><span class="o">(</span><span class="s">"addressId"</span><span class="o">));</span>
<span class="nc">Tuple</span> <span class="n">address</span> <span class="o">=</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">()</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">intValue</span><span class="o">(</span><span class="s">"addressId"</span><span class="o">))</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"street"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">stringValue</span><span class="o">(</span><span class="s">"street"</span><span class="o">))</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"city"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">stringValue</span><span class="o">(</span><span class="s">"city"</span><span class="o">));</span>
<span class="n">customersTable</span><span class="o">.</span><span class="na">upsert</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="n">customer</span><span class="o">);</span>
<span class="n">addressesTable</span><span class="o">.</span><span class="na">upsert</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="n">address</span><span class="o">);</span>
<span class="o">}</span>
<span class="k">return</span> <span class="kc">null</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">class</span> <span class="nc">TwoTableReceiver</span> <span class="p">:</span> <span class="n">IDataStreamerReceiver</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="kt">object</span><span class="p">?,</span> <span class="kt">object</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="k">public</span> <span class="k">async</span> <span class="n">ValueTask</span><span class="p">&lt;</span><span class="n">IList</span><span class="p">&lt;</span><span class="kt">object</span><span class="p">&gt;?&gt;</span> <span class="nf">ReceiveAsync</span><span class="p">(</span>
<span class="n">IList</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">page</span><span class="p">,</span>
<span class="kt">object</span><span class="p">?</span> <span class="n">arg</span><span class="p">,</span>
<span class="n">IDataStreamerReceiverContext</span> <span class="n">context</span><span class="p">,</span>
<span class="n">CancellationToken</span> <span class="n">cancellationToken</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">IRecordView</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">customerTable</span> <span class="p">=</span> <span class="p">(</span><span class="k">await</span> <span class="n">context</span><span class="p">.</span><span class="n">Ignite</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"customers"</span><span class="p">))!.</span><span class="n">RecordBinaryView</span><span class="p">;</span>
<span class="n">IRecordView</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">addressesTable</span> <span class="p">=</span> <span class="p">(</span><span class="k">await</span> <span class="n">context</span><span class="p">.</span><span class="n">Ignite</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"addresses"</span><span class="p">))!.</span><span class="n">RecordBinaryView</span><span class="p">;</span>
<span class="k">foreach</span> <span class="p">(</span><span class="n">IIgniteTuple</span> <span class="n">sourceItem</span> <span class="k">in</span> <span class="n">page</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">// For each source item, the receiver extracts customer and address data and upserts it into respective tables.</span>
<span class="kt">var</span> <span class="n">customer</span> <span class="p">=</span> <span class="k">new</span> <span class="n">IgniteTuple</span>
<span class="p">{</span>
<span class="p">[</span><span class="s">"id"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"customerId"</span><span class="p">],</span>
<span class="p">[</span><span class="s">"name"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"customerName"</span><span class="p">],</span>
<span class="p">[</span><span class="s">"addressId"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"addressId"</span><span class="p">]</span>
<span class="p">};</span>
<span class="kt">var</span> <span class="n">address</span> <span class="p">=</span> <span class="k">new</span> <span class="n">IgniteTuple</span>
<span class="p">{</span>
<span class="p">[</span><span class="s">"id"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"addressId"</span><span class="p">],</span>
<span class="p">[</span><span class="s">"street"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"street"</span><span class="p">],</span>
<span class="p">[</span><span class="s">"city"</span><span class="p">]</span> <span class="p">=</span> <span class="n">sourceItem</span><span class="p">[</span><span class="s">"city"</span><span class="p">],</span>
<span class="p">};</span>
<span class="k">await</span> <span class="n">customerTable</span><span class="p">.</span><span class="nf">UpsertAsync</span><span class="p">(</span><span class="k">null</span><span class="p">,</span> <span class="n">customer</span><span class="p">);</span>
<span class="k">await</span> <span class="n">addressesTable</span><span class="p">.</span><span class="nf">UpsertAsync</span><span class="p">(</span><span class="k">null</span><span class="p">,</span> <span class="n">address</span><span class="p">);</span>
<span class="p">}</span>
<span class="k">return</span> <span class="k">null</span><span class="p">;</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Create a descriptor that refers to your receiver implementation. This descriptor will be passed later to the <code>streamData</code> (Java) or <code>StreamDataAsync</code> (.NET) call when streaming data.</p>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">DataStreamerReceiverDescriptor</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Void</span><span class="o">,</span> <span class="nc">Void</span><span class="o">&gt;</span> <span class="n">desc</span> <span class="o">=</span> <span class="nc">DataStreamerReceiverDescriptor</span>
<span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="nc">TwoTableReceiver</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">ReceiverDescriptor</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="kt">object</span><span class="p">?,</span> <span class="kt">object</span><span class="p">&gt;</span> <span class="n">desc</span> <span class="p">=</span> <span class="n">ReceiverDescriptor</span><span class="p">.</span><span class="nf">Of</span><span class="p">(</span><span class="k">new</span> <span class="nf">TwoTableReceiver</span><span class="p">());</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Next, obtain the target table to partition the data for streaming. In this example we partition by <code>customerId</code> to ensure the receiver is <a href="/docs/ignite3/latest/administrators-guide/colocation">colocated</a> with the customer data, enabling local upserts. Then define how to extract keys and payloads from the source, and stream the data using a <code>SubmissionPublisher</code>.</p>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="c1">// Example source data</span>
<span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">sourceData</span> <span class="o">=</span> <span class="nc">IntStream</span><span class="o">.</span><span class="na">range</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span>
<span class="o">.</span><span class="na">mapToObj</span><span class="o">(</span><span class="n">i</span> <span class="o">-&gt;</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">()</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"customerId"</span><span class="o">,</span> <span class="n">i</span><span class="o">)</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"customerName"</span><span class="o">,</span> <span class="s">"Customer "</span> <span class="o">+</span> <span class="n">i</span><span class="o">)</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"addressId"</span><span class="o">,</span> <span class="n">i</span><span class="o">)</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"street"</span><span class="o">,</span> <span class="s">"Street "</span> <span class="o">+</span> <span class="n">i</span><span class="o">)</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"city"</span><span class="o">,</span> <span class="s">"City "</span> <span class="o">+</span> <span class="n">i</span><span class="o">))</span>
<span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="nc">Collectors</span><span class="o">.</span><span class="na">toList</span><span class="o">());</span>
<span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;</span> <span class="n">streamerFut</span><span class="o">;</span>
<span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">customersTable</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"customers"</span><span class="o">).</span><span class="na">recordView</span><span class="o">();</span>
<span class="c1">// Extract the target table key from each source item; since the source has "customerId" but the target table uses "id", the function maps customerId to id accordingly.</span>
<span class="nc">Function</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">keyFunc</span> <span class="o">=</span> <span class="n">sourceItem</span> <span class="o">-&gt;</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">intValue</span><span class="o">(</span><span class="s">"customerId"</span><span class="o">));</span>
<span class="c1">// Extract the data payload sent to the receiver. In this case, we use the entire source item as the payload.</span>
<span class="nc">Function</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">payloadFunc</span> <span class="o">=</span> <span class="nc">Function</span><span class="o">.</span><span class="na">identity</span><span class="o">();</span>
<span class="c1">// Stream data using a publisher.</span>
<span class="k">try</span> <span class="o">(</span><span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;())</span> <span class="o">{</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">customersTable</span><span class="o">.</span><span class="na">streamData</span><span class="o">(</span>
<span class="n">publisher</span><span class="o">,</span>
<span class="n">desc</span><span class="o">,</span>
<span class="n">keyFunc</span><span class="o">,</span>
<span class="n">payloadFunc</span><span class="o">,</span>
<span class="kc">null</span><span class="o">,</span> <span class="c1">// Optional receiver arguments</span>
<span class="kc">null</span><span class="o">,</span> <span class="c1">// Result subscriber</span>
<span class="kc">null</span> <span class="c1">// Options</span>
<span class="o">);</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">Tuple</span> <span class="n">item</span> <span class="o">:</span> <span class="n">sourceData</span><span class="o">)</span> <span class="o">{</span>
<span class="n">publisher</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">item</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">streamerFut</span><span class="o">.</span><span class="na">join</span><span class="o">();</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">sourceData</span> <span class="p">=</span> <span class="nf">GetSourceData</span><span class="p">();</span>
<span class="n">IRecordView</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">customersTable</span> <span class="p">=</span> <span class="p">(</span><span class="k">await</span> <span class="n">client</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"customers"</span><span class="p">))!.</span><span class="n">RecordBinaryView</span><span class="p">;</span>
<span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="kt">object</span><span class="p">&gt;</span> <span class="n">streamerResults</span> <span class="p">=</span> <span class="n">customersTable</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span>
<span class="n">sourceData</span><span class="p">,</span>
<span class="n">desc</span><span class="p">,</span>
<span class="n">x</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="n">IgniteTuple</span> <span class="p">{</span> <span class="p">[</span><span class="s">"id"</span><span class="p">]</span> <span class="p">=</span> <span class="n">x</span><span class="p">[</span><span class="s">"customerId"</span><span class="p">]</span> <span class="p">},</span>
<span class="n">x</span> <span class="p">=&gt;</span> <span class="n">x</span><span class="p">,</span>
<span class="k">null</span><span class="p">,</span>
<span class="n">DataStreamerOptions</span><span class="p">.</span><span class="n">Default</span><span class="p">,</span>
<span class="n">CancellationToken</span><span class="p">.</span><span class="n">None</span><span class="p">);</span>
<span class="k">await</span> <span class="k">foreach</span> <span class="p">(</span><span class="kt">object</span> <span class="n">result</span> <span class="k">in</span> <span class="n">streamerResults</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">// ...</span>
<span class="p">}</span>
<span class="k">static</span> <span class="k">async</span> <span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="nf">GetSourceData</span><span class="p">()</span>
<span class="p">{</span>
<span class="k">await</span> <span class="n">Task</span><span class="p">.</span><span class="nf">Yield</span><span class="p">();</span> <span class="c1">// Simulate async enumeration.</span>
<span class="k">for</span> <span class="p">(</span><span class="kt">int</span> <span class="n">i</span> <span class="p">=</span> <span class="m">0</span><span class="p">;</span> <span class="n">i</span> <span class="p">&lt;</span> <span class="m">10</span><span class="p">;</span> <span class="n">i</span><span class="p">++)</span>
<span class="p">{</span>
<span class="k">yield</span> <span class="k">return</span> <span class="k">new</span> <span class="n">IgniteTuple</span>
<span class="p">{</span>
<span class="p">[</span><span class="s">"customerId"</span><span class="p">]</span> <span class="p">=</span> <span class="n">i</span><span class="p">,</span>
<span class="p">[</span><span class="s">"customerName"</span><span class="p">]</span> <span class="p">=</span> <span class="s">$"Customer </span><span class="p">{</span><span class="n">i</span><span class="p">}</span><span class="s">"</span><span class="p">,</span>
<span class="p">[</span><span class="s">"addressId"</span><span class="p">]</span> <span class="p">=</span> <span class="n">i</span><span class="p">,</span>
<span class="p">[</span><span class="s">"street"</span><span class="p">]</span> <span class="p">=</span> <span class="s">$"Street </span><span class="p">{</span><span class="n">i</span><span class="p">}</span><span class="s">"</span><span class="p">,</span>
<span class="p">[</span><span class="s">"city"</span><span class="p">]</span> <span class="p">=</span> <span class="s">$"City </span><span class="p">{</span><span class="n">i</span><span class="p">}</span><span class="s">"</span>
<span class="p">};</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
</ol>
</div>
</div>
<div class="sect3">
<h4 id="distributed-computations">Distributed Computations</h4>
<div class="paragraph">
<p>You can also use a streamer with a receiver to perform distributed computations, such as per-item calculations and <a href="/docs/ignite3/latest/compute/compute#mapreduce-tasks">map-reduce</a> tasks on the returned results.</p>
</div>
<div class="paragraph">
<p>This example demonstrates a simulated fraud detection process, which typically involves intensive processing of each transaction using ML models.</p>
</div>
<div class="olist arabic">
<ol class="arabic">
<li>
<p>First, create a custom receiver that runs the fraud detection computation on each transaction in the page and returns the results:</p>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">FraudDetectorReceiver</span> <span class="kd">implements</span> <span class="nc">DataStreamerReceiver</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Void</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="nd">@Nullable</span> <span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;&gt;</span> <span class="nf">receive</span><span class="o">(</span><span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">page</span><span class="o">,</span> <span class="nc">DataStreamerReceiverContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="nd">@Nullable</span> <span class="nc">Void</span> <span class="n">arg</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">results</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">ArrayList</span><span class="o">&lt;&gt;(</span><span class="n">page</span><span class="o">.</span><span class="na">size</span><span class="o">());</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">Tuple</span> <span class="n">tx</span> <span class="o">:</span> <span class="n">page</span><span class="o">)</span> <span class="o">{</span>
<span class="n">results</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">detectFraud</span><span class="o">(</span><span class="n">tx</span><span class="o">));</span>
<span class="o">}</span>
<span class="k">return</span> <span class="nc">CompletableFuture</span><span class="o">.</span><span class="na">completedFuture</span><span class="o">(</span><span class="n">results</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">private</span> <span class="kd">static</span> <span class="nc">Tuple</span> <span class="nf">detectFraud</span><span class="o">(</span><span class="nc">Tuple</span> <span class="n">txInfo</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Simulate fraud detection processing.</span>
<span class="kt">double</span> <span class="n">fraudRisk</span> <span class="o">=</span> <span class="nc">Math</span><span class="o">.</span><span class="na">random</span><span class="o">();</span>
<span class="c1">// Add result to the tuple and return.</span>
<span class="k">return</span> <span class="n">txInfo</span><span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"fraudRisk"</span><span class="o">,</span> <span class="n">fraudRisk</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="k">class</span> <span class="nc">FraudDetectorReceiver</span> <span class="p">:</span> <span class="n">IDataStreamerReceiver</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="kt">object</span><span class="p">?,</span> <span class="n">IIgniteTuple</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="k">public</span> <span class="k">async</span> <span class="n">ValueTask</span><span class="p">&lt;</span><span class="n">IList</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;?&gt;</span> <span class="nf">ReceiveAsync</span><span class="p">(</span>
<span class="n">IList</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">page</span><span class="p">,</span>
<span class="kt">object</span><span class="p">?</span> <span class="n">arg</span><span class="p">,</span>
<span class="n">IDataStreamerReceiverContext</span> <span class="n">context</span><span class="p">,</span>
<span class="n">CancellationToken</span> <span class="n">cancellationToken</span><span class="p">)</span>
<span class="p">{</span>
<span class="kt">var</span> <span class="n">result</span> <span class="p">=</span> <span class="k">new</span> <span class="n">List</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;(</span><span class="n">page</span><span class="p">.</span><span class="n">Count</span><span class="p">);</span>
<span class="k">foreach</span> <span class="p">(</span><span class="kt">var</span> <span class="n">tx</span> <span class="k">in</span> <span class="n">page</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">IIgniteTuple</span> <span class="n">resTuple</span> <span class="p">=</span> <span class="k">await</span> <span class="nf">DetectFraud</span><span class="p">(</span><span class="n">tx</span><span class="p">);</span>
<span class="n">result</span><span class="p">.</span><span class="nf">Add</span><span class="p">(</span><span class="n">resTuple</span><span class="p">);</span>
<span class="p">}</span>
<span class="k">return</span> <span class="n">result</span><span class="p">;</span>
<span class="p">}</span>
<span class="k">private</span> <span class="k">static</span> <span class="k">async</span> <span class="n">Task</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="nf">DetectFraud</span><span class="p">(</span><span class="n">IIgniteTuple</span> <span class="n">transaction</span><span class="p">)</span>
<span class="p">{</span>
<span class="c1">// Simulate fraud detection logic - add a random risk score to the tuple.</span>
<span class="k">await</span> <span class="n">Task</span><span class="p">.</span><span class="nf">Delay</span><span class="p">(</span><span class="m">10</span><span class="p">);</span>
<span class="n">transaction</span><span class="p">[</span><span class="s">"fraudRisk"</span><span class="p">]</span> <span class="p">=</span> <span class="n">Random</span><span class="p">.</span><span class="n">Shared</span><span class="p">.</span><span class="nf">NextDouble</span><span class="p">();</span>
<span class="k">return</span> <span class="n">transaction</span><span class="p">;</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
<li>
<p>Next, stream a list of sample transactions across the cluster, using a dummy table to partition the data by transaction ID and <code>FraudDetectorReceiver</code> to perform the fraud detection. Subscribe to the results to log each processed transaction, handle errors, and confirm when streaming completes:</p>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">runReceiverStreamProcessing</span><span class="o">()</span> <span class="o">{</span>
<span class="c1">// Source data is a list of financial transactions.</span>
<span class="c1">// We distribute this processing across the cluster, then gather and return results.</span>
<span class="nc">List</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">sourceData</span> <span class="o">=</span> <span class="nc">IntStream</span><span class="o">.</span><span class="na">range</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">10</span><span class="o">)</span>
<span class="o">.</span><span class="na">mapToObj</span><span class="o">(</span><span class="n">i</span> <span class="o">-&gt;</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">()</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"txId"</span><span class="o">,</span> <span class="n">i</span><span class="o">)</span>
<span class="o">.</span><span class="na">set</span><span class="o">(</span><span class="s">"txData"</span><span class="o">,</span> <span class="s">"{some-json-data}"</span><span class="o">))</span>
<span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="nc">Collectors</span><span class="o">.</span><span class="na">toList</span><span class="o">());</span>
<span class="nc">DataStreamerReceiverDescriptor</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Void</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">desc</span> <span class="o">=</span> <span class="nc">DataStreamerReceiverDescriptor</span>
<span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="nc">FraudDetectorReceiver</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;</span> <span class="n">streamerFut</span><span class="o">;</span>
<span class="c1">// Streaming requires a target table to partition data.</span>
<span class="c1">// Use a dummy table for this scenario, because we are not going to store any data.</span>
<span class="nc">TableDefinition</span> <span class="n">txDummyTableDef</span> <span class="o">=</span> <span class="nc">TableDefinition</span><span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="s">"tx_dummy"</span><span class="o">)</span>
<span class="o">.</span><span class="na">columns</span><span class="o">(</span><span class="n">column</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="nc">ColumnType</span><span class="o">.</span><span class="na">INTEGER</span><span class="o">))</span>
<span class="o">.</span><span class="na">primaryKey</span><span class="o">(</span><span class="s">"id"</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="nc">Table</span> <span class="n">dummyTable</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">catalog</span><span class="o">().</span><span class="na">createTable</span><span class="o">(</span><span class="n">txDummyTableDef</span><span class="o">);</span>
<span class="c1">// Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".</span>
<span class="nc">Function</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">keyFunc</span> <span class="o">=</span> <span class="n">sourceItem</span> <span class="o">-&gt;</span> <span class="nc">Tuple</span><span class="o">.</span><span class="na">create</span><span class="o">().</span><span class="na">set</span><span class="o">(</span><span class="s">"id"</span><span class="o">,</span> <span class="n">sourceItem</span><span class="o">.</span><span class="na">value</span><span class="o">(</span><span class="s">"txId"</span><span class="o">));</span>
<span class="c1">// Payload function is used to extract the payload (data that goes to the receiver) from the source item.</span>
<span class="c1">// In our case, we want to use the whole source item as the payload.</span>
<span class="nc">Function</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">,</span> <span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">payloadFunc</span> <span class="o">=</span> <span class="nc">Function</span><span class="o">.</span><span class="na">identity</span><span class="o">();</span>
<span class="nc">Flow</span><span class="o">.</span><span class="na">Subscriber</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">resultSubscriber</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Flow</span><span class="o">.</span><span class="na">Subscriber</span><span class="o">&lt;&gt;()</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onSubscribe</span><span class="o">(</span><span class="nc">Flow</span><span class="o">.</span><span class="na">Subscription</span> <span class="n">subscription</span><span class="o">)</span> <span class="o">{</span>
<span class="n">subscription</span><span class="o">.</span><span class="na">request</span><span class="o">(</span><span class="nc">Long</span><span class="o">.</span><span class="na">MAX_VALUE</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onNext</span><span class="o">(</span><span class="nc">Tuple</span> <span class="n">item</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Transaction processed: "</span> <span class="o">+</span> <span class="n">item</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onError</span><span class="o">(</span><span class="nc">Throwable</span> <span class="n">throwable</span><span class="o">)</span> <span class="o">{</span>
<span class="nc">System</span><span class="o">.</span><span class="na">err</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Error during streaming: "</span> <span class="o">+</span> <span class="n">throwable</span><span class="o">.</span><span class="na">getMessage</span><span class="o">());</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onComplete</span><span class="o">()</span> <span class="o">{</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Streaming completed."</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">};</span>
<span class="k">try</span> <span class="o">(</span><span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;())</span> <span class="o">{</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">dummyTable</span><span class="o">.</span><span class="na">recordView</span><span class="o">().</span><span class="na">streamData</span><span class="o">(</span>
<span class="n">publisher</span><span class="o">,</span>
<span class="n">desc</span><span class="o">,</span>
<span class="n">keyFunc</span><span class="o">,</span>
<span class="n">payloadFunc</span><span class="o">,</span>
<span class="kc">null</span><span class="o">,</span> <span class="c1">// Arg</span>
<span class="n">resultSubscriber</span><span class="o">,</span>
<span class="kc">null</span> <span class="c1">// Options</span>
<span class="o">);</span>
<span class="k">for</span> <span class="o">(</span><span class="nc">Tuple</span> <span class="n">item</span> <span class="o">:</span> <span class="n">sourceData</span><span class="o">)</span> <span class="o">{</span>
<span class="n">publisher</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="n">item</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="n">streamerFut</span><span class="o">.</span><span class="na">join</span><span class="o">();</span>
<span class="o">}</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="c1">// Source data is a list of financial transactions.</span>
<span class="c1">// We want to distribute this processing across the cluster, then gather and return results</span>
<span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">data</span> <span class="p">=</span> <span class="nf">GetSourceData</span><span class="p">();</span>
<span class="n">ReceiverDescriptor</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="kt">object</span><span class="p">?,</span> <span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">fraudDetectorReceiverDesc</span> <span class="p">=</span> <span class="n">ReceiverDescriptor</span><span class="p">.</span><span class="nf">Of</span><span class="p">(</span><span class="k">new</span> <span class="nf">FraudDetectorReceiver</span><span class="p">());</span>
<span class="c1">// Streaming requires a target table to partition data.</span>
<span class="c1">// Use a dummy table for this scenario, because we are not going to store any data.</span>
<span class="k">await</span> <span class="n">client</span><span class="p">.</span><span class="n">Sql</span><span class="p">.</span><span class="nf">ExecuteScriptAsync</span><span class="p">(</span><span class="s">"CREATE TABLE IF NOT EXISTS TX_DUMMY (ID LONG)"</span><span class="p">);</span>
<span class="n">ITable</span> <span class="n">dummyTable</span> <span class="p">=</span> <span class="k">await</span> <span class="n">client</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"TX_DUMMY"</span><span class="p">);</span>
<span class="c1">// Source data has "txId" field, but target dummy table has "id" column, so keyFunc maps "txId" to "id".</span>
<span class="n">Func</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">keyFunc</span> <span class="p">=</span> <span class="n">tuple</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="n">IgniteTuple</span> <span class="p">{</span> <span class="p">[</span><span class="s">"id"</span><span class="p">]</span> <span class="p">=</span> <span class="n">tuple</span><span class="p">[</span><span class="s">"txId"</span><span class="p">]</span> <span class="p">};</span>
<span class="c1">// Payload function is used to extract the payload (data that goes to the receiver) from the source item.</span>
<span class="c1">// In our case, we want to use the whole source item as the payload.</span>
<span class="n">Func</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">,</span> <span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">payloadFunc</span> <span class="p">=</span> <span class="n">tuple</span> <span class="p">=&gt;</span> <span class="n">tuple</span><span class="p">;</span>
<span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">results</span> <span class="p">=</span> <span class="n">dummyTable</span><span class="p">.</span><span class="n">RecordBinaryView</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span>
<span class="n">data</span><span class="p">,</span>
<span class="n">fraudDetectorReceiverDesc</span><span class="p">,</span>
<span class="n">keyFunc</span><span class="p">,</span>
<span class="n">payloadFunc</span><span class="p">,</span>
<span class="n">receiverArg</span><span class="p">:</span> <span class="k">null</span><span class="p">);</span>
<span class="k">await</span> <span class="k">foreach</span> <span class="p">(</span><span class="n">IIgniteTuple</span> <span class="n">processedTx</span> <span class="k">in</span> <span class="n">results</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">WriteLine</span><span class="p">(</span><span class="s">"Transaction processed: "</span> <span class="p">+</span> <span class="n">processedTx</span><span class="p">);</span>
<span class="p">}</span>
<span class="k">async</span> <span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="nf">GetSourceData</span><span class="p">()</span>
<span class="p">{</span>
<span class="k">await</span> <span class="n">Task</span><span class="p">.</span><span class="nf">Yield</span><span class="p">();</span> <span class="c1">// Simulate async data source.</span>
<span class="k">for</span> <span class="p">(</span><span class="kt">int</span> <span class="n">i</span> <span class="p">=</span> <span class="m">0</span><span class="p">;</span> <span class="n">i</span> <span class="p">&lt;</span> <span class="m">1000</span><span class="p">;</span> <span class="n">i</span><span class="p">++)</span>
<span class="p">{</span>
<span class="k">yield</span> <span class="k">return</span> <span class="k">new</span> <span class="n">IgniteTuple</span>
<span class="p">{</span>
<span class="p">[</span><span class="s">"txId"</span><span class="p">]</span> <span class="p">=</span> <span class="n">i</span><span class="p">,</span>
<span class="p">[</span><span class="s">"txData"</span><span class="p">]</span> <span class="p">=</span> <span class="s">"{some-json-data}"</span>
<span class="p">};</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab></code-tabs>
</li>
</ol>
</div>
</div>
<div class="sect3">
<h4 id="custom-marshallers-in-net">Custom Marshallers in .NET</h4>
<div class="paragraph">
<p>In .NET, you can define custom marshallers by implementing the <a href="https://ignite.apache.org/releases/3.1.0/dotnetdoc/api/Apache.Ignite.Marshalling.IMarshaller-1.html"><code>IMarshaller</code></a> interface.</p>
</div>
<div class="paragraph">
<p>For example, the code below demonstrates how to use <code>JsonMarshaller</code> to serialize data, arguments, and results.</p>
</div>
<div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">ITable</span><span class="p">?</span> <span class="n">table</span> <span class="p">=</span> <span class="k">await</span> <span class="n">client</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"my-table"</span><span class="p">);</span>
<span class="n">ReceiverDescriptor</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">,</span> <span class="n">MyArg</span><span class="p">,</span> <span class="n">MyResult</span><span class="p">&gt;</span> <span class="n">receiverDesc</span> <span class="p">=</span> <span class="n">ReceiverDescriptor</span><span class="p">.</span><span class="nf">Of</span><span class="p">(</span><span class="k">new</span> <span class="nf">MyReceiver</span><span class="p">());</span>
<span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">&gt;</span> <span class="n">data</span> <span class="p">=</span> <span class="n">Enumerable</span>
<span class="p">.</span><span class="nf">Range</span><span class="p">(</span><span class="m">1</span><span class="p">,</span> <span class="m">100</span><span class="p">)</span>
<span class="p">.</span><span class="nf">Select</span><span class="p">(</span><span class="n">x</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="nf">MyData</span><span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="s">$"Name </span><span class="p">{</span><span class="n">x</span><span class="p">}</span><span class="s">"</span><span class="p">))</span>
<span class="p">.</span><span class="nf">ToAsyncEnumerable</span><span class="p">();</span>
<span class="n">IAsyncEnumerable</span><span class="p">&lt;</span><span class="n">MyResult</span><span class="p">&gt;</span> <span class="n">results</span> <span class="p">=</span> <span class="n">table</span><span class="p">!.</span><span class="n">RecordBinaryView</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span>
<span class="n">data</span><span class="p">:</span> <span class="n">data</span><span class="p">,</span>
<span class="n">receiver</span><span class="p">:</span> <span class="n">receiverDesc</span><span class="p">,</span>
<span class="n">keySelector</span><span class="p">:</span> <span class="n">dataItem</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="n">IgniteTuple</span> <span class="p">{</span> <span class="p">[</span><span class="s">"id"</span><span class="p">]</span> <span class="p">=</span> <span class="n">dataItem</span><span class="p">.</span><span class="n">Id</span> <span class="p">},</span>
<span class="n">payloadSelector</span><span class="p">:</span> <span class="n">dataItem</span> <span class="p">=&gt;</span> <span class="n">dataItem</span><span class="p">,</span>
<span class="n">receiverArg</span><span class="p">:</span> <span class="k">new</span> <span class="nf">MyArg</span><span class="p">(</span><span class="s">"Some info"</span><span class="p">));</span>
<span class="k">await</span> <span class="k">foreach</span> <span class="p">(</span><span class="n">MyResult</span> <span class="n">result</span> <span class="k">in</span> <span class="n">results</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">WriteLine</span><span class="p">(</span><span class="n">result</span><span class="p">);</span>
<span class="p">}</span>
<span class="k">public</span> <span class="k">record</span> <span class="nc">MyData</span><span class="p">(</span><span class="kt">int</span> <span class="n">Id</span><span class="p">,</span> <span class="kt">string</span> <span class="n">Name</span><span class="p">);</span>
<span class="k">public</span> <span class="k">record</span> <span class="nc">MyArg</span><span class="p">(</span><span class="kt">string</span> <span class="n">Info</span><span class="p">);</span>
<span class="k">public</span> <span class="k">record</span> <span class="nc">MyResult</span><span class="p">(</span><span class="n">MyData</span> <span class="n">Data</span><span class="p">,</span> <span class="n">MyArg</span> <span class="n">Arg</span><span class="p">);</span>
<span class="k">public</span> <span class="k">class</span> <span class="nc">MyReceiver</span> <span class="p">:</span> <span class="n">IDataStreamerReceiver</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">,</span> <span class="n">MyArg</span><span class="p">,</span> <span class="n">MyResult</span><span class="p">&gt;</span>
<span class="p">{</span>
<span class="k">public</span> <span class="n">IMarshaller</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">&gt;</span> <span class="n">PayloadMarshaller</span> <span class="p">=&gt;</span>
<span class="k">new</span> <span class="n">JsonMarshaller</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">&gt;();</span>
<span class="k">public</span> <span class="n">IMarshaller</span><span class="p">&lt;</span><span class="n">MyArg</span><span class="p">&gt;</span> <span class="n">ArgumentMarshaller</span> <span class="p">=&gt;</span>
<span class="k">new</span> <span class="n">JsonMarshaller</span><span class="p">&lt;</span><span class="n">MyArg</span><span class="p">&gt;();</span>
<span class="k">public</span> <span class="n">IMarshaller</span><span class="p">&lt;</span><span class="n">MyResult</span><span class="p">&gt;</span> <span class="n">ResultMarshaller</span> <span class="p">=&gt;</span>
<span class="k">new</span> <span class="n">JsonMarshaller</span><span class="p">&lt;</span><span class="n">MyResult</span><span class="p">&gt;();</span>
<span class="k">public</span> <span class="n">ValueTask</span><span class="p">&lt;</span><span class="n">IList</span><span class="p">&lt;</span><span class="n">MyResult</span><span class="p">&gt;?&gt;</span> <span class="nf">ReceiveAsync</span><span class="p">(</span><span class="n">IList</span><span class="p">&lt;</span><span class="n">MyData</span><span class="p">&gt;</span> <span class="n">page</span><span class="p">,</span> <span class="n">MyArg</span> <span class="n">arg</span><span class="p">,</span> <span class="n">IDataStreamerReceiverContext</span> <span class="n">context</span><span class="p">,</span> <span class="n">CancellationToken</span> <span class="n">cancellationToken</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">IList</span><span class="p">&lt;</span><span class="n">MyResult</span><span class="p">&gt;</span> <span class="n">results</span> <span class="p">=</span> <span class="n">page</span>
<span class="p">.</span><span class="nf">Select</span><span class="p">(</span><span class="n">data</span> <span class="p">=&gt;</span> <span class="k">new</span> <span class="nf">MyResult</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">arg</span><span class="p">))</span>
<span class="p">.</span><span class="nf">ToList</span><span class="p">();</span>
<span class="k">return</span> <span class="n">ValueTask</span><span class="p">.</span><span class="nf">FromResult</span><span class="p">(</span><span class="n">results</span><span class="p">)!;</span>
<span class="p">}</span>
<span class="p">}</span></code></pre>
</div>
</div>
</div>
</div>
</div>
</div>
<div class="sect1">
<h2 id="tracking-failed-entries">Tracking Failed Entries</h2>
<div class="sectionbody">
<div class="paragraph">
<p>If the data streamer fails to process any entries, it collects the failed items in a <code>DataStreamerException</code>. You can catch this exception and access the failed entries using the <code>failedItems()</code> method in Java or the <code>FailedItems</code> property in .NET, as shown in the examples below.</p>
</div>
<div class="paragraph">
<p>You can catch both asynchronous errors during background streaming and immediate submission errors:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;</span> <span class="n">view</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"accounts"</span><span class="o">).</span><span class="na">recordView</span><span class="o">(</span><span class="nc">Account</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="nc">CompletableFuture</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">&gt;</span> <span class="n">streamerFut</span><span class="o">;</span>
<span class="k">try</span> <span class="o">(</span><span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">DataStreamerItem</span><span class="o">&lt;</span><span class="nc">Account</span><span class="o">&gt;&gt;())</span> <span class="o">{</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">view</span><span class="o">.</span><span class="na">streamData</span><span class="o">(</span><span class="n">publisher</span><span class="o">,</span> <span class="n">options</span><span class="o">)</span>
<span class="o">.</span><span class="na">exceptionally</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Failed items during background streaming: "</span> <span class="o">+</span>
<span class="o">((</span><span class="nc">DataStreamerException</span><span class="o">)</span><span class="n">e</span><span class="o">.</span><span class="na">getCause</span><span class="o">()).</span><span class="na">failedItems</span><span class="o">());</span>
<span class="k">return</span> <span class="kc">null</span><span class="o">;</span>
<span class="o">});</span>
<span class="cm">/** Trying to insert an account record. */</span>
<span class="nc">Account</span> <span class="n">entry</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Account</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="s">"Account name"</span><span class="o">,</span> <span class="n">rnd</span><span class="o">.</span><span class="na">nextLong</span><span class="o">(</span><span class="mi">100_000</span><span class="o">),</span> <span class="n">rnd</span><span class="o">.</span><span class="na">nextBoolean</span><span class="o">());</span>
<span class="n">publisher</span><span class="o">.</span><span class="na">submit</span><span class="o">(</span><span class="nc">DataStreamerItem</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">entry</span><span class="o">));</span>
<span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="nc">DataStreamerException</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
<span class="cm">/** Handle entries that failed during submission. */</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Failed items during submission: "</span> <span class="o">+</span> <span class="n">e</span><span class="o">.</span><span class="na">failedItems</span><span class="o">());</span>
<span class="o">}</span>
<span class="n">streamerFut</span><span class="o">.</span><span class="na">join</span><span class="o">();</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="n">ITable</span><span class="p">?</span> <span class="n">table</span> <span class="p">=</span> <span class="k">await</span> <span class="n">Client</span><span class="p">.</span><span class="n">Tables</span><span class="p">.</span><span class="nf">GetTableAsync</span><span class="p">(</span><span class="s">"my-table"</span><span class="p">);</span>
<span class="n">IRecordView</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">view</span> <span class="p">=</span> <span class="n">table</span><span class="p">!.</span><span class="n">RecordBinaryView</span><span class="p">;</span>
<span class="n">IList</span><span class="p">&lt;</span><span class="n">IIgniteTuple</span><span class="p">&gt;</span> <span class="n">data</span> <span class="p">=</span> <span class="p">[</span><span class="k">new</span> <span class="n">IgniteTuple</span> <span class="p">{</span> <span class="p">[</span><span class="s">"key"</span><span class="p">]</span> <span class="p">=</span> <span class="m">1L</span><span class="p">,</span> <span class="p">[</span><span class="s">"val"</span><span class="p">]</span> <span class="p">=</span> <span class="s">"v"</span> <span class="p">}];</span>
<span class="k">try</span>
<span class="p">{</span>
<span class="k">await</span> <span class="n">view</span><span class="p">.</span><span class="nf">StreamDataAsync</span><span class="p">(</span><span class="n">data</span><span class="p">.</span><span class="nf">ToAsyncEnumerable</span><span class="p">());</span>
<span class="p">}</span>
<span class="k">catch</span> <span class="p">(</span><span class="n">DataStreamerException</span> <span class="n">e</span><span class="p">)</span>
<span class="p">{</span>
<span class="n">Console</span><span class="p">.</span><span class="nf">WriteLine</span><span class="p">(</span><span class="s">"Failed items: "</span> <span class="p">+</span> <span class="kt">string</span><span class="p">.</span><span class="nf">Join</span><span class="p">(</span><span class="s">","</span><span class="p">,</span> <span class="n">e</span><span class="p">.</span><span class="n">FailedItems</span><span class="p">));</span>
<span class="p">}</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="sect2">
<h3 id="tuning-memory-usage">Tuning Memory Usage</h3>
<div class="paragraph">
<p>The data streamer may require a significant amount of memory to handle the requests in an orderly manner. Depending on your environment, you may want to increase or reduce the amount of memory reserved by the data streamer.</p>
</div>
<div class="paragraph">
<p>For every node in the cluster, the streamer reserves an amount of memory equal to the <code>pageSize</code> setting (1000 entries by default) multiplied by the <code>perPartitionParallelOperations</code> setting (1 by default). For example, a 10-partition table with default parameters and an average entry size of 1 KB will reserve 10 MB for operations.</p>
</div>
<div class="paragraph">
<p>You can change these options while creating a <code>DataStreamerOptions</code> object:</p>
</div>
<code-tabs><code-tab data-tab='Java'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="java"><span class="nc">RecordView</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;</span> <span class="n">view</span> <span class="o">=</span> <span class="n">client</span><span class="o">.</span><span class="na">tables</span><span class="o">().</span><span class="na">table</span><span class="o">(</span><span class="s">"accounts"</span><span class="o">).</span><span class="na">recordView</span><span class="o">();</span>
<span class="kt">var</span> <span class="n">publisher</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">SubmissionPublisher</span><span class="o">&lt;</span><span class="nc">Tuple</span><span class="o">&gt;();</span>
<span class="kt">var</span> <span class="n">options</span> <span class="o">=</span> <span class="nc">DataStreamerOptions</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
<span class="o">.</span><span class="na">pageSize</span><span class="o">(</span><span class="mi">10_000</span><span class="o">)</span>
<span class="o">.</span><span class="na">perPartitionParallelOperations</span><span class="o">(</span><span class="mi">10</span><span class="o">)</span>
<span class="o">.</span><span class="na">build</span><span class="o">();</span>
<span class="n">streamerFut</span> <span class="o">=</span> <span class="n">view</span><span class="o">.</span><span class="na">streamData</span><span class="o">(</span><span class="n">publisher</span><span class="o">,</span> <span class="n">options</span><span class="o">);</span></code></pre>
</div>
</div></code-tab><code-tab data-tab='.NET'><div class="listingblock">
<div class="content">
<pre class="rouge highlight"><code data-lang="csharp"><span class="c1">// .NET streamer does not have a perPartitionParallelOperations option yet.</span>
<span class="kt">var</span> <span class="n">options</span> <span class="p">=</span> <span class="k">new</span> <span class="n">DataStreamerOptions</span>
<span class="p">{</span>
<span class="n">PageSize</span> <span class="p">=</span> <span class="m">10_000</span>
<span class="p">};</span></code></pre>
</div>
</div></code-tab></code-tabs>
<div class="paragraph">
<p>Additionally, the data streamer periodically flushes incomplete buffers to ensure that messages are not delayed indefinitely. This is especially useful when a buffer fills slowly or never completely fills due to uneven data distribution.</p>
</div>
<div class="paragraph">
<p>This behavior is controlled by the <code>autoFlushInterval</code> property, which is set to 5000 ms by default. You can also configure the <code>retryLimit</code> parameter to define the maximum number of retry attempts for failed submissions, with a default value of 16.</p>
</div>
</div>
</div>
</div>
<div class="copyright">
© 2025 The Apache Software Foundation.<br/>
Apache, Apache Ignite, the Apache feather and the Apache Ignite logo are either registered trademarks or trademarks of The Apache Software Foundation.
</div>
</article>
<!-- Right-hand "on this page" navigation: table of contents for the article's
     sections, followed by a promotional block pulled in by a server-side include.
     Each <nav> landmark carries an aria-label so assistive technology can tell it
     apart from the other navigation regions on the page. -->
<nav class="right-nav" data-swiftype-index='false' aria-label="On this page">
<div class="toc-wrapper">
<ul class="sectlevel1">
<li><a href="#using-data-streamer-api">Using Data Streamer API</a>
<ul class="sectlevel2">
<li><a href="#configuring-data-streamer">Configuring Data Streamer</a></li>
<li><a href="#streaming-data">Streaming Data</a></li>
<li><a href="#streaming-with-receiver">Streaming with Receiver</a></li>
<li><a href="#examples">Examples</a>
<ul class="sectlevel3">
<li><a href="#updating-multiple-tables">Updating Multiple Tables</a></li>
<li><a href="#distributed-computations">Distributed Computations</a></li>
<li><a href="#custom-marshallers-in-net">Custom Marshallers in .NET</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#tracking-failed-entries">Tracking Failed Entries</a>
<ul class="sectlevel2">
<li><a href="#tuning-memory-usage">Tuning Memory Usage</a></li>
</ul>
</li>
</ul>
</div>
<nav class="promo-nav" aria-label="Promotions">
<!--#include virtual="/includes/docs_rightnav_promotion.html" -->
<!--a href="#" data-trigger-bugyard-feedback="true" id="doc-feedback-btn">Docs Feedback</a-->
</nav>
</nav>
</section>
<script type='module' src='/assets/js/code-copy-to-clipboard.js' async crossorigin></script>
<script>
// Initialises AnchorJS deep-link anchors on the documentation headings.
// This must run inline at this point in the page rather than from a deferred
// script -- see https://www.bryanbraun.com/anchorjs/#dont-run-it-too-late
//
// Options are assigned BEFORE add(): AnchorJS reads anchors.options while it
// builds each anchor, so options set after add() do not affect the anchors that
// were already generated (previously `visible: 'always'` was never applied).
anchors.options = {
  placement: 'right',
  visible: 'always'
};
anchors.add('.page-docs h1, .page-docs h2, .page-docs h3:not(.discrete), .page-docs h4, .page-docs h5');
</script>
<script type='module' src='/assets/js/index.js?1763137387' async crossorigin></script>
<script type='module' src='/assets/js/versioning.js?1763137387' async crossorigin></script>
<script type='module' src='/assets/js/railroad-diagram.js?1763137387' async></script>
<script type='module' src='/assets/js/search.js?1763137387' defer crossorigin></script>
<link rel="stylesheet" href="/assets/css/styles.css?1763137387" media="print" onload="this.media='all'">
<noscript><link media="all" rel="stylesheet" href="/assets/css/styles.css?1763137387"></noscript>
<link rel="stylesheet" href="/assets/css/docsearch.min.css" media="print" onload="this.media='all'">
<noscript><link media="all" rel="stylesheet" href="/assets/css/docsearch.min.css"></noscript>
</body>
</html>