blob: f9c16cc431de2aee5c0a81cb34080af76a0980ab [file] [log] [blame]
/*
* Copyright 2015, Yahoo! Inc.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.demo;
import static java.lang.Math.sqrt;
import static com.yahoo.sketches.hash.MurmurHash3.hash;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.ResizeFactor;
import com.yahoo.sketches.hll.HllSketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.UpdateSketch;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Random;
/**
* A simple demo that compares brute force counting of uniques vs. using sketches.
*
* <p>This demo computes a stream of values and feeds them first to
* an exact sort-based method of computing the number of unique values
* in the stream and then feeds a similar stream to two different types of
* sketches from the library.
*
* <p>This demo becomes most significant in the case where the number of uniques in the
* stream exceeds what the computer can hold in memory.
*
* <p>This demo utilizes the Unix sort and wc commands for the brute force computation.
* So this needs to be run on a linux or mac machine. A windows machine with a similar unix
* library installed should also work, but it has not been tested.
*/
public class DemoImpl {
  //Static constants
  private static final String LS = System.getProperty("line.separator");
  //Last byte of the line separator: for a two-byte "\r\n" separator this is '\n', which is
  //the terminator the Unix sort and wc commands count. For a one-byte separator it is that byte.
  private static final byte LS_BYTE = lastByte(LS);
  //Free space required in the buffer before one more line is appended; a hex long is at most
  //16 characters plus the terminator.
  private static final int MIN_LINE_ROOM = 25;
  private static final String TMP_DIR = "tmp";
  private static final String DATA_FILE = "tmp/test.txt";
  private static final String SORTED_FILE = "tmp/sorted.txt";
  private static final Random rand = new Random();
  private static final StandardOpenOption C = StandardOpenOption.CREATE;
  private static final StandardOpenOption W = StandardOpenOption.WRITE;
  private static final StandardOpenOption TE = StandardOpenOption.TRUNCATE_EXISTING;

  //Stream Configuration
  private final int byteBufCap_ = 1000000; //ByteBuffer capacity
  private final long n_;                   //stream length
  private final int threshold_;            //equivalent uniquesFraction on integer scale

  //Sketch configuration
  private final int lgK_ = 14; //Log-base 2 of the configured sketch size = 16K

  //Internal sketch values
  private int maxMemSkBytes_;
  private double rse2_; //RSE for 95% confidence
  private UpdateSketch tSketch_ = null;
  private HllSketch hllSketch_ = null;

  //Other internal values
  private final Path path = Paths.get(DATA_FILE);
  private final long[] vArr_ = new long[1]; //reuse this array
  private long fileBytes_ = 0;
  private long u_ = 0; //unique count;

  /**
   * Construct the demo.
   * @param streamLen The total stream length.
   * @param uniquesFraction the fraction of streamLen values less than 1.0, that will be unique.
   * The actual # of uniques will vary around this value, because it is computed statistically.
   */
  public DemoImpl(long streamLen, double uniquesFraction) {
    threshold_ = (uniquesFraction == 1.0)
        ? Integer.MAX_VALUE
        : (int) (Integer.MAX_VALUE * uniquesFraction);
    n_ = streamLen;

    //mkdir() signals failure through its return value. A SecurityException, if thrown,
    //propagates to the caller unchanged.
    File dir = new File(TMP_DIR);
    if (!dir.isDirectory() && !dir.mkdir()) {
      println("WARNING: could not create directory: " + dir.getAbsolutePath());
    }
  }

  /**
   * Run the demo: build the file, sort and count it with Unix commands, then feed a similar
   * stream to a Theta sketch and to an HLL sketch, printing timings and speedup factors.
   */
  public void runDemo() {
    println("# COMPUTE DISTINCT COUNT EXACTLY:");
    long exactTimeMS;
    exactTimeMS = buildFile();
    //exactTimeMS = buildFileAndSketch(); //used instead only for testing

    println("## SORT & REMOVE DUPLICATES");
    String sortCmd = "sort -u -o " + SORTED_FILE + " " + DATA_FILE;
    exactTimeMS += runUnixCmd("sort", sortCmd);

    println("\n## LINE COUNT");
    String wcCmd = "wc -l " + SORTED_FILE;
    exactTimeMS += runUnixCmd("wc", wcCmd);
    println("Total Exact "+getMinSec(exactTimeMS) +LS+LS);

    println("# COMPUTE DISTINCT COUNT USING SKETCHES");
    configureThetaSketch();
    long sketchTimeMS = buildSketch();
    double factor = exactTimeMS*1.0/sketchTimeMS;
    println("Speedup Factor "+String.format("%.1f", factor) + LS);

    configureHLLSketch();
    sketchTimeMS = buildSketch();
    factor = exactTimeMS*1.0/sketchTimeMS;
    println("Speedup Factor "+String.format("%.1f", factor));
  }

  /**
   * Writes n_ generated values to the data file, one hex string per line.
   * An IOException is reported with a stack trace and the method still returns.
   * @return total test time in milliseconds
   */
  private long buildFile() {
    println("## BUILD FILE:");
    ByteBuffer byteBuf = ByteBuffer.allocate(byteBufCap_);
    u_ = 0;
    fileBytes_ = 0;
    long testStartTime_mS = System.currentTimeMillis();

    try (SeekableByteChannel sbc = Files.newByteChannel(path, C, W, TE)) {
      for (long i = 0; i < n_; i++) {
        appendLine(nextValue(), byteBuf, sbc);
      }
      if (byteBuf.position() > 0) { //write remainder
        flush(byteBuf, sbc);
      }
    }
    catch (IOException e) {
      e.printStackTrace();
    }
    long testTime_mS = System.currentTimeMillis() - testStartTime_mS;

    //Print common results
    printCommon(testTime_mS, n_, u_);
    //Print file results
    println("File Size Bytes: "+String.format("%,d", fileBytes_) + LS);
    return testTime_mS;
  }

  /**
   * Appends the hex form of v plus the line terminator to the buffer, first flushing the
   * buffer to the channel if fewer than MIN_LINE_ROOM bytes remain.
   */
  private void appendLine(long v, ByteBuffer byteBuf, SeekableByteChannel sbc)
      throws IOException {
    String s = Long.toHexString(v);
    if (byteBuf.remaining() < MIN_LINE_ROOM) {
      flush(byteBuf, sbc);
    }
    byteBuf.put(s.getBytes()).put(LS_BYTE);
  }

  /**
   * Writes the buffered bytes to the channel, adds the count to fileBytes_ and resets the buffer.
   */
  private void flush(ByteBuffer byteBuf, SeekableByteChannel sbc) throws IOException {
    byteBuf.flip();
    //A single write() is not guaranteed to consume the whole buffer, so loop until drained.
    while (byteBuf.hasRemaining()) {
      fileBytes_ += sbc.write(byteBuf);
    }
    byteBuf.clear();
  }

  /**
   * Runs the given command with LC_ALL=C, captures its standard output and standard error,
   * waits for it to finish, and prints the command, the elapsed time and any captured text.
   * The streams are read one after the other, which is adequate for the small outputs of the
   * commands used here. An IOException terminates the JVM with exit status -1.
   * @param name short name of the command used in the printed headers
   * @param cmd the full command line, tokenized on whitespace by Runtime.exec
   * @return total test time in milliseconds
   */
  private static long runUnixCmd(String name, String cmd) {
    StringBuilder sbOut = new StringBuilder();
    StringBuilder sbErr = new StringBuilder();
    Process p = null;
    String[] envp = {"LC_ALL=C"}; //https://bugs.launchpad.net/ubuntu/+source/coreutils/+bug/846628
    long testStartTime_mS = System.currentTimeMillis();
    try {
      // run the Unix cmd using the Runtime exec method:
      p = Runtime.getRuntime().exec(cmd, envp);
      try (BufferedReader stdInput =
               new BufferedReader(new InputStreamReader(p.getInputStream()));
           BufferedReader stdError =
               new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
        // read the output from the command
        drain(stdInput, "Output from "+name+" command:", sbOut);
        // read any errors from the attempted command
        drain(stdError, "\nError from "+name+" command:", sbErr);
      }
      //Make sure the command has really finished before the clock is stopped.
      p.waitFor();
    }
    catch (IOException e) {
      System.out.println("Exception: ");
      e.printStackTrace();
      System.exit( -1);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); //preserve the interrupt for the caller
    }
    finally {
      if ((p != null) && (p.isAlive())) {
        p.destroy();
      }
    }
    long testTime_mS = System.currentTimeMillis() - testStartTime_mS;
    println("Unix cmd: "+cmd);
    println(getMinSec(testTime_mS));
    if (sbOut.length() > 0) { println(sbOut.toString()); }
    if (sbErr.length() > 0) { println(sbErr.toString()); }
    return testTime_mS;
  }

  /**
   * Copies every line from the reader into sb, writing the header once before the first line.
   * Nothing is appended when the reader yields no lines. Expects sb to be empty on entry.
   */
  private static void drain(BufferedReader reader, String header, StringBuilder sb)
      throws IOException {
    String line;
    while ((line = reader.readLine()) != null) {
      if (sb.length() == 0) {
        sb.append(header).append(LS);
      }
      sb.append(line).append(LS);
    }
  }

  /**
   * Feeds n_ generated values to whichever sketch is currently configured and prints results.
   * @return total test time in milliseconds
   * @throws IllegalStateException if neither sketch has been configured
   */
  private long buildSketch() {
    checkSketchConfigured();
    u_ = 0; //unique counter for accuracy computation
    long testStartTime_mS = System.currentTimeMillis();
    if (tSketch_ != null) { //Theta Sketch
      for (long i = 0; i < n_; i++) {
        tSketch_.update(nextValue());
      }
    }
    else { //HLL Sketch
      for (long i = 0; i < n_; i++) {
        hllSketch_.update(nextValue());
      }
    }
    long testTime_mS = System.currentTimeMillis() - testStartTime_mS;

    //Print sketch name
    String sk = (tSketch_ != null)? "THETA" : "HLL";
    println("## USING "+sk+" SKETCH");
    //Print common results
    printCommon(testTime_mS, n_, u_);
    //Print sketch results
    printSketchResults(u_, maxMemSkBytes_, rse2_);
    return testTime_mS;
  }

  /**
   * Used in testing. Feeds each of the n_ generated values both to the configured sketch and
   * to the data file, so the file and the sketch see exactly the same stream.
   * @return total test time in milliseconds
   * @throws IllegalStateException if neither sketch has been configured
   */
  @SuppressWarnings("unused")
  private long buildFileAndSketch() {
    checkSketchConfigured();
    println("## BUILD FILE AND SKETCH:");
    ByteBuffer byteBuf = ByteBuffer.allocate(byteBufCap_);
    u_ = 0;
    fileBytes_ = 0;
    long testStartTime_mS = System.currentTimeMillis();

    try (SeekableByteChannel sbc = Files.newByteChannel(path, C, W, TE)) {
      //The sketch-type test is kept outside the loops, as in buildSketch().
      if (tSketch_ != null) { //Theta Sketch
        for (long i = 0; i < n_; i++) {
          long v = nextValue();
          tSketch_.update(v);
          appendLine(v, byteBuf, sbc); //build file
        }
      }
      else { //HLL Sketch
        for (long i = 0; i < n_; i++) {
          long v = nextValue();
          hllSketch_.update(v);
          appendLine(v, byteBuf, sbc); //build file
        }
      }
      if (byteBuf.position() > 0) { //write remainder
        flush(byteBuf, sbc);
      }
    }
    catch (IOException e) {
      e.printStackTrace();
    }
    long testTime_mS = System.currentTimeMillis() - testStartTime_mS;

    //Print common results
    printCommon(testTime_mS, n_, u_);
    //Print file results
    println("File Size Bytes: "+String.format("%,d", fileBytes_));
    //Print sketch results
    printSketchResults(u_, maxMemSkBytes_, rse2_);
    return testTime_mS;
  }

  /**
   * Fails fast, with a clear message, when no sketch has been configured yet.
   */
  private void checkSketchConfigured() {
    if ((tSketch_ == null) && (hllSketch_ == null)) {
      throw new IllegalStateException(
          "No sketch configured: call configureThetaSketch() or configureHLLSketch() first.");
    }
  }

  /**
   * Advances the unique counter u_ with the configured probability (always for the very first
   * value) and returns the hash of the current counter. When the counter does not advance,
   * the previous value is repeated, producing a duplicate in the stream.
   * @return next hashed long value
   */
  private long nextValue() {
    //The explicit MAX_VALUE test makes a uniquesFraction of 1.0 yield only uniques: the random
    //draw below can itself equal Integer.MAX_VALUE, which would fail the strict '<' comparison.
    if ((u_ == 0)
        || (threshold_ == Integer.MAX_VALUE)
        || ((rand.nextInt() >>> 1) < threshold_)) {
      u_++;
    }
    vArr_[0] = u_;
    return hash(vArr_, 0L)[0];
  }

  //  private long nextValue() { //Faster version, always 100% uniques
  //    vArr_[0] = ++u_;
  //    return hash(vArr_, 0L)[0];
  //  }

  /**
   * Makes the Theta sketch the active sketch, clears the HLL sketch, and sets the memory and
   * error figures that are printed with the results.
   */
  private void configureThetaSketch() {
    int k = 1 << lgK_; //14
    hllSketch_ = null;
    maxMemSkBytes_ = k *16; //includes full hash table
    rse2_ = 2.0/sqrt(k);    //Error for 95% confidence
    tSketch_ = Sketches.updateSketchBuilder().
        setResizeFactor(ResizeFactor.X1).
        setFamily(Family.ALPHA).build(k );
  }

  /**
   * Makes the HLL sketch the active sketch, clears the Theta sketch, and sets the memory and
   * error figures that are printed with the results.
   */
  private void configureHLLSketch() {
    int k = 1 << lgK_; //14
    boolean compressed = true;
    boolean hipEstimator = true;
    boolean denseMode = true;
    tSketch_ = null;
    maxMemSkBytes_ = (compressed)? k/2 : k;
    rse2_ = 2.0 * ((hipEstimator)? 0.836/sqrt(k) : 1.04/sqrt(k)); //for 95% confidence
    hllSketch_ = HllSketch.builder().setLogBuckets(lgK_).
        setHipEstimator(hipEstimator).
        setDenseMode(denseMode).
        setCompressedDense(compressed).
        build();
  }

  /**
   * Prints the elapsed time, the stream length, the build rate and the exact unique count.
   * @param testTime elapsed time in milliseconds
   * @param n number of values in the stream
   * @param u number of unique values generated
   */
  private static void printCommon(long testTime, long n, long u) {
    println(getMinSec(testTime));
    println("Total Values: "+String.format("%,d",n));
    int nSecRate = (int) (testTime *1000000.0/n); //milliseconds to nanoseconds per value
    println("Build Rate: "+ String.format("%d nSec/Value", nSecRate));
    println("Exact Uniques: "+String.format("%,d", u));
  }

  /**
   * Prints the active sketch's name, its rounded estimate, the relative error of that estimate
   * against u, and the configured maximum sketch size.
   * @param u exact number of uniques fed to the sketch
   * @param maxMemSkBytes maximum sketch size in bytes to report
   * @param rse2 error bound to report alongside the relative error, as a fraction
   */
  private void printSketchResults(long u, int maxMemSkBytes, double rse2) {
    String sk = (tSketch_ != null)? "THETA" : "HLL";
    println("## USING "+sk+" SKETCH");
    double rounded = Math.round((tSketch_ != null)? tSketch_.getEstimate() : hllSketch_.getEstimate());
    println("Sketch Estimate of Uniques: "+ String.format("%,d", (long)rounded));
    double err = (u == 0)? 0 : (rounded/u - 1.0); //avoid dividing by zero uniques
    println("Sketch Relative Error: "+String.format("%.3f%%, +/- %.3f%%", err*100, rse2*100));
    println("Max Sketch Size Bytes: "+ String.format("%,d", maxMemSkBytes));
  }

  /**
   * Formats a duration as minutes, seconds and milliseconds.
   * @param mSec duration in milliseconds
   * @return text of the form "Time Min:Sec.mSec = m:ss.mmm"
   */
  private static String getMinSec(long mSec) {
    //long arithmetic throughout, so very long durations cannot overflow an int
    long totSec = mSec / 1000;
    long min = totSec / 60;
    long sec = totSec % 60;
    long ms = mSec % 1000;
    return String.format("Time Min:Sec.mSec = %d:%02d.%03d", min, sec, ms);
  }

  /**
   * Returns the last byte of the given string in the platform default charset.
   */
  private static byte lastByte(String s) {
    byte[] bytes = s.getBytes();
    return bytes[bytes.length - 1];
  }

  private static void println(String s) { System.out.println(s); }
}