| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.avro; |
| |
| import static org.junit.Assert.*; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.avro.file.CodecFactory; |
| import org.apache.avro.file.DataFileReader; |
| import org.apache.avro.file.DataFileWriter; |
| import org.apache.avro.generic.GenericDatumReader; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| import org.junit.runners.Parameterized.Parameters; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @RunWith(Parameterized.class) |
| public class TestDataFileConcat { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TestDataFileConcat.class); |
| |
| CodecFactory codec = null; |
| CodecFactory codec2 = null; |
| boolean recompress; |
| public TestDataFileConcat(CodecFactory codec, CodecFactory codec2, Boolean recompress) { |
| this.codec = codec; |
| this.codec2 = codec2; |
| this.recompress = recompress; |
| LOG.info("Testing concatenating files, " + codec2 + " into " + codec + |
| " with recompress=" + recompress); |
| } |
| |
| @Parameters |
| public static List<Object[]> codecs() { |
| List<Object[]> r = new ArrayList<Object[]>(); |
| r.add(new Object[] { null , null, false}); |
| r.add(new Object[] { null , null, true}); |
| r.add(new Object[] |
| { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), false }); |
| r.add(new Object[] |
| { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), true }); |
| r.add(new Object[] |
| { CodecFactory.deflateCodec(3), CodecFactory.nullCodec(), false }); |
| r.add(new Object[] |
| { CodecFactory.nullCodec(), CodecFactory.deflateCodec(6), false }); |
| r.add(new Object[] |
| { CodecFactory.xzCodec(1), CodecFactory.xzCodec(2), false }); |
| r.add(new Object[] |
| { CodecFactory.xzCodec(1), CodecFactory.xzCodec(2), true }); |
| r.add(new Object[] |
| { CodecFactory.xzCodec(2), CodecFactory.nullCodec(), false }); |
| r.add(new Object[] |
| { CodecFactory.nullCodec(), CodecFactory.xzCodec(2), false }); |
| return r; |
| } |
| |
| private static final int COUNT = |
| Integer.parseInt(System.getProperty("test.count", "200")); |
| private static final boolean VALIDATE = |
| !"false".equals(System.getProperty("test.validate", "true")); |
| private static final File DIR |
| = new File(System.getProperty("test.dir", "/tmp")); |
| private static final long SEED = System.currentTimeMillis(); |
| |
| private static final String SCHEMA_JSON = |
| "{\"type\": \"record\", \"name\": \"Test\", \"fields\": [" |
| +"{\"name\":\"stringField\", \"type\":\"string\"}" + |
| "," |
| +"{\"name\":\"longField\", \"type\":\"long\"}" + |
| "]}"; |
| private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON); |
| |
| private File makeFile(String name) { |
| return new File(DIR, "test-" + name + ".avro"); |
| } |
| |
| @Test |
| public void testConcatenateFiles() throws IOException { |
| System.out.println("SEED = "+SEED); |
| System.out.println("COUNT = "+COUNT); |
| for (int k = 0; k < 5; k++) { |
| int syncInterval = 460 +k; |
| RandomData data1 = new RandomData(SCHEMA, COUNT, SEED); |
| RandomData data2 = new RandomData(SCHEMA, COUNT, SEED+1); |
| File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A"); |
| File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B"); |
| DataFileWriter<Object> writer = |
| new DataFileWriter<Object>(new GenericDatumWriter<Object>()) |
| .setSyncInterval(syncInterval); |
| if (codec != null) { |
| writer.setCodec(codec); |
| } |
| writer.create(SCHEMA, file1); |
| try { |
| for (Object datum : data1) { |
| writer.append(datum); |
| } |
| } finally { |
| writer.close(); |
| } |
| DataFileWriter<Object> writer2 = |
| new DataFileWriter<Object>(new GenericDatumWriter<Object>()) |
| .setSyncInterval(syncInterval); |
| if (codec2 != null) { |
| writer2.setCodec(codec2); |
| } |
| writer2.create(SCHEMA, file2); |
| try { |
| for (Object datum : data2) { |
| writer2.append(datum); |
| } |
| } finally { |
| writer2.close(); |
| } |
| DataFileWriter<Object> concatinto = |
| new DataFileWriter<Object>(new GenericDatumWriter<Object>()) |
| .setSyncInterval(syncInterval); |
| concatinto.appendTo(file1); |
| DataFileReader<Object> concatfrom = |
| new DataFileReader<Object>(file2, new GenericDatumReader<Object>()); |
| concatinto.appendAllFrom(concatfrom, recompress); |
| concatinto.close(); |
| concatfrom.close(); |
| |
| concatfrom = new DataFileReader<Object>(file2, new GenericDatumReader<Object>()); |
| |
| |
| DataFileReader<Object> concat = |
| new DataFileReader<Object>(file1, new GenericDatumReader<Object>()); |
| int count = 0; |
| try { |
| Object datum = null; |
| if (VALIDATE) { |
| for (Object expected : data1) { |
| datum = concat.next(datum); |
| assertEquals("at "+count++, expected, datum); |
| } |
| for (Object expected : data2) { |
| datum = concatfrom.next(datum); |
| assertEquals("at "+count++, expected, datum); |
| } |
| for (Object expected : data2) { |
| datum = concat.next(datum); |
| assertEquals("at "+count++, expected, datum); |
| } |
| } else { |
| for (int i = 0; i < COUNT*2; i++) { |
| datum = concat.next(datum); |
| } |
| } |
| } finally { |
| if (count != 3 * COUNT) { |
| System.out.println(count + " " + k); |
| } |
| concat.close(); |
| concatfrom.close(); |
| } |
| |
| } |
| } |
| |
| } |