blob: c3d31bca41c218c5a65940e6877850a037571aad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.tool;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
/** Recovers data from a corrupt Avro Data file */
public class DataFileRepairTool implements Tool {
@Override
public String getName() {
return "repair";
}
@Override
public String getShortDescription() {
return "Recovers data from a corrupt Avro Data file";
}
private void printInfo(PrintStream output) {
output.println("Insufficient arguments. Arguments: [-o option] "
+ "input_file output_file \n"
+ " Where option is one of the following: \n"
+ " " + ALL
+ " (default) recover as many records as possible.\n"
+ " " + PRIOR
+ " recover only records prior to the first instance"
+ " of corruption \n"
+ " " + AFTER
+ " recover only records after the first instance of"
+ " corruption.\n"
+ " " + REPORT
+ " print the corruption report only, reporting the\n"
+ " number of valid and corrupted blocks and records\n"
+ " input_file is the file to read from. output_file is the file to\n"
+ " create and write recovered data to. output_file is ignored if\n"
+ " using the report option.");
}
private static final Set<String> OPTIONS = new HashSet<String>();
private static final String ALL = "all";
private static final String PRIOR = "prior";
private static final String AFTER = "after";
private static final String REPORT = "report";
static {
OPTIONS.add(ALL);
OPTIONS.add(PRIOR);
OPTIONS.add(AFTER);
OPTIONS.add(REPORT);
}
@Override
public int run(InputStream stdin, PrintStream out, PrintStream err,
List<String> args) throws Exception {
if (args.size() < 2) {
printInfo(err);
return 1;
}
int index = 0;
String input = args.get(index);
String option = "all";
if ("-o".equals(input)) {
option = args.get(1);
index += 2;
}
if (!OPTIONS.contains(option) || (args.size() - index < 1)) {
printInfo(err);
return 1;
}
input = args.get(index++);
if (!REPORT.equals(option)) {
if (args.size() - index < 1) {
printInfo(err);
return 1;
}
}
if (ALL.equals(option)) {
return recoverAll(input, args.get(index), out, err);
} else if (PRIOR.equals(option)) {
return recoverPrior(input, args.get(index), out, err);
} else if (AFTER.equals(option)) {
return recoverAfter(input, args.get(index), out, err);
} else if (REPORT.equals(option)) {
return reportOnly(input, out, err);
} else {
return 1;
}
}
private int recover(String input, String output, PrintStream out,
PrintStream err, boolean recoverPrior, boolean recoverAfter)
throws IOException {
File infile = new File(input);
if (!infile.canRead()) {
err.println("cannot read file: " + input);
return 1;
}
out.println("Recovering file: " + input);
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileReader<Object> fileReader = new DataFileReader<Object>(infile,
reader);
try {
Schema schema = fileReader.getSchema();
String codecStr = fileReader.getMetaString(DataFileConstants.CODEC);
CodecFactory codecFactory = CodecFactory.fromString("" + codecStr);
List<String> metas = fileReader.getMetaKeys();
if (recoverPrior || recoverAfter) {
GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>();
DataFileWriter<Object> fileWriter = new DataFileWriter<Object>(writer);
try {
File outfile = new File(output);
for (String key : metas) {
if (!key.startsWith("avro.")) {
byte[] val = fileReader.getMeta(key);
fileWriter.setMeta(key, val);
}
}
fileWriter.setCodec(codecFactory);
int result = innerRecover(fileReader, fileWriter, out, err, recoverPrior,
recoverAfter, schema, outfile);
return result;
} catch (Exception e) {
e.printStackTrace(err);
return 1;
}
} else {
return innerRecover(fileReader, null, out, err, recoverPrior,
recoverAfter, null, null);
}
} finally {
fileReader.close();
}
}
private int innerRecover(DataFileReader<Object> fileReader,
DataFileWriter<Object> fileWriter, PrintStream out, PrintStream err,
boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
int numBlocks = 0;
int numCorruptBlocks = 0;
int numRecords = 0;
int numCorruptRecords = 0;
int recordsWritten = 0;
long position = fileReader.previousSync();
long blockSize = 0;
long blockCount = 0;
boolean fileWritten = false;
try {
while (true) {
try {
if (!fileReader.hasNext()) {
out.println("File Summary: ");
out.println(" Number of blocks: " + numBlocks
+ " Number of corrupt blocks: " + numCorruptBlocks);
out.println(" Number of records: " + numRecords
+ " Number of corrupt records: " + numCorruptRecords);
if (recoverAfter || recoverPrior) {
out.println(" Number of records written " + recordsWritten);
}
out.println();
return 0;
}
position = fileReader.previousSync();
blockCount = fileReader.getBlockCount();
blockSize = fileReader.getBlockSize();
numRecords += blockCount;
long blockRemaining = blockCount;
numBlocks++;
boolean lastRecordWasBad = false;
long badRecordsInBlock = 0;
while (blockRemaining > 0) {
try {
Object datum = fileReader.next();
if ((recoverPrior && numCorruptBlocks == 0)
|| (recoverAfter && numCorruptBlocks > 0)) {
if (!fileWritten) {
try {
fileWriter.create(schema, outfile);
fileWritten = true;
} catch (Exception e) {
e.printStackTrace(err);
return 1;
}
}
try {
fileWriter.append(datum);
recordsWritten++;
} catch (Exception e) {
e.printStackTrace(err);
throw e;
}
}
blockRemaining--;
lastRecordWasBad = false;
} catch (Exception e) {
long pos = blockCount - blockRemaining;
if (badRecordsInBlock == 0) {
// first corrupt record
numCorruptBlocks++;
err.println("Corrupt block: " + numBlocks + " Records in block: "
+ blockCount + " uncompressed block size: " + blockSize);
err.println("Corrupt record at position: "
+ (pos));
} else {
// second bad record in block, if consecutive skip block.
err.println("Corrupt record at position: "
+ (pos));
if (lastRecordWasBad) {
// consecutive bad record
err.println("Second consecutive bad record in block: " + numBlocks
+ ". Skipping remainder of block. ");
numCorruptRecords += blockRemaining;
badRecordsInBlock += blockRemaining;
try {
fileReader.sync(position);
} catch (Exception e2) {
err.println("failed to sync to sync marker, aborting");
e2.printStackTrace(err);
return 1;
}
break;
}
}
blockRemaining --;
lastRecordWasBad = true;
numCorruptRecords++;
badRecordsInBlock++;
}
}
if (badRecordsInBlock != 0) {
err.println("** Number of unrecoverable records in block: "
+ (badRecordsInBlock));
}
position = fileReader.previousSync();
} catch (Exception e) {
err.println("Failed to read block " + numBlocks + ". Unknown record "
+ "count in block. Skipping. Reason: " + e.getMessage());
numCorruptBlocks++;
try {
fileReader.sync(position);
} catch (Exception e2) {
err.println("failed to sync to sync marker, aborting");
e2.printStackTrace(err);
return 1;
}
}
}
} finally {
if (fileWritten) {
try {
fileWriter.close();
} catch (Exception e) {
e.printStackTrace(err);
return 1;
}
}
}
}
private int reportOnly(String input, PrintStream out, PrintStream err)
throws IOException {
return recover(input, null, out, err, false, false);
}
private int recoverAfter(String input, String output, PrintStream out,
PrintStream err) throws IOException {
return recover(input, output, out, err, false, true);
}
private int recoverPrior(String input, String output, PrintStream out,
PrintStream err) throws IOException {
return recover(input, output, out, err, true, false);
}
private int recoverAll(String input, String output, PrintStream out,
PrintStream err) throws IOException {
return recover(input, output, out, err, true, true);
}
}