| |
| |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <title>Spark Data Source for Apache CouchDB/Cloudant</title> |
| <meta name="description" content="Spark Data Source for Apache CouchDB/Cloudant"> |
| <meta name="author" content=""> |
| |
| <!-- Enable responsive viewport --> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <!-- Le HTML5 shim, for IE6-8 support of HTML elements --> |
| <!--[if lt IE 9]> |
| <script src="http://html5shim.googlecode.com/svn/trunk/html5.js"></script> |
| <![endif]--> |
| |
| <!-- Le styles --> |
| <link href="/assets/themes/apache-clean/bootstrap/css/bootstrap.css" rel="stylesheet"> |
| <link href="/assets/themes/apache-clean/css/style.css?body=1" rel="stylesheet" type="text/css"> |
| <link href="/assets/themes/apache-clean/css/syntax.css" rel="stylesheet" type="text/css" media="screen" /> |
| <!-- Le fav and touch icons --> |
| <!-- Update these with your own images |
| <link rel="shortcut icon" href="images/favicon.ico"> |
| <link rel="apple-touch-icon" href="images/apple-touch-icon.png"> |
| <link rel="apple-touch-icon" sizes="72x72" href="images/apple-touch-icon-72x72.png"> |
| <link rel="apple-touch-icon" sizes="114x114" href="images/apple-touch-icon-114x114.png"> |
| --> |
| |
| <!-- make tables sortable by adding class tag "sortable" to table elements --> |
| <script src="http://www.kryogenix.org/code/browser/sorttable/sorttable.js"></script> |
| |
| |
| </head> |
| |
| <body> |
| |
| |
| |
| <!-- Navigation --> |
| <div id="nav-bar"> |
| <nav id="nav-container" class="navbar navbar-inverse " role="navigation"> |
| <div class="container"> |
| <!-- Brand and toggle get grouped for better mobile display --> |
| |
| <div class="navbar-header page-scroll"> |
| <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <a class="navbar-brand page-scroll" href="/#home">Home</a> |
| </div> |
| <!-- Collect the nav links, forms, and other content for toggling --> |
| <nav class="navbar-collapse collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| |
| |
| |
| <li id="download"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Download<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/downloads/spark" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/downloads/flink" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="community"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Community<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/community" target="_self">Get Involved</a></li> |
| |
| |
| <li><a href="/contributing" target="_self">Contributing</a></li> |
| |
| |
| <li><a href="/contributing-extensions" target="_self">Contributing Extensions</a></li> |
| |
| |
| <li><a href="https://issues.apache.org/jira/browse/BAHIR" target="_blank">Issue Tracker</a></li> |
| |
| |
| <li><a href="/community#source-code" target="_self">Source Code</a></li> |
| |
| |
| <li><a href="/community-members" target="_self">Project Committers</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="documentation"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Documentation<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="/docs/spark/overview" target="_self">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="/docs/flink/overview" target="_self">Bahir Flink Extensions</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="github"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">GitHub<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="https://github.com/apache/bahir" target="_blank">Bahir Spark Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-flink" target="_blank">Bahir Flink Extensions</a></li> |
| |
| |
| <li><a href="https://github.com/apache/bahir-website" target="_blank">Bahir Website</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| |
| |
| <li id="apache"> |
| |
| <a href="#" data-toggle="dropdown" class="dropdown-toggle">Apache<b class="caret"></b></a> |
| <ul class="dropdown-menu dropdown-left"> |
| |
| |
| <li><a href="http://www.apache.org/foundation/how-it-works.html" target="_blank">Apache Software Foundation</a></li> |
| |
| |
| <li><a href="http://www.apache.org/licenses/" target="_blank">Apache License</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/sponsorship" target="_blank">Sponsorship</a></li> |
| |
| |
| <li><a href="http://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a></li> |
| |
| |
| <li><a href="/privacy-policy" target="_self">Privacy Policy</a></li> |
| |
| </ul> |
| |
| </li> |
| |
| |
| </ul> |
| </nav><!--/.navbar-collapse --> |
| <!-- /.navbar-collapse --> |
| </div> |
| <!-- /.container --> |
| </nav> |
| </div> |
| |
| |
| <div class="container"> |
| |
| |
| |
| <!--<div class="hero-unit Spark Data Source for Apache CouchDB/Cloudant"> |
| <h1></h1> |
| </div> |
| --> |
| |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- |
| |
| --> |
| |
| <p>A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.</p> |
| |
<p><a href="https://cloudant.com">IBM® Cloudant®</a> is a document-oriented database-as-a-service (DBaaS). It stores data as documents
| in JSON format. It’s built with scalability, high availability, and durability in mind. It comes with a |
| wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and |
| geospatial indexing. The replication capabilities make it easy to keep data in sync between database |
| clusters, desktop PCs, and mobile devices.</p> |
| |
| <p><a href="http://couchdb.apache.org">Apache CouchDB™</a> is open source database software that focuses on ease of use and having an architecture that “completely embraces the Web”. It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.</p> |
| |
| <h2 id="linking">Linking</h2> |
| |
| <p>Using SBT:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0" |
| </code></pre></div></div> |
| |
| <p>Using Maven:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code><dependency> |
| <groupId>org.apache.bahir</groupId> |
| <artifactId>spark-sql-cloudant_2.11</artifactId> |
| <version>2.2.0</version> |
| </dependency> |
| </code></pre></div></div> |
| |
| <p>This library can also be added to Spark jobs launched through <code class="language-plaintext highlighter-rouge">spark-shell</code> or <code class="language-plaintext highlighter-rouge">spark-submit</code> by using the <code class="language-plaintext highlighter-rouge">--packages</code> command line option.</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 |
| </code></pre></div></div> |
| |
| <p>Unlike using <code class="language-plaintext highlighter-rouge">--jars</code>, using <code class="language-plaintext highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath. |
| The <code class="language-plaintext highlighter-rouge">--packages</code> argument can also be used with <code class="language-plaintext highlighter-rouge">bin/spark-submit</code>.</p> |
| |
| <p>Submit a job in Python:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --master local[4] --jars <path to cloudant-spark.jar> <path to python script> |
| </code></pre></div></div> |
| |
| <p>Submit a job in Scala:</p> |
| |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark.jar> <path to your app jar> |
| </code></pre></div></div> |
| |
<p>This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 and later.</p>
| |
| <h2 id="configuration-options">Configuration options</h2> |
| <p>The configuration is obtained in the following sequence:</p> |
| |
| <ol> |
| <li>default in the Config, which is set in the application.conf</li> |
| <li>key in the SparkConf, which is set in SparkConf</li> |
  <li>key in the parameters, which is set in a dataframe or temporary table options</li>
  <li>“spark.”+key in the SparkConf (treated as the ones passed in through spark-submit using the --conf option)</li>
| </ol> |
| |
<p>Here each subsequent configuration overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf, and configuration passed to spark-submit using --conf takes precedence over any setting in the code.</p>
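<p>The precedence above can be sketched as a simple lookup. This is only a minimal illustration of the described ordering, not the library's actual implementation; the <code class="language-plaintext highlighter-rouge">resolve_config</code> helper and the dictionaries are hypothetical.</p>

```python
def resolve_config(key, defaults, spark_conf, options):
    """Resolve one configuration value, later sources overriding earlier:
    application.conf defaults < SparkConf < DataFrame/temp-table OPTIONS
    < "spark."-prefixed SparkConf entries (e.g. set via spark-submit --conf)."""
    value = defaults.get(key)
    if key in spark_conf:
        value = spark_conf[key]
    if key in options:
        value = options[key]
    if "spark." + key in spark_conf:
        value = spark_conf["spark." + key]
    return value

# A "spark."-prefixed key (as passed through spark-submit --conf)
# wins over a DataFrame option for the same key:
conf = {"spark.cloudant.host": "ACCOUNT.cloudant.com"}
opts = {"cloudant.host": "other.example.com"}
print(resolve_config("cloudant.host", {}, conf, opts))  # prints: ACCOUNT.cloudant.com
```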
| |
| <h3 id="configuration-in-applicationconf">Configuration in application.conf</h3> |
<p>Default values are defined <a href="cloudant-spark-sql/src/main/resources/application.conf">here</a>.</p>
| |
| <h3 id="configuration-on-sparkconf">Configuration on SparkConf</h3> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th style="text-align: center">Default</th> |
| <th>Meaning</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td>cloudant.protocol</td> |
| <td style="text-align: center">https</td> |
| <td>protocol to use to transfer data: http or https</td> |
| </tr> |
| <tr> |
| <td>cloudant.host</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant host url</td> |
| </tr> |
| <tr> |
| <td>cloudant.username</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant userid</td> |
| </tr> |
| <tr> |
| <td>cloudant.password</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant password</td> |
| </tr> |
| <tr> |
| <td>cloudant.useQuery</td> |
| <td style="text-align: center">false</td> |
<td>By default, the _all_docs endpoint is used if the ‘view’ and ‘index’ configurations (see below) are not set. When useQuery is enabled, the _find endpoint is used in place of _all_docs when the query condition is not on the primary key field (_id), so that query predicates may be pushed down to the datastore.</td>
| </tr> |
| <tr> |
| <td>cloudant.queryLimit</td> |
| <td style="text-align: center">25</td> |
| <td>The maximum number of results returned when querying the _find endpoint.</td> |
| </tr> |
| <tr> |
| <td>jsonstore.rdd.partitions</td> |
| <td style="text-align: center">10</td> |
<td>the number of partitions intended to be used when JsonStoreRDD loads query results in parallel. The actual number is calculated from the total rows returned, while satisfying maxInPartition and minInPartition</td>
| </tr> |
| <tr> |
| <td>jsonstore.rdd.maxInPartition</td> |
| <td style="text-align: center">-1</td> |
| <td>the max rows in a partition. -1 means unlimited</td> |
| </tr> |
| <tr> |
| <td>jsonstore.rdd.minInPartition</td> |
| <td style="text-align: center">10</td> |
| <td>the min rows in a partition.</td> |
| </tr> |
| <tr> |
| <td>jsonstore.rdd.requestTimeout</td> |
| <td style="text-align: center">900000</td> |
| <td>the request timeout in milliseconds</td> |
| </tr> |
| <tr> |
| <td>bulkSize</td> |
| <td style="text-align: center">200</td> |
| <td>the bulk save size</td> |
| </tr> |
| <tr> |
| <td>schemaSampleSize</td> |
| <td style="text-align: center">“-1”</td> |
<td>the sample size for RDD schema discovery. 1 means use only the first document; -1 means all documents; 0 is treated as 1; any number N means min(N, total) documents</td>
| </tr> |
| <tr> |
| <td>createDBOnSave</td> |
| <td style="text-align: center">“false”</td> |
| <td>whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.</td> |
| </tr> |
| </tbody> |
| </table> |
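<p>To make the interaction of <code class="language-plaintext highlighter-rouge">jsonstore.rdd.partitions</code>, <code class="language-plaintext highlighter-rouge">maxInPartition</code>, and <code class="language-plaintext highlighter-rouge">minInPartition</code> concrete, here is one plausible reading of the partition calculation described above. This is a hedged sketch only; the actual JsonStoreRDD logic may differ.</p>

```python
import math

def partition_count(total_rows, partitions=10, max_in_partition=-1, min_in_partition=10):
    # Start from the configured partition count.
    n = partitions
    # Shrink so each partition holds at least min_in_partition rows.
    if min_in_partition > 0:
        n = min(n, max(1, total_rows // min_in_partition))
    # Grow so no partition exceeds max_in_partition rows (-1 = unlimited).
    if max_in_partition > 0:
        n = max(n, math.ceil(total_rows / max_in_partition))
    return n

print(partition_count(50))                           # few rows -> fewer partitions: 5
print(partition_count(10000, max_in_partition=500))  # row cap forces more partitions: 20
```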
| |
| <h3 id="configuration-on-spark-sql-temporary-table-or-dataframe">Configuration on Spark SQL Temporary Table or DataFrame</h3> |
| |
<p>In addition to all of the configurations above, which can be passed to a temporary table or dataframe through SparkConf, the following can also be set on a temporary table or dataframe using OPTIONS:</p>
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th style="text-align: center">Default</th> |
| <th>Meaning</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td>database</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant database name</td> |
| </tr> |
| <tr> |
| <td>view</td> |
| <td style="text-align: center"> </td> |
<td>cloudant view without the database name. Only used for load.</td>
| </tr> |
| <tr> |
| <td>index</td> |
| <td style="text-align: center"> </td> |
<td>cloudant search index without the database name. Only used for loading data with 200 or fewer results.</td>
| </tr> |
| <tr> |
| <td>path</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant: as database name if database is not present</td> |
| </tr> |
| <tr> |
| <td>schemaSampleSize</td> |
| <td style="text-align: center">“-1”</td> |
| <td>the sample size used to discover the schema for this temp table. -1 scans all documents</td> |
| </tr> |
| <tr> |
| <td>bulkSize</td> |
| <td style="text-align: center">200</td> |
| <td>the bulk save size</td> |
| </tr> |
| <tr> |
| <td>createDBOnSave</td> |
| <td style="text-align: center">“false”</td> |
| <td>whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.</td> |
| </tr> |
| </tbody> |
| </table> |
| |
<p>For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: <code class="language-plaintext highlighter-rouge">{id, key, value}</code>, where <code class="language-plaintext highlighter-rouge">value</code> can be a compound field. An example of loading data from a view:</p>
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')"</span><span class="p">)</span> |
| |
| </code></pre></div></div> |
| |
| <h3 id="configuration-on-cloudant-receiver-for-spark-streaming">Configuration on Cloudant Receiver for Spark Streaming</h3> |
| |
| <table> |
| <thead> |
| <tr> |
| <th>Name</th> |
| <th style="text-align: center">Default</th> |
| <th>Meaning</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td>cloudant.host</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant host url</td> |
| </tr> |
| <tr> |
| <td>cloudant.username</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant userid</td> |
| </tr> |
| <tr> |
| <td>cloudant.password</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant password</td> |
| </tr> |
| <tr> |
| <td>database</td> |
| <td style="text-align: center"> </td> |
| <td>cloudant database name</td> |
| </tr> |
| <tr> |
| <td>selector</td> |
| <td style="text-align: center">all documents</td> |
| <td>a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector’s conditions will be retrieved from Cloudant and loaded into Spark.</td> |
| </tr> |
| </tbody> |
| </table> |
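<p>The <code class="language-plaintext highlighter-rouge">selector</code> value is a JSON object in Cloudant Query syntax. As a small, hypothetical illustration, a selector restricting the stream to documents whose <code class="language-plaintext highlighter-rouge">_id</code> is at least <code class="language-plaintext highlighter-rouge">"CAA"</code> could be built as:</p>

```python
import json

# Cloudant Query selector: only documents with _id >= "CAA" pass the filter.
selector = json.dumps({"_id": {"$gte": "CAA"}})
print(selector)  # {"_id": {"$gte": "CAA"}}
```

<p>The resulting string would be passed as the <code class="language-plaintext highlighter-rouge">selector</code> entry of the receiver's option map.</p>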
| |
<h3 id="configuration-in-spark-submit-using-conf-option">Configuration in spark-submit using --conf option</h3>
| |
<p>The configuration keys stated above can also be set using the <code class="language-plaintext highlighter-rouge">spark-submit --conf</code> option. When passing configuration in spark-submit, make sure to add “spark.” as a prefix to the keys.</p>
| |
| <h2 id="examples">Examples</h2> |
| |
| <h3 id="python-api">Python API</h3> |
| |
| <h4 id="using-sql-in-python">Using SQL In Python</h4> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\ |
| <span class="p">.</span><span class="n">builder</span>\ |
| <span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using temp tables"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span> |
| |
| |
| <span class="c1"># Loading temp table from Cloudant db |
| </span><span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')"</span><span class="p">)</span> |
| <span class="n">airportData</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">sql</span><span class="p">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id"</span><span class="p">)</span> |
| <span class="n">airportData</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span> |
<span class="k">print</span><span class="p">(</span><span class="s">'Total # of rows in airportData: '</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">airportData</span><span class="p">.</span><span class="n">count</span><span class="p">()))</span>
| <span class="k">for</span> <span class="n">code</span> <span class="ow">in</span> <span class="n">airportData</span><span class="p">.</span><span class="n">collect</span><span class="p">():</span> |
    <span class="k">print</span><span class="p">(</span><span class="n">code</span><span class="p">.</span><span class="n">_id</span><span class="p">)</span>
| </code></pre></div></div> |
| |
| <p>See <a href="examples/python/CloudantApp.py">CloudantApp.py</a> for examples.</p> |
| |
| <p>Submit job example:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py |
| </code></pre></div></div> |
| |
| <h4 id="using-dataframe-in-python">Using DataFrame In Python</h4> |
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">spark</span> <span class="o">=</span> <span class="n">SparkSession</span>\ |
| <span class="p">.</span><span class="n">builder</span>\ |
| <span class="p">.</span><span class="n">appName</span><span class="p">(</span><span class="s">"Cloudant Spark SQL Example in Python using dataframes"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.host"</span><span class="p">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.username"</span><span class="p">,</span> <span class="s">"USERNAME"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"cloudant.password"</span><span class="p">,</span><span class="s">"PASSWORD"</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">config</span><span class="p">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="p">,</span> <span class="mi">8</span><span class="p">)</span>\ |
| <span class="p">.</span><span class="n">getOrCreate</span><span class="p">()</span> |
| |
| <span class="c1"># ***1. Loading dataframe from Cloudant db |
| </span><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="n">load</span><span class="p">(</span><span class="s">"n_airportcodemapping"</span><span class="p">,</span> <span class="s">"org.apache.bahir.cloudant"</span><span class="p">)</span> |
| <span class="n">df</span><span class="p">.</span><span class="n">cache</span><span class="p">()</span> |
| <span class="n">df</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span> |
| <span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">airportName</span> <span class="o">>=</span> <span class="s">'Moscow'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span> |
| <span class="n">df</span><span class="p">.</span><span class="nb">filter</span><span class="p">(</span><span class="n">df</span><span class="p">.</span><span class="n">_id</span> <span class="o">>=</span> <span class="s">'CAA'</span><span class="p">).</span><span class="n">select</span><span class="p">(</span><span class="s">"_id"</span><span class="p">,</span><span class="s">'airportName'</span><span class="p">).</span><span class="n">show</span><span class="p">()</span> |
| </code></pre></div></div> |
| |
| <p>See <a href="examples/python/CloudantDF.py">CloudantDF.py</a> for examples.</p> |
| |
<p>When performing multiple operations on a dataframe (select, filter, etc.),
you should persist it. Otherwise, every operation on the dataframe will load the same data from Cloudant again.
Persisting also speeds up computation. This statement persists the dataframe in memory: <code class="language-plaintext highlighter-rouge">df.cache()</code>. Alternatively, for large databases, persist to both memory and disk with:</p>
| |
| <div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">StorageLevel</span> |
| <span class="n">df</span><span class="p">.</span><span class="n">persist</span><span class="p">(</span><span class="n">storageLevel</span> <span class="o">=</span> <span class="n">StorageLevel</span><span class="p">(</span><span class="bp">True</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="bp">False</span><span class="p">,</span> <span class="bp">True</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> |
| </code></pre></div></div> |
| |
<p><a href="examples/python/CloudantDFOption.py">Sample code</a> on using DataFrame options to define Cloudant configuration.</p>
| |
| <h3 id="scala-api">Scala API</h3> |
| |
| <h4 id="using-sql-in-scala">Using SQL In Scala</h4> |
| |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span> |
| <span class="o">.</span><span class="py">builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span> |
| |
| <span class="c1">// For implicit conversions of Dataframe to RDDs</span> |
| <span class="k">import</span> <span class="nn">spark.implicits._</span> |
| |
| <span class="c1">// create a temp table from Cloudant db and query it using sql syntax</span> |
| <span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span> |
| <span class="n">s</span><span class="s">""" |
| |CREATE TEMPORARY TABLE airportTable |
| |USING org.apache.bahir.cloudant |
| |OPTIONS ( database 'n_airportcodemapping') |
| """</span><span class="o">.</span><span class="py">stripMargin</span><span class="o">)</span> |
| <span class="c1">// create a dataframe</span> |
| <span class="k">val</span> <span class="nv">airportData</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">sql</span><span class="o">(</span><span class="s">"SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id"</span><span class="o">)</span> |
| <span class="nv">airportData</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span> |
| <span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"Total # of rows in airportData: "</span> <span class="o">+</span> <span class="nv">airportData</span><span class="o">.</span><span class="py">count</span><span class="o">())</span> |
| <span class="c1">// convert dataframe to array of Rows, and process each row</span> |
| <span class="nv">airportData</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">t</span> <span class="k">=></span> <span class="s">"code: "</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">+</span> <span class="s">",name:"</span> <span class="o">+</span> <span class="nf">t</span><span class="o">(</span><span class="mi">1</span><span class="o">)).</span><span class="py">collect</span><span class="o">().</span><span class="py">foreach</span><span class="o">(</span><span class="n">println</span><span class="o">)</span> |
| </code></pre></div></div> |
| <p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantApp.scala">CloudantApp.scala</a> for examples.</p> |
| |
| <p>Submit job example:</p> |
| <div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD /path/to/spark-sql-cloudant_2.11-2.2.0-tests.jar |
| </code></pre></div></div> |
| |
| <h3 id="using-dataframe-in-scala">Using DataFrame In Scala</h3> |
| |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nc">SparkSession</span> |
| <span class="o">.</span><span class="py">builder</span><span class="o">()</span> |
| <span class="o">.</span><span class="py">appName</span><span class="o">(</span><span class="s">"Cloudant Spark SQL Example with Dataframe"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.host"</span><span class="o">,</span><span class="s">"ACCOUNT.cloudant.com"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.username"</span><span class="o">,</span> <span class="s">"USERNAME"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"cloudant.password"</span><span class="o">,</span><span class="s">"PASSWORD"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"createDBOnSave"</span><span class="o">,</span><span class="s">"true"</span><span class="o">)</span> <span class="c1">// to create a db on save</span> |
| <span class="o">.</span><span class="py">config</span><span class="o">(</span><span class="s">"jsonstore.rdd.partitions"</span><span class="o">,</span> <span class="s">"20"</span><span class="o">)</span> <span class="c1">// using 20 partitions</span> |
| <span class="o">.</span><span class="py">getOrCreate</span><span class="o">()</span> |
| |
| <span class="c1">// 1. Loading data from Cloudant db</span> |
| <span class="k">val</span> <span class="nv">df</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">load</span><span class="o">(</span><span class="s">"n_flight"</span><span class="o">)</span> |
| <span class="c1">// Caching df in memory to speed computations</span> |
| <span class="c1">// and not to retrieve data from cloudant again</span> |
| <span class="nv">df</span><span class="o">.</span><span class="py">cache</span><span class="o">()</span> |
| <span class="nv">df</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span> |
| |
| <span class="c1">// 2. Saving dataframe to Cloudant db</span> |
| <span class="k">val</span> <span class="nv">df2</span> <span class="k">=</span> <span class="nv">df</span><span class="o">.</span><span class="py">filter</span><span class="o">(</span><span class="nf">df</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">)</span> <span class="o">===</span> <span class="s">"AA106"</span><span class="o">)</span> |
| <span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"flightSegmentId"</span><span class="o">,</span><span class="s">"economyClassBaseCost"</span><span class="o">)</span> |
| <span class="nv">df2</span><span class="o">.</span><span class="py">show</span><span class="o">()</span> |
| <span class="nv">df2</span><span class="o">.</span><span class="py">write</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"org.apache.bahir.cloudant"</span><span class="o">).</span><span class="py">save</span><span class="o">(</span><span class="s">"n_flight2"</span><span class="o">)</span> |
| </code></pre></div></div> |
| |
| <p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantDF.scala">CloudantDF.scala</a> for examples.</p> |
| |
| <p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala">CloudantDFOption.scala</a> for sample code that uses DataFrame options to define the Cloudant configuration.</p> |
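| <p>As a minimal sketch (the option keys shown here follow the connector's configuration names, and <code class="language-plaintext highlighter-rouge">ACCOUNT</code>, <code class="language-plaintext highlighter-rouge">USERNAME</code>, and <code class="language-plaintext highlighter-rouge">PASSWORD</code> are placeholders), the same load can be written with inline DataFrame options instead of a session-wide Spark configuration:</p> |
| |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Placeholders: replace ACCOUNT, USERNAME, PASSWORD with real credentials |
| val dfOpt = spark.read.format("org.apache.bahir.cloudant") |
|   .option("cloudant.host", "ACCOUNT.cloudant.com") |
|   .option("cloudant.username", "USERNAME") |
|   .option("cloudant.password", "PASSWORD") |
|   .load("n_flight") |
| </code></pre></div></div> |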
| |
| <h3 id="using-streams-in-scala">Using Streams In Scala</h3> |
| |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sparkConf</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span> |
| <span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span> |
| <span class="s">"cloudant.host"</span> <span class="o">-></span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span> |
| <span class="s">"cloudant.username"</span> <span class="o">-></span> <span class="s">"USERNAME"</span><span class="o">,</span> |
| <span class="s">"cloudant.password"</span> <span class="o">-></span> <span class="s">"PASSWORD"</span><span class="o">,</span> |
| <span class="s">"database"</span> <span class="o">-></span> <span class="s">"n_airportcodemapping"</span><span class="o">)))</span> |
| |
| <span class="nv">changes</span><span class="o">.</span><span class="py">foreachRDD</span><span class="o">((</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">],</span> <span class="n">time</span><span class="k">:</span> <span class="kt">Time</span><span class="o">)</span> <span class="k">=></span> <span class="o">{</span> |
| <span class="c1">// Get the singleton instance of SparkSession</span> |
| <span class="k">val</span> <span class="nv">spark</span> <span class="k">=</span> <span class="nv">SparkSessionSingleton</span><span class="o">.</span><span class="py">getInstance</span><span class="o">(</span><span class="nv">rdd</span><span class="o">.</span><span class="py">sparkContext</span><span class="o">.</span><span class="py">getConf</span><span class="o">)</span> |
| |
| <span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"========= $time ========="</span><span class="o">)</span> |
| <span class="c1">// Convert RDD[String] to DataFrame</span> |
| <span class="k">val</span> <span class="nv">changesDataFrame</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">json</span><span class="o">(</span><span class="n">rdd</span><span class="o">)</span> |
| <span class="nf">if</span> <span class="o">(!</span><span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">schema</span><span class="o">.</span><span class="py">isEmpty</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span> |
| <span class="nv">changesDataFrame</span><span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"*"</span><span class="o">).</span><span class="py">show</span><span class="o">()</span> |
| <span class="c1">// ...</span> |
| <span class="o">}</span> |
| <span class="o">})</span> |
| <span class="nv">ssc</span><span class="o">.</span><span class="py">start</span><span class="o">()</span> |
| <span class="c1">// run streaming for 120 secs</span> |
| <span class="nv">Thread</span><span class="o">.</span><span class="py">sleep</span><span class="o">(</span><span class="mi">120000L</span><span class="o">)</span> |
| <span class="nv">ssc</span><span class="o">.</span><span class="py">stop</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span> |
| |
| </code></pre></div></div> |
| |
| <p>See <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala">CloudantStreaming.scala</a> for examples.</p> |
| |
| <p>By default, Spark Streaming will load all documents from a database. To limit the loading to |
| specific documents, use the <code class="language-plaintext highlighter-rouge">selector</code> option of <code class="language-plaintext highlighter-rouge">CloudantReceiver</code> and specify your conditions |
| (see the <a href="examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala">CloudantStreamingSelector.scala</a> |
| example for more details):</p> |
| |
| <div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="nv">changes</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CloudantReceiver</span><span class="o">(</span><span class="nc">Map</span><span class="o">(</span> |
| <span class="s">"cloudant.host"</span> <span class="o">-></span> <span class="s">"ACCOUNT.cloudant.com"</span><span class="o">,</span> |
| <span class="s">"cloudant.username"</span> <span class="o">-></span> <span class="s">"USERNAME"</span><span class="o">,</span> |
| <span class="s">"cloudant.password"</span> <span class="o">-></span> <span class="s">"PASSWORD"</span><span class="o">,</span> |
| <span class="s">"database"</span> <span class="o">-></span> <span class="s">"sales"</span><span class="o">,</span> |
| <span class="s">"selector"</span> <span class="o">-></span> <span class="s">"{\"month\":\"May\", \"rep\":\"John\"}"</span><span class="o">)))</span> |
| </code></pre></div></div> |
| |
| </div> |
| </div> |
| |
| |
| |
| <hr> |
| |
| <!-- <p>© 2021 </p>--> |
| <footer class="site-footer"> |
| <div class="wrapper"> |
| <div class="footer-col-wrapper"> |
| |
| <div style="text-align:center;"> |
| |
| <div> |
| Copyright © 2016-<script>document.write(new Date().getFullYear());</script> <a href="http://www.apache.org">The Apache Software Foundation</a>. |
| Licensed under the <a href="http://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>. |
| <br> |
| |
| Apache and the Apache Feather logo are trademarks of The Apache Software Foundation. |
| |
| </div> |
| </div> |
| </div> |
| </div> |
| </footer> |
| |
| </div> |
| |
| |
| |
| |
| <script type="text/javascript"> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-79140859-1', 'bahir.apache.org'); |
| ga('require', 'linkid', 'linkid.js'); |
| ga('send', 'pageview'); |
| |
| </script> |
| |
| |
| |
| <script src="/assets/themes/apache-clean/jquery/jquery-2.1.1.min.js"></script> |
| |
| <script src="/assets/themes/apache-clean/bootstrap/js/bootstrap.min.js"></script> |
| |
| |
| </body> |
| </html> |
| |