/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.contrib.hive;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;

import javax.validation.constraints.Min;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.mutable.MutableInt;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.api.annotation.Stateless;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.netlet.util.DTThrowable;

/**
 * An FS writer implementation that writes text files to HDFS and loads them
 * into Hive in the committed window callback. A HiveStreamCodec, which is
 * handed a reference to this operator, ensures that data bound for a
 * particular Hive partition is routed to a specific operator partition. The
 * file name is also derived uniquely for each tuple from the Hive partition
 * it belongs to.
 * <p>
 * A minimal sketch of a concrete subclass (the tuple type and its accessors
 * are illustrative):
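 * <pre>{@code
 * public class MyFSRollingOutputOperator extends AbstractFSRollingOutputOperator<MyPojo>
 * {
 *   protected byte[] getBytesForTuple(MyPojo tuple)
 *   {
 *     // Serialize each tuple as one line of the text file.
 *     return (tuple.toString() + "\n").getBytes();
 *   }
 *
 *   public ArrayList<String> getHivePartition(MyPojo tuple)
 *   {
 *     // One value per Hive partition column, here a single date column.
 *     ArrayList<String> hivePartitions = new ArrayList<String>();
 *     hivePartitions.add(tuple.getDate());
 *     return hivePartitions;
 *   }
 * }
 * }</pre>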
 *
 * @since 2.1.0
 */
public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOutputOperator<T> implements CheckpointNotificationListener
{
  private transient String outputFilePath;
  protected MutableInt partNumber;
  // Window id -> files whose rolling part was completed in that window.
  protected HashMap<Long, ArrayList<String>> mapFilenames = new HashMap<Long, ArrayList<String>>();
  // Completed part file name -> Hive partition values the file belongs to.
  protected HashMap<String, ArrayList<String>> mapPartition = new HashMap<String, ArrayList<String>>();
  // Window ids that have completed files, in order; shortens iteration in committed().
  protected Queue<Long> queueWindows = new LinkedList<Long>();
  protected long windowIDOfCompletedPart = Stateless.WINDOW_ID;
  protected long committedWindowId = Stateless.WINDOW_ID;
  private boolean isEmptyWindow;
  private transient int operatorId;
  private int countEmptyWindow;
  private ArrayList<String> partition = new ArrayList<String>();
  // This variable is user configurable.
  @Min(0)
  private long maxWindowsWithNoData = 100;

  /**
   * The output port on which a POJO is emitted for each committed file. The
   * POJO names the file and the specific Hive partitions into which the file
   * should be loaded by HiveOperator.
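   * <p>
   * Typical wiring in a DAG (operator names are illustrative), assuming the
   * companion HiveOperator exposes its {@code input} port:
   * <pre>{@code
   * MyFSRollingOutputOperator fsRolling = dag.addOperator("fsRolling", new MyFSRollingOutputOperator());
   * HiveOperator hiveOperator = dag.addOperator("hiveLoad", new HiveOperator());
   * dag.addStream("filesToHive", fsRolling.outputPort, hiveOperator.input);
   * }</pre>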
   */
  public final transient DefaultOutputPort<FilePartitionMapping> outputPort = new DefaultOutputPort<FilePartitionMapping>();

  public AbstractFSRollingOutputOperator()
  {
    countEmptyWindow = 0;
    // The stream codec ensures that tuples bound for a particular Hive
    // partition go to a specific operator partition.
    HiveStreamCodec<T> hiveCodec = new HiveStreamCodec<T>();
    hiveCodec.rollingOperator = this;
    streamCodec = hiveCodec;
  }

  @Override
  public void setup(OperatorContext context)
  {
    String appId = context.getValue(DAG.APPLICATION_ID);
    operatorId = context.getId();
    // A path unique to this application and physical operator partition.
    outputFilePath = File.separator + appId + File.separator + operatorId;
    super.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    isEmptyWindow = true;
    windowIDOfCompletedPart = windowId;
  }

  /*
   * mapFilenames maps a window id to the files that were completed in that
   * window. A queue of these window ids is maintained as well, which reduces
   * iteration time in the committed callback.
   */
  @Override
  protected void rotateHook(String finishedFile)
  {
    isEmptyWindow = false;
    if (mapFilenames.containsKey(windowIDOfCompletedPart)) {
      mapFilenames.get(windowIDOfCompletedPart).add(finishedFile);
    } else {
      ArrayList<String> listFileNames = new ArrayList<String>();
      listFileNames.add(finishedFile);
      mapFilenames.put(windowIDOfCompletedPart, listFileNames);
    }
    queueWindows.add(windowIDOfCompletedPart);
  }

  /*
   * File names include the operator id and the specific Hive partitions to
   * which the file will be loaded. The partitions are derived from the tuple;
   * the mapping is left to the user.
   */
  @Override
  protected String getFileName(T tuple)
  {
    isEmptyWindow = false;
    partition = getHivePartition(tuple);
    StringBuilder output = new StringBuilder(outputFilePath);
    int numPartitions = partition.size();
    if (numPartitions > 0) {
      for (int i = 0; i < numPartitions; i++) {
        output.append(File.separator).append(partition.get(i));
      }
      output.append(File.separator).append(operatorId).append("-transaction.out.part");
      // Remember which Hive partitions the current part file belongs to.
      String partFile = getPartFileNamePri(output.toString());
      mapPartition.put(partFile, partition);
    }
    return output.toString();
  }

  /*
   * Completed files are moved into Hive in the committed window callback. A
   * file qualifies once the window id in which it was completed is less than
   * or equal to the committed window id. For example, when window 7 is
   * committed, files completed in windows 5 through 7 are emitted, while a
   * file completed in window 8 stays queued.
   */
  @Override
  public void committed(long windowId)
  {
    committedWindowId = windowId;
    Iterator<Long> iterWindows = queueWindows.iterator();
    while (iterWindows.hasNext()) {
      windowId = iterWindows.next();
      if (committedWindowId < windowId) {
        // The queue is ordered by window id, so no later entry can qualify.
        break;
      }
      ArrayList<String> list = mapFilenames.get(windowId);
      logger.debug("list is {}", list);
      if (list != null) {
        for (int i = 0; i < list.size(); i++) {
          // Emit a fresh POJO per file; an emitted tuple must not be mutated.
          FilePartitionMapping partMap = new FilePartitionMapping();
          partMap.setFilename(list.get(i));
          partMap.setPartition(mapPartition.get(list.get(i)));
          outputPort.emit(partMap);
        }
      }
      mapFilenames.remove(windowId);
      iterWindows.remove();
    }
  }

  @Override
  public void checkpointed(long windowId)
  {
    // No-op: files are handed to Hive from the committed callback instead.
  }

  protected void rotateCall(String lastFile)
  {
    try {
      this.rotate(lastFile);
    } catch (IOException ex) {
      logger.debug(ex.getMessage());
      DTThrowable.rethrow(ex);
    } catch (ExecutionException ex) {
      logger.debug(ex.getMessage());
      DTThrowable.rethrow(ex);
    }
  }

  public String getHDFSRollingLastFile()
  {
    Iterator<String> iterFileNames = this.openPart.keySet().iterator();
    String lastFile;
    if (iterFileNames.hasNext()) {
      lastFile = iterFileNames.next();
      partNumber = this.openPart.get(lastFile);
      return getPartFileName(lastFile, partNumber.intValue());
    }
    // No part file is open, so there is nothing to rotate.
    return null;
  }

  /**
   * Returns the list of Hive partition values to which the tuple needs to be
   * written. Example: if the Hive partitions are date='2014-12-12',
   * country='USA', then this method returns {"2014-12-12","USA"}. The
   * implementation is left to the user. A sketch matching this example, with
   * hypothetical accessors on the tuple:
   *
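   * <pre>{@code
   * public ArrayList<String> getHivePartition(MyPojo tuple)
   * {
   *   // The order of values must match the order of the Hive partition columns.
   *   ArrayList<String> values = new ArrayList<String>();
   *   values.add(tuple.getDate());    // e.g. "2014-12-12"
   *   values.add(tuple.getCountry()); // e.g. "USA"
   *   return values;
   * }
   * }</pre>
   *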
   * @param tuple
   *          A received tuple to be written to a Hive partition.
   * @return ArrayList containing the Hive partition values.
   */
  public abstract ArrayList<String> getHivePartition(T tuple);

  @Override
  public void endWindow()
  {
    if (isEmptyWindow) {
      countEmptyWindow++;
    }
    // After maxWindowsWithNoData empty windows, force a rotation so that the
    // open part file becomes eligible to be loaded into Hive.
    if (countEmptyWindow >= maxWindowsWithNoData) {
      String lastFile = getHDFSRollingLastFile();
      if (lastFile != null) {
        rotateCall(lastFile);
      }
      countEmptyWindow = 0;
    }
    super.endWindow();
  }

  public long getMaxWindowsWithNoData()
  {
    return maxWindowsWithNoData;
  }

  public void setMaxWindowsWithNoData(long maxWindowsWithNoData)
  {
    this.maxWindowsWithNoData = maxWindowsWithNoData;
  }

  /*
   * A POJO emitted on the output port of an AbstractFSRollingOutputOperator
   * implementation in a DAG. It carries the file name, which is not changed
   * by FSRollingOutputOperator once the POJO is emitted, along with the Hive
   * partitions to which the respective file will be moved.
   */
  public static class FilePartitionMapping
  {
    private String filename;
    private ArrayList<String> partition = new ArrayList<String>();

    public ArrayList<String> getPartition()
    {
      return partition;
    }

    public void setPartition(ArrayList<String> partition)
    {
      this.partition = partition;
    }

    public String getFilename()
    {
      return filename;
    }

    public void setFilename(String filename)
    {
      this.filename = filename;
    }
  }

  private static final Logger logger = LoggerFactory.getLogger(AbstractFSRollingOutputOperator.class);
}