/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.impl.util.avro;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Object that wraps an Avro object in a tuple.
 * @param <T> The type of the Avro object
 */
public final class AvroTupleWrapper <T extends IndexedRecord>
    implements Tuple {
    private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
    private TupleFactory mTupleFactory = TupleFactory.getInstance();

  /**
   * The Avro object wrapped in the pig Tuple.
   */
  private T avroObject;

  /**
   * Creates a new AvroTupleWrapper object.
   * @param o The object to wrap
   */
  public AvroTupleWrapper(final T o) {
    avroObject = o;
  }

  @Override
  public void write(DataOutput out) throws IOException {
      Tuple t = mTupleFactory.newTupleNoCopy(getAll());
      t.write(out);
  }

  @SuppressWarnings("rawtypes")
  @Override
  public int compareTo(final Object o) {
    if (o instanceof AvroTupleWrapper) {
      return GenericData.get().compare(avroObject,
          ((AvroTupleWrapper) o).avroObject,
          avroObject.getSchema());
    }
    return -1;
  }

  @Override
  public void append(final Object o) {
    List<Field> fields = avroObject.getSchema().getFields();
    avroObject.put(fields.size(), o);
    Schema fieldSchema = null;
    if (o instanceof String) {
      fieldSchema = Schema.create(Type.STRING);
    } else if (o instanceof Integer) {
      fieldSchema = Schema.create(Type.INT);
    } else if (o instanceof Long) {
      fieldSchema = Schema.create(Type.LONG);
    } else if (o instanceof Double) {
      fieldSchema = Schema.create(Type.DOUBLE);
    } else if (o instanceof Float) {
      fieldSchema = Schema.create(Type.FLOAT);
    } else if (o == null) {
      fieldSchema = Schema.create(Type.NULL);
    } else if (o instanceof Boolean) {
      fieldSchema = Schema.create(Type.BOOLEAN);
    } else if (o instanceof Map) {
      fieldSchema = Schema.create(Type.MAP);
    }
    Field newField = new Field("FIELD_" + fields.size(), fieldSchema, "", null);
    fields.add(newField);
    avroObject.getSchema().setFields(fields);
  }

  @SuppressWarnings("unchecked")
  @Override
  public Object get(final int pos) throws ExecException {

    Schema s = avroObject.getSchema().getFields().get(pos).schema();
    Object o = avroObject.get(pos);

    switch(s.getType()) {
      case STRING:
        // unwrap avro UTF8 encoding
        return o.toString();
      case MAP:
        return new AvroMapWrapper((Map<CharSequence, Object>) o);
      case RECORD:
        return new AvroTupleWrapper<T>((T) o);
      case ENUM:
        return o.toString();
      case ARRAY:
        return new AvroBagWrapper<GenericData.Record>(
            (GenericArray<GenericData.Record>) o);
      case FIXED:
        return new DataByteArray(((GenericData.Fixed) o).bytes());
      case BYTES:
        return new DataByteArray(((ByteBuffer) o).array());
      case UNION:
        return getPigObject(o);
      default:
        return o;
    }
  }

  /**
   * @param o An Avro object to convert to an equivalent type in Pig
   * @return Equivalent Pig object
   */
  public static Object getPigObject(Object o) {
    if (o instanceof org.apache.avro.util.Utf8) {
      return o.toString();
    } else if (o instanceof IndexedRecord) {
      return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) o);
    } else if (o instanceof GenericArray) {
      return new AvroBagWrapper<GenericData.Record>(
          (GenericArray<GenericData.Record>) o);
    } else if (o instanceof Map) {
      return new AvroMapWrapper((Map<CharSequence, Object>) o);
    } else if (o instanceof GenericData.Fixed) {
      return new DataByteArray(((GenericData.Fixed) o).bytes());
    } else if (o instanceof ByteBuffer) {
      return new DataByteArray(((ByteBuffer) o).array());
    } else if (o instanceof GenericEnumSymbol) {
      return o.toString();
    } else {
      return o;
    }
  }

  @Override
  public List<Object> getAll() {

    List<Object> all = Lists.newArrayList();
    for (Schema.Field f : avroObject.getSchema().getFields()) {
      try {
        all.add(get(f.pos()));
      } catch (ExecException e) {
        LOG.error("could not process tuple with contents " + avroObject, e);
        return null;
      }
    }
    return all;
  }

  @Override
  public long getMemorySize() {
    return getMemorySize(avroObject);
  }

  @SuppressWarnings({ "rawtypes", "unchecked" })
  private long getMemorySize(final IndexedRecord r) {
    int total = 0;
    final int bitsPerByte = 8;
    for (Field f : r.getSchema().getFields()) {
      switch (f.schema().getType()) {
      case BOOLEAN:
      case ENUM:
      case INT:
        total += Integer.SIZE << bitsPerByte;
        break;
      case DOUBLE:
        total += Double.SIZE << bitsPerByte;
        break;
      case FLOAT:
        total += Float.SIZE << bitsPerByte;
        break;
      case NULL:
        break;
      case STRING:
        total += ((String) r.get(f.pos())).length()
           * (Character.SIZE << bitsPerByte);
        break;
      case BYTES:
        total += ((Byte[]) r.get(f.pos())).length;
        break;
      case RECORD:
        total += new AvroTupleWrapper(
            (IndexedRecord) r.get(f.pos())).getMemorySize();
        break;
      case ARRAY:
        total += new AvroBagWrapper(
            (GenericArray) r.get(f.pos())).getMemorySize();
        break;
      }
    }
    return total;
  }



  @Override
  public byte getType(final int arg0) throws ExecException {
    Schema s = avroObject.getSchema().getFields().get(arg0).schema();
    return AvroStorageSchemaConversionUtilities.getPigType(s);
  }

  @Override
  public boolean isNull(final int arg0) throws ExecException {
    return avroObject == null || avroObject.get(arg0) == null;
  }

  @SuppressWarnings("unchecked")
  @Override
  public void reference(final Tuple arg0) {
    avroObject = (T) ((AvroTupleWrapper<T>) arg0).avroObject;
  }

  @Override
  public void set(final int arg0, final Object arg1) throws ExecException {
    avroObject.put(arg0, arg1);
  }

  @Override
  public int size() {
    return avroObject.getSchema().getFields().size();
  }

  @Override
  public String toDelimitedString(final String arg0) throws ExecException {
    StringBuffer delimitedString = new StringBuffer();
    boolean notfirst = false;
    for (Field f : avroObject.getSchema().getFields()) {
      if (notfirst) {
        delimitedString.append(arg0);
        notfirst = true;
      }
      Object val = avroObject.get(f.pos());
      if (val == null) {
          delimitedString.append("");
      } else {
          delimitedString.append(val.toString());
      }
    }
    return delimitedString.toString();
  }

  @Override
  public void readFields(final DataInput d) throws IOException {
    throw new IOException(
        this.getClass().toString()
        +  ".readFields called but not implemented yet");
  }

  @Override
  public Iterator<Object> iterator() {
    return Iterators.transform(avroObject.getSchema().getFields().iterator(),
        new Function<Schema.Field, Object>() {
            @Override
            public Object apply(final Field f) {
              return avroObject.get(f.pos());
            }
          }
        );
  }

}
