blob: f2e9a8c35c3f8c24956929fe3755f1b8018207b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.ActivationListener;
/**
* This operator implements "tail -f" command. If the operator has reached the end of the file, it will wait till more
* data comes.
* <p>
* <b>Ports</b>:<br>
* <b>outport</b>: emits &lt;String&gt;<br>
* <br>
* <b>Properties</b>:<br>
* <b>filePath</b> : Path for file to be read. <br>
* <b>delay</b>: Thread sleep interval after emitting line.<br>
* <b>position</b>: The position from where to start reading the file.<br>
* <b>numberOfTuples</b>: number of tuples to be emitted in a single emit Tuple call.<br>
* <b>end</b>: if the user wants to start tailing from end.<br>
* <br>
* </p>
* @displayName Tail Input
* @category Input
* @tags local fs, file
* @since 0.9.4
*/
@org.apache.hadoop.classification.InterfaceStability.Evolving
public class TailFsInputOperator implements InputOperator, ActivationListener<OperatorContext>
{
/**
* The file which will be tailed.
*/
private String filePath;
/**
* This holds the position from which to start reading the stream
*/
private long position;
/**
* The amount of time to wait for the file to be updated.
*/
private long delay = 10;
/**
* The number of lines to emit in a single window
*/
private int numberOfTuples = 1000;
/**
* Whether to tail from the end or start of file
*/
private boolean end;
/**
* The delimiter used to identify tne end of the record
*/
private char delimiter = '\n';
/**
* This is used to store the last access time of the file
*/
private transient long accessTime;
private transient RandomAccessFile reader;
private transient File file;
/**
* @return the filePath
*/
public String getFilePath()
{
return filePath;
}
/**
* @param filePath
* the filePath to set
*/
public void setFilePath(String filePath)
{
this.filePath = filePath;
}
/**
* @return the delay
*/
public long getDelay()
{
return delay;
}
/**
* @param delay
* the delay to set
*/
public void setDelay(long delay)
{
this.delay = delay;
}
/**
* Gets whether to tail from the end of the file.
* @return the end
*/
public boolean isEnd()
{
return end;
}
/**
* Whether to tail from the end or start of file.
* @param end The end to set.
*/
public void setEnd(boolean end)
{
this.end = end;
}
/**
* @return the position
*/
public long getPosition()
{
return position;
}
/**
* @param position
* the position to set
*/
public void setPosition(long position)
{
this.position = position;
}
/**
* @return the numberOfTuples
*/
public int getNumberOfTuples()
{
return numberOfTuples;
}
/**
* @param numberOfTuples
* the numberOfTuples to set
*/
public void setNumberOfTuples(int numberOfTuples)
{
this.numberOfTuples = numberOfTuples;
}
/**
* @return the delimiter
*/
public char getDelimiter()
{
return delimiter;
}
/**
* @param delimiter
* the delimiter to set
*/
public void setDelimiter(char delimiter)
{
this.delimiter = delimiter;
}
@Override
public void beginWindow(long windowId)
{
}
@Override
public void endWindow()
{
try {
this.position = reader.getFilePointer();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void setup(OperatorContext context)
{
}
@Override
public void teardown()
{
}
@Override
public void activate(OperatorContext ctx)
{
try {
file = new File(filePath);
reader = new RandomAccessFile(file, "r");
position = end ? file.length() : position;
reader.seek(position);
accessTime = System.currentTimeMillis();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void deactivate()
{
try {
reader.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
position = 0;
}
@Override
public void emitTuples()
{
int localCounter = numberOfTuples;
try {
while (localCounter >= 0) {
String str = readLine();
if (str == null) {
//logger.debug("reached end of file");
} else {
output.emit(str);
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
//swallowing exception
}
--localCounter;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private String readLine() throws IOException
{
StringBuffer sb = new StringBuffer();
char readChar;
int ch;
long pos = reader.getFilePointer();
long length = file.length();
if ((length < pos) || (length == pos && FileUtils.isFileNewer(file, accessTime))) {
// file got rotated or truncated
reader.close();
reader = new RandomAccessFile(file, "r");
position = 0;
reader.seek(position);
pos = 0;
}
accessTime = System.currentTimeMillis();
while ((ch = reader.read()) != -1) {
readChar = (char)ch;
if (readChar != delimiter) {
sb.append(readChar);
} else {
return sb.toString();
}
}
reader.seek(pos);
return null;
}
/**
* This is the output port, which emits lines from the specified files.
*/
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
private static final Logger logger = LoggerFactory.getLogger(TailFsInputOperator.class);
}