blob: fbd18eab528606333fa67d1e441ad7025efc7273 [file] [log] [blame]
/*
* Copyright 2016, Yahoo! Inc.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.pig.quantiles;
import java.io.IOException;
import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.quantiles.QuantilesSketch;
import com.yahoo.sketches.quantiles.Union;
import com.yahoo.sketches.quantiles.UnionBuilder;
/**
* This is a Pig UDF that merges Quantiles Sketches.
* To assist Pig, this class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces.
*/
public class Merge extends EvalFunc<Tuple> implements Accumulator<Tuple>, Algebraic {
private static final TupleFactory tupleFactory_ = TupleFactory.getInstance();
// With the single exception of the Accumulator interface, UDFs are stateless.
// All parameters kept at the class level must be final, except for the accumUnion.
private final UnionBuilder unionBuilder_;
private Union accumUnion_;
//TOP LEVEL API
/**
* Default constructor. Assumes default k.
*/
public Merge() {
this(0);
}
/**
* String constructor.
*
* @param kStr string representation of k
*/
public Merge(final String kStr) {
this(Integer.parseInt(kStr));
}
/**
* Base constructor.
*
* @param k parameter that determines the accuracy and size of the sketch.
*/
public Merge(final int k) {
super();
unionBuilder_ = Union.builder();
if (k > 0) unionBuilder_.setK(k);
}
//@formatter:off
/**
* Top-level exec function.
* This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
* and returns a single updated <b>Sketch</b> as a <b>Sketch Tuple</b>.
*
* <p>
* If a large number of calls are anticipated, leveraging either the <i>Algebraic</i> or
* <i>Accumulator</i> interfaces is recommended. Pig normally handles this automatically.
*
* <p>
* Internally, this method presents the inner <b>Sketch Tuples</b> to a new <b>Union</b>.
* The result is returned as a <b>Sketch Tuple</b>
*
* <p>
* Types are in the form: Java data type: Pig DataType
*
* <p>
*
* <b>Input Tuple</b>
* <ul>
* <li>Tuple: TUPLE (Must contain only one field)
* <ul>
* <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
* <ul>
* <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
* <li>...</li>
* <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
* </ul>
* </li>
* </ul>
* </li>
* </ul>
*
* <b>Sketch Tuple</b>
* <ul>
* <li>Tuple: TUPLE (Contains exactly 1 field)
* <ul>
* <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
* </ul>
* </li>
* </ul>
*
* @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
* @return Sketch Tuple. If inputTuple is null or empty, returns empty sketch.
* @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
*/
//@formatter:on
@Override // TOP LEVEL EXEC
public Tuple exec(final Tuple inputTuple) throws IOException {
//The exec is a stateless function. It operates on the input and returns a result.
if (inputTuple != null && inputTuple.size() > 0) {
final Union union = unionBuilder_.build();
final DataBag bag = (DataBag) inputTuple.get(0);
updateUnion(bag, union);
final QuantilesSketch resultSketch = union.getResultAndReset();
if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
// return empty sketch
return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
@Override
public Schema outputSchema(final Schema input) {
if (input == null) return null;
try {
final Schema tupleSchema = new Schema();
tupleSchema.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
return new Schema(new Schema.FieldSchema(getSchemaName(
this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
} catch (FrontendException e) {
throw new RuntimeException(e);
}
}
//ACCUMULATOR INTERFACE
/**
* An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
* accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize the
* sketch at the end. Instead, it can be called multiple times, each time with another bag of
* Sketch Tuples to be input to the Union.
*
* @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
* @see #exec
* @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
* @throws IOException by Pig
*/
@Override
public void accumulate(final Tuple inputTuple) throws IOException {
if (inputTuple == null || inputTuple.size() == 0) return;
final DataBag bag = (DataBag) inputTuple.get(0);
if (bag == null) return;
if (accumUnion_ == null) accumUnion_ = unionBuilder_.build();
updateUnion(bag, accumUnion_);
}
/**
* Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
*
* @return Sketch Tuple. (see {@link #exec} for return tuple format)
* @see "org.apache.pig.Accumulator.getValue()"
*/
@Override
public Tuple getValue() {
if (accumUnion_ != null) {
final QuantilesSketch resultSketch = accumUnion_.getResultAndReset();
if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
// return empty sketch
return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
/**
* Cleans up the UDF state after being called using the {@link Accumulator} interface.
*
* @see "org.apache.pig.Accumulator.cleanup()"
*/
@Override
public void cleanup() {
accumUnion_ = null;
}
//ALGEBRAIC INTERFACE
@Override
public String getInitial() {
return Initial.class.getName();
}
@Override
public String getIntermed() {
return IntermediateFinal.class.getName();
}
@Override
public String getFinal() {
return IntermediateFinal.class.getName();
}
//TOP LEVEL PRIVATE STATIC METHODS
/**
* Updates a union from a bag of sketches
*
* @param bag A bag of sketchTuples.
* @param union The union to update
*/
private static void updateUnion(DataBag bag, Union union) throws ExecException {
for (Tuple innerTuple: bag) {
final Object f0 = innerTuple.get(0);
if (f0 == null) continue;
if (f0 instanceof DataByteArray) {
final DataByteArray dba = (DataByteArray) f0;
if (dba.size() > 0) {
union.update(new NativeMemory(dba.get()));
}
} else {
throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: " + innerTuple.getType(0));
}
}
}
//STATIC Initial Class only called by Pig
/**
* Class used to calculate the initial pass of an Algebraic sketch operation.
*
* <p>
* The Initial class simply passes through all records unchanged so that they can be
* processed by the intermediate processor instead.</p>
*/
public static class Initial extends EvalFunc<Tuple> {
// The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
// The constructors and final parameters must mirror the parent class as there is no linkage
// between them.
/**
* Default constructor.
*/
public Initial() {}
/**
* Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
* same constructor arguments as the base UDF. In this case the arguments are ignored.
*
* @param kStr string representation of k
*/
public Initial(final String kStr) {}
@Override // Initial exec
public Tuple exec(final Tuple inputTuple) throws IOException {
return inputTuple;
}
}
// STATIC IntermediateFinal Class only called by Pig
/**
* Class used to calculate the intermediate or final combiner pass of an <i>Algebraic</i> union
* operation. This is called from the combiner, and may be called multiple times (from the mapper
* and from the reducer). It will receive a bag of values returned by either the <i>Intermediate</i>
* stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
* interpret both types.
*/
public static class IntermediateFinal extends EvalFunc<Tuple> {
// The Algebraic worker classes (Initial, IntermediateFinal) are static and stateless.
// The constructors and final parameters must mirror the parent class as there is no linkage
// between them.
private final UnionBuilder unionBuilder_;
/**
* Default constructor. Assumes default k.
*/
public IntermediateFinal() {
this(0);
}
/**
* Constructor for the intermediate and final passes of an Algebraic function. Pig will call
* this and pass the same constructor arguments as the base UDF.
*
* @param kStr string representation of k
*/
public IntermediateFinal(final String kStr) {
this(Integer.parseInt(kStr));
}
/**
* Constructor with primitives for the intermediate and final passes of an Algebraic function.
*
* @param k parameter that determines the accuracy and size of the sketch.
*/
public IntermediateFinal(final int k) {
unionBuilder_ = Union.builder();
if (k > 0) unionBuilder_.setK(k);
}
@Override // IntermediateFinal exec
public Tuple exec(final Tuple inputTuple) throws IOException {
if (inputTuple != null && inputTuple.size() > 0) {
final Union union = unionBuilder_.build();
final DataBag outerBag = (DataBag) inputTuple.get(0);
for (final Tuple dataTuple: outerBag) {
final Object f0 = dataTuple.get(0);
if (f0 == null) continue;
if (f0 instanceof DataBag) {
final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
if (innerBag.size() == 0) continue;
// If field 0 of a dataTuple is again a Bag all tuples of this inner bag
// will be passed into the union.
// It is due to system bagged outputs from multiple mapper Initial functions.
// The Intermediate stage was bypassed.
updateUnion(innerBag, union);
} else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
// If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a prior call
// It is due to system bagged outputs from multiple mapper Intermediate functions.
// Each dataTuple.DBA:sketch will merged into the union.
final DataByteArray dba = (DataByteArray) f0;
union.update(new NativeMemory(dba.get()));
} else {
throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
+ f0.getClass().getName());
}
}
final QuantilesSketch resultSketch = union.getResultAndReset();
if (resultSketch != null) return tupleFactory_.newTuple(new DataByteArray(resultSketch.toByteArray()));
}
// return empty sketch
return tupleFactory_.newTuple(new DataByteArray(unionBuilder_.build().getResult().toByteArray()));
}
} // end IntermediateFinal
}