blob: 032c21254dd32a3eec306498a32e8d1bb3b9644f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.mapred;
import java.io.IOException;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.NoSuchElementException;
import java.net.URI;
import java.lang.reflect.Type;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.FileReader;
import org.apache.avro.reflect.ReflectData;
/**
 * A {@link FileReader} that presents a Hadoop {@link SequenceFile} as a stream
 * of Avro {@link Pair} records.
 *
 * <p>The schema is a pair schema built from the file's key and value classes.
 * The common Hadoop Writables ({@code NullWritable}, {@code BooleanWritable},
 * {@code IntWritable}, {@code LongWritable}, {@code FloatWritable},
 * {@code DoubleWritable}, {@code BytesWritable}, {@code Text}) map to the
 * corresponding Avro primitive schemas. Any other class is handled by
 * {@link ReflectData}.
 *
 * <p>Keys and values of those Writable types are converted to plain Java values:
 * {@code null}, {@link Boolean}, {@link Integer}, {@link Long}, {@link Float},
 * {@link Double}, {@link ByteBuffer} and {@link String} respectively. Instances
 * of any other class are returned as the Writable objects read from the file.
 *
 * <p>Buffer reuse: one value Writable is reused for every record, and keys
 * alternate between two Writable instances. Unconverted Writables, and the
 * {@link ByteBuffer}s produced for {@code BytesWritable} (which wrap the
 * Writable's backing array), may therefore be overwritten by later reads. Copy
 * them if they must outlive subsequent calls to {@link #next()}.
 */
public class SequenceFileReader<K, V> implements FileReader<Pair<K, V>> {
  private final SequenceFile.Reader reader;
  private final Schema schema;
  private boolean ready = false; // true iff `done` and `key` already reflect the next record
  private boolean done = false; // true iff the underlying reader reported no more records
  // Keys alternate between these two instances, so the key handed out by next()
  // is not the instance the following prepare() reads into.
  private Writable key, spareKey;
  private final Writable value; // reused for every record
  private final Converter<K> keyConverter;
  private final Converter<V> valConverter;

  /**
   * Opens {@code file} using a default Hadoop {@link Configuration}.
   *
   * @param file the sequence file to read
   * @throws IOException if the file cannot be opened
   */
  public SequenceFileReader(File file) throws IOException {
    this(file.toURI(), new Configuration());
  }

  /**
   * Opens the sequence file at {@code uri} using the {@link FileSystem} that
   * {@code uri} resolves to under {@code c}.
   *
   * @param uri location of the sequence file
   * @param c Hadoop configuration used to resolve the file system and to
   *          instantiate the key/value Writables
   * @throws IOException if the file cannot be opened
   */
  public SequenceFileReader(URI uri, Configuration c) throws IOException {
    // Path(URI) keeps the URI's already-escaped path intact. Path(String) would
    // re-escape it (e.g. "%20" from File.toURI() becoming "%2520") and open the
    // wrong file for paths containing spaces or other escaped characters.
    this(new SequenceFile.Reader(FileSystem.get(uri, c), new Path(uri), c), c);
  }

  /**
   * Wraps an already-open {@link SequenceFile.Reader}. Reading starts at the
   * reader's current position, and the reader is closed by {@link #close()}.
   *
   * @param reader open sequence-file reader
   * @param conf configuration used to instantiate the key and value Writables
   */
  public SequenceFileReader(SequenceFile.Reader reader, Configuration conf) {
    this.reader = reader;
    Class<?> keyClass = reader.getKeyClass();
    Class<?> valueClass = reader.getValueClass();
    this.schema = Pair.getPairSchema(WritableData.get().getSchema(keyClass),
        WritableData.get().getSchema(valueClass));
    this.key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
    this.spareKey = (Writable) ReflectionUtils.newInstance(keyClass, conf);
    this.value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
    this.keyConverter = converterFor(keyClass);
    this.valConverter = converterFor(valueClass);
  }

  /** Closes the underlying sequence-file reader. */
  @Override
  public void close() throws IOException {
    reader.close();
  }

  /** Not supported. */
  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }

  /** Returns this reader itself; it can therefore be iterated only once. */
  @Override
  public Iterator<Pair<K, V>> iterator() {
    return this;
  }

  /** Returns the pair schema derived from the file's key and value classes. */
  @Override
  public Schema getSchema() {
    return schema;
  }

  /**
   * Reads the next key into {@code key} unless one is already buffered, and
   * records in {@code done} whether the reader is exhausted.
   */
  private void prepare() throws IOException {
    if (ready) {
      return;
    }
    this.done = !reader.next(key);
    ready = true;
  }

  /**
   * Returns whether another record is available, prefetching its key if needed.
   *
   * @throws AvroRuntimeException wrapping any {@link IOException} from the read
   */
  @Override
  public boolean hasNext() {
    try {
      prepare();
      return !done;
    } catch (IOException e) {
      throw new AvroRuntimeException(e);
    }
  }

  /**
   * Returns the next record in a newly allocated {@link Pair}.
   *
   * @throws NoSuchElementException if there are no more records
   * @throws AvroRuntimeException wrapping any {@link IOException} from the read
   */
  @Override
  public Pair<K, V> next() {
    try {
      return next(null);
    } catch (IOException e) {
      throw new AvroRuntimeException(e);
    }
  }

  /**
   * Returns the next record, filling {@code reuse} when it is non-null.
   *
   * @param reuse pair to populate, or {@code null} to allocate a new one
   * @return the populated pair
   * @throws NoSuchElementException if there are no more records
   * @throws IOException if reading the key or value fails
   */
  @Override
  public Pair<K, V> next(Pair<K, V> reuse) throws IOException {
    prepare();
    if (done) {
      throw new NoSuchElementException();
    }
    Pair<K, V> result = reuse;
    if (result == null) {
      result = new Pair<>(schema);
    }
    result.key(keyConverter.convert(key));
    reader.getCurrentValue(value);
    result.value(valConverter.convert(value));
    // Swap the key buffers so the next prepare() reads into the instance that was
    // not just handed out (matters when the returned key shares state with it).
    Writable returnedKey = key;
    key = spareKey;
    spareKey = returnedKey;
    ready = false;
    return result;
  }

  /**
   * Seeks to the next sync marker after {@code position} if that lies beyond
   * the reader's current position. In all cases any key prefetched by
   * {@link #hasNext()} is discarded, so the next read starts from wherever the
   * underlying reader is positioned.
   */
  @Override
  public void sync(long position) throws IOException {
    if (position > reader.getPosition()) {
      reader.sync(position);
    }
    ready = false;
  }

  /**
   * Returns true once the reader is at or beyond {@code position} and the
   * underlying reader reports that it has seen a sync marker.
   */
  @Override
  public boolean pastSync(long position) throws IOException {
    return reader.getPosition() >= position && reader.syncSeen();
  }

  /**
   * Returns the underlying reader's current position. This reflects every read
   * already performed, including any key prefetched by {@link #hasNext()}.
   */
  @Override
  public long tell() throws IOException {
    return reader.getPosition();
  }

  /** Avro schemas for the Writable classes that map directly to Avro primitives. */
  private static final Map<Type, Schema> WRITABLE_SCHEMAS = new HashMap<>();
  static {
    WRITABLE_SCHEMAS.put(NullWritable.class, Schema.create(Schema.Type.NULL));
    WRITABLE_SCHEMAS.put(BooleanWritable.class, Schema.create(Schema.Type.BOOLEAN));
    WRITABLE_SCHEMAS.put(IntWritable.class, Schema.create(Schema.Type.INT));
    WRITABLE_SCHEMAS.put(LongWritable.class, Schema.create(Schema.Type.LONG));
    WRITABLE_SCHEMAS.put(FloatWritable.class, Schema.create(Schema.Type.FLOAT));
    WRITABLE_SCHEMAS.put(DoubleWritable.class, Schema.create(Schema.Type.DOUBLE));
    WRITABLE_SCHEMAS.put(BytesWritable.class, Schema.create(Schema.Type.BYTES));
    WRITABLE_SCHEMAS.put(Text.class, Schema.create(Schema.Type.STRING));
  }

  /**
   * {@link ReflectData} variant that uses {@link #WRITABLE_SCHEMAS} for known
   * Writable classes and falls back to reflection for everything else.
   */
  private static class WritableData extends ReflectData {
    private static final WritableData INSTANCE = new WritableData();

    protected WritableData() {
    }

    /** Return the singleton instance. */
    public static WritableData get() {
      return INSTANCE;
    }

    @Override
    public Schema getSchema(java.lang.reflect.Type type) {
      // WRITABLE_SCHEMAS holds no null values, so null here means "not registered".
      Schema known = WRITABLE_SCHEMAS.get(type);
      return known != null ? known : super.getSchema(type);
    }
  }

  /** Converts a Writable read from the file into the value exposed in a {@link Pair}. */
  @FunctionalInterface
  private interface Converter<T> {
    T convert(Writable o);
  }

  /** Converters from the known Writable classes to plain Java values. */
  private static final Map<Type, Converter<?>> WRITABLE_CONVERTERS = new HashMap<>();
  static {
    WRITABLE_CONVERTERS.put(NullWritable.class, (Converter<Void>) o -> null);
    WRITABLE_CONVERTERS.put(BooleanWritable.class, (Converter<Boolean>) o -> ((BooleanWritable) o).get());
    WRITABLE_CONVERTERS.put(IntWritable.class, (Converter<Integer>) o -> ((IntWritable) o).get());
    WRITABLE_CONVERTERS.put(LongWritable.class, (Converter<Long>) o -> ((LongWritable) o).get());
    WRITABLE_CONVERTERS.put(FloatWritable.class, (Converter<Float>) o -> ((FloatWritable) o).get());
    WRITABLE_CONVERTERS.put(DoubleWritable.class, (Converter<Double>) o -> ((DoubleWritable) o).get());
    WRITABLE_CONVERTERS.put(BytesWritable.class, (Converter<ByteBuffer>) o -> {
      BytesWritable b = (BytesWritable) o;
      // Wraps (does not copy) the Writable's backing array, limited to its valid length.
      return ByteBuffer.wrap(b.getBytes(), 0, b.getLength());
    });
    WRITABLE_CONVERTERS.put(Text.class, (Converter<String>) Object::toString);
  }

  /**
   * Returns the converter registered for {@code writableClass}. If none is
   * registered, returns an identity converter that hands back the Writable
   * itself, cast to {@code T}.
   */
  @SuppressWarnings("unchecked") // K/V are chosen by the caller to match the file's classes
  private static <T> Converter<T> converterFor(Class<?> writableClass) {
    Converter<?> registered = WRITABLE_CONVERTERS.get(writableClass);
    if (registered != null) {
      return (Converter<T>) registered;
    }
    return writable -> (T) writable;
  }
}