blob: ad7577cdf778d42e33312b62c037094a68ebec63 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.filereader
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.util.Logging
import java.io.RandomAccessFile
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.Map
/**
 * A system consumer that reads text files line by line, one reader thread per registered
 * SystemStreamPartition. The stream name of each SystemStreamPartition is used as the path
 * of the file to read, and the registered starting offset is the byte position at which
 * reading begins.
 *
 * Every completed line (terminated by '\n') is handed to <code>put</code> as an
 * IncomingMessageEnvelope whose offset is the byte position of the first byte of that line,
 * whose key is null and whose message is the line without the trailing '\n'.
 */
class FileReaderSystemConsumer(
  systemName: String,
  metricsRegistry: MetricsRegistry,

  /**
   * Capacity of every BlockingQueue created by <code>newBlockingQueue</code>, i.e. the
   * threshold used to determine when there are too many IncomingMessageEnvelopes queued.
   */
  queueSize: Int = 10000,

  /**
   * The sleep interval between two checks of the file length, in milliseconds.
   */
  pollingSleepMs: Int = 500) extends BlockingEnvelopeMap with Logging {

  /**
   * A map storing every registered SystemStreamPartition and its starting offset.
   */
  var systemStreamPartitionAndStartingOffset = Map[SystemStreamPartition, String]()

  /**
   * A thread pool for the threads reading files. The size of the pool equals the number of
   * files to read. It stays null until <code>start</code> has been called with at least one
   * registered SystemStreamPartition.
   */
  var pool: ExecutorService = null

  /**
   * Registers the SystemStreamPartition with the parent class and records it, together with
   * its starting offset, in the systemStreamPartitionAndStartingOffset map.
   *
   * @param systemStreamPartition the partition to read; its stream name is used as file path
   * @param startingOffset the byte position to start reading from, as a decimal string
   */
  override def register(systemStreamPartition: SystemStreamPartition, startingOffset: String): Unit = {
    super.register(systemStreamPartition, startingOffset)
    systemStreamPartitionAndStartingOffset += ((systemStreamPartition, startingOffset))
  }

  /**
   * Starts one daemon reader thread for each registered SystemStreamPartition.
   * Nothing is started when no SystemStreamPartition has been registered.
   */
  override def start: Unit = {
    // Executors.newFixedThreadPool rejects a pool size of 0, so only build the pool when
    // there is at least one file to read.
    if (systemStreamPartitionAndStartingOffset.nonEmpty) {
      pool = Executors.newFixedThreadPool(
        systemStreamPartitionAndStartingOffset.size,
        new ThreadFactoryBuilder().setNameFormat("Samza FileReader Thread-%d").setDaemon(true).build())
      systemStreamPartitionAndStartingOffset.foreach {
        case (ssp, offset) => pool.execute(readInputFiles(ssp, offset))
      }
    }
  }

  /**
   * Stops all the running reader threads by interrupting them. Safe to call even when
   * <code>start</code> was never called.
   */
  override def stop: Unit = {
    if (pool != null) {
      // shutdown() would only refuse new tasks and leave the endless reader loops running;
      // shutdownNow() interrupts them, which the loops observe (see readInputFiles).
      pool.shutdownNow()
    }
  }

  /**
   * Returns a Runnable that follows one file until its thread is interrupted. It puts every
   * line (ended with \n) and its offset (the byte position of the beginning of the line) into
   * the BlockingQueue. A line that is not yet ended with \n is considered incomplete: its
   * characters are kept and the envelope is only emitted once the terminating \n has been read.
   *
   * The thread keeps comparing the file length with the file pointer to pick up newly
   * appended content. Each time the content available at the start of a pass has been
   * consumed, setIsAtHead() is called to mark the SystemStreamPartition as "caught up".
   * When there is nothing new, the thread sleeps for <code>pollingSleepMs</code> and then
   * reopens the file before checking again.
   *
   * Each byte read is converted to one character, so multi-byte character encodings are not
   * decoded.
   *
   * @param ssp the partition whose stream name is the path of the file to read
   * @param startingOffset the byte position to start reading from, as a decimal string
   */
  private def readInputFiles(ssp: SystemStreamPartition, startingOffset: String): Runnable = {
    new Runnable {
      def run(): Unit = {
        val path = ssp.getStream
        var file: RandomAccessFile = null
        var filePointer = startingOffset.toLong // position of the next unread byte
        val line = new StringBuilder // characters of the line currently being assembled
        var offset = filePointer // the beginning offset of the line being assembled

        try {
          file = new RandomAccessFile(path, "r")
          // The interrupt status is the stop signal (set by pool.shutdownNow in stop).
          while (!Thread.currentThread.isInterrupted) {
            // One snapshot per pass: the file may grow while we read, and whatever is
            // appended after this point is picked up by the next pass.
            val length = file.length
            if (length <= filePointer) {
              Thread.sleep(pollingSleepMs)
              file.close()
              file = new RandomAccessFile(path, "r")
            } else {
              file.seek(filePointer)
              var i = filePointer
              var reachedEnd = false
              while (i < length && !reachedEnd) {
                val byteRead = file.read()
                if (byteRead < 0) {
                  // The file ended earlier than the snapshot said (e.g. it was truncated);
                  // stop this pass instead of appending a bogus character.
                  reachedEnd = true
                } else {
                  val cha = byteRead.toChar
                  if (cha == '\n') {
                    // put into the queue. offset is the beginning of this line
                    put(ssp, new IncomingMessageEnvelope(ssp, offset.toString, null, line.toString))
                    offset = i + 1 // the beginning of the next line
                    line.clear()
                  } else {
                    line.append(cha)
                  }
                  i += 1
                }
              }
              // Advance to exactly what was consumed. Re-reading file.length here could skip
              // bytes that were appended after the loop's last length check.
              filePointer = i
              setIsAtHead(ssp, true)
            }
          }
        } catch {
          case ie: InterruptedException => {
            // Swallow this exception since we don't need to clutter the logs
            // with interrupt exceptions when shutting down.
            info("Received an interrupt while reading file. Shutting down.")
          }
        } finally {
          if (file != null) {
            file.close()
          }
        }
      }
    }
  }

  /**
   * Constructs a new bounded BlockingQueue of IncomingMessageEnvelopes. The bound is determined
   * by the <code>queueSize</code> constructor parameter.
   *
   * @return A bounded queue used for queueing IncomingMessageEnvelopes to be sent to their
   * specified destinations.
   */
  override def newBlockingQueue = {
    new LinkedBlockingQueue[IncomingMessageEnvelope](queueSize)
  }
}