| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package com.datatorrent.lib.io.block; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang.mutable.MutableLong; |
| import org.apache.hadoop.fs.Path; |
| |
| import com.google.common.collect.Lists; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.DefaultPartition; |
| import com.datatorrent.api.Partitioner; |
| import com.datatorrent.lib.counters.BasicCounters; |
| import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; |
| import com.datatorrent.netlet.util.Slice; |
| |
| /** |
| * Writes blocks to the application file system (appFS), i.e. the HDFS instance |
| * on which the application is running. Writing the blocks to HDFS is a |
| * temporary staging step used to handle large files. |
| * |
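| * <p> |
| * A minimal wiring sketch (the upstream {@code FSSliceReader} and the operator |
| * and stream names are illustrative assumptions, not prescribed by this class): |
| * the reader's data port feeds this writer, and the reader's metadata port |
| * feeds {@link #blockMetadataInput}, so that {@link #blockMetadataOutput} |
| * emits a block's metadata only after its file has been finalized. |
| * <pre>{@code |
| * FSSliceReader reader = dag.addOperator("block-reader", new FSSliceReader()); |
| * BlockWriter writer = dag.addOperator("block-writer", new BlockWriter()); |
| * |
| * dag.addStream("block-data", reader.messages, writer.input); |
| * dag.addStream("block-metadata", reader.blocksMetadataOutput, writer.blockMetadataInput); |
| * |
| * // the writer is typically parallel partitioned with the reader |
| * dag.setInputPortAttribute(writer.input, Context.PortContext.PARTITION_PARALLEL, true); |
| * }</pre> |
| * |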
| * @since 3.4.0 |
| */ |
| public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>> |
| implements Partitioner<BlockWriter> |
| { |
| /** |
| * Default value for blocksDirectory |
| */ |
| public static final String DEFAULT_BLOCKS_DIR = "blocks"; |
| /** |
| * Directory under the application directory where blocks are stored |
| */ |
| private String blocksDirectory = DEFAULT_BLOCKS_DIR; |
| |
| /** |
| * List of FileBlockMetadata received in the current window. |
| */ |
| private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas; |
| |
| /** |
| * Input port that receives block metadata |
| */ |
| public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>() |
| { |
| |
| @Override |
| public void process(BlockMetadata.FileBlockMetadata blockMetadata) |
| { |
| blockMetadatas.add(blockMetadata); |
| LOG.debug("received blockId {} for file {}", blockMetadata.getBlockId(), blockMetadata.getFilePath()); |
| } |
| }; |
| |
| /** |
| * Output port that emits block metadata to the downstream operator |
| */ |
| public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput = new DefaultOutputPort<BlockMetadata.FileBlockMetadata>(); |
| |
| public BlockWriter() |
| { |
| super(); |
| blockMetadatas = Lists.newArrayList(); |
| //The base class requires a non-null file path. For this block writer the path |
| //is initialized in setup() rather than through configuration, so it is set to |
| //an empty string here. |
| filePath = ""; |
| } |
| |
| /** |
| * Initializes filePath from the application path before invoking the base class setup |
| */ |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
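| // e.g. an application path of hdfs://namenode/user/dtuser/application_1 (an |
| // illustrative value) yields filePath = hdfs://namenode/user/dtuser/application_1/blocks |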
| filePath = context.getValue(Context.DAGContext.APPLICATION_PATH) + Path.SEPARATOR + blocksDirectory; |
| super.setup(context); |
| } |
| |
| /** |
| * Finalizes the files for all block metadata received during the current window |
| */ |
| @Override |
| public void endWindow() |
| { |
| super.endWindow(); |
| |
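| //Clear the cached streams and offsets: the cache's removal listener closes |
| //each open stream, which allows the blocks received in this window to be |
| //finalized below. |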
| streamsCache.asMap().clear(); |
| endOffsets.clear(); |
| |
| for (BlockMetadata.FileBlockMetadata blockMetadata : blockMetadatas) { |
| try { |
| finalizeFile(Long.toString(blockMetadata.getBlockId())); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| blockMetadataOutput.emit(blockMetadata); |
| } |
| blockMetadatas.clear(); |
| } |
| |
| @Override |
| protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple) |
| { |
| return Long.toString(tuple.getBlockId()); |
| } |
| |
| @Override |
| protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple) |
| { |
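| //Returns the slice's backing array directly; this assumes the slice spans |
| //the whole buffer (offset 0 and length == buffer.length). |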
| return tuple.getRecord().buffer; |
| } |
| |
| @Override |
| public Collection<Partition<BlockWriter>> definePartitions(Collection<Partition<BlockWriter>> partitions, |
| PartitioningContext context) |
| { |
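| //A parallel partition count of 0 means this writer is not parallel |
| //partitioned with an upstream operator, so the partitions are left unchanged. |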
| if (context.getParallelPartitionCount() == 0) { |
| return partitions; |
| } |
| |
| // if the count is unchanged, return the same collection |
| if (context.getParallelPartitionCount() == partitions.size()) { |
| LOG.debug("no change in partition count: {}", partitions.size()); |
| return partitions; |
| } |
| |
| List<BasicCounters<MutableLong>> deletedCounters = Lists.newArrayList(); |
| |
| LOG.debug("block writer parallel partition count {}", context.getParallelPartitionCount()); |
| int morePartitionsToCreate = context.getParallelPartitionCount() - partitions.size(); |
| if (morePartitionsToCreate < 0) { |
| //Delete partitions |
| Iterator<Partition<BlockWriter>> partitionIterator = partitions.iterator(); |
| |
| while (morePartitionsToCreate++ < 0) { |
| Partition<BlockWriter> toRemove = partitionIterator.next(); |
| deletedCounters.add(toRemove.getPartitionedInstance().fileCounters); |
| partitionIterator.remove(); |
| } |
| } else { |
| //Add more partitions |
| BlockWriter anOperator = partitions.iterator().next().getPartitionedInstance(); |
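| //The same instance is reused to back each new partition; this relies on the |
| //platform serializing every partition's operator separately at deployment, so |
| //the partitions become distinct copies (an assumption about engine behavior). |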
| |
| while (morePartitionsToCreate-- > 0) { |
| DefaultPartition<BlockWriter> partition = new DefaultPartition<BlockWriter>(anOperator); |
| partitions.add(partition); |
| } |
| } |
| |
| //transfer the counters of removed partitions to the first remaining partition |
| BlockWriter targetWriter = partitions.iterator().next().getPartitionedInstance(); |
| for (BasicCounters<MutableLong> removedCounter : deletedCounters) { |
| addCounters(targetWriter.fileCounters, removedCounter); |
| } |
| LOG.debug("Block writers {}", partitions.size()); |
| return partitions; |
| } |
| |
| /** |
| * Transfers counters from a removed partition to a surviving partition |
| * during repartitioning. |
| * |
| * @param target |
| * counters of the surviving partition |
| * @param source |
| * counters of the removed partition |
| */ |
| protected void addCounters(BasicCounters<MutableLong> target, BasicCounters<MutableLong> source) |
| { |
| for (Enum<BlockWriter.Counters> key : BlockWriter.Counters.values()) { |
| MutableLong tcounter = target.getCounter(key); |
| if (tcounter == null) { |
| tcounter = new MutableLong(); |
| target.setCounter(key, tcounter); |
| } |
| MutableLong scounter = source.getCounter(key); |
| if (scounter != null) { |
| tcounter.add(scounter.longValue()); |
| } |
| } |
| } |
| |
| /** |
| * Directory under the application directory where blocks are stored |
| * @return blocks directory |
| */ |
| public String getBlocksDirectory() |
| { |
| return blocksDirectory; |
| } |
| |
| /** |
| * Directory under the application directory where blocks are stored |
| * @param blocksDirectory blocks directory |
| */ |
| public void setBlocksDirectory(String blocksDirectory) |
| { |
| this.blocksDirectory = blocksDirectory; |
| } |
| |
| @Override |
| public void partitioned(Map<Integer, Partition<BlockWriter>> partitions) |
| { |
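| //No-op: counters of removed partitions are already merged in definePartitions() |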
| |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class); |
| |
| } |