blob: d08b9fc31a86629990412ea83031dad13bf57333 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.bufferserver.internal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.ResetWindowTuple;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.bufferserver.util.BitVector;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.bufferserver.util.SerializedData;
import com.datatorrent.bufferserver.util.VarInt;
import com.datatorrent.netlet.AbstractClient;
import com.datatorrent.netlet.util.VarInt.MutableInt;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
/**
* Maintains list of data and manages addition and deletion of the data<p>
* <br>
*
* @since 0.3.2
*/
public class DataList
{
  /** Maximum number of data blocks this DataList may keep in memory at any time. */
  private final int MAX_COUNT_OF_INMEM_BLOCKS;
  /** Identifier of the stream being buffered; also used as the key in secondary storage. */
  protected final String identifier;
  /** Size in bytes of each memory block allocated for buffering. */
  private final int blockSize;
  /** Listeners interested in specific partitions, keyed by partition bit vector. */
  private final HashMap<BitVector, HashSet<DataListener>> listeners = newHashMap();
  /** All registered listeners regardless of partition. */
  protected final HashSet<DataListener> all_listeners = newHashSet();
  /** Head of the doubly linked chain of data blocks. */
  protected Block first;
  /** Tail of the chain; the block currently being written to. */
  protected Block last;
  /** Optional secondary storage used to spool blocks out of memory; may be null. */
  protected Storage storage;
  /** Executor that runs listener-notification (auto-flush) tasks. */
  protected ExecutorService autoFlushExecutor;
  /** Executor that runs store/retrieve/discard tasks against secondary storage. */
  protected ExecutorService storageExecutor;
  /** Length of the tuple currently being parsed by flush(); 0 when between tuples. */
  protected int size;
  /** Offset within the tail block up to which tuples have been parsed by flush(). */
  protected int processingOffset;
  /** Upper 32 bits (base seconds) of the current window id, pre-shifted. */
  protected long baseSeconds;
  /** Clients whose reads were suspended because no in-memory block permits were available. */
  private final Set<AbstractClient> suspendedClients = newHashSet();
  /** Remaining number of blocks that may be kept in memory before back pressure kicks in. */
  private final AtomicInteger numberOfInMemBlockPermits;
  // Scratch holder for the offset produced by VarInt.read() during flush().
  private MutableInt nextOffset = new MutableInt();
  /** Coalesces and schedules listener notifications on the auto-flush executor. */
  private final ListenersNotifier listenersNotifier = new ListenersNotifier();
  /** When true, readers feeding this list are suspended once the block budget is exhausted. */
  private final boolean backPressureEnabled;
  /**
   * Creates a DataList.
   *
   * @param identifier identifier of the stream being buffered
   * @param blockSize size in bytes of each memory block
   * @param numberOfCacheBlocks maximum number of blocks kept in memory; must be at least 1
   * @param backPressureEnabled when true, clients are suspended once the block budget is exhausted
   * @throws IllegalArgumentException if numberOfCacheBlocks is less than 1
   */
  public DataList(final String identifier, final int blockSize, final int numberOfCacheBlocks, final boolean backPressureEnabled)
  {
    if (numberOfCacheBlocks < 1) {
      throw new IllegalArgumentException("Invalid number of Data List Memory blocks " + numberOfCacheBlocks);
    }
    this.MAX_COUNT_OF_INMEM_BLOCKS = numberOfCacheBlocks;
    // One permit is consumed immediately by the initial block allocated below.
    numberOfInMemBlockPermits = new AtomicInteger(MAX_COUNT_OF_INMEM_BLOCKS - 1);
    this.identifier = identifier;
    this.blockSize = blockSize;
    this.backPressureEnabled = backPressureEnabled;
    first = last = new Block(identifier, blockSize);
  }

  /**
   * Creates a DataList with default sizing: 64MB blocks, 8 cached blocks, back pressure enabled.
   *
   * @param identifier identifier of the stream being buffered
   */
  public DataList(String identifier)
  {
    /*
     * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block
     * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory
     */
    this(identifier, 64 * 1024 * 1024, 8, true);
  }

  /**
   * @return the size in bytes of each memory block
   */
  public int getBlockSize()
  {
    return blockSize;
  }
  /**
   * Rewinds this DataList so that data beyond the given window is discarded and writing
   * resumes at that window. Blocks after the new tail are discarded from storage and their
   * memory released; the freed permits are returned and suspended clients resumed.
   *
   * @param baseSeconds base seconds component of the target window id
   * @param windowId lower 32 bits of the target window id
   * @throws IOException if rewinding the retained tail block fails
   */
  public void rewind(final int baseSeconds, final int windowId) throws IOException
  {
    final long longWindowId = (long)baseSeconds << 32 | windowId;
    logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window),
        Codec.getStringWindowId(longWindowId));
    int numberOfInMemBlockRewound = 0;
    synchronized (this) {
      // Find the first block that contains or follows the target window.
      for (Block temp = first; temp != null; temp = temp.next) {
        if (temp.starting_window >= longWindowId || temp.ending_window > longWindowId) {
          if (temp != last) {
            // The current tail is being abandoned; drop the writer's reference on it.
            last.refCount.decrementAndGet();
            last = temp;
            // Discard every block after the new tail and release its in-memory data.
            do {
              temp = temp.next;
              temp.discard(false);
              synchronized (temp) {
                if (temp.refCount.get() != 0) {
                  logger.debug("Discarded block {} has positive reference count. Listeners: {}", temp, all_listeners);
                  throw new IllegalStateException("Discarded block " + temp + " has positive reference count!");
                }
                if (temp.data != null) {
                  temp.data = null;
                  numberOfInMemBlockRewound++;
                }
              }
            } while (temp.next != null);
            last.next = null;
            // Re-acquire the new tail synchronously so its data is guaranteed to be in memory.
            last.acquire(true);
          }
          this.baseSeconds = last.rewind(longWindowId);
          processingOffset = last.writingOffset;
          size = 0;
          break;
        }
      }
    }
    /*
     * TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last
     * block.
     */
    // Return the freed blocks' permits and wake any clients suspended on back pressure.
    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound);
    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
        numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
    resumeSuspendedClients(numberOfInMemBlockPermits);
    logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after" +
        " rewinding {}.", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this);
  }
  /**
   * Resets this DataList to a single empty tail block and clears all listeners.
   * When secondary storage is configured, every block before the tail is discarded
   * from storage and its in-memory data released.
   */
  public void reset()
  {
    logger.debug("Resetting {}", this);
    listeners.clear();
    all_listeners.clear();
    synchronized (this) {
      if (storage != null) {
        Block temp = first;
        while (temp != last) {
          temp.discard(false);
          synchronized (temp) {
            if (temp.refCount.get() != 0) {
              throw new IllegalStateException("Discarded block " + temp + " not zero reference count!");
            }
            temp.data = null;
            temp = temp.next;
          }
        }
      }
      // Keep only the tail block and detach it from the discarded chain.
      first = last;
      first.prev = null;
    }
    // Restore the full permit budget minus the one block still in use.
    numberOfInMemBlockPermits.set(MAX_COUNT_OF_INMEM_BLOCKS - 1);
  }
  /**
   * Purges all data up to and including the given window id: whole blocks that end before it
   * are discarded, and the first block spanning it is partially purged. Freed permits are
   * returned and suspended clients resumed.
   *
   * @param windowId absolute window id up to which data may be dropped
   */
  public void purge(final long windowId)
  {
    logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window),
        Codec.getStringWindowId(windowId));
    int numberOfInMemBlockPurged = 0;
    synchronized (this) {
      for (Block prev = null, temp = first; temp != null && temp.starting_window <= windowId;
          prev = temp, temp = temp.next) {
        if (temp.ending_window > windowId || temp == last) {
          // Block extends past the purge point (or is the tail): make it the head,
          // purge it partially, and stop.
          if (prev != null) {
            first = temp;
            first.prev = null;
          }
          first.purge(windowId);
          break;
        }
        // Entire block is older than the purge point: discard it and release its memory.
        temp.discard(false);
        synchronized (temp) {
          if (temp.refCount.get() != 0) {
            logger.debug("Discarded block {} has positive reference count. Listeners: {}", temp, all_listeners);
            throw new IllegalStateException("Discarded block " + temp + " has positive reference count!");
          }
          if (temp.data != null) {
            temp.data = null;
            numberOfInMemBlockPurged++;
          }
        }
      }
    }
    // Return the freed blocks' permits and wake any clients suspended on back pressure.
    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged);
    assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
        numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
    resumeSuspendedClients(numberOfInMemBlockPermits);
    logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ",
        numberOfInMemBlockPurged, numberOfInMemBlockPermits, this);
  }
  /**
   * @return the identifier of the stream buffered by this DataList
   */
  public String getIdentifier()
  {
    return identifier;
  }

  /**
   * Parses newly written bytes in the tail block up to {@code writeOffset}, tracking window
   * boundaries (begin-window and reset-window tuples) as they are encountered, then publishes
   * the new writing offset and notifies listeners.
   *
   * @param writeOffset offset within the tail block up to which data has been written
   */
  public void flush(final int writeOffset)
  {
    //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset,
    // nextOffset.integer, writeOffset);
    flush:
    do {
      // Read the next tuple's varint length prefix; size stays 0 until a complete prefix is read.
      while (size == 0) {
        size = VarInt.read(last.data, processingOffset, writeOffset, nextOffset);
        if (nextOffset.integer > -5 && nextOffset.integer < 1) {
          // Incomplete prefix; if the block is full, reset parsing state for the next block.
          if (writeOffset == last.data.length) {
            nextOffset.integer = 0;
            processingOffset = 0;
            size = 0;
          }
          break flush;
        } else if (nextOffset.integer == -5) {
          throw new RuntimeException("problemo!");
        }
      }
      processingOffset = nextOffset.integer;
      if (processingOffset + size <= writeOffset) {
        // A complete tuple is available; inspect window-control tuples to track window ids.
        switch (last.data[processingOffset]) {
          case MessageType.BEGIN_WINDOW_VALUE:
            Tuple bwt = Tuple.getTuple(last.data, processingOffset, size);
            if (last.starting_window == -1) {
              // First window seen in this block assigns both boundaries.
              last.starting_window = baseSeconds | bwt.getWindowId();
              last.ending_window = last.starting_window;
              //logger.debug("assigned both window id {}", last);
            } else {
              last.ending_window = baseSeconds | bwt.getWindowId();
              //logger.debug("assigned last window id {}", last);
            }
            break;
          case MessageType.RESET_WINDOW_VALUE:
            Tuple rwt = Tuple.getTuple(last.data, processingOffset, size);
            baseSeconds = (long)rwt.getBaseSeconds() << 32;
            break;
          default:
            break;
        }
        processingOffset += size;
        size = 0;
      } else {
        // Tuple only partially written; if the block is full, reset state for the next block.
        if (writeOffset == last.data.length) {
          nextOffset.integer = 0;
          processingOffset = 0;
          size = 0;
        }
        break;
      }
    } while (true);
    last.writingOffset = writeOffset;
    notifyListeners();
  }

  /**
   * Schedules listener notification on the auto-flush executor.
   */
  public void notifyListeners()
  {
    listenersNotifier.moreDataAvailable();
  }

  /**
   * Sets the executor used to run listener-notification tasks.
   */
  public void setAutoFlushExecutor(final ExecutorService es)
  {
    autoFlushExecutor = es;
  }

  /**
   * Configures secondary storage for spooling blocks out of memory and the executor that
   * runs storage tasks.
   */
  public void setSecondaryStorage(Storage storage, ExecutorService es)
  {
    this.storage = storage;
    storageExecutor = es;
  }

  /*
   * Iterator related functions.
   */
  /**
   * @return a new iterator positioned at the given block; acquires the block synchronously
   */
  protected DataListIterator getIterator(final Block block)
  {
    return new DataListIterator(block);
  }

  /**
   * @return the block following the given one, read under this DataList's monitor
   */
  private synchronized Block getNextBlock(final Block block)
  {
    return block.next;
  }
public DataListIterator newIterator(long windowId)
{
//logger.debug("request for a new iterator {} and {}", identifier, windowId);
Block temp = first;
while (temp != last) {
if (temp.starting_window >= windowId || temp.ending_window > windowId) {
break;
}
temp = temp.next;
}
//logger.debug("returning new iterator on temp = {}", temp);
return getIterator(temp);
}
public void addDataListener(DataListener dl)
{
all_listeners.add(dl);
//logger.debug("total {} listeners {} -> {}", all_listeners.size(), dl, this);
ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
HashSet<DataListener> set;
if (listeners.containsKey(partition)) {
set = listeners.get(partition);
} else {
set = new HashSet<>();
listeners.put(partition, set);
}
set.add(dl);
}
} else {
HashSet<DataListener> set;
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
set = listeners.get(DataListener.NULL_PARTITION);
} else {
set = new HashSet<>();
listeners.put(DataListener.NULL_PARTITION, set);
}
set.add(dl);
}
}
public void removeDataListener(DataListener dl)
{
ArrayList<BitVector> partitions = new ArrayList<>();
if (dl.getPartitions(partitions) > 0) {
for (BitVector partition : partitions) {
if (listeners.containsKey(partition)) {
listeners.get(partition).remove(dl);
}
}
} else {
if (listeners.containsKey(DataListener.NULL_PARTITION)) {
listeners.get(DataListener.NULL_PARTITION).remove(dl);
}
}
all_listeners.remove(dl);
}
  /**
   * Suspends reading from the given client (back pressure when memory permits run out).
   *
   * @param client client whose reads should be suspended
   * @return true if the client was newly added to the suspended set and its read was suspended
   */
  public boolean suspendRead(final AbstractClient client)
  {
    synchronized (suspendedClients) {
      return suspendedClients.add(client) && client.suspendReadIfResumed();
    }
  }
public boolean resumeSuspendedClients(final int numberOfInMemBlockPermits)
{
boolean resumedSuspendedClients = false;
if (numberOfInMemBlockPermits > 0) {
synchronized (suspendedClients) {
for (AbstractClient client : suspendedClients) {
resumedSuspendedClients |= client.resumeReadIfSuspended();
}
suspendedClients.clear();
}
} else {
logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients,
numberOfInMemBlockPermits, all_listeners);
}
return resumedSuspendedClients;
}
public boolean isMemoryBlockAvailable()
{
return (storage == null) || (numberOfInMemBlockPermits.get() > 0);
}
public byte[] newBuffer(final int size)
{
if (size > blockSize) {
logger.error("Tuple size {} exceeds buffer server current block size {}. Please decrease tuple size. " +
"Proceeding with allocating larger block that may cause out of memory exception.", size, blockSize);
return new byte[size];
}
return new byte[blockSize];
}
  /**
   * Appends a new block backed by the given array to the end of the chain and releases the
   * previous tail (the writer's reference moves to the new block). Consumes one in-memory
   * block permit; exceeding the budget is logged but not prevented here.
   *
   * @param array backing array for the new tail block
   */
  public synchronized void addBuffer(byte[] array)
  {
    final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.decrementAndGet();
    if (numberOfInMemBlockPermits < 0) {
      logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
    }
    // The new block inherits the previous tail's ending window as its starting point.
    last.next = new Block(identifier, array, last.ending_window, last.ending_window);
    last.next.prev = last;
    last.release(false, true);
    last = last.next;
  }

  /**
   * Returns the tail block's backing array for writing, initializing its window range from
   * the given window id when not yet assigned.
   *
   * @param windowId current window id of the writer
   * @return the tail block's data array
   */
  public byte[] getBuffer(long windowId)
  {
    //logger.debug("getBuffer windowid = {} when starting_window = {}", windowId, last.starting_window);
    if (last.starting_window == -1) {
      last.starting_window = windowId;
      last.ending_window = windowId;
      // Keep only the base-seconds half of the window id.
      baseSeconds = windowId & 0xffffffff00000000L;
    }
    return last.data;
  }

  /**
   * @return the current writing offset within the tail block
   */
  public int getPosition()
  {
    return last.writingOffset;
  }
  /**
   * Snapshot of this DataList's buffer usage, as produced by {@link #getStatus()}.
   */
  public static class Status
  {
    // Bytes written but not yet consumed by the slowest consumer.
    public long numBytesWaiting = 0;
    // Total bytes allocated across all blocks in the chain.
    public long numBytesAllocated = 0;
    // Identifier of the consumer that is furthest behind.
    public String slowestConsumer;
  }
  /**
   * Computes buffer usage and identifies the slowest consumer by locating the earliest
   * block/offset that any listener's iterator is positioned at.
   *
   * @return a freshly populated {@link Status}
   * @throws RuntimeException if a listener's iterator points at a block not in this list
   */
  public Status getStatus()
  {
    Status status = new Status();
    // When the number of subscribers becomes high or the number of blocks becomes high, consider optimize it.
    Block b = first;
    Map<Block, Integer> indices = new HashMap<>();
    int i = 0;
    while (b != null) {
      indices.put(b, i++);
      b = b.next;
    }
    int oldestBlockIndex = Integer.MAX_VALUE;
    int oldestReadOffset = Integer.MAX_VALUE;
    for (DataListener dl : all_listeners) {
      // NOTE(review): assumes every registered DataListener is a LogicalNode — confirm against callers.
      LogicalNode logicalNode = (LogicalNode)dl;
      DataListIterator dli = logicalNode.getIterator();
      Integer index = indices.get(dli.da);
      if (index == null) {
        // error
        throw new RuntimeException("problemo!");
      }
      if (index < oldestBlockIndex) {
        oldestBlockIndex = index;
        oldestReadOffset = dli.getReadOffset();
        status.slowestConsumer = logicalNode.getIdentifier();
      } else if (index == oldestBlockIndex && dli.getReadOffset() < oldestReadOffset) {
        oldestReadOffset = dli.getReadOffset();
        status.slowestConsumer = logicalNode.getIdentifier();
      }
    }
    b = first;
    i = 0;
    while (b != null) {
      // NOTE(review): b.data may be null for a block spooled to secondary storage — confirm
      // this traversal cannot hit such a block and NPE here.
      status.numBytesAllocated += b.data.length;
      if (oldestBlockIndex == i) {
        status.numBytesWaiting += b.writingOffset - oldestReadOffset;
      } else if (oldestBlockIndex < i) {
        status.numBytesWaiting += b.writingOffset - b.readingOffset;
      }
      b = b.next;
      ++i;
    }
    return status;
  }

  /**
   * @return a short debug string with the class name, identity hash and identifier
   */
  @Override
  public String toString()
  {
    return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {" + identifier + '}';
  }
/**
* <p>Block class.</p>
*
* @since 0.3.2
*/
  public class Block
  {
    /** Identifier of the owning stream; used as the key in secondary storage. */
    final String identifier;
    /**
     * actual data - stored as length followed by actual data.
     */
    byte[] data;
    /**
     * readingOffset is the offset of the first valid byte in the array.
     */
    int readingOffset;
    /**
     * writingOffset is the offset of the first available byte to write into.
     */
    int writingOffset;
    /**
     * The starting window which is available in this data array.
     */
    long starting_window;
    /**
     * the ending window which is available in this data array
     */
    long ending_window;
    /**
     * when the data is null, uniqueIdentifier is the identifier in the backup storage to retrieve the object.
     */
    int uniqueIdentifier;
    /**
     * the next in the chain.
     */
    Block next;
    /**
     * the previous in the chain
     */
    Block prev;
    /**
     * count of references to this block.
     */
    private final AtomicInteger refCount;
    /** Pending storage task (store/retrieve/discard) for this block; null when none is scheduled. */
    private Future<?> future;

    /**
     * Creates a block with a newly allocated array of the given size.
     */
    public Block(String id, int size)
    {
      this(id, new byte[size]);
    }

    /**
     * Creates a block backed by the given array with an unassigned starting window.
     */
    public Block(String id, byte[] array)
    {
      this(id, array, -1, 0);
    }

    /**
     * Creates a block backed by the given array covering the given window range.
     */
    public Block(final String id, final byte[] array, final long starting_window, final long ending_window)
    {
      identifier = id;
      data = array;
      // A new block starts with one reference held by its creator (the writer).
      refCount = new AtomicInteger(1);
      this.starting_window = starting_window;
      this.ending_window = ending_window;
      //logger.debug("Allocated new {}", this);
    }
void getNextData(SerializedData current)
{
if (current.offset < writingOffset) {
VarInt.read(current);
if (current.offset + current.length > writingOffset) {
current.length = 0;
}
} else {
current.length = 0;
}
}
    /**
     * Rewinds this block to the given window: scans tuples from the start and truncates the
     * writing offset at the first tuple belonging to a window at or past {@code windowId}.
     *
     * @param windowId absolute window id to rewind to
     * @return the base seconds component (pre-shifted) in effect at the truncation point
     */
    public long rewind(long windowId)
    {
      long bs = starting_window & 0x7fffffff00000000L;
      try (DataListIterator dli = getIterator(this)) {
        done:
        while (dli.hasNext()) {
          final SerializedData sd = dli.next();
          final int length = sd.length - sd.dataOffset + sd.offset;
          switch (sd.buffer[sd.dataOffset]) {
            case MessageType.RESET_WINDOW_VALUE:
              final ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
              bs = (long)rwt.getBaseSeconds() << 32;
              if (bs > windowId) {
                // Truncate before a reset that moves past the target window.
                writingOffset = sd.offset;
                break done;
              }
              break;
            case MessageType.BEGIN_WINDOW_VALUE:
              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
              if ((bs | bwt.getWindowId()) >= windowId) {
                // Truncate at the first window at or beyond the target.
                writingOffset = sd.offset;
                break done;
              }
              break;
            default:
              break;
          }
        }
      }
      if (starting_window == -1) {
        starting_window = windowId;
        ending_window = windowId;
        //logger.debug("assigned both window id {}", this);
      } else if (windowId < ending_window) {
        ending_window = windowId;
        //logger.debug("assigned end window id {}", this);
      }
      // Any spooled copy of this block is now stale; discard it asynchronously.
      discard(false);
      return bs;
    }
    /**
     * Purges from this block all data belonging to windows up to and including
     * {@code longWindowId}. If a later window remains, the reading offset advances to it
     * (restoring the most recent reset tuple in front of it when present); otherwise the
     * block is logically emptied and its window range reassigned.
     *
     * @param longWindowId absolute window id up to which data may be dropped
     */
    public void purge(long longWindowId)
    {
      //logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}",
      // VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId),
      // VarInt.getStringWindowId(ending_window));
      boolean found = false;
      long bs = starting_window & 0xffffffff00000000L;
      SerializedData lastReset = null;
      try (DataListIterator dli = getIterator(this)) {
        done:
        while (dli.hasNext()) {
          SerializedData sd = dli.next();
          final int length = sd.length - sd.dataOffset + sd.offset;
          switch (sd.buffer[sd.dataOffset]) {
            case MessageType.RESET_WINDOW_VALUE:
              ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
              bs = (long)rwt.getBaseSeconds() << 32;
              // Remember the latest reset tuple so it can be replayed after the purge point.
              lastReset = sd;
              break;
            case MessageType.BEGIN_WINDOW_VALUE:
              BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length);
              if ((bs | bwt.getWindowId()) > longWindowId) {
                found = true;
                if (lastReset != null) {
                  /*
                   * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of
                   * the reset tuple.
                   */
                  if (sd.offset >= lastReset.length) {
                    sd.offset -= lastReset.length;
                    if (!(sd.buffer == lastReset.buffer && sd.offset == lastReset.offset)) {
                      System.arraycopy(lastReset.buffer, lastReset.offset, sd.buffer, sd.offset, lastReset.length);
                    }
                  }
                  this.starting_window = bs | bwt.getWindowId();
                  this.readingOffset = sd.offset;
                  //logger.debug("assigned starting window id {}", this);
                }
                break done;
              }
              break;
            default:
              break;
          }
        }
      }
      /**
       * If we ended up purging all the data from the current Block then,
       * it also makes sense to start all over.
       * It helps with better utilization of the RAM.
       */
      if (!found) {
        //logger.debug("we could not find a tuple which is in a window later than the window to be purged, " +
        // "so this has to be the last window published so far");
        if (lastReset != null && lastReset.offset != 0) {
          // Keep only the last reset tuple at the end of the written region.
          this.readingOffset = this.writingOffset - lastReset.length;
          System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length);
          this.starting_window = this.ending_window = bs;
          //logger.debug("=20140220= reassign the windowids {}", this);
        } else {
          // Nothing to retain; collapse the readable region to empty.
          this.readingOffset = this.writingOffset;
          this.starting_window = this.ending_window = longWindowId;
          //logger.debug("=20140220= avoid the windowids {}", this);
        }
        // Overwrite the purged prefix with a single NO_MESSAGE filler record.
        SerializedData sd = new SerializedData(this.data, readingOffset, 0);
        // the rest of it is just a copy from beginWindow case here to wipe the data - refactor
        int i = 1;
        while (i < VarInt.getSize(sd.offset - i)) {
          i++;
        }
        if (i <= sd.offset) {
          sd.length = sd.offset;
          sd.offset = 0;
          sd.dataOffset = VarInt.write(sd.length - i, sd.buffer, sd.offset, i);
          sd.buffer[sd.dataOffset] = MessageType.NO_MESSAGE_VALUE;
        } else {
          logger.warn("Unhandled condition while purging the data purge to offset {}", sd.offset);
        }
        // The spooled copy (if any) no longer matches; discard it asynchronously.
        discard(false);
      }
    }
    /**
     * @return a task that loads this block's data back into memory from secondary storage
     *     and wakes any thread waiting in {@code acquire(true)}
     */
    private Runnable getRetriever()
    {
      return new Runnable()
      {
        @Override
        public void run()
        {
          byte[] data = storage.retrieve(identifier, uniqueIdentifier);
          synchronized (Block.this) {
            if (Block.this.data == null) {
              Block.this.data = data;
              readingOffset = 0;
              writingOffset = data.length;
              // Wake any thread blocked in acquire(true) waiting for the data.
              Block.this.notifyAll();
              // Loading the block back consumes an in-memory block permit.
              int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.decrementAndGet();
              if (numberOfInMemBlockPermits < 0) {
                logger.warn("Exceeded allowed memory block allocation by {}", -numberOfInMemBlockPermits);
              }
            } else {
              logger.debug("Block {} was already loaded into memory", Block.this);
            }
          }
        }
      };
    }
    /**
     * Increments this block's reference count and, when this is the first reference and the
     * data has been spooled to secondary storage, retrieves the data into memory — either
     * synchronously (when {@code wait} is true) or via the storage executor.
     *
     * @param wait when true, block the caller until the data is available in memory
     */
    protected void acquire(boolean wait)
    {
      int refCount = this.refCount.getAndIncrement();
      synchronized (Block.this) {
        if (data != null) {
          // Data already in memory; nothing to retrieve.
          return;
        }
      }
      if (refCount == 0 && storage != null) {
        final Runnable retriever = getRetriever();
        if (future != null && future.cancel(false)) {
          logger.debug("Block {} future is cancelled", this);
        }
        if (wait) {
          future = null;
          retriever.run();
        } else {
          future = storageExecutor.submit(retriever);
        }
      } else if (wait) {
        // Another caller already triggered retrieval; wait until the retriever publishes the data.
        try {
          synchronized (Block.this) {
            if (future == null) {
              throw new IllegalStateException("No task is scheduled to retrieve block " + Block.this);
            }
            while (data == null) {
              wait();
            }
          }
        } catch (InterruptedException ex) {
          // NOTE(review): interrupt status is not restored before rethrowing — confirm intended.
          throw new RuntimeException("Interrupted while waiting for data to be loaded!", ex);
        }
      }
    }
    /**
     * @return a task that spools the given data range to secondary storage and, if this
     *     block is still unreferenced afterwards, frees its in-memory copy, returns the
     *     permit, and resumes suspended clients
     */
    private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset,
        final Storage storage)
    {
      return new Runnable()
      {
        @Override
        public void run()
        {
          if (uniqueIdentifier == 0) {
            uniqueIdentifier = storage.store(identifier, data, readingOffset, writingOffset);
          }
          if (uniqueIdentifier == 0) {
            logger.warn("Storage returned unexpectedly, please check the status of the spool directory!");
          } else {
            int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get();
            synchronized (Block.this) {
              // Only drop the in-memory copy if no reader/writer picked up a reference meanwhile.
              if (refCount.get() == 0 && Block.this.data != null) {
                Block.this.data = null;
                numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.incrementAndGet();
              } else {
                logger.debug("Keeping Block {} unchanged", Block.this);
              }
            }
            assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " +
                numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.';
            resumeSuspendedClients(numberOfInMemBlockPermits);
          }
        }
      };
    }
    /**
     * Decrements this block's reference count and, when the block becomes evictable,
     * spools its data to secondary storage — synchronously under full memory pressure,
     * in the background under partial pressure, or not at all when memory is plentiful.
     *
     * @param wait when true and no permits remain, store synchronously in the caller's thread
     * @param writer true when the releasing party is the writer rather than a reader
     */
    protected void release(boolean wait, boolean writer)
    {
      final int refCount = this.refCount.decrementAndGet();
      if (canEvict(refCount, writer)) {
        assert (next != null);
        final Runnable storer = getStorer(data, readingOffset, writingOffset, storage);
        if (future != null && future.cancel(false)) {
          logger.debug("Block {} future is cancelled", this);
        }
        final int numberOfInMemBlockPermits = DataList.this.numberOfInMemBlockPermits.get();
        if (wait && numberOfInMemBlockPermits == 0) {
          // No permits left: free memory before returning to the caller.
          future = null;
          storer.run();
        } else if (numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS / 2) {
          // Fewer than half the permits remain: spool in the background.
          future = storageExecutor.submit(storer);
        } else {
          // Plenty of memory available: keep the data resident for now.
          future = null;
        }
      }
    }
    /**
     * Determines whether this block's in-memory data may be released to secondary storage.
     * With back pressure enabled, a reader release is evictable only when no earlier block
     * (within the permit budget) is still referenced; writer releases never evict.
     *
     * @param refCount current reference count after the caller's decrement
     * @param writer true when invoked on behalf of the writer
     * @return true when eviction is permitted
     */
    private boolean canEvict(final int refCount, boolean writer)
    {
      if (refCount == 0 && storage != null) {
        if (backPressureEnabled) {
          if (!writer) {
            boolean evict = true;
            int blocks = 0;
            // Search backwards from current block as opposed to searching forward from first block till current block as
            // it is more likely to find the match quicker. No need to search more than maximum number of in memory
            // permits.
            for (Block temp = this.prev; (blocks < (MAX_COUNT_OF_INMEM_BLOCKS - 1)) && (temp != null); temp = temp.prev, ++blocks) {
              if (temp.refCount.get() != 0) {
                // An earlier block is still referenced; keep this one in memory too.
                evict = false;
                break;
              }
            }
            logger.debug("Block {} evict {}", this, evict);
            return evict;
          } else {
            // Writer releases never trigger eviction when back pressure is enabled.
            return false;
          }
        } else {
          return true;
        }
      } else {
        logger.debug("Holding {} in memory due to {} references.", this, refCount);
        return false;
      }
    }
    /**
     * @return a task that removes this block's spooled copy from secondary storage, if any
     */
    private Runnable getDiscarder()
    {
      return new Runnable()
      {
        @Override
        public void run()
        {
          if (uniqueIdentifier > 0) {
            logger.debug("Discarding {}", Block.this);
            storage.discard(identifier, uniqueIdentifier);
            uniqueIdentifier = 0;
          }
        }
      };
    }

    /**
     * Discards this block's copy in secondary storage, if storage is configured.
     * Cancels any pending storage task for this block first.
     *
     * @param wait when true, discard synchronously in the caller's thread
     */
    protected void discard(final boolean wait)
    {
      if (storage != null) {
        final Runnable discarder = getDiscarder();
        if (future != null && future.cancel(false)) {
          logger.debug("Block {} future is cancelled", this);
        }
        if (wait) {
          future = null;
          discarder.run();
        } else {
          future = storageExecutor.submit(discarder);
        }
      }
    }
    /**
     * @return a debug string including window range, offsets, reference count and future state
     */
    @Override
    public String toString()
    {
      final String future = this.future == null ? "null" : this.future.isDone() ? "Done" :
          this.future.isCancelled() ? "Cancelled" : this.future.toString();
      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier +
          ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset +
          ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) +
          ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() +
          ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) +
          ", future=" + future + '}';
    }
}
/**
* <p>DataListIterator class.</p>
*
* @since 0.3.2
*/
  public class DataListIterator implements Iterator<SerializedData>, AutoCloseable
  {
    // Block currently being iterated; the iterator holds a reference acquired in the constructor.
    Block da;
    // Most recently parsed record, returned by the following next() call.
    SerializedData current;
    // Backing array of the current block.
    protected byte[] buffer;
    // Offset of the next unread byte within buffer.
    protected int readOffset;
    // Scratch holder for the offset produced by VarInt.read().
    MutableInt nextOffset = new MutableInt();
    // Length of the record currently being parsed; 0 when between records.
    int size;

    /**
     * Creates an iterator over the given block, acquiring it synchronously so its data is
     * guaranteed to be in memory.
     *
     * @param da block to start iterating from
     */
    DataListIterator(Block da)
    {
      da.acquire(true);
      this.da = da;
      buffer = da.data;
      readOffset = da.readingOffset;
    }
    // this is a hack! Get rid of it.
    /**
     * @return the base seconds (upper 32 bits) of the current block's starting window, or 0 when closed
     */
    public int getBaseSeconds()
    {
      return da == null ? 0 : (int)(da.starting_window >> 32);
    }

    /**
     * @return the current read offset within the current block's buffer
     */
    public int getReadOffset()
    {
      return readOffset;
    }
    /**
     * Moves this iterator to the next block in the chain, acquiring the next block before
     * releasing the current one so neither can be evicted out from under the iterator.
     *
     * @return true if there was a next block to switch to
     */
    protected boolean switchToNextBlock()
    {
      Block next = getNextBlock(da);
      if (next == null) {
        return false;
      }
      //logger.debug("{}: switching to the next block {}->{}", this, da, da.next);
      next.acquire(true);
      da.release(false, false);
      da = next;
      size = 0;
      buffer = da.data;
      readOffset = da.readingOffset;
      return true;
    }
    /**
     * Advances the parser until a complete record is available, transparently switching to
     * the next block when the current one has been fully read.
     *
     * @return true if a record is available via {@link #next()}
     */
    @Override
    public boolean hasNext()
    {
      while (size == 0) {
        size = VarInt.read(buffer, readOffset, da.writingOffset, nextOffset);
        if (nextOffset.integer > -5 && nextOffset.integer < 1) {
          // Incomplete length prefix; try the next block if this one is fully written.
          if (da.writingOffset == buffer.length && switchToNextBlock()) {
            continue;
          }
          return false;
        } else if (size == -5) {
          // NOTE(review): flush() uses nextOffset.integer == -5 as the error marker; comparing
          // size here looks inconsistent — confirm which is intended.
          throw new RuntimeException("problemo!");
        }
      }
      if (nextOffset.integer + size <= da.writingOffset) {
        current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset);
        current.dataOffset = nextOffset.integer;
        //final byte messageType = buffer[current.dataOffset];
        //if (messageType == MessageType.BEGIN_WINDOW_VALUE || messageType == MessageType.END_WINDOW_VALUE) {
        // final int length = current.length - current.dataOffset + current.offset;
        // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, length);
        // logger.debug("next t = {}", t);
        //}
        return true;
      } else if (da.writingOffset == buffer.length && switchToNextBlock()) {
        nextOffset.integer = da.readingOffset;
        return hasNext();
      }
      return false;
    }
    /**
     * Returns the record found by the preceding {@link #hasNext()} call and advances the
     * read offset past it. Must only be called after hasNext() has returned true.
     *
     * @return {@link com.datatorrent.bufferserver.util.SerializedData}
     */
    @Override
    public SerializedData next()
    {
      readOffset = current.offset + current.length;
      size = 0;
      return current;
    }
    /**
     * Removes from the underlying collection the last element returned by the iterator (optional operation). This
     * method can be called only once per call to next. The behavior of an iterator is unspecified if the underlying
     * collection is modified while the iteration is in progress in any way other than by calling this method.
     */
    @Override
    public void remove()
    {
      // Logical removal: overwrite the record's message type so consumers skip it.
      current.buffer[current.dataOffset] = MessageType.NO_MESSAGE_VALUE;
    }
    /**
     * Releases the iterator's reference on its current block and drops buffer references.
     * Safe to call more than once.
     */
    @Override
    public void close()
    {
      if (da != null) {
        da.release(false, false);
        da = null;
        buffer = null;
      }
    }

    /**
     * Repositions this iterator at the given offset within the current block.
     */
    void rewind(int processingOffset)
    {
      readOffset = processingOffset;
      size = 0;
    }
    /**
     * @return a debug string with the class name, identity hash and current block
     */
    @Override
    public String toString()
    {
      return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{da=" + da + '}';
    }
}
  /**
   * Runnable that delivers "data added" notifications to listeners on the auto-flush
   * executor, coalescing bursts of notifications into a single pending task.
   */
  private class ListenersNotifier implements Runnable
  {
    // Pending or running notification task; null when no task is scheduled.
    private volatile Future<?> future;
    // Set when data arrives while a task is still running; forces one more pass. Guarded by this.
    private boolean isMoreDataAvailable = false;

    /**
     * Called when new data has been written; ensures a notification task is scheduled,
     * coalescing multiple calls into at most one pending pass.
     */
    private void moreDataAvailable()
    {
      // NOTE(review): this first check-and-submit is outside the synchronized block —
      // confirm concurrent callers here cannot submit duplicate tasks.
      final Future<?> future = this.future;
      if (future == null || future.isDone() || future.isCancelled()) {
        // Do not schedule a new task if there is an existing one that is still running or is waiting in the queue
        this.future = autoFlushExecutor.submit(listenersNotifier);
      } else {
        synchronized (this) {
          if (this.future == null) {
            // future is set to null before run() exists, no need to check whether future isDone() or isCancelled()
            this.future = autoFlushExecutor.submit(this);
          } else {
            isMoreDataAvailable = true;
          }
        }
      }
    }

    /**
     * Notifies every listener that data was added. A listener that throws is removed and
     * the pass is cut short.
     *
     * @return true if at least one listener reported it has data to send
     */
    private boolean addedData()
    {
      boolean doesAtLeastOneListenerHaveDataToSend = false;
      for (DataListener dl : all_listeners) {
        try {
          doesAtLeastOneListenerHaveDataToSend |= dl.addedData(false);
        } catch (RuntimeException e) {
          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
          removeDataListener(dl);
          // Stop iterating: all_listeners was just modified.
          break;
        }
      }
      return doesAtLeastOneListenerHaveDataToSend;
    }

    /**
     * Polls listeners (without delivering data) to see whether any has data pending.
     * A listener that throws is removed and the scan restarts recursively.
     *
     * @return true if at least one listener has data to send
     */
    private boolean checkIfListenersHaveDataToSendOnly()
    {
      for (DataListener dl : all_listeners) {
        try {
          if (dl.addedData(true)) {
            return true;
          }
        } catch (RuntimeException e) {
          logger.error("{}: removing DataListener {} due to exception", DataList.this, dl, e);
          removeDataListener(dl);
          // Restart the scan since all_listeners was modified during iteration.
          return checkIfListenersHaveDataToSendOnly();
        }
      }
      return false;
    }

    /**
     * Runs one notification pass; reschedules itself while listeners still have data to
     * send or more data arrived during the pass.
     */
    @Override
    public void run()
    {
      try {
        if (addedData() || checkIfListenersHaveDataToSendOnly()) {
          future = autoFlushExecutor.submit(this);
        } else {
          synchronized (this) {
            if (isMoreDataAvailable) {
              isMoreDataAvailable = false;
              future = autoFlushExecutor.submit(this);
            } else {
              // Clear future so moreDataAvailable() knows it must schedule a fresh task.
              future = null;
            }
          }
        }
      } catch (Exception e) {
        logger.error("{}", DataList.this, e);
      }
    }
  }
  /** Class-wide SLF4J logger. */
  private static final Logger logger = LoggerFactory.getLogger(DataList.class);
}