blob: 53aec3b04d9468515ee787f9e8b5917b201112a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import junit.framework.TestCase;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ProtoTest.ProtoKey;
import org.apache.hadoop.io.ProtoTest.ProtoValue;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.serial.Serialization;
import org.apache.hadoop.io.serial.SerializationFactory;
import org.apache.hadoop.io.serial.lib.CompatibilitySerialization;
import org.apache.hadoop.io.serial.lib.JavaSerialization;
import org.apache.hadoop.io.serial.lib.avro.AvroSerialization;
import org.apache.hadoop.io.serial.lib.avro.AvroSerialization.Kind;
/**
 * Tests that {@link SequenceFile} round-trips keys and values through each of
 * the supported serialization frameworks (Avro specific/generic, protocol
 * buffers, Thrift, Writable, Java serialization, and the compatibility layer
 * for the legacy serializer API), across all three compression types.
 */
public class TestSequenceFileSerialization extends TestCase {
  private Configuration conf;
  private FileSystem fs;
  private Path file;

  @Override
  protected void setUp() throws Exception {
    super.setUp();
    // Fresh Configuration per test so per-test conf.set(...) calls
    // (e.g. in writeJavaSerialization) cannot leak between tests.
    conf = new Configuration();
    fs = FileSystem.getLocal(conf);
    file = new Path(System.getProperty("test.build.data",".") + "/test.seq");
  }

  @Override
  protected void tearDown() throws Exception {
    // Remove the test artifact so repeated runs start clean, then release
    // the local file system handle.
    fs.delete(file, true);
    fs.close();
    super.tearDown();
  }

  /**
   * Writes two Avro specific records with the given compression, checks the
   * writer's serialization metadata string form, rebuilds serializations from
   * that string form, and reads the records back.
   * @param kind the compression type to use for the file
   */
  public void writeAvroSpecificSerialization(CompressionType kind
                                             ) throws Exception {
    AvroKey key = new AvroKey();
    AvroValue value = new AvroValue();
    fs.delete(file, true);
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(AvroKey.class),
                                SequenceFile.Writer.valueClass(AvroValue.class));
    key.value = 1;
    value.value = new Utf8("one");
    writer.append(key, value);
    key.value = 2;
    value.value = new Utf8("two");
    writer.append(key, value);
    // Capture the string forms before closing the writer.
    String writerKeySerialStr = writer.getKeySerialization().toString();
    String writerValueSerialStr = writer.getValueSerialization().toString();
    writer.close();
    // The string form embeds the Avro schema and the serialization kind.
    assertEquals("{schema: '{\"type\":\"record\",\"name\":\"AvroKey\"," +
                 "\"namespace\":\"org.apache.hadoop.io\",\"fields\":[{" +
                 "\"name\":\"value\",\"type\":\"int\"}]}',\n" +
                 " kind: SPECIFIC}\n",
                 writerKeySerialStr);
    assertEquals("{schema: '{\"type\":\"record\",\"name\":\"AvroValue\"," +
                 "\"namespace\":\"org.apache.hadoop.io\",\"fields\":[{" +
                 "\"name\":\"value\",\"type\":\"string\"}]}',\n" +
                 " kind: SPECIFIC}\n",
                 writerValueSerialStr);
    // Reconstituting a serialization from its string form must produce an
    // object equal to the reader's serialization.
    SerializationFactory factory = SerializationFactory.getInstance(conf);
    Serialization<?> keySerialClone = factory.getSerialization("avro");
    keySerialClone.fromString(writerKeySerialStr);
    Serialization<?> valueSerialClone = factory.getSerialization("avro");
    valueSerialClone.fromString(writerValueSerialStr);
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    Serialization<?> keySerial = reader.getKeySerialization();
    Serialization<?> valueSerial = reader.getValueSerialization();
    assertEquals(kind, reader.getCompressionType());
    assertEquals("avro", keySerial.getName());
    assertEquals(writerKeySerialStr, keySerial.toString());
    assertEquals(keySerialClone, keySerial);
    assertEquals("avro", valueSerial.getName());
    assertEquals(writerValueSerialStr, valueSerial.toString());
    assertEquals(valueSerialClone, valueSerial);
    assertEquals(1, ((AvroKey) reader.nextKey(key)).value);
    assertEquals(new Utf8("one"),
                 ((AvroValue) reader.getCurrentValue(value)).value);
    assertEquals(2, ((AvroKey) reader.nextKey(key)).value);
    assertEquals(new Utf8("two"),
                 ((AvroValue) reader.getCurrentValue(value)).value);
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Reads the file left behind by {@link #writeAvroSpecificSerialization}
   * using the Avro GENERIC serialization, overriding the serialization
   * stored in the file's metadata.
   */
  public void readAvroGenericSerialization() throws Exception {
    Serialization<?> serial = new AvroSerialization(Kind.GENERIC);
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file),
                               SequenceFile.Reader.keySerialization(serial),
                               SequenceFile.Reader.valueSerialization(serial.clone()));
    assertEquals(1, ((GenericRecord) reader.nextKey(null)).get("value"));
    assertEquals(new Utf8("one"),
                 ((GenericRecord) reader.getCurrentValue(null)).get("value"));
    assertEquals(2, ((GenericRecord) reader.nextKey(null)).get("value"));
    assertEquals(new Utf8("two"),
                 ((GenericRecord) reader.getCurrentValue(null)).get("value"));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Writes two protobuf key/value pairs, checks the serialization string
   * forms, rebuilds serializations from those strings, and reads back.
   * @param kind the compression type to use for the file
   */
  public void writeProtobufSerialization(CompressionType kind
                                         ) throws Exception {
    fs.delete(file, true);
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(ProtoKey.class),
                                SequenceFile.Writer.valueClass(ProtoValue.class));
    writer.append(ProtoKey.newBuilder().setValue(1).build(),
                  ProtoValue.newBuilder().setValue("one").build());
    writer.append(ProtoKey.newBuilder().setValue(2).build(),
                  ProtoValue.newBuilder().setValue("two").build());
    String keySerialStr = writer.getKeySerialization().toString();
    assertEquals("{class: org.apache.hadoop.io.ProtoTest$ProtoKey}\n",
                 keySerialStr);
    String valueSerialStr = writer.getValueSerialization().toString();
    assertEquals("{class: org.apache.hadoop.io.ProtoTest$ProtoValue}\n",
                 valueSerialStr);
    writer.close();
    // build serializers from the string form
    SerializationFactory factory = SerializationFactory.getInstance(conf);
    Serialization<?> keySerial = factory.getSerialization("protobuf");
    keySerial.fromString(keySerialStr);
    Serialization<?> valueSerial = factory.getSerialization("protobuf");
    valueSerial.fromString(valueSerialStr);
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    assertEquals(kind, reader.getCompressionType());
    Serialization<?> readerKeySerial = reader.getKeySerialization();
    Serialization<?> readerValueSerial = reader.getValueSerialization();
    assertEquals("protobuf", readerKeySerial.getName());
    assertEquals(keySerialStr, readerKeySerial.toString());
    assertEquals(keySerial, readerKeySerial);
    assertEquals("protobuf", readerValueSerial.getName());
    assertEquals(valueSerialStr, readerValueSerial.toString());
    assertEquals(valueSerial, readerValueSerial);
    assertEquals(ProtoKey.newBuilder().setValue(1).build(),
                 reader.nextKey(null));
    assertEquals(ProtoValue.newBuilder().setValue("one").build(),
                 reader.getCurrentValue(null));
    assertEquals(ProtoKey.newBuilder().setValue(2).build(),
                 reader.nextKey(null));
    assertEquals(ProtoValue.newBuilder().setValue("two").build(),
                 reader.getCurrentValue(null));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Writes two Thrift key/value pairs and reads them back, verifying the
   * serialization name recorded in the file.
   * @param kind the compression type to use for the file
   */
  public void writeThriftSerialization(CompressionType kind) throws Exception {
    fs.delete(file, true);
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(ThriftKey.class),
                                SequenceFile.Writer.valueClass(ThriftValue.class));
    writer.append(new ThriftKey(1), new ThriftValue("one"));
    writer.append(new ThriftKey(2), new ThriftValue("two"));
    writer.close();
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    assertEquals(kind, reader.getCompressionType());
    assertEquals("thrift", reader.getKeySerialization().getName());
    assertEquals("thrift", reader.getValueSerialization().getName());
    assertEquals(new ThriftKey(1), reader.nextKey(null));
    assertEquals(new ThriftValue("one"), reader.getCurrentValue(null));
    assertEquals(new ThriftKey(2), reader.nextKey(null));
    assertEquals(new ThriftValue("two"), reader.getCurrentValue(null));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Writes two Writable key/value pairs and reads them back, verifying the
   * serialization name recorded in the file.
   * @param kind the compression type to use for the file
   */
  public void writeWritableSerialization(CompressionType kind
                                         ) throws Exception {
    fs.delete(file, true);
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(IntWritable.class),
                                SequenceFile.Writer.valueClass(Text.class));
    writer.append(new IntWritable(1), new Text("one"));
    writer.append(new IntWritable(2), new Text("two"));
    writer.close();
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    assertEquals(kind, reader.getCompressionType());
    assertEquals("writable", reader.getKeySerialization().getName());
    assertEquals("writable", reader.getValueSerialization().getName());
    assertEquals(new IntWritable(1), reader.nextKey(null));
    assertEquals(new Text("one"), reader.getCurrentValue(null));
    assertEquals(new IntWritable(2), reader.nextKey(null));
    assertEquals(new Text("two"), reader.getCurrentValue(null));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Enables the new-style Java serialization and round-trips Long/String
   * pairs through it.
   * @param kind the compression type to use for the file
   */
  public void writeJavaSerialization(CompressionType kind) throws Exception {
    fs.delete(file, true);
    conf.set(CommonConfigurationKeysPublic.HADOOP_SERIALIZATIONS_KEY,
             JavaSerialization.class.getName());
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(Long.class),
                                SequenceFile.Writer.valueClass(String.class));
    writer.append(1L, "one");
    writer.append(2L, "two");
    writer.close();
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    assertEquals(kind, reader.getCompressionType());
    assertEquals("java", reader.getKeySerialization().getName());
    assertEquals("java", reader.getValueSerialization().getName());
    assertEquals(1L, reader.nextKey(null));
    assertEquals("one", reader.getCurrentValue((Object) null));
    assertEquals(2L, reader.nextKey(null));
    assertEquals("two", reader.getCurrentValue((Object) null));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /**
   * Test the compatibility layer to load the old java serialization.
   */
  public void writeOldJavaSerialization(CompressionType kind
                                        ) throws Exception {
    fs.delete(file, true);
    // set the old attribute to include the java serialization
    conf.set("io.serializations",
             "org.apache.hadoop.io.serializer.JavaSerialization");
    SerializationFactory factory = SerializationFactory.getInstance(conf);
    Serialization<?> serial = factory.getSerializationByType(Long.class);
    // The old-style serializer must be wrapped by the compatibility layer.
    assertEquals(CompatibilitySerialization.class, serial.getClass());
    Writer writer =
      SequenceFile.createWriter(conf,
                                SequenceFile.Writer.file(file),
                                SequenceFile.Writer.compression(kind),
                                SequenceFile.Writer.keyClass(Long.class),
                                SequenceFile.Writer.valueClass(String.class));
    writer.append(1L, "one");
    writer.append(2L, "two");
    writer.close();
    Reader reader = new Reader(conf, SequenceFile.Reader.file(file));
    assertEquals("compatibility", reader.getKeySerialization().getName());
    assertEquals("compatibility", reader.getValueSerialization().getName());
    assertEquals(kind, reader.getCompressionType());
    assertEquals(1L, reader.nextKey(null));
    assertEquals("one", reader.getCurrentValue((Object) null));
    assertEquals(2L, reader.nextKey(null));
    assertEquals("two", reader.getCurrentValue((Object) null));
    assertNull(reader.nextKey(null));
    reader.close();
  }

  /** Avro: all compression types, plus a generic read of the NONE file. */
  public void testAvro() throws Exception {
    writeAvroSpecificSerialization(CompressionType.NONE);
    // Must run directly after the NONE write: it reads that same file.
    readAvroGenericSerialization();
    writeAvroSpecificSerialization(CompressionType.RECORD);
    writeAvroSpecificSerialization(CompressionType.BLOCK);
  }

  /** Protobuf: all compression types. */
  public void testProtobuf() throws Exception {
    writeProtobufSerialization(CompressionType.NONE);
    writeProtobufSerialization(CompressionType.RECORD);
    writeProtobufSerialization(CompressionType.BLOCK);
  }

  /** Thrift: all compression types. */
  public void testThrift() throws Exception {
    writeThriftSerialization(CompressionType.NONE);
    writeThriftSerialization(CompressionType.RECORD);
    writeThriftSerialization(CompressionType.BLOCK);
  }

  /** Writable: all compression types. */
  public void testWritable() throws Exception {
    writeWritableSerialization(CompressionType.NONE);
    writeWritableSerialization(CompressionType.RECORD);
    writeWritableSerialization(CompressionType.BLOCK);
  }

  /** Java serialization: all compression types. */
  public void testJava() throws Exception {
    writeJavaSerialization(CompressionType.NONE);
    writeJavaSerialization(CompressionType.RECORD);
    writeJavaSerialization(CompressionType.BLOCK);
  }

  /** Legacy Java serialization via the compatibility layer. */
  public void testOldJava() throws Exception {
    writeOldJavaSerialization(CompressionType.NONE);
    writeOldJavaSerialization(CompressionType.RECORD);
    writeOldJavaSerialization(CompressionType.BLOCK);
  }
}