#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

| """ |
| Randomly sampled RDDs. |
| """ |
| |
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.mllib.util import MLUtils


if __name__ == "__main__":
    if len(sys.argv) not in [1, 2]:
        print("Usage: sampled_rdds [<libsvm data file>]", file=sys.stderr)
        sys.exit(-1)
    if len(sys.argv) == 2:
        datapath = sys.argv[1]
    else:
        datapath = 'data/mllib/sample_binary_classification_data.txt'

    sc = SparkContext(appName="PythonSampledRDDs")

    fraction = 0.1  # fraction of data to sample
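    # Note: sample() and sampleByKey() draw each element independently, so the
    # size of a sample is only approximately fraction * count and varies from
    # run to run; takeSample() returns exactly the requested number of elements.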

    examples = MLUtils.loadLibSVMFile(sc, datapath)
    numExamples = examples.count()
    if numExamples == 0:
        print("Error: Data file had no samples to load.", file=sys.stderr)
        sys.exit(1)
    print('Loaded data with %d examples from file: %s' % (numExamples, datapath))

    # Example: RDD.sample() and RDD.takeSample()
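    # sample() is a lazy transformation that returns a new RDD, while
    # takeSample() is an action that collects the sampled elements into a
    # local Python list on the driver.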
    expectedSampleSize = int(numExamples * fraction)
    print('Sampling RDD using fraction %g. Expected sample size = %d.'
          % (fraction, expectedSampleSize))
    sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
    print('  RDD.sample(): sample has %d examples' % sampledRDD.count())
    sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
    print('  RDD.takeSample(): sample has %d examples' % len(sampledArray))

    print()

    # Example: RDD.sampleByKey()
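    # sampleByKey() performs approximate stratified sampling: each key is
    # sampled with its own fraction, so the per-key proportions in the sample
    # roughly match those requested in the fractions dict.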
    keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
    print('  Keyed data using label (Int) as key ==> Orig')
    # Count examples per label in original data.
    keyCountsA = keyedRDD.countByKey()

    # Subsample, and count examples per label in sampled data.
    fractions = {}
    for k in keyCountsA.keys():
        fractions[k] = fraction
    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
    keyCountsB = sampledByKeyRDD.countByKey()
    sizeB = sum(keyCountsB.values())
    print('  Sampled %d examples using approximate stratified sampling (by label). ==> Sample'
          % sizeB)

    # Compare per-key fractions in the original data and in the sample.
    print('  \tFractions of examples with key')
    print('Key\tOrig\tSample')
    for k in sorted(keyCountsA.keys()):
        fracA = keyCountsA[k] / float(numExamples)
        if sizeB != 0:
            fracB = keyCountsB.get(k, 0) / float(sizeB)
        else:
            fracB = 0
        print('%d\t%g\t%g' % (k, fracA, fracB))

    sc.stop()