| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.io.fs; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.io.FileUtils; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.InputOperator; |
| import com.datatorrent.api.Operator.ActivationListener; |
| |
| /** |
| * This operator implements "tail -f" command. If the operator has reached the end of the file, it will wait till more |
| * data comes. |
| * <p> |
| * <b>Ports</b>:<br> |
| * <b>outport</b>: emits <String><br> |
| * <br> |
| * <b>Properties</b>:<br> |
| * <b>filePath</b> : Path for file to be read. <br> |
| * <b>delay</b>: Thread sleep interval after emitting line.<br> |
| * <b>position</b>: The position from where to start reading the file.<br> |
| * <b>numberOfTuples</b>: number of tuples to be emitted in a single emit Tuple call.<br> |
| * <b>end</b>: if the user wants to start tailing from end.<br> |
| * <br> |
| * </p> |
| * @displayName Tail Input |
| * @category Input |
| * @tags local fs, file |
| * @since 0.9.4 |
| */ |
| @org.apache.hadoop.classification.InterfaceStability.Evolving |
| public class TailFsInputOperator implements InputOperator, ActivationListener<OperatorContext> |
| { |
| /** |
| * The file which will be tailed. |
| */ |
| private String filePath; |
| |
| /** |
| * This holds the position from which to start reading the stream |
| */ |
| private long position; |
| |
| /** |
| * The amount of time to wait for the file to be updated. |
| */ |
| private long delay = 10; |
| |
| /** |
| * The number of lines to emit in a single window |
| */ |
| private int numberOfTuples = 1000; |
| |
| /** |
| * Whether to tail from the end or start of file |
| */ |
| private boolean end; |
| |
| /** |
| * The delimiter used to identify tne end of the record |
| */ |
| private char delimiter = '\n'; |
| |
| /** |
| * This is used to store the last access time of the file |
| */ |
| private transient long accessTime; |
| |
| private transient RandomAccessFile reader; |
| private transient File file; |
| |
| /** |
| * @return the filePath |
| */ |
| public String getFilePath() |
| { |
| return filePath; |
| } |
| |
| /** |
| * @param filePath |
| * the filePath to set |
| */ |
| public void setFilePath(String filePath) |
| { |
| this.filePath = filePath; |
| } |
| |
| /** |
| * @return the delay |
| */ |
| public long getDelay() |
| { |
| return delay; |
| } |
| |
| /** |
| * @param delay |
| * the delay to set |
| */ |
| public void setDelay(long delay) |
| { |
| this.delay = delay; |
| } |
| |
| /** |
| * Gets whether to tail from the end of the file. |
| * @return the end |
| */ |
| public boolean isEnd() |
| { |
| return end; |
| } |
| |
| /** |
| * Whether to tail from the end or start of file. |
| * @param end The end to set. |
| */ |
| public void setEnd(boolean end) |
| { |
| this.end = end; |
| } |
| |
| /** |
| * @return the position |
| */ |
| public long getPosition() |
| { |
| return position; |
| } |
| |
| /** |
| * @param position |
| * the position to set |
| */ |
| public void setPosition(long position) |
| { |
| this.position = position; |
| } |
| |
| /** |
| * @return the numberOfTuples |
| */ |
| public int getNumberOfTuples() |
| { |
| return numberOfTuples; |
| } |
| |
| /** |
| * @param numberOfTuples |
| * the numberOfTuples to set |
| */ |
| public void setNumberOfTuples(int numberOfTuples) |
| { |
| this.numberOfTuples = numberOfTuples; |
| } |
| |
| /** |
| * @return the delimiter |
| */ |
| public char getDelimiter() |
| { |
| return delimiter; |
| } |
| |
| /** |
| * @param delimiter |
| * the delimiter to set |
| */ |
| public void setDelimiter(char delimiter) |
| { |
| this.delimiter = delimiter; |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| try { |
| this.position = reader.getFilePointer(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| } |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| } |
| |
| @Override |
| public void teardown() |
| { |
| } |
| |
| @Override |
| public void activate(OperatorContext ctx) |
| { |
| try { |
| file = new File(filePath); |
| reader = new RandomAccessFile(file, "r"); |
| position = end ? file.length() : position; |
| reader.seek(position); |
| accessTime = System.currentTimeMillis(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void deactivate() |
| { |
| try { |
| reader.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| position = 0; |
| } |
| |
| @Override |
| public void emitTuples() |
| { |
| int localCounter = numberOfTuples; |
| try { |
| while (localCounter >= 0) { |
| String str = readLine(); |
| if (str == null) { |
| //logger.debug("reached end of file"); |
| } else { |
| output.emit(str); |
| } |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| //swallowing exception |
| } |
| --localCounter; |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private String readLine() throws IOException |
| { |
| StringBuffer sb = new StringBuffer(); |
| char readChar; |
| int ch; |
| long pos = reader.getFilePointer(); |
| long length = file.length(); |
| if ((length < pos) || (length == pos && FileUtils.isFileNewer(file, accessTime))) { |
| // file got rotated or truncated |
| reader.close(); |
| reader = new RandomAccessFile(file, "r"); |
| position = 0; |
| reader.seek(position); |
| pos = 0; |
| } |
| accessTime = System.currentTimeMillis(); |
| while ((ch = reader.read()) != -1) { |
| readChar = (char)ch; |
| if (readChar != delimiter) { |
| sb.append(readChar); |
| } else { |
| return sb.toString(); |
| } |
| } |
| reader.seek(pos); |
| return null; |
| } |
| |
| /** |
| * This is the output port, which emits lines from the specified files. |
| */ |
| public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); |
| |
| private static final Logger logger = LoggerFactory.getLogger(TailFsInputOperator.class); |
| |
| } |