blob: 94898ee38d59982aea132bb45c9dacd6ee83d66d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import com.google.common.base.Charsets;
class RollingLogsImpl implements RollingLogs {
private static final String CURR_SUFFIX = ".curr";
private static final String PREV_SUFFIX = ".prev";
static boolean isFilePresent(String dir, String filePrefix) {
return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
new File(dir, filePrefix + PREV_SUFFIX).exists();
}
private final File curr;
private final File prev;
private PrintWriter out; //require synchronized access
private Appender appender = new Appender() {
@Override
public Appendable append(CharSequence csq) {
synchronized(RollingLogsImpl.this) {
if (out == null) {
throw new IllegalStateException(RollingLogsImpl.this
+ " is not yet opened.");
}
out.print(csq);
}
return this;
}
@Override
public Appendable append(char c) {
throw new UnsupportedOperationException();
}
@Override
public Appendable append(CharSequence csq, int start, int end) {
throw new UnsupportedOperationException();
}
@Override
public void close() {
synchronized(RollingLogsImpl.this) {
if (out != null) {
out.close();
out = null;
}
}
}
};
private final AtomicInteger numReaders = new AtomicInteger();
RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
curr = new File(dir, filePrefix + CURR_SUFFIX);
prev = new File(dir, filePrefix + PREV_SUFFIX);
out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
curr, true), Charsets.UTF_8));
}
@Override
public Reader iterator(boolean skipPrevFile) throws IOException {
numReaders.incrementAndGet();
return new Reader(skipPrevFile);
}
@Override
public Appender appender() {
return appender;
}
@Override
public boolean roll() throws IOException {
if (numReaders.get() > 0) {
return false;
}
if (!prev.delete() && prev.exists()) {
throw new IOException("Failed to delete " + prev);
}
synchronized(this) {
appender.close();
final boolean renamed = curr.renameTo(prev);
out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
curr, true), Charsets.UTF_8));
if (!renamed) {
throw new IOException("Failed to rename " + curr + " to " + prev);
}
}
return true;
}
@Override
public String toString() {
return curr.toString();
}
/**
* This is used to read the lines in order.
* If the data is not read completely (i.e, untill hasNext() returns
* false), it needs to be explicitly
*/
private class Reader implements RollingLogs.LineIterator {
private File file;
private BufferedReader reader;
private String line;
private boolean closed = false;
private Reader(boolean skipPrevFile) throws IOException {
reader = null;
file = skipPrevFile? curr : prev;
readNext();
}
@Override
public boolean isPrevious() {
return file == prev;
}
private boolean openFile() throws IOException {
for(int i=0; i<2; i++) {
if (reader != null || i > 0) {
// move to next file
file = isPrevious()? curr : null;
}
if (file == null) {
return false;
}
if (file.exists()) {
break;
}
}
if (reader != null ) {
reader.close();
reader = null;
}
reader = new BufferedReader(new InputStreamReader(new FileInputStream(
file), Charsets.UTF_8));
return true;
}
// read next line if possible.
private void readNext() throws IOException {
line = null;
try {
if (reader != null && (line = reader.readLine()) != null) {
return;
}
if (line == null) {
// move to the next file.
if (openFile()) {
readNext();
}
}
} finally {
if (!hasNext()) {
close();
}
}
}
@Override
public boolean hasNext() {
return line != null;
}
@Override
public String next() {
String curLine = line;
try {
readNext();
} catch (IOException e) {
DataBlockScanner.LOG.warn("Failed to read next line.", e);
}
return curLine;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
if (!closed) {
try {
if (reader != null) {
reader.close();
}
} finally {
file = null;
reader = null;
closed = true;
final int n = numReaders.decrementAndGet();
assert(n >= 0);
}
}
}
}
}