blob: 8638ea694994866f124f4501cbd65586be6aaccb [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.hbase;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* A base implementation of an operator that extends base class for Hbase operators.  Subclasses should provide the
implementation of processing the tuples.
* Implements the base class for HBase output operators. <br>
* <p>
* <br>
* The output operator collects all the tuples that arrive in a window and writes them to
* HBase in endWindow. The tuples are stored in an application specific manner. The concrete
* implementation that extends this operator provides a method that specifies where to store the
* tuple. The operator also stores the last processed window id into the table and loads it during setup time.
* If the processing window id is not greater than the last processed window id that was loaded those tuples
* are ignored till the processing window id becomes greater than the last processed window id.<br>
*
* <br>
* @displayName HBase Output
* @category Store
* @tags output operator
* @param <T> The tuple type
* @since 0.3.2
*/
@Deprecated
public abstract class HBaseOutputOperator<T> extends HBaseOperatorBase implements Operator {
private static final transient Logger logger = LoggerFactory.getLogger(HBaseOutputOperator.class);
private static final String DEFAULT_LAST_WINDOW_PREFIX_COLUMN_NAME = "last_window";
private transient String lastWindowColumnName;
private transient byte[] lastWindowColumnBytes;
private transient String appName;
private transient String appId;
private transient int operatorId;
private transient List<T> tuples;
// By default flush tuples only on end window
private transient long lastProcessedWindow;
private transient long currentWindow;
private transient HBaseStatePersistenceStrategy persistenceStrategy;
/**
* Input port that takes tuples from the DAG.
*/
public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() {
@Override
public void process(T tuple)
{
if (currentWindow > lastProcessedWindow) {
tuples.add(tuple);
}
}
};
public HBaseOutputOperator() {
tuples = new ArrayList<T>();
lastProcessedWindow = -1;
currentWindow = 0;
lastWindowColumnName = DEFAULT_LAST_WINDOW_PREFIX_COLUMN_NAME;
}
/**
* Get the name of the column where the last processed window id is stored.
* @return The column name
*/
public String getLastWindowColumnName()
{
return lastWindowColumnName;
}
/**
* Set the name of the column where the last processed window id is stored.
*/
public void setLastWindowColumnName(String lastWindowColumnName)
{
this.lastWindowColumnName = lastWindowColumnName;
}
private void constructKeys() {
String columnKey = appName + "_" + appId + "_" + operatorId + "_" + lastWindowColumnName;
lastWindowColumnBytes = Bytes.toBytes(columnKey);
}
@Override
public void setup(OperatorContext context)
{
try {
appName = context.getValue(DAG.APPLICATION_NAME);
appId = context.getValue(DAG.APPLICATION_ID);
operatorId = context.getId();
constructKeys();
setupConfiguration();
persistenceStrategy = getPersistenceStrategy();
persistenceStrategy.setTable(getTable());
persistenceStrategy.setup();
loadProcessState();
}catch (IOException ie) {
throw new RuntimeException(ie);
}
}
@Override
public void teardown()
{
}
@Override
public void beginWindow(long windowId)
{
currentWindow = windowId;
}
@Override
public void endWindow()
{
try {
processTuples();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Process the tuples that arrived in the window.
* @throws IOException
*/
private void processTuples() throws IOException {
Iterator<T> it = tuples.iterator();
while (it.hasNext()) {
T t = it.next();
try {
processTuple(t);
} catch (IOException e) {
logger.error("Could not output tuple", e);
throw new RuntimeException("Could not output tuple " + e.getMessage());
}
it.remove();
}
lastProcessedWindow = currentWindow;
saveProcessState();
}
/**
* Retrieve the processing state that was saved in a prior run.
* The state is loaded from the HBase table in an application specific way.
* @throws IOException
*/
private void loadProcessState() throws IOException {
byte[] lastProcessedWindowBytes = persistenceStrategy.getState(lastWindowColumnBytes);
if (lastProcessedWindowBytes != null) {
lastProcessedWindow = Bytes.toLong(lastProcessedWindowBytes);
}
}
/**
* Save the current processing state.
* The state is saved to the HBase table in an application specific way.
* @throws IOException
*/
private void saveProcessState() throws IOException {
byte[] lastProcessedWindowBytes = Bytes.toBytes(lastProcessedWindow);
persistenceStrategy.saveState(lastWindowColumnBytes, lastProcessedWindowBytes);
}
/**
* Get the persistence strategy.
* Get the persistence strategy to use to save and retrieve state. The concrete class that
* extends this calls should implement this method to specify how to save and load state.
* @return The persistence strategy
*/
public abstract HBaseStatePersistenceStrategy getPersistenceStrategy();
/**
* Process a tuple.
* @param t The tuple
* @throws IOException
*/
public abstract void processTuple(T t) throws IOException;
}