blob: 64c066be324e7ebf52dc7eb7f95ecc8e45143370 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.block;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.netlet.util.Slice;
/**
* Writes a block to the appFS (the HDFS on which the app is running). This is a temporary
* write to HDFS to handle large files.
*
* @since 3.4.0
*/
public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
    implements Partitioner<BlockWriter>
{
  /**
   * Default value for blocksDirectory
   */
  public static final String DEFAULT_BLOCKS_DIR = "blocks";

  /**
   * Directory under the application directory where blocks get stored
   */
  private String blocksDirectory = DEFAULT_BLOCKS_DIR;

  /**
   * List of FileBlockMetadata received in the current window. It is filled by
   * {@link #blockMetadataInput} and drained in {@link #endWindow()}.
   */
  private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas;

  /**
   * Input port to receive block meta data. Every received metadata object is remembered
   * until the end of the window.
   */
  public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput =
      new DefaultInputPort<BlockMetadata.FileBlockMetadata>()
  {
    @Override
    public void process(BlockMetadata.FileBlockMetadata blockMetadata)
    {
      blockMetadatas.add(blockMetadata);
      LOG.debug("received blockId {} for file {} ", blockMetadata.getBlockId(), blockMetadata.getFilePath());
    }
  };

  /**
   * Output port to send block meta data to the downstream operator
   */
  public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput =
      new DefaultOutputPort<BlockMetadata.FileBlockMetadata>();

  /**
   * Creates a writer with an empty list of pending block metadata and an empty file path.
   */
  public BlockWriter()
  {
    super();
    blockMetadatas = Lists.newArrayList();
    //The base class puts a restriction that the file-path cannot be null. With this block writer it is
    //being initialized in setup and not through configuration. So setting it to empty string.
    filePath = "";
  }

  /**
   * Initializes the filePath as {@code <application path>/<blocksDirectory>} and then delegates
   * to the base class setup.
   *
   * @param context operator context from which the application path is read
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    // filePath must be assigned before super.setup(context) is invoked.
    filePath = context.getValue(Context.DAGContext.APPLICATION_PATH) + Path.SEPARATOR + blocksDirectory;
    super.setup(context);
  }

  /**
   * Finalizes files for all the blockMetaDatas received during the current window and emits each
   * metadata object on {@link #blockMetadataOutput} once its file is finalized.
   *
   * @throws RuntimeException wrapping the {@link IOException} if finalizing a block file fails
   */
  @Override
  public void endWindow()
  {
    super.endWindow();

    // Inherited state is reset before finalizing.
    // NOTE(review): presumably this releases the cached output streams of the blocks - confirm
    // against the base class.
    streamsCache.asMap().clear();
    endOffsets.clear();

    for (BlockMetadata.FileBlockMetadata blockMetadata : blockMetadatas) {
      try {
        // Block files are named after the block id, see getFileName.
        finalizeFile(Long.toString(blockMetadata.getBlockId()));
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      blockMetadataOutput.emit(blockMetadata);
    }
    blockMetadatas.clear();
  }

  /**
   * Uses the block id as the name of the file the record is written to.
   *
   * @param tuple record read from a block
   * @return decimal string form of the block id of the tuple
   */
  @Override
  protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple)
  {
    return Long.toString(tuple.getBlockId());
  }

  /**
   * Returns exactly the bytes addressed by the slice of the tuple.
   * <p>
   * When the slice covers its whole backing array the array itself is returned without copying.
   * Otherwise only the range {@code [offset, offset + length)} is copied out, so that bytes of the
   * backing array which do not belong to the record are never written to the block file.
   *
   * @param tuple record read from a block
   * @return bytes of the record
   */
  @Override
  protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple)
  {
    Slice record = tuple.getRecord();
    if (record.offset == 0 && record.length == record.buffer.length) {
      return record.buffer;
    }
    byte[] bytes = new byte[record.length];
    System.arraycopy(record.buffer, record.offset, bytes, 0, record.length);
    return bytes;
  }

  /**
   * Grows or shrinks the collection of partitions so that its size equals the parallel partition
   * count of the context. The counters of removed partitions are added to the counters of the first
   * remaining partition.
   *
   * @param partitions current partitions; modified in place
   * @param context    partitioning context supplying the parallel partition count
   * @return the same collection instance that was passed in
   */
  @Override
  public Collection<Partition<BlockWriter>> definePartitions(Collection<Partition<BlockWriter>> partitions,
      PartitioningContext context)
  {
    final int parallelPartitionCount = context.getParallelPartitionCount();
    if (parallelPartitionCount == 0) {
      return partitions;
    }

    // if there is no change of count, return the same collection
    if (parallelPartitionCount == partitions.size()) {
      LOG.debug("no change is partition count: {}", partitions.size());
      return partitions;
    }

    List<BasicCounters<MutableLong>> deletedCounters = Lists.newArrayList();

    LOG.debug("block writer parallel partition count {}", parallelPartitionCount);
    int morePartitionsToCreate = parallelPartitionCount - partitions.size();
    if (morePartitionsToCreate < 0) {
      //Delete partitions
      Iterator<Partition<BlockWriter>> partitionIterator = partitions.iterator();
      while (morePartitionsToCreate++ < 0) {
        Partition<BlockWriter> toRemove = partitionIterator.next();
        // Remember the counters so they can be merged into a surviving partition below.
        deletedCounters.add(toRemove.getPartitionedInstance().fileCounters);
        partitionIterator.remove();
      }
    } else {
      //Add more partitions
      // NOTE(review): every new partition wraps the same operator instance; presumably the platform
      // creates independent copies when deploying - confirm.
      BlockWriter anOperator = partitions.iterator().next().getPartitionedInstance();
      while (morePartitionsToCreate-- > 0) {
        DefaultPartition<BlockWriter> partition = new DefaultPartition<BlockWriter>(anOperator);
        partitions.add(partition);
      }
    }

    //transfer the counters
    BlockWriter targetWriter = partitions.iterator().next().getPartitionedInstance();
    for (BasicCounters<MutableLong> removedCounter : deletedCounters) {
      addCounters(targetWriter.fileCounters, removedCounter);
    }
    LOG.debug("Block writers {}", partitions.size());
    return partitions;
  }

  /**
   * Transfers the counters in partitioning. For every key of {@code Counters} the value found in
   * the source is added to the target; a counter missing in the target is created with value zero
   * first, a counter missing in the source is skipped.
   *
   * @param target
   *          target counter
   * @param source
   *          removed counter
   */
  protected void addCounters(BasicCounters<MutableLong> target, BasicCounters<MutableLong> source)
  {
    for (Enum<BlockWriter.Counters> key : BlockWriter.Counters.values()) {
      MutableLong tcounter = target.getCounter(key);
      if (tcounter == null) {
        tcounter = new MutableLong();
        target.setCounter(key, tcounter);
      }
      MutableLong scounter = source.getCounter(key);
      if (scounter != null) {
        tcounter.add(scounter.longValue());
      }
    }
  }

  /**
   * Directory under the application directory where blocks get stored
   *
   * @return blocks directory
   */
  public String getBlocksDirectory()
  {
    return blocksDirectory;
  }

  /**
   * Directory under the application directory where blocks get stored
   *
   * @param blocksDirectory blocks directory
   */
  public void setBlocksDirectory(String blocksDirectory)
  {
    this.blocksDirectory = blocksDirectory;
  }

  /**
   * Intentionally does nothing; this operator takes no action when partitioning is applied.
   *
   * @param partitions partitions keyed by an integer id
   */
  @Override
  public void partitioned(Map<Integer, Partition<BlockWriter>> partitions)
  {
  }

  private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);
}