blob: d08ea68eeb62b8ba31664b4a1f47d75231f4734c [file] [log] [blame]
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"><html xmlns="http://www.w3.org/1999/xhtml"><head><title>R: Write the streaming SparkDataFrame to a data source.</title>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<link rel="stylesheet" type="text/css" href="R.css" />
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/8.3/styles/github.min.css">
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/8.3/highlight.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/8.3/languages/r.min.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
</head><body>
<table width="100%" summary="page for write.stream {SparkR}"><tr><td>write.stream {SparkR}</td><td style="text-align: right;">R Documentation</td></tr></table>
<h2>Write the streaming SparkDataFrame to a data source.</h2>
<h3>Description</h3>
<p>The data source is specified by the <code>source</code> and a set of options (...).
If <code>source</code> is not specified, the default data source configured by
spark.sql.sources.default will be used.
</p>
<h3>Usage</h3>
<pre>
write.stream(df, source = NULL, outputMode = NULL, ...)
## S4 method for signature 'SparkDataFrame'
write.stream(
df,
source = NULL,
outputMode = NULL,
partitionBy = NULL,
trigger.processingTime = NULL,
trigger.once = NULL,
...
)
</pre>
<h3>Arguments</h3>
<table summary="R argblock">
<tr valign="top"><td><code>df</code></td>
<td>
<p>a streaming SparkDataFrame.</p>
</td></tr>
<tr valign="top"><td><code>source</code></td>
<td>
<p>a name for external data source.</p>
</td></tr>
<tr valign="top"><td><code>outputMode</code></td>
<td>
<p>one of 'append', 'complete', 'update'.</p>
</td></tr>
<tr valign="top"><td><code>...</code></td>
<td>
<p>additional external data source specific named options.</p>
</td></tr>
<tr valign="top"><td><code>partitionBy</code></td>
<td>
<p>a name or a list of names of columns to partition the output by on the file
system. If specified, the output is laid out on the file system similar to Hive's
partitioning scheme.</p>
</td></tr>
<tr valign="top"><td><code>trigger.processingTime</code></td>
<td>
<p>a processing time interval as a string, e.g. '5 seconds',
'1 minute'. This is a trigger that runs a query periodically based on the processing
time. If value is '0 seconds', the query will run as fast as possible, this is the
default. Only one trigger can be set.</p>
</td></tr>
<tr valign="top"><td><code>trigger.once</code></td>
<td>
<p>a logical, must be set to <code>TRUE</code>. This is a trigger that processes only
one batch of data in a streaming query then terminates the query. Only one trigger can be
set.</p>
</td></tr>
</table>
<h3>Details</h3>
<p>Additionally, <code>outputMode</code> specifies how data of a streaming SparkDataFrame is written to a
output data source. There are three modes:
</p>
<ul>
<li><p> append: Only the new rows in the streaming SparkDataFrame will be written out. This
output mode can be only be used in queries that do not contain any aggregation.
</p>
</li>
<li><p> complete: All the rows in the streaming SparkDataFrame will be written out every time
there are some updates. This output mode can only be used in queries that
contain aggregations.
</p>
</li>
<li><p> update: Only the rows that were updated in the streaming SparkDataFrame will be written
out every time there are some updates. If the query doesn't contain aggregations,
it will be equivalent to <code>append</code> mode.
</p>
</li></ul>
<h3>Note</h3>
<p>write.stream since 2.2.0
</p>
<p>experimental
</p>
<h3>See Also</h3>
<p><a href="read.stream.html">read.stream</a>
</p>
<p>Other SparkDataFrame functions:
<code><a href="SparkDataFrame.html">SparkDataFrame-class</a></code>,
<code><a href="summarize.html">agg</a>()</code>,
<code><a href="alias.html">alias</a>()</code>,
<code><a href="arrange.html">arrange</a>()</code>,
<code><a href="as.data.frame.html">as.data.frame</a>()</code>,
<code><a href="attach.html">attach,SparkDataFrame-method</a></code>,
<code><a href="broadcast.html">broadcast</a>()</code>,
<code><a href="cache.html">cache</a>()</code>,
<code><a href="checkpoint.html">checkpoint</a>()</code>,
<code><a href="coalesce.html">coalesce</a>()</code>,
<code><a href="collect.html">collect</a>()</code>,
<code><a href="columns.html">colnames</a>()</code>,
<code><a href="coltypes.html">coltypes</a>()</code>,
<code><a href="createOrReplaceTempView.html">createOrReplaceTempView</a>()</code>,
<code><a href="crossJoin.html">crossJoin</a>()</code>,
<code><a href="cube.html">cube</a>()</code>,
<code><a href="dapplyCollect.html">dapplyCollect</a>()</code>,
<code><a href="dapply.html">dapply</a>()</code>,
<code><a href="describe.html">describe</a>()</code>,
<code><a href="dim.html">dim</a>()</code>,
<code><a href="distinct.html">distinct</a>()</code>,
<code><a href="dropDuplicates.html">dropDuplicates</a>()</code>,
<code><a href="nafunctions.html">dropna</a>()</code>,
<code><a href="drop.html">drop</a>()</code>,
<code><a href="dtypes.html">dtypes</a>()</code>,
<code><a href="exceptAll.html">exceptAll</a>()</code>,
<code><a href="except.html">except</a>()</code>,
<code><a href="explain.html">explain</a>()</code>,
<code><a href="filter.html">filter</a>()</code>,
<code><a href="first.html">first</a>()</code>,
<code><a href="gapplyCollect.html">gapplyCollect</a>()</code>,
<code><a href="gapply.html">gapply</a>()</code>,
<code><a href="getNumPartitions.html">getNumPartitions</a>()</code>,
<code><a href="groupBy.html">group_by</a>()</code>,
<code><a href="head.html">head</a>()</code>,
<code><a href="hint.html">hint</a>()</code>,
<code><a href="histogram.html">histogram</a>()</code>,
<code><a href="insertInto.html">insertInto</a>()</code>,
<code><a href="intersectAll.html">intersectAll</a>()</code>,
<code><a href="intersect.html">intersect</a>()</code>,
<code><a href="isLocal.html">isLocal</a>()</code>,
<code><a href="isStreaming.html">isStreaming</a>()</code>,
<code><a href="join.html">join</a>()</code>,
<code><a href="limit.html">limit</a>()</code>,
<code><a href="localCheckpoint.html">localCheckpoint</a>()</code>,
<code><a href="merge.html">merge</a>()</code>,
<code><a href="mutate.html">mutate</a>()</code>,
<code><a href="ncol.html">ncol</a>()</code>,
<code><a href="nrow.html">nrow</a>()</code>,
<code><a href="persist.html">persist</a>()</code>,
<code><a href="printSchema.html">printSchema</a>()</code>,
<code><a href="randomSplit.html">randomSplit</a>()</code>,
<code><a href="rbind.html">rbind</a>()</code>,
<code><a href="rename.html">rename</a>()</code>,
<code><a href="repartitionByRange.html">repartitionByRange</a>()</code>,
<code><a href="repartition.html">repartition</a>()</code>,
<code><a href="rollup.html">rollup</a>()</code>,
<code><a href="sample.html">sample</a>()</code>,
<code><a href="saveAsTable.html">saveAsTable</a>()</code>,
<code><a href="schema.html">schema</a>()</code>,
<code><a href="selectExpr.html">selectExpr</a>()</code>,
<code><a href="select.html">select</a>()</code>,
<code><a href="showDF.html">showDF</a>()</code>,
<code><a href="show.html">show</a>()</code>,
<code><a href="storageLevel.html">storageLevel</a>()</code>,
<code><a href="str.html">str</a>()</code>,
<code><a href="subset.html">subset</a>()</code>,
<code><a href="summary.html">summary</a>()</code>,
<code><a href="take.html">take</a>()</code>,
<code><a href="toJSON.html">toJSON</a>()</code>,
<code><a href="unionByName.html">unionByName</a>()</code>,
<code><a href="union.html">union</a>()</code>,
<code><a href="unpersist.html">unpersist</a>()</code>,
<code><a href="withColumn.html">withColumn</a>()</code>,
<code><a href="withWatermark.html">withWatermark</a>()</code>,
<code><a href="with.html">with</a>()</code>,
<code><a href="write.df.html">write.df</a>()</code>,
<code><a href="write.jdbc.html">write.jdbc</a>()</code>,
<code><a href="write.json.html">write.json</a>()</code>,
<code><a href="write.orc.html">write.orc</a>()</code>,
<code><a href="write.parquet.html">write.parquet</a>()</code>,
<code><a href="write.text.html">write.text</a>()</code>
</p>
<h3>Examples</h3>
<pre><code class="r">## Not run:
##D sparkR.session()
##D df &lt;- read.stream(&quot;socket&quot;, host = &quot;localhost&quot;, port = 9999)
##D isStreaming(df)
##D wordCounts &lt;- count(group_by(df, &quot;value&quot;))
##D
##D # console
##D q &lt;- write.stream(wordCounts, &quot;console&quot;, outputMode = &quot;complete&quot;)
##D # text stream
##D q &lt;- write.stream(df, &quot;text&quot;, path = &quot;/home/user/out&quot;, checkpointLocation = &quot;/home/user/cp&quot;
##D partitionBy = c(&quot;year&quot;, &quot;month&quot;), trigger.processingTime = &quot;30 seconds&quot;)
##D # memory stream
##D q &lt;- write.stream(wordCounts, &quot;memory&quot;, queryName = &quot;outs&quot;, outputMode = &quot;complete&quot;)
##D head(sql(&quot;SELECT * from outs&quot;))
##D queryName(q)
##D
##D stopQuery(q)
## End(Not run)
</code></pre>
<hr /><div style="text-align: center;">[Package <em>SparkR</em> version 2.4.7 <a href="00Index.html">Index</a>]</div>
</body></html>