blob: 4a7c7c17083a07e7fd51c4115796df4f3c9e8f23 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.avro;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.WeakHashMap;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.FakeResolvingDecoder;
import org.apache.gora.persistency.ListGenericArray;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.persistency.State;
import org.apache.gora.persistency.StatefulHashMap;
import org.apache.gora.persistency.StatefulMap;
import org.apache.gora.persistency.impl.StateManagerImpl;
import org.apache.gora.util.IOUtils;
/**
* PersistentDatumReader reads persistent objects together with their fields' dirty and readable information.
*/
public class PersistentDatumReader<T extends Persistent>
  extends SpecificDatumReader<T> {

  /** Schema of the top-level records; reads start from it and it identifies root records. */
  private Schema rootSchema;

  /**
   * Prototype of the root record type. The first object created by
   * {@link #newPersistent()} is kept here and later objects are created from it.
   */
  private T cachedPersistent;

  /**
   * Maps each raw decoder to the ResolvingDecoder wrapper built for it, so the
   * same wrapper is reused across reads from the same decoder.
   * NOTE(review): WeakHashMap values are held strongly; if the wrapper keeps a
   * reference to the decoder it was constructed from (likely), the key can
   * never be collected. Confirm against FakeResolvingDecoder.
   */
  private WeakHashMap<Decoder, ResolvingDecoder> decoderCache
    = new WeakHashMap<Decoder, ResolvingDecoder>();

  /**
   * Whether to read the dirty/readable field bits of root records and the
   * per-entry states of maps from the input.
   */
  private boolean readDirtyBits = true;

  /** Creates a reader with no schema set yet and dirty-bit reading enabled. */
  public PersistentDatumReader() {
  }

  /**
   * Creates a reader for the given schema.
   *
   * @param schema the schema of the data to read; also used as the root schema
   * @param readDirtyBits whether the input carries dirty/readable field bits
   *        and map entry states that must be read
   */
  public PersistentDatumReader(Schema schema, boolean readDirtyBits) {
    this.readDirtyBits = readDirtyBits;
    setSchema(schema);
  }

  /**
   * Sets the schema of the data to read; it also becomes the root schema that
   * identifies top-level persistent records.
   * <p>
   * When the schema differs from the current one, the cached prototype object
   * and the cached resolving decoders (both derived from the previous root
   * schema) are discarded, so that later reads create objects of the new type.
   */
  @Override
  public void setSchema(Schema actual) {
    boolean schemaChanged = (rootSchema == null)
      ? actual != null
      : !rootSchema.equals(actual);
    if (schemaChanged) {
      cachedPersistent = null;
      // Guard in case this is invoked before field initializers have run
      // (e.g. from a superclass constructor).
      if (decoderCache != null) {
        decoderCache.clear();
      }
    }
    this.rootSchema = actual;
    super.setSchema(actual);
  }

  /**
   * Creates a new persistent object of the root record type.
   * <p>
   * The first call instantiates the root record class, keeps that object as a
   * prototype and returns it directly. Later calls create fresh objects through
   * {@link Persistent#newInstance} with a new {@link StateManagerImpl}.
   *
   * @return a persistent object of the root record type
   */
  @SuppressWarnings("unchecked")
  public T newPersistent() {
    if(cachedPersistent == null) {
      cachedPersistent = (T)super.newRecord(null, rootSchema);
      return cachedPersistent; //we can return the cached object
    }
    return (T)cachedPersistent.newInstance(new StateManagerImpl());
  }

  /**
   * Returns {@code old} when non-null. Otherwise creates a persistent object
   * for the root schema and delegates to the superclass for any other schema.
   */
  @Override
  protected Object newRecord(Object old, Schema schema) {
    if(old != null) {
      return old;
    }
    if(schema.equals(rootSchema)) {
      return newPersistent();
    } else {
      return super.newRecord(old, schema);
    }
  }

  /**
   * Reads one root record from {@code in}.
   *
   * @param reuse an object to reuse, or {@code null}
   * @param in the raw decoder; it is wrapped by a cached ResolvingDecoder
   */
  @Override
  @SuppressWarnings("unchecked")
  public T read(T reuse, Decoder in) throws IOException {
    return (T) read(reuse, rootSchema, in);
  }

  /**
   * Reads a datum of the given schema from a raw decoder, wrapping the decoder
   * in the ResolvingDecoder cached for it.
   */
  public Object read(Object reuse, Schema schema, Decoder decoder)
    throws IOException {
    return super.read(reuse, schema, getResolvingDecoder(decoder));
  }

  /**
   * Returns the ResolvingDecoder cached for {@code decoder}, creating a
   * FakeResolvingDecoder over the root schema on first use.
   */
  protected ResolvingDecoder getResolvingDecoder(Decoder decoder)
    throws IOException {
    ResolvingDecoder resolvingDecoder = decoderCache.get(decoder);
    if(resolvingDecoder == null) {
      resolvingDecoder = new FakeResolvingDecoder(rootSchema, decoder);
      decoderCache.put(decoder, resolvingDecoder);
    }
    return resolvingDecoder;
  }

  /**
   * Reads a record.
   * <p>
   * For the root schema, when dirty bits are enabled, the persistent object is
   * cleared first. Two boolean arrays (dirty bits, then readable bits) are read,
   * then only the readable fields, and finally the dirty bits are applied to the
   * object. Any other record has all of its fields read in schema order.
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Object readRecord(Object old, Schema expected, ResolvingDecoder in)
    throws IOException {
    Object record = newRecord(old, expected);

    if (!(expected.equals(rootSchema) && readDirtyBits)) {
      //since ResolvingDecoder.readFieldOrder is final, we cannot override it
      //so this is a copy of super.readRecord, with the readFieldOrder change
      //(fields are read in schema order)
      for (Field f : expected.getFields()) {
        readField(record, old, f, in);
      }
      return record;
    }

    // Top-level record carrying state information.
    T persistent = (T) record;
    persistent.clear();
    boolean[] dirtyFields = IOUtils.readBoolArray(in);
    boolean[] readableFields = IOUtils.readBoolArray(in);

    // Only fields flagged readable are read (presumably the writer only emits
    // those; confirm against the matching writer).
    for (Field f : expected.getFields()) {
      if (readableFields[f.pos()]) {
        readField(record, old, f, in);
      }
    }

    // Now set changed bits
    for (int i = 0; i < dirtyFields.length; i++) {
      if (dirtyFields[i]) {
        persistent.setDirty(i);
      } else {
        persistent.clearDirty(i);
      }
    }
    return record;
  }

  /**
   * Reads the value of field {@code f} and stores it into {@code record},
   * passing the current field value for reuse when {@code old} is non-null.
   */
  private void readField(Object record, Object old, Field f, ResolvingDecoder in)
    throws IOException {
    int pos = f.pos();
    String name = f.name();
    Object oldDatum = (old != null) ? getField(record, name, pos) : null;
    setField(record, name, pos, read(oldDatum, f.schema(), in));
  }

  /**
   * Reads a map into a StatefulMap.
   * <p>
   * When dirty bits are enabled, a count followed by (key, state ordinal) pairs
   * is read before the map entries. After the entries are read, all states are
   * cleared and the states read up-front are re-applied.
   *
   * @throws IOException if an entry state ordinal is out of range
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Object readMap(Object old, Schema expected, ResolvingDecoder in)
    throws IOException {
    StatefulMap<Utf8, ?> map = (StatefulMap<Utf8, ?>) newMap(old, 0);
    Map<Utf8, State> tempStates = null;
    if (readDirtyBits) {
      tempStates = new HashMap<Utf8, State>();
      State[] states = State.values(); // values() clones; fetch once
      int size = in.readInt();
      for (int j = 0; j < size; j++) {
        Utf8 key = in.readString(null);
        int ordinal = in.readInt();
        if (ordinal < 0 || ordinal >= states.length) {
          throw new IOException("Invalid state ordinal " + ordinal
            + " for map key " + key);
        }
        tempStates.put(key, states[ordinal]);
      }
    }
    super.readMap(map, expected, in);
    map.clearStates();
    if (readDirtyBits) {
      for (Entry<Utf8, State> entry : tempStates.entrySet()) {
        map.putState(entry.getKey(), entry.getValue());
      }
    }
    return map;
  }

  /**
   * Returns {@code old} after calling {@code reuse()} on it when it is a
   * StatefulHashMap; otherwise returns a new StatefulHashMap. {@code size} is
   * ignored.
   */
  @Override
  @SuppressWarnings({ "rawtypes" })
  protected Object newMap(Object old, int size) {
    if (old instanceof StatefulHashMap) {
      ((StatefulHashMap)old).reuse();
      return old;
    }
    return new StatefulHashMap<Object, Object>();
  }

  /**
   * Called to create new array instances. Returns {@code old}, cleared, when it
   * is already a ListGenericArray; otherwise returns a new ListGenericArray of
   * the given size and schema.
   */
  @Override
  @SuppressWarnings("rawtypes")
  protected Object newArray(Object old, int size, Schema schema) {
    if (old instanceof ListGenericArray) {
      ((GenericArray) old).clear();
      return old;
    }
    return new ListGenericArray(size, schema);
  }

  /**
   * Creates a copy of {@code persistent} following {@code schema}.
   * <p>
   * The copy is a new instance with a fresh {@link StateManagerImpl}. MAP, ARRAY,
   * RECORD and STRING fields are copied via {@link #cloneObject}; NULL fields are
   * skipped; all other field values are copied by reference.
   * NOTE(review): UNION fields fall under the by-reference branch even when they
   * hold a map, array or record.
   */
  public Persistent clone(Persistent persistent, Schema schema) {
    Persistent cloned = persistent.newInstance(new StateManagerImpl());
    List<Field> fields = schema.getFields();
    for(Field field: fields) {
      int pos = field.pos();
      switch(field.schema().getType()) {
        case MAP    :
        case ARRAY  :
        case RECORD :
        case STRING : cloned.put(pos, cloneObject(
            field.schema(), persistent.get(pos), cloned.get(pos))); break;
        case NULL   : break;
        default     : cloned.put(pos, persistent.get(pos)); break;
      }
    }
    return cloned;
  }

  /**
   * Copies {@code toClone} according to {@code schema}.
   * <p>
   * Maps and arrays are copied element by element into a container obtained
   * from {@link #newMap}/{@link #newArray} (reusing {@code cloned} when those
   * allow it). Records are copied via {@link #clone(Persistent, Schema)},
   * strings are re-created with {@code createString}, and all other values are
   * returned as-is.
   *
   * @return the copy, or {@code null} when {@code toClone} is null
   */
  @SuppressWarnings("unchecked")
  protected Object cloneObject(Schema schema, Object toClone, Object cloned) {
    if(toClone == null) {
      return null;
    }

    switch(schema.getType()) {
      case MAP    :
        Map<Utf8, Object> map = (Map<Utf8, Object>)newMap(cloned, 0);
        for(Map.Entry<Utf8, Object> entry: ((Map<Utf8, Object>)toClone).entrySet()) {
          // assumes createString returns a Utf8 (the cast would fail otherwise)
          map.put((Utf8)createString(entry.getKey().toString())
              , cloneObject(schema.getValueType(), entry.getValue(), null));
        }
        return map;
      case ARRAY  :
        GenericArray<Object> array = (GenericArray<Object>)
        newArray(cloned, (int)((GenericArray<?>)toClone).size(), schema);
        for(Object element: (GenericArray<Object>)toClone) {
          array.add(cloneObject(schema.getElementType(), element, null));
        }
        return array;
      case RECORD : return clone((Persistent)toClone, schema);
      case STRING : return createString(toClone.toString());
      default     : return toClone; //shallow copy is enough
    }
  }
}