/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.block;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.PositionedReadable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.KryoCloneUtils;
/**
* AbstractBlockReader processes a block of data from a stream.<br/>
* It works with {@link BlockMetadata} which provides the block details and can be used to parallelize the processing of
* data from a source.
*
* <p/>
* The {@link ReaderContext} provides the way to read from the stream. It controls whether the reader
* always reads ahead past the block boundary or stops at it.
*
* <p/>
* Properties that can be set on AbstractBlockReader:<br/>
* {@link #collectStats}: the operator is dynamically partitionable, driven by the backlog, i.e., the
* size of the input-port queue. Setting this property to false disables stats collection and
* therefore dynamic partitioning.<br/>
* {@link #maxReaders}: Maximum number of readers when dynamic partitioning is on.<br/>
* {@link #minReaders}: Minimum number of readers when dynamic partitioning is on.<br/>
* {@link #intervalMillis}: interval at which stats are processed by the block reader.<br/>
*
* <p/>
* It emits a {@link ReaderRecord} which wraps the record and the block id of the record.
*
* @param <R> type of records.
* @param <B> type of blocks.
* @param <STREAM> type of stream.
*
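* <p/>
* A minimal wiring sketch. Here {@code MyBlockReader} (a concrete subclass), the {@code dag} being
* populated, and the {@code splitter} and {@code parser} operators are assumptions for illustration;
* the ports and setters are this class's own:
* <pre>{@code
* MyBlockReader reader = dag.addOperator("BlockReader", new MyBlockReader());
* reader.setReaderContext(readerContext);  //a concrete ReaderContext implementation
* reader.setMinReaders(1);
* reader.setMaxReaders(8);
* reader.setCollectStats(true);            //allow dynamic partitioning based on backlog
* dag.addStream("blocks", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
* dag.addStream("records", reader.messages, parser.input);
* }</pre>
*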
* @since 2.1.0
*/
@StatsListener.DataQueueSize
public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable>
extends BaseOperator
implements Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler
{
protected int operatorId;
protected transient long windowId;
@NotNull
protected ReaderContext<STREAM> readerContext;
protected transient STREAM stream;
protected transient int blocksPerWindow;
protected final BasicCounters<MutableLong> counters;
protected transient Context.OperatorContext context;
protected transient long sleepTimeMillis;
protected Set<Integer> partitionKeys;
protected int partitionMask;
//Stats-listener and partitioner properties
/**
* Controls stats collection. Default: true
*/
private boolean collectStats;
/**
* Maximum number of readers. Default: 16
*/
protected int maxReaders;
/**
* Minimum number of readers. Default: 1
*/
protected int minReaders;
/**
* Interval at which stats are processed. Default: 2 minutes
*/
protected long intervalMillis;
protected final transient StatsListener.Response response;
protected transient int partitionCount;
protected final transient Map<Integer, Integer> backlogPerOperator;
private transient long nextMillis;
protected transient B lastProcessedBlock;
protected transient long lastBlockOpenTime;
protected transient boolean consecutiveBlock;
@AutoMetric
private long bytesRead;
public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>();
public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>();
public final transient DefaultInputPort<B> blocksMetadataInput = new DefaultInputPort<B>()
{
@Override
public void process(B block)
{
processBlockMetadata(block);
}
};
public AbstractBlockReader()
{
maxReaders = 16;
minReaders = 1;
intervalMillis = 2 * 60 * 1000L;
response = new StatsListener.Response();
backlogPerOperator = Maps.newHashMap();
partitionCount = 1;
counters = new BasicCounters<>(MutableLong.class);
collectStats = true;
lastBlockOpenTime = -1;
}
@Override
public void setup(Context.OperatorContext context)
{
operatorId = context.getId();
LOG.debug("{}: partition keys {} mask {}", operatorId, partitionKeys, partitionMask);
this.context = context;
counters.setCounter(ReaderCounterKeys.BLOCKS, new MutableLong());
counters.setCounter(ReaderCounterKeys.RECORDS, new MutableLong());
counters.setCounter(ReaderCounterKeys.BYTES, new MutableLong());
counters.setCounter(ReaderCounterKeys.TIME, new MutableLong());
sleepTimeMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
}
@Override
public void beginWindow(long windowId)
{
this.windowId = windowId;
blocksPerWindow = 0;
bytesRead = 0;
}
@Override
public void handleIdleTime()
{
if (lastProcessedBlock != null && System.currentTimeMillis() - lastBlockOpenTime > intervalMillis) {
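//the stream has stayed open on the same block for longer than intervalMillis; release it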
try {
teardownStream(lastProcessedBlock);
lastProcessedBlock = null;
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
/* nothing to do here, so sleep for a while to avoid busy loop */
try {
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
}
@Override
public void endWindow()
{
counters.getCounter(ReaderCounterKeys.BLOCKS).add(blocksPerWindow);
context.setCounters(counters);
}
protected void processBlockMetadata(B block)
{
try {
long blockStartTime = System.currentTimeMillis();
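//if this block is not consecutive to the last processed one, close the old stream and open a new one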
if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null
|| block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) {
teardownStream(lastProcessedBlock);
consecutiveBlock = false;
lastBlockOpenTime = System.currentTimeMillis();
stream = setupStream(block);
} else {
consecutiveBlock = true;
}
readBlock(block);
lastProcessedBlock = block;
counters.getCounter(ReaderCounterKeys.TIME).add(System.currentTimeMillis() - blockStartTime);
//emit block metadata only when the block finishes
if (blocksMetadataOutput.isConnected()) {
blocksMetadataOutput.emit(block);
}
blocksPerWindow++;
} catch (IOException ie) {
try {
if (lastProcessedBlock != null) {
teardownStream(lastProcessedBlock);
lastProcessedBlock = null;
}
} catch (IOException ioe) {
throw new RuntimeException("closing last", ie);
}
throw new RuntimeException(ie);
}
}
/**
* Reads records from the block using the configured {@link ReaderContext}.
* Override this if you want to change how much of the block is read.
*
* @param blockMetadata block
* @throws IOException
*/
protected void readBlock(BlockMetadata blockMetadata) throws IOException
{
readerContext.initialize(stream, blockMetadata, consecutiveBlock);
ReaderContext.Entity entity;
while ((entity = readerContext.next()) != null) {
counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
bytesRead += entity.getUsedBytes();
R record = convertToRecord(entity.getRecord());
//a null record indicates a partial or invalid record; it is ignored
if (record != null) {
counters.getCounter(ReaderCounterKeys.RECORDS).increment();
messages.emit(new ReaderRecord<>(blockMetadata.getBlockId(), record));
}
}
}
/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
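* The partition count is adjusted to {@link #partitionCount} as computed by {@link #processStats};
* counters of removed partitions are folded into the first remaining partition.<br/><br/>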
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(
Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context)
{
if (partitions.iterator().next().getStats() == null) {
//First time when define partitions is called
return partitions;
}
List<Partition<AbstractBlockReader<R, B, STREAM>>> newPartitions = Lists.newArrayList();
//wrap the existing operator instances in fresh partition definitions
for (Partition<AbstractBlockReader<R, B, STREAM>> partition : partitions) {
newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
}
partitions.clear();
int morePartitionsToCreate = partitionCount - newPartitions.size();
List<BasicCounters<MutableLong>> deletedCounters = Lists.newArrayList();
if (morePartitionsToCreate < 0) {
//Delete partitions
Iterator<Partition<AbstractBlockReader<R, B, STREAM>>> partitionIterator = newPartitions.iterator();
while (morePartitionsToCreate++ < 0) {
Partition<AbstractBlockReader<R, B, STREAM>> toRemove = partitionIterator.next();
deletedCounters.add(toRemove.getPartitionedInstance().counters);
LOG.debug("partition removed {}", toRemove.getPartitionedInstance().operatorId);
partitionIterator.remove();
}
} else {
KryoCloneUtils<AbstractBlockReader<R, B, STREAM>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
while (morePartitionsToCreate-- > 0) {
DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(cloneUtils.getClone());
newPartitions.add(partition);
}
}
DefaultPartition.assignPartitionKeys(Collections.unmodifiableCollection(newPartitions), blocksMetadataInput);
int lPartitionMask = newPartitions.iterator().next().getPartitionKeys().get(blocksMetadataInput).mask;
//transfer the state here
for (Partition<AbstractBlockReader<R, B, STREAM>> newPartition : newPartitions) {
AbstractBlockReader<R, B, STREAM> reader = newPartition.getPartitionedInstance();
reader.partitionKeys = newPartition.getPartitionKeys().get(blocksMetadataInput).partitions;
reader.partitionMask = lPartitionMask;
LOG.debug("partitions {},{}", reader.partitionKeys, reader.partitionMask);
}
//transfer the counters
AbstractBlockReader<R, B, STREAM> targetReader = newPartitions.iterator().next().getPartitionedInstance();
for (BasicCounters<MutableLong> removedCounter : deletedCounters) {
addCounters(targetReader.counters, removedCounter);
}
return newPartitions;
}
/**
* Transfers the counters of a removed partition to a surviving one during repartitioning.
*
* @param target counters of the surviving partition
* @param source counters of the removed partition
*/
protected void addCounters(BasicCounters<MutableLong> target, BasicCounters<MutableLong> source)
{
for (Enum<ReaderCounterKeys> key : ReaderCounterKeys.values()) {
MutableLong tcounter = target.getCounter(key);
if (tcounter == null) {
tcounter = new MutableLong();
target.setCounter(key, tcounter);
}
MutableLong scounter = source.getCounter(key);
if (scounter != null) {
tcounter.add(scounter.longValue());
}
}
}
@Override
public void partitioned(Map<Integer, Partition<AbstractBlockReader<R, B, STREAM>>> integerPartitionMap)
{
}
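/**
* Evaluates the backlog reported on the blocks input port of each physical partition and, at most
* once every {@link #intervalMillis}, proposes a new partition count clamped between
* {@link #minReaders} and {@link #maxReaders}. A repartition is requested only when stats collection
* is enabled and the proposed count differs from the current one.
*/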
@Override
public Response processStats(BatchedOperatorStats stats)
{
response.repartitionRequired = false;
if (!collectStats) {
return response;
}
List<Stats.OperatorStats> lastWindowedStats = stats.getLastWindowedStats();
if (lastWindowedStats != null && lastWindowedStats.size() > 0) {
Stats.OperatorStats lastStats = lastWindowedStats.get(lastWindowedStats.size() - 1);
if (lastStats.inputPorts.size() > 0) {
backlogPerOperator.put(stats.getOperatorId(), lastStats.inputPorts.get(0).queueSize);
}
}
if (System.currentTimeMillis() < nextMillis) {
return response;
}
nextMillis = System.currentTimeMillis() + intervalMillis;
LOG.debug("Proposed NextMillis = {}", nextMillis);
long totalBacklog = 0;
for (Map.Entry<Integer, Integer> backlog : backlogPerOperator.entrySet()) {
totalBacklog += backlog.getValue();
}
LOG.debug("backlog {} partitionCount {}", totalBacklog, partitionCount);
backlogPerOperator.clear();
if (totalBacklog == partitionCount) {
return response; //do not repartition
}
int newPartitionCount;
if (totalBacklog > maxReaders) {
LOG.debug("large backlog {}", totalBacklog);
newPartitionCount = maxReaders;
} else if (totalBacklog < minReaders) {
LOG.debug("small backlog {}", totalBacklog);
newPartitionCount = minReaders;
} else {
newPartitionCount = getAdjustedCount(totalBacklog);
LOG.debug("moderate backlog {}", totalBacklog);
}
LOG.debug("backlog {} newPartitionCount {} partitionCount {}", totalBacklog, newPartitionCount, partitionCount);
if (newPartitionCount == partitionCount) {
return response; //do not repartition
}
partitionCount = newPartitionCount;
response.repartitionRequired = true;
LOG.debug("partition required", totalBacklog, partitionCount);
return response;
}
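/**
* Rounds the proposed partition count down to the nearest power of two; partition keys are assigned
* with a mask, so a power-of-two count keeps the blocks evenly distributed.
*
* @param newCount proposed count (here the total backlog)
* @return the largest power of two that is less than or equal to newCount
*/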
protected int getAdjustedCount(long newCount)
{
int adjustCount = 1;
while (adjustCount < newCount) {
adjustCount <<= 1;
}
if (adjustCount > newCount) {
adjustCount >>>= 1;
}
LOG.debug("adjust {} => {}", newCount, adjustCount);
return adjustCount;
}
/**
* Initializes the stream for reading a block and seeks to its starting offset.
*
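* A minimal override sketch, assuming a file-backed block ({@code FileBlockMetadata}) and a transient
* Hadoop {@code FileSystem} field {@code fs} initialized in setup(); both are assumptions and not
* part of this class:
* <pre>{@code
* protected FSDataInputStream setupStream(FileBlockMetadata block) throws IOException
* {
*   FSDataInputStream is = fs.open(new Path(block.getFilePath()));
*   //position at the block's starting offset; some ReaderContext implementations seek on their own
*   is.seek(block.getOffset());
*   return is;
* }
* }</pre>
*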
* @throws IOException
*/
protected abstract STREAM setupStream(B block) throws IOException;
/**
* Closes the stream that was opened for the last processed block.
*
* @throws IOException
*/
protected void teardownStream(@SuppressWarnings("unused") B block) throws IOException
{
if (stream != null) {
stream.close();
stream = null;
}
}
/**
* Converts the bytes to a record. Returns null when the bytes were insufficient to build a record,
* the record was invalid after conversion, or it failed a business-logic validation. <br/>
*
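* A minimal override sketch, assuming the record type {@code R} is {@code String} and each byte chunk
* is a UTF-8 encoded line (both assumptions are for illustration only):
* <pre>{@code
* protected String convertToRecord(byte[] bytes)
* {
*   if (bytes == null || bytes.length == 0) {
*     return null;  //partial or empty chunk: no record is emitted
*   }
*   return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
* }
* }</pre>
*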
* @param bytes bytes
* @return record
*/
protected abstract R convertToRecord(byte[] bytes);
/**
* Sets the maximum number of block readers.
*
* @param maxReaders max number of readers.
*/
public void setMaxReaders(int maxReaders)
{
this.maxReaders = maxReaders;
}
/**
* Sets the minimum number of block readers.
*
* @param minReaders min number of readers.
*/
public void setMinReaders(int minReaders)
{
this.minReaders = minReaders;
}
/**
* @return maximum instances of block reader.
*/
public int getMaxReaders()
{
return maxReaders;
}
/**
* @return minimum instances of block reader.
*/
public int getMinReaders()
{
return minReaders;
}
/**
* Enables/disables stats collection, and with it the block reader's ability to partition itself.
*/
public void setCollectStats(boolean collectStats)
{
this.collectStats = collectStats;
}
/**
* @return whether stats collection is enabled.
*/
public boolean isCollectStats()
{
return collectStats;
}
/**
* Sets the interval in millis at which the stats are processed by the reader.
*
* @param intervalMillis interval in milliseconds.
*/
public void setIntervalMillis(long intervalMillis)
{
this.intervalMillis = intervalMillis;
}
/**
* @return the interval in millis at which stats are processed by the reader.
*/
public long getIntervalMillis()
{
return intervalMillis;
}
public void setReaderContext(ReaderContext<STREAM> readerContext)
{
this.readerContext = readerContext;
}
public ReaderContext<STREAM> getReaderContext()
{
return readerContext;
}
@Override
public String toString()
{
return "Reader{" + "nextMillis=" + nextMillis + ", intervalMillis=" + intervalMillis + '}';
}
/**
* ReaderRecord wraps a record together with its blockId; the reader emits objects of this type.
*
* @param <R> type of record.
*/
public static class ReaderRecord<R>
{
private final long blockId;
private final R record;
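//no-arg constructor, typically needed by serializers such as Kryo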
@SuppressWarnings("unused")
private ReaderRecord()
{
this.blockId = -1;
this.record = null;
}
public ReaderRecord(long blockId, R record)
{
this.blockId = blockId;
this.record = record;
}
public long getBlockId()
{
return blockId;
}
public R getRecord()
{
return record;
}
}
public enum ReaderCounterKeys
{
RECORDS, BLOCKS, BYTES, TIME
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockReader.class);
}