blob: 7ac84ec2aa866f39607ab4c0c2798d826492163b [file] [log] [blame]
/*
* Copyright 2016, Yahoo! Inc.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.pig.theta;
import static com.yahoo.sketches.Util.DEFAULT_UPDATE_SEED;
import static com.yahoo.sketches.pig.theta.PigUtil.compactOrderedSketchToTuple;
import static com.yahoo.sketches.pig.theta.PigUtil.extractFieldAtIndex;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.CompactSketch;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
/**
* This is a Pig UDF that performs the A-NOT-B Set Operation on two given Sketches. Because this
* operation is fundamentally asymetric, it is structured as a single stateless operation rather
* than stateful as are Union and Intersection UDFs, which can be iterative.
* The requirement to perform iterative A\B\C\... is rare. If needed, it can be rendered easily by
* the caller.
*
* @author Lee Rhodes
*/
public class AexcludeB extends EvalFunc<Tuple> {
private final long seed_;
//TOP LEVEL API
/**
* Default constructor to make pig validation happy. Assumes:
* <ul>
* <li><a href="{@docRoot}/resources/dictionary.html#defaultUpdateSeed">See Default Update Seed</a></li>
* </ul>
*/
public AexcludeB() {
this(DEFAULT_UPDATE_SEED);
}
/**
* String constructor.
*
* @param seedStr <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>
*/
public AexcludeB(final String seedStr) {
this(Long.parseLong(seedStr));
}
/**
* Base constructor.
*
* @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
*/
public AexcludeB(final long seed) {
super();
this.seed_ = seed;
}
// @formatter:off
/**
* Top Level Exec Function.
* <p>
* This method accepts a <b>Sketch AnotB Input Tuple</b> and returns a
* <b>Sketch Tuple</b>.
* </p>
*
* <b>Sketch AnotB Input Tuple</b>
* <ul>
* <li>Tuple: TUPLE (Must contain 2 fields): <br>
* Java data type: Pig DataType: Description
* <ul>
* <li>index 0: DataByteArray: BYTEARRAY: Sketch A</li>
* <li>index 1: DataByteArray: BYTEARRAY: Sketch B</li>
* </ul>
* </li>
* </ul>
*
* <p>
* Any other input tuple will throw an exception!
* </p>
*
* <b>Sketch Tuple</b>
* <ul>
* <li>Tuple: TUPLE (Contains exactly 1 field)
* <ul>
* <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
* </ul>
* </li>
* </ul>
*
* @throws ExecException from Pig.
*/
// @formatter:on
@Override //TOP LEVEL EXEC
public Tuple exec(final Tuple inputTuple) throws IOException {
//The exec is a stateless function. It operates on the input and returns a result.
// It can only call static functions.
final Object objA = extractFieldAtIndex(inputTuple, 0);
Sketch sketchA = null;
if (objA != null) {
final DataByteArray dbaA = (DataByteArray)objA;
final Memory srcMem = Memory.wrap(dbaA.get());
sketchA = Sketch.wrap(srcMem, seed_);
}
final Object objB = extractFieldAtIndex(inputTuple, 1);
Sketch sketchB = null;
if (objB != null) {
final DataByteArray dbaB = (DataByteArray)objB;
final Memory srcMem = Memory.wrap(dbaB.get());
sketchB = Sketch.wrap(srcMem, seed_);
}
final AnotB aNOTb = SetOperation.builder().setSeed(seed_).buildANotB();
aNOTb.update(sketchA, sketchB);
final CompactSketch compactSketch = aNOTb.getResult(true, null);
return compactOrderedSketchToTuple(compactSketch);
}
@Override
public Schema outputSchema(final Schema input) {
if (input != null) {
try {
final Schema tupleSchema = new Schema();
tupleSchema.add(new Schema.FieldSchema("Sketch", DataType.BYTEARRAY));
return new Schema(new Schema.FieldSchema(getSchemaName(this
.getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
}
catch (final FrontendException e) {
// fall through
}
}
return null;
}
}