| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import copy |
| import sys |
| import os |
| import operator |
| import shlex |
| import warnings |
| import heapq |
| import bisect |
| import random |
| from subprocess import Popen, PIPE |
| from threading import Thread |
| from collections import defaultdict |
| from itertools import chain |
| from functools import reduce |
| from math import sqrt, log, isinf, isnan, pow, ceil |
| |
| from pyspark.java_gateway import local_connect_and_auth |
| from pyspark.serializers import AutoBatchedSerializer, BatchedSerializer, NoOpSerializer, \ |
| CartesianDeserializer, CloudPickleSerializer, PairDeserializer, PickleSerializer, \ |
| pack_long, read_int, write_int |
| from pyspark.join import python_join, python_left_outer_join, \ |
| python_right_outer_join, python_full_outer_join, python_cogroup |
| from pyspark.statcounter import StatCounter |
| from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler |
| from pyspark.storagelevel import StorageLevel |
| from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests |
| from pyspark.resource.profile import ResourceProfile |
| from pyspark.resultiterable import ResultIterable |
| from pyspark.shuffle import Aggregator, ExternalMerger, \ |
| get_used_memory, ExternalSorter, ExternalGroupBy |
| from pyspark.traceback_utils import SCCallSiteSync |
| from pyspark.util import fail_on_stopiteration, _parse_memory |
| |
| __all__ = ["RDD"] |
| |
| |
| class PythonEvalType(object): |
| """ |
    Evaluation type of a Python RDD.
| |
| These values are internal to PySpark. |
| |
| These values should match values in org.apache.spark.api.python.PythonEvalType. |
| """ |
| NON_UDF = 0 |
| |
| SQL_BATCHED_UDF = 100 |
| |
| SQL_SCALAR_PANDAS_UDF = 200 |
| SQL_GROUPED_MAP_PANDAS_UDF = 201 |
| SQL_GROUPED_AGG_PANDAS_UDF = 202 |
| SQL_WINDOW_AGG_PANDAS_UDF = 203 |
| SQL_SCALAR_PANDAS_ITER_UDF = 204 |
| SQL_MAP_PANDAS_ITER_UDF = 205 |
| SQL_COGROUPED_MAP_PANDAS_UDF = 206 |
| |
| |
| def portable_hash(x): |
| """ |
    This function returns a consistent hash code for built-in types, especially
    for None and tuples containing None.

    The algorithm is similar to the one used by CPython 2.7.
| |
| Examples |
| -------- |
| >>> portable_hash(None) |
| 0 |
| >>> portable_hash((None, 1)) & 0xffffffff |
| 219750521 |
| """ |
| |
| if 'PYTHONHASHSEED' not in os.environ: |
| raise RuntimeError("Randomness of hash of string should be disabled via PYTHONHASHSEED") |
| |
| if x is None: |
| return 0 |
| if isinstance(x, tuple): |
| h = 0x345678 |
| for i in x: |
| h ^= portable_hash(i) |
| h *= 1000003 |
| h &= sys.maxsize |
| h ^= len(x) |
| if h == -1: |
| h = -2 |
| return int(h) |
| return hash(x) |
| |
| |
| class BoundedFloat(float): |
| """ |
    A bounded value generated by an approximate job, with a confidence level
    and low and high bounds.
| |
| Examples |
| -------- |
| >>> BoundedFloat(100.0, 0.95, 95.0, 105.0) |
| 100.0 |
| """ |
| def __new__(cls, mean, confidence, low, high): |
| obj = float.__new__(cls, mean) |
| obj.confidence = confidence |
| obj.low = low |
| obj.high = high |
| return obj |
| |
| |
| def _create_local_socket(sock_info): |
| """ |
| Create a local socket that can be used to load deserialized data from the JVM |
| |
| Parameters |
| ---------- |
| sock_info : tuple |
| Tuple containing port number and authentication secret for a local socket. |
| |
| Returns |
| ------- |
    sockfile
        file object for the local socket
| """ |
| port = sock_info[0] |
| auth_secret = sock_info[1] |
| sockfile, sock = local_connect_and_auth(port, auth_secret) |
    # The RDD materialization time is unpredictable; if we set a timeout for the
    # socket read operation, it is very likely to fail. See SPARK-18281.
| sock.settimeout(None) |
| return sockfile |
| |
| |
| def _load_from_socket(sock_info, serializer): |
| """ |
| Connect to a local socket described by sock_info and use the given serializer to yield data |
| |
| Parameters |
| ---------- |
| sock_info : tuple |
| Tuple containing port number and authentication secret for a local socket. |
| serializer : :py:class:`Serializer` |
| The PySpark serializer to use |
| |
| Returns |
| ------- |
| result of :py:meth:`Serializer.load_stream`, |
| usually a generator that yields deserialized data |
| """ |
| sockfile = _create_local_socket(sock_info) |
| # The socket will be automatically closed when garbage-collected. |
| return serializer.load_stream(sockfile) |
| |
| |
| def _local_iterator_from_socket(sock_info, serializer): |
| |
| class PyLocalIterable(object): |
| """ Create a synchronous local iterable over a socket """ |
| |
| def __init__(self, _sock_info, _serializer): |
| port, auth_secret, self.jsocket_auth_server = _sock_info |
| self._sockfile = _create_local_socket((port, auth_secret)) |
| self._serializer = _serializer |
| self._read_iter = iter([]) # Initialize as empty iterator |
| self._read_status = 1 |
| |
| def __iter__(self): |
| while self._read_status == 1: |
| # Request next partition data from Java |
| write_int(1, self._sockfile) |
| self._sockfile.flush() |
| |
| # If response is 1 then there is a partition to read, if 0 then fully consumed |
| self._read_status = read_int(self._sockfile) |
| if self._read_status == 1: |
| |
| # Load the partition data as a stream and read each item |
| self._read_iter = self._serializer.load_stream(self._sockfile) |
| for item in self._read_iter: |
| yield item |
| |
                # An error occurred; fetch the result from the serving thread,
                # which raises any exceptions from the JVM
| elif self._read_status == -1: |
| self.jsocket_auth_server.getResult() |
| |
| def __del__(self): |
            # If the local iterator is not fully consumed, clean up the stream
| if self._read_status == 1: |
| try: |
| # Finish consuming partition data stream |
| for _ in self._read_iter: |
| pass |
| # Tell Java to stop sending data and close connection |
| write_int(0, self._sockfile) |
| self._sockfile.flush() |
| except Exception: |
| # Ignore any errors, socket is automatically closed when garbage-collected |
| pass |
| |
| return iter(PyLocalIterable(sock_info, serializer)) |
| |
| |
| class Partitioner(object): |
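    """
    Partition keys by ``partitionFunc(k) % numPartitions``.

    A minimal sketch of its behavior (pure Python, no SparkContext needed):

    >>> p = Partitioner(2, lambda k: k)
    >>> [p(k) for k in range(4)]
    [0, 1, 0, 1]
    """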
| def __init__(self, numPartitions, partitionFunc): |
| self.numPartitions = numPartitions |
| self.partitionFunc = partitionFunc |
| |
| def __eq__(self, other): |
| return (isinstance(other, Partitioner) and self.numPartitions == other.numPartitions |
| and self.partitionFunc == other.partitionFunc) |
| |
| def __call__(self, k): |
| return self.partitionFunc(k) % self.numPartitions |
| |
| |
| class RDD(object): |
| |
| """ |
| A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. |
| Represents an immutable, partitioned collection of elements that can be |
| operated on in parallel. |
| """ |
| |
| def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())): |
| self._jrdd = jrdd |
| self.is_cached = False |
| self.is_checkpointed = False |
| self.has_resource_profile = False |
| self.ctx = ctx |
| self._jrdd_deserializer = jrdd_deserializer |
| self._id = jrdd.id() |
| self.partitioner = None |
| |
| def _pickled(self): |
| return self._reserialize(AutoBatchedSerializer(PickleSerializer())) |
| |
| def id(self): |
| """ |
| A unique ID for this RDD (within its SparkContext). |
| """ |
| return self._id |
| |
| def __repr__(self): |
| return self._jrdd.toString() |
| |
| def __getnewargs__(self): |
| # This method is called when attempting to pickle an RDD, which is always an error: |
| raise RuntimeError( |
| "It appears that you are attempting to broadcast an RDD or reference an RDD from an " |
| "action or transformation. RDD transformations and actions can only be invoked by the " |
| "driver, not inside of other transformations; for example, " |
| "rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values " |
| "transformation and count action cannot be performed inside of the rdd1.map " |
| "transformation. For more information, see SPARK-5063." |
| ) |
| |
| @property |
| def context(self): |
| """ |
| The :class:`SparkContext` that this RDD was created on. |
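
        Examples
        --------
        A minimal check, using the ``sc`` defined for this module's doctests:

        >>> sc.parallelize([1, 2]).context is sc
        True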
| """ |
| return self.ctx |
| |
| def cache(self): |
| """ |
| Persist this RDD with the default storage level (`MEMORY_ONLY`). |
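
        Examples
        --------
        A small sketch mirroring the :meth:`persist` example below:

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> rdd.cache().is_cached
        True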
| """ |
| self.is_cached = True |
| self.persist(StorageLevel.MEMORY_ONLY) |
| return self |
| |
| def persist(self, storageLevel=StorageLevel.MEMORY_ONLY): |
| """ |
| Set this RDD's storage level to persist its values across operations |
| after the first time it is computed. This can only be used to assign |
| a new storage level if the RDD does not have a storage level set yet. |
        If no storage level is specified, it defaults to `MEMORY_ONLY`.
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(["b", "a", "c"]) |
| >>> rdd.persist().is_cached |
| True |
| """ |
| self.is_cached = True |
| javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) |
| self._jrdd.persist(javaStorageLevel) |
| return self |
| |
| def unpersist(self, blocking=False): |
| """ |
| Mark the RDD as non-persistent, and remove all blocks for it from |
| memory and disk. |
| |
| .. versionchanged:: 3.0.0 |
| Added optional argument `blocking` to specify whether to block until all |
| blocks are deleted. |
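
        Examples
        --------
        A sketch of the persist/unpersist round trip (``sc`` is the doctest
        SparkContext):

        >>> rdd = sc.parallelize([1, 2, 3])
        >>> rdd.persist().is_cached
        True
        >>> rdd.unpersist().is_cached
        False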
| """ |
| self.is_cached = False |
| self._jrdd.unpersist(blocking) |
| return self |
| |
| def checkpoint(self): |
| """ |
| Mark this RDD for checkpointing. It will be saved to a file inside the |
| checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and |
| all references to its parent RDDs will be removed. This function must |
| be called before any job has been executed on this RDD. It is strongly |
| recommended that this RDD is persisted in memory, otherwise saving it |
        to a file will require recomputation.
| """ |
| self.is_checkpointed = True |
| self._jrdd.rdd().checkpoint() |
| |
| def isCheckpointed(self): |
| """ |
| Return whether this RDD is checkpointed and materialized, either reliably or locally. |
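
        Examples
        --------
        A freshly created RDD is not checkpointed:

        >>> sc.parallelize([1]).isCheckpointed()
        False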
| """ |
| return self._jrdd.rdd().isCheckpointed() |
| |
| def localCheckpoint(self): |
| """ |
| Mark this RDD for local checkpointing using Spark's existing caching layer. |
| |
| This method is for users who wish to truncate RDD lineages while skipping the expensive |
| step of replicating the materialized data in a reliable distributed file system. This is |
| useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX). |
| |
| Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed |
| data is written to ephemeral local storage in the executors instead of to a reliable, |
| fault-tolerant storage. The effect is that if an executor fails during the computation, |
| the checkpointed data may no longer be accessible, causing an irrecoverable job failure. |
| |
| This is NOT safe to use with dynamic allocation, which removes executors along |
| with their cached blocks. If you must use both features, you are advised to set |
| `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value. |
| |
| The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used. |
| """ |
| self._jrdd.rdd().localCheckpoint() |
| |
| def isLocallyCheckpointed(self): |
| """ |
| Return whether this RDD is marked for local checkpointing. |
| |
| Exposed for testing. |
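
        Examples
        --------
        A freshly created RDD is not marked for local checkpointing:

        >>> sc.parallelize([1]).isLocallyCheckpointed()
        False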
| """ |
| return self._jrdd.rdd().isLocallyCheckpointed() |
| |
| def getCheckpointFile(self): |
| """ |
        Gets the name of the file to which this RDD was checkpointed.

        Not defined if the RDD is checkpointed locally.
| """ |
| checkpointFile = self._jrdd.rdd().getCheckpointFile() |
| if checkpointFile.isDefined(): |
| return checkpointFile.get() |
| |
| def map(self, f, preservesPartitioning=False): |
| """ |
| Return a new RDD by applying a function to each element of this RDD. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(["b", "a", "c"]) |
| >>> sorted(rdd.map(lambda x: (x, 1)).collect()) |
| [('a', 1), ('b', 1), ('c', 1)] |
| """ |
| def func(_, iterator): |
| return map(fail_on_stopiteration(f), iterator) |
| return self.mapPartitionsWithIndex(func, preservesPartitioning) |
| |
| def flatMap(self, f, preservesPartitioning=False): |
| """ |
| Return a new RDD by first applying a function to all elements of this |
| RDD, and then flattening the results. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([2, 3, 4]) |
| >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) |
| [1, 1, 1, 2, 2, 3] |
| >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) |
| [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] |
| """ |
| def func(s, iterator): |
| return chain.from_iterable(map(fail_on_stopiteration(f), iterator)) |
| return self.mapPartitionsWithIndex(func, preservesPartitioning) |
| |
| def mapPartitions(self, f, preservesPartitioning=False): |
| """ |
| Return a new RDD by applying a function to each partition of this RDD. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2) |
| >>> def f(iterator): yield sum(iterator) |
| >>> rdd.mapPartitions(f).collect() |
| [3, 7] |
| """ |
| def func(s, iterator): |
| return f(iterator) |
| return self.mapPartitionsWithIndex(func, preservesPartitioning) |
| |
| def mapPartitionsWithIndex(self, f, preservesPartitioning=False): |
| """ |
| Return a new RDD by applying a function to each partition of this RDD, |
| while tracking the index of the original partition. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4], 4) |
| >>> def f(splitIndex, iterator): yield splitIndex |
| >>> rdd.mapPartitionsWithIndex(f).sum() |
| 6 |
| """ |
| return PipelinedRDD(self, f, preservesPartitioning) |
| |
| def mapPartitionsWithSplit(self, f, preservesPartitioning=False): |
| """ |
| |
| Return a new RDD by applying a function to each partition of this RDD, |
| while tracking the index of the original partition. |
| |
| .. deprecated:: 0.9.0 |
| use :py:meth:`RDD.mapPartitionsWithIndex` instead. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4], 4) |
| >>> def f(splitIndex, iterator): yield splitIndex |
| >>> rdd.mapPartitionsWithSplit(f).sum() |
| 6 |
| """ |
| warnings.warn( |
| "mapPartitionsWithSplit is deprecated; use mapPartitionsWithIndex instead", |
| FutureWarning, stacklevel=2 |
| ) |
| return self.mapPartitionsWithIndex(f, preservesPartitioning) |
| |
| def getNumPartitions(self): |
| """ |
        Returns the number of partitions in this RDD.
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2) |
| >>> rdd.getNumPartitions() |
| 2 |
| """ |
| return self._jrdd.partitions().size() |
| |
| def filter(self, f): |
| """ |
| Return a new RDD containing only the elements that satisfy a predicate. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) |
| >>> rdd.filter(lambda x: x % 2 == 0).collect() |
| [2, 4] |
| """ |
| def func(iterator): |
| return filter(fail_on_stopiteration(f), iterator) |
| return self.mapPartitions(func, True) |
| |
| def distinct(self, numPartitions=None): |
| """ |
| Return a new RDD containing the distinct elements in this RDD. |
| |
| Examples |
| -------- |
| >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) |
| [1, 2, 3] |
| """ |
| return self.map(lambda x: (x, None)) \ |
| .reduceByKey(lambda x, _: x, numPartitions) \ |
| .map(lambda x: x[0]) |
| |
| def sample(self, withReplacement, fraction, seed=None): |
| """ |
| Return a sampled subset of this RDD. |
| |
| Parameters |
| ---------- |
| withReplacement : bool |
| can elements be sampled multiple times (replaced when sampled out) |
| fraction : float |
| expected size of the sample as a fraction of this RDD's size |
| without replacement: probability that each element is chosen; fraction must be [0, 1] |
| with replacement: expected number of times each element is chosen; fraction must be >= 0 |
| seed : int, optional |
| seed for the random number generator |
| |
| Notes |
| ----- |
        This is not guaranteed to provide exactly the fraction specified of the total
        count of this RDD.
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(100), 4) |
| >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14 |
| True |
| """ |
| assert fraction >= 0.0, "Negative fraction value: %s" % fraction |
| return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) |
| |
| def randomSplit(self, weights, seed=None): |
| """ |
| Randomly splits this RDD with the provided weights. |
| |
        Parameters
        ----------
        weights : list
| weights for splits, will be normalized if they don't sum to 1 |
| seed : int, optional |
| random seed |
| |
| Returns |
| ------- |
| list |
| split RDDs in a list |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(500), 1) |
| >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17) |
| >>> len(rdd1.collect() + rdd2.collect()) |
| 500 |
| >>> 150 < rdd1.count() < 250 |
| True |
| >>> 250 < rdd2.count() < 350 |
| True |
| """ |
| s = float(sum(weights)) |
| cweights = [0.0] |
| for w in weights: |
| cweights.append(cweights[-1] + w / s) |
| if seed is None: |
| seed = random.randint(0, 2 ** 32 - 1) |
| return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True) |
| for lb, ub in zip(cweights, cweights[1:])] |
| |
| # this is ported from scala/spark/RDD.scala |
| def takeSample(self, withReplacement, num, seed=None): |
| """ |
| Return a fixed-size sampled subset of this RDD. |
| |
| Notes |
| ----- |
| This method should only be used if the resulting array is expected |
| to be small, as all the data is loaded into the driver's memory. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(0, 10)) |
| >>> len(rdd.takeSample(True, 20, 1)) |
| 20 |
| >>> len(rdd.takeSample(False, 5, 2)) |
| 5 |
| >>> len(rdd.takeSample(False, 15, 3)) |
| 10 |
| """ |
| numStDev = 10.0 |
| |
| if num < 0: |
| raise ValueError("Sample size cannot be negative.") |
| elif num == 0: |
| return [] |
| |
| initialCount = self.count() |
| if initialCount == 0: |
| return [] |
| |
| rand = random.Random(seed) |
| |
| if (not withReplacement) and num >= initialCount: |
| # shuffle current RDD and return |
| samples = self.collect() |
| rand.shuffle(samples) |
| return samples |
| |
| maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize)) |
| if num > maxSampleSize: |
| raise ValueError( |
| "Sample size cannot be greater than %d." % maxSampleSize) |
| |
| fraction = RDD._computeFractionForSampleSize( |
| num, initialCount, withReplacement) |
| samples = self.sample(withReplacement, fraction, seed).collect() |
| |
| # If the first sample didn't turn out large enough, keep trying to take samples; |
| # this shouldn't happen often because we use a big multiplier for their initial size. |
| # See: scala/spark/RDD.scala |
| while len(samples) < num: |
| # TODO: add log warning for when more than one iteration was run |
| seed = rand.randint(0, sys.maxsize) |
| samples = self.sample(withReplacement, fraction, seed).collect() |
| |
| rand.shuffle(samples) |
| |
| return samples[0:num] |
| |
| @staticmethod |
| def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement): |
| """ |
| Returns a sampling rate that guarantees a sample of |
| size >= sampleSizeLowerBound 99.99% of the time. |
| |
| How the sampling rate is determined: |
| Let p = num / total, where num is the sample size and total is the |
| total number of data points in the RDD. We're trying to compute |
| q > p such that |
| - when sampling with replacement, we're drawing each data point |
| with prob_i ~ Pois(q), where we want to guarantee |
| Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to |
| total), i.e. the failure rate of not having a sufficiently large |
| sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient |
| to guarantee 0.9999 success rate for num > 12, but we need a |
| slightly larger q (9 empirically determined). |
| - when sampling without replacement, we're drawing each data point |
| with prob_i ~ Binomial(total, fraction) and our choice of q |
| guarantees 1-delta, or 0.9999 success rate, where success rate is |
| defined the same as in sampling with replacement. |
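
        Examples
        --------
        A sanity check on the returned rate (the argument values here are
        illustrative):

        >>> 0.0 < RDD._computeFractionForSampleSize(10, 100, False) <= 1.0
        True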
| """ |
| fraction = float(sampleSizeLowerBound) / total |
| if withReplacement: |
| numStDev = 5 |
            if sampleSizeLowerBound < 12:
                numStDev = 9
| return fraction + numStDev * sqrt(fraction / total) |
| else: |
| delta = 0.00005 |
| gamma = - log(delta) / total |
| return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction)) |
| |
| def union(self, other): |
| """ |
| Return the union of this RDD and another one. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 1, 2, 3]) |
| >>> rdd.union(rdd).collect() |
| [1, 1, 2, 3, 1, 1, 2, 3] |
| """ |
| if self._jrdd_deserializer == other._jrdd_deserializer: |
| rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, |
| self._jrdd_deserializer) |
| else: |
| # These RDDs contain data in different serialized formats, so we |
| # must normalize them to the default serializer. |
| self_copy = self._reserialize() |
| other_copy = other._reserialize() |
| rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, |
| self.ctx.serializer) |
| if (self.partitioner == other.partitioner and |
| self.getNumPartitions() == rdd.getNumPartitions()): |
| rdd.partitioner = self.partitioner |
| return rdd |
| |
| def intersection(self, other): |
| """ |
| Return the intersection of this RDD and another one. The output will |
| not contain any duplicate elements, even if the input RDDs did. |
| |
| Notes |
| ----- |
| This method performs a shuffle internally. |
| |
| Examples |
| -------- |
| >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) |
| >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) |
| >>> rdd1.intersection(rdd2).collect() |
| [1, 2, 3] |
| """ |
| return self.map(lambda v: (v, None)) \ |
| .cogroup(other.map(lambda v: (v, None))) \ |
| .filter(lambda k_vs: all(k_vs[1])) \ |
| .keys() |
| |
| def _reserialize(self, serializer=None): |
| serializer = serializer or self.ctx.serializer |
| if self._jrdd_deserializer != serializer: |
| self = self.map(lambda x: x, preservesPartitioning=True) |
| self._jrdd_deserializer = serializer |
| return self |
| |
| def __add__(self, other): |
| """ |
| Return the union of this RDD and another one. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 1, 2, 3]) |
| >>> (rdd + rdd).collect() |
| [1, 1, 2, 3, 1, 1, 2, 3] |
| """ |
| if not isinstance(other, RDD): |
| raise TypeError |
| return self.union(other) |
| |
| def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash, |
| ascending=True, keyfunc=lambda x: x): |
| """ |
| Repartition the RDD according to the given partitioner and, within each resulting partition, |
| sort records by their keys. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]) |
| >>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True) |
| >>> rdd2.glom().collect() |
| [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]] |
| """ |
| if numPartitions is None: |
| numPartitions = self._defaultReducePartitions() |
| |
| memory = self._memory_limit() |
| serializer = self._jrdd_deserializer |
| |
| def sortPartition(iterator): |
| sort = ExternalSorter(memory * 0.9, serializer).sorted |
| return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending))) |
| |
| return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True) |
| |
| def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x): |
| """ |
| Sorts this RDD, which is assumed to consist of (key, value) pairs. |
| |
| Examples |
| -------- |
| >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] |
| >>> sc.parallelize(tmp).sortByKey().first() |
| ('1', 3) |
| >>> sc.parallelize(tmp).sortByKey(True, 1).collect() |
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] |
| >>> sc.parallelize(tmp).sortByKey(True, 2).collect() |
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] |
| >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] |
| >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) |
| >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() |
| [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)] |
| """ |
| if numPartitions is None: |
| numPartitions = self._defaultReducePartitions() |
| |
| memory = self._memory_limit() |
| serializer = self._jrdd_deserializer |
| |
| def sortPartition(iterator): |
| sort = ExternalSorter(memory * 0.9, serializer).sorted |
| return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending))) |
| |
| if numPartitions == 1: |
| if self.getNumPartitions() > 1: |
| self = self.coalesce(1) |
| return self.mapPartitions(sortPartition, True) |
| |
| # first compute the boundary of each part via sampling: we want to partition |
| # the key-space into bins such that the bins have roughly the same |
| # number of (key, value) pairs falling into them |
| rddSize = self.count() |
| if not rddSize: |
| return self # empty RDD |
| maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner |
| fraction = min(maxSampleSize / max(rddSize, 1), 1.0) |
| samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() |
| samples = sorted(samples, key=keyfunc) |
| |
        # we have numPartitions many parts but one of them has
        # an implicit boundary
| bounds = [samples[int(len(samples) * (i + 1) / numPartitions)] |
| for i in range(0, numPartitions - 1)] |
| |
| def rangePartitioner(k): |
| p = bisect.bisect_left(bounds, keyfunc(k)) |
| if ascending: |
| return p |
| else: |
| return numPartitions - 1 - p |
| |
| return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True) |
| |
| def sortBy(self, keyfunc, ascending=True, numPartitions=None): |
| """ |
| Sorts this RDD by the given keyfunc |
| |
| Examples |
| -------- |
| >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] |
| >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() |
| [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] |
| >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect() |
| [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] |
| """ |
| return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values() |
| |
| def glom(self): |
| """ |
| Return an RDD created by coalescing all elements within each partition |
| into a list. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2, 3, 4], 2) |
| >>> sorted(rdd.glom().collect()) |
| [[1, 2], [3, 4]] |
| """ |
| def func(iterator): |
| yield list(iterator) |
| return self.mapPartitions(func) |
| |
| def cartesian(self, other): |
| """ |
| Return the Cartesian product of this RDD and another one, that is, the |
| RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and |
| ``b`` is in `other`. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 2]) |
| >>> sorted(rdd.cartesian(rdd).collect()) |
| [(1, 1), (1, 2), (2, 1), (2, 2)] |
| """ |
| # Due to batching, we can't use the Java cartesian method. |
| deserializer = CartesianDeserializer(self._jrdd_deserializer, |
| other._jrdd_deserializer) |
| return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) |
| |
| def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash): |
| """ |
| Return an RDD of grouped items. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) |
| >>> result = rdd.groupBy(lambda x: x % 2).collect() |
| >>> sorted([(x, sorted(y)) for (x, y) in result]) |
| [(0, [2, 8]), (1, [1, 1, 3, 5])] |
| """ |
| return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc) |
| |
| def pipe(self, command, env=None, checkCode=False): |
| """ |
| Return an RDD created by piping elements to a forked external process. |
| |
| Parameters |
| ---------- |
| command : str |
| command to run. |
| env : dict, optional |
| environment variables to set. |
| checkCode : bool, optional |
| whether or not to check the return value of the shell command. |
| |
| Examples |
| -------- |
| >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() |
| ['1', '2', '', '3'] |
| """ |
| if env is None: |
| env = dict() |
| |
| def func(iterator): |
| pipe = Popen( |
| shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) |
| |
| def pipe_objs(out): |
| for obj in iterator: |
| s = str(obj).rstrip('\n') + '\n' |
| out.write(s.encode('utf-8')) |
| out.close() |
| Thread(target=pipe_objs, args=[pipe.stdin]).start() |
| |
| def check_return_code(): |
| pipe.wait() |
| if checkCode and pipe.returncode: |
| raise RuntimeError("Pipe function `%s' exited " |
| "with error code %d" % (command, pipe.returncode)) |
| else: |
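                # an empty loop with a yield makes check_return_code a generator
                # that yields nothing when the exit status is accepted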
| for i in range(0): |
| yield i |
| return (x.rstrip(b'\n').decode('utf-8') for x in |
| chain(iter(pipe.stdout.readline, b''), check_return_code())) |
| return self.mapPartitions(func) |
| |
| def foreach(self, f): |
| """ |
| Applies a function to all elements of this RDD. |
| |
| Examples |
| -------- |
| >>> def f(x): print(x) |
| >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) |
| """ |
| f = fail_on_stopiteration(f) |
| |
| def processPartition(iterator): |
| for x in iterator: |
| f(x) |
| return iter([]) |
| self.mapPartitions(processPartition).count() # Force evaluation |
| |
| def foreachPartition(self, f): |
| """ |
| Applies a function to each partition of this RDD. |
| |
| Examples |
| -------- |
| >>> def f(iterator): |
| ... for x in iterator: |
| ... print(x) |
| >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) |
| """ |
| def func(it): |
| r = f(it) |
| try: |
| return iter(r) |
| except TypeError: |
| return iter([]) |
| self.mapPartitions(func).count() # Force evaluation |
| |
| def collect(self): |
| """ |
| Return a list that contains all of the elements in this RDD. |
| |
| Notes |
| ----- |
| This method should only be used if the resulting array is expected |
| to be small, as all the data is loaded into the driver's memory. |
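
        Examples
        --------
        A minimal round trip through the JVM collector:

        >>> sc.parallelize([1, 2, 3]).collect()
        [1, 2, 3]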
| """ |
| with SCCallSiteSync(self.context) as css: |
| sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) |
| return list(_load_from_socket(sock_info, self._jrdd_deserializer)) |
| |
| def collectWithJobGroup(self, groupId, description, interruptOnCancel=False): |
| """ |
        When collecting the RDD, use this method to specify a job group.
| |
| .. versionadded:: 3.0.0 |
| .. deprecated:: 3.1.0 |
| Use :class:`pyspark.InheritableThread` with the pinned thread mode enabled. |
| """ |
| warnings.warn( |
| "Deprecated in 3.1, Use pyspark.InheritableThread with " |
| "the pinned thread mode enabled.", |
| FutureWarning |
| ) |
| |
| with SCCallSiteSync(self.context) as css: |
| sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup( |
| self._jrdd.rdd(), groupId, description, interruptOnCancel) |
| return list(_load_from_socket(sock_info, self._jrdd_deserializer)) |
| |
| def reduce(self, f): |
| """ |
| Reduces the elements of this RDD using the specified commutative and |
| associative binary operator. Currently reduces partitions locally. |
| |
| Examples |
| -------- |
| >>> from operator import add |
| >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) |
| 15 |
| >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) |
| 10 |
| >>> sc.parallelize([]).reduce(add) |
| Traceback (most recent call last): |
| ... |
| ValueError: Can not reduce() empty RDD |
| """ |
| f = fail_on_stopiteration(f) |
| |
| def func(iterator): |
| iterator = iter(iterator) |
| try: |
| initial = next(iterator) |
| except StopIteration: |
| return |
| yield reduce(f, iterator, initial) |
| |
| vals = self.mapPartitions(func).collect() |
| if vals: |
| return reduce(f, vals) |
| raise ValueError("Can not reduce() empty RDD") |
| |
| def treeReduce(self, f, depth=2): |
| """ |
| Reduces the elements of this RDD in a multi-level tree pattern. |
| |
| Parameters |
| ---------- |
| f : function |
| depth : int, optional |
| suggested depth of the tree (default: 2) |
| |
| Examples |
| -------- |
| >>> add = lambda x, y: x + y |
| >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10) |
| >>> rdd.treeReduce(add) |
| -5 |
| >>> rdd.treeReduce(add, 1) |
| -5 |
| >>> rdd.treeReduce(add, 2) |
| -5 |
| >>> rdd.treeReduce(add, 5) |
| -5 |
| >>> rdd.treeReduce(add, 10) |
| -5 |
| """ |
| if depth < 1: |
| raise ValueError("Depth cannot be smaller than 1 but got %d." % depth) |
| |
| zeroValue = None, True # Use the second entry to indicate whether this is a dummy value. |
| |
| def op(x, y): |
| if x[1]: |
| return y |
| elif y[1]: |
| return x |
| else: |
| return f(x[0], y[0]), False |
| |
| reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth) |
| if reduced[1]: |
| raise ValueError("Cannot reduce empty RDD.") |
| return reduced[0] |
| |
| def fold(self, zeroValue, op): |
| """ |
| Aggregate the elements of each partition, and then the results for all |
| the partitions, using a given associative function and a neutral "zero value." |
| |
| The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it |
| as its result value to avoid object allocation; however, it should not |
| modify ``t2``. |
| |
| This behaves somewhat differently from fold operations implemented |
| for non-distributed collections in functional languages like Scala. |
| This fold operation may be applied to partitions individually, and then |
| fold those results into the final result, rather than apply the fold |
| to each element sequentially in some defined ordering. For functions |
| that are not commutative, the result may differ from that of a fold |
| applied to a non-distributed collection. |
| |
| Examples |
| -------- |
| >>> from operator import add |
| >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) |
| 15 |
| """ |
| op = fail_on_stopiteration(op) |
| |
| def func(iterator): |
| acc = zeroValue |
| for obj in iterator: |
| acc = op(acc, obj) |
| yield acc |
| # collecting result of mapPartitions here ensures that the copy of |
| # zeroValue provided to each partition is unique from the one provided |
| # to the final reduce call |
| vals = self.mapPartitions(func).collect() |
| return reduce(op, vals, zeroValue) |
| |
| def aggregate(self, zeroValue, seqOp, combOp): |
| """ |
| Aggregate the elements of each partition, and then the results for all |
        the partitions, using the given combine functions and a neutral "zero
| value." |
| |
        The functions ``op(t1, t2)`` are allowed to modify ``t1`` and return it
        as their result value to avoid object allocation; however, they should
        not modify ``t2``.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.
| |
| Examples |
| -------- |
| >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) |
| >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) |
| >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) |
| (10, 4) |
| >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) |
| (0, 0) |
| """ |
| seqOp = fail_on_stopiteration(seqOp) |
| combOp = fail_on_stopiteration(combOp) |
| |
| def func(iterator): |
| acc = zeroValue |
| for obj in iterator: |
| acc = seqOp(acc, obj) |
| yield acc |
| # collecting result of mapPartitions here ensures that the copy of |
| # zeroValue provided to each partition is unique from the one provided |
| # to the final reduce call |
| vals = self.mapPartitions(func).collect() |
| return reduce(combOp, vals, zeroValue) |
| |
| def treeAggregate(self, zeroValue, seqOp, combOp, depth=2): |
| """ |
| Aggregates the elements of this RDD in a multi-level tree |
| pattern. |
| |
        Parameters
        ----------
        depth : int, optional
| suggested depth of the tree (default: 2) |
| |
| Examples |
| -------- |
| >>> add = lambda x, y: x + y |
| >>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10) |
| >>> rdd.treeAggregate(0, add, add) |
| -5 |
| >>> rdd.treeAggregate(0, add, add, 1) |
| -5 |
| >>> rdd.treeAggregate(0, add, add, 2) |
| -5 |
| >>> rdd.treeAggregate(0, add, add, 5) |
| -5 |
| >>> rdd.treeAggregate(0, add, add, 10) |
| -5 |
| """ |
| if depth < 1: |
| raise ValueError("Depth cannot be smaller than 1 but got %d." % depth) |
| |
| if self.getNumPartitions() == 0: |
| return zeroValue |
| |
| def aggregatePartition(iterator): |
| acc = zeroValue |
| for obj in iterator: |
| acc = seqOp(acc, obj) |
| yield acc |
| |
| partiallyAggregated = self.mapPartitions(aggregatePartition) |
| numPartitions = partiallyAggregated.getNumPartitions() |
| scale = max(int(ceil(pow(numPartitions, 1.0 / depth))), 2) |
| # If creating an extra level doesn't help reduce the wall-clock time, we stop the tree |
| # aggregation. |
| while numPartitions > scale + numPartitions / scale: |
| numPartitions /= scale |
| curNumPartitions = int(numPartitions) |
| |
| def mapPartition(i, iterator): |
| for obj in iterator: |
| yield (i % curNumPartitions, obj) |
| |
| partiallyAggregated = partiallyAggregated \ |
| .mapPartitionsWithIndex(mapPartition) \ |
| .reduceByKey(combOp, curNumPartitions) \ |
| .values() |
| |
| return partiallyAggregated.reduce(combOp) |
| |
| def max(self, key=None): |
| """ |
| Find the maximum item in this RDD. |
| |
| Parameters |
| ---------- |
| key : function, optional |
| A function used to generate key for comparing |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0]) |
| >>> rdd.max() |
| 43.0 |
| >>> rdd.max(key=str) |
| 5.0 |
| """ |
| if key is None: |
| return self.reduce(max) |
| return self.reduce(lambda a, b: max(a, b, key=key)) |
| |
| def min(self, key=None): |
| """ |
| Find the minimum item in this RDD. |
| |
| Parameters |
| ---------- |
| key : function, optional |
| A function used to generate key for comparing |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0]) |
| >>> rdd.min() |
| 2.0 |
| >>> rdd.min(key=str) |
| 10.0 |
| """ |
| if key is None: |
| return self.reduce(min) |
| return self.reduce(lambda a, b: min(a, b, key=key)) |
| |
| def sum(self): |
| """ |
| Add up the elements in this RDD. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1.0, 2.0, 3.0]).sum() |
| 6.0 |
| """ |
| return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) |
| |
| def count(self): |
| """ |
| Return the number of elements in this RDD. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([2, 3, 4]).count() |
| 3 |
| """ |
| return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() |
| |
| def stats(self): |
| """ |
| Return a :class:`StatCounter` object that captures the mean, variance |
| and count of the RDD's elements in one operation. |
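
        Examples
        --------
        A small check of the aggregated statistics:

        >>> s = sc.parallelize([1.0, 2.0, 3.0]).stats()
        >>> (s.count(), s.mean(), s.sum())
        (3, 2.0, 6.0)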
| """ |
| def redFunc(left_counter, right_counter): |
| return left_counter.mergeStats(right_counter) |
| |
| return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) |
| |
| def histogram(self, buckets): |
| """ |
| Compute a histogram using the provided buckets. The buckets |
| are all open to the right except for the last which is closed. |
| e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50], |
        which means 1<=x<10, 10<=x<20, 20<=x<=50. On input of 1
        and 50, we would have a histogram of 1,0,1.
| |
| If your histogram is evenly spaced (e.g. [0, 10, 20, 30]), |
| this can be switched from an O(log n) insertion to O(1) per |
| element (where n is the number of buckets). |
| |
| Buckets must be sorted, not contain any duplicates, and have |
| at least two elements. |
| |
| If `buckets` is a number, it will generate buckets which are |
| evenly spaced between the minimum and maximum of the RDD. For |
| example, if the min value is 0 and the max is 100, given `buckets` |
| as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must |
| be at least 1. An exception is raised if the RDD contains infinity. |
| If the elements in the RDD do not vary (max == min), a single bucket |
| will be used. |
| |
| The return value is a tuple of buckets and histogram. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(51)) |
| >>> rdd.histogram(2) |
| ([0, 25, 50], [25, 26]) |
| >>> rdd.histogram([0, 5, 25, 50]) |
| ([0, 5, 25, 50], [5, 20, 26]) |
| >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets |
| ([0, 15, 30, 45, 60], [15, 15, 15, 6]) |
| >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) |
| >>> rdd.histogram(("a", "b", "c")) |
| (('a', 'b', 'c'), [2, 2]) |
| """ |
| |
| if isinstance(buckets, int): |
| if buckets < 1: |
| raise ValueError("number of buckets must be >= 1") |
| |
| # filter out non-comparable elements |
| def comparable(x): |
| if x is None: |
| return False |
| if type(x) is float and isnan(x): |
| return False |
| return True |
| |
| filtered = self.filter(comparable) |
| |
| # faster than stats() |
| def minmax(a, b): |
| return min(a[0], b[0]), max(a[1], b[1]) |
| try: |
| minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax) |
| except TypeError as e: |
| if " empty " in str(e): |
| raise ValueError("can not generate buckets from empty RDD") |
| raise |
| |
| if minv == maxv or buckets == 1: |
| return [minv, maxv], [filtered.count()] |
| |
| try: |
| inc = (maxv - minv) / buckets |
| except TypeError: |
| raise TypeError("Can not generate buckets with non-number in RDD") |
| |
| if isinf(inc): |
| raise ValueError("Can not generate buckets with infinite value") |
| |
| # keep them as integer if possible |
| inc = int(inc) |
| if inc * buckets != maxv - minv: |
| inc = (maxv - minv) * 1.0 / buckets |
| |
| buckets = [i * inc + minv for i in range(buckets)] |
| buckets.append(maxv) # fix accumulated error |
| even = True |
| |
| elif isinstance(buckets, (list, tuple)): |
| if len(buckets) < 2: |
| raise ValueError("buckets should have more than one value") |
| |
| if any(i is None or isinstance(i, float) and isnan(i) for i in buckets): |
| raise ValueError("can not have None or NaN in buckets") |
| |
| if sorted(buckets) != list(buckets): |
| raise ValueError("buckets should be sorted") |
| |
| if len(set(buckets)) != len(buckets): |
| raise ValueError("buckets should not contain duplicated values") |
| |
| minv = buckets[0] |
| maxv = buckets[-1] |
| even = False |
| inc = None |
| try: |
| steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)] |
| except TypeError: |
| pass # objects in buckets do not support '-' |
| else: |
| if max(steps) - min(steps) < 1e-10: # handle precision errors |
| even = True |
| inc = (maxv - minv) / (len(buckets) - 1) |
| |
| else: |
| raise TypeError("buckets should be a list or tuple or number(int or long)") |
| |
| def histogram(iterator): |
| counters = [0] * len(buckets) |
| for i in iterator: |
| if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv: |
| continue |
| t = (int((i - minv) / inc) if even |
| else bisect.bisect_right(buckets, i) - 1) |
| counters[t] += 1 |
| # add last two together |
| last = counters.pop() |
| counters[-1] += last |
| return [counters] |
| |
| def mergeCounters(a, b): |
| return [i + j for i, j in zip(a, b)] |
| |
| return buckets, self.mapPartitions(histogram).reduce(mergeCounters) |
| |
| def mean(self): |
| """ |
| Compute the mean of this RDD's elements. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3]).mean() |
| 2.0 |
| """ |
| return self.stats().mean() |
| |
| def variance(self): |
| """ |
| Compute the variance of this RDD's elements. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3]).variance() |
| 0.666... |
| """ |
| return self.stats().variance() |
| |
| def stdev(self): |
| """ |
| Compute the standard deviation of this RDD's elements. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3]).stdev() |
| 0.816... |
| """ |
| return self.stats().stdev() |
| |
| def sampleStdev(self): |
| """ |
| Compute the sample standard deviation of this RDD's elements (which |
| corrects for bias in estimating the standard deviation by dividing by |
| N-1 instead of N). |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3]).sampleStdev() |
| 1.0 |
| """ |
| return self.stats().sampleStdev() |
| |
| def sampleVariance(self): |
| """ |
| Compute the sample variance of this RDD's elements (which corrects |
| for bias in estimating the variance by dividing by N-1 instead of N). |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3]).sampleVariance() |
| 1.0 |
| """ |
| return self.stats().sampleVariance() |
| |
| def countByValue(self): |
| """ |
| Return the count of each unique value in this RDD as a dictionary of |
| (value, count) pairs. |
| |
| Examples |
| -------- |
| >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) |
| [(1, 2), (2, 3)] |
| """ |
| def countPartition(iterator): |
| counts = defaultdict(int) |
| for obj in iterator: |
| counts[obj] += 1 |
| yield counts |
| |
| def mergeMaps(m1, m2): |
| for k, v in m2.items(): |
| m1[k] += v |
| return m1 |
| return self.mapPartitions(countPartition).reduce(mergeMaps) |
| |
| def top(self, num, key=None): |
| """ |
| Get the top N elements from an RDD. |
| |
| Notes |
| ----- |
| This method should only be used if the resulting array is expected |
| to be small, as all the data is loaded into the driver's memory. |
| |
| It returns the list sorted in descending order. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) |
| [12] |
| >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) |
| [6, 5] |
| >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str) |
| [4, 3, 2] |
| """ |
| def topIterator(iterator): |
| yield heapq.nlargest(num, iterator, key=key) |
| |
| def merge(a, b): |
| return heapq.nlargest(num, a + b, key=key) |
| |
| return self.mapPartitions(topIterator).reduce(merge) |
| |
| def takeOrdered(self, num, key=None): |
| """ |
| Get the N elements from an RDD ordered in ascending order or as |
| specified by the optional key function. |
| |
| Notes |
| ----- |
| This method should only be used if the resulting array is expected |
| to be small, as all the data is loaded into the driver's memory. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) |
| [1, 2, 3, 4, 5, 6] |
| >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) |
| [10, 9, 7, 6, 5, 4] |
| """ |
| |
| def merge(a, b): |
| return heapq.nsmallest(num, a + b, key) |
| |
| return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge) |
| |
| def take(self, num): |
| """ |
| Take the first num elements of the RDD. |
| |
        It works by first scanning one partition, and using the results from
| that partition to estimate the number of additional partitions needed |
| to satisfy the limit. |
| |
| Translated from the Scala implementation in RDD#take(). |
| |
| Notes |
| ----- |
| This method should only be used if the resulting array is expected |
| to be small, as all the data is loaded into the driver's memory. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) |
| [2, 3] |
| >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) |
| [2, 3, 4, 5, 6] |
| >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3) |
| [91, 92, 93] |
| """ |
| items = [] |
| totalParts = self.getNumPartitions() |
| partsScanned = 0 |
| |
| while len(items) < num and partsScanned < totalParts: |
| # The number of partitions to try in this iteration. |
| # It is ok for this number to be greater than totalParts because |
| # we actually cap it at totalParts in runJob. |
| numPartsToTry = 1 |
| if partsScanned > 0: |
| # If we didn't find any rows after the previous iteration, |
| # quadruple and retry. Otherwise, interpolate the number of |
| # partitions we need to try, but overestimate it by 50%. |
| # We also cap the estimation in the end. |
| if len(items) == 0: |
| numPartsToTry = partsScanned * 4 |
| else: |
| # the first parameter of max is >=1 whenever partsScanned >= 2 |
| numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned |
| numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4) |
| |
| left = num - len(items) |
| |
| def takeUpToNumLeft(iterator): |
| iterator = iter(iterator) |
| taken = 0 |
| while taken < left: |
| try: |
| yield next(iterator) |
| except StopIteration: |
| return |
| taken += 1 |
| |
| p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) |
| res = self.context.runJob(self, takeUpToNumLeft, p) |
| |
| items += res |
| partsScanned += numPartsToTry |
| |
| return items[:num] |
| |
| def first(self): |
| """ |
| Return the first element in this RDD. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([2, 3, 4]).first() |
| 2 |
| >>> sc.parallelize([]).first() |
| Traceback (most recent call last): |
| ... |
| ValueError: RDD is empty |
| """ |
| rs = self.take(1) |
| if rs: |
| return rs[0] |
| raise ValueError("RDD is empty") |
| |
| def isEmpty(self): |
| """ |
| Returns true if and only if the RDD contains no elements at all. |
| |
| Notes |
| ----- |
| An RDD may be empty even when it has at least 1 partition. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([]).isEmpty() |
| True |
| >>> sc.parallelize([1]).isEmpty() |
| False |
| """ |
| return self.getNumPartitions() == 0 or len(self.take(1)) == 0 |
| |
| def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None): |
| """ |
| Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file |
| system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are |
| converted for output using either user specified converters or, by default, |
| "org.apache.spark.api.python.JavaToWritableConverter". |
| |
| Parameters |
| ---------- |
| conf : dict |
| Hadoop job configuration |
| keyConverter : str, optional |
| fully qualified classname of key converter (None by default) |
| valueConverter : str, optional |
| fully qualified classname of value converter (None by default) |
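
        Examples
        --------
        A non-executable sketch; the configuration keys and values below are
        hypothetical placeholders that depend on the Hadoop version and job::

            conf = {
                "mapreduce.job.outputformat.class": "<OutputFormat class>",
                "mapreduce.job.output.key.class": "<key Writable class>",
                "mapreduce.job.output.value.class": "<value Writable class>",
            }
            rdd.saveAsNewAPIHadoopDataset(conf=conf)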
| """ |
| jconf = self.ctx._dictToJavaMap(conf) |
| pickledRDD = self._pickled() |
| self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf, |
| keyConverter, valueConverter, True) |
| |
| def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, |
| keyConverter=None, valueConverter=None, conf=None): |
| """ |
| Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file |
| system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types |
| will be inferred if not specified. Keys and values are converted for output using either |
| user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The |
| `conf` is applied on top of the base Hadoop conf associated with the SparkContext |
| of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. |
| |
        Parameters
        ----------
        path : str
| path to Hadoop file |
| outputFormatClass : str |
| fully qualified classname of Hadoop OutputFormat |
| (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat") |
| keyClass : str, optional |
| fully qualified classname of key Writable class |
| (e.g. "org.apache.hadoop.io.IntWritable", None by default) |
| valueClass : str, optional |
| fully qualified classname of value Writable class |
| (e.g. "org.apache.hadoop.io.Text", None by default) |
| keyConverter : str, optional |
| fully qualified classname of key converter (None by default) |
| valueConverter : str, optional |
| fully qualified classname of value converter (None by default) |
| conf : dict, optional |
| Hadoop job configuration (None by default) |
| """ |
| jconf = self.ctx._dictToJavaMap(conf) |
| pickledRDD = self._pickled() |
| self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path, |
| outputFormatClass, |
| keyClass, valueClass, |
| keyConverter, valueConverter, jconf) |
| |
| def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): |
| """ |
| Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file |
| system, using the old Hadoop OutputFormat API (mapred package). Keys/values are |
| converted for output using either user specified converters or, by default, |
| "org.apache.spark.api.python.JavaToWritableConverter". |
| |
| Parameters |
| ---------- |
| conf : dict |
| Hadoop job configuration |
| keyConverter : str, optional |
| fully qualified classname of key converter (None by default) |
| valueConverter : str, optional |
| fully qualified classname of value converter (None by default) |
| """ |
| jconf = self.ctx._dictToJavaMap(conf) |
| pickledRDD = self._pickled() |
| self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf, |
| keyConverter, valueConverter, False) |
| |
| def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, |
| keyConverter=None, valueConverter=None, conf=None, |
| compressionCodecClass=None): |
| """ |
| Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file |
| system, using the old Hadoop OutputFormat API (mapred package). Key and value types |
| will be inferred if not specified. Keys and values are converted for output using either |
| user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The |
| `conf` is applied on top of the base Hadoop conf associated with the SparkContext |
| of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. |
| |
| Parameters |
| ---------- |
| path : str |
| path to Hadoop file |
| outputFormatClass : str |
| fully qualified classname of Hadoop OutputFormat |
| (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat") |
| keyClass : str, optional |
| fully qualified classname of key Writable class |
| (e.g. "org.apache.hadoop.io.IntWritable", None by default) |
| valueClass : str, optional |
| fully qualified classname of value Writable class |
| (e.g. "org.apache.hadoop.io.Text", None by default) |
| keyConverter : str, optional |
| fully qualified classname of key converter (None by default) |
| valueConverter : str, optional |
| fully qualified classname of value converter (None by default) |
        conf : dict, optional
            Hadoop job configuration (None by default)
        compressionCodecClass : str, optional
            fully qualified classname of the compression codec class
            (e.g. "org.apache.hadoop.io.compress.GzipCodec", None by default)
| """ |
| jconf = self.ctx._dictToJavaMap(conf) |
| pickledRDD = self._pickled() |
| self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path, |
| outputFormatClass, |
| keyClass, valueClass, |
| keyConverter, valueConverter, |
| jconf, compressionCodecClass) |
| |
| def saveAsSequenceFile(self, path, compressionCodecClass=None): |
| """ |
| Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file |
| system, using the "org.apache.hadoop.io.Writable" types that we convert from the |
| RDD's key and value types. The mechanism is as follows: |
| |
| 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. |
| 2. Keys and values of this Java RDD are converted to Writables and written out. |
| |
| Parameters |
| ---------- |
| path : str |
| path to sequence file |
| compressionCodecClass : str, optional |
| fully qualified classname of the compression codec class |
| i.e. "org.apache.hadoop.io.compress.GzipCodec" (None by default) |
| """ |
| pickledRDD = self._pickled() |
| self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True, |
| path, compressionCodecClass) |
| |
| def saveAsPickleFile(self, path, batchSize=10): |
| """ |
| Save this RDD as a SequenceFile of serialized objects. The serializer |
        used is :class:`pyspark.serializers.PickleSerializer`; the default batch
        size is 10.
| |
| Examples |
| -------- |
| >>> from tempfile import NamedTemporaryFile |
| >>> tmpFile = NamedTemporaryFile(delete=True) |
| >>> tmpFile.close() |
| >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) |
| >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect()) |
| ['1', '2', 'rdd', 'spark'] |
| """ |
| if batchSize == 0: |
| ser = AutoBatchedSerializer(PickleSerializer()) |
| else: |
| ser = BatchedSerializer(PickleSerializer(), batchSize) |
| self._reserialize(ser)._jrdd.saveAsObjectFile(path) |
| |
| def saveAsTextFile(self, path, compressionCodecClass=None): |
| """ |
| Save this RDD as a text file, using string representations of elements. |
| |
| Parameters |
| ---------- |
| path : str |
| path to text file |
| compressionCodecClass : str, optional |
| fully qualified classname of the compression codec class |
| i.e. "org.apache.hadoop.io.compress.GzipCodec" (None by default) |
| |
| Examples |
| -------- |
| >>> from tempfile import NamedTemporaryFile |
| >>> tempFile = NamedTemporaryFile(delete=True) |
| >>> tempFile.close() |
| >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) |
| >>> from fileinput import input |
| >>> from glob import glob |
| >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) |
| '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' |
| |
| Empty lines are tolerated when saving to text files. |
| |
| >>> from tempfile import NamedTemporaryFile |
| >>> tempFile2 = NamedTemporaryFile(delete=True) |
| >>> tempFile2.close() |
| >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) |
| >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) |
| '\\n\\n\\nbar\\nfoo\\n' |
| |
| Using compressionCodecClass |
| |
| >>> from tempfile import NamedTemporaryFile |
| >>> tempFile3 = NamedTemporaryFile(delete=True) |
| >>> tempFile3.close() |
| >>> codec = "org.apache.hadoop.io.compress.GzipCodec" |
| >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) |
| >>> from fileinput import input, hook_compressed |
| >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)) |
| >>> b''.join(result).decode('utf-8') |
| 'bar\\nfoo\\n' |
| """ |
| def func(split, iterator): |
| for x in iterator: |
| if not isinstance(x, (str, bytes)): |
| x = str(x) |
| if isinstance(x, str): |
| x = x.encode("utf-8") |
| yield x |
| keyed = self.mapPartitionsWithIndex(func) |
| keyed._bypass_serializer = True |
| if compressionCodecClass: |
| compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass) |
| keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec) |
| else: |
| keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) |
| |
| # Pair functions |
| |
| def collectAsMap(self): |
| """ |
| Return the key-value pairs in this RDD to the master as a dictionary. |
| |
| Notes |
| ----- |
| This method should only be used if the resulting data is expected |
| to be small, as all the data is loaded into the driver's memory. |
| |
| Examples |
| -------- |
| >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() |
| >>> m[1] |
| 2 |
| >>> m[3] |
| 4 |
| """ |
| return dict(self.collect()) |
| |
| def keys(self): |
| """ |
| Return an RDD with the keys of each tuple. |
| |
| Examples |
| -------- |
| >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() |
| >>> m.collect() |
| [1, 3] |
| """ |
| return self.map(lambda x: x[0]) |
| |
| def values(self): |
| """ |
| Return an RDD with the values of each tuple. |
| |
| Examples |
| -------- |
| >>> m = sc.parallelize([(1, 2), (3, 4)]).values() |
| >>> m.collect() |
| [2, 4] |
| """ |
| return self.map(lambda x: x[1]) |
| |
| def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): |
| """ |
| Merge the values for each key using an associative and commutative reduce function. |
| |
| This will also perform the merging locally on each mapper before |
| sending results to a reducer, similarly to a "combiner" in MapReduce. |
| |
| Output will be partitioned with `numPartitions` partitions, or |
| the default parallelism level if `numPartitions` is not specified. |
| The default partitioner is hash-partitioning.
| |
| Examples |
| -------- |
| >>> from operator import add |
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) |
| >>> sorted(rdd.reduceByKey(add).collect()) |
| [('a', 2), ('b', 1)] |
| """ |
| return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc) |
| |
| def reduceByKeyLocally(self, func): |
| """ |
| Merge the values for each key using an associative and commutative reduce function, but |
| return the results immediately to the master as a dictionary. |
| |
| This will also perform the merging locally on each mapper before |
| sending results to a reducer, similarly to a "combiner" in MapReduce. |
| |
| Examples |
| -------- |
| >>> from operator import add |
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) |
| >>> sorted(rdd.reduceByKeyLocally(add).items()) |
| [('a', 2), ('b', 1)] |
| """ |
| func = fail_on_stopiteration(func) |
| |
| def reducePartition(iterator): |
| m = {} |
| for k, v in iterator: |
| m[k] = func(m[k], v) if k in m else v |
| yield m |
| |
| def mergeMaps(m1, m2): |
| for k, v in m2.items(): |
| m1[k] = func(m1[k], v) if k in m1 else v |
| return m1 |
| return self.mapPartitions(reducePartition).reduce(mergeMaps) |
| |
| def countByKey(self): |
| """ |
| Count the number of elements for each key, and return the result to the |
| master as a dictionary. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) |
| >>> sorted(rdd.countByKey().items()) |
| [('a', 2), ('b', 1)] |
| """ |
| return self.map(lambda x: x[0]).countByValue() |
| |
| def join(self, other, numPartitions=None): |
| """ |
| Return an RDD containing all pairs of elements with matching keys in |
| `self` and `other`. |
| |
| Each pair of elements will be returned as a (k, (v1, v2)) tuple, where |
| (k, v1) is in `self` and (k, v2) is in `other`. |
| |
| Performs a hash join across the cluster. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2), ("a", 3)]) |
| >>> sorted(x.join(y).collect()) |
| [('a', (1, 2)), ('a', (1, 3))] |
| """ |
| return python_join(self, other, numPartitions) |
| |
| def leftOuterJoin(self, other, numPartitions=None): |
| """ |
| Perform a left outer join of `self` and `other`. |
| |
| For each element (k, v) in `self`, the resulting RDD will either |
| contain all pairs (k, (v, w)) for w in `other`, or the pair |
| (k, (v, None)) if no elements in `other` have key k. |
| |
| Hash-partitions the resulting RDD into the given number of partitions. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2)]) |
| >>> sorted(x.leftOuterJoin(y).collect()) |
| [('a', (1, 2)), ('b', (4, None))] |
| """ |
| return python_left_outer_join(self, other, numPartitions) |
| |
| def rightOuterJoin(self, other, numPartitions=None): |
| """ |
| Perform a right outer join of `self` and `other`. |
| |
| For each element (k, w) in `other`, the resulting RDD will either |
| contain all pairs (k, (v, w)) for v in `self`, or the pair (k, (None, w))
| if no elements in `self` have key k. |
| |
| Hash-partitions the resulting RDD into the given number of partitions. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2)]) |
| >>> sorted(y.rightOuterJoin(x).collect()) |
| [('a', (2, 1)), ('b', (None, 4))] |
| """ |
| return python_right_outer_join(self, other, numPartitions) |
| |
| def fullOuterJoin(self, other, numPartitions=None): |
| """ |
| Perform a full outer join of `self` and `other`.
| |
| For each element (k, v) in `self`, the resulting RDD will either |
| contain all pairs (k, (v, w)) for w in `other`, or the pair |
| (k, (v, None)) if no elements in `other` have key k. |
| |
| Similarly, for each element (k, w) in `other`, the resulting RDD will |
| either contain all pairs (k, (v, w)) for v in `self`, or the pair |
| (k, (None, w)) if no elements in `self` have key k. |
| |
| Hash-partitions the resulting RDD into the given number of partitions. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2), ("c", 8)]) |
| >>> sorted(x.fullOuterJoin(y).collect()) |
| [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))] |
| """ |
| return python_full_outer_join(self, other, numPartitions) |
| |
| # TODO: add option to control map-side combining |
| # portable_hash is used as the default because the builtin hash of None
| # differs across machines.
| def partitionBy(self, numPartitions, partitionFunc=portable_hash): |
| """ |
| Return a copy of the RDD partitioned using the specified partitioner. |
| |
| Examples |
| -------- |
| >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) |
| >>> sets = pairs.partitionBy(2).glom().collect() |
| >>> len(set(sets[0]).intersection(set(sets[1]))) |
| 0 |
| """ |
| if numPartitions is None: |
| numPartitions = self._defaultReducePartitions() |
| partitioner = Partitioner(numPartitions, partitionFunc) |
| if self.partitioner == partitioner: |
| return self |
| |
| # Transferring O(n) objects to Java is too expensive. |
| # Instead, we'll form the hash buckets in Python, |
| # transferring O(numPartitions) objects to Java. |
| # Each object is a (splitNumber, [objects]) pair. |
| # To avoid objects that are too large, the objects are
| # grouped into chunks.
| outputSerializer = self.ctx._unbatched_serializer |
| |
| limit = (self._memory_limit() / 2) |
| |
| def add_shuffle_key(split, iterator): |
| |
| buckets = defaultdict(list) |
| c, batch = 0, min(10 * numPartitions, 1000) |
| |
| for k, v in iterator: |
| buckets[partitionFunc(k) % numPartitions].append((k, v)) |
| c += 1 |
| |
| # check used memory and avg size of chunk of objects |
| if (c % 1000 == 0 and get_used_memory() > limit |
| or c > batch): |
| n, size = len(buckets), 0 |
| for split in list(buckets.keys()): |
| yield pack_long(split) |
| d = outputSerializer.dumps(buckets[split]) |
| del buckets[split] |
| yield d |
| size += len(d) |
| |
| avg = int(size / n) >> 20 |
| # let 1M < avg < 10M |
| if avg < 1: |
| batch = min(sys.maxsize, batch * 1.5) |
| elif avg > 10: |
| batch = max(int(batch / 1.5), 1) |
| c = 0 |
| |
| for split, items in buckets.items(): |
| yield pack_long(split) |
| yield outputSerializer.dumps(items) |
| |
| keyed = self.mapPartitionsWithIndex(add_shuffle_key, preservesPartitioning=True) |
| keyed._bypass_serializer = True |
| with SCCallSiteSync(self.context) as css: |
| pairRDD = self.ctx._jvm.PairwiseRDD( |
| keyed._jrdd.rdd()).asJavaPairRDD() |
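| # id(partitionFunc) is passed so the JVM-side PythonPartitioner can
| # tell different Python partition functions apart when checking
| # whether two RDDs are partitioned the same way.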
| jpartitioner = self.ctx._jvm.PythonPartitioner(numPartitions, |
| id(partitionFunc)) |
| jrdd = self.ctx._jvm.PythonRDD.valueOfPair(pairRDD.partitionBy(jpartitioner)) |
| rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) |
| rdd.partitioner = partitioner |
| return rdd |
| |
| # TODO: add control over map-side aggregation |
| def combineByKey(self, createCombiner, mergeValue, mergeCombiners, |
| numPartitions=None, partitionFunc=portable_hash): |
| """ |
| Generic function to combine the elements for each key using a custom |
| set of aggregation functions. |
| |
| Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined |
| type" C. |
| |
| Users provide three functions: |
| |
| - `createCombiner`, which turns a V into a C (e.g., creates |
| a one-element list) |
| - `mergeValue`, to merge a V into a C (e.g., adds it to the end of |
| a list) |
| - `mergeCombiners`, to combine two C's into a single one (e.g., merges |
| the lists) |
| |
| To avoid memory allocation, both mergeValue and mergeCombiners are allowed to |
| modify and return their first argument instead of creating a new C. |
| |
| In addition, users can control the partitioning of the output RDD. |
| |
| Notes |
| ----- |
| V and C can be different -- for example, one might group an RDD of type |
| (Int, Int) into an RDD of type (Int, List[Int]). |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)]) |
| >>> def to_list(a): |
| ... return [a] |
| ... |
| >>> def append(a, b): |
| ... a.append(b) |
| ... return a |
| ... |
| >>> def extend(a, b): |
| ... a.extend(b) |
| ... return a |
| ... |
| >>> sorted(x.combineByKey(to_list, append, extend).collect()) |
| [('a', [1, 2]), ('b', [1])] |
| """ |
| if numPartitions is None: |
| numPartitions = self._defaultReducePartitions() |
| |
| serializer = self.ctx.serializer |
| memory = self._memory_limit() |
| agg = Aggregator(createCombiner, mergeValue, mergeCombiners) |
| |
| def combineLocally(iterator): |
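| # Map-side combining gets 90% of the worker memory budget; the
| # ExternalMerger spills to disk once it exceeds that limit.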
| merger = ExternalMerger(agg, memory * 0.9, serializer) |
| merger.mergeValues(iterator) |
| return merger.items() |
| |
| locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True) |
| shuffled = locally_combined.partitionBy(numPartitions, partitionFunc) |
| |
| def _mergeCombiners(iterator): |
| merger = ExternalMerger(agg, memory, serializer) |
| merger.mergeCombiners(iterator) |
| return merger.items() |
| |
| return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True) |
| |
| def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None, |
| partitionFunc=portable_hash): |
| """ |
| Aggregate the values of each key, using given combine functions and a neutral |
| "zero value". This function can return a different result type, U, than the type |
| of the values in this RDD, V. Thus, we need one operation for merging a V into |
| a U and one operation for merging two U's. The former operation is used for merging
| values within a partition, and the latter is used for merging values between |
| partitions. To avoid memory allocation, both of these functions are |
| allowed to modify and return their first argument instead of creating a new U. |
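| 
| Examples
| --------
| A small example that aggregates each key's values into a (sum, count)
| pair, using tuples as the combined type U:
| 
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
| >>> seqFunc = (lambda x, y: (x[0] + y, x[1] + 1))
| >>> combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
| >>> sorted(rdd.aggregateByKey((0, 0), seqFunc, combFunc).collect())
| [('a', (3, 2)), ('b', (1, 1))]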
| """ |
| def createZero(): |
| return copy.deepcopy(zeroValue) |
| |
| return self.combineByKey( |
| lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc) |
| |
| def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash): |
| """ |
| Merge the values for each key using an associative function "func" |
| and a neutral "zeroValue" which may be added to the result an |
| arbitrary number of times, and must not change the result |
| (e.g., 0 for addition, or 1 for multiplication).
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) |
| >>> from operator import add |
| >>> sorted(rdd.foldByKey(0, add).collect()) |
| [('a', 2), ('b', 1)] |
| """ |
| def createZero(): |
| return copy.deepcopy(zeroValue) |
| |
| return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions, |
| partitionFunc) |
| |
| def _memory_limit(self): |
| return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m")) |
| |
| # TODO: support variant with custom partitioner |
| def groupByKey(self, numPartitions=None, partitionFunc=portable_hash): |
| """ |
| Group the values for each key in the RDD into a single sequence. |
| Hash-partitions the resulting RDD with numPartitions partitions. |
| |
| Notes |
| ----- |
| If you are grouping in order to perform an aggregation (such as a |
| sum or average) over each key, using reduceByKey or aggregateByKey will |
| provide much better performance. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) |
| >>> sorted(rdd.groupByKey().mapValues(len).collect()) |
| [('a', 2), ('b', 1)] |
| >>> sorted(rdd.groupByKey().mapValues(list).collect()) |
| [('a', [1, 1]), ('b', [1])] |
| """ |
| def createCombiner(x): |
| return [x] |
| |
| def mergeValue(xs, x): |
| xs.append(x) |
| return xs |
| |
| def mergeCombiners(a, b): |
| a.extend(b) |
| return a |
| |
| memory = self._memory_limit() |
| serializer = self._jrdd_deserializer |
| agg = Aggregator(createCombiner, mergeValue, mergeCombiners) |
| |
| def combine(iterator): |
| merger = ExternalMerger(agg, memory * 0.9, serializer) |
| merger.mergeValues(iterator) |
| return merger.items() |
| |
| locally_combined = self.mapPartitions(combine, preservesPartitioning=True) |
| shuffled = locally_combined.partitionBy(numPartitions, partitionFunc) |
| |
| def groupByKey(it): |
| merger = ExternalGroupBy(agg, memory, serializer) |
| merger.mergeCombiners(it) |
| return merger.items() |
| |
| return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable) |
| |
| def flatMapValues(self, f): |
| """ |
| Pass each value in the key-value pair RDD through a flatMap function |
| without changing the keys; this also retains the original RDD's |
| partitioning. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) |
| >>> def f(x): return x |
| >>> x.flatMapValues(f).collect() |
| [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] |
| """ |
| flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) |
| return self.flatMap(flat_map_fn, preservesPartitioning=True) |
| |
| def mapValues(self, f): |
| """ |
| Pass each value in the key-value pair RDD through a map function |
| without changing the keys; this also retains the original RDD's |
| partitioning. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) |
| >>> def f(x): return len(x) |
| >>> x.mapValues(f).collect() |
| [('a', 3), ('b', 1)] |
| """ |
| map_values_fn = lambda kv: (kv[0], f(kv[1])) |
| return self.map(map_values_fn, preservesPartitioning=True) |
| |
| def groupWith(self, other, *others): |
| """ |
| Alias for cogroup but with support for multiple RDDs. |
| |
| Examples |
| -------- |
| >>> w = sc.parallelize([("a", 5), ("b", 6)]) |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2)]) |
| >>> z = sc.parallelize([("b", 42)]) |
| >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))] |
| [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] |
| |
| """ |
| return python_cogroup((self, other) + others, numPartitions=None) |
| |
| # TODO: add variant with custom partitioner |
| def cogroup(self, other, numPartitions=None): |
| """ |
| For each key k in `self` or `other`, return a resulting RDD that |
| contains a tuple with the list of values for that key in `self` as |
| well as `other`. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4)]) |
| >>> y = sc.parallelize([("a", 2)]) |
| >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))] |
| [('a', ([1], [2])), ('b', ([4], []))] |
| """ |
| return python_cogroup((self, other), numPartitions) |
| |
| def sampleByKey(self, withReplacement, fractions, seed=None): |
| """ |
| Return a subset of this RDD sampled by key (via stratified sampling). |
| Create a sample of this RDD using variable sampling rates for |
| different keys as specified by fractions, a key to sampling rate map. |
| |
| Examples |
| -------- |
| >>> fractions = {"a": 0.2, "b": 0.1} |
| >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000))) |
| >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect()) |
| >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 |
| True |
| >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 |
| True |
| >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 |
| True |
| """ |
| for fraction in fractions.values(): |
| assert fraction >= 0.0, "Negative fraction value: %s" % fraction |
| return self.mapPartitionsWithIndex( |
| RDDStratifiedSampler(withReplacement, fractions, seed).func, True) |
| |
| def subtractByKey(self, other, numPartitions=None): |
| """ |
| Return each (key, value) pair in `self` that has no pair with matching |
| key in `other`. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) |
| >>> y = sc.parallelize([("a", 3), ("c", None)]) |
| >>> sorted(x.subtractByKey(y).collect()) |
| [('b', 4), ('b', 5)] |
| """ |
| def filter_func(pair): |
| key, (val1, val2) = pair |
| return val1 and not val2 |
| return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0]) |
| |
| def subtract(self, other, numPartitions=None): |
| """ |
| Return each value in `self` that is not contained in `other`. |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) |
| >>> y = sc.parallelize([("a", 3), ("c", None)]) |
| >>> sorted(x.subtract(y).collect()) |
| [('a', 1), ('b', 4), ('b', 5)] |
| """ |
| # note: here 'True' is just a placeholder |
| rdd = other.map(lambda x: (x, True)) |
| return self.map(lambda x: (x, True)).subtractByKey(rdd, numPartitions).keys() |
| |
| def keyBy(self, f): |
| """ |
| Creates tuples of the elements in this RDD by applying `f`: the
| result of `f` becomes the key and the original element the value.
| |
| Examples |
| -------- |
| >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) |
| >>> y = sc.parallelize(zip(range(0,5), range(0,5))) |
| >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())] |
| [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])] |
| """ |
| return self.map(lambda x: (f(x), x)) |
| |
| def repartition(self, numPartitions): |
| """ |
| Return a new RDD that has exactly numPartitions partitions. |
| |
| Can increase or decrease the level of parallelism in this RDD. |
| Internally, this uses a shuffle to redistribute data. |
| If you are decreasing the number of partitions in this RDD, consider |
| using `coalesce`, which can avoid performing a shuffle. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) |
| >>> sorted(rdd.glom().collect()) |
| [[1], [2, 3], [4, 5], [6, 7]] |
| >>> len(rdd.repartition(2).glom().collect()) |
| 2 |
| >>> len(rdd.repartition(10).glom().collect()) |
| 10 |
| """ |
| return self.coalesce(numPartitions, shuffle=True) |
| |
| def coalesce(self, numPartitions, shuffle=False): |
| """ |
| Return a new RDD that is reduced into `numPartitions` partitions. |
| |
| Examples |
| -------- |
| >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() |
| [[1], [2, 3], [4, 5]] |
| >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() |
| [[1, 2, 3, 4, 5]] |
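| 
| With ``shuffle=True``, `coalesce` can also increase the number of partitions:
| 
| >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(5, shuffle=True).getNumPartitions()
| 5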
| """ |
| if shuffle: |
| # Decrease the batch size in order to distribute the elements evenly
| # across output partitions. Otherwise, repartition may produce highly
| # skewed partitions.
| batchSize = min(10, self.ctx._batchSize or 1024) |
| ser = BatchedSerializer(PickleSerializer(), batchSize) |
| selfCopy = self._reserialize(ser) |
| jrdd_deserializer = selfCopy._jrdd_deserializer |
| jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) |
| else: |
| jrdd_deserializer = self._jrdd_deserializer |
| jrdd = self._jrdd.coalesce(numPartitions, shuffle) |
| return RDD(jrdd, self.ctx, jrdd_deserializer) |
| |
| def zip(self, other): |
| """ |
| Zips this RDD with another one, returning key-value pairs consisting of
| the first element in each RDD, the second element in each RDD, etc. Assumes
| that the two RDDs have the same number of partitions and the same |
| number of elements in each partition (e.g. one was made through |
| a map on the other). |
| |
| Examples |
| -------- |
| >>> x = sc.parallelize(range(0,5)) |
| >>> y = sc.parallelize(range(1000, 1005)) |
| >>> x.zip(y).collect() |
| [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] |
| """ |
| def get_batch_size(ser): |
| if isinstance(ser, BatchedSerializer): |
| return ser.batchSize |
| return 1 # not batched |
| |
| def batch_as(rdd, batchSize): |
| return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize)) |
| |
| my_batch = get_batch_size(self._jrdd_deserializer) |
| other_batch = get_batch_size(other._jrdd_deserializer) |
| if my_batch != other_batch or not my_batch: |
| # use the smallest batchSize for both of them |
| batchSize = min(my_batch, other_batch) |
| if batchSize <= 0: |
| # auto batched or unlimited |
| batchSize = 100 |
| other = batch_as(other, batchSize) |
| self = batch_as(self, batchSize) |
| |
| if self.getNumPartitions() != other.getNumPartitions(): |
| raise ValueError("Can only zip with RDD which has the same number of partitions") |
| |
| # The JVM will raise an exception if the partitions do not contain
| # the same number of items.
| pairRDD = self._jrdd.zip(other._jrdd) |
| deserializer = PairDeserializer(self._jrdd_deserializer, |
| other._jrdd_deserializer) |
| return RDD(pairRDD, self.ctx, deserializer) |
| |
| def zipWithIndex(self): |
| """ |
| Zips this RDD with its element indices. |
| |
| The ordering is first based on the partition index and then the |
| ordering of items within each partition. So the first item in |
| the first partition gets index 0, and the last item in the last |
| partition receives the largest index. |
| |
| This method needs to trigger a spark job when this RDD contains |
| more than one partition.
| |
| Examples |
| -------- |
| >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() |
| [('a', 0), ('b', 1), ('c', 2), ('d', 3)] |
| """ |
| starts = [0] |
| if self.getNumPartitions() > 1: |
| nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() |
| for i in range(len(nums) - 1): |
| starts.append(starts[-1] + nums[i]) |
| |
| def func(k, it): |
| for i, v in enumerate(it, starts[k]): |
| yield v, i |
| |
| return self.mapPartitionsWithIndex(func) |
| |
| def zipWithUniqueId(self): |
| """ |
| Zips this RDD with generated unique Long ids. |
| |
| Items in the kth partition will get ids k, n+k, 2*n+k, ..., where |
| n is the number of partitions. So the ids may contain gaps, but this
| method won't trigger a spark job, which is different from |
| :meth:`zipWithIndex`. |
| |
| Examples |
| -------- |
| >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() |
| [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] |
| """ |
| n = self.getNumPartitions() |
| |
| def func(k, it): |
| for i, v in enumerate(it): |
| yield v, i * n + k |
| |
| return self.mapPartitionsWithIndex(func) |
| |
| def name(self): |
| """ |
| Return the name of this RDD, or None if it has no name.
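| 
| Examples
| --------
| An unnamed RDD has no name until one is assigned:
| 
| >>> rdd1 = sc.parallelize([1, 2])
| >>> rdd1.name() is None
| True
| >>> _ = rdd1.setName('RDD1')
| >>> rdd1.name()
| 'RDD1'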
| """ |
| n = self._jrdd.name() |
| if n: |
| return n |
| |
| def setName(self, name): |
| """ |
| Assign a name to this RDD. |
| |
| Examples |
| -------- |
| >>> rdd1 = sc.parallelize([1, 2]) |
| >>> rdd1.setName('RDD1').name() |
| 'RDD1' |
| """ |
| self._jrdd.setName(name) |
| return self |
| |
| def toDebugString(self): |
| """ |
| A description of this RDD and its recursive dependencies for debugging. |
| """ |
| debug_string = self._jrdd.toDebugString() |
| if debug_string: |
| return debug_string.encode('utf-8') |
| |
| def getStorageLevel(self): |
| """ |
| Get the RDD's current storage level. |
| |
| Examples |
| -------- |
| >>> rdd1 = sc.parallelize([1,2]) |
| >>> rdd1.getStorageLevel() |
| StorageLevel(False, False, False, False, 1) |
| >>> print(rdd1.getStorageLevel()) |
| Serialized 1x Replicated |
| """ |
| java_storage_level = self._jrdd.getStorageLevel() |
| storage_level = StorageLevel(java_storage_level.useDisk(), |
| java_storage_level.useMemory(), |
| java_storage_level.useOffHeap(), |
| java_storage_level.deserialized(), |
| java_storage_level.replication()) |
| return storage_level |
| |
| def _defaultReducePartitions(self): |
| """ |
| Returns the default number of partitions to use during reduce tasks (e.g., groupBy). |
| If spark.default.parallelism is set, then we'll use the value from SparkContext |
| defaultParallelism, otherwise we'll use the number of partitions in this RDD. |
| |
| This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce |
| the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will |
| be inherent. |
| """ |
| if self.ctx._conf.contains("spark.default.parallelism"): |
| return self.ctx.defaultParallelism |
| else: |
| return self.getNumPartitions() |
| |
| def lookup(self, key): |
| """ |
| Return the list of values in the RDD for key `key`. This operation |
| is done efficiently if the RDD has a known partitioner by only |
| searching the partition that the key maps to. |
| |
| Examples |
| -------- |
| >>> l = range(1000) |
| >>> rdd = sc.parallelize(zip(l, l), 10) |
| >>> rdd.lookup(42) # slow |
| [42] |
| >>> sorted = rdd.sortByKey() |
| >>> sorted.lookup(42) # fast |
| [42] |
| >>> sorted.lookup(1024) |
| [] |
| >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey() |
| >>> list(rdd2.lookup(('a', 'b'))[0]) |
| ['c'] |
| """ |
| values = self.filter(lambda kv: kv[0] == key).values() |
| |
| if self.partitioner is not None: |
| return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)]) |
| |
| return values.collect() |
| |
| def _to_java_object_rdd(self): |
| """ Return a JavaRDD of Object by unpickling |
| |
| It will convert each Python object into a Java object via Pyrolite, whether
| the RDD is serialized in batches or not.
| """ |
| rdd = self._pickled() |
| return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True) |
| |
| def countApprox(self, timeout, confidence=0.95): |
| """ |
| Approximate version of count() that returns a potentially incomplete |
| result within a timeout, even if not all tasks have finished. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(1000), 10) |
| >>> rdd.countApprox(1000, 1.0) |
| 1000 |
| """ |
| drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))]) |
| return int(drdd.sumApprox(timeout, confidence)) |
| |
| def sumApprox(self, timeout, confidence=0.95): |
| """ |
| Approximate operation to return the sum within a timeout,
| or as soon as the requested confidence is met.
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(1000), 10) |
| >>> r = sum(range(1000)) |
| >>> abs(rdd.sumApprox(1000) - r) / r < 0.05 |
| True |
| """ |
| jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd() |
| jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) |
| r = jdrdd.sumApprox(timeout, confidence).getFinalValue() |
| return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) |
| |
| def meanApprox(self, timeout, confidence=0.95): |
| """ |
| Approximate operation to return the mean within a timeout,
| or as soon as the requested confidence is met.
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(1000), 10) |
| >>> r = sum(range(1000)) / 1000.0 |
| >>> abs(rdd.meanApprox(1000) - r) / r < 0.05 |
| True |
| """ |
| jrdd = self.map(float)._to_java_object_rdd() |
| jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd()) |
| r = jdrdd.meanApprox(timeout, confidence).getFinalValue() |
| return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high()) |
| |
| def countApproxDistinct(self, relativeSD=0.05): |
| """ |
| Return approximate number of distinct elements in the RDD. |
| |
| Parameters |
| ---------- |
| relativeSD : float, optional |
| Relative accuracy. Smaller values create |
| counters that require more space. |
| It must be greater than 0.000017. |
| |
| Notes |
| ----- |
| The algorithm used is based on streamlib's implementation of |
| `"HyperLogLog in Practice: Algorithmic Engineering of a State |
| of The Art Cardinality Estimation Algorithm", available here |
| <https://doi.org/10.1145/2452376.2452456>`_. |
| |
| Examples |
| -------- |
| >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct() |
| >>> 900 < n < 1100 |
| True |
| >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct() |
| >>> 16 < n < 24 |
| True |
| """ |
| if relativeSD < 0.000017: |
| raise ValueError("relativeSD should be greater than 0.000017") |
| # the hash space in Java is 2^32 |
| hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) |
| return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) |
| |
| def toLocalIterator(self, prefetchPartitions=False): |
| """ |
| Return an iterator that contains all of the elements in this RDD. |
| The iterator will consume as much memory as the largest partition in this RDD. |
| With prefetch it may consume up to the memory of the 2 largest partitions. |
| |
| Parameters |
| ---------- |
| prefetchPartitions : bool, optional |
| If Spark should pre-fetch the next partition |
| before it is needed. |
| |
| Examples |
| -------- |
| >>> rdd = sc.parallelize(range(10)) |
| >>> [x for x in rdd.toLocalIterator()] |
| [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] |
| """ |
| with SCCallSiteSync(self.context) as css: |
| sock_info = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe( |
| self._jrdd.rdd(), |
| prefetchPartitions) |
| return _local_iterator_from_socket(sock_info, self._jrdd_deserializer) |
| |
| def barrier(self): |
| """ |
| Marks the current stage as a barrier stage, where Spark must launch all tasks together. |
| In case of a task failure, instead of only restarting the failed task, Spark will abort the |
| entire stage and relaunch all tasks for this stage. |
| The barrier execution mode feature is experimental and it only handles limited scenarios. |
| Please read the linked SPIP and design docs to understand the limitations and future plans. |
| |
| .. versionadded:: 2.4.0 |
| |
| Returns |
| ------- |
| :class:`RDDBarrier` |
| instance that provides actions within a barrier stage. |
| |
| See Also |
| -------- |
| pyspark.BarrierTaskContext |
| |
| Notes |
| ----- |
| For additional information see |
| |
| - `SPIP: Barrier Execution Mode <http://jira.apache.org/jira/browse/SPARK-24374>`_ |
| - `Design Doc <https://jira.apache.org/jira/browse/SPARK-24582>`_ |
| |
| This API is experimental |
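| 
| A minimal illustrative sketch (not executed; barrier mode needs enough
| free slots to run all tasks of the stage at the same time)::
| 
|     rdd = sc.parallelize(range(4), 2)
|     # Both partitions are scheduled together; code inside the function
|     # may coordinate through pyspark.BarrierTaskContext.
|     rdd.barrier().mapPartitions(lambda it: it).collect()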
| """ |
| return RDDBarrier(self) |
| |
| def _is_barrier(self): |
| """ |
| Whether this RDD is in a barrier stage. |
| """ |
| return self._jrdd.rdd().isBarrier() |
| |
| def withResources(self, profile): |
| """ |
| Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD. |
| This is only supported on certain cluster managers and currently requires dynamic |
| allocation to be enabled. It will result in new executors with the resources specified |
| being acquired to calculate the RDD. |
| |
| .. versionadded:: 3.1.0 |
| |
| Notes |
| ----- |
| This API is experimental |
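| 
| A minimal illustrative sketch (not executed; assumes a cluster manager
| that supports resource profiles, with dynamic allocation enabled)::
| 
|     from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
|     treqs = TaskResourceRequests().cpus(2)
|     rp = ResourceProfileBuilder().require(treqs).build  # build is a property
|     rdd = sc.parallelize(range(10)).withResources(rp)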
| """ |
| self.has_resource_profile = True |
| if profile._java_resource_profile is not None: |
| jrp = profile._java_resource_profile |
| else: |
| builder = self.ctx._jvm.org.apache.spark.resource.ResourceProfileBuilder() |
| ereqs = ExecutorResourceRequests(self.ctx._jvm, profile._executor_resource_requests) |
| treqs = TaskResourceRequests(self.ctx._jvm, profile._task_resource_requests) |
| builder.require(ereqs._java_executor_resource_requests) |
| builder.require(treqs._java_task_resource_requests) |
| jrp = builder.build() |
| |
| self._jrdd.withResources(jrp) |
| return self |
| |
| def getResourceProfile(self): |
| """ |
| Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None |
| if it wasn't specified. |
| |
| .. versionadded:: 3.1.0 |
| |
| Returns |
| ------- |
| :py:class:`pyspark.resource.ResourceProfile` |
| The user-specified profile, or None if none was specified
| |
| Notes |
| ----- |
| This API is experimental |
| """ |
| rp = self._jrdd.getResourceProfile() |
| if rp is not None: |
| return ResourceProfile(_java_resource_profile=rp) |
| else: |
| return None |
| |
| |
| def _prepare_for_python_RDD(sc, command): |
| # the serialized command will be compressed by broadcast |
| ser = CloudPickleSerializer() |
| pickled_command = ser.dumps(command) |
| if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M |
| # The broadcast will have the same life cycle as the created PythonRDD
| broadcast = sc.broadcast(pickled_command) |
| pickled_command = ser.dumps(broadcast) |
| broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars] |
| sc._pickled_broadcast_vars.clear() |
| return pickled_command, broadcast_vars, sc.environment, sc._python_includes |
| |
| |
| def _wrap_function(sc, func, deserializer, serializer, profiler=None): |
| assert deserializer, "deserializer should not be empty" |
| assert serializer, "serializer should not be empty" |
| command = (func, profiler, deserializer, serializer) |
| pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) |
| return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec, |
| sc.pythonVer, broadcast_vars, sc._javaAccumulator) |
| |
| |
| class RDDBarrier(object): |
| |
| """ |
| Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together. |
| :class:`RDDBarrier` instances are created by :func:`RDD.barrier`. |
| |
| .. versionadded:: 2.4.0 |
| |
| Notes |
| ----- |
| This API is experimental |
| """ |
| |
| def __init__(self, rdd): |
| self.rdd = rdd |
| |
| def mapPartitions(self, f, preservesPartitioning=False): |
| """ |
| Returns a new RDD by applying a function to each partition of the wrapped RDD, |
| where tasks are launched together in a barrier stage. |
| The interface is the same as :func:`RDD.mapPartitions`. |
| Please see the API doc there. |
| |
| .. versionadded:: 2.4.0 |
| |
| Notes |
| ----- |
| This API is experimental |
| """ |
| def func(s, iterator): |
| return f(iterator) |
| return PipelinedRDD(self.rdd, func, preservesPartitioning, isFromBarrier=True) |
| |
| def mapPartitionsWithIndex(self, f, preservesPartitioning=False): |
| """ |
| Returns a new RDD by applying a function to each partition of the wrapped RDD, while |
| tracking the index of the original partition. All tasks are launched together
| in a barrier stage. |
| The interface is the same as :func:`RDD.mapPartitionsWithIndex`. |
| Please see the API doc there. |
| |
| .. versionadded:: 3.0.0 |
| |
| Notes |
| ----- |
| This API is experimental |
| """ |
| return PipelinedRDD(self.rdd, f, preservesPartitioning, isFromBarrier=True) |
| |
| |
| class PipelinedRDD(RDD): |
| |
| """ |
| Examples |
| -------- |
| Pipelined maps: |
| |
| >>> rdd = sc.parallelize([1, 2, 3, 4]) |
| >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() |
| [4, 8, 12, 16] |
| >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() |
| [4, 8, 12, 16] |
| |
| Pipelined reduces: |
| |
| >>> from operator import add |
| >>> rdd.map(lambda x: 2 * x).reduce(add) |
| 20 |
| >>> rdd.flatMap(lambda x: [x, x]).reduce(add) |
| 20 |
| """ |
| |
| def __init__(self, prev, func, preservesPartitioning=False, isFromBarrier=False): |
| if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): |
| # This transformation is the first in its stage: |
| self.func = func |
| self.preservesPartitioning = preservesPartitioning |
| self._prev_jrdd = prev._jrdd |
| self._prev_jrdd_deserializer = prev._jrdd_deserializer |
| else: |
| prev_func = prev.func |
| |
| def pipeline_func(split, iterator): |
| return func(split, prev_func(split, iterator)) |
| self.func = pipeline_func |
| self.preservesPartitioning = \ |
| prev.preservesPartitioning and preservesPartitioning |
| self._prev_jrdd = prev._prev_jrdd # maintain the pipeline |
| self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer |
| self.is_cached = False |
| self.has_resource_profile = False |
| self.is_checkpointed = False |
| self.ctx = prev.ctx |
| self.prev = prev |
| self._jrdd_val = None |
| self._id = None |
| self._jrdd_deserializer = self.ctx.serializer |
| self._bypass_serializer = False |
| self.partitioner = prev.partitioner if self.preservesPartitioning else None |
| self.is_barrier = isFromBarrier or prev._is_barrier() |
| |
| def getNumPartitions(self): |
| return self._prev_jrdd.partitions().size() |
| |
| @property |
| def _jrdd(self): |
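| # The wrapped JavaRDD is built lazily and cached in _jrdd_val so
| # repeated accesses reuse the same JVM-side RDD.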
| if self._jrdd_val: |
| return self._jrdd_val |
| if self._bypass_serializer: |
| self._jrdd_deserializer = NoOpSerializer() |
| |
| if self.ctx.profiler_collector: |
| profiler = self.ctx.profiler_collector.new_profiler(self.ctx) |
| else: |
| profiler = None |
| |
| wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer, |
| self._jrdd_deserializer, profiler) |
| python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func, |
| self.preservesPartitioning, self.is_barrier) |
| self._jrdd_val = python_rdd.asJavaRDD() |
| |
| if profiler: |
| self._id = self._jrdd_val.id() |
| self.ctx.profiler_collector.add_profiler(self._id, profiler) |
| return self._jrdd_val |
| |
| def id(self): |
| if self._id is None: |
| self._id = self._jrdd.id() |
| return self._id |
| |
| def _is_pipelinable(self): |
| return not (self.is_cached or self.is_checkpointed or self.has_resource_profile) |
| |
| def _is_barrier(self): |
| return self.is_barrier |
| |
| |
| def _test(): |
| import doctest |
| from pyspark.context import SparkContext |
| globs = globals().copy() |
| # The small batch size here ensures that we see multiple batches, |
| # even in these small test examples: |
| globs['sc'] = SparkContext('local[4]', 'PythonTest') |
| (failure_count, test_count) = doctest.testmod( |
| globs=globs, optionflags=doctest.ELLIPSIS) |
| globs['sc'].stop() |
| if failure_count: |
| sys.exit(-1) |
| |
| |
| if __name__ == "__main__": |
| _test() |