blob: cd8cc5ed79283a17c7ffc8c9167557e862603930 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.hdfs.reader;
import java.util.List;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.hdfs.HdfsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A wrapper on top of {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader}
* to manage the situation of multiple files per partition.
*
* The offset for MultiFileHdfsReader, which is also the offset that gets
* committed in and used by Samza, consists of two parts: file index,
* actual offset within file. For example, 3:127
*
* Format of the offset within file is defined by the implementation of
* {@link org.apache.samza.system.hdfs.reader.SingleFileHdfsReader} itself.
*/
public class MultiFileHdfsReader {
  private static final Logger LOG = LoggerFactory.getLogger(MultiFileHdfsReader.class);
  private static final String DELIMITER = ":";

  private final HdfsReaderFactory.ReaderType readerType;
  private final SystemStreamPartition systemStreamPartition;
  // Ordered list of file paths making up this partition; curFileIndex indexes into it.
  private final List<String> filePaths;
  private SingleFileHdfsReader curReader;
  private int curFileIndex = 0;
  // Offset of the LAST successfully read message within the current file
  // (format defined by the SingleFileHdfsReader implementation).
  private String curSingleFileOffset;
  private int numRetries;
  private final int numMaxRetries;

  /**
   * Get the current file index from the offset string
   * @param offset offset string that contains both file index and offset within file
   * @return the file index part
   * @throws SamzaException if the offset does not contain both parts
   */
  public static int getCurFileIndex(String offset) {
    String[] elements = offset.split(DELIMITER);
    if (elements.length < 2) {
      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
    }
    return Integer.parseInt(elements[0]);
  }

  /**
   * Get the offset within file from the offset string
   * @param offset offset string that contains both file index and offset within file
   * @return the single file offset part
   * @throws SamzaException if the offset does not contain both parts
   */
  public static String getCurSingleFileOffset(String offset) {
    String[] elements = offset.split(DELIMITER);
    if (elements.length < 2) {
      throw new SamzaException("Invalid offset for MultiFileHdfsReader: " + offset);
    }
    // Getting the remaining of the offset string in case the single file
    // offset uses the same delimiter.
    return offset.substring(elements[0].length() + 1);
  }

  /**
   * Generate the offset based on file index and offset within single file
   * @param fileIndex index of the file
   * @param singleFileOffset offset within single file
   * @return the complete offset
   */
  public static String generateOffset(int fileIndex, String singleFileOffset) {
    return fileIndex + DELIMITER + singleFileOffset;
  }

  /*
   * Get current offset: offset of the LAST message being successfully read. If no messages have
   * ever been read, return the offset of first event.
   */
  private String getCurOffset() {
    return generateOffset(curFileIndex, curSingleFileOffset);
  }

  /**
   * Construct a reader over the given files, starting from the given offset, with
   * the default maximum number of reconnect retries from {@link HdfsConfig}.
   * @param readerType type of the underlying single-file reader
   * @param systemStreamPartition the SSP this reader serves
   * @param partitionDescriptors ordered list of file paths composing the partition
   * @param offset starting offset in "fileIndex:singleFileOffset" format
   */
  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
    List<String> partitionDescriptors, String offset) {
    this(readerType, systemStreamPartition, partitionDescriptors, offset,
      Integer.parseInt(HdfsConfig.CONSUMER_NUM_MAX_RETRIES_DEFAULT()));
  }

  /**
   * Construct a reader over the given files, starting from the given offset.
   * @param readerType type of the underlying single-file reader
   * @param systemStreamPartition the SSP this reader serves
   * @param partitionDescriptors ordered list of file paths composing the partition
   * @param offset starting offset in "fileIndex:singleFileOffset" format
   * @param numMaxRetries maximum number of reconnect attempts before giving up
   * @throws SamzaException if partitionDescriptors is empty or the offset is invalid
   */
  public MultiFileHdfsReader(HdfsReaderFactory.ReaderType readerType, SystemStreamPartition systemStreamPartition,
    List<String> partitionDescriptors, String offset, int numMaxRetries) {
    this.readerType = readerType;
    this.systemStreamPartition = systemStreamPartition;
    this.filePaths = partitionDescriptors;
    this.numMaxRetries = numMaxRetries;
    this.numRetries = 0;
    if (partitionDescriptors.isEmpty()) {
      throw new SamzaException(
        "Invalid number of files based on partition descriptors: " + partitionDescriptors.size());
    }
    init(offset);
  }

  /**
   * (Re)open the underlying single-file reader positioned at the given multi-file offset.
   * Closes any currently open reader first.
   * @throws SamzaException if the file index part of the offset is out of range
   */
  private void init(String offset) {
    if (curReader != null) {
      curReader.close();
      curReader = null;
    }
    curFileIndex = getCurFileIndex(offset);
    // Reject both negative and too-large indexes with a clear error instead of
    // letting filePaths.get() throw a raw IndexOutOfBoundsException.
    if (curFileIndex < 0 || curFileIndex >= filePaths.size()) {
      throw new SamzaException(
        String.format("Invalid file index %d. Number of files is %d", curFileIndex, filePaths.size()));
    }
    curSingleFileOffset = getCurSingleFileOffset(offset);
    curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
    curReader.open(filePaths.get(curFileIndex), curSingleFileOffset);
  }

  /**
   * Whether there are more messages to read, advancing to the next file
   * (from offset "0") whenever the current one is exhausted.
   * @return true if at least one more message is available
   */
  public boolean hasNext() {
    while (curFileIndex < filePaths.size()) {
      if (curReader.hasNext()) {
        return true;
      }
      curReader.close();
      curFileIndex++;
      if (curFileIndex < filePaths.size()) {
        curReader = HdfsReaderFactory.getHdfsReader(readerType, systemStreamPartition);
        curReader.open(filePaths.get(curFileIndex), "0");
      }
    }
    return false;
  }

  /**
   * Read the next message, rewriting its offset into the multi-file
   * "fileIndex:singleFileOffset" format.
   * @return the next message envelope, or null if no more data is available
   */
  public IncomingMessageEnvelope readNext() {
    if (!hasNext()) {
      // Parameterized logging avoids eager string concatenation when WARN is disabled.
      LOG.warn("Attempting to read more data when there aren't any. ssp={}", systemStreamPartition);
      return null;
    }
    // record the next offset before we read, so when the read fails and we reconnect,
    // we seek to the same offset that we try below
    curSingleFileOffset = curReader.nextOffset();
    IncomingMessageEnvelope messageEnvelope = curReader.readNext();
    // Copy everything except for the offset. Turn the single-file style offset into a multi-file one
    return new IncomingMessageEnvelope(messageEnvelope.getSystemStreamPartition(), getCurOffset(),
      messageEnvelope.getKey(), messageEnvelope.getMessage(), messageEnvelope.getSize(),
      messageEnvelope.getEventTime(), messageEnvelope.getArrivalTime());
  }

  /**
   * Reconnect to the file systems in case of failure.
   * Reset offset to the last checkpoint (last successfully read message).
   * Throw {@link org.apache.samza.SamzaException} if reaches max number of
   * retries.
   */
  public void reconnect() {
    reconnect(getCurOffset());
  }

  /**
   * Reconnect to the file systems in case of failures.
   * @param offset reset offset to the specified offset
   * Throw {@link org.apache.samza.SamzaException} if reaches max number of
   * retries.
   */
  public void reconnect(String offset) {
    if (numRetries >= numMaxRetries) {
      throw new SamzaException(
        String.format("Give up reconnecting. numRetries: %d; numMaxRetries: %d", numRetries, numMaxRetries));
    }
    LOG.info("Reconnecting with offset: {} numRetries: {} numMaxRetries: {}", offset, numRetries, numMaxRetries);
    numRetries++;
    init(offset);
  }

  /**
   * Close the underlying reader, if any. Safe to call multiple times.
   */
  public void close() {
    // Fixed typo in log message: "Milti" -> "Multi".
    LOG.info("MultiFileHdfsReader shutdown requested for {}. Current offset = {}", systemStreamPartition,
      getCurOffset());
    if (curReader != null) {
      curReader.close();
    }
  }

  /**
   * @return the system stream partition this reader serves
   */
  public SystemStreamPartition getSystemStreamPartition() {
    return systemStreamPartition;
  }
}