blob: a9c4d2c81c7c8d1025a30c6ec34a35461eb9150b [file] [log] [blame]
<!DOCTYPE html>
<html >
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>pyspark.pandas.mlflow &#8212; PySpark 4.0.0-preview1 documentation</title>
<script data-cfasync="false">
document.documentElement.dataset.mode = localStorage.getItem("mode") || "";
document.documentElement.dataset.theme = localStorage.getItem("theme") || "light";
</script>
<!-- Loaded before other Sphinx assets -->
<link href="../../../_static/styles/theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/bootstrap.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/styles/pydata-sphinx-theme.css?digest=e353d410970836974a52" rel="stylesheet" />
<link href="../../../_static/vendor/fontawesome/6.1.2/css/all.min.css?digest=e353d410970836974a52" rel="stylesheet" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-solid-900.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-brands-400.woff2" />
<link rel="preload" as="font" type="font/woff2" crossorigin href="../../../_static/vendor/fontawesome/6.1.2/webfonts/fa-regular-400.woff2" />
<link rel="stylesheet" type="text/css" href="../../../_static/pygments.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/copybutton.css" />
<link rel="stylesheet" type="text/css" href="../../../_static/css/pyspark.css" />
<!-- Pre-loaded scripts that we'll load fully later -->
<link rel="preload" as="script" href="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52" />
<link rel="preload" as="script" href="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52" />
<script data-url_root="../../../" id="documentation_options" src="../../../_static/documentation_options.js"></script>
<script src="../../../_static/jquery.js"></script>
<script src="../../../_static/underscore.js"></script>
<script src="../../../_static/doctools.js"></script>
<script src="../../../_static/clipboard.min.js"></script>
<script src="../../../_static/copybutton.js"></script>
<script crossorigin="anonymous" integrity="sha256-Ae2Vz/4ePdIu6ZyI/5ZGsYnb+m0JlOmKPjt6XZ9JJkA=" src="https://cdnjs.cloudflare.com/ajax/libs/require.js/2.3.4/require.min.js"></script>
<script>DOCUMENTATION_OPTIONS.pagename = '_modules/pyspark/pandas/mlflow';</script>
<link rel="canonical" href="https://spark.apache.org/docs/latest/api/python/_modules/pyspark/pandas/mlflow.html" />
<link rel="search" title="Search" href="../../../search.html" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="docsearch:language" content="None">
<!-- Matomo -->
<script type="text/javascript">
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body data-bs-spy="scroll" data-bs-target=".bd-toc-nav" data-offset="180" data-bs-root-margin="0px 0px -60%" data-default-mode="">
<a class="skip-link" href="#main-content">Skip to main content</a>
<input type="checkbox"
class="sidebar-toggle"
name="__primary"
id="__primary"/>
<label class="overlay overlay-primary" for="__primary"></label>
<input type="checkbox"
class="sidebar-toggle"
name="__secondary"
id="__secondary"/>
<label class="overlay overlay-secondary" for="__secondary"></label>
<div class="search-button__wrapper">
<div class="search-button__overlay"></div>
<div class="search-button__search-container">
<form class="bd-search d-flex align-items-center"
action="../../../search.html"
method="get">
<i class="fa-solid fa-magnifying-glass"></i>
<input type="search"
class="form-control"
name="q"
id="search-input"
placeholder="Search the docs ..."
aria-label="Search the docs ..."
autocomplete="off"
autocorrect="off"
autocapitalize="off"
spellcheck="false"/>
<span class="search-button__kbd-shortcut"><kbd class="kbd-shortcut__modifier">Ctrl</kbd>+<kbd>K</kbd></span>
</form></div>
</div>
<nav class="bd-header navbar navbar-expand-lg bd-navbar">
<div class="bd-header__inner bd-page-width">
<label class="sidebar-toggle primary-toggle" for="__primary">
<span class="fa-solid fa-bars"></span>
</label>
<div class="navbar-header-items__start">
<div class="navbar-item">
<a class="navbar-brand logo" href="../../../index.html">
<img src="../../../_static/spark-logo-light.png" class="logo__image only-light" alt="Logo image"/>
<script>document.write(`<img src="../../../_static/spark-logo-dark.png" class="logo__image only-dark" alt="Logo image"/>`);</script>
</a></div>
</div>
<div class="col-lg-9 navbar-header-items">
<div class="me-auto navbar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="navbar-header-items__end">
<div class="navbar-item navbar-persistent--container">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
function checkPageExistsAndRedirect(event) {
const currentFilePath = "_modules/pyspark/pandas/mlflow.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
return false;
}
// Function to populate the version switcher
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="navbar-persistent--mobile">
<script>
document.write(`
<button class="btn btn-sm navbar-btn search-button search-button__button" title="Search" aria-label="Search" data-bs-placement="bottom" data-bs-toggle="tooltip">
<i class="fa-solid fa-magnifying-glass"></i>
</button>
`);
</script>
</div>
</div>
</nav>
<div class="bd-container">
<div class="bd-container__inner bd-page-width">
<div class="bd-sidebar-primary bd-sidebar hide-on-wide">
<div class="sidebar-header-items sidebar-primary__section">
<div class="sidebar-header-items__center">
<div class="navbar-item"><nav class="navbar-nav">
<p class="sidebar-header-items__title"
role="heading"
aria-level="1"
aria-label="Site Navigation">
Site Navigation
</p>
<ul class="bd-navbar-elements navbar-nav">
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../index.html">
Overview
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../getting_started/index.html">
Getting Started
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../user_guide/index.html">
User Guides
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../reference/index.html">
API Reference
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../development/index.html">
Development
</a>
</li>
<li class="nav-item">
<a class="nav-link nav-internal" href="../../../migration_guide/index.html">
Migration Guides
</a>
</li>
</ul>
</nav></div>
</div>
<div class="sidebar-header-items__end">
<div class="navbar-item"><!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div id="version-button" class="dropdown">
<button type="button" class="btn btn-secondary btn-sm navbar-btn dropdown-toggle" id="version_switcher_button" data-toggle="dropdown">
4.0.0-preview1
<span class="caret"></span>
</button>
<div id="version_switcher" class="dropdown-menu list-group-flush py-0" aria-labelledby="version_switcher_button">
<!-- dropdown will be populated by javascript on page load -->
</div>
</div>
<script type="text/javascript">
// Function to construct the target URL from the JSON components
function buildURL(entry) {
var template = "https://spark.apache.org/docs/{version}/api/python/index.html"; // supplied by jinja
template = template.replace("{version}", entry.version);
return template;
}
// Function to check if corresponding page path exists in other version of docs
// and, if so, go there instead of the homepage of the other docs version
function checkPageExistsAndRedirect(event) {
const currentFilePath = "_modules/pyspark/pandas/mlflow.html",
otherDocsHomepage = event.target.getAttribute("href");
let tryUrl = `${otherDocsHomepage}${currentFilePath}`;
$.ajax({
type: 'HEAD',
url: tryUrl,
// if the page exists, go there
success: function() {
location.href = tryUrl;
}
}).fail(function() {
location.href = otherDocsHomepage;
});
return false;
}
// Function to populate the version switcher
(function () {
// get JSON config
$.getJSON("https://spark.apache.org/static/versions.json", function(data, textStatus, jqXHR) {
// create the nodes first (before AJAX calls) to ensure the order is
// correct (for now, links will go to doc version homepage)
$.each(data, function(index, entry) {
// if no custom name specified (e.g., "latest"), use version string
if (!("name" in entry)) {
entry.name = entry.version;
}
// construct the appropriate URL, and add it to the dropdown
entry.url = buildURL(entry);
const node = document.createElement("a");
node.setAttribute("class", "list-group-item list-group-item-action py-1");
node.setAttribute("href", `${entry.url}`);
node.textContent = `${entry.name}`;
node.onclick = checkPageExistsAndRedirect;
$("#version_switcher").append(node);
});
});
})();
</script></div>
<div class="navbar-item">
<script>
document.write(`
<button class="theme-switch-button btn btn-sm btn-outline-primary navbar-btn rounded-circle" title="light/dark" aria-label="light/dark" data-bs-placement="bottom" data-bs-toggle="tooltip">
<span class="theme-switch" data-mode="light"><i class="fa-solid fa-sun"></i></span>
<span class="theme-switch" data-mode="dark"><i class="fa-solid fa-moon"></i></span>
<span class="theme-switch" data-mode="auto"><i class="fa-solid fa-circle-half-stroke"></i></span>
</button>
`);
</script></div>
<div class="navbar-item"><ul class="navbar-icon-links navbar-nav"
aria-label="Icon Links">
<li class="nav-item">
<a href="https://github.com/apache/spark" title="GitHub" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-brands fa-github"></i></span>
<label class="sr-only">GitHub</label></a>
</li>
<li class="nav-item">
<a href="https://pypi.org/project/pyspark" title="PyPI" class="nav-link" rel="noopener" target="_blank" data-bs-toggle="tooltip" data-bs-placement="bottom"><span><i class="fa-solid fa-box"></i></span>
<label class="sr-only">PyPI</label></a>
</li>
</ul></div>
</div>
</div>
<div class="sidebar-primary-items__end sidebar-primary__section">
</div>
<div id="rtd-footer-container"></div>
</div>
<main id="main-content" class="bd-main">
<div class="bd-content">
<div class="bd-article-container">
<div class="bd-header-article">
<div class="header-article-items header-article__inner">
<div class="header-article-items__start">
<div class="header-article-item">
<nav aria-label="Breadcrumbs">
<ul class="bd-breadcrumbs" role="navigation" aria-label="Breadcrumb">
<li class="breadcrumb-item breadcrumb-home">
<a href="../../../index.html" class="nav-link" aria-label="Home">
<i class="fa-solid fa-home"></i>
</a>
</li>
<li class="breadcrumb-item"><a href="../../index.html" class="nav-link">Module code</a></li>
<li class="breadcrumb-item active" aria-current="page">pyspark.pandas.mlflow</li>
</ul>
</nav>
</div>
</div>
</div>
</div>
<div id="searchbox"></div>
<article class="bd-article" role="main">
<h1>Source code for pyspark.pandas.mlflow</h1><div class="highlight"><pre>
<span></span><span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one or more</span>
<span class="c1"># contributor license agreements. See the NOTICE file distributed with</span>
<span class="c1"># this work for additional information regarding copyright ownership.</span>
<span class="c1"># The ASF licenses this file to You under the Apache License, Version 2.0</span>
<span class="c1"># (the &quot;License&quot;); you may not use this file except in compliance with</span>
<span class="c1"># the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing, software</span>
<span class="c1"># distributed under the License is distributed on an &quot;AS IS&quot; BASIS,</span>
<span class="c1"># WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="c1"># See the License for the specific language governing permissions and</span>
<span class="c1"># limitations under the License.</span>
<span class="c1">#</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd">MLflow-related functions to load models and apply them to pandas-on-Spark dataframes.</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">List</span><span class="p">,</span> <span class="n">Union</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="kn">import</span> <span class="n">Any</span>
<span class="kn">import</span> <span class="nn">pandas</span> <span class="k">as</span> <span class="nn">pd</span>
<span class="kn">import</span> <span class="nn">numpy</span> <span class="k">as</span> <span class="nn">np</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.types</span> <span class="kn">import</span> <span class="n">DataType</span>
<span class="kn">from</span> <span class="nn">pyspark.sql.functions</span> <span class="kn">import</span> <span class="n">struct</span>
<span class="kn">from</span> <span class="nn">pyspark.pandas._typing</span> <span class="kn">import</span> <span class="n">Label</span><span class="p">,</span> <span class="n">Dtype</span>
<span class="kn">from</span> <span class="nn">pyspark.pandas.utils</span> <span class="kn">import</span> <span class="n">lazy_property</span><span class="p">,</span> <span class="n">default_session</span>
<span class="kn">from</span> <span class="nn">pyspark.pandas.frame</span> <span class="kn">import</span> <span class="n">DataFrame</span>
<span class="kn">from</span> <span class="nn">pyspark.pandas.series</span> <span class="kn">import</span> <span class="n">Series</span><span class="p">,</span> <span class="n">first_series</span>
<span class="kn">from</span> <span class="nn">pyspark.pandas.typedef</span> <span class="kn">import</span> <span class="n">as_spark_type</span>
<span class="n">__all__</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;PythonModelWrapper&quot;</span><span class="p">,</span> <span class="s2">&quot;load_model&quot;</span><span class="p">]</span>
<div class="viewcode-block" id="PythonModelWrapper"><a class="viewcode-back" href="../../../reference/pyspark.pandas/api/pyspark.pandas.mlflow.PythonModelWrapper.html#pyspark.pandas.mlflow.PythonModelWrapper">[docs]</a><span class="k">class</span> <span class="nc">PythonModelWrapper</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A wrapper around MLflow&#39;s Python object model.</span>
<span class="sd"> This wrapper acts as a predictor on pandas-on-Spark</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">model_uri</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">return_type_hint</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">type</span><span class="p">,</span> <span class="n">Dtype</span><span class="p">]):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_model_uri</span> <span class="o">=</span> <span class="n">model_uri</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_return_type_hint</span> <span class="o">=</span> <span class="n">return_type_hint</span>
<span class="nd">@lazy_property</span>
<span class="k">def</span> <span class="nf">_return_type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">DataType</span><span class="p">:</span>
<span class="n">hint</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_return_type_hint</span>
<span class="c1"># The logic is simple for now, because it corresponds to the default</span>
<span class="c1"># case: continuous predictions</span>
<span class="c1"># TODO: do something smarter, for example when there is a sklearn.Classifier (it should</span>
<span class="c1"># return an integer or a categorical)</span>
<span class="c1"># We can do the same for pytorch/tensorflow/keras models by looking at the output types.</span>
<span class="c1"># However, this is probably better done in mlflow than here.</span>
<span class="k">if</span> <span class="n">hint</span> <span class="o">==</span> <span class="s2">&quot;infer&quot;</span> <span class="ow">or</span> <span class="ow">not</span> <span class="n">hint</span><span class="p">:</span>
<span class="n">hint</span> <span class="o">=</span> <span class="n">np</span><span class="o">.</span><span class="n">float64</span>
<span class="k">return</span> <span class="n">as_spark_type</span><span class="p">(</span><span class="n">hint</span><span class="p">)</span>
<span class="nd">@lazy_property</span>
<span class="k">def</span> <span class="nf">_model</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The return object has to follow the API of mlflow.pyfunc.PythonModel.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">mlflow</span> <span class="kn">import</span> <span class="n">pyfunc</span>
<span class="k">return</span> <span class="n">pyfunc</span><span class="o">.</span><span class="n">load_model</span><span class="p">(</span><span class="n">model_uri</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_model_uri</span><span class="p">)</span>
<span class="nd">@lazy_property</span>
<span class="k">def</span> <span class="nf">_model_udf</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="n">Any</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">mlflow</span> <span class="kn">import</span> <span class="n">pyfunc</span>
<span class="n">spark</span> <span class="o">=</span> <span class="n">default_session</span><span class="p">()</span>
<span class="k">return</span> <span class="n">pyfunc</span><span class="o">.</span><span class="n">spark_udf</span><span class="p">(</span><span class="n">spark</span><span class="p">,</span> <span class="n">model_uri</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_model_uri</span><span class="p">,</span> <span class="n">result_type</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_return_type</span><span class="p">)</span>
<span class="k">def</span> <span class="fm">__str__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="k">return</span> <span class="s2">&quot;PythonModelWrapper(</span><span class="si">{}</span><span class="s2">)&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_model</span><span class="p">))</span>
<span class="k">def</span> <span class="fm">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">-&gt;</span> <span class="nb">str</span><span class="p">:</span>
<span class="k">return</span> <span class="s2">&quot;PythonModelWrapper(</span><span class="si">{}</span><span class="s2">)&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">repr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_model</span><span class="p">))</span>
<span class="k">def</span> <span class="nf">predict</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">data</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="n">DataFrame</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">])</span> <span class="o">-&gt;</span> <span class="n">Union</span><span class="p">[</span><span class="n">Series</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">Series</span><span class="p">]:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a prediction on the data.</span>
<span class="sd"> If the data is a pandas-on-Spark DataFrame, the return is a pandas-on-Spark Series.</span>
<span class="sd"> If the data is a pandas Dataframe, the return is the expected output of the underlying</span>
<span class="sd"> pyfunc object (typically a pandas Series or a numpy array).</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_model</span><span class="o">.</span><span class="n">predict</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">DataFrame</span><span class="p">):</span>
<span class="n">s</span> <span class="o">=</span> <span class="n">struct</span><span class="p">(</span><span class="o">*</span><span class="n">data</span><span class="o">.</span><span class="n">columns</span><span class="p">)</span>
<span class="n">return_col</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_model_udf</span><span class="p">(</span><span class="n">s</span><span class="p">)</span>
<span class="n">column_labels</span><span class="p">:</span> <span class="n">List</span><span class="p">[</span><span class="n">Label</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span>
<span class="p">(</span><span class="n">col</span><span class="p">,)</span> <span class="k">for</span> <span class="n">col</span> <span class="ow">in</span> <span class="n">data</span><span class="o">.</span><span class="n">_internal</span><span class="o">.</span><span class="n">spark_frame</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="n">return_col</span><span class="p">)</span><span class="o">.</span><span class="n">columns</span>
<span class="p">]</span>
<span class="n">internal</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">_internal</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span>
<span class="n">column_labels</span><span class="o">=</span><span class="n">column_labels</span><span class="p">,</span> <span class="n">data_spark_columns</span><span class="o">=</span><span class="p">[</span><span class="n">return_col</span><span class="p">],</span> <span class="n">data_fields</span><span class="o">=</span><span class="kc">None</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">first_series</span><span class="p">(</span><span class="n">DataFrame</span><span class="p">(</span><span class="n">internal</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span><span class="s2">&quot;unknown data type: </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="n">data</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span><span class="p">))</span></div>
<div class="viewcode-block" id="load_model"><a class="viewcode-back" href="../../../reference/pyspark.pandas/api/pyspark.pandas.mlflow.load_model.html#pyspark.pandas.mlflow.load_model">[docs]</a><span class="k">def</span> <span class="nf">load_model</span><span class="p">(</span>
<span class="n">model_uri</span><span class="p">:</span> <span class="nb">str</span><span class="p">,</span> <span class="n">predict_type</span><span class="p">:</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">type</span><span class="p">,</span> <span class="n">Dtype</span><span class="p">]</span> <span class="o">=</span> <span class="s2">&quot;infer&quot;</span>
<span class="p">)</span> <span class="o">-&gt;</span> <span class="n">PythonModelWrapper</span><span class="p">:</span>
<span class="w"> </span><span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Loads an MLflow model into a wrapper that can be used both for pandas and pandas-on-Spark</span>
<span class="sd"> DataFrame.</span>
<span class="sd"> Parameters</span>
<span class="sd"> ----------</span>
<span class="sd"> model_uri : str</span>
<span class="sd"> URI pointing to the model. See MLflow documentation for more details.</span>
<span class="sd"> predict_type : a python basic type, a numpy basic type, a Spark type or &#39;infer&#39;.</span>
<span class="sd"> This is the return type that is expected when calling the predict function of the model.</span>
<span class="sd"> If &#39;infer&#39; is specified, the wrapper will attempt to automatically determine the return type</span>
<span class="sd"> based on the model type.</span>
<span class="sd"> Returns</span>
<span class="sd"> -------</span>
<span class="sd"> PythonModelWrapper</span>
<span class="sd"> A wrapper around MLflow PythonModel objects. This wrapper is expected to adhere to the</span>
<span class="sd"> interface of mlflow.pyfunc.PythonModel.</span>
<span class="sd"> Examples</span>
<span class="sd"> --------</span>
<span class="sd"> Here is a full example that creates a model with scikit-learn and saves the model with</span>
<span class="sd"> MLflow. The model is then loaded as a predictor that can be applied on a pandas-on-Spark</span>
<span class="sd"> Dataframe.</span>
<span class="sd"> We first initialize our MLflow environment:</span>
<span class="sd"> &gt;&gt;&gt; from mlflow.tracking import MlflowClient, set_tracking_uri</span>
<span class="sd"> &gt;&gt;&gt; import mlflow.sklearn</span>
<span class="sd"> &gt;&gt;&gt; from tempfile import mkdtemp</span>
<span class="sd"> &gt;&gt;&gt; d = mkdtemp(&quot;pandas_on_spark_mlflow&quot;)</span>
<span class="sd"> &gt;&gt;&gt; set_tracking_uri(&quot;file:%s&quot;%d)</span>
<span class="sd"> &gt;&gt;&gt; client = MlflowClient()</span>
<span class="sd"> &gt;&gt;&gt; exp_id = mlflow.create_experiment(&quot;my_experiment&quot;)</span>
<span class="sd"> &gt;&gt;&gt; exp = mlflow.set_experiment(&quot;my_experiment&quot;)</span>
<span class="sd"> We aim at learning this numerical function using a simple linear regressor.</span>
<span class="sd"> &gt;&gt;&gt; from sklearn.linear_model import LinearRegression</span>
<span class="sd"> &gt;&gt;&gt; train = pd.DataFrame({&quot;x1&quot;: np.arange(8), &quot;x2&quot;: np.arange(8)**2,</span>
<span class="sd"> ... &quot;y&quot;: np.log(2 + np.arange(8))})</span>
<span class="sd"> &gt;&gt;&gt; train_x = train[[&quot;x1&quot;, &quot;x2&quot;]]</span>
<span class="sd"> &gt;&gt;&gt; train_y = train[[&quot;y&quot;]]</span>
<span class="sd"> &gt;&gt;&gt; with mlflow.start_run():</span>
<span class="sd"> ... lr = LinearRegression()</span>
<span class="sd"> ... lr.fit(train_x, train_y)</span>
<span class="sd"> ... mlflow.sklearn.log_model(lr, &quot;model&quot;)</span>
<span class="sd"> LinearRegression...</span>
<span class="sd"> Now that our model is logged using MLflow, we load it back and apply it on a pandas-on-Spark</span>
<span class="sd"> dataframe:</span>
<span class="sd"> &gt;&gt;&gt; from pyspark.pandas.mlflow import load_model</span>
<span class="sd"> &gt;&gt;&gt; run_info = client.search_runs(exp_id)[-1].info</span>
<span class="sd"> &gt;&gt;&gt; model = load_model(&quot;runs:/{run_id}/model&quot;.format(run_id=run_info.run_id))</span>
<span class="sd"> &gt;&gt;&gt; prediction_df = ps.DataFrame({&quot;x1&quot;: [2.0], &quot;x2&quot;: [4.0]})</span>
<span class="sd"> &gt;&gt;&gt; prediction_df[&quot;prediction&quot;] = model.predict(prediction_df)</span>
<span class="sd"> &gt;&gt;&gt; prediction_df</span>
<span class="sd"> x1 x2 prediction</span>
<span class="sd"> 0 2.0 4.0 1.355551</span>
<span class="sd"> The model also works on pandas DataFrames as expected:</span>
<span class="sd"> &gt;&gt;&gt; model.predict(prediction_df[[&quot;x1&quot;, &quot;x2&quot;]].to_pandas())</span>
<span class="sd"> array([[1.35555142]])</span>
<span class="sd"> Notes</span>
<span class="sd"> -----</span>
<span class="sd"> Currently, the model prediction can only be merged back with the existing dataframe.</span>
<span class="sd"> Other columns must be manually joined.</span>
<span class="sd"> For example, this code will not work:</span>
<span class="sd"> &gt;&gt;&gt; df = ps.DataFrame({&quot;x1&quot;: [2.0], &quot;x2&quot;: [3.0], &quot;z&quot;: [-1]})</span>
<span class="sd"> &gt;&gt;&gt; features = df[[&quot;x1&quot;, &quot;x2&quot;]]</span>
<span class="sd"> &gt;&gt;&gt; y = model.predict(features)</span>
<span class="sd"> &gt;&gt;&gt; # Works:</span>
<span class="sd"> &gt;&gt;&gt; features[&quot;y&quot;] = y # doctest: +SKIP</span>
<span class="sd"> &gt;&gt;&gt; # Will fail with a message about dataframes not aligned.</span>
<span class="sd"> &gt;&gt;&gt; df[&quot;y&quot;] = y # doctest: +SKIP</span>
<span class="sd"> A current workaround is to use the .merge() function, using the feature values</span>
<span class="sd"> as merging keys.</span>
<span class="sd"> &gt;&gt;&gt; features[&#39;y&#39;] = y</span>
<span class="sd"> &gt;&gt;&gt; everything = df.merge(features, on=[&#39;x1&#39;, &#39;x2&#39;])</span>
<span class="sd"> &gt;&gt;&gt; everything</span>
<span class="sd"> x1 x2 z y</span>
<span class="sd"> 0 2.0 3.0 -1 1.376932</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">PythonModelWrapper</span><span class="p">(</span><span class="n">model_uri</span><span class="p">,</span> <span class="n">predict_type</span><span class="p">)</span></div>
<span class="k">def</span> <span class="nf">_test</span><span class="p">()</span> <span class="o">-&gt;</span> <span class="kc">None</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">doctest</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">from</span> <span class="nn">pyspark.sql</span> <span class="kn">import</span> <span class="n">SparkSession</span>
<span class="kn">import</span> <span class="nn">pyspark.pandas.mlflow</span>
<span class="n">os</span><span class="o">.</span><span class="n">chdir</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">environ</span><span class="p">[</span><span class="s2">&quot;SPARK_HOME&quot;</span><span class="p">])</span>
<span class="n">globs</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">pandas</span><span class="o">.</span><span class="n">mlflow</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">copy</span><span class="p">()</span>
<span class="n">globs</span><span class="p">[</span><span class="s2">&quot;ps&quot;</span><span class="p">]</span> <span class="o">=</span> <span class="n">pyspark</span><span class="o">.</span><span class="n">pandas</span>
<span class="n">spark</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">.</span><span class="n">master</span><span class="p">(</span><span class="s2">&quot;local[4]&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">appName</span><span class="p">(</span><span class="s2">&quot;pyspark.pandas.mlflow tests&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">getOrCreate</span><span class="p">()</span>
<span class="p">)</span>
<span class="p">(</span><span class="n">failure_count</span><span class="p">,</span> <span class="n">test_count</span><span class="p">)</span> <span class="o">=</span> <span class="n">doctest</span><span class="o">.</span><span class="n">testmod</span><span class="p">(</span>
<span class="n">pyspark</span><span class="o">.</span><span class="n">pandas</span><span class="o">.</span><span class="n">mlflow</span><span class="p">,</span>
<span class="n">globs</span><span class="o">=</span><span class="n">globs</span><span class="p">,</span>
<span class="n">optionflags</span><span class="o">=</span><span class="n">doctest</span><span class="o">.</span><span class="n">ELLIPSIS</span> <span class="o">|</span> <span class="n">doctest</span><span class="o">.</span><span class="n">NORMALIZE_WHITESPACE</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">spark</span><span class="o">.</span><span class="n">stop</span><span class="p">()</span>
<span class="k">if</span> <span class="n">failure_count</span><span class="p">:</span>
<span class="n">sys</span><span class="o">.</span><span class="n">exit</span><span class="p">(</span><span class="o">-</span><span class="mi">1</span><span class="p">)</span>
<span class="k">if</span> <span class="vm">__name__</span> <span class="o">==</span> <span class="s2">&quot;__main__&quot;</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="kn">import</span> <span class="nn">mlflow</span> <span class="c1"># noqa: F401</span>
<span class="kn">import</span> <span class="nn">sklearn</span> <span class="c1"># noqa: F401</span>
<span class="n">_test</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">ImportError</span><span class="p">:</span>
<span class="k">pass</span>
</pre></div>
</article>
<footer class="bd-footer-article">
<div class="footer-article-items footer-article__inner">
<div class="footer-article-item"><!-- Previous / next buttons -->
<div class="prev-next-area">
</div></div>
</div>
</footer>
</div>
</div>
<footer class="bd-footer-content">
</footer>
</main>
</div>
</div>
<!-- Scripts loaded after <body> so the DOM is not blocked -->
<script src="../../../_static/scripts/bootstrap.js?digest=e353d410970836974a52"></script>
<script src="../../../_static/scripts/pydata-sphinx-theme.js?digest=e353d410970836974a52"></script>
<footer class="bd-footer">
<div class="bd-footer__inner bd-page-width">
<div class="footer-items__start">
<div class="footer-item"><p class="copyright">
Copyright @ 2024 The Apache Software Foundation, Licensed under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache License, Version 2.0</a>.
</p></div>
<div class="footer-item">
<p class="sphinx-version">
Created using <a href="https://www.sphinx-doc.org/">Sphinx</a> 4.5.0.
<br/>
</p>
</div>
</div>
<div class="footer-items__end">
<div class="footer-item"><p class="theme-version">
Built with the <a href="https://pydata-sphinx-theme.readthedocs.io/en/stable/index.html">PyData Sphinx Theme</a> 0.13.3.
</p></div>
</div>
</div>
</footer>
</body>
</html>