#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A loc indexer for pandas-on-Spark DataFrame/Series.
"""
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from functools import reduce
from typing import Any, Optional, List, Tuple, TYPE_CHECKING, Union, cast, Sized
import pandas as pd
from pandas.api.types import is_list_like
from pyspark.sql import functions as F, Column
from pyspark.sql.types import BooleanType, LongType
from pyspark.sql.utils import AnalysisException
import numpy as np
from pyspark import pandas as ps # noqa: F401
from pyspark.pandas._typing import Label, Name, Scalar
from pyspark.pandas.internal import (
InternalField,
InternalFrame,
NATURAL_ORDER_COLUMN_NAME,
SPARK_DEFAULT_SERIES_NAME,
)
from pyspark.pandas.exceptions import SparkPandasIndexingError, SparkPandasNotImplementedError
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.utils import (
is_name_like_tuple,
is_name_like_value,
lazy_property,
name_like_string,
same_anchor,
scol_for,
spark_column_equals,
verify_temp_column_name,
)
if TYPE_CHECKING:
from pyspark.pandas.frame import DataFrame # noqa: F401 (SPARK-34943)
from pyspark.pandas.series import Series # noqa: F401 (SPARK-34943)
class IndexerLike(object):
def __init__(self, psdf_or_psser: Union["Series", "DataFrame"]):
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series
assert isinstance(
psdf_or_psser, (DataFrame, Series)
), "unexpected argument type: {}".format(type(psdf_or_psser))
self._psdf_or_psser = psdf_or_psser
@property
def _is_df(self) -> bool:
from pyspark.pandas.frame import DataFrame
return isinstance(self._psdf_or_psser, DataFrame)
@property
def _is_series(self) -> bool:
from pyspark.pandas.series import Series
return isinstance(self._psdf_or_psser, Series)
@property
def _psdf(self) -> "DataFrame":
if self._is_df:
return cast("DataFrame", self._psdf_or_psser)
else:
assert self._is_series
return self._psdf_or_psser._psdf
@property
def _internal(self) -> InternalFrame:
return self._psdf._internal
class AtIndexer(IndexerLike):
"""
Access a single value for a row/column label pair.
If the index is not unique, all matching pairs are returned as an array.
Similar to ``loc``, in that both provide label-based lookups. Use ``at`` if you only need to
get a single value in a DataFrame or Series.
.. note:: Unlike pandas, pandas-on-Spark only allows using ``at`` to get values but not to
set them.
.. note:: Warning: If ``row_index`` matches a lot of rows, large amounts of data will be
fetched, potentially causing your machine to run out of memory.
Raises
------
KeyError
When the label does not exist in the DataFrame
Examples
--------
>>> psdf = ps.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]],
... index=[4, 5, 5], columns=['A', 'B', 'C'])
>>> psdf
A B C
4 0 2 3
5 0 4 1
5 10 20 30
Get value at specified row/column pair
>>> psdf.at[4, 'B']
2
Get array if an index occurs multiple times
>>> psdf.at[5, 'B']
array([ 4, 20])
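When the DataFrame has a MultiIndex, the row key is given as a tuple of
index values (an illustrative sketch only, not executed here):
>>> psdf.set_index('A', append=True).at[(5, 10), 'B']  # doctest: +SKIP
20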
"""
def __getitem__(self, key: Any) -> Union["Series", "DataFrame", Scalar]:
if self._is_df:
if not isinstance(key, tuple) or len(key) != 2:
raise TypeError("Use DataFrame.at like .at[row_index, column_name]")
row_sel, col_sel = key
else:
assert self._is_series, type(self._psdf_or_psser)
if isinstance(key, tuple) and len(key) != 1:
raise TypeError("Use Series.at like .at[row_index]")
row_sel = key
col_sel = self._psdf_or_psser._column_label
if self._internal.index_level == 1:
if not is_name_like_value(row_sel, allow_none=False, allow_tuple=False):
raise ValueError("At based indexing on a single index can only have a single value")
row_sel = (row_sel,)
else:
if not is_name_like_tuple(row_sel, allow_none=False):
raise ValueError("At based indexing on multi-index can only have tuple values")
if col_sel is not None:
if not is_name_like_value(col_sel, allow_none=False):
raise ValueError("At based indexing on multi-index can only have tuple values")
if not is_name_like_tuple(col_sel):
col_sel = (col_sel,)
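# The lookup below builds one equality predicate per index level, ANDs them
# together, filters the Spark frame, and collects only the matching cells of
# the requested column to the driver (which is why a key matching many rows
# can pull a lot of data locally).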
cond = reduce(
lambda x, y: x & y,
[scol == row for scol, row in zip(self._internal.index_spark_columns, row_sel)],
)
pdf = (
self._internal.spark_frame.drop(NATURAL_ORDER_COLUMN_NAME)
.filter(cond)
.select(self._internal.spark_column_for(col_sel))
.toPandas()
)
if len(pdf) < 1:
raise KeyError(name_like_string(row_sel))
values = cast(pd.DataFrame, pdf).iloc[:, 0].values
return (
values if (len(row_sel) < self._internal.index_level or len(values) > 1) else values[0]
)
class iAtIndexer(IndexerLike):
"""
Access a single value for a row/column pair by integer position.
Similar to ``iloc``, in that both provide integer-based lookups. Use
``iat`` if you only need to get a single value in a DataFrame or Series.
.. note:: Unlike pandas, pandas-on-Spark only allows using ``iat`` to get values
but not to set them.
Raises
------
KeyError
When the label does not exist in the DataFrame
Examples
--------
>>> df = ps.DataFrame([[0, 2, 3], [0, 4, 1], [10, 20, 30]],
... columns=['A', 'B', 'C'])
>>> df
A B C
0 0 2 3
1 0 4 1
2 10 20 30
Get value at specified row/column pair
>>> df.iat[1, 2]
1
Get value within a series
>>> psser = ps.Series([1, 2, 3], index=[10, 20, 30])
>>> psser
10 1
20 2
30 3
dtype: int64
>>> psser.iat[1]
2
"""
def __getitem__(self, key: Any) -> Union["Series", "DataFrame", Scalar]:
if self._is_df:
if not isinstance(key, tuple) or len(key) != 2:
raise TypeError(
"Use DataFrame.iat like .iat[row_integer_position, column_integer_position]"
)
row_sel, col_sel = key
if not isinstance(row_sel, int) or not isinstance(col_sel, int):
raise ValueError("iAt based indexing can only have integer indexers")
return self._psdf_or_psser.iloc[row_sel, col_sel]
else:
assert self._is_series, type(self._psdf_or_psser)
if not isinstance(key, int) and len(key) != 1:
raise TypeError("Use Series.iat like .iat[row_integer_position]")
if not isinstance(key, int):
raise ValueError("iAt based indexing can only have integer indexers")
return self._psdf_or_psser.iloc[key]
class LocIndexerLike(IndexerLike, metaclass=ABCMeta):
def _select_rows(self, rows_sel: Any) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""
Dispatch the logic for selecting rows to a more specific method based on the type of `rows_sel`.
Parameters
----------
rows_sel : the key specified to select rows.
Returns
-------
Tuple of Spark column, int, int:
* The Spark column for the condition to filter the rows.
* The number of rows when the selection can be simplified to a limit.
* The number of remaining index levels when the selection reduces the index.
"""
from pyspark.pandas.series import Series
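# Illustrative examples of the returned triple (a sketch, assuming a
# single-level index):
#   .loc[5]   -> (index_scol == 5, None, 0)   via _select_rows_else
#   .loc[:]   -> (None, None, None)           full slice, nothing to filter
#   .iloc[:3] -> (None, 3, None)              simplified to a limit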
if rows_sel is None:
return None, None, None
elif isinstance(rows_sel, Series):
return self._select_rows_by_series(rows_sel)
elif isinstance(rows_sel, Column):
return self._select_rows_by_spark_column(rows_sel)
elif isinstance(rows_sel, slice):
if rows_sel == slice(None):
# A full slice (slice(None)) selects everything, so there is nothing to do
return None, None, None
return self._select_rows_by_slice(rows_sel)
elif isinstance(rows_sel, tuple):
return self._select_rows_else(rows_sel)
elif is_list_like(rows_sel):
return self._select_rows_by_iterable(rows_sel)
else:
return self._select_rows_else(rows_sel)
def _select_cols(
self, cols_sel: Any, missing_keys: Optional[List[Name]] = None
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""
Dispatch the logic for selecting columns to a more specific method based on the type of `cols_sel`.
Parameters
----------
cols_sel : the key specified to select columns.
Returns
-------
Tuple of list of column labels, list of Spark columns, list of fields, bool, name:
* The column labels selected.
* The Spark columns selected.
* The field metadata selected.
* Whether a single Series should be returned instead of a DataFrame.
* The Series name if a Series is returned.
"""
from pyspark.pandas.series import Series
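# Illustrative examples of the returned tuple (a sketch, assuming single-level
# columns such as ['max_speed', 'shield']):
#   .loc[:, 'shield']   -> ([('shield',)], [scol], [field], True, 'shield')
#   .loc[:, ['shield']] -> ([('shield',)], [scol], [field], False, None)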
if cols_sel is None:
column_labels = self._internal.column_labels
data_spark_columns = self._internal.data_spark_columns
data_fields = self._internal.data_fields
return column_labels, data_spark_columns, data_fields, False, None
elif isinstance(cols_sel, Series):
return self._select_cols_by_series(cols_sel, missing_keys)
elif isinstance(cols_sel, Column):
return self._select_cols_by_spark_column(cols_sel, missing_keys)
elif isinstance(cols_sel, slice):
if cols_sel == slice(None):
# A full slice (slice(None)) selects everything, so there is nothing to do
column_labels = self._internal.column_labels
data_spark_columns = self._internal.data_spark_columns
data_fields = self._internal.data_fields
return column_labels, data_spark_columns, data_fields, False, None
return self._select_cols_by_slice(cols_sel, missing_keys)
elif isinstance(cols_sel, tuple):
return self._select_cols_else(cols_sel, missing_keys)
elif is_list_like(cols_sel):
return self._select_cols_by_iterable(cols_sel, missing_keys)
else:
return self._select_cols_else(cols_sel, missing_keys)
# Methods for row selection
@abstractmethod
def _select_rows_by_series(
self, rows_sel: "Series"
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""Select rows by `Series` type key."""
pass
@abstractmethod
def _select_rows_by_spark_column(
self, rows_sel: Column
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""Select rows by Spark `Column` type key."""
pass
@abstractmethod
def _select_rows_by_slice(
self, rows_sel: slice
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""Select rows by `slice` type key."""
pass
@abstractmethod
def _select_rows_by_iterable(
self, rows_sel: Iterable
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""Select rows by `Iterable` type key."""
pass
@abstractmethod
def _select_rows_else(
self, rows_sel: Any
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
"""Select rows by other type key."""
pass
# Methods for col selection
@abstractmethod
def _select_cols_by_series(
self, cols_sel: "Series", missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""Select columns by `Series` type key."""
pass
@abstractmethod
def _select_cols_by_spark_column(
self, cols_sel: Column, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""Select columns by Spark `Column` type key."""
pass
@abstractmethod
def _select_cols_by_slice(
self, cols_sel: slice, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""Select columns by `slice` type key."""
pass
@abstractmethod
def _select_cols_by_iterable(
self, cols_sel: Iterable, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""Select columns by `Iterable` type key."""
pass
@abstractmethod
def _select_cols_else(
self, cols_sel: Any, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
"""Select columns by other type key."""
pass
def __getitem__(self, key: Any) -> Union["Series", "DataFrame"]:
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
if self._is_series:
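# A Series key that does not share the same anchor (the same underlying Spark
# DataFrame) cannot be combined directly into a single Spark plan. As a sketch
# of the approach below: attach the key as a temporary column first and then
# re-dispatch the indexing against the combined frame.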
if isinstance(key, Series) and not same_anchor(key, self._psdf_or_psser):
psdf = self._psdf_or_psser.to_frame()
temp_col = verify_temp_column_name(psdf, "__temp_col__")
psdf[temp_col] = key
return type(self)(psdf[self._psdf_or_psser.name])[psdf[temp_col]]
cond, limit, remaining_index = self._select_rows(key)
if cond is None and limit is None:
return self._psdf_or_psser
column_label = self._psdf_or_psser._column_label
column_labels = [column_label]
data_spark_columns = [self._internal.spark_column_for(column_label)]
data_fields = [self._internal.field_for(column_label)]
returns_series = True
series_name = self._psdf_or_psser.name
else:
assert self._is_df
if isinstance(key, tuple):
if len(key) != 2:
raise SparkPandasIndexingError("Only accepts pairs of candidates")
rows_sel, cols_sel = key
else:
rows_sel = key
cols_sel = None
if isinstance(rows_sel, Series) and not same_anchor(rows_sel, self._psdf_or_psser):
psdf = self._psdf_or_psser.copy()
temp_col = verify_temp_column_name(cast("DataFrame", psdf), "__temp_col__")
psdf[temp_col] = rows_sel
return type(self)(psdf)[psdf[temp_col], cols_sel][list(self._psdf_or_psser.columns)]
cond, limit, remaining_index = self._select_rows(rows_sel)
(
column_labels,
data_spark_columns,
data_fields,
returns_series,
series_name,
) = self._select_cols(cols_sel)
if cond is None and limit is None and returns_series:
psser = self._psdf_or_psser._psser_for(column_labels[0])
if series_name is not None and series_name != psser.name:
psser = psser.rename(series_name)
return psser
if remaining_index is not None:
index_spark_columns = self._internal.index_spark_columns[-remaining_index:]
index_names = self._internal.index_names[-remaining_index:]
index_fields = self._internal.index_fields[-remaining_index:]
else:
index_spark_columns = self._internal.index_spark_columns
index_names = self._internal.index_names
index_fields = self._internal.index_fields
if len(column_labels) > 0:
column_labels = column_labels.copy()
column_labels_level = max(
len(label) if label is not None else 1 for label in column_labels
)
none_column = 0
for i, label in enumerate(column_labels):
if label is None:
label = (none_column,)
none_column += 1
if len(label) < column_labels_level:
label = tuple(list(label) + ([""]) * (column_labels_level - len(label)))
column_labels[i] = label
if i == 0 and none_column == 1:
column_labels = [None]
column_label_names = self._internal.column_label_names[-column_labels_level:]
else:
column_label_names = self._internal.column_label_names
try:
sdf = self._internal.spark_frame
if cond is not None:
index_columns = sdf.select(index_spark_columns).columns
data_columns = sdf.select(data_spark_columns).columns
sdf = sdf.filter(cond).select(index_spark_columns + data_spark_columns)
index_spark_columns = [scol_for(sdf, col) for col in index_columns]
data_spark_columns = [scol_for(sdf, col) for col in data_columns]
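# `limit` comes from row selections that can be reduced to taking the first
# rows (e.g. iloc[:n]). A negative limit (e.g. from iloc[:-n]) keeps all but
# the last n rows, hence the count-based adjustment below.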
if limit is not None:
if limit >= 0:
sdf = sdf.limit(limit)
else:
sdf = sdf.limit(sdf.count() + limit)
sdf = sdf.drop(NATURAL_ORDER_COLUMN_NAME)
except AnalysisException:
raise KeyError(
"[{}] don't exist in columns".format(
[col._jc.toString() for col in data_spark_columns] # type: ignore
)
)
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=index_spark_columns,
index_names=index_names,
index_fields=index_fields,
column_labels=column_labels,
data_spark_columns=data_spark_columns,
data_fields=data_fields,
column_label_names=column_label_names,
)
psdf = DataFrame(internal)
if returns_series:
psdf_or_psser = first_series(psdf)
if series_name is not None and series_name != psdf_or_psser.name:
psdf_or_psser = psdf_or_psser.rename(series_name)
else:
psdf_or_psser = psdf
if remaining_index is not None and remaining_index == 0:
pdf_or_pser = psdf_or_psser.head(2).to_pandas()
length = len(pdf_or_pser)
if length == 0:
raise KeyError(name_like_string(key))
elif length == 1:
return pdf_or_pser.iloc[0]
else:
return psdf_or_psser
else:
return psdf_or_psser
def __setitem__(self, key: Any, value: Any) -> None:
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
if self._is_series:
if (
isinstance(key, Series)
and (isinstance(self, iLocIndexer) or not same_anchor(key, self._psdf_or_psser))
) or (
isinstance(value, Series)
and (isinstance(self, iLocIndexer) or not same_anchor(value, self._psdf_or_psser))
):
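# Sketch of the approach below: when the key and/or value is a Series that is
# not anchored to this Series (or positional iloc setting is involved), attach
# them as temporary columns, remember the original row order with
# monotonically_increasing_id, re-dispatch the assignment using the resulting
# Spark columns, and finally copy the updated columns back into the original
# frame.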
if self._psdf_or_psser.name is None:
psdf = self._psdf_or_psser.to_frame()
column_label = psdf._internal.column_labels[0]
else:
psdf = self._psdf_or_psser._psdf.copy()
column_label = self._psdf_or_psser._column_label
temp_natural_order = verify_temp_column_name(psdf, "__temp_natural_order__")
temp_key_col = verify_temp_column_name(psdf, "__temp_key_col__")
temp_value_col = verify_temp_column_name(psdf, "__temp_value_col__")
psdf[temp_natural_order] = F.monotonically_increasing_id()
if isinstance(key, Series):
psdf[temp_key_col] = key
if isinstance(value, Series):
psdf[temp_value_col] = value
psdf = psdf.sort_values(temp_natural_order).drop(temp_natural_order)
psser = psdf._psser_for(column_label)
if isinstance(key, Series):
key = F.col(
"`{}`".format(psdf[temp_key_col]._internal.data_spark_column_names[0])
)
if isinstance(value, Series):
value = F.col(
"`{}`".format(psdf[temp_value_col]._internal.data_spark_column_names[0])
)
type(self)(psser)[key] = value
if self._psdf_or_psser.name is None:
psser = psser.rename()
self._psdf_or_psser._psdf._update_internal_frame(
psser._psdf[
self._psdf_or_psser._psdf._internal.column_labels
]._internal.resolved_copy,
requires_same_anchor=False,
)
return
if isinstance(value, DataFrame):
raise ValueError("Incompatible indexer with DataFrame")
cond, limit, remaining_index = self._select_rows(key)
if cond is None:
cond = SF.lit(True)
if limit is not None:
cond = cond & (
self._internal.spark_frame[cast(iLocIndexer, self)._sequence_col]
< SF.lit(limit)
)
if isinstance(value, (Series, Column)):
if remaining_index is not None and remaining_index == 0:
raise ValueError(
"No axis named {} for object type {}".format(key, type(value).__name__)
)
if isinstance(value, Series):
value = value.spark.column
else:
value = SF.lit(value)
scol = (
F.when(cond, value)
.otherwise(self._internal.spark_column_for(self._psdf_or_psser._column_label))
.alias(name_like_string(self._psdf_or_psser.name or SPARK_DEFAULT_SERIES_NAME))
)
internal = self._internal.with_new_spark_column(
self._psdf_or_psser._column_label, scol # TODO: dtype?
)
self._psdf_or_psser._psdf._update_internal_frame(internal, requires_same_anchor=False)
else:
assert self._is_df
if isinstance(key, tuple):
if len(key) != 2:
raise SparkPandasIndexingError("Only accepts pairs of candidates")
rows_sel, cols_sel = key
else:
rows_sel = key
cols_sel = None
if isinstance(value, DataFrame):
if len(value.columns) == 1:
value = first_series(value)
else:
raise ValueError("Only a dataframe with one column can be assigned")
if (
isinstance(rows_sel, Series)
and (
isinstance(self, iLocIndexer) or not same_anchor(rows_sel, self._psdf_or_psser)
)
) or (
isinstance(value, Series)
and (isinstance(self, iLocIndexer) or not same_anchor(value, self._psdf_or_psser))
):
psdf = cast(DataFrame, self._psdf_or_psser.copy())
temp_natural_order = verify_temp_column_name(psdf, "__temp_natural_order__")
temp_key_col = verify_temp_column_name(psdf, "__temp_key_col__")
temp_value_col = verify_temp_column_name(psdf, "__temp_value_col__")
psdf[temp_natural_order] = F.monotonically_increasing_id()
if isinstance(rows_sel, Series):
psdf[temp_key_col] = rows_sel
if isinstance(value, Series):
psdf[temp_value_col] = value
psdf = psdf.sort_values(temp_natural_order).drop(temp_natural_order)
if isinstance(rows_sel, Series):
rows_sel = F.col(
"`{}`".format(psdf[temp_key_col]._internal.data_spark_column_names[0])
)
if isinstance(value, Series):
value = F.col(
"`{}`".format(psdf[temp_value_col]._internal.data_spark_column_names[0])
)
type(self)(psdf)[rows_sel, cols_sel] = value
self._psdf_or_psser._update_internal_frame(
psdf[list(self._psdf_or_psser.columns)]._internal.resolved_copy,
requires_same_anchor=False,
)
return
cond, limit, remaining_index = self._select_rows(rows_sel)
missing_keys = [] # type: Optional[List[Name]]
_, data_spark_columns, _, _, _ = self._select_cols(cols_sel, missing_keys=missing_keys)
if cond is None:
cond = SF.lit(True)
if limit is not None:
cond = cond & (
self._internal.spark_frame[cast(iLocIndexer, self)._sequence_col]
< SF.lit(limit)
)
if isinstance(value, (Series, Column)):
if remaining_index is not None and remaining_index == 0:
raise ValueError("Incompatible indexer with Series")
if len(data_spark_columns) > 1:
raise ValueError("shape mismatch")
if isinstance(value, Series):
value = value.spark.column
else:
value = SF.lit(value)
new_data_spark_columns = []
new_fields = []
for new_scol, spark_column_name, new_field in zip(
self._internal.data_spark_columns,
self._internal.data_spark_column_names,
self._internal.data_fields,
):
for scol in data_spark_columns:
if spark_column_equals(new_scol, scol):
new_scol = F.when(cond, value).otherwise(scol).alias(spark_column_name)
new_field = InternalField.from_struct_field(
self._internal.spark_frame.select(new_scol).schema[0],
use_extension_dtypes=new_field.is_extension_dtype,
)
break
new_data_spark_columns.append(new_scol)
new_fields.append(new_field)
column_labels = self._internal.column_labels.copy()
for missing in missing_keys:
if is_name_like_tuple(missing):
label = cast(Label, missing)
else:
label = cast(Label, (missing,))
if len(label) < self._internal.column_labels_level:
label = tuple(
list(label) + ([""] * (self._internal.column_labels_level - len(label)))
)
elif len(label) > self._internal.column_labels_level:
raise KeyError(
"Key length ({}) exceeds index depth ({})".format(
len(label), self._internal.column_labels_level
)
)
column_labels.append(label)
new_data_spark_columns.append(F.when(cond, value).alias(name_like_string(label)))
new_fields.append(None)
internal = self._internal.with_new_columns(
new_data_spark_columns, column_labels=column_labels, data_fields=new_fields
)
self._psdf_or_psser._update_internal_frame(internal, requires_same_anchor=False)
class LocIndexer(LocIndexerLike):
"""
Access a group of rows and columns by label(s) or a boolean Series.
``.loc[]`` is primarily label based, but may also be used with a
conditional boolean Series derived from the DataFrame or Series.
Allowed inputs are:
- A single label, e.g. ``5`` or ``'a'``, (note that ``5`` is
interpreted as a *label* of the index, and **never** as an
integer position along the index) for column selection.
- A list or array of labels, e.g. ``['a', 'b', 'c']``.
- A slice object with labels, e.g. ``'a':'f'``.
- A conditional boolean Series derived from the DataFrame or Series
- A boolean array of the same length as the column axis being sliced,
e.g. ``[True, False, True]``.
- An alignable boolean pandas Series to the column axis being sliced.
The index of the key will be aligned before masking.
Inputs that are allowed in pandas but not supported here are:
- A boolean array of the same length as the row axis being sliced,
e.g. ``[True, False, True]``.
- A ``callable`` function with one argument (the calling Series, DataFrame
or Panel) and that returns valid output for indexing (one of the above)
.. note:: MultiIndex is not supported yet.
.. note:: Note that contrary to usual python slices, **both** the
start and the stop are included, and the step of the slice is not allowed.
.. note:: With a list or array of labels for row selection,
pandas-on-Spark behaves as a filter without reordering by the labels.
See Also
--------
Series.loc : Access group of values using labels.
Examples
--------
**Getting values**
>>> df = ps.DataFrame([[1, 2], [4, 5], [7, 8]],
... index=['cobra', 'viper', 'sidewinder'],
... columns=['max_speed', 'shield'])
>>> df
max_speed shield
cobra 1 2
viper 4 5
sidewinder 7 8
Single label. Note this returns the row as a Series.
>>> df.loc['viper']
max_speed 4
shield 5
Name: viper, dtype: int64
List of labels. Note using ``[[]]`` returns a DataFrame.
Also note that pandas-on-Spark behaves just as a filter without reordering by the labels.
>>> df.loc[['viper', 'sidewinder']]
max_speed shield
viper 4 5
sidewinder 7 8
>>> df.loc[['sidewinder', 'viper']]
max_speed shield
viper 4 5
sidewinder 7 8
Single label for column.
>>> df.loc['cobra', 'shield']
2
List of labels for row.
>>> df.loc[['cobra'], 'shield']
cobra 2
Name: shield, dtype: int64
List of labels for column.
>>> df.loc['cobra', ['shield']]
shield 2
Name: cobra, dtype: int64
List of labels for both row and column.
>>> df.loc[['cobra'], ['shield']]
shield
cobra 2
Slice with labels for row and single label for column. As mentioned
above, note that both the start and stop of the slice are included.
>>> df.loc['cobra':'viper', 'max_speed']
cobra 1
viper 4
Name: max_speed, dtype: int64
Conditional that returns a boolean Series
>>> df.loc[df['shield'] > 6]
max_speed shield
sidewinder 7 8
Conditional that returns a boolean Series with column labels specified
>>> df.loc[df['shield'] > 6, ['max_speed']]
max_speed
sidewinder 7
A boolean array of the same length as the column axis being sliced.
>>> df.loc[:, [False, True]]
shield
cobra 2
viper 5
sidewinder 8
An alignable boolean Series to the column axis being sliced.
>>> df.loc[:, pd.Series([False, True], index=['max_speed', 'shield'])]
shield
cobra 2
viper 5
sidewinder 8
**Setting values**
Setting value for all items matching the list of labels.
>>> df.loc[['viper', 'sidewinder'], ['shield']] = 50
>>> df
max_speed shield
cobra 1 2
viper 4 50
sidewinder 7 50
Setting value for an entire row
>>> df.loc['cobra'] = 10
>>> df
max_speed shield
cobra 10 10
viper 4 50
sidewinder 7 50
Set value for an entire column
>>> df.loc[:, 'max_speed'] = 30
>>> df
max_speed shield
cobra 30 10
viper 30 50
sidewinder 30 50
Set value for an entire list of columns
>>> df.loc[:, ['max_speed', 'shield']] = 100
>>> df
max_speed shield
cobra 100 100
viper 100 100
sidewinder 100 100
Set value with Series
>>> df.loc[:, 'shield'] = df['shield'] * 2
>>> df
max_speed shield
cobra 100 200
viper 100 200
sidewinder 100 200
**Getting values on a DataFrame with an index that has integer labels**
Another example using integers for the index
>>> df = ps.DataFrame([[1, 2], [4, 5], [7, 8]],
... index=[7, 8, 9],
... columns=['max_speed', 'shield'])
>>> df
max_speed shield
7 1 2
8 4 5
9 7 8
Slice with integer labels for rows. As mentioned above, note that both
the start and stop of the slice are included.
>>> df.loc[7:9]
max_speed shield
7 1 2
8 4 5
9 7 8
"""
@staticmethod
def _NotImplemented(description: str) -> SparkPandasNotImplementedError:
return SparkPandasNotImplementedError(
description=description,
pandas_function=".loc[..., ...]",
spark_target_function="select, where",
)
def _select_rows_by_series(
self, rows_sel: "Series"
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
assert isinstance(rows_sel.spark.data_type, BooleanType), rows_sel.spark.data_type
return rows_sel.spark.column, None, None
def _select_rows_by_spark_column(
self, rows_sel: Column
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
spark_type = self._internal.spark_frame.select(rows_sel).schema[0].dataType
assert isinstance(spark_type, BooleanType), spark_type
return rows_sel, None, None
def _select_rows_by_slice(
self, rows_sel: slice
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
from pyspark.pandas.indexes import MultiIndex
if rows_sel.step is not None:
raise LocIndexer._NotImplemented("Cannot use step with Spark.")
elif self._internal.index_level == 1:
sdf = self._internal.spark_frame
index = self._psdf_or_psser.index
index_column = index.to_series()
index_data_type = index_column.spark.data_type
start = rows_sel.start
stop = rows_sel.stop
# Look up the '__natural_order__' values corresponding to start and stop
# so that the slice can be expressed in terms of the natural row order.
start_and_stop = (
sdf.select(index_column.spark.column, NATURAL_ORDER_COLUMN_NAME)
.where(
(index_column.spark.column == SF.lit(start).cast(index_data_type))
| (index_column.spark.column == SF.lit(stop).cast(index_data_type))
)
.collect()
)
start = [row[1] for row in start_and_stop if row[0] == start]
start = start[0] if len(start) > 0 else None
stop = [row[1] for row in start_and_stop if row[0] == stop]
stop = stop[-1] if len(stop) > 0 else None
conds = [] # type: List[Column]
if start is not None:
conds.append(F.col(NATURAL_ORDER_COLUMN_NAME) >= SF.lit(start).cast(LongType()))
if stop is not None:
conds.append(F.col(NATURAL_ORDER_COLUMN_NAME) <= SF.lit(stop).cast(LongType()))
# if index order is not monotonic increasing or decreasing
# and specified values don't exist in index, raise KeyError
if (start is None and rows_sel.start is not None) or (
stop is None and rows_sel.stop is not None
):
inc = index_column.is_monotonic_increasing
if inc is False:
dec = index_column.is_monotonic_decreasing
if start is None and rows_sel.start is not None:
start = rows_sel.start
if inc is not False:
conds.append(
index_column.spark.column >= SF.lit(start).cast(index_data_type)
)
elif dec is not False:
conds.append(
index_column.spark.column <= SF.lit(start).cast(index_data_type)
)
else:
raise KeyError(rows_sel.start)
if stop is None and rows_sel.stop is not None:
stop = rows_sel.stop
if inc is not False:
conds.append(
index_column.spark.column <= SF.lit(stop).cast(index_data_type)
)
elif dec is not False:
conds.append(
index_column.spark.column >= SF.lit(stop).cast(index_data_type)
)
else:
raise KeyError(rows_sel.stop)
return reduce(lambda x, y: x & y, conds), None, None
else:
index = self._psdf_or_psser.index
index_data_type = [f.dataType for f in index.to_series().spark.data_type]
start = rows_sel.start
if start is not None:
if not isinstance(start, tuple):
start = (start,)
if len(start) == 0:
start = None
stop = rows_sel.stop
if stop is not None:
if not isinstance(stop, tuple):
stop = (stop,)
if len(stop) == 0:
stop = None
depth = max(
len(start) if start is not None else 0, len(stop) if stop is not None else 0
)
if depth == 0:
return None, None, None
elif (
depth > self._internal.index_level
or not index.droplevel(list(range(self._internal.index_level)[depth:])).is_monotonic
):
raise KeyError(
"Key length ({}) was greater than MultiIndex sort depth".format(depth)
)
conds = []
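# For a MultiIndex bound, build a lexicographic comparison by folding from the
# innermost level outwards: if a level equals the bound value, defer to the
# condition built for the inner levels; otherwise compare that level alone
# (with > for the start bound and < for the stop bound).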
if start is not None:
cond = SF.lit(True)
for scol, value, dt in list(
zip(self._internal.index_spark_columns, start, index_data_type)
)[::-1]:
compare = MultiIndex._comparator_for_monotonic_increasing(dt)
cond = F.when(scol.eqNullSafe(SF.lit(value).cast(dt)), cond).otherwise(
compare(scol, SF.lit(value).cast(dt), Column.__gt__)
)
conds.append(cond)
if stop is not None:
cond = SF.lit(True)
for scol, value, dt in list(
zip(self._internal.index_spark_columns, stop, index_data_type)
)[::-1]:
compare = MultiIndex._comparator_for_monotonic_increasing(dt)
cond = F.when(scol.eqNullSafe(SF.lit(value).cast(dt)), cond).otherwise(
compare(scol, SF.lit(value).cast(dt), Column.__lt__)
)
conds.append(cond)
return reduce(lambda x, y: x & y, conds), None, None
def _select_rows_by_iterable(
self, rows_sel: Iterable
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
rows_sel = list(rows_sel)
if len(rows_sel) == 0:
return SF.lit(False), None, None
elif self._internal.index_level == 1:
index_column = self._psdf_or_psser.index.to_series()
index_data_type = index_column.spark.data_type
if len(rows_sel) == 1:
return (
index_column.spark.column == SF.lit(rows_sel[0]).cast(index_data_type),
None,
None,
)
else:
return (
index_column.spark.column.isin(
[SF.lit(r).cast(index_data_type) for r in rows_sel]
),
None,
None,
)
else:
raise LocIndexer._NotImplemented("Cannot select with MultiIndex with Spark.")
def _select_rows_else(
self, rows_sel: Any
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
if not isinstance(rows_sel, tuple):
rows_sel = (rows_sel,)
if len(rows_sel) > self._internal.index_level:
raise SparkPandasIndexingError("Too many indexers")
rows = [scol == value for scol, value in zip(self._internal.index_spark_columns, rows_sel)]
return (
reduce(lambda x, y: x & y, rows),
None,
self._internal.index_level - len(rows_sel),
)
def _get_from_multiindex_column(
self,
key: Optional[Label],
missing_keys: Optional[List[Name]],
labels: Optional[List[Tuple[Label, Label]]] = None,
recursed: int = 0,
) -> Tuple[List[Label], Optional[List[Column]], List[InternalField], bool, Optional[Name]]:
"""Select columns from multi-index columns."""
assert isinstance(key, tuple)
if labels is None:
labels = [(label, label) for label in self._internal.column_labels]
for k in key:
labels = [
(label, None if lbl is None else lbl[1:])
for label, lbl in labels
if (lbl is None and k is None) or (lbl is not None and lbl[0] == k)
]
if len(labels) == 0:
if missing_keys is None:
raise KeyError(k)
else:
missing_keys.append(key)
return [], [], [], False, None
if all(lbl is not None and len(lbl) > 0 and lbl[0] == "" for _, lbl in labels):
# If the head is '', drill down recursively.
labels = [(label, tuple([str(key), *lbl[1:]])) for i, (label, lbl) in enumerate(labels)]
return self._get_from_multiindex_column((str(key),), missing_keys, labels, recursed + 1)
else:
returns_series = all(lbl is None or len(lbl) == 0 for _, lbl in labels)
if returns_series:
label_set = set(label for label, _ in labels)
assert len(label_set) == 1
label = list(label_set)[0]
column_labels = [label]
data_spark_columns = [self._internal.spark_column_for(label)]
data_fields = [self._internal.field_for(label)]
if label is None:
series_name = None # type: Name
else:
if recursed > 0:
label = label[:-recursed]
series_name = label if len(label) > 1 else label[0]
else:
column_labels = [
None if lbl is None or lbl == (None,) else lbl for _, lbl in labels
]
data_spark_columns = [self._internal.spark_column_for(label) for label, _ in labels]
data_fields = [self._internal.field_for(label) for label, _ in labels]
series_name = None
return column_labels, data_spark_columns, data_fields, returns_series, series_name
def _select_cols_by_series(
self, cols_sel: "Series", missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
column_labels = cols_sel._internal.column_labels
data_spark_columns = cols_sel._internal.data_spark_columns
data_fields = cols_sel._internal.data_fields
return column_labels, data_spark_columns, data_fields, True, None
def _select_cols_by_spark_column(
self, cols_sel: Column, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
column_labels = [
(self._internal.spark_frame.select(cols_sel).columns[0],)
] # type: List[Label]
data_spark_columns = [cols_sel]
return column_labels, data_spark_columns, None, True, None
def _select_cols_by_slice(
self, cols_sel: slice, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
start, stop = self._psdf_or_psser.columns.slice_locs(
start=cols_sel.start, end=cols_sel.stop
)
column_labels = self._internal.column_labels[start:stop]
data_spark_columns = self._internal.data_spark_columns[start:stop]
data_fields = self._internal.data_fields[start:stop]
return column_labels, data_spark_columns, data_fields, False, None
def _select_cols_by_iterable(
self, cols_sel: Iterable, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
from pyspark.pandas.series import Series
if all(isinstance(key, Series) for key in cols_sel):
column_labels = [key._column_label for key in cols_sel]
data_spark_columns = [key.spark.column for key in cols_sel]
data_fields = [key._internal.data_fields[0] for key in cols_sel]
elif all(isinstance(key, Column) for key in cols_sel):
column_labels = [
(self._internal.spark_frame.select(col).columns[0],) for col in cols_sel
]
data_spark_columns = list(cols_sel)
data_fields = None
elif all(isinstance(key, bool) for key in cols_sel) or all(
isinstance(key, np.bool_) for key in cols_sel
):
if len(cast(Sized, cols_sel)) != len(self._internal.column_labels):
raise IndexError(
"Boolean index has wrong length: %s instead of %s"
% (len(cast(Sized, cols_sel)), len(self._internal.column_labels))
)
if isinstance(cols_sel, pd.Series):
if not cols_sel.index.sort_values().equals(self._psdf.columns.sort_values()):
raise SparkPandasIndexingError(
"Unalignable boolean Series provided as indexer "
"(index of the boolean Series and of the indexed object do not match)"
)
else:
column_labels = [
column_label
for column_label in self._internal.column_labels
if cols_sel[column_label if len(column_label) > 1 else column_label[0]]
]
data_spark_columns = [
self._internal.spark_column_for(column_label)
for column_label in column_labels
]
data_fields = [
self._internal.field_for(column_label) for column_label in column_labels
]
else:
column_labels = [
self._internal.column_labels[i] for i, col in enumerate(cols_sel) if col
]
data_spark_columns = [
self._internal.data_spark_columns[i] for i, col in enumerate(cols_sel) if col
]
data_fields = [
self._internal.data_fields[i] for i, col in enumerate(cols_sel) if col
]
elif any(isinstance(key, tuple) for key in cols_sel) and any(
not is_name_like_tuple(key) for key in cols_sel
):
raise TypeError(
"Expected tuple, got {}".format(
type(set(key for key in cols_sel if not is_name_like_tuple(key)).pop())
)
)
else:
if missing_keys is None and all(isinstance(key, tuple) for key in cols_sel):
level = self._internal.column_labels_level
if any(len(key) != level for key in cols_sel):
raise ValueError("All the key level should be the same as column index level.")
column_labels = []
data_spark_columns = []
data_fields = []
for key in cols_sel:
found = False
for label in self._internal.column_labels:
if label == key or label[0] == key:
column_labels.append(label)
data_spark_columns.append(self._internal.spark_column_for(label))
data_fields.append(self._internal.field_for(label))
found = True
if not found:
if missing_keys is None:
raise KeyError("['{}'] not in index".format(name_like_string(key)))
else:
missing_keys.append(key)
return column_labels, data_spark_columns, data_fields, False, None
def _select_cols_else(
self, cols_sel: Any, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
if not is_name_like_tuple(cols_sel):
cols_sel = (cols_sel,)
return self._get_from_multiindex_column(cols_sel, missing_keys)
class iLocIndexer(LocIndexerLike):
"""
Purely integer-location based indexing for selection by position.
``.iloc[]`` is primarily integer position based (from ``0`` to
``length-1`` of the axis), but may also be used with a conditional boolean Series.
Allowed inputs are:
- An integer for column selection, e.g. ``5``.
- A list or array of integers for row selection with distinct index values,
e.g. ``[3, 4, 0]``
- A list or array of integers for column selection, e.g. ``[4, 3, 0]``.
- A boolean array for column selection.
- A slice object with ints for row and column selection, e.g. ``1:7``.
Inputs that are allowed in pandas but not supported here are:
- A list or array of integers for row selection with duplicated indexes,
e.g. ``[4, 4, 0]``.
- A boolean array for row selection.
- A ``callable`` function with one argument (the calling Series, DataFrame
or Panel) and that returns valid output for indexing (one of the above).
This is useful in method chains, when you don't have a reference to the
calling object, but would like to base your selection on some value.
``.iloc`` will raise ``IndexError`` if a requested indexer is
out-of-bounds, except *slice* indexers which allow out-of-bounds
indexing (this conforms with python/numpy *slice* semantics).
See Also
--------
DataFrame.loc : Purely label-location based indexer for selection by label.
Series.iloc : Purely integer-location based indexing for
selection by position.
Examples
--------
>>> mydict = [{'a': 1, 'b': 2, 'c': 3, 'd': 4},
... {'a': 100, 'b': 200, 'c': 300, 'd': 400},
... {'a': 1000, 'b': 2000, 'c': 3000, 'd': 4000 }]
>>> df = ps.DataFrame(mydict, columns=['a', 'b', 'c', 'd'])
>>> df
a b c d
0 1 2 3 4
1 100 200 300 400
2 1000 2000 3000 4000
**Indexing just the rows**
A scalar integer for row selection.
>>> df.iloc[1]
a 100
b 200
c 300
d 400
Name: 1, dtype: int64
>>> df.iloc[[0]]
a b c d
0 1 2 3 4
With a `slice` object.
>>> df.iloc[:3]
a b c d
0 1 2 3 4
1 100 200 300 400
2 1000 2000 3000 4000
**Indexing both axes**
You can mix the indexer types for the index and columns. Use ``:`` to
select the entire axis.
With scalar integers.
>>> df.iloc[:1, 1]
0 2
Name: b, dtype: int64
With lists of integers.
>>> df.iloc[:2, [1, 3]]
b d
0 2 4
1 200 400
With `slice` objects.
>>> df.iloc[:2, 0:3]
a b c
0 1 2 3
1 100 200 300
With a boolean array whose length matches the columns.
>>> df.iloc[:, [True, False, True, False]]
a c
0 1 3
1 100 300
2 1000 3000
**Setting values**
Setting value for all items matching the list of labels.
>>> df.iloc[[1, 2], [1]] = 50
>>> df
a b c d
0 1 2 3 4
1 100 50 300 400
2 1000 50 3000 4000
Setting value for an entire row
>>> df.iloc[0] = 10
>>> df
a b c d
0 10 10 10 10
1 100 50 300 400
2 1000 50 3000 4000
Set value for an entire column
>>> df.iloc[:, 2] = 30
>>> df
a b c d
0 10 10 30 10
1 100 50 30 400
2 1000 50 30 4000
Set value for an entire list of columns
>>> df.iloc[:, [2, 3]] = 100
>>> df
a b c d
0 10 10 100 100
1 100 50 100 100
2 1000 50 100 100
Set value with Series
>>> df.iloc[:, 3] = df.iloc[:, 3] * 2
>>> df
a b c d
0 10 10 100 200
1 100 50 100 200
2 1000 50 100 200
"""
@staticmethod
def _NotImplemented(description: str) -> SparkPandasNotImplementedError:
return SparkPandasNotImplementedError(
description=description,
pandas_function=".iloc[..., ...]",
spark_target_function="select, where",
)
@lazy_property
def _internal(self) -> "InternalFrame":
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
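# attach_distributed_sequence_column adds a 0-based row-position column; iloc
# then filters on that column to translate positional selections into Spark
# conditions.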
sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
internal.spark_frame, column_name=self._sequence_col
)
return internal.with_new_sdf(
spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME),
index_fields=(
[field.copy(nullable=True) for field in internal.index_fields]
if force_nullable
else internal.index_fields
),
data_fields=(
[field.copy(nullable=True) for field in internal.data_fields]
if force_nullable
else internal.data_fields
),
)
@lazy_property
def _sequence_col(self) -> str:
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
return verify_temp_column_name(internal.spark_frame, "__distributed_sequence_column__")
def _select_rows_by_series(
self, rows_sel: "Series"
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
raise iLocIndexer._NotImplemented(
".iloc requires numeric slice, conditional "
"boolean Index or a sequence of positions as int, "
"got {}".format(type(rows_sel))
)
def _select_rows_by_spark_column(
self, rows_sel: Column
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
raise iLocIndexer._NotImplemented(
".iloc requires numeric slice, conditional "
"boolean Index or a sequence of positions as int, "
"got {}".format(type(rows_sel))
)
def _select_rows_by_slice(
self, rows_sel: slice
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
def verify_type(i: int) -> None:
if not isinstance(i, int):
raise TypeError(
"cannot do slice indexing with these indexers [{}] of {}".format(i, type(i))
)
has_negative = False
start = rows_sel.start
if start is not None:
verify_type(start)
if start == 0:
start = None
elif start < 0:
has_negative = True
stop = rows_sel.stop
if stop is not None:
verify_type(stop)
if stop < 0:
has_negative = True
step = rows_sel.step
if step is not None:
verify_type(step)
if step == 0:
raise ValueError("slice step cannot be zero")
else:
step = 1
if start is None and step == 1:
return None, stop, None
sdf = self._internal.spark_frame
sequence_scol = sdf[self._sequence_col]
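# Sketch of the remaining cases: negative bounds are translated into absolute
# positions using the total row count, the bounds become range conditions on
# the row-position column, and a non-unit step keeps only the rows where
# (position - start) % step == 0.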
if has_negative or (step < 0 and start is None):
cnt = sdf.count()
cond = []
if start is not None:
if start < 0:
start = start + cnt
if step >= 0:
cond.append(sequence_scol >= SF.lit(start).cast(LongType()))
else:
cond.append(sequence_scol <= SF.lit(start).cast(LongType()))
if stop is not None:
if stop < 0:
stop = stop + cnt
if step >= 0:
cond.append(sequence_scol < SF.lit(stop).cast(LongType()))
else:
cond.append(sequence_scol > SF.lit(stop).cast(LongType()))
if step != 1:
if step > 0:
start = start or 0
else:
start = start or (cnt - 1)
cond.append(((sequence_scol - start) % SF.lit(step).cast(LongType())) == SF.lit(0))
return reduce(lambda x, y: x & y, cond), None, None
def _select_rows_by_iterable(
self, rows_sel: Iterable
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
sdf = self._internal.spark_frame
if any(isinstance(key, (int, np.int64, np.int32)) and key < 0 for key in rows_sel):
offset = sdf.count()
else:
offset = 0
new_rows_sel = []
for key in list(rows_sel):
if not isinstance(key, (int, np.int64, np.int32)):
raise TypeError(
"cannot do positional indexing with these indexers [{}] of {}".format(
key, type(key)
)
)
if key < 0:
key = key + offset
new_rows_sel.append(key)
if len(new_rows_sel) != len(set(new_rows_sel)):
raise NotImplementedError(
"Duplicated row selection is not currently supported; "
"however, normalised index was [%s]" % new_rows_sel
)
sequence_scol = sdf[self._sequence_col]
cond = []
for key in new_rows_sel:
cond.append(sequence_scol == SF.lit(int(key)).cast(LongType()))
if len(cond) == 0:
cond = [SF.lit(False)]
return reduce(lambda x, y: x | y, cond), None, None
def _select_rows_else(
self, rows_sel: Any
) -> Tuple[Optional[Column], Optional[int], Optional[int]]:
if isinstance(rows_sel, int):
sdf = self._internal.spark_frame
return (sdf[self._sequence_col] == rows_sel), None, 0
elif isinstance(rows_sel, tuple):
raise SparkPandasIndexingError("Too many indexers")
else:
raise iLocIndexer._NotImplemented(
".iloc requires numeric slice, conditional "
"boolean Index or a sequence of positions as int, "
"got {}".format(type(rows_sel))
)
def _select_cols_by_series(
self, cols_sel: "Series", missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
raise ValueError(
"Location based indexing can only have [integer, integer slice, "
"listlike of integers, boolean array] types, got {}".format(cols_sel)
)
def _select_cols_by_spark_column(
self, cols_sel: Column, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
raise ValueError(
"Location based indexing can only have [integer, integer slice, "
"listlike of integers, boolean array] types, got {}".format(cols_sel)
)
def _select_cols_by_slice(
self, cols_sel: slice, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
if all(
s is None or isinstance(s, int) for s in (cols_sel.start, cols_sel.stop, cols_sel.step)
):
column_labels = self._internal.column_labels[cols_sel]
data_spark_columns = self._internal.data_spark_columns[cols_sel]
data_fields = self._internal.data_fields[cols_sel]
return column_labels, data_spark_columns, data_fields, False, None
else:
not_none = (
cols_sel.start
if cols_sel.start is not None
else cols_sel.stop
if cols_sel.stop is not None
else cols_sel.step
)
raise TypeError(
"cannot do slice indexing with these indexers {} of {}".format(
not_none, type(not_none)
)
)
def _select_cols_by_iterable(
self, cols_sel: Iterable, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
if all(isinstance(s, bool) for s in cols_sel):
cols_sel = [i for i, s in enumerate(cols_sel) if s]
if all(isinstance(s, int) for s in cols_sel):
column_labels = [self._internal.column_labels[s] for s in cols_sel]
data_spark_columns = [self._internal.data_spark_columns[s] for s in cols_sel]
data_fields = [self._internal.data_fields[s] for s in cols_sel]
return column_labels, data_spark_columns, data_fields, False, None
else:
raise TypeError("cannot perform reduce with flexible type")
def _select_cols_else(
self, cols_sel: Any, missing_keys: Optional[List[Name]]
) -> Tuple[
List[Label],
Optional[List[Column]],
Optional[List[InternalField]],
bool,
Optional[Name],
]:
if isinstance(cols_sel, int):
if cols_sel > len(self._internal.column_labels):
raise KeyError(cols_sel)
column_labels = [self._internal.column_labels[cols_sel]]
data_spark_columns = [self._internal.data_spark_columns[cols_sel]]
data_fields = [self._internal.data_fields[cols_sel]]
return column_labels, data_spark_columns, data_fields, True, None
else:
raise ValueError(
"Location based indexing can only have [integer, integer slice, "
"listlike of integers, boolean array] types, got {}".format(cols_sel)
)
def __setitem__(self, key: Any, value: Any) -> None:
if is_list_like(value) and not isinstance(value, Column):
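# When the value is list-like, materialize the selection first so that its
# length/shape can be validated against the value before delegating the
# actual assignment to the parent __setitem__.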
iloc_item = self[key]
if not is_list_like(key) or not is_list_like(iloc_item):
raise ValueError("setting an array element with a sequence.")
else:
shape_iloc_item = iloc_item.shape
len_iloc_item = shape_iloc_item[0]
len_value = len(value)
if len_iloc_item != len_value:
if self._is_series:
raise ValueError(
"cannot set using a list-like indexer with a different length than "
"the value"
)
else:
raise ValueError(
"shape mismatch: value array of shape ({},) could not be broadcast "
"to indexing result of shape {}".format(len_value, shape_iloc_item)
)
super().__setitem__(key, value)
# Update again with resolved_copy to drop extra columns.
self._psdf._update_internal_frame(
self._psdf._internal.resolved_copy, requires_same_anchor=False
)
# Clean up implicitly cached properties to be able to reuse the indexer.
del self._internal
del self._sequence_col
def _test() -> None:
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.indexing
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.pandas.indexing.__dict__.copy()
globs["ps"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.indexing tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.indexing,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)
if __name__ == "__main__":
_test()