/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro.generic;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.Collection;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;

/**
 * {@link DatumWriter} for generic Java objects.
 *
 * <p>Writing is driven by a root {@link Schema}: each datum is dispatched on
 * the schema's type to one of the protected {@code writeXxx} hooks, which
 * subclasses may override to support alternate in-memory representations.
 * When a schema carries a {@link LogicalType}, the {@link GenericData}
 * instance is asked for a {@link Conversion} matching the datum's class and,
 * if one is found, the datum is converted before it is written.
 */
public class GenericDatumWriter<D> implements DatumWriter<D> {
  /** Data model used for field access, union resolution and conversions. */
  private final GenericData data;
  /** Schema used for top-level {@link #write(Object, Encoder)} calls. */
  private Schema root;

  /** Creates a writer backed by {@link GenericData#get()} with no schema set. */
  public GenericDatumWriter() { this(GenericData.get()); }

  /** Creates a writer backed by the given data model with no schema set. */
  protected GenericDatumWriter(GenericData data) { this.data = data; }

  /** Creates a writer backed by {@link GenericData#get()} for {@code root}. */
  public GenericDatumWriter(Schema root) {
    this();
    setSchema(root);
  }

  /** Creates a writer backed by the given data model for {@code root}. */
  public GenericDatumWriter(Schema root, GenericData data) {
    this(data);
    setSchema(root);
  }

  /** Return the {@link GenericData} implementation. */
  public GenericData getData() { return data; }

  /** Sets the schema used by {@link #write(Object, Encoder)}. */
  @Override
  public void setSchema(Schema root) { this.root = root; }

  /** Writes {@code datum} to {@code out} using the root schema. */
  @Override
  public void write(D datum, Encoder out) throws IOException {
    write(root, datum, out);
  }

  /**
   * Called to write data.  If {@code schema} has a logical type and
   * {@code datum} is non-null, the datum is first passed through the
   * conversion registered for its class (when there is one); the result is
   * then handed to {@link #writeWithoutConversion}.
   */
  protected void write(Schema schema, Object datum, Encoder out)
      throws IOException {
    LogicalType logicalType = schema.getLogicalType();
    if (datum != null && logicalType != null) {
      Conversion<?> conversion = getData()
          .getConversionByClass(datum.getClass(), logicalType);
      writeWithoutConversion(schema,
          convert(schema, logicalType, conversion, datum), out);
    } else {
      writeWithoutConversion(schema, datum, out);
    }
  }

  /**
   * Applies {@code conversion} to {@code datum}, choosing the conversion
   * method from the schema's type.  Returns {@code datum} unchanged when
   * {@code conversion} is null or the schema type has no conversion method
   * listed below.
   */
  private <T> Object convert(Schema schema, LogicalType logicalType,
                             Conversion<T> conversion, Object datum) {
    if (conversion == null) {
      return datum;
    }
    Class<T> fromClass = conversion.getConvertedType();
    switch (schema.getType()) {
    case RECORD:  return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
    case ENUM:    return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
    case ARRAY:   return conversion.toArray(fromClass.cast(datum), schema, logicalType);
    case MAP:     return conversion.toMap(fromClass.cast(datum), schema, logicalType);
    case FIXED:   return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
    case STRING:  return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
    case BYTES:   return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
    case INT:     return conversion.toInt(fromClass.cast(datum), schema, logicalType);
    case LONG:    return conversion.toLong(fromClass.cast(datum), schema, logicalType);
    case FLOAT:   return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
    case DOUBLE:  return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
    case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
    }
    return datum;
  }

  /**
   * Called to write data that needs no (further) logical-type conversion.
   * Dispatches on the schema type to the matching write hook or encoder
   * call.  A {@link NullPointerException} raised while writing is rethrown
   * with the schema's full name appended to its message.
   */
  protected void writeWithoutConversion(Schema schema, Object datum, Encoder out)
    throws IOException {
    try {
      switch (schema.getType()) {
      case RECORD: writeRecord(schema, datum, out); break;
      case ENUM:   writeEnum(schema, datum, out);   break;
      case ARRAY:  writeArray(schema, datum, out);  break;
      case MAP:    writeMap(schema, datum, out);    break;
      case UNION:
        // Write the branch index first, then the datum using that branch's
        // schema (which may itself carry a logical type, hence write()).
        int index = resolveUnion(schema, datum);
        out.writeIndex(index);
        write(schema.getTypes().get(index), datum, out);
        break;
      case FIXED:   writeFixed(schema, datum, out);   break;
      case STRING:  writeString(schema, datum, out);  break;
      case BYTES:   writeBytes(datum, out);           break;
      // All numeric primitives accept any Number so that, for example, an
      // Integer can be written to a long field instead of failing with a
      // ClassCastException.
      case INT:     out.writeInt(((Number) datum).intValue());       break;
      case LONG:    out.writeLong(((Number) datum).longValue());     break;
      case FLOAT:   out.writeFloat(((Number) datum).floatValue());   break;
      case DOUBLE:  out.writeDouble(((Number) datum).doubleValue()); break;
      case BOOLEAN: out.writeBoolean((Boolean) datum); break;
      case NULL:    out.writeNull();                   break;
      default: error(schema, datum);
      }
    } catch (NullPointerException e) {
      throw npe(e, " of "+schema.getFullName());
    }
  }

  /**
   * Helper method for adding a message to an NPE.  The returned exception's
   * message is the original message followed by {@code s}; its cause is the
   * original exception's cause, or the original exception itself when it has
   * none.
   */
  protected NullPointerException npe(NullPointerException e, String s) {
    NullPointerException result = new NullPointerException(e.getMessage()+s);
    result.initCause(e.getCause() == null ? e : e.getCause());
    return result;
  }

  /** Called to write a record.  May be overridden for alternate record
   * representations.  The default implementation writes every field of the
   * schema in declaration order via {@link #writeField}.*/
  protected void writeRecord(Schema schema, Object datum, Encoder out)
    throws IOException {
    Object state = data.getRecordState(datum, schema);
    for (Field f : schema.getFields()) {
      writeField(datum, f, out, state);
    }
  }

  /** Called to write a single field of a record. May be overridden for more
   * efficient or alternate implementations.  A {@link NullPointerException}
   * raised while writing the value is rethrown with the field name appended
   * to its message.*/
  protected void writeField(Object datum, Field f, Encoder out, Object state)
      throws IOException {
    Object value = data.getField(datum, f.name(), f.pos(), state);
    try {
      write(f.schema(), value, out);
    } catch (NullPointerException e) {
      throw npe(e, " in field " + f.name());
    }
  }

  /** Called to write an enum value.  May be overridden for alternate enum
   * representations.  The default implementation writes the ordinal of the
   * symbol named by {@code datum.toString()}.
   * @throws AvroTypeException if the data model does not consider
   *         {@code datum} an enum */
  protected void writeEnum(Schema schema, Object datum, Encoder out)
    throws IOException {
    if (!data.isEnum(datum))
      throw new AvroTypeException("Not an enum: "+datum);
    out.writeEnum(schema.getEnumOrdinal(datum.toString()));
  }

  /** Called to write an array.  May be overridden for alternate array
   * representations.
   * @throws ConcurrentModificationException if the number of elements
   *         iterated differs from the size reported before writing began */
  protected void writeArray(Schema schema, Object datum, Encoder out)
    throws IOException {
    Schema element = schema.getElementType();
    long size = getArraySize(datum);
    long actualSize = 0;
    out.writeArrayStart();
    out.setItemCount(size);
    for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) {
      out.startItem();
      write(element, it.next(), out);
      actualSize++;
    }
    out.writeArrayEnd();
    // The item count was announced up front; a mismatch means the array
    // changed size while it was being written.
    if (actualSize != size) {
      throw new ConcurrentModificationException("Size of array written was " +
          size + ", but number of elements written was " + actualSize + ". ");
    }
  }

  /** Called to find the index for a datum within a union.  By default calls
   * {@link GenericData#resolveUnion(Schema,Object)}.*/
  protected int resolveUnion(Schema union, Object datum) {
    return data.resolveUnion(union, datum);
  }

  /** Called by the default implementation of {@link #writeArray} to get the
   * size of an array.  The default implementation is for {@link Collection}.*/
  protected long getArraySize(Object array) {
    return ((Collection<?>) array).size();
  }

  /** Called by the default implementation of {@link #writeArray} to enumerate
   * array elements.  The default implementation is for {@link Collection}.*/
  protected Iterator<? extends Object> getArrayElements(Object array) {
    return ((Collection<?>) array).iterator();
  }

  /** Called to write a map.  May be overridden for alternate map
   * representations.  Keys are written as strings using their
   * {@code toString()} value.
   * @throws ConcurrentModificationException if the number of entries
   *         iterated differs from the size reported before writing began */
  protected void writeMap(Schema schema, Object datum, Encoder out)
    throws IOException {
    Schema value = schema.getValueType();
    int size = getMapSize(datum);
    int actualSize = 0;
    out.writeMapStart();
    out.setItemCount(size);
    for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
      out.startItem();
      writeString(entry.getKey().toString(), out);
      write(value, entry.getValue(), out);
      actualSize++;
    }
    out.writeMapEnd();
    // The item count was announced up front; a mismatch means the map
    // changed size while it was being written.
    if (actualSize != size) {
      throw new ConcurrentModificationException("Size of map written was " +
          size + ", but number of entries written was " + actualSize + ". ");
    }
  }

  /** Called by the default implementation of {@link #writeMap} to get the size
   * of a map.  The default implementation is for {@link Map}.*/
  protected int getMapSize(Object map) {
    return ((Map<?,?>) map).size();
  }

  /** Called by the default implementation of {@link #writeMap} to enumerate
   * map elements.  The default implementation is for {@link Map}.*/
  @SuppressWarnings("unchecked") // entries are only read, as Object key/value
  protected Iterable<Map.Entry<Object,Object>> getMapEntries(Object map) {
    return ((Map<Object,Object>) map).entrySet();
  }

  /** Called to write a string.  May be overridden for alternate string
   * representations.  The default implementation ignores the schema and
   * delegates to {@link #writeString(Object, Encoder)}.*/
  protected void writeString(Schema schema, Object datum, Encoder out)
    throws IOException {
    writeString(datum, out);
  }

  /** Called to write a string.  May be overridden for alternate string
   * representations.  The default implementation is for
   * {@link CharSequence}.*/
  protected void writeString(Object datum, Encoder out) throws IOException {
    out.writeString((CharSequence) datum);
  }

  /** Called to write a bytes value.  May be overridden for alternate bytes
   * representations.  The default implementation is for
   * {@link ByteBuffer}.*/
  protected void writeBytes(Object datum, Encoder out) throws IOException {
    out.writeBytes((ByteBuffer) datum);
  }

  /** Called to write a fixed value.  May be overridden for alternate fixed
   * representations.  The default implementation is for
   * {@link GenericFixed} and writes the first {@code schema.getFixedSize()}
   * bytes of its backing array.*/
  protected void writeFixed(Schema schema, Object datum, Encoder out)
    throws IOException {
    out.writeFixed(((GenericFixed) datum).bytes(), 0, schema.getFixedSize());
  }

  /** Always throws an {@link AvroTypeException} naming the schema and the
   * datum that could not be written as it.*/
  private void error(Schema schema, Object datum) {
    throw new AvroTypeException("Not a "+schema+": "+datum);
  }

}

