---
layout: doc_page
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Concurrent Theta Sketch
Concurrent Theta sketch is a manifestation of a generic approach for parallelizing sketches while bounding the error such parallelism introduces<sup>1</sup>.
At its core, a generic concurrent sketch ingests data through multiple sketches that are _local_ to the inserting threads.
The data in these local sketches, which are bounded in size, is merged into a single _shared_ sketch by utilizing the sketch _mergeability_ property.
Queries are served from a _snapshot_ of the shared sketch.
This snapshot is taken frequently enough to keep query results fresh, yet infrequently enough that taking it does not become the bottleneck of the sketch.
<img class="doc-img-full" src="{{site.docs_img_dir}}/theta/GenericConcurrentSketch.png" alt="GenericConcurrentSketch" />
Unlike previous solutions, this design enables simultaneous queries and updates to a sketch from an arbitrary number of threads.
The responsibility for merging the thread-local sketches into the shared sketch is divided between two modes:
1. *Eager propagation*. When the sketch is small, any delay in merging the local data into the shared sketch, where the snapshot can capture it, can incur a large error in the query result.
Therefore, data is eagerly propagated to the shared sketch by the inserting thread upon each update.
2. *Lazy propagation*. When the sketch is big enough, the local sketches buffer the data that should be added to the shared sketch.
A _background propagation thread_ continuously merges full local sketches into the shared sketch.
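
The two propagation modes can be illustrated with a toy model. This is only a sketch under stated assumptions, not the library's implementation: an exact `HashSet` stands in for the Theta sketch, the switch-over point `eagerLimit` is a fixed number chosen by the caller, and every name in it (`ToyShared`, `ToyLocal`, `ToyConcurrentDemo`, `drainAndShutdown`) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the shared sketch: an exact set, not a Theta sketch.
final class ToyShared {
  private final Set<Long> values = new HashSet<>(); // guarded by "this"
  private volatile long snapshot = 0;               // the only state queries read
  private final long eagerLimit;                    // assumed switch-over point
  private final ExecutorService propagator = Executors.newSingleThreadExecutor();

  ToyShared(long eagerLimit) {
    this.eagerLimit = eagerLimit;
  }

  // Queries take no lock: they return the most recently published snapshot.
  long getEstimate() {
    return snapshot;
  }

  boolean isEager() {
    return snapshot < eagerLimit;
  }

  // Merges a batch into the shared state and publishes a fresh snapshot.
  synchronized void merge(List<Long> batch) {
    values.addAll(batch);
    snapshot = values.size();
  }

  // Lazy path: a full local buffer is merged by the background thread.
  void mergeInBackground(List<Long> batch) {
    propagator.execute(() -> merge(batch));
  }

  // Runs all queued merges, then stops the background thread.
  void drainAndShutdown() {
    propagator.shutdown();
    try {
      propagator.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

// Hypothetical stand-in for a local sketch; owned by exactly one writer thread.
final class ToyLocal {
  private final ToyShared shared;
  private final int capacity;
  private List<Long> buffer = new ArrayList<>();

  ToyLocal(ToyShared shared, int capacity) {
    this.shared = shared;
    this.capacity = capacity;
  }

  void update(long value) {
    if (shared.isEager()) {
      // Eager propagation: the writer itself merges every single update.
      shared.merge(Collections.singletonList(value));
      return;
    }
    // Lazy propagation: buffer locally and hand off only full buffers.
    buffer.add(value);
    if (buffer.size() >= capacity) {
      shared.mergeInBackground(buffer);
      buffer = new ArrayList<>(); // never touch the handed-off list again
    }
  }

  int pending() {
    return buffer.size();
  }
}

final class ToyConcurrentDemo {
  public static void main(String[] args) {
    ToyShared shared = new ToyShared(8);
    ToyLocal local = new ToyLocal(shared, 4);
    for (long v = 0; v < 18; v++) {
      local.update(v);
    }
    shared.drainAndShutdown();
    // 8 eager updates plus two full buffers of 4; 2 values remain unpropagated.
    System.out.println(shared.getEstimate() + " visible, " + local.pending() + " pending");
  }
}
```

In this toy run the first 8 updates are visible to queries immediately, while the last 2 stay in the local buffer. A query can therefore miss only a bounded amount of data per writer, which is the kind of bounded error the design above aims for.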
## Implementation and User API
Both the local sketch and the shared sketch are descendants of UpdateSketch and therefore support its API.
However, it is important that the shared sketch is only used to get the estimate, while updates only go through the local sketches.
The shared sketch can be allocated either off-heap or on-heap, while the local sketch is always allocated on-heap.
As with other Theta sketches, `UpdateSketchBuilder` is used to build the shared and local sketches.
It is imperative that the shared sketch is built first.
Then, in the context of each application thread that feeds the data, a local sketch is created and connected to the shared sketch.
This is a list of the configuration parameters for the builder:
1. Buffer size of shared sketch
2. Buffer size of local sketches
3. Size of the thread pool that handles propagation for all sketches
4. Flag indicating whether the propagated data is to be sorted prior to propagation
5. Maximum concurrency error; the point at which the sketch flips from exact to estimate mode is derived from this parameter
6. Maximum number of local threads to be used
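
The two buffer-size parameters are passed to the builder as base-2 logarithms, as the setter names `setLogNominalEntries` and `setLocalLogNominalEntries` in the example below indicate. The following minimal sketch shows the conversion from the log value to the number of entries. The class `SketchSizing` and its method are hypothetical helpers, not part of the library.

```java
// Hypothetical helper, not part of the DataSketches library.
final class SketchSizing {
  // Converts a log-base-2 size parameter into a number of entries.
  static int entriesFromLog2(int lgEntries) {
    if (lgEntries < 0 || lgEntries > 30) {
      throw new IllegalArgumentException("lgEntries must be in [0, 30]: " + lgEntries);
    }
    return 1 << lgEntries;
  }

  public static void main(String[] args) {
    System.out.println(entriesFromLog2(12)); // shared sketch default: K = 4096
    System.out.println(entriesFromLog2(4));  // local sketch default: B = 16
  }
}
```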
## Code Example for Building a Concurrent Theta Sketch
    import org.apache.datasketches.memory.WritableDirectHandle;
    import org.apache.datasketches.memory.WritableMemory;
    import org.apache.datasketches.theta.Sketch;
    import org.apache.datasketches.theta.UpdateSketch;
    import org.apache.datasketches.theta.UpdateSketchBuilder;

    class ApplicationWithSketches {
      private UpdateSketchBuilder bldr;
      private UpdateSketch sharedSketch;
      private Thread writer;
      private int sharedLgK;
      private int localLgK;
      private boolean ordered;
      private boolean offHeap;
      private int poolThreads;
      private double maxConcurrencyError;
      private int maxNumWriterThreads;
      private WritableDirectHandle wdh;
      private WritableMemory wmem;

      // Configures the builder for both the local and the shared sketches
      void buildConcSketch() {
        bldr = new UpdateSketchBuilder();
        // All configuration parameters are optional
        bldr.setLogNominalEntries(sharedLgK);             // default 12 (K=4096)
        bldr.setLocalLogNominalEntries(localLgK);         // default 4 (B=16)
        bldr.setNumPoolThreads(poolThreads);              // default 3
        bldr.setPropagateOrderedCompact(ordered);         // default true
        bldr.setMaxConcurrencyError(maxConcurrencyError); // default 0
        bldr.setMaxNumLocalThreads(maxNumWriterThreads);  // default 1

        // Build the shared sketch first
        final int maxSharedUpdateBytes = Sketch.getMaxUpdateSketchBytes(1 << sharedLgK);
        if (offHeap) {
          wdh = WritableMemory.allocateDirect(maxSharedUpdateBytes);
          wmem = wdh.get();
        } else {
          wmem = null; // alternatively: WritableMemory.allocate(maxSharedUpdateBytes)
        }
        sharedSketch = bldr.buildShared(wmem);
      }

      void mainApplicationMethod() {
        // init attributes, e.g., with a properties file
        // ...
        buildConcSketch();
        writer = new WriterThread(bldr, sharedSketch);
        writer.start(); // the writer must be started before it can feed the sketch

        // someApplicationCondition() is a placeholder for an application-specific condition
        while (someApplicationCondition()) {
          // get the estimate through the shared sketch
          doSomethingWithEstimate(sharedSketch.getEstimate());
          try {
            Thread.sleep(100);
          } catch (final InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }

    // Context of the writer thread
    class WriterThread extends Thread {
      private final UpdateSketch local;

      // Builds the local sketch from bldr and a reference to the shared sketch
      public WriterThread(final UpdateSketchBuilder bldr, final UpdateSketch shared) {
        local = bldr.buildLocal(shared);
        // init input stream, such as a queue, or a communication buffer, etc.
      }

      // Updates the concurrent sketch through the local sketch;
      // no need for locks or any other synchronization
      @Override
      public void run() {
        while (true) {
          // inputStreamIsNotEmpty() is a placeholder for an application-specific check
          if (inputStreamIsNotEmpty()) {
            long data = getDataFromInputStream();
            local.update(data);
          }
        }
      }
    }
## Serializing a Concurrent Sketch
A concurrent sketch is not a single unit of computation; it is composed of the shared sketch and the local buffers.
Only the shared sketch supports serialization, as it captures the most up-to-date content of the sketch.
In the current implementation, deserializing a shared sketch yields an `UpdateSketch`.
Therefore, when deserializing a concurrent sketch, both the shared sketch and the local buffers need to be re-created.
## Code Example for Serializing and Deserializing a Concurrent Theta Sketch
    import org.apache.datasketches.memory.Memory;
    import org.apache.datasketches.memory.WritableMemory;
    import org.apache.datasketches.theta.Sketch;
    import org.apache.datasketches.theta.Sketches;
    import org.apache.datasketches.theta.UpdateSketch;
    import org.apache.datasketches.theta.UpdateSketchBuilder;

    public class SerDeTest {
      private UpdateSketchBuilder bldr;
      private UpdateSketch sharedSketch;
      private WritableMemory wmem;

      void serDeConcurrentQuickSelectSketch() {
        int k = 512;

        // build shared sketch and local buffer as in the example above
        bldr = new UpdateSketchBuilder();
        // ...
        sharedSketch = bldr.buildShared(wmem);
        UpdateSketch local = bldr.buildLocal(sharedSketch);

        int i = 0;
        // update sketch through local buffer
        for (; i < 10000; i++) {
          local.update(i);
        }

        // serialize the shared sketch
        byte[] serArr = sharedSketch.toByteArray();

        // deserialize; this yields a plain UpdateSketch
        Memory srcMem = Memory.wrap(serArr);
        UpdateSketch recovered = Sketches.heapifyUpdateSketch(srcMem);

        // reconstruct to Native/Direct and re-create the local buffer
        final int bytes = Sketch.getMaxUpdateSketchBytes(k);
        wmem = WritableMemory.allocate(bytes);
        sharedSketch = bldr.buildSharedFromSketch(recovered, wmem);
        UpdateSketch local2 = bldr.buildLocal(sharedSketch);

        // check estimate ~10K
        System.out.println("Estimate=" + sharedSketch.getEstimate());

        // continue updating through the new local buffer
        for (; i < 20000; i++) {
          local2.update(i);
        }

        // check estimate ~20K
        System.out.println("Estimate2=" + sharedSketch.getEstimate());
      }
    }
[1] Arik Rinberg, Alexander Spiegelman, Edward Bortnikov, Eshcar Hillel, Idit Keidar, Hadar Serviansky, *Fast Concurrent Data Sketches*, https://arxiv.org/abs/1902.10995