/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.managed;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.fileaccess.FileAccess;
import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;

import com.datatorrent.netlet.util.Slice;
/**
* A bucket that groups events.
*
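* <p>A minimal usage sketch (hypothetical set-up: in practice a bucket is
* created and driven by the managed state implementation, and the
* {@code managedStateContext} and {@code timeBucketId} below are assumed to
* come from the enclosing operator):
* <pre>{@code
* Bucket bucket = new Bucket.DefaultBucket(1L);
* bucket.setup(managedStateContext);
*
* Slice key = new Slice("user-1".getBytes());
* bucket.put(key, timeBucketId, new Slice("v1".getBytes()));
*
* // read from memory only
* Slice inMemory = bucket.get(key, timeBucketId, Bucket.ReadSource.MEMORY);
* // time bucket unknown: search memory first, then all time-bucket files
* Slice anywhere = bucket.get(key, -1, Bucket.ReadSource.ALL);
* }</pre>
*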
* @since 3.4.0
*/
public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider
{
/**
* @return bucket id
*/
long getBucketId();
/**
* @return size of the bucket in memory, in bytes.
*/
long getSizeInBytes();
/**
* Gets the value of a key.
*
* @param key key.
* @param timeBucket time bucket of the key if known; -1 otherwise.
* @param source source to read from.
* @return value of the key; null if the key is not found.
*/
Slice get(Slice key, long timeBucket, ReadSource source);
/**
* Sets the value of a key.
*
* @param key key.
* @param timeBucket time bucket of the key.
* @param value value of the key.
*/
void put(Slice key, long timeBucket, Slice value);
/**
* Triggers the bucket to checkpoint and returns the data that has not been checkpointed so far.
*
* @param windowId window id of the checkpoint.
* @return the data that was not yet checkpointed.
*/
Map<Slice, BucketedValue> checkpoint(long windowId);
/**
* Triggers the bucket to commit data up to the provided window id.
*
* @param windowId window id
*/
void committed(long windowId);
/**
* Triggers bucket to free memory which is already persisted in bucket data files.
*
* @param windowId all the data corresponding to window ids less than or equal to this windowId
* will be freed.
* @return amount of memory freed in bytes.
* @throws IOException if closing a time-bucket file reader fails.
*/
long freeMemory(long windowId) throws IOException;
/**
* Allows the bucket to process/cache data which is recovered (from window files) after failure.
*
* @param windowId recovery window
* @param recoveredData recovered data
*/
void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
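/**
* The source(s) that {@link Bucket#get(Slice, long, ReadSource)} consults when looking up a key.
*/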
enum ReadSource
{
MEMORY, //state held in memory in key/value form
READERS, //time-bucket files on disk, searched through file readers
ALL //memory first, then the file readers
}
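/**
* A value together with the time bucket in which it was written.
*/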
class BucketedValue
{
private long timeBucket;
private Slice value;
protected BucketedValue()
{
}
protected BucketedValue(long timeBucket, Slice value)
{
this.timeBucket = timeBucket;
this.value = value;
}
protected long getTimeBucket()
{
return timeBucket;
}
protected void setTimeBucket(long timeBucket)
{
this.timeBucket = timeBucket;
}
public Slice getValue()
{
return value;
}
public void setValue(Slice value)
{
this.value = value;
}
public long getSize()
{
long size = 0;
if (value != null) {
size += value.length;
}
size += Longs.BYTES; //time-bucket
return size;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof BucketedValue)) {
return false;
}
BucketedValue that = (BucketedValue)o;
return timeBucket == that.timeBucket && value.equals(that.value);
}
@Override
public int hashCode()
{
return Objects.hash(timeBucket, value);
}
}
/**
* Default bucket.<br/>
* Not thread-safe.
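* <p>A sketch of how data moves through the bucket across the window lifecycle
* (the calls below are normally issued by the enclosing managed state
* implementation, not by user code):
* <pre>{@code
* bucket.put(key, timeBucket, value); // goes to the un-checkpointed "flash" map
* bucket.checkpoint(windowId);        // flash -> checkpointedData[windowId]
* bucket.committed(windowId);         // checkpointedData -> committedData
* bucket.freeMemory(windowId);        // evicts committed data persisted in files
* }</pre>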
*/
class DefaultBucket implements Bucket
{
private final long bucketId;
//data which is not yet checkpointed: key -> latest value
private transient Map<Slice, BucketedValue> flash = Maps.newConcurrentMap();
//Data persisted in write ahead logs. window -> bucket
private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> checkpointedData =
new ConcurrentSkipListMap<>();
//Data persisted in bucket data files
private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> committedData =
new ConcurrentSkipListMap<>();
//Data serialized/deserialized from bucket data files: key -> value from latest time bucket on file
private final transient Map<Slice, BucketedValue> fileCache = Maps.newConcurrentMap();
//TimeBucket -> FileReaders
private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap();
protected transient ManagedStateContext managedStateContext;
private AtomicLong sizeInBytes = new AtomicLong(0);
private final transient Slice dummyGetKey = new Slice(null, 0, 0);
private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
/**
* By default, keys and values are kept in two separate streams.
* The key stream and value stream should be created during construction instead of in setup(), as references
* to the streams are passed to the serialize method.
*/
protected WindowedBlockStream keyStream = new WindowedBlockStream();
protected WindowedBlockStream valueStream = new WindowedBlockStream();
protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
private static boolean disableBloomFilterByDefault = false;
private boolean disableBloomFilter = disableBloomFilterByDefault;
private static int bloomFilterDefaultBitSize = 1000000;
private SliceBloomFilter bloomFilter = null;
private int bloomFilterBitSize = bloomFilterDefaultBitSize;
private DefaultBucket()
{
//for kryo
bucketId = -1;
}
public DefaultBucket(long bucketId)
{
this.bucketId = bucketId;
}
@Override
public void setup(@NotNull ManagedStateContext managedStateContext)
{
this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
if (!disableBloomFilter && bloomFilter == null) {
bloomFilter = new SliceBloomFilter(bloomFilterBitSize, 0.99);
}
}
@Override
public long getBucketId()
{
return bucketId;
}
@Override
public long getSizeInBytes()
{
return sizeInBytes.longValue();
}
private Slice getFromMemory(Slice key)
{
//lookup order: flash -> checkpointed data (newest first) -> committed data (newest first) -> file cache
BucketedValue bucketedValue = flash.get(key);
if (bucketedValue != null) {
return bucketedValue.getValue();
}
for (Long window : checkpointedData.descendingKeySet()) {
//traverse the checkpointed data in reverse order
bucketedValue = checkpointedData.get(window).get(key);
if (bucketedValue != null) {
return bucketedValue.getValue();
}
}
for (Long window : committedData.descendingKeySet()) {
//traverse the committed data in reverse order
bucketedValue = committedData.get(window).get(key);
if (bucketedValue != null) {
return bucketedValue.getValue();
}
}
bucketedValue = fileCache.get(key);
if (bucketedValue != null) {
return bucketedValue.getValue();
}
return null;
}
private Slice getFromReaders(Slice key, long timeBucket)
{
try {
if (cachedBucketMetas == null) {
cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
}
if (timeBucket != -1) {
BucketedValue bucketedValue = getValueFromTimeBucketReader(key, timeBucket);
if (bucketedValue != null) {
if (!cachedBucketMetas.isEmpty() && timeBucket == cachedBucketMetas.firstKey()) {
//if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
//Since the size of the whole time-bucket is added to total size, there is no need to add the size of
//entries in file cache.
fileCache.put(key, bucketedValue);
}
return bucketedValue.value;
}
} else {
//search all the time buckets
for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas.values()) {
if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
//keys in the time bucket files are sorted so if the first key in the file is greater than the key being
//searched, the key will not be present in that file.
BucketedValue bucketedValue = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
if (bucketedValue != null) {
//time buckets are visited latest-first, so the first match comes from the latest
//time bucket on file that contains the key; cache that key/value.
fileCache.put(key, bucketedValue);
return bucketedValue.value;
}
}
}
}
return null;
} catch (IOException e) {
throw new RuntimeException("get time-buckets " + bucketId, e);
}
}
@Override
public Slice get(Slice key, long timeBucket, ReadSource readSource)
{
// This call is lightweight
releaseMemory();
key = SliceUtils.toBufferSlice(key);
switch (readSource) {
case MEMORY:
return getFromMemory(key);
case READERS:
return getFromReaders(key, timeBucket);
case ALL:
default:
Slice value = getFromMemory(key);
if (value != null) {
return value;
}
return getFromReaders(key, timeBucket);
}
}
private int filteredCount = 0;
private int unfilteredCount = 0;
private static final int STATISTICS_BARRIER_NUM = 10000;
/**
* Tracks how effective the bloom filter is and unloads it if it is not helping performance.
*
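* <p>For example, after 10,500 lookups of which 9,000 passed the filter and
* 1,500 were rejected, {@code 9000 > 4 * 1500} holds, so the filter is
* unloaded as ineffective.
*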
* @param mightContain true if the filter reports that the key might be present
*/
private void verifyBloomFilter(boolean mightContain)
{
if (!mightContain) {
filteredCount++;
} else {
unfilteredCount++;
}
if (unfilteredCount + filteredCount > STATISTICS_BARRIER_NUM && unfilteredCount > filteredCount * 4) {
unloadBloomFilter();
}
}
private void unloadBloomFilter()
{
bloomFilter = null;
}
/**
* Returns the value for the key from a valid time-bucket reader, where valid means a time bucket that is
* not purgeable. If the time bucket assigner is a MovingBoundaryTimeBucketAssigner and the time bucket is
* purgeable, null is returned.
*
* @param key key
* @param timeBucket time bucket
* @return value if key is found in the time bucket; null otherwise
*/
private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
{
if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner &&
timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) {
return null;
}
if (bloomFilter != null) {
boolean mightContain = bloomFilter.mightContain(key);
verifyBloomFilter(mightContain);
if (!mightContain) {
return null;
}
}
FileAccess.FileReader fileReader = readers.get(timeBucket);
if (fileReader != null) {
return readValue(fileReader, key, timeBucket);
}
//file reader is not loaded yet; try to load it
try {
if (loadFileReader(timeBucket)) {
return readValue(readers.get(timeBucket), key, timeBucket);
}
return null;
} catch (IOException e) {
throw new RuntimeException("while loading " + bucketId + ", " + timeBucket, e);
}
}
private BucketedValue readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
{
Slice valSlice = new Slice(null, 0, 0);
try {
if (fileReader.seek(key)) {
fileReader.next(dummyGetKey, valSlice);
return new BucketedValue(timeBucket, valSlice);
} else {
return null;
}
} catch (IOException e) {
throw new RuntimeException("reading " + bucketId + ", " + timeBucket, e);
}
}
private boolean loadFileReader(long timeBucketId) throws IOException
{
BucketsFileSystem.TimeBucketMeta tbm = managedStateContext.getBucketsFileSystem()
.getTimeBucketMeta(bucketId, timeBucketId);
if (tbm != null) {
FileAccess.FileReader reader = managedStateContext.getBucketsFileSystem().getReader(bucketId,
BucketsFileSystem.getFileName(timeBucketId));
readers.put(timeBucketId, reader);
sizeInBytes.getAndAdd(tbm.getSizeInBytes());
return true;
}
return false;
}
@Override
public void put(Slice key, long timeBucket, Slice value)
{
// This call is lightweight
releaseMemory();
key = SliceUtils.toBufferSlice(key);
value = SliceUtils.toBufferSlice(value);
BucketedValue bucketedValue = flash.get(key);
if (bucketedValue == null) {
bucketedValue = new BucketedValue(timeBucket, value);
flash.put(key, bucketedValue);
sizeInBytes.getAndAdd(key.length + value.length + Longs.BYTES);
} else {
if (timeBucket >= bucketedValue.getTimeBucket()) {
int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
sizeInBytes.getAndAdd(inc);
bucketedValue.setTimeBucket(timeBucket);
bucketedValue.setValue(value);
} else {
throw new AssertionError("newer entry exists for " + key);
}
}
}
/**
* Frees memory up to the given windowId.
* This method is called from a thread other than the operator thread. Adding concurrency control to the
* streams would hurt performance, so this method only computes the size of the memory that can be released
* and then queues a free-memory request for the operator thread.
*
* Memory is intended to be managed through keyStream and valueStream, but callers may also manage memory
* through other mechanisms, so the maps need to be cleaned up here as well.
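*
* <p>A sketch of the hand-off between the two threads (the thread roles are
* illustrative):
* <pre>{@code
* // purging thread, after data up to windowId has been persisted:
* long freed = bucket.freeMemory(windowId); // computes size, queues windowId
*
* // operator thread, at the start of the next get()/put()/checkpoint():
* // releaseMemory() drains the queue and frees the key/value streams
* }</pre>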
*/
@Override
public long freeMemory(long windowId) throws IOException
{
long memoryFreed = 0;
Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> entryIter = committedData.entrySet().iterator();
while (entryIter.hasNext()) {
Map.Entry<Long, Map<Slice, BucketedValue>> bucketEntry = entryIter.next();
if (bucketEntry.getKey() > windowId) {
break;
}
Map<Slice, BucketedValue> windowData = bucketEntry.getValue();
entryIter.remove();
for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) {
/*
* The data is still in memory and reachable until the memory is released,
* so add the key to the bloom filter here.
*/
if (bloomFilter != null) {
bloomFilter.put(entry.getKey());
}
memoryFreed += entry.getKey().length + entry.getValue().getSize();
}
}
fileCache.clear();
if (cachedBucketMetas != null) {
for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
if (reader != null) {
memoryFreed += tbm.getSizeInBytes();
reader.close();
}
}
}
sizeInBytes.getAndAdd(-memoryFreed);
//add the windowId to the queue to let operator thread release memory from keyStream and valueStream
windowsForFreeMemory.add(windowId);
return memoryFreed;
}
/**
* Releases the memory managed by keyStream and valueStream.
* This operation must be called from the operator thread. It does nothing if there is no memory to be freed.
*/
protected long releaseMemory()
{
long memoryFreed = 0;
while (!windowsForFreeMemory.isEmpty()) {
long windowId = windowsForFreeMemory.poll();
long originSize = keyStream.size() + valueStream.size();
keyStream.completeWindow(windowId);
valueStream.completeWindow(windowId);
memoryFreed += originSize - (keyStream.size() + valueStream.size());
}
if (memoryFreed > 0) {
//release the free memory immediately
keyStream.releaseAllFreeMemory();
valueStream.releaseAllFreeMemory();
}
return memoryFreed;
}
@Override
public Map<Slice, BucketedValue> checkpoint(long windowId)
{
releaseMemory();
try {
//the finally block transfers the data from flash to the check-pointed state and re-initializes flash.
return flash;
} finally {
checkpointedData.put(windowId, flash);
flash = Maps.newHashMap();
}
}
@Override
public void committed(long committedWindowId)
{
releaseMemory();
Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
while (stateIterator.hasNext()) {
Map.Entry<Long, Map<Slice, BucketedValue>> entry = stateIterator.next();
long savedWindow = entry.getKey();
if (savedWindow <= committedWindowId) {
Map<Slice, BucketedValue> bucketData = entry.getValue();
//removing any stale values from the file cache
for (Slice key : bucketData.keySet()) {
fileCache.remove(key);
}
long memoryFreed = 0;
for (BucketedValue bucketedValue : bucketData.values()) {
FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket());
if (reader != null) {
//closing the file reader for the time bucket if it is in memory because the time-bucket is modified
//so it will be re-written by BucketsDataManager
try {
BucketsFileSystem.TimeBucketMeta tbm = cachedBucketMetas.get(bucketedValue.getTimeBucket());
if (tbm != null) {
memoryFreed += tbm.getSizeInBytes();
}
LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
reader.close();
} catch (IOException e) {
throw new RuntimeException("closing reader " + bucketId + ", " + bucketedValue.getTimeBucket(), e);
}
readers.remove(bucketedValue.getTimeBucket());
}
if (readers.isEmpty()) {
break;
}
}
sizeInBytes.getAndAdd(-memoryFreed);
if (!bucketData.isEmpty()) {
committedData.put(savedWindow, bucketData);
}
stateIterator.remove();
} else {
break;
}
}
cachedBucketMetas = null;
}
@Override
public void recoveredData(long recoveredWindow, Map<Slice, BucketedValue> data)
{
checkpointedData.put(recoveredWindow, data);
}
@Override
public void teardown()
{
Set<Long> failureBuckets = Sets.newHashSet();
for (Map.Entry<Long, FileAccess.FileReader> entry : readers.entrySet()) {
try {
LOG.debug("closing reader {} {}", bucketId, entry.getKey());
entry.getValue().close();
} catch (IOException e) {
//will try to close all readers
failureBuckets.add(entry.getKey());
}
}
if (!failureBuckets.isEmpty()) {
StringBuilder builder = new StringBuilder("teardown of ");
builder.append(bucketId).append(" < ");
for (Long timeBucket : failureBuckets) {
builder.append(timeBucket).append(' ');
}
builder.append(">");
throw new RuntimeException(builder.toString());
}
}
@VisibleForTesting
Map<Long, FileAccess.FileReader> getReaders()
{
return readers;
}
@VisibleForTesting
ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCommittedData()
{
return committedData;
}
@VisibleForTesting
ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCheckpointedData()
{
return checkpointedData;
}
@Override
public WindowedBlockStream getKeyStream()
{
return keyStream;
}
@Override
public WindowedBlockStream getValueStream()
{
return valueStream;
}
public static int getBloomFilterDefaultBitSize()
{
return bloomFilterDefaultBitSize;
}
public static void setBloomFilterDefaultBitSize(int bloomFilterDefaultBitSize)
{
DefaultBucket.bloomFilterDefaultBitSize = bloomFilterDefaultBitSize;
}
public int getBloomFilterBitSize()
{
return bloomFilterBitSize;
}
public void setBloomFilterBitSize(int bloomFilterBitSize)
{
this.bloomFilterBitSize = bloomFilterBitSize;
}
public static boolean isDisableBloomFilterByDefault()
{
return disableBloomFilterByDefault;
}
public static void setDisableBloomFilterByDefault(boolean disableBloomFilterByDefault)
{
DefaultBucket.disableBloomFilterByDefault = disableBloomFilterByDefault;
}
public boolean isDisableBloomFilter()
{
return disableBloomFilter;
}
public void setDisableBloomFilter(boolean disableBloomFilter)
{
this.disableBloomFilter = disableBloomFilter;
this.unloadBloomFilter();
}
private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
}
}