// blob: 648e63910d6cbe55e02656f956f7b374b535514b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.util.avro;
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * Object that wraps an Avro object in a tuple.
 *
 * <p>Field positions of the tuple map one-to-one onto the field positions of
 * the wrapped record's schema. Values are converted to their Pig equivalents
 * lazily, when {@link #get(int)} is called.
 *
 * @param <T> The type of the Avro object
 */
public final class AvroTupleWrapper <T extends IndexedRecord>
    implements Tuple {

  private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);

  /**
   * The Avro object wrapped in the pig Tuple.
   * Not final because {@link #reference(Tuple)} re-points it.
   */
  private T avroObject;

  /**
   * Creates a new AvroTupleWrapper object.
   * @param o The object to wrap
   */
  public AvroTupleWrapper(final T o) {
    avroObject = o;
  }

  /**
   * Not supported; always fails.
   * @param o ignored
   * @throws IOException always
   */
  @Override
  public void write(final DataOutput o) throws IOException {
    throw new IOException(
        this.getClass().toString() + ".write called, but not implemented yet");
  }

  /**
   * Compares the wrapped record with the record wrapped by another
   * AvroTupleWrapper, using this record's schema.
   * @param o the object to compare against
   * @return the result of the Avro generic comparison when {@code o} is an
   *     AvroTupleWrapper; -1 for any other object
   */
  @SuppressWarnings("rawtypes")
  @Override
  public int compareTo(final Object o) {
    if (o instanceof AvroTupleWrapper) {
      return GenericData.get().compare(avroObject,
          ((AvroTupleWrapper) o).avroObject,
          avroObject.getSchema());
    }
    return -1;
  }

  /**
   * Stores {@code o} one position past the current last field and then adds
   * a matching field named {@code FIELD_<n>} to the record's schema.
   *
   * <p>NOTE(review): the value is written at index {@code fields.size()}
   * before the schema is extended, and the schema's own field list is
   * mutated and passed back to {@code setFields}. Whether the Avro version in
   * use permits either step should be confirmed. Likewise,
   * {@code Schema.create(Type.MAP)} and a {@code null} field schema (used for
   * any type not listed below) may be rejected by Avro - verify before
   * relying on this method.
   *
   * @param o the value to append
   */
  @Override
  public void append(final Object o) {
    List<Field> fields = avroObject.getSchema().getFields();
    avroObject.put(fields.size(), o);
    Schema fieldSchema = null;
    if (o instanceof String) {
      fieldSchema = Schema.create(Type.STRING);
    } else if (o instanceof Integer) {
      fieldSchema = Schema.create(Type.INT);
    } else if (o instanceof Long) {
      fieldSchema = Schema.create(Type.LONG);
    } else if (o instanceof Double) {
      fieldSchema = Schema.create(Type.DOUBLE);
    } else if (o instanceof Float) {
      fieldSchema = Schema.create(Type.FLOAT);
    } else if (o == null) {
      fieldSchema = Schema.create(Type.NULL);
    } else if (o instanceof Boolean) {
      fieldSchema = Schema.create(Type.BOOLEAN);
    } else if (o instanceof Map) {
      fieldSchema = Schema.create(Type.MAP);
    }
    Field newField = new Field("FIELD_" + fields.size(), fieldSchema, "", null);
    fields.add(newField);
    avroObject.getSchema().setFields(fields);
  }

  /**
   * Returns the field at {@code pos}, converted according to the field's
   * declared schema type: strings and enums become {@link String}, maps,
   * records and arrays are wrapped, fixed and bytes become
   * {@link DataByteArray}, unions are converted by runtime type via
   * {@link #getPigObject(Object)}, and everything else is returned as is.
   *
   * @param pos zero-based field position
   * @return the converted value, or {@code null} when the stored value is
   *     null
   * @throws ExecException declared by the Tuple interface
   */
  @SuppressWarnings("unchecked")
  @Override
  public Object get(final int pos) throws ExecException {
    Schema s = avroObject.getSchema().getFields().get(pos).schema();
    Object o = avroObject.get(pos);
    if (o == null) {
      // Avoid NullPointerException in toString()/casts and avoid handing out
      // wrappers around null; a missing value is simply a null field.
      return null;
    }
    switch(s.getType()) {
    case STRING:
      // unwrap avro UTF8 encoding
      return o.toString();
    case MAP:
      return new AvroMapWrapper((Map<CharSequence, Object>) o);
    case RECORD:
      return new AvroTupleWrapper<T>((T) o);
    case ENUM:
      return o.toString();
    case ARRAY:
      return new AvroBagWrapper<GenericData.Record>(
          (GenericArray<GenericData.Record>) o);
    case FIXED:
      return new DataByteArray(((GenericData.Fixed) o).bytes());
    case BYTES:
      return new DataByteArray(toByteArray((ByteBuffer) o));
    case UNION:
      return getPigObject(o);
    default:
      return o;
    }
  }

  /**
   * Converts an Avro value to a Pig value based on its runtime class.
   * @param o An Avro object to convert to an equivalent type in Pig
   * @return Equivalent Pig object; values of unrecognised classes (including
   *     {@code null}) are returned unchanged
   */
  @SuppressWarnings("unchecked")
  public static Object getPigObject(Object o) {
    if (o instanceof org.apache.avro.util.Utf8) {
      return o.toString();
    } else if (o instanceof IndexedRecord) {
      return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) o);
    } else if (o instanceof GenericArray) {
      return new AvroBagWrapper<GenericData.Record>(
          (GenericArray<GenericData.Record>) o);
    } else if (o instanceof Map) {
      return new AvroMapWrapper((Map<CharSequence, Object>) o);
    } else if (o instanceof GenericData.Fixed) {
      return new DataByteArray(((GenericData.Fixed) o).bytes());
    } else if (o instanceof ByteBuffer) {
      return new DataByteArray(toByteArray((ByteBuffer) o));
    } else if (o instanceof GenericEnumSymbol) {
      return o.toString();
    } else {
      return o;
    }
  }

  /**
   * Copies the readable content of a buffer (position up to limit) into a
   * new array. Unlike {@code ByteBuffer.array()} this does not expose bytes
   * outside that window and also works for read-only and direct buffers.
   * The buffer's own position is left untouched.
   *
   * @param buffer the buffer to read from
   * @return a fresh array holding exactly the remaining bytes
   */
  private static byte[] toByteArray(final ByteBuffer buffer) {
    ByteBuffer view = buffer.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  /**
   * Returns every field converted as by {@link #get(int)}, in schema order.
   * @return the converted values, or {@code null} if converting any field
   *     raised an ExecException (the failure is logged)
   */
  @Override
  public List<Object> getAll() {
    List<Object> all = Lists.newArrayList();
    for (Schema.Field f : avroObject.getSchema().getFields()) {
      try {
        all.add(get(f.pos()));
      } catch (ExecException e) {
        LOG.error("could not process tuple with contents " + avroObject, e);
        return null;
      }
    }
    return all;
  }

  /**
   * Estimates the size of the wrapped record.
   * @return estimated size in bytes
   */
  @Override
  public long getMemorySize() {
    return getMemorySize(avroObject);
  }

  /**
   * Sums a per-field size estimate, in bytes, for the given record.
   *
   * <p>Fixed-width types contribute their primitive width, strings two bytes
   * per character, byte buffers their remaining length, and nested records
   * and arrays are measured recursively. Field types without a case below
   * (for example maps and unions) contribute nothing.
   *
   * @param r the record to measure
   * @return estimated size in bytes
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  private long getMemorySize(final IndexedRecord r) {
    long total = 0;
    for (Field f : r.getSchema().getFields()) {
      switch (f.schema().getType()) {
      case BOOLEAN:
      case ENUM:
      case INT:
        // SIZE is in bits; divide (not shift) to obtain bytes.
        total += Integer.SIZE / Byte.SIZE;
        break;
      case LONG:
        total += Long.SIZE / Byte.SIZE;
        break;
      case DOUBLE:
        total += Double.SIZE / Byte.SIZE;
        break;
      case FLOAT:
        total += Float.SIZE / Byte.SIZE;
        break;
      case NULL:
        break;
      case STRING: {
        // Values may be String or Avro Utf8; both are CharSequences.
        Object value = r.get(f.pos());
        if (value instanceof CharSequence) {
          total += (long) ((CharSequence) value).length()
              * (Character.SIZE / Byte.SIZE);
        }
        break;
      }
      case BYTES: {
        Object value = r.get(f.pos());
        if (value instanceof ByteBuffer) {
          total += ((ByteBuffer) value).remaining();
        }
        break;
      }
      case RECORD: {
        Object value = r.get(f.pos());
        if (value instanceof IndexedRecord) {
          total += getMemorySize((IndexedRecord) value);
        }
        break;
      }
      case ARRAY: {
        Object value = r.get(f.pos());
        if (value instanceof GenericArray) {
          total += new AvroBagWrapper((GenericArray) value).getMemorySize();
        }
        break;
      }
      default:
        break;
      }
    }
    return total;
  }

  /**
   * Returns the Pig type code for the field at the given position, derived
   * from the field's Avro schema.
   * @param arg0 zero-based field position
   * @return the Pig data type byte
   * @throws ExecException if the schema cannot be converted
   */
  @Override
  public byte getType(final int arg0) throws ExecException {
    Schema s = avroObject.getSchema().getFields().get(arg0).schema();
    return AvroStorageSchemaConversionUtilities.getPigType(s);
  }

  /**
   * Tells whether the field at the given position holds no value.
   * @param arg0 zero-based field position
   * @return true if there is no wrapped record or the stored value is null
   * @throws ExecException declared by the Tuple interface
   */
  @Override
  public boolean isNull(final int arg0) throws ExecException {
    return avroObject == null || avroObject.get(arg0) == null;
  }

  /**
   * Makes this wrapper share the record wrapped by another wrapper.
   * @param arg0 the tuple to take the record from; it is cast to
   *     AvroTupleWrapper, so any other Tuple implementation causes a
   *     ClassCastException
   */
  @SuppressWarnings("unchecked")
  @Override
  public void reference(final Tuple arg0) {
    avroObject = (T) ((AvroTupleWrapper<T>) arg0).avroObject;
  }

  /**
   * Stores a value directly into the wrapped record, without conversion.
   * @param arg0 zero-based field position
   * @param arg1 the value to store
   * @throws ExecException declared by the Tuple interface
   */
  @Override
  public void set(final int arg0, final Object arg1) throws ExecException {
    avroObject.put(arg0, arg1);
  }

  /**
   * @return the number of fields in the wrapped record's schema
   */
  @Override
  public int size() {
    return avroObject.getSchema().getFields().size();
  }

  /**
   * Joins the string form of every stored field value with the given
   * delimiter. Null values contribute an empty string. The raw stored values
   * are used, not the converted values returned by {@link #get(int)}.
   *
   * @param arg0 the delimiter placed between consecutive fields
   * @return the delimited string
   * @throws ExecException declared by the Tuple interface
   */
  @Override
  public String toDelimitedString(final String arg0) throws ExecException {
    StringBuilder delimitedString = new StringBuilder();
    boolean notfirst = false;
    for (Field f : avroObject.getSchema().getFields()) {
      if (notfirst) {
        delimitedString.append(arg0);
      }
      // Flip the flag after the first field so every later field is
      // preceded by the delimiter.
      notfirst = true;
      Object val = avroObject.get(f.pos());
      if (val != null) {
        delimitedString.append(val.toString());
      }
    }
    return delimitedString.toString();
  }

  /**
   * Not supported; always fails.
   * @param d ignored
   * @throws IOException always
   */
  @Override
  public void readFields(final DataInput d) throws IOException {
    throw new IOException(
        this.getClass().toString()
        + ".readFields called but not implemented yet");
  }

  /**
   * Iterates over the stored field values in schema order. The values are
   * the raw objects held by the wrapped record; they are not converted the
   * way {@link #get(int)} converts them.
   * @return a lazily evaluated iterator over the field values
   */
  @Override
  public Iterator<Object> iterator() {
    return Iterators.transform(avroObject.getSchema().getFields().iterator(),
        new Function<Schema.Field, Object>() {
          @Override
          public Object apply(final Field f) {
            return avroObject.get(f.pos());
          }
        }
    );
  }
}