/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.contrib.avro;

import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.PojoUtils;

/**
 * <p>
 * AvroToPojo
 * </p>
 * A generic implementation for conversion from Avro to POJO. The POJO class
 * name & field mapping should be provided by the user.<br>
 * If this mapping is not provided then reflection is used to determine this
 * mapping.<br>
 * As of now only primitive types are supported.<br>
 * Error records are emitted on the errorPort if connected
 *
 * @displayName Avro To Pojo
 * @category Converter
 * @tags avro
 *
 * @since 3.4.0
 */
@InterfaceStability.Evolving
public class AvroToPojo extends BaseOperator
{

  private transient Class<?> pojoClass;

  private static final String FIELD_SEPARATOR = ":";
  private static final String RECORD_SEPARATOR = ",";

  private String genericRecordToPOJOFieldsMapping = null;

  private transient List<FieldInfo> fieldInfos;

  private transient List<ActiveFieldInfo> columnFieldSetters;

  @AutoMetric
  @VisibleForTesting
  int recordCount = 0;

  @AutoMetric
  @VisibleForTesting
  int errorCount = 0;

  @AutoMetric
  @VisibleForTesting
  int fieldErrorCount = 0;
  
  public final transient DefaultOutputPort<GenericRecord> errorPort = new DefaultOutputPort<GenericRecord>();

  /**
   * Returns a string representing mapping from generic record to POJO fields
   */
  public String getGenericRecordToPOJOFieldsMapping()
  {
    return genericRecordToPOJOFieldsMapping;
  }

  /**
   * Comma separated list mapping a field in Avro schema to POJO field eg :
   * orderId:orderId:INTEGER,total:total:DOUBLE
   */
  public void setGenericRecordToPOJOFieldsMapping(String genericRecordToPOJOFieldsMapping)
  {
    this.genericRecordToPOJOFieldsMapping = genericRecordToPOJOFieldsMapping;
  }

  public final transient DefaultInputPort<GenericRecord> data = new DefaultInputPort<GenericRecord>()
  {
    @Override
    public void process(GenericRecord tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Converts given Generic Record and to a POJO and emits it
   */
  protected void processTuple(GenericRecord tuple)
  {
    try {
      Object obj = getPOJOFromGenericRecord(tuple);

      if (obj != null) {
        output.emit(obj);
        recordCount++;
      } else if (errorPort.isConnected()) {
        errorPort.emit(tuple);
        errorCount++;
      }

    } catch (InstantiationException | IllegalAccessException e) {
      LOG.error("Could not initialize object of class -" + getClass().getName(), e);
      errorCount++;
    }
  }

  /**
   * Returns a POJO from a Generic Record
   *
   * @return Object
   */
  @SuppressWarnings("unchecked")
  private Object getPOJOFromGenericRecord(GenericRecord tuple) throws InstantiationException, IllegalAccessException
  {
    Object newObj = getPojoClass().newInstance();

    try {
      for (int i = 0; i < columnFieldSetters.size(); i++) {

        AvroToPojo.ActiveFieldInfo afi = columnFieldSetters.get(i);
        SupportType st = afi.fieldInfo.getType();
        Object val = null;

        try {
          val = tuple.get(afi.fieldInfo.getColumnName());
        } catch (Exception e) {
          LOG.error("Could not find field -" + afi.fieldInfo.getColumnName() + "- in the generic record", e);
          val = null;
          fieldErrorCount++;
        }

        if (val == null) {
          continue;
        }

        try {
          switch (st) {
            case BOOLEAN:
              ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj,
                  (boolean)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case DOUBLE:
              ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj,
                  (double)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case FLOAT:
              ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj,
                  (float)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case INTEGER:
              ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj,
                  (int)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            case STRING:
              ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj,
                  new String(tuple.get(afi.fieldInfo.getColumnName()).toString()));
              break;

            case LONG:
              ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj,
                  (long)tuple.get(afi.fieldInfo.getColumnName()));
              break;

            default:
              throw new AvroRuntimeException("Invalid Support Type");

          }
        } catch (AvroRuntimeException e) {
          LOG.error("Exception in setting value", e);
          fieldErrorCount++;
        }

      }
    } catch (Exception ex) {
      LOG.error("Generic Exception in setting value" + ex.getMessage());
      errorCount++;
      newObj = null;
    }
    return newObj;
  }

  /**
   * Use reflection to generate field info values if the user has not provided
   * the inputs mapping
   *
   * @return String representing the POJO field to Avro field mapping
   */
  private String generateFieldInfoInputs(Class<?> cls)
  {
    java.lang.reflect.Field[] fields = cls.getDeclaredFields();
    StringBuilder sb = new StringBuilder();

    for (int i = 0; i < fields.length; i++) {
      java.lang.reflect.Field f = fields[i];
      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
      sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR)
          .append(c.getSimpleName().toUpperCase()).append(RECORD_SEPARATOR);
    }
    return sb.substring(0, sb.length() - 1);
  }

  /**
   * Creates a map representing fieldName in POJO:field in Generic Record:Data
   * type
   *
   * @return List of FieldInfo
   */
  private List<FieldInfo> createFieldInfoMap(String str)
  {
    fieldInfos = new ArrayList<FieldInfo>();
    StringTokenizer strtok = new StringTokenizer(str, RECORD_SEPARATOR);

    while (strtok.hasMoreTokens()) {
      String[] token = strtok.nextToken().split(FIELD_SEPARATOR);
      try {
        fieldInfos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2])));
      } catch (Exception e) {
        LOG.error("Invalid support type", e);
      }
    }
    return fieldInfos;
  }

  @OutputPortFieldAnnotation(schemaRequired = true)
  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
  {
    public void setup(PortContext context)
    {
      setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS));

      columnFieldSetters = Lists.newArrayList();

      /**
       * Check if the mapping of Generic record fields to POJO is given, else
       * use reflection
       */
      if (getGenericRecordToPOJOFieldsMapping() == null) {
        setFieldInfos(createFieldInfoMap(generateFieldInfoInputs(getPojoClass())));
      } else {
        setFieldInfos(createFieldInfoMap(getGenericRecordToPOJOFieldsMapping()));
      }

      initColumnFieldSetters(getFieldInfos());
      initializeActiveFieldSetters();
    }
  };

  @Override
  public void endWindow()
  {
    errorCount = 0;
    fieldErrorCount = 0;
    recordCount = 0;

  }

  private Class<?> getPojoClass()
  {
    return pojoClass;
  }

  public void setPojoClass(Class<?> pojoClass)
  {
    this.pojoClass = pojoClass;
  }

  /**
   * Class that maps fieldInfo to its getters or setters
   */
  protected static class ActiveFieldInfo
  {
    final FieldInfo fieldInfo;
    Object setterOrGetter;

    ActiveFieldInfo(FieldInfo fieldInfo)
    {
      this.fieldInfo = fieldInfo;
    }

  }

  /**
   * A list of {@link FieldInfo}s where each item maps a column name to a pojo
   * field name.
   */
  private List<FieldInfo> getFieldInfos()
  {
    return fieldInfos;
  }

  /**
   * Add the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s
   */
  private void initColumnFieldSetters(List<FieldInfo> fieldInfos)
  {
    for (FieldInfo fi : fieldInfos) {
      if (columnFieldSetters == null) {
        columnFieldSetters = Lists.newArrayList();
      }
      columnFieldSetters.add(new AvroToPojo.ActiveFieldInfo(fi));
    }
  }

  /**
   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
   * pojo field name.<br/>
   * The value from fieldInfo.column is assigned to
   * fieldInfo.pojoFieldExpression.
   *
   * @description $[].columnName name of the Output Field in POJO
   * @description $[].pojoFieldExpression expression to get the respective field
   *              from generic record
   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
   */
  private void setFieldInfos(List<FieldInfo> fieldInfos)
  {
    this.fieldInfos = fieldInfos;
  }

  /**
   * Initialize the setters for generating the POJO
   */
  private void initializeActiveFieldSetters()
  {
    for (int i = 0; i < columnFieldSetters.size(); i++) {
      ActiveFieldInfo activeFieldInfo = columnFieldSetters.get(i);

      SupportType st = activeFieldInfo.fieldInfo.getType();

      switch (st) {

        case BOOLEAN:

          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case DOUBLE:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case FLOAT:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case INTEGER:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        case STRING:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression(), activeFieldInfo.fieldInfo.getType().getJavaType());
          break;

        case LONG:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression());
          break;

        default:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(),
              activeFieldInfo.fieldInfo.getPojoFieldExpression(), Byte.class);
          break;
      }

      columnFieldSetters.get(i).setterOrGetter = activeFieldInfo.setterOrGetter;
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(AvroToPojo.class);

}
