blob: ca2264ed7ffa8e56b234893c47ae9699253dc027 [file] [log] [blame]
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=9"/>
<meta name="generator" content="Doxygen 1.8.6"/>
<title>Impala: be/src/runtime/data-stream-sender.cc Source File</title>
<link href="tabs.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="jquery.js"></script>
<script type="text/javascript" src="dynsections.js"></script>
<link href="navtree.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="resize.js"></script>
<script type="text/javascript" src="navtree.js"></script>
<script type="text/javascript">
$(document).ready(initResizable);
$(window).load(resizeHeight);
</script>
<link href="search/search.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="search/search.js"></script>
<script type="text/javascript">
$(document).ready(function() { searchBox.OnSelectItem(0); });
</script>
<link href="doxygen.css" rel="stylesheet" type="text/css" />
</head>
<body>
<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
<div id="titlearea">
<table cellspacing="0" cellpadding="0">
<tbody>
<tr style="height: 56px;">
<td style="padding-left: 0.5em;">
<div id="projectname">Impala
</div>
<div id="projectbrief">Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.</div>
</td>
</tr>
</tbody>
</table>
</div>
<!-- end header part -->
<!-- Generated by Doxygen 1.8.6 -->
<script type="text/javascript">
var searchBox = new SearchBox("searchBox", "search",false,'Search');
</script>
<div id="navrow1" class="tabs">
<ul class="tablist">
<li><a href="index.html"><span>Main&#160;Page</span></a></li>
<li><a href="namespaces.html"><span>Namespaces</span></a></li>
<li><a href="annotated.html"><span>Classes</span></a></li>
<li class="current"><a href="files.html"><span>Files</span></a></li>
<li>
<div id="MSearchBox" class="MSearchBoxInactive">
<span class="left">
<img id="MSearchSelect" src="search/mag_sel.png"
onmouseover="return searchBox.OnSearchSelectShow()"
onmouseout="return searchBox.OnSearchSelectHide()"
alt=""/>
<input type="text" id="MSearchField" value="Search" accesskey="S"
onfocus="searchBox.OnSearchFieldFocus(true)"
onblur="searchBox.OnSearchFieldFocus(false)"
onkeyup="searchBox.OnSearchFieldChange(event)"/>
</span><span class="right">
<a id="MSearchClose" href="javascript:searchBox.CloseResultsWindow()"><img id="MSearchCloseImg" border="0" src="search/close.png" alt=""/></a>
</span>
</div>
</li>
</ul>
</div>
<div id="navrow2" class="tabs2">
<ul class="tablist">
<li><a href="files.html"><span>File&#160;List</span></a></li>
<li><a href="globals.html"><span>File&#160;Members</span></a></li>
</ul>
</div>
</div><!-- top -->
<div id="side-nav" class="ui-resizable side-nav-resizable">
<div id="nav-tree">
<div id="nav-tree-contents">
<div id="nav-sync" class="sync"></div>
</div>
</div>
<div id="splitbar" style="-moz-user-select:none;"
class="ui-resizable-handle">
</div>
</div>
<script type="text/javascript">
$(document).ready(function(){initNavTree('data-stream-sender_8cc_source.html','');});
</script>
<div id="doc-content">
<!-- window showing the filter options -->
<div id="MSearchSelectWindow"
onmouseover="return searchBox.OnSearchSelectShow()"
onmouseout="return searchBox.OnSearchSelectHide()"
onkeydown="return searchBox.OnSearchSelectKey(event)">
<a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(0)"><span class="SelectionMark">&#160;</span>All</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(1)"><span class="SelectionMark">&#160;</span>Classes</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(2)"><span class="SelectionMark">&#160;</span>Namespaces</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(3)"><span class="SelectionMark">&#160;</span>Files</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(4)"><span class="SelectionMark">&#160;</span>Functions</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(5)"><span class="SelectionMark">&#160;</span>Variables</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(6)"><span class="SelectionMark">&#160;</span>Typedefs</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(7)"><span class="SelectionMark">&#160;</span>Enumerations</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(8)"><span class="SelectionMark">&#160;</span>Enumerator</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(9)"><span class="SelectionMark">&#160;</span>Friends</a><a class="SelectItem" href="javascript:void(0)" onclick="searchBox.OnSelectItem(10)"><span class="SelectionMark">&#160;</span>Macros</a></div>
<!-- iframe showing the search results (closed by default) -->
<div id="MSearchResultsWindow">
<iframe src="javascript:void(0)" frameborder="0"
name="MSearchResults" id="MSearchResults">
</iframe>
</div>
<div class="header">
<div class="headertitle">
<div class="title">data-stream-sender.cc</div> </div>
</div><!--header-->
<div class="contents">
<a href="data-stream-sender_8cc.html">Go to the documentation of this file.</a><div class="fragment"><div class="line"><a name="l00001"></a><span class="lineno"> 1</span>&#160;<span class="comment">// Copyright 2012 Cloudera Inc.</span></div>
<div class="line"><a name="l00002"></a><span class="lineno"> 2</span>&#160;<span class="comment">//</span></div>
<div class="line"><a name="l00003"></a><span class="lineno"> 3</span>&#160;<span class="comment">// Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);</span></div>
<div class="line"><a name="l00004"></a><span class="lineno"> 4</span>&#160;<span class="comment">// you may not use this file except in compliance with the License.</span></div>
<div class="line"><a name="l00005"></a><span class="lineno"> 5</span>&#160;<span class="comment">// You may obtain a copy of the License at</span></div>
<div class="line"><a name="l00006"></a><span class="lineno"> 6</span>&#160;<span class="comment">//</span></div>
<div class="line"><a name="l00007"></a><span class="lineno"> 7</span>&#160;<span class="comment">// http://www.apache.org/licenses/LICENSE-2.0</span></div>
<div class="line"><a name="l00008"></a><span class="lineno"> 8</span>&#160;<span class="comment">//</span></div>
<div class="line"><a name="l00009"></a><span class="lineno"> 9</span>&#160;<span class="comment">// Unless required by applicable law or agreed to in writing, software</span></div>
<div class="line"><a name="l00010"></a><span class="lineno"> 10</span>&#160;<span class="comment">// distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span></div>
<div class="line"><a name="l00011"></a><span class="lineno"> 11</span>&#160;<span class="comment">// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span></div>
<div class="line"><a name="l00012"></a><span class="lineno"> 12</span>&#160;<span class="comment">// See the License for the specific language governing permissions and</span></div>
<div class="line"><a name="l00013"></a><span class="lineno"> 13</span>&#160;<span class="comment">// limitations under the License.</span></div>
<div class="line"><a name="l00014"></a><span class="lineno"> 14</span>&#160;</div>
<div class="line"><a name="l00015"></a><span class="lineno"> 15</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="data-stream-sender_8h.html">runtime/data-stream-sender.h</a>&quot;</span></div>
<div class="line"><a name="l00016"></a><span class="lineno"> 16</span>&#160;</div>
<div class="line"><a name="l00017"></a><span class="lineno"> 17</span>&#160;<span class="preprocessor">#include &lt;iostream&gt;</span></div>
<div class="line"><a name="l00018"></a><span class="lineno"> 18</span>&#160;<span class="preprocessor">#include &lt;boost/shared_ptr.hpp&gt;</span></div>
<div class="line"><a name="l00019"></a><span class="lineno"> 19</span>&#160;<span class="preprocessor">#include &lt;thrift/protocol/TDebugProtocol.h&gt;</span></div>
<div class="line"><a name="l00020"></a><span class="lineno"> 20</span>&#160;</div>
<div class="line"><a name="l00021"></a><span class="lineno"> 21</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="logging_8h.html">common/logging.h</a>&quot;</span></div>
<div class="line"><a name="l00022"></a><span class="lineno"> 22</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="expr_8h.html">exprs/expr.h</a>&quot;</span></div>
<div class="line"><a name="l00023"></a><span class="lineno"> 23</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="expr-context_8h.html">exprs/expr-context.h</a>&quot;</span></div>
<div class="line"><a name="l00024"></a><span class="lineno"> 24</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="descriptors_8h.html">runtime/descriptors.h</a>&quot;</span></div>
<div class="line"><a name="l00025"></a><span class="lineno"> 25</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="tuple-row_8h.html">runtime/tuple-row.h</a>&quot;</span></div>
<div class="line"><a name="l00026"></a><span class="lineno"> 26</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="row-batch_8h.html">runtime/row-batch.h</a>&quot;</span></div>
<div class="line"><a name="l00027"></a><span class="lineno"> 27</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="raw-value_8h.html">runtime/raw-value.h</a>&quot;</span></div>
<div class="line"><a name="l00028"></a><span class="lineno"> 28</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="runtime-state_8h.html">runtime/runtime-state.h</a>&quot;</span></div>
<div class="line"><a name="l00029"></a><span class="lineno"> 29</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="client-cache_8h.html">runtime/client-cache.h</a>&quot;</span></div>
<div class="line"><a name="l00030"></a><span class="lineno"> 30</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="mem-tracker_8h.html">runtime/mem-tracker.h</a>&quot;</span></div>
<div class="line"><a name="l00031"></a><span class="lineno"> 31</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="debug-util_8h.html">util/debug-util.h</a>&quot;</span></div>
<div class="line"><a name="l00032"></a><span class="lineno"> 32</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="network-util_8h.html">util/network-util.h</a>&quot;</span></div>
<div class="line"><a name="l00033"></a><span class="lineno"> 33</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="thrift-client_8h.html">rpc/thrift-client.h</a>&quot;</span></div>
<div class="line"><a name="l00034"></a><span class="lineno"> 34</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="thrift-util_8h.html">rpc/thrift-util.h</a>&quot;</span></div>
<div class="line"><a name="l00035"></a><span class="lineno"> 35</span>&#160;</div>
<div class="line"><a name="l00036"></a><span class="lineno"> 36</span>&#160;<span class="preprocessor">#include &quot;gen-cpp/Types_types.h&quot;</span></div>
<div class="line"><a name="l00037"></a><span class="lineno"> 37</span>&#160;<span class="preprocessor">#include &quot;gen-cpp/ImpalaInternalService.h&quot;</span></div>
<div class="line"><a name="l00038"></a><span class="lineno"> 38</span>&#160;<span class="preprocessor">#include &quot;gen-cpp/ImpalaInternalService_types.h&quot;</span></div>
<div class="line"><a name="l00039"></a><span class="lineno"> 39</span>&#160;</div>
<div class="line"><a name="l00040"></a><span class="lineno"> 40</span>&#160;<span class="preprocessor">#include &quot;<a class="code" href="names_8h.html">common/names.h</a>&quot;</span></div>
<div class="line"><a name="l00041"></a><span class="lineno"> 41</span>&#160;</div>
<div class="line"><a name="l00042"></a><span class="lineno"> 42</span>&#160;<span class="keyword">using</span> boost::condition_variable;</div>
<div class="line"><a name="l00043"></a><span class="lineno"> 43</span>&#160;<span class="keyword">using namespace </span>apache::thrift;</div>
<div class="line"><a name="l00044"></a><span class="lineno"> 44</span>&#160;<span class="keyword">using namespace </span><a class="code" href="namespaceimpala.html#a8cf06290ff145eec23570a8b9790f412">apache::thrift::protocol</a>;</div>
<div class="line"><a name="l00045"></a><span class="lineno"> 45</span>&#160;<span class="keyword">using namespace </span>apache::thrift::transport;</div>
<div class="line"><a name="l00046"></a><span class="lineno"> 46</span>&#160;</div>
<div class="line"><a name="l00047"></a><span class="lineno"> 47</span>&#160;<span class="keyword">namespace </span>impala {</div>
<div class="line"><a name="l00048"></a><span class="lineno"> 48</span>&#160;</div>
<div class="line"><a name="l00049"></a><span class="lineno"> 49</span>&#160;<span class="comment">// A channel sends data asynchronously via calls to TransmitData</span></div>
<div class="line"><a name="l00050"></a><span class="lineno"> 50</span>&#160;<span class="comment">// to a single destination ipaddress/node.</span></div>
<div class="line"><a name="l00051"></a><span class="lineno"> 51</span>&#160;<span class="comment">// It has a fixed-capacity buffer and allows the caller either to add rows to</span></div>
<div class="line"><a name="l00052"></a><span class="lineno"> 52</span>&#160;<span class="comment">// that buffer individually (AddRow()), or circumvent the buffer altogether and send</span></div>
<div class="line"><a name="l00053"></a><span class="lineno"> 53</span>&#160;<span class="comment">// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC</span></div>
<div class="line"><a name="l00054"></a><span class="lineno"> 54</span>&#160;<span class="comment">// at any one time (ie, sending will block if the most recent rpc hasn&#39;t finished,</span></div>
<div class="line"><a name="l00055"></a><span class="lineno"> 55</span>&#160;<span class="comment">// which allows the receiver node to throttle the sender by withholding acks).</span></div>
<div class="line"><a name="l00056"></a><span class="lineno"> 56</span>&#160;<span class="comment">// *Not* thread-safe.</span></div>
<div class="line"><a name="l00057"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html"> 57</a></span>&#160;<span class="keyword">class </span><a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html">DataStreamSender::Channel</a> {</div>
<div class="line"><a name="l00058"></a><span class="lineno"> 58</span>&#160; <span class="keyword">public</span>:</div>
<div class="line"><a name="l00059"></a><span class="lineno"> 59</span>&#160; <span class="comment">// Create channel to send data to particular ipaddress/port/query/node</span></div>
<div class="line"><a name="l00060"></a><span class="lineno"> 60</span>&#160; <span class="comment">// combination. buffer_size is specified in bytes and a soft limit on</span></div>
<div class="line"><a name="l00061"></a><span class="lineno"> 61</span>&#160; <span class="comment">// how much tuple data is getting accumulated before being sent; it only applies</span></div>
<div class="line"><a name="l00062"></a><span class="lineno"> 62</span>&#160; <span class="comment">// when data is added via AddRow() and not sent directly via SendBatch().</span></div>
<div class="line"><a name="l00063"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a946eb95314825d434a918ee858f6e5ca"> 63</a></span>&#160; <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a946eb95314825d434a918ee858f6e5ca">Channel</a>(<a class="code" href="classimpala_1_1DataStreamSender.html">DataStreamSender</a>* parent, <span class="keyword">const</span> <a class="code" href="classimpala_1_1RowDescriptor.html">RowDescriptor</a>&amp; <a class="code" href="namespaceimpala.html#aa98447566dd6700a2faaaaf3059f4d95">row_desc</a>,</div>
<div class="line"><a name="l00064"></a><span class="lineno"> 64</span>&#160; <span class="keyword">const</span> TNetworkAddress&amp; destination, <span class="keyword">const</span> TUniqueId&amp; fragment_instance_id,</div>
<div class="line"><a name="l00065"></a><span class="lineno"> 65</span>&#160; <a class="code" href="namespaceimpala.html#aca80061c98b44477ea84e4332993b7e7">PlanNodeId</a> dest_node_id, <span class="keywordtype">int</span> buffer_size)</div>
<div class="line"><a name="l00066"></a><span class="lineno"> 66</span>&#160; : parent_(parent),</div>
<div class="line"><a name="l00067"></a><span class="lineno"> 67</span>&#160; buffer_size_(buffer_size),</div>
<div class="line"><a name="l00068"></a><span class="lineno"> 68</span>&#160; client_cache_(NULL),</div>
<div class="line"><a name="l00069"></a><span class="lineno"> 69</span>&#160; <a class="code" href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">row_desc_</a>(row_desc),</div>
<div class="line"><a name="l00070"></a><span class="lineno"> 70</span>&#160; address_(<a class="code" href="namespaceimpala.html#a309108c8b3eaf4e5b154bc4eb4624880">MakeNetworkAddress</a>(destination.hostname, destination.port)),</div>
<div class="line"><a name="l00071"></a><span class="lineno"> 71</span>&#160; fragment_instance_id_(fragment_instance_id),</div>
<div class="line"><a name="l00072"></a><span class="lineno"> 72</span>&#160; dest_node_id_(dest_node_id),</div>
<div class="line"><a name="l00073"></a><span class="lineno"> 73</span>&#160; num_data_bytes_sent_(0),</div>
<div class="line"><a name="l00074"></a><span class="lineno"> 74</span>&#160; rpc_thread_(<span class="stringliteral">&quot;DataStreamSender&quot;</span>, <span class="stringliteral">&quot;SenderThread&quot;</span>, 1, 1,</div>
<div class="line"><a name="l00075"></a><span class="lineno"> 75</span>&#160; bind&lt;void&gt;(mem_fn(&amp;<a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html">Channel</a>::TransmitData), this, _1, _2)),</div>
<div class="line"><a name="l00076"></a><span class="lineno"> 76</span>&#160; rpc_in_flight_(false) {</div>
<div class="line"><a name="l00077"></a><span class="lineno"> 77</span>&#160; }</div>
<div class="line"><a name="l00078"></a><span class="lineno"> 78</span>&#160;</div>
<div class="line"><a name="l00079"></a><span class="lineno"> 79</span>&#160; <span class="comment">// Initialize channel.</span></div>
<div class="line"><a name="l00080"></a><span class="lineno"> 80</span>&#160; <span class="comment">// Returns OK if successful, error indication otherwise.</span></div>
<div class="line"><a name="l00081"></a><span class="lineno"> 81</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> Init(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state);</div>
<div class="line"><a name="l00082"></a><span class="lineno"> 82</span>&#160;</div>
<div class="line"><a name="l00083"></a><span class="lineno"> 83</span>&#160; <span class="comment">// Copies a single row into this channel&#39;s output buffer and flushes buffer</span></div>
<div class="line"><a name="l00084"></a><span class="lineno"> 84</span>&#160; <span class="comment">// if it reaches capacity.</span></div>
<div class="line"><a name="l00085"></a><span class="lineno"> 85</span>&#160; <span class="comment">// Returns error status if any of the preceding rpcs failed, OK otherwise.</span></div>
<div class="line"><a name="l00086"></a><span class="lineno"> 86</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> AddRow(<a class="code" href="classimpala_1_1TupleRow.html">TupleRow</a>* row);</div>
<div class="line"><a name="l00087"></a><span class="lineno"> 87</span>&#160;</div>
<div class="line"><a name="l00088"></a><span class="lineno"> 88</span>&#160; <span class="comment">// Asynchronously sends a row batch.</span></div>
<div class="line"><a name="l00089"></a><span class="lineno"> 89</span>&#160; <span class="comment">// Returns the status of the most recently finished TransmitData</span></div>
<div class="line"><a name="l00090"></a><span class="lineno"> 90</span>&#160; <span class="comment">// rpc (or OK if there wasn&#39;t one that hasn&#39;t been reported yet).</span></div>
<div class="line"><a name="l00091"></a><span class="lineno"> 91</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> SendBatch(TRowBatch* batch);</div>
<div class="line"><a name="l00092"></a><span class="lineno"> 92</span>&#160;</div>
<div class="line"><a name="l00093"></a><span class="lineno"> 93</span>&#160; <span class="comment">// Return status of last TransmitData rpc (initiated by the most recent call</span></div>
<div class="line"><a name="l00094"></a><span class="lineno"> 94</span>&#160; <span class="comment">// to either SendBatch() or SendCurrentBatch()).</span></div>
<div class="line"><a name="l00095"></a><span class="lineno"> 95</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> GetSendStatus();</div>
<div class="line"><a name="l00096"></a><span class="lineno"> 96</span>&#160;</div>
<div class="line"><a name="l00097"></a><span class="lineno"> 97</span>&#160; <span class="comment">// Waits for the rpc thread pool to finish the current rpc.</span></div>
<div class="line"><a name="l00098"></a><span class="lineno"> 98</span>&#160; <span class="keywordtype">void</span> WaitForRpc();</div>
<div class="line"><a name="l00099"></a><span class="lineno"> 99</span>&#160;</div>
<div class="line"><a name="l00100"></a><span class="lineno"> 100</span>&#160; <span class="comment">// Flush buffered rows and close channel.</span></div>
<div class="line"><a name="l00101"></a><span class="lineno"> 101</span>&#160; <span class="comment">// Logs errors if any of the preceding rpcs failed.</span></div>
<div class="line"><a name="l00102"></a><span class="lineno"> 102</span>&#160; <span class="keywordtype">void</span> Close(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state);</div>
<div class="line"><a name="l00103"></a><span class="lineno"> 103</span>&#160;</div>
<div class="line"><a name="l00104"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#ab51ff8c9b206da41c47ade235d923550"> 104</a></span>&#160; int64_t <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#ab51ff8c9b206da41c47ade235d923550">num_data_bytes_sent</a>()<span class="keyword"> const </span>{ <span class="keywordflow">return</span> num_data_bytes_sent_; }</div>
<div class="line"><a name="l00105"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e78160dbfe7db64784d0b469a0b1c2c"> 105</a></span>&#160; TRowBatch* <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e78160dbfe7db64784d0b469a0b1c2c">thrift_batch</a>() { <span class="keywordflow">return</span> &amp;thrift_batch_; }</div>
<div class="line"><a name="l00106"></a><span class="lineno"> 106</span>&#160;</div>
<div class="line"><a name="l00107"></a><span class="lineno"> 107</span>&#160; <span class="keyword">private</span>:</div>
<div class="line"><a name="l00108"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#adbff8212a64611b75c045aef09b8e2aa"> 108</a></span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html">DataStreamSender</a>* <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#adbff8212a64611b75c045aef09b8e2aa">parent_</a>;</div>
<div class="line"><a name="l00109"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1fdba48ea7980af1182fde5d14b78af4"> 109</a></span>&#160; <span class="keywordtype">int</span> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1fdba48ea7980af1182fde5d14b78af4">buffer_size_</a>;</div>
<div class="line"><a name="l00110"></a><span class="lineno"> 110</span>&#160;</div>
<div class="line"><a name="l00111"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a8df49496bd0b8a4cd1502049650e974c"> 111</a></span>&#160; <a class="code" href="classimpala_1_1ClientCache.html">ImpalaInternalServiceClientCache</a>* <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a8df49496bd0b8a4cd1502049650e974c">client_cache_</a>;</div>
<div class="line"><a name="l00112"></a><span class="lineno"> 112</span>&#160;</div>
<div class="line"><a name="l00113"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f7b040146c91730872113ea0d8dba4e"> 113</a></span>&#160; <span class="keyword">const</span> <a class="code" href="classimpala_1_1RowDescriptor.html">RowDescriptor</a>&amp; <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f7b040146c91730872113ea0d8dba4e">row_desc_</a>;</div>
<div class="line"><a name="l00114"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#aec04a5a2a66d21fdb28c5293f6780c35"> 114</a></span>&#160; TNetworkAddress <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#aec04a5a2a66d21fdb28c5293f6780c35">address_</a>;</div>
<div class="line"><a name="l00115"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e62d5e2dfd00539a3e5ad5da86b62cf"> 115</a></span>&#160; TUniqueId <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e62d5e2dfd00539a3e5ad5da86b62cf">fragment_instance_id_</a>;</div>
<div class="line"><a name="l00116"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a03c516a8696979f9c05dc7cc2819219e"> 116</a></span>&#160; <a class="code" href="namespaceimpala.html#aca80061c98b44477ea84e4332993b7e7">PlanNodeId</a> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a03c516a8696979f9c05dc7cc2819219e">dest_node_id_</a>;</div>
<div class="line"><a name="l00117"></a><span class="lineno"> 117</span>&#160;</div>
<div class="line"><a name="l00118"></a><span class="lineno"> 118</span>&#160; <span class="comment">// the number of TRowBatch.data bytes sent successfully</span></div>
<div class="line"><a name="l00119"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a4f74ca65ead877aaa6768c5d6525603e"> 119</a></span>&#160; int64_t <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a4f74ca65ead877aaa6768c5d6525603e">num_data_bytes_sent_</a>;</div>
<div class="line"><a name="l00120"></a><span class="lineno"> 120</span>&#160;</div>
<div class="line"><a name="l00121"></a><span class="lineno"> 121</span>&#160; <span class="comment">// we&#39;re accumulating rows into this batch</span></div>
<div class="line"><a name="l00122"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9309610f37d46c02fd711a6d0df1bf8a"> 122</a></span>&#160; scoped_ptr&lt;RowBatch&gt; <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9309610f37d46c02fd711a6d0df1bf8a">batch_</a>;</div>
<div class="line"><a name="l00123"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f6dcc5112b566649c335a589814edb3"> 123</a></span>&#160; TRowBatch <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f6dcc5112b566649c335a589814edb3">thrift_batch_</a>;</div>
<div class="line"><a name="l00124"></a><span class="lineno"> 124</span>&#160;</div>
<div class="line"><a name="l00125"></a><span class="lineno"> 125</span>&#160; <span class="comment">// We want to reuse the rpc thread to prevent creating a thread per rowbatch.</span></div>
<div class="line"><a name="l00126"></a><span class="lineno"> 126</span>&#160; <span class="comment">// TODO: currently we only have one batch in flight, but we should buffer more</span></div>
<div class="line"><a name="l00127"></a><span class="lineno"> 127</span>&#160; <span class="comment">// batches. This is a bit tricky since the channels share the outgoing batch</span></div>
<div class="line"><a name="l00128"></a><span class="lineno"> 128</span>&#160; <span class="comment">// pointer we need some mechanism to coordinate when the batch is all done.</span></div>
<div class="line"><a name="l00129"></a><span class="lineno"> 129</span>&#160; <span class="comment">// TODO: if the order of row batches does not matter, we can consider increasing</span></div>
<div class="line"><a name="l00130"></a><span class="lineno"> 130</span>&#160; <span class="comment">// the number of threads.</span></div>
<div class="line"><a name="l00131"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a620e18f5af05d18ccb20294fe8240bd6"> 131</a></span>&#160; <a class="code" href="classimpala_1_1ThreadPool.html">ThreadPool&lt;TRowBatch*&gt;</a> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a620e18f5af05d18ccb20294fe8240bd6">rpc_thread_</a>; <span class="comment">// sender thread.</span></div>
<div class="line"><a name="l00132"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a68a204858648ba801d85231f41480db4"> 132</a></span>&#160; condition_variable <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a68a204858648ba801d85231f41480db4">rpc_done_cv_</a>; <span class="comment">// signaled when rpc_in_flight_ is set to true.</span></div>
<div class="line"><a name="l00133"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9412386c8008dc38dc6e7c84d100f41c"> 133</a></span>&#160; mutex <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a9412386c8008dc38dc6e7c84d100f41c">rpc_thread_lock_</a>; <span class="comment">// Lock with rpc_done_cv_ protecting rpc_in_flight_</span></div>
<div class="line"><a name="l00134"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a6578f1decfa5bfb55201269d6ea1a229"> 134</a></span>&#160; <span class="keywordtype">bool</span> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a6578f1decfa5bfb55201269d6ea1a229">rpc_in_flight_</a>; <span class="comment">// true if the rpc_thread_ is busy sending.</span></div>
<div class="line"><a name="l00135"></a><span class="lineno"> 135</span>&#160;</div>
<div class="line"><a name="l00136"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a11a77fb40bb6bbe7d2fdc3ebefa4b24c"> 136</a></span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a11a77fb40bb6bbe7d2fdc3ebefa4b24c">rpc_status_</a>; <span class="comment">// status of most recently finished TransmitData rpc</span></div>
<div class="line"><a name="l00137"></a><span class="lineno"> 137</span>&#160;</div>
<div class="line"><a name="l00138"></a><span class="lineno"> 138</span>&#160; <span class="comment">// Serialize batch_ into thrift_batch_ and send via SendBatch().</span></div>
<div class="line"><a name="l00139"></a><span class="lineno"> 139</span>&#160; <span class="comment">// Returns SendBatch() status.</span></div>
<div class="line"><a name="l00140"></a><span class="lineno"> 140</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> SendCurrentBatch();</div>
<div class="line"><a name="l00141"></a><span class="lineno"> 141</span>&#160;</div>
<div class="line"><a name="l00142"></a><span class="lineno"> 142</span>&#160; <span class="comment">// Synchronously call TransmitData() on a client from client_cache_ and update</span></div>
<div class="line"><a name="l00143"></a><span class="lineno"> 143</span>&#160; <span class="comment">// rpc_status_ based on return value (or set to error if RPC failed).</span></div>
<div class="line"><a name="l00144"></a><span class="lineno"> 144</span>&#160; <span class="comment">// Called from a thread from the rpc_thread_ pool.</span></div>
<div class="line"><a name="l00145"></a><span class="lineno"> 145</span>&#160; <span class="keywordtype">void</span> TransmitData(<span class="keywordtype">int</span> thread_id, <span class="keyword">const</span> TRowBatch*);</div>
<div class="line"><a name="l00146"></a><span class="lineno"> 146</span>&#160; <span class="keywordtype">void</span> TransmitDataHelper(<span class="keyword">const</span> TRowBatch*);</div>
<div class="line"><a name="l00147"></a><span class="lineno"> 147</span>&#160;</div>
<div class="line"><a name="l00148"></a><span class="lineno"> 148</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> CloseInternal();</div>
<div class="line"><a name="l00149"></a><span class="lineno"> 149</span>&#160;};</div>
<div class="line"><a name="l00150"></a><span class="lineno"> 150</span>&#160;</div>
<div class="line"><a name="l00151"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#ac141d7b4eccc41a26579d21b7c20e85e"> 151</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::Init(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state) {</div>
<div class="line"><a name="l00152"></a><span class="lineno"> 152</span>&#160; client_cache_ = state-&gt;<a class="code" href="classimpala_1_1RuntimeState.html#acb0becec52f772879949545374280774">impalad_client_cache</a>();</div>
<div class="line"><a name="l00153"></a><span class="lineno"> 153</span>&#160; <span class="comment">// TODO: figure out how to size batch_</span></div>
<div class="line"><a name="l00154"></a><span class="lineno"> 154</span>&#160; <span class="keywordtype">int</span> capacity = max(1, buffer_size_ / max(<a class="code" href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">row_desc_</a>.<a class="code" href="classimpala_1_1RowDescriptor.html#acc6beec42c710b65f958cf91981cb4b9">GetRowSize</a>(), 1));</div>
<div class="line"><a name="l00155"></a><span class="lineno"> 155</span>&#160; batch_.reset(<span class="keyword">new</span> <a class="code" href="classimpala_1_1RowBatch.html">RowBatch</a>(<a class="code" href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">row_desc_</a>, capacity, parent_-&gt;mem_tracker_.get()));</div>
<div class="line"><a name="l00156"></a><span class="lineno"> 156</span>&#160; <span class="keywordflow">return</span> <a class="code" href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">Status::OK</a>;</div>
<div class="line"><a name="l00157"></a><span class="lineno"> 157</span>&#160;}</div>
<div class="line"><a name="l00158"></a><span class="lineno"> 158</span>&#160;</div>
<div class="line"><a name="l00159"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a127f3c10e732383a272a84bee6cca5ff"> 159</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::SendBatch(TRowBatch* batch) {</div>
<div class="line"><a name="l00160"></a><span class="lineno"> 160</span>&#160; <a class="code" href="logging_8h.html#a6ccc2106c47622db4e52a401a415fc58">VLOG_ROW</a> &lt;&lt; <span class="stringliteral">&quot;Channel::SendBatch() instance_id=&quot;</span> &lt;&lt; fragment_instance_id_</div>
<div class="line"><a name="l00161"></a><span class="lineno"> 161</span>&#160; &lt;&lt; <span class="stringliteral">&quot; dest_node=&quot;</span> &lt;&lt; dest_node_id_ &lt;&lt; <span class="stringliteral">&quot; #rows=&quot;</span> &lt;&lt; batch-&gt;num_rows;</div>
<div class="line"><a name="l00162"></a><span class="lineno"> 162</span>&#160; <span class="comment">// return if the previous batch saw an error</span></div>
<div class="line"><a name="l00163"></a><span class="lineno"> 163</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(GetSendStatus());</div>
<div class="line"><a name="l00164"></a><span class="lineno"> 164</span>&#160; {</div>
<div class="line"><a name="l00165"></a><span class="lineno"> 165</span>&#160; unique_lock&lt;mutex&gt; l(rpc_thread_lock_);</div>
<div class="line"><a name="l00166"></a><span class="lineno"> 166</span>&#160; rpc_in_flight_ = <span class="keyword">true</span>;</div>
<div class="line"><a name="l00167"></a><span class="lineno"> 167</span>&#160; }</div>
<div class="line"><a name="l00168"></a><span class="lineno"> 168</span>&#160; <span class="keywordflow">if</span> (!rpc_thread_.Offer(batch)) {</div>
<div class="line"><a name="l00169"></a><span class="lineno"> 169</span>&#160; unique_lock&lt;mutex&gt; l(rpc_thread_lock_);</div>
<div class="line"><a name="l00170"></a><span class="lineno"> 170</span>&#160; rpc_in_flight_ = <span class="keyword">false</span>;</div>
<div class="line"><a name="l00171"></a><span class="lineno"> 171</span>&#160; }</div>
<div class="line"><a name="l00172"></a><span class="lineno"> 172</span>&#160; <span class="keywordflow">return</span> <a class="code" href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">Status::OK</a>;</div>
<div class="line"><a name="l00173"></a><span class="lineno"> 173</span>&#160;}</div>
<div class="line"><a name="l00174"></a><span class="lineno"> 174</span>&#160;</div>
<div class="line"><a name="l00175"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a3349dd4d57b8ef46b8e9003beaaee559"> 175</a></span>&#160;<span class="keywordtype">void</span> DataStreamSender::Channel::TransmitData(<span class="keywordtype">int</span> thread_id, <span class="keyword">const</span> TRowBatch* batch) {</div>
<div class="line"><a name="l00176"></a><span class="lineno"> 176</span>&#160; DCHECK(rpc_in_flight_);</div>
<div class="line"><a name="l00177"></a><span class="lineno"> 177</span>&#160; TransmitDataHelper(batch);</div>
<div class="line"><a name="l00178"></a><span class="lineno"> 178</span>&#160;</div>
<div class="line"><a name="l00179"></a><span class="lineno"> 179</span>&#160; {</div>
<div class="line"><a name="l00180"></a><span class="lineno"> 180</span>&#160; unique_lock&lt;mutex&gt; l(rpc_thread_lock_);</div>
<div class="line"><a name="l00181"></a><span class="lineno"> 181</span>&#160; rpc_in_flight_ = <span class="keyword">false</span>;</div>
<div class="line"><a name="l00182"></a><span class="lineno"> 182</span>&#160; }</div>
<div class="line"><a name="l00183"></a><span class="lineno"> 183</span>&#160; rpc_done_cv_.notify_one();</div>
<div class="line"><a name="l00184"></a><span class="lineno"> 184</span>&#160;}</div>
<div class="line"><a name="l00185"></a><span class="lineno"> 185</span>&#160;</div>
<div class="line"><a name="l00186"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a54cc6b98fd631c39b1c5692ff5bbff94"> 186</a></span>&#160;<span class="keywordtype">void</span> DataStreamSender::Channel::TransmitDataHelper(<span class="keyword">const</span> TRowBatch* batch) {</div>
<div class="line"><a name="l00187"></a><span class="lineno"> 187</span>&#160; DCHECK(batch != NULL);</div>
<div class="line"><a name="l00188"></a><span class="lineno"> 188</span>&#160; <a class="code" href="logging_8h.html#a6ccc2106c47622db4e52a401a415fc58">VLOG_ROW</a> &lt;&lt; <span class="stringliteral">&quot;Channel::TransmitData() instance_id=&quot;</span> &lt;&lt; fragment_instance_id_</div>
<div class="line"><a name="l00189"></a><span class="lineno"> 189</span>&#160; &lt;&lt; <span class="stringliteral">&quot; dest_node=&quot;</span> &lt;&lt; dest_node_id_</div>
<div class="line"><a name="l00190"></a><span class="lineno"> 190</span>&#160; &lt;&lt; <span class="stringliteral">&quot; #rows=&quot;</span> &lt;&lt; batch-&gt;num_rows;</div>
<div class="line"><a name="l00191"></a><span class="lineno"> 191</span>&#160; TTransmitDataParams params;</div>
<div class="line"><a name="l00192"></a><span class="lineno"> 192</span>&#160; params.protocol_version = ImpalaInternalServiceVersion::V1;</div>
<div class="line"><a name="l00193"></a><span class="lineno"> 193</span>&#160; params.__set_dest_fragment_instance_id(fragment_instance_id_);</div>
<div class="line"><a name="l00194"></a><span class="lineno"> 194</span>&#160; params.__set_dest_node_id(dest_node_id_);</div>
<div class="line"><a name="l00195"></a><span class="lineno"> 195</span>&#160; params.__set_row_batch(*batch); <span class="comment">// yet another copy</span></div>
<div class="line"><a name="l00196"></a><span class="lineno"> 196</span>&#160; params.__set_eos(<span class="keyword">false</span>);</div>
<div class="line"><a name="l00197"></a><span class="lineno"> 197</span>&#160; params.__set_sender_id(parent_-&gt;sender_id_);</div>
<div class="line"><a name="l00198"></a><span class="lineno"> 198</span>&#160;</div>
<div class="line"><a name="l00199"></a><span class="lineno"> 199</span>&#160; <a class="code" href="classimpala_1_1ClientConnection.html">ImpalaInternalServiceConnection</a> client(client_cache_, address_, &amp;rpc_status_);</div>
<div class="line"><a name="l00200"></a><span class="lineno"> 200</span>&#160; <span class="keywordflow">if</span> (!rpc_status_.ok()) <span class="keywordflow">return</span>;</div>
<div class="line"><a name="l00201"></a><span class="lineno"> 201</span>&#160;</div>
<div class="line"><a name="l00202"></a><span class="lineno"> 202</span>&#160; TTransmitDataResult res;</div>
<div class="line"><a name="l00203"></a><span class="lineno"> 203</span>&#160; {</div>
<div class="line"><a name="l00204"></a><span class="lineno"> 204</span>&#160; <a class="code" href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a>(parent_-&gt;thrift_transmit_timer_);</div>
<div class="line"><a name="l00205"></a><span class="lineno"> 205</span>&#160; rpc_status_ =</div>
<div class="line"><a name="l00206"></a><span class="lineno"> 206</span>&#160; client.<a class="code" href="classimpala_1_1ClientConnection.html#aadbbddffff691a67309e0e2f784134ce">DoRpc</a>(&amp;ImpalaInternalServiceClient::TransmitData, params, &amp;res);</div>
<div class="line"><a name="l00207"></a><span class="lineno"> 207</span>&#160; <span class="keywordflow">if</span> (!rpc_status_.ok()) <span class="keywordflow">return</span>;</div>
<div class="line"><a name="l00208"></a><span class="lineno"> 208</span>&#160; }</div>
<div class="line"><a name="l00209"></a><span class="lineno"> 209</span>&#160;</div>
<div class="line"><a name="l00210"></a><span class="lineno"> 210</span>&#160; <span class="keywordflow">if</span> (res.status.status_code != <a class="code" href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">TErrorCode::OK</a>) {</div>
<div class="line"><a name="l00211"></a><span class="lineno"> 211</span>&#160; rpc_status_ = res.status;</div>
<div class="line"><a name="l00212"></a><span class="lineno"> 212</span>&#160; } <span class="keywordflow">else</span> {</div>
<div class="line"><a name="l00213"></a><span class="lineno"> 213</span>&#160; num_data_bytes_sent_ += RowBatch::GetBatchSize(*batch);</div>
<div class="line"><a name="l00214"></a><span class="lineno"> 214</span>&#160; <a class="code" href="logging_8h.html#a6ccc2106c47622db4e52a401a415fc58">VLOG_ROW</a> &lt;&lt; <span class="stringliteral">&quot;incremented #data_bytes_sent=&quot;</span></div>
<div class="line"><a name="l00215"></a><span class="lineno"> 215</span>&#160; &lt;&lt; num_data_bytes_sent_;</div>
<div class="line"><a name="l00216"></a><span class="lineno"> 216</span>&#160; }</div>
<div class="line"><a name="l00217"></a><span class="lineno"> 217</span>&#160;}</div>
<div class="line"><a name="l00218"></a><span class="lineno"> 218</span>&#160;</div>
<div class="line"><a name="l00219"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a4efc59f4f4764d26115f15f43d9468ed"> 219</a></span>&#160;<span class="keywordtype">void</span> DataStreamSender::Channel::WaitForRpc() {</div>
<div class="line"><a name="l00220"></a><span class="lineno"> 220</span>&#160; <a class="code" href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a>(parent_-&gt;state_-&gt;total_network_send_timer());</div>
<div class="line"><a name="l00221"></a><span class="lineno"> 221</span>&#160; unique_lock&lt;mutex&gt; l(rpc_thread_lock_);</div>
<div class="line"><a name="l00222"></a><span class="lineno"> 222</span>&#160; <span class="keywordflow">while</span> (rpc_in_flight_) {</div>
<div class="line"><a name="l00223"></a><span class="lineno"> 223</span>&#160; rpc_done_cv_.wait(l);</div>
<div class="line"><a name="l00224"></a><span class="lineno"> 224</span>&#160; }</div>
<div class="line"><a name="l00225"></a><span class="lineno"> 225</span>&#160;}</div>
<div class="line"><a name="l00226"></a><span class="lineno"> 226</span>&#160;</div>
<div class="line"><a name="l00227"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a5f13200cf119abee40d2f0b11e6caf4c"> 227</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::AddRow(<a class="code" href="classimpala_1_1TupleRow.html">TupleRow</a>* row) {</div>
<div class="line"><a name="l00228"></a><span class="lineno"> 228</span>&#160; <span class="keywordtype">int</span> row_num = batch_-&gt;AddRow();</div>
<div class="line"><a name="l00229"></a><span class="lineno"> 229</span>&#160; <span class="keywordflow">if</span> (row_num == RowBatch::INVALID_ROW_INDEX) {</div>
<div class="line"><a name="l00230"></a><span class="lineno"> 230</span>&#160; <span class="comment">// batch_ is full, let&#39;s send it; but first wait for an ongoing</span></div>
<div class="line"><a name="l00231"></a><span class="lineno"> 231</span>&#160; <span class="comment">// transmission to finish before modifying thrift_batch_</span></div>
<div class="line"><a name="l00232"></a><span class="lineno"> 232</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(SendCurrentBatch());</div>
<div class="line"><a name="l00233"></a><span class="lineno"> 233</span>&#160; row_num = batch_-&gt;AddRow();</div>
<div class="line"><a name="l00234"></a><span class="lineno"> 234</span>&#160; DCHECK_NE(row_num, RowBatch::INVALID_ROW_INDEX);</div>
<div class="line"><a name="l00235"></a><span class="lineno"> 235</span>&#160; }</div>
<div class="line"><a name="l00236"></a><span class="lineno"> 236</span>&#160;</div>
<div class="line"><a name="l00237"></a><span class="lineno"> 237</span>&#160; <a class="code" href="classimpala_1_1TupleRow.html">TupleRow</a>* dest = batch_-&gt;GetRow(row_num);</div>
<div class="line"><a name="l00238"></a><span class="lineno"> 238</span>&#160; batch_-&gt;CopyRow(row, dest);</div>
<div class="line"><a name="l00239"></a><span class="lineno"> 239</span>&#160; <span class="keyword">const</span> vector&lt;TupleDescriptor*&gt;&amp; descs = <a class="code" href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">row_desc_</a>.<a class="code" href="classimpala_1_1RowDescriptor.html#a3d57439c68d08fe374c33370529a855c">tuple_descriptors</a>();</div>
<div class="line"><a name="l00240"></a><span class="lineno"> 240</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; descs.size(); ++i) {</div>
<div class="line"><a name="l00241"></a><span class="lineno"> 241</span>&#160; <span class="keywordflow">if</span> (<a class="code" href="compiler-util_8h.html#a9acc330d508b9a3b775cfdf7ce405e7d">UNLIKELY</a>(row-&gt;<a class="code" href="classimpala_1_1TupleRow.html#aed3092e531291470b27fc48afa901600">GetTuple</a>(i) == NULL)) {</div>
<div class="line"><a name="l00242"></a><span class="lineno"> 242</span>&#160; dest-&gt;<a class="code" href="classimpala_1_1TupleRow.html#aaf1d69b97c90eec7da3d07bdc40f5665">SetTuple</a>(i, NULL);</div>
<div class="line"><a name="l00243"></a><span class="lineno"> 243</span>&#160; } <span class="keywordflow">else</span> {</div>
<div class="line"><a name="l00244"></a><span class="lineno"> 244</span>&#160; dest-&gt;<a class="code" href="classimpala_1_1TupleRow.html#aaf1d69b97c90eec7da3d07bdc40f5665">SetTuple</a>(i, row-&gt;<a class="code" href="classimpala_1_1TupleRow.html#aed3092e531291470b27fc48afa901600">GetTuple</a>(i)-&gt;<a class="code" href="classimpala_1_1Tuple.html#a7bd61a6c3fe36dc122b94d5f1f2ecac4">DeepCopy</a>(*descs[i],</div>
<div class="line"><a name="l00245"></a><span class="lineno"> 245</span>&#160; batch_-&gt;tuple_data_pool()));</div>
<div class="line"><a name="l00246"></a><span class="lineno"> 246</span>&#160; }</div>
<div class="line"><a name="l00247"></a><span class="lineno"> 247</span>&#160; }</div>
<div class="line"><a name="l00248"></a><span class="lineno"> 248</span>&#160; batch_-&gt;CommitLastRow();</div>
<div class="line"><a name="l00249"></a><span class="lineno"> 249</span>&#160; <span class="keywordflow">return</span> <a class="code" href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">Status::OK</a>;</div>
<div class="line"><a name="l00250"></a><span class="lineno"> 250</span>&#160;}</div>
<div class="line"><a name="l00251"></a><span class="lineno"> 251</span>&#160;</div>
<div class="line"><a name="l00252"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#ae8232117a5f2bba91c47c2337ca5aa52"> 252</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::SendCurrentBatch() {</div>
<div class="line"><a name="l00253"></a><span class="lineno"> 253</span>&#160; <span class="comment">// make sure there&#39;s no in-flight TransmitData() call that might still want to</span></div>
<div class="line"><a name="l00254"></a><span class="lineno"> 254</span>&#160; <span class="comment">// access thrift_batch_</span></div>
<div class="line"><a name="l00255"></a><span class="lineno"> 255</span>&#160; WaitForRpc();</div>
<div class="line"><a name="l00256"></a><span class="lineno"> 256</span>&#160; parent_-&gt;SerializeBatch(batch_.get(), &amp;thrift_batch_);</div>
<div class="line"><a name="l00257"></a><span class="lineno"> 257</span>&#160; batch_-&gt;Reset();</div>
<div class="line"><a name="l00258"></a><span class="lineno"> 258</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(SendBatch(&amp;thrift_batch_));</div>
<div class="line"><a name="l00259"></a><span class="lineno"> 259</span>&#160; <span class="keywordflow">return</span> <a class="code" href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">Status::OK</a>;</div>
<div class="line"><a name="l00260"></a><span class="lineno"> 260</span>&#160;}</div>
<div class="line"><a name="l00261"></a><span class="lineno"> 261</span>&#160;</div>
<div class="line"><a name="l00262"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a71157e78e5b11718c36edad3701d2c49"> 262</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::GetSendStatus() {</div>
<div class="line"><a name="l00263"></a><span class="lineno"> 263</span>&#160; WaitForRpc();</div>
<div class="line"><a name="l00264"></a><span class="lineno"> 264</span>&#160; <span class="keywordflow">if</span> (!rpc_status_.ok()) {</div>
<div class="line"><a name="l00265"></a><span class="lineno"> 265</span>&#160; LOG(ERROR) &lt;&lt; <span class="stringliteral">&quot;channel send status: &quot;</span> &lt;&lt; rpc_status_.GetDetail();</div>
<div class="line"><a name="l00266"></a><span class="lineno"> 266</span>&#160; }</div>
<div class="line"><a name="l00267"></a><span class="lineno"> 267</span>&#160; <span class="keywordflow">return</span> rpc_status_;</div>
<div class="line"><a name="l00268"></a><span class="lineno"> 268</span>&#160;}</div>
<div class="line"><a name="l00269"></a><span class="lineno"> 269</span>&#160;</div>
<div class="line"><a name="l00270"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#ad2f724aa119b7476437b020ffdc7c3f5"> 270</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> DataStreamSender::Channel::CloseInternal() {</div>
<div class="line"><a name="l00271"></a><span class="lineno"> 271</span>&#160; <a class="code" href="logging_8h.html#a24efb6b382d3e1bdd7817e5e31dc5c1d">VLOG_RPC</a> &lt;&lt; <span class="stringliteral">&quot;Channel::Close() instance_id=&quot;</span> &lt;&lt; fragment_instance_id_</div>
<div class="line"><a name="l00272"></a><span class="lineno"> 272</span>&#160; &lt;&lt; <span class="stringliteral">&quot; dest_node=&quot;</span> &lt;&lt; dest_node_id_</div>
<div class="line"><a name="l00273"></a><span class="lineno"> 273</span>&#160; &lt;&lt; <span class="stringliteral">&quot; #rows= &quot;</span> &lt;&lt; batch_-&gt;num_rows();</div>
<div class="line"><a name="l00274"></a><span class="lineno"> 274</span>&#160;</div>
<div class="line"><a name="l00275"></a><span class="lineno"> 275</span>&#160; <span class="keywordflow">if</span> (batch_-&gt;num_rows() &gt; 0) {</div>
<div class="line"><a name="l00276"></a><span class="lineno"> 276</span>&#160; <span class="comment">// flush</span></div>
<div class="line"><a name="l00277"></a><span class="lineno"> 277</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(SendCurrentBatch());</div>
<div class="line"><a name="l00278"></a><span class="lineno"> 278</span>&#160; }</div>
<div class="line"><a name="l00279"></a><span class="lineno"> 279</span>&#160; <span class="comment">// if the last transmitted batch resulted in a error, return that error</span></div>
<div class="line"><a name="l00280"></a><span class="lineno"> 280</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(GetSendStatus());</div>
<div class="line"><a name="l00281"></a><span class="lineno"> 281</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> status;</div>
<div class="line"><a name="l00282"></a><span class="lineno"> 282</span>&#160; <a class="code" href="classimpala_1_1ClientConnection.html">ImpalaInternalServiceConnection</a> client(client_cache_, address_, &amp;status);</div>
<div class="line"><a name="l00283"></a><span class="lineno"> 283</span>&#160; <span class="keywordflow">if</span> (!status.<a class="code" href="classimpala_1_1Status.html#a95ba859e42fe93445b340533220836ac">ok</a>()) {</div>
<div class="line"><a name="l00284"></a><span class="lineno"> 284</span>&#160; <span class="keywordflow">return</span> status;</div>
<div class="line"><a name="l00285"></a><span class="lineno"> 285</span>&#160; }</div>
<div class="line"><a name="l00286"></a><span class="lineno"> 286</span>&#160; TTransmitDataParams params;</div>
<div class="line"><a name="l00287"></a><span class="lineno"> 287</span>&#160; params.protocol_version = ImpalaInternalServiceVersion::V1;</div>
<div class="line"><a name="l00288"></a><span class="lineno"> 288</span>&#160; params.__set_dest_fragment_instance_id(fragment_instance_id_);</div>
<div class="line"><a name="l00289"></a><span class="lineno"> 289</span>&#160; params.__set_dest_node_id(dest_node_id_);</div>
<div class="line"><a name="l00290"></a><span class="lineno"> 290</span>&#160; params.__set_sender_id(parent_-&gt;sender_id_);</div>
<div class="line"><a name="l00291"></a><span class="lineno"> 291</span>&#160; params.__set_eos(<span class="keyword">true</span>);</div>
<div class="line"><a name="l00292"></a><span class="lineno"> 292</span>&#160; TTransmitDataResult res;</div>
<div class="line"><a name="l00293"></a><span class="lineno"> 293</span>&#160; <a class="code" href="logging_8h.html#a24efb6b382d3e1bdd7817e5e31dc5c1d">VLOG_RPC</a> &lt;&lt; <span class="stringliteral">&quot;calling TransmitData to close channel&quot;</span>;</div>
<div class="line"><a name="l00294"></a><span class="lineno"> 294</span>&#160; rpc_status_ = client.<a class="code" href="classimpala_1_1ClientConnection.html#aadbbddffff691a67309e0e2f784134ce">DoRpc</a>(&amp;ImpalaInternalServiceClient::TransmitData, params, &amp;res);</div>
<div class="line"><a name="l00295"></a><span class="lineno"> 295</span>&#160; <span class="keywordflow">if</span> (!rpc_status_.ok()) {</div>
<div class="line"><a name="l00296"></a><span class="lineno"> 296</span>&#160; stringstream msg;</div>
<div class="line"><a name="l00297"></a><span class="lineno"> 297</span>&#160; msg &lt;&lt; <span class="stringliteral">&quot;CloseChannel() to &quot;</span> &lt;&lt; address_ &lt;&lt; <span class="stringliteral">&quot; failed:\n&quot;</span> &lt;&lt; rpc_status_.<a class="code" href="classimpala_1_1Status.html#afe14635279d3cd633d08ecbadee51019">msg</a>().<a class="code" href="classimpala_1_1ErrorMsg.html#a99f5a1f06dfc4462643fd645ee38a75f">msg</a>();</div>
<div class="line"><a name="l00298"></a><span class="lineno"> 298</span>&#160; <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Status.html">Status</a>(rpc_status_.code(), msg.str());</div>
<div class="line"><a name="l00299"></a><span class="lineno"> 299</span>&#160; }</div>
<div class="line"><a name="l00300"></a><span class="lineno"> 300</span>&#160; <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Status.html">Status</a>(res.status);</div>
<div class="line"><a name="l00301"></a><span class="lineno"> 301</span>&#160;}</div>
<div class="line"><a name="l00302"></a><span class="lineno"> 302</span>&#160;</div>
<div class="line"><a name="l00303"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender_1_1Channel.html#a6971b363e6acf807c4b6bc6f0262d465"> 303</a></span>&#160;<span class="keywordtype">void</span> DataStreamSender::Channel::Close(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state) {</div>
<div class="line"><a name="l00304"></a><span class="lineno"> 304</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> s = CloseInternal();</div>
<div class="line"><a name="l00305"></a><span class="lineno"> 305</span>&#160; <span class="keywordflow">if</span> (!s.<a class="code" href="classimpala_1_1Status.html#a95ba859e42fe93445b340533220836ac">ok</a>()) state-&gt;<a class="code" href="classimpala_1_1RuntimeState.html#aaf076b1879d9e712b08ba3a6e24607b9">LogError</a>(s.<a class="code" href="classimpala_1_1Status.html#afe14635279d3cd633d08ecbadee51019">msg</a>());</div>
<div class="line"><a name="l00306"></a><span class="lineno"> 306</span>&#160; rpc_thread_.DrainAndShutdown();</div>
<div class="line"><a name="l00307"></a><span class="lineno"> 307</span>&#160; batch_.reset();</div>
<div class="line"><a name="l00308"></a><span class="lineno"> 308</span>&#160;}</div>
<div class="line"><a name="l00309"></a><span class="lineno"> 309</span>&#160;</div>
<div class="line"><a name="l00310"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#af0f6ed4d7c05379050ff946652597a70"> 310</a></span>&#160;DataStreamSender::DataStreamSender(<a class="code" href="classimpala_1_1ObjectPool.html">ObjectPool</a>* <a class="code" href="expr-benchmark_8cc.html#a3a5de7bd423fbc0afc4cf935c166ca6b">pool</a>, <span class="keywordtype">int</span> sender_id,</div>
<div class="line"><a name="l00311"></a><span class="lineno"> 311</span>&#160; <span class="keyword">const</span> <a class="code" href="classimpala_1_1RowDescriptor.html">RowDescriptor</a>&amp; <a class="code" href="namespaceimpala.html#aa98447566dd6700a2faaaaf3059f4d95">row_desc</a>, <span class="keyword">const</span> TDataStreamSink&amp; sink,</div>
<div class="line"><a name="l00312"></a><span class="lineno"> 312</span>&#160; <span class="keyword">const</span> vector&lt;TPlanFragmentDestination&gt;&amp; destinations,</div>
<div class="line"><a name="l00313"></a><span class="lineno"> 313</span>&#160; <span class="keywordtype">int</span> per_channel_buffer_size)</div>
<div class="line"><a name="l00314"></a><span class="lineno"> 314</span>&#160; : sender_id_(sender_id),</div>
<div class="line"><a name="l00315"></a><span class="lineno"> 315</span>&#160; pool_(pool),</div>
<div class="line"><a name="l00316"></a><span class="lineno"> 316</span>&#160; <a class="code" href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">row_desc_</a>(row_desc),</div>
<div class="line"><a name="l00317"></a><span class="lineno"> 317</span>&#160; current_channel_idx_(0),</div>
<div class="line"><a name="l00318"></a><span class="lineno"> 318</span>&#160; closed_(false),</div>
<div class="line"><a name="l00319"></a><span class="lineno"> 319</span>&#160; current_thrift_batch_(&amp;thrift_batch1_),</div>
<div class="line"><a name="l00320"></a><span class="lineno"> 320</span>&#160; profile_(NULL),</div>
<div class="line"><a name="l00321"></a><span class="lineno"> 321</span>&#160; serialize_batch_timer_(NULL),</div>
<div class="line"><a name="l00322"></a><span class="lineno"> 322</span>&#160; thrift_transmit_timer_(NULL),</div>
<div class="line"><a name="l00323"></a><span class="lineno"> 323</span>&#160; bytes_sent_counter_(NULL),</div>
<div class="line"><a name="l00324"></a><span class="lineno"> 324</span>&#160; dest_node_id_(sink.dest_node_id) {</div>
<div class="line"><a name="l00325"></a><span class="lineno"> 325</span>&#160; DCHECK_GT(destinations.size(), 0);</div>
<div class="line"><a name="l00326"></a><span class="lineno"> 326</span>&#160; DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED</div>
<div class="line"><a name="l00327"></a><span class="lineno"> 327</span>&#160; || sink.output_partition.type == TPartitionType::HASH_PARTITIONED</div>
<div class="line"><a name="l00328"></a><span class="lineno"> 328</span>&#160; || sink.output_partition.type == TPartitionType::RANDOM);</div>
<div class="line"><a name="l00329"></a><span class="lineno"> 329</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a5001d5f3c979d1ac348e964d032d6ce6">broadcast_</a> = sink.output_partition.type == TPartitionType::UNPARTITIONED;</div>
<div class="line"><a name="l00330"></a><span class="lineno"> 330</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a9712721a0c91210b32f65329e18edec4">random_</a> = sink.output_partition.type == TPartitionType::RANDOM;</div>
<div class="line"><a name="l00331"></a><span class="lineno"> 331</span>&#160; <span class="comment">// TODO: use something like google3&#39;s linked_ptr here (scoped_ptr isn&#39;t copyable)</span></div>
<div class="line"><a name="l00332"></a><span class="lineno"> 332</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; destinations.size(); ++i) {</div>
<div class="line"><a name="l00333"></a><span class="lineno"> 333</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.push_back(</div>
<div class="line"><a name="l00334"></a><span class="lineno"> 334</span>&#160; <span class="keyword">new</span> <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html">Channel</a>(<span class="keyword">this</span>, row_desc, destinations[i].server,</div>
<div class="line"><a name="l00335"></a><span class="lineno"> 335</span>&#160; destinations[i].fragment_instance_id,</div>
<div class="line"><a name="l00336"></a><span class="lineno"> 336</span>&#160; sink.dest_node_id, per_channel_buffer_size));</div>
<div class="line"><a name="l00337"></a><span class="lineno"> 337</span>&#160; }</div>
<div class="line"><a name="l00338"></a><span class="lineno"> 338</span>&#160;</div>
<div class="line"><a name="l00339"></a><span class="lineno"> 339</span>&#160; <span class="keywordflow">if</span> (<a class="code" href="classimpala_1_1DataStreamSender.html#a5001d5f3c979d1ac348e964d032d6ce6">broadcast_</a> || <a class="code" href="classimpala_1_1DataStreamSender.html#a9712721a0c91210b32f65329e18edec4">random_</a>) {</div>
<div class="line"><a name="l00340"></a><span class="lineno"> 340</span>&#160; <span class="comment">// Randomize the order we open/transmit to channels to avoid thundering herd problems.</span></div>
<div class="line"><a name="l00341"></a><span class="lineno"> 341</span>&#160; srand(reinterpret_cast&lt;uint64_t&gt;(<span class="keyword">this</span>));</div>
<div class="line"><a name="l00342"></a><span class="lineno"> 342</span>&#160; random_shuffle(<a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.begin(), <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.end());</div>
<div class="line"><a name="l00343"></a><span class="lineno"> 343</span>&#160; }</div>
<div class="line"><a name="l00344"></a><span class="lineno"> 344</span>&#160;</div>
<div class="line"><a name="l00345"></a><span class="lineno"> 345</span>&#160; <span class="keywordflow">if</span> (sink.output_partition.type == TPartitionType::HASH_PARTITIONED) {</div>
<div class="line"><a name="l00346"></a><span class="lineno"> 346</span>&#160; <span class="comment">// TODO: move this to Init()? would need to save &#39;sink&#39; somewhere</span></div>
<div class="line"><a name="l00347"></a><span class="lineno"> 347</span>&#160; <a class="code" href="classimpala_1_1Status.html">Status</a> status =</div>
<div class="line"><a name="l00348"></a><span class="lineno"> 348</span>&#160; <a class="code" href="classimpala_1_1Expr.html#a6b9c01a432ae49b57482c9fc42a73681">Expr::CreateExprTrees</a>(pool, sink.output_partition.partition_exprs,</div>
<div class="line"><a name="l00349"></a><span class="lineno"> 349</span>&#160; &amp;<a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>);</div>
<div class="line"><a name="l00350"></a><span class="lineno"> 350</span>&#160; DCHECK(status.<a class="code" href="classimpala_1_1Status.html#a95ba859e42fe93445b340533220836ac">ok</a>());</div>
<div class="line"><a name="l00351"></a><span class="lineno"> 351</span>&#160; }</div>
<div class="line"><a name="l00352"></a><span class="lineno"> 352</span>&#160;}</div>
<div class="line"><a name="l00353"></a><span class="lineno"> 353</span>&#160;</div>
<div class="line"><a name="l00354"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#a9d54e519344431380d62139e583bb6be"> 354</a></span>&#160;<a class="code" href="classimpala_1_1DataStreamSender.html#a9d54e519344431380d62139e583bb6be">DataStreamSender::~DataStreamSender</a>() {</div>
<div class="line"><a name="l00355"></a><span class="lineno"> 355</span>&#160; <span class="comment">// TODO: check that sender was either already closed() or there was an error</span></div>
<div class="line"><a name="l00356"></a><span class="lineno"> 356</span>&#160; <span class="comment">// on some channel</span></div>
<div class="line"><a name="l00357"></a><span class="lineno"> 357</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size(); ++i) {</div>
<div class="line"><a name="l00358"></a><span class="lineno"> 358</span>&#160; <span class="keyword">delete</span> <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[i];</div>
<div class="line"><a name="l00359"></a><span class="lineno"> 359</span>&#160; }</div>
<div class="line"><a name="l00360"></a><span class="lineno"> 360</span>&#160;}</div>
<div class="line"><a name="l00361"></a><span class="lineno"> 361</span>&#160;</div>
<div class="line"><a name="l00362"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#acef39c14c7ed4b85122aa3c0cde80247"> 362</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> <a class="code" href="classimpala_1_1DataStreamSender.html#acef39c14c7ed4b85122aa3c0cde80247">DataStreamSender::Prepare</a>(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state) {</div>
<div class="line"><a name="l00363"></a><span class="lineno"> 363</span>&#160; DCHECK(state != NULL);</div>
<div class="line"><a name="l00364"></a><span class="lineno"> 364</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#ac212459ad24f6386484ee23856d8020a">state_</a> = state;</div>
<div class="line"><a name="l00365"></a><span class="lineno"> 365</span>&#160; stringstream title;</div>
<div class="line"><a name="l00366"></a><span class="lineno"> 366</span>&#160; title &lt;&lt; <span class="stringliteral">&quot;DataStreamSender (dst_id=&quot;</span> &lt;&lt; <a class="code" href="classimpala_1_1DataStreamSender.html#a0da91963ba5b4e8299392661e8215cdd">dest_node_id_</a> &lt;&lt; <span class="stringliteral">&quot;)&quot;</span>;</div>
<div class="line"><a name="l00367"></a><span class="lineno"> 367</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a9c12706a6222a87cb545099a725c3a2b">profile_</a> = <a class="code" href="classimpala_1_1DataStreamSender.html#a78b2b85b58341c703f2ae9d1492fc0c2">pool_</a>-&gt;<a class="code" href="classimpala_1_1ObjectPool.html#ab191078c99825682a0a9915ca1e0c05c">Add</a>(<span class="keyword">new</span> <a class="code" href="classimpala_1_1RuntimeProfile.html">RuntimeProfile</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a78b2b85b58341c703f2ae9d1492fc0c2">pool_</a>, title.str()));</div>
<div class="line"><a name="l00368"></a><span class="lineno"> 368</span>&#160; <a class="code" href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a9c12706a6222a87cb545099a725c3a2b">profile_</a>-&gt;<a class="code" href="classimpala_1_1RuntimeProfile.html#abd28edd77ba3928dbac3af735102fb3b">total_time_counter</a>());</div>
<div class="line"><a name="l00369"></a><span class="lineno"> 369</span>&#160;</div>
<div class="line"><a name="l00370"></a><span class="lineno"> 370</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#ae79c8af712dd169b821cdb61b761a9bd">mem_tracker_</a>.reset(<span class="keyword">new</span> <a class="code" href="classimpala_1_1MemTracker.html">MemTracker</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>(), -1, -1, <span class="stringliteral">&quot;DataStreamSender&quot;</span>,</div>
<div class="line"><a name="l00371"></a><span class="lineno"> 371</span>&#160; state-&gt;<a class="code" href="classimpala_1_1RuntimeState.html#ad6c837d1a8e2d4a995aa00a4df7c5af5">instance_mem_tracker</a>()));</div>
<div class="line"><a name="l00372"></a><span class="lineno"> 372</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(</div>
<div class="line"><a name="l00373"></a><span class="lineno"> 373</span>&#160; <a class="code" href="classimpala_1_1Expr.html#a3ae02e50debba50ac5c7b6dd9f8016e7">Expr::Prepare</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>, state, <a class="code" href="classimpala_1_1DataStreamSender.html#a29aee1e5cf5534019c438874836c248b">row_desc_</a>, <a class="code" href="classimpala_1_1DataStreamSender.html#ae79c8af712dd169b821cdb61b761a9bd">mem_tracker_</a>.get()));</div>
<div class="line"><a name="l00374"></a><span class="lineno"> 374</span>&#160;</div>
<div class="line"><a name="l00375"></a><span class="lineno"> 375</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#afb8209f4ee653757ec84277a03734e31">bytes_sent_counter_</a> =</div>
<div class="line"><a name="l00376"></a><span class="lineno"> 376</span>&#160; <a class="code" href="runtime-profile_8h.html#aac035c52016117af399543521f069c51">ADD_COUNTER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>(), <span class="stringliteral">&quot;BytesSent&quot;</span>, TUnit::BYTES);</div>
<div class="line"><a name="l00377"></a><span class="lineno"> 377</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#ad494bafc728fd45ff9965005c672a023">uncompressed_bytes_counter_</a> =</div>
<div class="line"><a name="l00378"></a><span class="lineno"> 378</span>&#160; <a class="code" href="runtime-profile_8h.html#aac035c52016117af399543521f069c51">ADD_COUNTER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>(), <span class="stringliteral">&quot;UncompressedRowBatchSize&quot;</span>, TUnit::BYTES);</div>
<div class="line"><a name="l00379"></a><span class="lineno"> 379</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a267f05a34f49154c4213778f69ca0e02">serialize_batch_timer_</a> =</div>
<div class="line"><a name="l00380"></a><span class="lineno"> 380</span>&#160; <a class="code" href="runtime-profile_8h.html#a8841ca205b2a05d608e82d443bad3a77">ADD_TIMER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>(), <span class="stringliteral">&quot;SerializeBatchTime&quot;</span>);</div>
<div class="line"><a name="l00381"></a><span class="lineno"> 381</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a9af4aaaa29c37202f3d1ee90746058b2">thrift_transmit_timer_</a> = <a class="code" href="runtime-profile_8h.html#a8841ca205b2a05d608e82d443bad3a77">ADD_TIMER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>(), <span class="stringliteral">&quot;ThriftTransmitTime(*)&quot;</span>);</div>
<div class="line"><a name="l00382"></a><span class="lineno"> 382</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a96873c121e5ddecadbb7d509b277b413">network_throughput_</a> =</div>
<div class="line"><a name="l00383"></a><span class="lineno"> 383</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>()-&gt;<a class="code" href="classimpala_1_1RuntimeProfile.html#a4ba4028a644bd8f71cdd624081f86ba7">AddDerivedCounter</a>(<span class="stringliteral">&quot;NetworkThroughput(*)&quot;</span>, TUnit::BYTES_PER_SECOND,</div>
<div class="line"><a name="l00384"></a><span class="lineno"> 384</span>&#160; bind&lt;int64_t&gt;(&amp;<a class="code" href="classimpala_1_1RuntimeProfile.html#ab1e095fac59cde33085156670c1e5130">RuntimeProfile::UnitsPerSecond</a>, <a class="code" href="classimpala_1_1DataStreamSender.html#afb8209f4ee653757ec84277a03734e31">bytes_sent_counter_</a>,</div>
<div class="line"><a name="l00385"></a><span class="lineno"> 385</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a9af4aaaa29c37202f3d1ee90746058b2">thrift_transmit_timer_</a>));</div>
<div class="line"><a name="l00386"></a><span class="lineno"> 386</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aeb6427e9059b1b7384f78cd25b8cd8d7">overall_throughput_</a> =</div>
<div class="line"><a name="l00387"></a><span class="lineno"> 387</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>()-&gt;<a class="code" href="classimpala_1_1RuntimeProfile.html#a4ba4028a644bd8f71cdd624081f86ba7">AddDerivedCounter</a>(<span class="stringliteral">&quot;OverallThroughput&quot;</span>, TUnit::BYTES_PER_SECOND,</div>
<div class="line"><a name="l00388"></a><span class="lineno"> 388</span>&#160; bind&lt;int64_t&gt;(&amp;<a class="code" href="classimpala_1_1RuntimeProfile.html#ab1e095fac59cde33085156670c1e5130">RuntimeProfile::UnitsPerSecond</a>, <a class="code" href="classimpala_1_1DataStreamSender.html#afb8209f4ee653757ec84277a03734e31">bytes_sent_counter_</a>,</div>
<div class="line"><a name="l00389"></a><span class="lineno"> 389</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">profile</a>()-&gt;total_time_counter()));</div>
<div class="line"><a name="l00390"></a><span class="lineno"> 390</span>&#160;</div>
<div class="line"><a name="l00391"></a><span class="lineno"> 391</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size(); ++i) {</div>
<div class="line"><a name="l00392"></a><span class="lineno"> 392</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[i]-&gt;Init(state));</div>
<div class="line"><a name="l00393"></a><span class="lineno"> 393</span>&#160; }</div>
<div class="line"><a name="l00394"></a><span class="lineno"> 394</span>&#160; <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Status.html#a580565665ea944eb64f3f495b1bee1e0">Status::OK</a>;</div>
<div class="line"><a name="l00395"></a><span class="lineno"> 395</span>&#160;}</div>
<div class="line"><a name="l00396"></a><span class="lineno"> 396</span>&#160;</div>
<div class="line"><a name="l00397"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#a441b986df33ba936a2de32af7e84c9d8"> 397</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> <a class="code" href="classimpala_1_1DataStreamSender.html#a441b986df33ba936a2de32af7e84c9d8">DataStreamSender::Open</a>(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state) {</div>
<div class="line"><a name="l00398"></a><span class="lineno"> 398</span>&#160; <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Expr.html#aad1111f87951868de4d8f8b2dfe87c5f">Expr::Open</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>, state);</div>
<div class="line"><a name="l00399"></a><span class="lineno"> 399</span>&#160;}</div>
<div class="line"><a name="l00400"></a><span class="lineno"> 400</span>&#160;</div>
<div class="line"><a name="l00401"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#a9b36b9b654831d4757e22f131196343b"> 401</a></span>&#160;<a class="code" href="classimpala_1_1Status.html">Status</a> <a class="code" href="classimpala_1_1DataStreamSender.html#a9b36b9b654831d4757e22f131196343b">DataStreamSender::Send</a>(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state, <a class="code" href="classimpala_1_1RowBatch.html">RowBatch</a>* batch, <span class="keywordtype">bool</span> eos) {</div>
<div class="line"><a name="l00402"></a><span class="lineno"> 402</span>&#160; <a class="code" href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a9c12706a6222a87cb545099a725c3a2b">profile_</a>-&gt;<a class="code" href="classimpala_1_1RuntimeProfile.html#abd28edd77ba3928dbac3af735102fb3b">total_time_counter</a>());</div>
<div class="line"><a name="l00403"></a><span class="lineno"> 403</span>&#160; <a class="code" href="classimpala_1_1ExprContext.html#a92f9a380825bf39ff33b5b3168bc377a">ExprContext::FreeLocalAllocations</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>);</div>
<div class="line"><a name="l00404"></a><span class="lineno"> 404</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(state-&gt;<a class="code" href="classimpala_1_1RuntimeState.html#a27bed76f727f93f1d75fbb2762cb0fa6">CheckQueryState</a>());</div>
<div class="line"><a name="l00405"></a><span class="lineno"> 405</span>&#160; DCHECK(!<a class="code" href="classimpala_1_1DataStreamSender.html#a425142ace64ba49e8122f069c79603e0">closed_</a>);</div>
<div class="line"><a name="l00406"></a><span class="lineno"> 406</span>&#160;</div>
<div class="line"><a name="l00407"></a><span class="lineno"> 407</span>&#160; <span class="keywordflow">if</span> (batch-&gt;<a class="code" href="classimpala_1_1RowBatch.html#ac695df3b85ee416b3d99844813ae813d">num_rows</a>() == 0) <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Status.html#a580565665ea944eb64f3f495b1bee1e0">Status::OK</a>;</div>
<div class="line"><a name="l00408"></a><span class="lineno"> 408</span>&#160; <span class="keywordflow">if</span> (<a class="code" href="classimpala_1_1DataStreamSender.html#a5001d5f3c979d1ac348e964d032d6ce6">broadcast_</a> || <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size() == 1) {</div>
<div class="line"><a name="l00409"></a><span class="lineno"> 409</span>&#160; <span class="comment">// current_thrift_batch_ is *not* the one that was written by the last call</span></div>
<div class="line"><a name="l00410"></a><span class="lineno"> 410</span>&#160; <span class="comment">// to Serialize()</span></div>
<div class="line"><a name="l00411"></a><span class="lineno"> 411</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#abbbddecac6016f222fb67ba610050385">SerializeBatch</a>(batch, <a class="code" href="classimpala_1_1DataStreamSender.html#af88aec7b54580fa5254895fae593f9ee">current_thrift_batch_</a>, <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size());</div>
<div class="line"><a name="l00412"></a><span class="lineno"> 412</span>&#160; <span class="comment">// SendBatch() will block if there are still in-flight rpcs (and those will</span></div>
<div class="line"><a name="l00413"></a><span class="lineno"> 413</span>&#160; <span class="comment">// reference the previously written thrift batch)</span></div>
<div class="line"><a name="l00414"></a><span class="lineno"> 414</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size(); ++i) {</div>
<div class="line"><a name="l00415"></a><span class="lineno"> 415</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[i]-&gt;SendBatch(<a class="code" href="classimpala_1_1DataStreamSender.html#af88aec7b54580fa5254895fae593f9ee">current_thrift_batch_</a>));</div>
<div class="line"><a name="l00416"></a><span class="lineno"> 416</span>&#160; }</div>
<div class="line"><a name="l00417"></a><span class="lineno"> 417</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#af88aec7b54580fa5254895fae593f9ee">current_thrift_batch_</a> =</div>
<div class="line"><a name="l00418"></a><span class="lineno"> 418</span>&#160; (<a class="code" href="classimpala_1_1DataStreamSender.html#af88aec7b54580fa5254895fae593f9ee">current_thrift_batch_</a> == &amp;<a class="code" href="classimpala_1_1DataStreamSender.html#a8efd4d7e7db1a831ec30ca0f6b26f23a">thrift_batch1_</a> ? &amp;<a class="code" href="classimpala_1_1DataStreamSender.html#a33cfb26543180cf9e27ef48595719135">thrift_batch2_</a> : &amp;<a class="code" href="classimpala_1_1DataStreamSender.html#a8efd4d7e7db1a831ec30ca0f6b26f23a">thrift_batch1_</a>);</div>
<div class="line"><a name="l00419"></a><span class="lineno"> 419</span>&#160; } <span class="keywordflow">else</span> <span class="keywordflow">if</span> (<a class="code" href="classimpala_1_1DataStreamSender.html#a9712721a0c91210b32f65329e18edec4">random_</a>) {</div>
<div class="line"><a name="l00420"></a><span class="lineno"> 420</span>&#160; <span class="comment">// Round-robin batches among channels. Wait for the current channel to finish its</span></div>
<div class="line"><a name="l00421"></a><span class="lineno"> 421</span>&#160; <span class="comment">// rpc before overwriting its batch.</span></div>
<div class="line"><a name="l00422"></a><span class="lineno"> 422</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html">Channel</a>* current_channel = <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[<a class="code" href="classimpala_1_1DataStreamSender.html#ad48f0843f4e4db5ed5abf45c68796b0b">current_channel_idx_</a>];</div>
<div class="line"><a name="l00423"></a><span class="lineno"> 423</span>&#160; current_channel-&gt;<a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a4efc59f4f4764d26115f15f43d9468ed">WaitForRpc</a>();</div>
<div class="line"><a name="l00424"></a><span class="lineno"> 424</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#abbbddecac6016f222fb67ba610050385">SerializeBatch</a>(batch, current_channel-&gt;<a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e78160dbfe7db64784d0b469a0b1c2c">thrift_batch</a>());</div>
<div class="line"><a name="l00425"></a><span class="lineno"> 425</span>&#160; current_channel-&gt;<a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a127f3c10e732383a272a84bee6cca5ff">SendBatch</a>(current_channel-&gt;<a class="code" href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e78160dbfe7db64784d0b469a0b1c2c">thrift_batch</a>());</div>
<div class="line"><a name="l00426"></a><span class="lineno"> 426</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#ad48f0843f4e4db5ed5abf45c68796b0b">current_channel_idx_</a> = (<a class="code" href="classimpala_1_1DataStreamSender.html#ad48f0843f4e4db5ed5abf45c68796b0b">current_channel_idx_</a> + 1) % <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size();</div>
<div class="line"><a name="l00427"></a><span class="lineno"> 427</span>&#160; } <span class="keywordflow">else</span> {</div>
<div class="line"><a name="l00428"></a><span class="lineno"> 428</span>&#160; <span class="comment">// hash-partition batch&#39;s rows across channels</span></div>
<div class="line"><a name="l00429"></a><span class="lineno"> 429</span>&#160; <span class="keywordtype">int</span> num_channels = <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size();</div>
<div class="line"><a name="l00430"></a><span class="lineno"> 430</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; batch-&gt;<a class="code" href="classimpala_1_1RowBatch.html#ac695df3b85ee416b3d99844813ae813d">num_rows</a>(); ++i) {</div>
<div class="line"><a name="l00431"></a><span class="lineno"> 431</span>&#160; <a class="code" href="classimpala_1_1TupleRow.html">TupleRow</a>* row = batch-&gt;<a class="code" href="classimpala_1_1RowBatch.html#a9320ed986b0717ef26d73d871f3b3a42">GetRow</a>(i);</div>
<div class="line"><a name="l00432"></a><span class="lineno"> 432</span>&#160; uint32_t hash_val = <a class="code" href="classimpala_1_1HashUtil.html#ad47e5dcaf416f05760be732dfbc66651">HashUtil::FNV_SEED</a>;</div>
<div class="line"><a name="l00433"></a><span class="lineno"> 433</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>.size(); ++i) {</div>
<div class="line"><a name="l00434"></a><span class="lineno"> 434</span>&#160; <a class="code" href="classimpala_1_1ExprContext.html">ExprContext</a>* ctx = <a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>[i];</div>
<div class="line"><a name="l00435"></a><span class="lineno"> 435</span>&#160; <span class="keywordtype">void</span>* partition_val = ctx-&gt;<a class="code" href="classimpala_1_1ExprContext.html#ae0c52a90c10e6c206888ff43b9d033e5">GetValue</a>(row);</div>
<div class="line"><a name="l00436"></a><span class="lineno"> 436</span>&#160; <span class="comment">// We can&#39;t use the crc hash function here because it does not result</span></div>
<div class="line"><a name="l00437"></a><span class="lineno"> 437</span>&#160; <span class="comment">// in uncorrelated hashes with different seeds. Instead we must use</span></div>
<div class="line"><a name="l00438"></a><span class="lineno"> 438</span>&#160; <span class="comment">// fnv hash.</span></div>
<div class="line"><a name="l00439"></a><span class="lineno"> 439</span>&#160; <span class="comment">// TODO: fix crc hash/GetHashValue()</span></div>
<div class="line"><a name="l00440"></a><span class="lineno"> 440</span>&#160; hash_val =</div>
<div class="line"><a name="l00441"></a><span class="lineno"> 441</span>&#160; <a class="code" href="classimpala_1_1RawValue.html#ade36561a0ea59fde387ff49f6565c6ec">RawValue::GetHashValueFnv</a>(partition_val, ctx-&gt;<a class="code" href="classimpala_1_1ExprContext.html#a2e652a852e99d5cd375dce3f91520aee">root</a>()-&gt;<a class="code" href="classimpala_1_1Expr.html#a742827844080d45d514719742cb3e7f5">type</a>(), hash_val);</div>
<div class="line"><a name="l00442"></a><span class="lineno"> 442</span>&#160; }</div>
<div class="line"><a name="l00443"></a><span class="lineno"> 443</span>&#160;</div>
<div class="line"><a name="l00444"></a><span class="lineno"> 444</span>&#160; <a class="code" href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[hash_val % num_channels]-&gt;AddRow(row));</div>
<div class="line"><a name="l00445"></a><span class="lineno"> 445</span>&#160; }</div>
<div class="line"><a name="l00446"></a><span class="lineno"> 446</span>&#160; }</div>
<div class="line"><a name="l00447"></a><span class="lineno"> 447</span>&#160; <span class="keywordflow">return</span> <a class="code" href="classimpala_1_1Status.html#a580565665ea944eb64f3f495b1bee1e0">Status::OK</a>;</div>
<div class="line"><a name="l00448"></a><span class="lineno"> 448</span>&#160;}</div>
<div class="line"><a name="l00449"></a><span class="lineno"> 449</span>&#160;</div>
<div class="line"><a name="l00450"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#a54b74afd5299ec9465d49cc748485d00"> 450</a></span>&#160;<span class="keywordtype">void</span> <a class="code" href="classimpala_1_1DataStreamSender.html#a54b74afd5299ec9465d49cc748485d00">DataStreamSender::Close</a>(<a class="code" href="classimpala_1_1RuntimeState.html">RuntimeState</a>* state) {</div>
<div class="line"><a name="l00451"></a><span class="lineno"> 451</span>&#160; <span class="keywordflow">if</span> (<a class="code" href="classimpala_1_1DataStreamSender.html#a425142ace64ba49e8122f069c79603e0">closed_</a>) <span class="keywordflow">return</span>;</div>
<div class="line"><a name="l00452"></a><span class="lineno"> 452</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size(); ++i) {</div>
<div class="line"><a name="l00453"></a><span class="lineno"> 453</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[i]-&gt;Close(state);</div>
<div class="line"><a name="l00454"></a><span class="lineno"> 454</span>&#160; }</div>
<div class="line"><a name="l00455"></a><span class="lineno"> 455</span>&#160; <a class="code" href="classimpala_1_1Expr.html#a70300b0ba7c91fddb9adfaeb0dfc09a5">Expr::Close</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">partition_expr_ctxs_</a>, state);</div>
<div class="line"><a name="l00456"></a><span class="lineno"> 456</span>&#160; <a class="code" href="classimpala_1_1DataStreamSender.html#a425142ace64ba49e8122f069c79603e0">closed_</a> = <span class="keyword">true</span>;</div>
<div class="line"><a name="l00457"></a><span class="lineno"> 457</span>&#160;}</div>
<div class="line"><a name="l00458"></a><span class="lineno"> 458</span>&#160;</div>
<div class="line"><a name="l00459"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#abbbddecac6016f222fb67ba610050385"> 459</a></span>&#160;<span class="keywordtype">void</span> <a class="code" href="classimpala_1_1DataStreamSender.html#abbbddecac6016f222fb67ba610050385">DataStreamSender::SerializeBatch</a>(<a class="code" href="classimpala_1_1RowBatch.html">RowBatch</a>* src, TRowBatch* dest, <span class="keywordtype">int</span> num_receivers) {</div>
<div class="line"><a name="l00460"></a><span class="lineno"> 460</span>&#160; <a class="code" href="logging_8h.html#a6ccc2106c47622db4e52a401a415fc58">VLOG_ROW</a> &lt;&lt; <span class="stringliteral">&quot;serializing &quot;</span> &lt;&lt; src-&gt;<a class="code" href="classimpala_1_1RowBatch.html#ac695df3b85ee416b3d99844813ae813d">num_rows</a>() &lt;&lt; <span class="stringliteral">&quot; rows&quot;</span>;</div>
<div class="line"><a name="l00461"></a><span class="lineno"> 461</span>&#160; {</div>
<div class="line"><a name="l00462"></a><span class="lineno"> 462</span>&#160; <a class="code" href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#a267f05a34f49154c4213778f69ca0e02">serialize_batch_timer_</a>);</div>
<div class="line"><a name="l00463"></a><span class="lineno"> 463</span>&#160; <span class="keywordtype">int</span> uncompressed_bytes = src-&gt;<a class="code" href="classimpala_1_1RowBatch.html#a5a7ff7bf9d6bd28990eedab985477390">Serialize</a>(dest);</div>
<div class="line"><a name="l00464"></a><span class="lineno"> 464</span>&#160; <a class="code" href="runtime-profile_8h.html#a0842f330c18b3e4e53e13655add0310a">COUNTER_ADD</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#afb8209f4ee653757ec84277a03734e31">bytes_sent_counter_</a>, <a class="code" href="classimpala_1_1RowBatch.html#af12857237879012707070a5deb7a529f">RowBatch::GetBatchSize</a>(*dest) * num_receivers);</div>
<div class="line"><a name="l00465"></a><span class="lineno"> 465</span>&#160; <a class="code" href="runtime-profile_8h.html#a0842f330c18b3e4e53e13655add0310a">COUNTER_ADD</a>(<a class="code" href="classimpala_1_1DataStreamSender.html#ad494bafc728fd45ff9965005c672a023">uncompressed_bytes_counter_</a>, uncompressed_bytes * num_receivers);</div>
<div class="line"><a name="l00466"></a><span class="lineno"> 466</span>&#160; }</div>
<div class="line"><a name="l00467"></a><span class="lineno"> 467</span>&#160;}</div>
<div class="line"><a name="l00468"></a><span class="lineno"> 468</span>&#160;</div>
<div class="line"><a name="l00469"></a><span class="lineno"><a class="line" href="classimpala_1_1DataStreamSender.html#ae93db027f9d0cb4c50e8dd962d92a624"> 469</a></span>&#160;int64_t <a class="code" href="classimpala_1_1DataStreamSender.html#ae93db027f9d0cb4c50e8dd962d92a624">DataStreamSender::GetNumDataBytesSent</a>()<span class="keyword"> const </span>{</div>
<div class="line"><a name="l00470"></a><span class="lineno"> 470</span>&#160; <span class="comment">// TODO: do we need synchronization here or are reads &amp; writes to 8-byte ints</span></div>
<div class="line"><a name="l00471"></a><span class="lineno"> 471</span>&#160; <span class="comment">// atomic?</span></div>
<div class="line"><a name="l00472"></a><span class="lineno"> 472</span>&#160; int64_t result = 0;</div>
<div class="line"><a name="l00473"></a><span class="lineno"> 473</span>&#160; <span class="keywordflow">for</span> (<span class="keywordtype">int</span> i = 0; i &lt; <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>.size(); ++i) {</div>
<div class="line"><a name="l00474"></a><span class="lineno"> 474</span>&#160; result += <a class="code" href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">channels_</a>[i]-&gt;num_data_bytes_sent();</div>
<div class="line"><a name="l00475"></a><span class="lineno"> 475</span>&#160; }</div>
<div class="line"><a name="l00476"></a><span class="lineno"> 476</span>&#160; <span class="keywordflow">return</span> result;</div>
<div class="line"><a name="l00477"></a><span class="lineno"> 477</span>&#160;}</div>
<div class="line"><a name="l00478"></a><span class="lineno"> 478</span>&#160;</div>
<div class="line"><a name="l00479"></a><span class="lineno"> 479</span>&#160;}</div>
<div class="ttc" id="classimpala_1_1ErrorMsg_html_a99f5a1f06dfc4462643fd645ee38a75f"><div class="ttname"><a href="classimpala_1_1ErrorMsg.html#a99f5a1f06dfc4462643fd645ee38a75f">impala::ErrorMsg::msg</a></div><div class="ttdeci">const std::string &amp; msg() const </div><div class="ttdoc">Returns the formatted error string. </div><div class="ttdef"><b>Definition:</b> <a href="error-util_8h_source.html#l00118">error-util.h:118</a></div></div>
<div class="ttc" id="row-batch_8h_html"><div class="ttname"><a href="row-batch_8h.html">row-batch.h</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a9af4aaaa29c37202f3d1ee90746058b2"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a9af4aaaa29c37202f3d1ee90746058b2">impala::DataStreamSender::thrift_transmit_timer_</a></div><div class="ttdeci">RuntimeProfile::Counter * thrift_transmit_timer_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00118">data-stream-sender.h:118</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeProfile_html_a4ba4028a644bd8f71cdd624081f86ba7"><div class="ttname"><a href="classimpala_1_1RuntimeProfile.html#a4ba4028a644bd8f71cdd624081f86ba7">impala::RuntimeProfile::AddDerivedCounter</a></div><div class="ttdeci">DerivedCounter * AddDerivedCounter(const std::string &amp;name, TUnit::type unit, const DerivedCounterFunction &amp;counter_fn, const std::string &amp;parent_counter_name=&quot;&quot;)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8cc_source.html#l00447">runtime-profile.cc:447</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_ad494bafc728fd45ff9965005c672a023"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#ad494bafc728fd45ff9965005c672a023">impala::DataStreamSender::uncompressed_bytes_counter_</a></div><div class="ttdeci">RuntimeProfile::Counter * uncompressed_bytes_counter_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00120">data-stream-sender.h:120</a></div></div>
<div class="ttc" id="classimpala_1_1RowDescriptor_html_acc6beec42c710b65f958cf91981cb4b9"><div class="ttname"><a href="classimpala_1_1RowDescriptor.html#acc6beec42c710b65f958cf91981cb4b9">impala::RowDescriptor::GetRowSize</a></div><div class="ttdeci">int GetRowSize() const </div><div class="ttdef"><b>Definition:</b> <a href="descriptors_8cc_source.html#l00320">descriptors.cc:320</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeState_html_a27bed76f727f93f1d75fbb2762cb0fa6"><div class="ttname"><a href="classimpala_1_1RuntimeState.html#a27bed76f727f93f1d75fbb2762cb0fa6">impala::RuntimeState::CheckQueryState</a></div><div class="ttdeci">Status CheckQueryState()</div><div class="ttdef"><b>Definition:</b> <a href="runtime-state_8cc_source.html#l00286">runtime-state.cc:286</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a8df49496bd0b8a4cd1502049650e974c"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a8df49496bd0b8a4cd1502049650e974c">impala::DataStreamSender::Channel::client_cache_</a></div><div class="ttdeci">ImpalaInternalServiceClientCache * client_cache_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00111">data-stream-sender.cc:111</a></div></div>
<div class="ttc" id="classimpala_1_1RowBatch_html_ac695df3b85ee416b3d99844813ae813d"><div class="ttname"><a href="classimpala_1_1RowBatch.html#ac695df3b85ee416b3d99844813ae813d">impala::RowBatch::num_rows</a></div><div class="ttdeci">int num_rows() const </div><div class="ttdef"><b>Definition:</b> <a href="row-batch_8h_source.html#l00215">row-batch.h:215</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a9712721a0c91210b32f65329e18edec4"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a9712721a0c91210b32f65329e18edec4">impala::DataStreamSender::random_</a></div><div class="ttdeci">bool random_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00101">data-stream-sender.h:101</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_afb8209f4ee653757ec84277a03734e31"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#afb8209f4ee653757ec84277a03734e31">impala::DataStreamSender::bytes_sent_counter_</a></div><div class="ttdeci">RuntimeProfile::Counter * bytes_sent_counter_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00119">data-stream-sender.h:119</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a127f3c10e732383a272a84bee6cca5ff"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a127f3c10e732383a272a84bee6cca5ff">impala::DataStreamSender::Channel::SendBatch</a></div><div class="ttdeci">Status SendBatch(TRowBatch *batch)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00159">data-stream-sender.cc:159</a></div></div>
<div class="ttc" id="namespaceimpala_html_aca80061c98b44477ea84e4332993b7e7"><div class="ttname"><a href="namespaceimpala.html#aca80061c98b44477ea84e4332993b7e7">impala::PlanNodeId</a></div><div class="ttdeci">int PlanNodeId</div><div class="ttdef"><b>Definition:</b> <a href="global-types_8h_source.html#l00026">global-types.h:26</a></div></div>
<div class="ttc" id="classimpala_1_1TupleRow_html_aed3092e531291470b27fc48afa901600"><div class="ttname"><a href="classimpala_1_1TupleRow.html#aed3092e531291470b27fc48afa901600">impala::TupleRow::GetTuple</a></div><div class="ttdeci">Tuple * GetTuple(int tuple_idx)</div><div class="ttdef"><b>Definition:</b> <a href="tuple-row_8h_source.html#l00030">tuple-row.h:30</a></div></div>
<div class="ttc" id="namespaceimpala_html_a8cf06290ff145eec23570a8b9790f412"><div class="ttname"><a href="namespaceimpala.html#a8cf06290ff145eec23570a8b9790f412">impala::protocol</a></div><div class="ttdeci">const StringSearch UrlParser::protocol_search &amp; protocol</div><div class="ttdef"><b>Definition:</b> <a href="url-parser_8cc_source.html#l00036">url-parser.cc:36</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a68a204858648ba801d85231f41480db4"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a68a204858648ba801d85231f41480db4">impala::DataStreamSender::Channel::rpc_done_cv_</a></div><div class="ttdeci">condition_variable rpc_done_cv_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00132">data-stream-sender.cc:132</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a96873c121e5ddecadbb7d509b277b413"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a96873c121e5ddecadbb7d509b277b413">impala::DataStreamSender::network_throughput_</a></div><div class="ttdeci">RuntimeProfile::Counter * network_throughput_</div><div class="ttdoc">Throughput per time spent in TransmitData. </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00124">data-stream-sender.h:124</a></div></div>
<div class="ttc" id="classimpala_1_1ClientConnection_html"><div class="ttname"><a href="classimpala_1_1ClientConnection.html">impala::ClientConnection</a></div><div class="ttdef"><b>Definition:</b> <a href="client-cache_8h_source.html#l00192">client-cache.h:192</a></div></div>
<div class="ttc" id="classimpala_1_1Expr_html_aad1111f87951868de4d8f8b2dfe87c5f"><div class="ttname"><a href="classimpala_1_1Expr.html#aad1111f87951868de4d8f8b2dfe87c5f">impala::Expr::Open</a></div><div class="ttdeci">static Status Open(const std::vector&lt; ExprContext * &gt; &amp;ctxs, RuntimeState *state)</div><div class="ttdoc">Convenience function for opening multiple expr trees. </div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_aec04a5a2a66d21fdb28c5293f6780c35"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#aec04a5a2a66d21fdb28c5293f6780c35">impala::DataStreamSender::Channel::address_</a></div><div class="ttdeci">TNetworkAddress address_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00114">data-stream-sender.cc:114</a></div></div>
<div class="ttc" id="mem-tracker_8h_html"><div class="ttname"><a href="mem-tracker_8h.html">mem-tracker.h</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a4efc59f4f4764d26115f15f43d9468ed"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a4efc59f4f4764d26115f15f43d9468ed">impala::DataStreamSender::Channel::WaitForRpc</a></div><div class="ttdeci">void WaitForRpc()</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00219">data-stream-sender.cc:219</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a11a77fb40bb6bbe7d2fdc3ebefa4b24c"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a11a77fb40bb6bbe7d2fdc3ebefa4b24c">impala::DataStreamSender::Channel::rpc_status_</a></div><div class="ttdeci">Status rpc_status_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00136">data-stream-sender.cc:136</a></div></div>
<div class="ttc" id="status_8h_html_a85f7d0e774e15eb35b74f53264305e16"><div class="ttname"><a href="status_8h.html#a85f7d0e774e15eb35b74f53264305e16">RETURN_IF_ERROR</a></div><div class="ttdeci">#define RETURN_IF_ERROR(stmt)</div><div class="ttdoc">some generally useful macros </div><div class="ttdef"><b>Definition:</b> <a href="status_8h_source.html#l00242">status.h:242</a></div></div>
<div class="ttc" id="raw-value_8h_html"><div class="ttname"><a href="raw-value_8h.html">raw-value.h</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a9309610f37d46c02fd711a6d0df1bf8a"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a9309610f37d46c02fd711a6d0df1bf8a">impala::DataStreamSender::Channel::batch_</a></div><div class="ttdeci">scoped_ptr&lt; RowBatch &gt; batch_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00122">data-stream-sender.cc:122</a></div></div>
<div class="ttc" id="classimpala_1_1RowBatch_html_a9320ed986b0717ef26d73d871f3b3a42"><div class="ttname"><a href="classimpala_1_1RowBatch.html#a9320ed986b0717ef26d73d871f3b3a42">impala::RowBatch::GetRow</a></div><div class="ttdeci">TupleRow * GetRow(int row_idx)</div><div class="ttdef"><b>Definition:</b> <a href="row-batch_8h_source.html#l00140">row-batch.h:140</a></div></div>
<div class="ttc" id="classimpala_1_1ExprContext_html_ae0c52a90c10e6c206888ff43b9d033e5"><div class="ttname"><a href="classimpala_1_1ExprContext.html#ae0c52a90c10e6c206888ff43b9d033e5">impala::ExprContext::GetValue</a></div><div class="ttdeci">void * GetValue(TupleRow *row)</div><div class="ttdef"><b>Definition:</b> <a href="expr-context_8cc_source.html#l00200">expr-context.cc:200</a></div></div>
<div class="ttc" id="classimpala_1_1ExprContext_html"><div class="ttname"><a href="classimpala_1_1ExprContext.html">impala::ExprContext</a></div><div class="ttdef"><b>Definition:</b> <a href="expr-context_8h_source.html#l00040">expr-context.h:40</a></div></div>
<div class="ttc" id="runtime-profile_8h_html_a8841ca205b2a05d608e82d443bad3a77"><div class="ttname"><a href="runtime-profile_8h.html#a8841ca205b2a05d608e82d443bad3a77">ADD_TIMER</a></div><div class="ttdeci">#define ADD_TIMER(profile, name)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00050">runtime-profile.h:50</a></div></div>
<div class="ttc" id="expr-context_8h_html"><div class="ttname"><a href="expr-context_8h.html">expr-context.h</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_acef39c14c7ed4b85122aa3c0cde80247"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#acef39c14c7ed4b85122aa3c0cde80247">impala::DataStreamSender::Prepare</a></div><div class="ttdeci">virtual Status Prepare(RuntimeState *state)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00362">data-stream-sender.cc:362</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a8efd4d7e7db1a831ec30ca0f6b26f23a"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a8efd4d7e7db1a831ec30ca0f6b26f23a">impala::DataStreamSender::thrift_batch1_</a></div><div class="ttdeci">TRowBatch thrift_batch1_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00109">data-stream-sender.h:109</a></div></div>
<div class="ttc" id="classimpala_1_1RowDescriptor_html"><div class="ttname"><a href="classimpala_1_1RowDescriptor.html">impala::RowDescriptor</a></div><div class="ttdef"><b>Definition:</b> <a href="descriptors_8h_source.html#l00373">descriptors.h:373</a></div></div>
<div class="ttc" id="client-cache_8h_html"><div class="ttname"><a href="client-cache_8h.html">client-cache.h</a></div></div>
<div class="ttc" id="classimpala_1_1ObjectPool_html"><div class="ttname"><a href="classimpala_1_1ObjectPool.html">impala::ObjectPool</a></div><div class="ttdef"><b>Definition:</b> <a href="object-pool_8h_source.html#l00030">object-pool.h:30</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeProfile_html_ab1e095fac59cde33085156670c1e5130"><div class="ttname"><a href="classimpala_1_1RuntimeProfile.html#ab1e095fac59cde33085156670c1e5130">impala::RuntimeProfile::UnitsPerSecond</a></div><div class="ttdeci">static int64_t UnitsPerSecond(const Counter *total_counter, const Counter *timer)</div><div class="ttdoc">Derived counter function: return measured throughput as input_value/second. </div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8cc_source.html#l00733">runtime-profile.cc:733</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_aad9605700b73457dfec1ce13604ae6d2"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#aad9605700b73457dfec1ce13604ae6d2">impala::DataStreamSender::profile</a></div><div class="ttdeci">virtual RuntimeProfile * profile()</div><div class="ttdoc">Returns the runtime profile for the sink. </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00090">data-stream-sender.h:90</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a1fdba48ea7980af1182fde5d14b78af4"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a1fdba48ea7980af1182fde5d14b78af4">impala::DataStreamSender::Channel::buffer_size_</a></div><div class="ttdeci">int buffer_size_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00109">data-stream-sender.cc:109</a></div></div>
<div class="ttc" id="namespaceimpala_html_a309108c8b3eaf4e5b154bc4eb4624880"><div class="ttname"><a href="namespaceimpala.html#a309108c8b3eaf4e5b154bc4eb4624880">impala::MakeNetworkAddress</a></div><div class="ttdeci">TNetworkAddress MakeNetworkAddress(const string &amp;hostname, int port)</div><div class="ttdef"><b>Definition:</b> <a href="network-util_8cc_source.html#l00096">network-util.cc:96</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a441b986df33ba936a2de32af7e84c9d8"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a441b986df33ba936a2de32af7e84c9d8">impala::DataStreamSender::Open</a></div><div class="ttdeci">virtual Status Open(RuntimeState *state)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00397">data-stream-sender.cc:397</a></div></div>
<div class="ttc" id="logging_8h_html"><div class="ttname"><a href="logging_8h.html">logging.h</a></div></div>
<div class="ttc" id="runtime-profile_8h_html_a0842f330c18b3e4e53e13655add0310a"><div class="ttname"><a href="runtime-profile_8h.html#a0842f330c18b3e4e53e13655add0310a">COUNTER_ADD</a></div><div class="ttdeci">#define COUNTER_ADD(c, v)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00055">runtime-profile.h:55</a></div></div>
<div class="ttc" id="classimpala_1_1TupleRow_html"><div class="ttname"><a href="classimpala_1_1TupleRow.html">impala::TupleRow</a></div><div class="ttdef"><b>Definition:</b> <a href="tuple-row_8h_source.html#l00028">tuple-row.h:28</a></div></div>
<div class="ttc" id="runtime-profile_8h_html_aaa9a2971c6368e3ddd3f5140a0295eb7"><div class="ttname"><a href="runtime-profile_8h.html#aaa9a2971c6368e3ddd3f5140a0295eb7">SCOPED_TIMER</a></div><div class="ttdeci">#define SCOPED_TIMER(c)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00053">runtime-profile.h:53</a></div></div>
<div class="ttc" id="classimpala_1_1RawValue_html_ade36561a0ea59fde387ff49f6565c6ec"><div class="ttname"><a href="classimpala_1_1RawValue.html#ade36561a0ea59fde387ff49f6565c6ec">impala::RawValue::GetHashValueFnv</a></div><div class="ttdeci">static uint32_t GetHashValueFnv(const void *v, const ColumnType &amp;type, uint32_t seed)</div><div class="ttdef"><b>Definition:</b> <a href="raw-value_8h_source.html#l00196">raw-value.h:196</a></div></div>
<div class="ttc" id="classimpala_1_1Expr_html_a70300b0ba7c91fddb9adfaeb0dfc09a5"><div class="ttname"><a href="classimpala_1_1Expr.html#a70300b0ba7c91fddb9adfaeb0dfc09a5">impala::Expr::Close</a></div><div class="ttdeci">static void Close(const std::vector&lt; ExprContext * &gt; &amp;ctxs, RuntimeState *state)</div><div class="ttdoc">Convenience function for closing multiple expr trees. </div></div>
<div class="ttc" id="classimpala_1_1Tuple_html_a7bd61a6c3fe36dc122b94d5f1f2ecac4"><div class="ttname"><a href="classimpala_1_1Tuple.html#a7bd61a6c3fe36dc122b94d5f1f2ecac4">impala::Tuple::DeepCopy</a></div><div class="ttdeci">Tuple * DeepCopy(const TupleDescriptor &amp;desc, MemPool *pool, bool convert_ptrs=false)</div><div class="ttdef"><b>Definition:</b> <a href="tuple_8cc_source.html#l00034">tuple.cc:34</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a29aee1e5cf5534019c438874836c248b"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a29aee1e5cf5534019c438874836c248b">impala::DataStreamSender::row_desc_</a></div><div class="ttdeci">const RowDescriptor &amp; row_desc_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00099">data-stream-sender.h:99</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html">impala::DataStreamSender::Channel</a></div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00057">data-stream-sender.cc:57</a></div></div>
<div class="ttc" id="classimpala_1_1Status_html"><div class="ttname"><a href="classimpala_1_1Status.html">impala::Status</a></div><div class="ttdef"><b>Definition:</b> <a href="status_8h_source.html#l00081">status.h:81</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a9f7b040146c91730872113ea0d8dba4e"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f7b040146c91730872113ea0d8dba4e">impala::DataStreamSender::Channel::row_desc_</a></div><div class="ttdeci">const RowDescriptor &amp; row_desc_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00113">data-stream-sender.cc:113</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_adbff8212a64611b75c045aef09b8e2aa"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#adbff8212a64611b75c045aef09b8e2aa">impala::DataStreamSender::Channel::parent_</a></div><div class="ttdeci">DataStreamSender * parent_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00108">data-stream-sender.cc:108</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a33cfb26543180cf9e27ef48595719135"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a33cfb26543180cf9e27ef48595719135">impala::DataStreamSender::thrift_batch2_</a></div><div class="ttdeci">TRowBatch thrift_batch2_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00110">data-stream-sender.h:110</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a9f6dcc5112b566649c335a589814edb3"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a9f6dcc5112b566649c335a589814edb3">impala::DataStreamSender::Channel::thrift_batch_</a></div><div class="ttdeci">TRowBatch thrift_batch_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00123">data-stream-sender.cc:123</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeState_html_aaf076b1879d9e712b08ba3a6e24607b9"><div class="ttname"><a href="classimpala_1_1RuntimeState.html#aaf076b1879d9e712b08ba3a6e24607b9">impala::RuntimeState::LogError</a></div><div class="ttdeci">bool LogError(const ErrorMsg &amp;msg)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-state_8cc_source.html#l00224">runtime-state.cc:224</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeState_html"><div class="ttname"><a href="classimpala_1_1RuntimeState.html">impala::RuntimeState</a></div><div class="ttdef"><b>Definition:</b> <a href="runtime-state_8h_source.html#l00069">runtime-state.h:69</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_aa7c52712676f0849f95b4913a50432c8"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#aa7c52712676f0849f95b4913a50432c8">impala::DataStreamSender::channels_</a></div><div class="ttdeci">std::vector&lt; Channel * &gt; channels_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00114">data-stream-sender.h:114</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_ad48f0843f4e4db5ed5abf45c68796b0b"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#ad48f0843f4e4db5ed5abf45c68796b0b">impala::DataStreamSender::current_channel_idx_</a></div><div class="ttdeci">int current_channel_idx_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00102">data-stream-sender.h:102</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a1e62d5e2dfd00539a3e5ad5da86b62cf"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e62d5e2dfd00539a3e5ad5da86b62cf">impala::DataStreamSender::Channel::fragment_instance_id_</a></div><div class="ttdeci">TUniqueId fragment_instance_id_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00115">data-stream-sender.cc:115</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_abbbddecac6016f222fb67ba610050385"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#abbbddecac6016f222fb67ba610050385">impala::DataStreamSender::SerializeBatch</a></div><div class="ttdeci">void SerializeBatch(RowBatch *src, TRowBatch *dest, int num_receivers=1)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00459">data-stream-sender.cc:459</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html">impala::DataStreamSender</a></div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00046">data-stream-sender.h:46</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a9b36b9b654831d4757e22f131196343b"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a9b36b9b654831d4757e22f131196343b">impala::DataStreamSender::Send</a></div><div class="ttdeci">virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00401">data-stream-sender.cc:401</a></div></div>
<div class="ttc" id="classimpala_1_1ThreadPool_html"><div class="ttname"><a href="classimpala_1_1ThreadPool.html">impala::ThreadPool&lt; TRowBatch * &gt;</a></div></div>
<div class="ttc" id="classimpala_1_1ClientConnection_html_aadbbddffff691a67309e0e2f784134ce"><div class="ttname"><a href="classimpala_1_1ClientConnection.html#aadbbddffff691a67309e0e2f784134ce">impala::ClientConnection::DoRpc</a></div><div class="ttdeci">Status DoRpc(const F &amp;f, const Request &amp;request, Response *response)</div><div class="ttdef"><b>Definition:</b> <a href="client-cache_8h_source.html#l00225">client-cache.h:225</a></div></div>
<div class="ttc" id="debug-util_8h_html"><div class="ttname"><a href="debug-util_8h.html">debug-util.h</a></div></div>
<div class="ttc" id="expr-benchmark_8cc_html_a3a5de7bd423fbc0afc4cf935c166ca6b"><div class="ttname"><a href="expr-benchmark_8cc.html#a3a5de7bd423fbc0afc4cf935c166ca6b">pool</a></div><div class="ttdeci">ObjectPool pool</div><div class="ttdef"><b>Definition:</b> <a href="expr-benchmark_8cc_source.html#l00089">expr-benchmark.cc:89</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a1e78160dbfe7db64784d0b469a0b1c2c"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a1e78160dbfe7db64784d0b469a0b1c2c">impala::DataStreamSender::Channel::thrift_batch</a></div><div class="ttdeci">TRowBatch * thrift_batch()</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00105">data-stream-sender.cc:105</a></div></div>
<div class="ttc" id="thrift-util_8h_html"><div class="ttname"><a href="thrift-util_8h.html">thrift-util.h</a></div></div>
<div class="ttc" id="runtime-profile_8h_html_aac035c52016117af399543521f069c51"><div class="ttname"><a href="runtime-profile_8h.html#aac035c52016117af399543521f069c51">ADD_COUNTER</a></div><div class="ttdeci">#define ADD_COUNTER(profile, name, unit)</div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00047">runtime-profile.h:47</a></div></div>
<div class="ttc" id="classimpala_1_1RowBatch_html_af12857237879012707070a5deb7a529f"><div class="ttname"><a href="classimpala_1_1RowBatch.html#af12857237879012707070a5deb7a529f">impala::RowBatch::GetBatchSize</a></div><div class="ttdeci">static int GetBatchSize(const TRowBatch &amp;batch)</div><div class="ttdoc">Utility function: returns total size of batch. </div><div class="ttdef"><b>Definition:</b> <a href="row-batch_8cc_source.html#l00264">row-batch.cc:264</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_ae93db027f9d0cb4c50e8dd962d92a624"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#ae93db027f9d0cb4c50e8dd962d92a624">impala::DataStreamSender::GetNumDataBytesSent</a></div><div class="ttdeci">int64_t GetNumDataBytesSent() const </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00469">data-stream-sender.cc:469</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a9412386c8008dc38dc6e7c84d100f41c"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a9412386c8008dc38dc6e7c84d100f41c">impala::DataStreamSender::Channel::rpc_thread_lock_</a></div><div class="ttdeci">mutex rpc_thread_lock_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00133">data-stream-sender.cc:133</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a9d54e519344431380d62139e583bb6be"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a9d54e519344431380d62139e583bb6be">impala::DataStreamSender::~DataStreamSender</a></div><div class="ttdeci">virtual ~DataStreamSender()</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00354">data-stream-sender.cc:354</a></div></div>
<div class="ttc" id="logging_8h_html_a6ccc2106c47622db4e52a401a415fc58"><div class="ttname"><a href="logging_8h.html#a6ccc2106c47622db4e52a401a415fc58">VLOG_ROW</a></div><div class="ttdeci">#define VLOG_ROW</div><div class="ttdef"><b>Definition:</b> <a href="logging_8h_source.html#l00059">logging.h:59</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a54b74afd5299ec9465d49cc748485d00"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a54b74afd5299ec9465d49cc748485d00">impala::DataStreamSender::Close</a></div><div class="ttdeci">virtual void Close(RuntimeState *state)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00450">data-stream-sender.cc:450</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a5001d5f3c979d1ac348e964d032d6ce6"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a5001d5f3c979d1ac348e964d032d6ce6">impala::DataStreamSender::broadcast_</a></div><div class="ttdeci">bool broadcast_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00100">data-stream-sender.h:100</a></div></div>
<div class="ttc" id="classimpala_1_1MemTracker_html"><div class="ttname"><a href="classimpala_1_1MemTracker.html">impala::MemTracker</a></div><div class="ttdoc">This class is thread-safe. </div><div class="ttdef"><b>Definition:</b> <a href="mem-tracker_8h_source.html#l00061">mem-tracker.h:61</a></div></div>
<div class="ttc" id="runtime-state_8h_html"><div class="ttname"><a href="runtime-state_8h.html">runtime-state.h</a></div></div>
<div class="ttc" id="classimpala_1_1RowBatch_html"><div class="ttname"><a href="classimpala_1_1RowBatch.html">impala::RowBatch</a></div><div class="ttdef"><b>Definition:</b> <a href="row-batch_8h_source.html#l00066">row-batch.h:66</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_aeb6427e9059b1b7384f78cd25b8cd8d7"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#aeb6427e9059b1b7384f78cd25b8cd8d7">impala::DataStreamSender::overall_throughput_</a></div><div class="ttdeci">RuntimeProfile::Counter * overall_throughput_</div><div class="ttdoc">Throughput per total time spent in sender. </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00127">data-stream-sender.h:127</a></div></div>
<div class="ttc" id="namespaceimpala_html_aa98447566dd6700a2faaaaf3059f4d95"><div class="ttname"><a href="namespaceimpala.html#aa98447566dd6700a2faaaaf3059f4d95">impala::row_desc</a></div><div class="ttdeci">const RowDescriptor &amp; row_desc() const </div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a425142ace64ba49e8122f069c79603e0"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a425142ace64ba49e8122f069c79603e0">impala::DataStreamSender::closed_</a></div><div class="ttdeci">bool closed_</div><div class="ttdoc">If true, this sender has been closed. Not valid to call Send() anymore. </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00105">data-stream-sender.h:105</a></div></div>
<div class="ttc" id="classimpala_1_1HashUtil_html_ad47e5dcaf416f05760be732dfbc66651"><div class="ttname"><a href="classimpala_1_1HashUtil.html#ad47e5dcaf416f05760be732dfbc66651">impala::HashUtil::FNV_SEED</a></div><div class="ttdeci">static const uint32_t FNV_SEED</div><div class="ttdef"><b>Definition:</b> <a href="hash-util_8h_source.html#l00099">hash-util.h:99</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_ac212459ad24f6386484ee23856d8020a"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#ac212459ad24f6386484ee23856d8020a">impala::DataStreamSender::state_</a></div><div class="ttdeci">RuntimeState * state_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00097">data-stream-sender.h:97</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a9c12706a6222a87cb545099a725c3a2b"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a9c12706a6222a87cb545099a725c3a2b">impala::DataStreamSender::profile_</a></div><div class="ttdeci">RuntimeProfile * profile_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00116">data-stream-sender.h:116</a></div></div>
<div class="ttc" id="network-util_8h_html"><div class="ttname"><a href="network-util_8h.html">network-util.h</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeState_html_ad6c837d1a8e2d4a995aa00a4df7c5af5"><div class="ttname"><a href="classimpala_1_1RuntimeState.html#ad6c837d1a8e2d4a995aa00a4df7c5af5">impala::RuntimeState::instance_mem_tracker</a></div><div class="ttdeci">MemTracker * instance_mem_tracker()</div><div class="ttdef"><b>Definition:</b> <a href="runtime-state_8h_source.html#l00140">runtime-state.h:140</a></div></div>
<div class="ttc" id="classimpala_1_1RowDescriptor_html_a3d57439c68d08fe374c33370529a855c"><div class="ttname"><a href="classimpala_1_1RowDescriptor.html#a3d57439c68d08fe374c33370529a855c">impala::RowDescriptor::tuple_descriptors</a></div><div class="ttdeci">const std::vector&lt; TupleDescriptor * &gt; &amp; tuple_descriptors() const </div><div class="ttdoc">Return descriptors for all tuples in this row, in order of appearance. </div><div class="ttdef"><b>Definition:</b> <a href="descriptors_8h_source.html#l00412">descriptors.h:412</a></div></div>
<div class="ttc" id="classimpala_1_1RowBatch_html_a5a7ff7bf9d6bd28990eedab985477390"><div class="ttname"><a href="classimpala_1_1RowBatch.html#a5a7ff7bf9d6bd28990eedab985477390">impala::RowBatch::Serialize</a></div><div class="ttdeci">int Serialize(TRowBatch *output_batch)</div><div class="ttdef"><b>Definition:</b> <a href="row-batch_8cc_source.html#l00147">row-batch.cc:147</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a628bdb9621325d09e0d23d0c82c8b651"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a628bdb9621325d09e0d23d0c82c8b651">impala::DataStreamSender::partition_expr_ctxs_</a></div><div class="ttdeci">std::vector&lt; ExprContext * &gt; partition_expr_ctxs_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00113">data-stream-sender.h:113</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a267f05a34f49154c4213778f69ca0e02"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a267f05a34f49154c4213778f69ca0e02">impala::DataStreamSender::serialize_batch_timer_</a></div><div class="ttdeci">RuntimeProfile::Counter * serialize_batch_timer_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00117">data-stream-sender.h:117</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeProfile_html"><div class="ttname"><a href="classimpala_1_1RuntimeProfile.html">impala::RuntimeProfile</a></div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00083">runtime-profile.h:83</a></div></div>
<div class="ttc" id="classimpala_1_1TupleRow_html_aaf1d69b97c90eec7da3d07bdc40f5665"><div class="ttname"><a href="classimpala_1_1TupleRow.html#aaf1d69b97c90eec7da3d07bdc40f5665">impala::TupleRow::SetTuple</a></div><div class="ttdeci">void SetTuple(int tuple_idx, Tuple *tuple)</div><div class="ttdef"><b>Definition:</b> <a href="tuple-row_8h_source.html#l00034">tuple-row.h:34</a></div></div>
<div class="ttc" id="classimpala_1_1Expr_html_a742827844080d45d514719742cb3e7f5"><div class="ttname"><a href="classimpala_1_1Expr.html#a742827844080d45d514719742cb3e7f5">impala::Expr::type</a></div><div class="ttdeci">const ColumnType &amp; type() const </div><div class="ttdef"><b>Definition:</b> <a href="expr_8h_source.html#l00145">expr.h:145</a></div></div>
<div class="ttc" id="namespaceimpala_html_ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898"><div class="ttname"><a href="namespaceimpala.html#ac55a10ca0171687156609e8d6ba28127a45b8e4259988c3518a05b9202efb0898">impala::OK</a></div><div class="ttdef"><b>Definition:</b> <a href="webserver_8cc_source.html#l00115">webserver.cc:115</a></div></div>
<div class="ttc" id="classimpala_1_1ExprContext_html_a92f9a380825bf39ff33b5b3168bc377a"><div class="ttname"><a href="classimpala_1_1ExprContext.html#a92f9a380825bf39ff33b5b3168bc377a">impala::ExprContext::FreeLocalAllocations</a></div><div class="ttdeci">void FreeLocalAllocations()</div><div class="ttdef"><b>Definition:</b> <a href="expr-context_8cc_source.html#l00109">expr-context.cc:109</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeState_html_acb0becec52f772879949545374280774"><div class="ttname"><a href="classimpala_1_1RuntimeState.html#acb0becec52f772879949545374280774">impala::RuntimeState::impalad_client_cache</a></div><div class="ttdeci">ImpalaInternalServiceClientCache * impalad_client_cache()</div><div class="ttdef"><b>Definition:</b> <a href="runtime-state_8h_source.html#l00133">runtime-state.h:133</a></div></div>
<div class="ttc" id="compiler-util_8h_html_a9acc330d508b9a3b775cfdf7ce405e7d"><div class="ttname"><a href="compiler-util_8h.html#a9acc330d508b9a3b775cfdf7ce405e7d">UNLIKELY</a></div><div class="ttdeci">#define UNLIKELY(expr)</div><div class="ttdef"><b>Definition:</b> <a href="compiler-util_8h_source.html#l00033">compiler-util.h:33</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a78b2b85b58341c703f2ae9d1492fc0c2"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a78b2b85b58341c703f2ae9d1492fc0c2">impala::DataStreamSender::pool_</a></div><div class="ttdeci">ObjectPool * pool_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00098">data-stream-sender.h:98</a></div></div>
<div class="ttc" id="data-stream-sender_8h_html"><div class="ttname"><a href="data-stream-sender_8h.html">data-stream-sender.h</a></div></div>
<div class="ttc" id="classimpala_1_1Status_html_a580565665ea944eb64f3f495b1bee1e0"><div class="ttname"><a href="classimpala_1_1Status.html#a580565665ea944eb64f3f495b1bee1e0">impala::Status::OK</a></div><div class="ttdeci">static const Status OK</div><div class="ttdef"><b>Definition:</b> <a href="status_8h_source.html#l00087">status.h:87</a></div></div>
<div class="ttc" id="logging_8h_html_a24efb6b382d3e1bdd7817e5e31dc5c1d"><div class="ttname"><a href="logging_8h.html#a24efb6b382d3e1bdd7817e5e31dc5c1d">VLOG_RPC</a></div><div class="ttdeci">#define VLOG_RPC</div><div class="ttdef"><b>Definition:</b> <a href="logging_8h_source.html#l00056">logging.h:56</a></div></div>
<div class="ttc" id="namespaceimpala_html_ad393a2093952c5b6f3a61bd3e1302e61"><div class="ttname"><a href="namespaceimpala.html#ad393a2093952c5b6f3a61bd3e1302e61">impala::row_desc_</a></div><div class="ttdeci">const RowDescriptor * row_desc_</div><div class="ttdoc">owned by plan root, which resides in runtime_state_&#39;s pool </div><div class="ttdef"><b>Definition:</b> <a href="coordinator_8h_source.html#l00255">coordinator.h:255</a></div></div>
<div class="ttc" id="expr_8h_html"><div class="ttname"><a href="expr_8h.html">expr.h</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_af88aec7b54580fa5254895fae593f9ee"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#af88aec7b54580fa5254895fae593f9ee">impala::DataStreamSender::current_thrift_batch_</a></div><div class="ttdeci">TRowBatch * current_thrift_batch_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00111">data-stream-sender.h:111</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a946eb95314825d434a918ee858f6e5ca"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a946eb95314825d434a918ee858f6e5ca">impala::DataStreamSender::Channel::Channel</a></div><div class="ttdeci">Channel(DataStreamSender *parent, const RowDescriptor &amp;row_desc, const TNetworkAddress &amp;destination, const TUniqueId &amp;fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00063">data-stream-sender.cc:63</a></div></div>
<div class="ttc" id="names_8h_html"><div class="ttname"><a href="names_8h.html">names.h</a></div></div>
<div class="ttc" id="classimpala_1_1ObjectPool_html_ab191078c99825682a0a9915ca1e0c05c"><div class="ttname"><a href="classimpala_1_1ObjectPool.html#ab191078c99825682a0a9915ca1e0c05c">impala::ObjectPool::Add</a></div><div class="ttdeci">T * Add(T *t)</div><div class="ttdef"><b>Definition:</b> <a href="object-pool_8h_source.html#l00042">object-pool.h:42</a></div></div>
<div class="ttc" id="classimpala_1_1Expr_html_a6b9c01a432ae49b57482c9fc42a73681"><div class="ttname"><a href="classimpala_1_1Expr.html#a6b9c01a432ae49b57482c9fc42a73681">impala::Expr::CreateExprTrees</a></div><div class="ttdeci">static Status CreateExprTrees(ObjectPool *pool, const std::vector&lt; TExpr &gt; &amp;texprs, std::vector&lt; ExprContext * &gt; *ctxs)</div><div class="ttdef"><b>Definition:</b> <a href="expr_8cc_source.html#l00149">expr.cc:149</a></div></div>
<div class="ttc" id="tuple-row_8h_html"><div class="ttname"><a href="tuple-row_8h.html">tuple-row.h</a></div></div>
<div class="ttc" id="classimpala_1_1Status_html_afe14635279d3cd633d08ecbadee51019"><div class="ttname"><a href="classimpala_1_1Status.html#afe14635279d3cd633d08ecbadee51019">impala::Status::msg</a></div><div class="ttdeci">const ErrorMsg &amp; msg() const </div><div class="ttdoc">Returns the error message associated with a non-successful status. </div><div class="ttdef"><b>Definition:</b> <a href="status_8h_source.html#l00189">status.h:189</a></div></div>
<div class="ttc" id="classimpala_1_1Expr_html_a3ae02e50debba50ac5c7b6dd9f8016e7"><div class="ttname"><a href="classimpala_1_1Expr.html#a3ae02e50debba50ac5c7b6dd9f8016e7">impala::Expr::Prepare</a></div><div class="ttdeci">static Status Prepare(const std::vector&lt; ExprContext * &gt; &amp;ctxs, RuntimeState *state, const RowDescriptor &amp;row_desc, MemTracker *tracker)</div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a03c516a8696979f9c05dc7cc2819219e"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a03c516a8696979f9c05dc7cc2819219e">impala::DataStreamSender::Channel::dest_node_id_</a></div><div class="ttdeci">PlanNodeId dest_node_id_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00116">data-stream-sender.cc:116</a></div></div>
<div class="ttc" id="thrift-client_8h_html"><div class="ttname"><a href="thrift-client_8h.html">thrift-client.h</a></div></div>
<div class="ttc" id="classimpala_1_1ClientCache_html"><div class="ttname"><a href="classimpala_1_1ClientCache.html">impala::ClientCache</a></div><div class="ttdef"><b>Definition:</b> <a href="client-cache_8h_source.html#l00187">client-cache.h:187</a></div></div>
<div class="ttc" id="classimpala_1_1Status_html_a95ba859e42fe93445b340533220836ac"><div class="ttname"><a href="classimpala_1_1Status.html#a95ba859e42fe93445b340533220836ac">impala::Status::ok</a></div><div class="ttdeci">bool ok() const </div><div class="ttdef"><b>Definition:</b> <a href="status_8h_source.html#l00172">status.h:172</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_ab51ff8c9b206da41c47ade235d923550"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#ab51ff8c9b206da41c47ade235d923550">impala::DataStreamSender::Channel::num_data_bytes_sent</a></div><div class="ttdeci">int64_t num_data_bytes_sent() const </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00104">data-stream-sender.cc:104</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a6578f1decfa5bfb55201269d6ea1a229"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a6578f1decfa5bfb55201269d6ea1a229">impala::DataStreamSender::Channel::rpc_in_flight_</a></div><div class="ttdeci">bool rpc_in_flight_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00134">data-stream-sender.cc:134</a></div></div>
<div class="ttc" id="descriptors_8h_html"><div class="ttname"><a href="descriptors_8h.html">descriptors.h</a></div></div>
<div class="ttc" id="classimpala_1_1ExprContext_html_a2e652a852e99d5cd375dce3f91520aee"><div class="ttname"><a href="classimpala_1_1ExprContext.html#a2e652a852e99d5cd375dce3f91520aee">impala::ExprContext::root</a></div><div class="ttdeci">Expr * root()</div><div class="ttdef"><b>Definition:</b> <a href="expr-context_8h_source.html#l00105">expr-context.h:105</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_ae79c8af712dd169b821cdb61b761a9bd"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#ae79c8af712dd169b821cdb61b761a9bd">impala::DataStreamSender::mem_tracker_</a></div><div class="ttdeci">boost::scoped_ptr&lt; MemTracker &gt; mem_tracker_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00121">data-stream-sender.h:121</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a620e18f5af05d18ccb20294fe8240bd6"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a620e18f5af05d18ccb20294fe8240bd6">impala::DataStreamSender::Channel::rpc_thread_</a></div><div class="ttdeci">ThreadPool&lt; TRowBatch * &gt; rpc_thread_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00131">data-stream-sender.cc:131</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_1_1Channel_html_a4f74ca65ead877aaa6768c5d6525603e"><div class="ttname"><a href="classimpala_1_1DataStreamSender_1_1Channel.html#a4f74ca65ead877aaa6768c5d6525603e">impala::DataStreamSender::Channel::num_data_bytes_sent_</a></div><div class="ttdeci">int64_t num_data_bytes_sent_</div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8cc_source.html#l00119">data-stream-sender.cc:119</a></div></div>
<div class="ttc" id="classimpala_1_1DataStreamSender_html_a0da91963ba5b4e8299392661e8215cdd"><div class="ttname"><a href="classimpala_1_1DataStreamSender.html#a0da91963ba5b4e8299392661e8215cdd">impala::DataStreamSender::dest_node_id_</a></div><div class="ttdeci">PlanNodeId dest_node_id_</div><div class="ttdoc">Identifier of the destination plan node. </div><div class="ttdef"><b>Definition:</b> <a href="data-stream-sender_8h_source.html#l00130">data-stream-sender.h:130</a></div></div>
<div class="ttc" id="classimpala_1_1RuntimeProfile_html_abd28edd77ba3928dbac3af735102fb3b"><div class="ttname"><a href="classimpala_1_1RuntimeProfile.html#abd28edd77ba3928dbac3af735102fb3b">impala::RuntimeProfile::total_time_counter</a></div><div class="ttdeci">Counter * total_time_counter()</div><div class="ttdoc">Returns the counter for the total elapsed time. </div><div class="ttdef"><b>Definition:</b> <a href="runtime-profile_8h_source.html#l00453">runtime-profile.h:453</a></div></div>
</div><!-- fragment --></div><!-- contents -->
</div><!-- doc-content -->
<!-- start footer part -->
<div id="nav-path" class="navpath"><!-- id is needed for treeview function! -->
<ul>
<li class="navelem"><a class="el" href="dir_e5d120be6b5e8a44336cbfd013b25604.html">be</a></li><li class="navelem"><a class="el" href="dir_68cf7cafb51a962d5bc4848b83cab0de.html">src</a></li><li class="navelem"><a class="el" href="dir_1ef28ab5153adb64262182d038c65543.html">runtime</a></li><li class="navelem"><a class="el" href="data-stream-sender_8cc.html">data-stream-sender.cc</a></li>
<li class="footer">Generated on Thu May 7 2015 16:10:36 for Impala by
<a href="http://www.doxygen.org/index.html">
<img class="footer" src="doxygen.png" alt="doxygen"/></a> 1.8.6 </li>
</ul>
</div>
</body>
</html>