/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;

import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
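
/**
 * Tests concatenating Avro data files via {@link DataFileWriter#appendTo} and
 * {@link DataFileWriter#appendAllFrom}, across codec combinations and both
 * settings of the recompress flag.
 */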
@RunWith(Parameterized.class)
public class TestDataFileConcat {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDataFileConcat.class);

  private final CodecFactory codec;
  private final CodecFactory codec2;
  private final boolean recompress;

  public TestDataFileConcat(CodecFactory codec, CodecFactory codec2, Boolean recompress) {
    this.codec = codec;
    this.codec2 = codec2;
    this.recompress = recompress;
    LOG.info("Testing concatenating files, " + codec2 + " into " + codec
        + " with recompress=" + recompress);
  }
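
  /**
   * Each entry is (target codec, source codec, recompress): matching codecs,
   * differing deflate and xz levels, and null-codec mixes, exercising both
   * block copying and re-encoding during concatenation.
   */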
  @Parameters
  public static List<Object[]> codecs() {
    List<Object[]> r = new ArrayList<Object[]>();
    r.add(new Object[] { null, null, false });
    r.add(new Object[] { null, null, true });
    r.add(new Object[]
        { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), false });
    r.add(new Object[]
        { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), true });
    r.add(new Object[]
        { CodecFactory.deflateCodec(3), CodecFactory.nullCodec(), false });
    r.add(new Object[]
        { CodecFactory.nullCodec(), CodecFactory.deflateCodec(6), false });
    r.add(new Object[]
        { CodecFactory.xzCodec(1), CodecFactory.xzCodec(2), false });
    r.add(new Object[]
        { CodecFactory.xzCodec(1), CodecFactory.xzCodec(2), true });
    r.add(new Object[]
        { CodecFactory.xzCodec(2), CodecFactory.nullCodec(), false });
    r.add(new Object[]
        { CodecFactory.nullCodec(), CodecFactory.xzCodec(2), false });
    return r;
  }
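
  // Record count, validation, and output directory can be overridden through
  // the test.count, test.validate, and test.dir system properties.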
  private static final int COUNT =
      Integer.parseInt(System.getProperty("test.count", "200"));
  private static final boolean VALIDATE =
      !"false".equals(System.getProperty("test.validate", "true"));
  private static final File DIR =
      new File(System.getProperty("test.dir", "/tmp"));
  private static final long SEED = System.currentTimeMillis();

  private static final String SCHEMA_JSON =
      "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
      + "{\"name\":\"stringField\", \"type\":\"string\"},"
      + "{\"name\":\"longField\", \"type\":\"long\"}"
      + "]}";
  private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

  private File makeFile(String name) {
    return new File(DIR, "test-" + name + ".avro");
  }

  @Test
  public void testConcatenateFiles() throws IOException {
    System.out.println("SEED = " + SEED);
    System.out.println("COUNT = " + COUNT);
    for (int k = 0; k < 5; k++) {
      int syncInterval = 460 + k;
      RandomData data1 = new RandomData(SCHEMA, COUNT, SEED);
      RandomData data2 = new RandomData(SCHEMA, COUNT, SEED + 1);
      File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
      File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
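      // Write COUNT random records to each input file, each file with its
      // own codec.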
      DataFileWriter<Object> writer =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>())
              .setSyncInterval(syncInterval);
      if (codec != null) {
        writer.setCodec(codec);
      }
      writer.create(SCHEMA, file1);
      try {
        for (Object datum : data1) {
          writer.append(datum);
        }
      } finally {
        writer.close();
      }
      DataFileWriter<Object> writer2 =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>())
              .setSyncInterval(syncInterval);
      if (codec2 != null) {
        writer2.setCodec(codec2);
      }
      writer2.create(SCHEMA, file2);
      try {
        for (Object datum : data2) {
          writer2.append(datum);
        }
      } finally {
        writer2.close();
      }
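      // Reopen file1 for append and copy file2's contents onto its end,
      // optionally recompressing blocks with file1's codec along the way.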
      DataFileWriter<Object> concatInto =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>())
              .setSyncInterval(syncInterval);
      concatInto.appendTo(file1);
      DataFileReader<Object> concatFrom =
          new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
      concatInto.appendAllFrom(concatFrom, recompress);
      concatInto.close();
      concatFrom.close();

      concatFrom = new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
      DataFileReader<Object> concat =
          new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
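      // Read everything back: file1 should now contain data1 followed by
      // data2, and file2 should be unchanged.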
      int count = 0;
      try {
        Object datum = null;
        if (VALIDATE) {
          for (Object expected : data1) {
            datum = concat.next(datum);
            assertEquals("at " + count++, expected, datum);
          }
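          // The source file must still yield exactly data2.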
          for (Object expected : data2) {
            datum = concatFrom.next(datum);
            assertEquals("at " + count++, expected, datum);
          }
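          // The appended tail of file1 must be data2.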
          for (Object expected : data2) {
            datum = concat.next(datum);
            assertEquals("at " + count++, expected, datum);
          }
        } else {
          for (int i = 0; i < COUNT * 2; i++) {
            datum = concat.next(datum);
          }
        }
      } finally {
        // Debug aid: count only reaches 3 * COUNT when full validation ran.
        if (count != 3 * COUNT) {
          System.out.println("read " + count + " records at iteration k=" + k);
        }
        concat.close();
        concatFrom.close();
      }
    }
  }
}