blob: 9df4e5180b2908b3cb9dfca43f48297e35274738 [file] [log] [blame]
/*
* Copyright 2017, Yahoo! Inc.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.pig.sampling;
import static com.yahoo.sketches.pig.sampling.VarOptCommonImpl.DEFAULT_TARGET_K;

import java.io.IOException;
import java.util.Locale;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.Algebraic;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

import com.yahoo.memory.Memory;
import com.yahoo.sketches.sampling.VarOptItemsUnion;
/**
* Accepts binary VarOpt sketch images and unions them into a single binary output sketch.
* Because the inputs are opaque binary objects, no schema information is available.
*
* <p>The VarOpt union can handle input sketches with different values of {@code k}, and will
* produce a result using the largest number of samples that still yields a valid VarOpt
* sketch, never exceeding the maximum sample size given at construction.</p>
*
* @author Jon Malkin
*/
public class VarOptUnion extends AccumulatorEvalFunc<DataByteArray> implements Algebraic {
  private static final ArrayOfTuplesSerDe SERDE = new ArrayOfTuplesSerDe();

  // Upper bound on the number of samples in the unioned result.
  private final int maxK_;

  // Running union for the current group; created lazily on the first non-null input bag
  // and discarded in cleanup().
  private VarOptItemsUnion<Tuple> union_;

  /**
   * VarOpt union constructor.
   *
   * @param kStr String indicating the maximum number of desired entries in the sample.
   * @throws NumberFormatException if {@code kStr} cannot be parsed as an integer
   * @throws IllegalArgumentException if the parsed maximum sample size is less than 1
   */
  public VarOptUnion(final String kStr) {
    maxK_ = Integer.parseInt(kStr);
    if (maxK_ < 1) {
      throw new IllegalArgumentException("VarOptUnion requires max sample size >= 1: "
          + maxK_);
    }
  }

  /** Creates a union using {@code DEFAULT_TARGET_K} as the maximum sample size. */
  VarOptUnion() {
    maxK_ = DEFAULT_TARGET_K;
  }

  // We could overload exec() for easy cases, but we still need to compare the incoming
  // reservoir's k vs max k and possibly downsample.

  /**
   * Merges every serialized sketch in the input bag into the running union.
   *
   * <p>A null or empty input tuple, or a null bag, is ignored. Within the bag, entries that are
   * null, empty, or hold a null sketch are skipped rather than failing the whole group.</p>
   *
   * @param inputTuple tuple whose first field is expected to be a bag of single-field tuples,
   *                   each holding a serialized VarOpt sketch as a {@link DataByteArray}
   * @throws IOException if a tuple field cannot be read
   */
  @Override
  public void accumulate(final Tuple inputTuple) throws IOException {
    if (inputTuple == null || inputTuple.size() < 1 || inputTuple.isNull(0)) {
      return;
    }

    final DataBag sketches = (DataBag) inputTuple.get(0);
    if (union_ == null) {
      union_ = VarOptItemsUnion.newInstance(maxK_);
    }

    for (final Tuple t : sketches) {
      // Missing entries carry no samples; previously they caused a NullPointerException.
      if (t == null || t.size() < 1 || t.isNull(0)) {
        continue;
      }
      final DataByteArray dba = (DataByteArray) t.get(0);
      final Memory sketch = Memory.wrap(dba.get());
      union_.update(sketch, SERDE);
    }
  }

  /**
   * Returns the serialized result of the union accumulated so far.
   *
   * @return the serialized result sketch, or null if {@link #accumulate(Tuple)} has not
   *         received a non-null input bag since the last {@link #cleanup()}
   */
  @Override
  public DataByteArray getValue() {
    return union_ == null ? null : new DataByteArray(union_.getResult().toByteArray(SERDE));
  }

  /** Discards the running union so the next group starts from an empty state. */
  @Override
  public void cleanup() {
    union_ = null;
  }

  /**
   * Returns the class name of the Algebraic initial function.
   *
   * @return name of {@code VarOptCommonImpl.UnionSketchesAsTuple}
   */
  @Override
  public String getInitial() {
    return VarOptCommonImpl.UnionSketchesAsTuple.class.getName();
  }

  /**
   * Returns the class name of the Algebraic intermediate function.
   *
   * @return name of {@code VarOptCommonImpl.UnionSketchesAsTuple}
   */
  @Override
  public String getIntermed() {
    return VarOptCommonImpl.UnionSketchesAsTuple.class.getName();
  }

  /**
   * Returns the class name of the Algebraic final function.
   *
   * @return name of {@code VarOptCommonImpl.UnionSketchesAsByteArray}
   */
  @Override
  public String getFinal() {
    return VarOptCommonImpl.UnionSketchesAsByteArray.class.getName();
  }

  /**
   * Declares a single bytearray output field, since the result is an opaque serialized sketch.
   *
   * @param input the input schema, used only to derive the output field name
   * @return a schema with one {@link DataType#BYTEARRAY} field
   */
  @Override
  public Schema outputSchema(final Schema input) {
    // Locale.ROOT keeps the generated alias independent of the JVM's default locale.
    final String name = this.getClass().getName().toLowerCase(Locale.ROOT);
    return new Schema(new Schema.FieldSchema(getSchemaName(name, input), DataType.BYTEARRAY));
  }
}