| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.examples.pi; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.examples.pi.DistSum.Computation; |
| import org.apache.hadoop.examples.pi.DistSum.Parameters; |
| import org.apache.hadoop.examples.pi.math.Bellard; |
| import org.apache.hadoop.examples.pi.math.Summation; |
| import org.apache.hadoop.examples.pi.math.Bellard.Parameter; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| |
| /** |
| * A map/reduce program that uses a BBP-type method to compute exact |
| * binary digits of Pi. |
| * This program is designed for computing the n th bit of Pi, |
| * for large n, say n >= 10^8. |
| * For computing lower bits of Pi, consider using bbp. |
| * |
| * The actually computation is done by DistSum jobs. |
| * The steps for launching the jobs are: |
| * |
| * (1) Initialize parameters. |
| * (2) Create a list of sums. |
| * (3) Read computed values from the given local directory. |
| * (4) Remove the computed values from the sums. |
| * (5) Partition the remaining sums into computation jobs. |
| * (6) Submit the computation jobs to a cluster and then wait for the results. |
| * (7) Write job outputs to the given local directory. |
| * (8) Combine the job outputs and print the Pi bits. |
| */ |
| /* |
| * The command line format is: |
| * > hadoop org.apache.hadoop.examples.pi.DistBbp \ |
| * <b> <nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir> |
| * |
| * And the parameters are: |
| * <b> The number of bits to skip, i.e. compute the (b+1)th position. |
| * <nThreads> The number of working threads. |
| * <nJobs> The number of jobs per sum. |
| * <type> 'm' for map side job, 'r' for reduce side job, 'x' for mix type. |
| * <nPart> The number of parts per job. |
| * <remoteDir> Remote directory for submitting jobs. |
| * <localDir> Local directory for storing output files. |
| * |
| * Note that it may take a long time to finish all the jobs when <b> is large. |
| * If the program is killed in the middle of the execution, the same command with |
| * a different <remoteDir> can be used to resume the execution. For example, suppose |
| * we use the following command to compute the (10^15+57)th bit of Pi. |
| * |
| * > hadoop org.apache.hadoop.examples.pi.DistBbp \ |
| * 1,000,000,000,000,056 20 1000 x 500 remote/a local/output |
| * |
| * It uses 20 threads to summit jobs so that there are at most 20 concurrent jobs. |
| * Each sum (there are totally 14 sums) is partitioned into 1000 jobs. |
| * The jobs will be executed in map-side or reduce-side. Each job has 500 parts. |
| * The remote directory for the jobs is remote/a and the local directory |
| * for storing output is local/output. Depends on the cluster configuration, |
| * it may take many days to finish the entire execution. If the execution is killed, |
| * we may resume it by |
| * |
| * > hadoop org.apache.hadoop.examples.pi.DistBbp \ |
| * 1,000,000,000,000,056 20 1000 x 500 remote/b local/output |
| */ |
| public final class DistBbp extends Configured implements Tool { |
| public static final String DESCRIPTION |
| = "A map/reduce program that uses a BBP-type formula to compute exact bits of Pi."; |
| |
| private final Util.Timer timer = new Util.Timer(true); |
| |
| /** {@inheritDoc} */ |
| public int run(String[] args) throws Exception { |
| //parse arguments |
| if (args.length != DistSum.Parameters.COUNT + 1) |
| return Util.printUsage(args, |
| getClass().getName() + " <b> " + Parameters.LIST |
| + "\n <b> The number of bits to skip, i.e. compute the (b+1)th position." |
| + Parameters.DESCRIPTION); |
| |
| int i = 0; |
| final long b = Util.string2long(args[i++]); |
| final DistSum.Parameters parameters = DistSum.Parameters.parse(args, i); |
| |
| if (b < 0) |
| throw new IllegalArgumentException("b = " + b + " < 0"); |
| Util.printBitSkipped(b); |
| Util.out.println(parameters); |
| Util.out.println(); |
| |
| //initialize sums |
| final DistSum distsum = new DistSum(); |
| distsum.setConf(getConf()); |
| distsum.setParameters(parameters); |
| final boolean isVerbose = getConf().getBoolean(Parser.VERBOSE_PROPERTY, false); |
| final Map<Parameter, List<TaskResult>> existings = new Parser(isVerbose).parse(parameters.localDir.getPath(), null); |
| Parser.combine(existings); |
| for(List<TaskResult> tr : existings.values()) |
| Collections.sort(tr); |
| Util.out.println(); |
| final Map<Bellard.Parameter, Bellard.Sum> sums = Bellard.getSums(b, parameters.nJobs, existings); |
| Util.out.println(); |
| |
| //execute the computations |
| execute(distsum, sums); |
| |
| //compute Pi from the sums |
| final double pi = Bellard.computePi(b, sums); |
| Util.printBitSkipped(b); |
| Util.out.println(Util.pi2string(pi, Bellard.bit2terms(b))); |
| return 0; |
| } |
| |
| /** Execute DistSum computations */ |
| private void execute(DistSum distsum, |
| final Map<Bellard.Parameter, Bellard.Sum> sums) throws Exception { |
| final List<Computation> computations = new ArrayList<Computation>(); |
| int i = 0; |
| for(Bellard.Parameter p : Bellard.Parameter.values()) |
| for(Summation s : sums.get(p)) |
| if (s.getValue() == null) |
| computations.add(distsum.new Computation(i++, p.toString(), s)); |
| |
| if (computations.isEmpty()) |
| Util.out.println("No computation"); |
| else { |
| timer.tick("execute " + computations.size() + " computation(s)"); |
| Util.execute(distsum.getParameters().nThreads, computations); |
| timer.tick("done"); |
| } |
| } |
| |
| /** main */ |
| public static void main(String[] args) throws Exception { |
| System.exit(ToolRunner.run(null, new DistBbp(), args)); |
| } |
| } |