blob: 7ef9a2c228610817a6cc247d1d775283838bc49d [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>
Examples | Apache Spark
</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-EVSTQN3/azprG1Anm3QDgpJLIm9Nao0Yz1ztcQTwFspd3yD65VohhpuuCOmLASjC" crossorigin="anonymous">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&family=Courier+Prime:wght@400;700&display=swap" rel="stylesheet">
<link href="/css/custom.css" rel="stylesheet">
<!-- Code highlighter CSS -->
<link href="/css/pygments-default.css" rel="stylesheet">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Matomo -->
<script>
// Matomo command queue; reuse an existing queue if another snippet created it first.
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
// Cookieless tracking: cookies are disabled before the page view is recorded.
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
// Load the Matomo tracker asynchronously so it never blocks page rendering.
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
// '40' is this site's id within the analytics.apache.org instance.
_paq.push(['setSiteId', '40']);
// Inject the tracker script before the first existing <script> element.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body class="global">
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4" style="background: #1D6890;">
<a class="navbar-brand" href="/">
<img src="/images/spark-logo-rev.svg" alt="Apache Spark" width="141" height="72">
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarContent"
aria-controls="navbarContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse col-md-12 col-lg-auto pt-4" id="navbarContent">
<ul class="navbar-nav me-auto">
<li class="nav-item">
<a class="nav-link" href="/downloads.html">Download</a>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="libraries" role="button" data-bs-toggle="dropdown"
aria-expanded="false">
Libraries
</a>
<ul class="dropdown-menu" aria-labelledby="libraries">
<li><a class="dropdown-item" href="/sql/">SQL and DataFrames</a></li>
<li><a class="dropdown-item" href="/spark-connect/">Spark Connect</a></li>
<li><a class="dropdown-item" href="/streaming/">Spark Streaming</a></li>
<li><a class="dropdown-item" href="/pandas-on-spark/">pandas on Spark</a></li>
<li><a class="dropdown-item" href="/mllib/">MLlib (machine learning)</a></li>
<li><a class="dropdown-item" href="/graphx/">GraphX (graph)</a></li>
<li>
<hr class="dropdown-divider">
</li>
<li><a class="dropdown-item" href="/third-party-projects.html">Third-Party Projects</a></li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="documentation" role="button" data-bs-toggle="dropdown"
aria-expanded="false">
Documentation
</a>
<ul class="dropdown-menu" aria-labelledby="documentation">
<li><a class="dropdown-item" href="/docs/latest/">Latest Release</a></li>
<li><a class="dropdown-item" href="/documentation.html">Older Versions and Other Resources</a></li>
<li><a class="dropdown-item" href="/faq.html">Frequently Asked Questions</a></li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link active" aria-current="page" href="/examples.html">Examples</a>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="community" role="button" data-bs-toggle="dropdown"
aria-expanded="false">
Community
</a>
<ul class="dropdown-menu" aria-labelledby="community">
<li><a class="dropdown-item" href="/community.html">Mailing Lists &amp; Resources</a></li>
<li><a class="dropdown-item" href="/contributing.html">Contributing to Spark</a></li>
<li><a class="dropdown-item" href="/improvement-proposals.html">Improvement Proposals (SPIP)</a>
</li>
<li><a class="dropdown-item" href="https://issues.apache.org/jira/browse/SPARK">Issue Tracker</a>
</li>
<li><a class="dropdown-item" href="/powered-by.html">Powered By</a></li>
<li><a class="dropdown-item" href="/committers.html">Project Committers</a></li>
<li><a class="dropdown-item" href="/history.html">Project History</a></li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="developers" role="button" data-bs-toggle="dropdown"
aria-expanded="false">
Developers
</a>
<ul class="dropdown-menu" aria-labelledby="developers">
<li><a class="dropdown-item" href="/developer-tools.html">Useful Developer Tools</a></li>
<li><a class="dropdown-item" href="/versioning-policy.html">Versioning Policy</a></li>
<li><a class="dropdown-item" href="/release-process.html">Release Process</a></li>
<li><a class="dropdown-item" href="/security.html">Security</a></li>
</ul>
</li>
</ul>
<ul class="navbar-nav ms-auto">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="apacheFoundation" role="button"
data-bs-toggle="dropdown" aria-expanded="false">
Apache Software Foundation
</a>
<ul class="dropdown-menu" aria-labelledby="apacheFoundation">
<li><a class="dropdown-item" href="https://www.apache.org/">Apache Homepage</a></li>
<li><a class="dropdown-item" href="https://www.apache.org/licenses/">License</a></li>
<li><a class="dropdown-item"
href="https://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
<li><a class="dropdown-item" href="https://www.apache.org/foundation/thanks.html">Thanks</a></li>
<li><a class="dropdown-item" href="https://www.apache.org/security/">Security</a></li>
<li><a class="dropdown-item" href="https://www.apache.org/events/current-event">Event</a></li>
</ul>
</li>
</ul>
</div>
</nav>
<div class="container">
<div class="row mt-4">
<div class="col-12 col-md-9">
<h1>Apache Spark<span class="tm">&trade;</span> examples</h1>
<p>This page shows you how to use different Apache Spark APIs with simple examples.</p>
<p>Spark is a great engine for small and large datasets. It can be used with single-node/localhost environments, or distributed clusters. Spark’s expansive API, excellent performance, and flexibility make it a good option for many analyses. This guide shows examples with the following Spark APIs:</p>
<ul>
<li>DataFrames</li>
<li>SQL</li>
<li>Structured Streaming</li>
<li>RDDs</li>
</ul>
<p>The examples use small datasets so they are easy to follow.</p>
<h2 id="spark-dataframe-example">Spark DataFrame example</h2>
<p>This section shows you how to create a Spark DataFrame and run simple operations. The examples are on a small DataFrame, so you can easily see the functionality.</p>
<p>Let’s start by creating a Spark Session:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
</code></pre></div></div>
<p>Some Spark runtime environments come with pre-instantiated Spark Sessions. The <code class="language-plaintext highlighter-rouge">getOrCreate()</code> method will use an existing Spark Session or create a new Spark Session if one does not already exist.</p>
<p><strong><em>Create a Spark DataFrame</em></strong></p>
<p>Start by creating a DataFrame with <code class="language-plaintext highlighter-rouge">first_name</code> and <code class="language-plaintext highlighter-rouge">age</code> columns and four rows of data:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df = spark.createDataFrame(
[
("sue", 32),
("li", 3),
("bob", 75),
("heo", 13),
],
["first_name", "age"],
)
</code></pre></div></div>
<p>Use the <code class="language-plaintext highlighter-rouge">show()</code> method to view the contents of the DataFrame:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
</code></pre></div></div>
<p>Now, let’s perform some data processing operations on the DataFrame.</p>
<p><strong><em>Add a column to a Spark DataFrame</em></strong></p>
<p>Let’s add a <code class="language-plaintext highlighter-rouge">life_stage</code> column to the DataFrame that returns “child” if the age is 12 or under, “teenager” if the age is between 13 and 19, and “adult” if the age is 20 or older.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql.functions import col, when
df1 = df.withColumn(
"life_stage",
when(col("age") &lt; 13, "child")
.when(col("age").between(13, 19), "teenager")
.otherwise("adult"),
)
</code></pre></div></div>
<p>It’s easy to add columns to a Spark DataFrame. Let’s view the contents of <code class="language-plaintext highlighter-rouge">df1</code>.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| li| 3| child|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
</code></pre></div></div>
<p>Notice how the original DataFrame is unchanged:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
</code></pre></div></div>
<p>Spark operations don’t mutate the DataFrame. You must assign the result to a new variable to access the DataFrame changes for subsequent operations.</p>
<p><strong><em>Filter a Spark DataFrame</em></strong></p>
<p>Now, filter the DataFrame so it only includes teenagers and adults.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df1.where(col("life_stage").isin(["teenager", "adult"])).show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
</code></pre></div></div>
<p><strong><em>Group by aggregation on Spark DataFrame</em></strong></p>
<p>Now, let’s compute the average age for everyone in the dataset:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql.functions import avg
df1.select(avg("age")).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
</code></pre></div></div>
<p>You can also compute the average age for each <code class="language-plaintext highlighter-rouge">life_stage</code>:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df1.groupBy("life_stage").avg().show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
</code></pre></div></div>
<p>Spark lets you run queries on DataFrames with SQL if you don’t want to use the programmatic APIs.</p>
<p><strong><em>Query the DataFrame with SQL</em></strong></p>
<p>Here’s how you can compute the average age for everyone with SQL:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("select avg(age) from {df1}", df1=df1).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
</code></pre></div></div>
<p>And here’s how to compute the average age by <code class="language-plaintext highlighter-rouge">life_stage</code> with SQL:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
</code></pre></div></div>
<p>Spark lets you use the programmatic API, the SQL API, or a combination of both. This flexibility makes Spark accessible to a variety of users and powerfully expressive.</p>
<h2 id="spark-sql-example">Spark SQL Example</h2>
<p>Let’s persist the DataFrame in a named Parquet table that is easily accessible via the SQL API.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df1.write.saveAsTable("some_people")
</code></pre></div></div>
<p>Make sure that the table is accessible via the table name:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
+----------+---+----------+
</code></pre></div></div>
<p>Now, let’s use SQL to insert another row of data into the table:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
</code></pre></div></div>
<p>Inspect the table contents to confirm the row was inserted:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
| frank| 4| child|
+----------+---+----------+
</code></pre></div></div>
<p>Run a query that returns the teenagers:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>spark.sql("select * from some_people where life_stage='teenager'").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
+----------+---+----------+
</code></pre></div></div>
<p>Spark makes it easy to register tables and query them with pure SQL.</p>
<h2 id="spark-structured-streaming-example">Spark Structured Streaming Example</h2>
<p>Spark also has Structured Streaming APIs that allow you to create batch or real-time streaming applications.</p>
<p>Let’s see how to use Spark Structured Streaming to read data from Kafka and write it to a Parquet table hourly.</p>
<p>Suppose you have a Kafka stream that’s continuously populated with the following data:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
</code></pre></div></div>
<p>Here’s how to read the Kafka source into a Spark DataFrame:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
</code></pre></div></div>
<p>Create a function that cleans the input data.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
</code></pre></div></div>
<p>Now, create a function that will read all of the new data in Kafka whenever it’s run.</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
path = "data/tmp_students"
return df.transform(lambda df: with_normalized_names(df, schema)).writeStream.trigger(
availableNow=True
).format("parquet").option("checkpointLocation", checkpointPath).start(path)
</code></pre></div></div>
<p>Invoke the <code class="language-plaintext highlighter-rouge">perform_available_now_update()</code> function and see the contents of the Parquet table.</p>
<p>You can set up a cron job to run the <code class="language-plaintext highlighter-rouge">perform_available_now_update()</code> function every hour so your Parquet table is regularly updated.</p>
<h2 id="spark-rdd-example">Spark RDD Example</h2>
<p>The Spark RDD APIs are suitable for unstructured data.</p>
<p>The Spark DataFrame API is easier and more performant for structured data.</p>
<p>Suppose you have a text file called <code class="language-plaintext highlighter-rouge">some_words.txt</code> with the following three lines of data:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>these are words
these are more words
words in english
</code></pre></div></div>
<p>You would like to compute the count of each word in the text file. Here is how to perform this computation with Spark RDDs:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>text_file = spark.sparkContext.textFile("some_words.txt")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
</code></pre></div></div>
<p>Let’s take a look at the result:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>counts.collect()
[('these', 2),
('are', 2),
('more', 1),
('in', 1),
('words', 3),
('english', 1)]
</code></pre></div></div>
<p>Spark allows for efficient execution of the query because it parallelizes this computation. Many other query engines aren’t capable of parallelizing computations.</p>
<h2 id="conclusion">Conclusion</h2>
<p>These examples have shown how Spark provides nice user APIs for computations on small datasets. Spark can scale these same code examples to large datasets on distributed clusters. It’s fantastic how Spark can handle both large and small datasets.</p>
<p>Spark also has an expansive API compared with other query engines. Spark allows you to perform DataFrame operations with programmatic APIs, write SQL, perform streaming analyses, and do machine learning. Spark saves you from learning multiple frameworks and patching together various libraries to perform an analysis.</p>
<p><a id="additional"></a></p>
<h2>Additional examples</h2>
<p>Many additional examples are distributed with Spark:</p>
<ul>
<li>Basic Spark: <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples">Scala examples</a>, <a href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples">Java examples</a>, <a href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python examples</a></li>
<li>Spark Streaming: <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming">Scala examples</a>, <a href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming">Java examples</a></li>
</ul>
</div>
<div class="col-12 col-md-3">
<div class="news" style="margin-bottom: 20px;">
<h5>Latest News</h5>
<ul class="list-unstyled">
<li><a href="/news/spark-3-4-3-released.html">Spark 3.4.3 released</a>
<span class="small">(Apr 18, 2024)</span></li>
<li><a href="/news/spark-3-5-1-released.html">Spark 3.5.1 released</a>
<span class="small">(Feb 23, 2024)</span></li>
<li><a href="/news/spark-3-3-4-released.html">Spark 3.3.4 released</a>
<span class="small">(Dec 16, 2023)</span></li>
<li><a href="/news/spark-3-4-2-released.html">Spark 3.4.2 released</a>
<span class="small">(Nov 30, 2023)</span></li>
</ul>
<p class="small" style="text-align: right;"><a href="/news/index.html">Archive</a></p>
</div>
<div style="text-align:center; margin-bottom: 20px;">
<a href="https://www.apache.org/events/current-event.html">
<img src="https://www.apache.org/events/current-event-234x60.png" alt="Apache Software Foundation current event" style="max-width: 100%;">
</a>
</div>
<div class="d-none d-md-block">
<a href="/downloads.html" class="btn btn-cta btn-lg d-grid" style="margin-bottom: 30px;">
Download Spark
</a>
<p style="font-size: 16px; font-weight: 500; color: #555;">
Built-in Libraries:
</p>
<ul class="list-none">
<li><a href="/sql/">SQL and DataFrames</a></li>
<li><a href="/streaming/">Spark Streaming</a></li>
<li><a href="/mllib/">MLlib (machine learning)</a></li>
<li><a href="/graphx/">GraphX (graph)</a></li>
</ul>
<a href="/third-party-projects.html">Third-Party Projects</a>
</div>
</div>
</div>
<footer class="small">
<hr>
Apache Spark, Spark, Apache, the Apache feather logo, and the Apache Spark project logo are either registered
trademarks or trademarks of The Apache Software Foundation in the United States and other countries.
See guidance on use of Apache Spark <a href="/trademarks.html">trademarks</a>.
All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Copyright &copy; 2018 The Apache Software Foundation, Licensed under the
<a href="https://www.apache.org/licenses/">Apache License, Version 2.0</a>.
</footer>
</div>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.0.2/dist/js/bootstrap.bundle.min.js"
integrity="sha384-MrcW6ZMFYlzcLA8Nl+NtUVF0sA7MsXsP1UyJoMp4YLEuNSfAP+JcXn/tWtIaxVXM"
crossorigin="anonymous"></script>
<script src="https://code.jquery.com/jquery.js"></script>
<script src="/js/lang-tabs.js"></script>
<script src="/js/downloads.js"></script>
</body>
</html>