/*
 * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.contrib.cassandra;

import com.datastax.driver.core.*;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
import com.datatorrent.lib.util.PojoUtils.GetterDouble;
import com.datatorrent.lib.util.PojoUtils.GetterFloat;
import com.datatorrent.lib.util.PojoUtils.GetterInt;
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.Getter;
import java.math.BigDecimal;
import java.util.*;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * CassandraPOJOOutputOperator class.</p>
 * A generic implementation of AbstractCassandraTransactionableOutputOperatorPS which takes in any POJO.
 * <p>
 * The operator is configured with a table name, a list of column names and a parallel list of
 * Java expressions; the i-th expression yields the value written to the i-th column. The CQL type of
 * every configured column is looked up when the first tuple arrives and is used to pick a matching
 * getter and bind method.</p>
 *
 * @displayName Cassandra Output Operator
 * @category Output
 * @tags output operator
 * @since 2.1.0
 */
public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
{
  @NotNull
  private ArrayList<String> columns;
  // CQL type of each entry of "columns", in the same order; filled by processFirstTuple.
  private final transient ArrayList<DataType> columnDataTypes;
  @NotNull
  private ArrayList<String> expressions;
  // Getter object for each entry of "columns", in the same order; filled by processFirstTuple.
  private final transient ArrayList<Object> getters;

  /**
   * An ArrayList of Java expressions that will yield the field value from the POJO.
   * Each expression corresponds to one column in the Cassandra table.
   *
   * @return the configured expressions
   */
  public ArrayList<String> getExpressions()
  {
    return expressions;
  }

  /**
   * Sets the Java expressions used to read the column values from the POJO.
   *
   * @param expressions one expression per entry of the column list, in the same order
   */
  public void setExpressions(ArrayList<String> expressions)
  {
    this.expressions = expressions;
  }

  /**
   * An ArrayList of Columns in the Cassandra Table.
   *
   * @return the configured column names
   */
  public ArrayList<String> getColumns()
  {
    return columns;
  }

  /**
   * Sets the names of the columns that are written by the insert statement.
   *
   * @param columns column names, in the order of the statement's bind markers
   */
  public void setColumns(ArrayList<String> columns)
  {
    this.columns = columns;
  }

  @NotNull
  private String tablename;


  /**
   * Tablename in cassandra.
   *
   * @return the configured table name
   */
  public String getTablename()
  {
    return tablename;
  }

  /**
   * Sets the name of the table the tuples are inserted into.
   *
   * @param tablename table name, without the keyspace prefix
   */
  public void setTablename(String tablename)
  {
    this.tablename = tablename;
  }

  /**
   * Creates the operator with empty type and getter caches; they are populated on the first tuple.
   */
  public CassandraPOJOOutputOperator()
  {
    super();
    columnDataTypes = new ArrayList<DataType>();
    getters = new ArrayList<Object>();
  }

  /**
   * Resolves the CQL type of every configured column and creates the matching getter for the
   * runtime class of the given tuple.
   * <p>
   * Types are looked up by column name, so the cached types line up with the bind markers of the
   * statement built by {@link #getUpdateCommand()} even when the column list is a subset of the
   * table's columns or is ordered differently from the result set's columns.</p>
   *
   * @param tuple the first tuple received; only its class is inspected
   * @throws IllegalStateException if fewer expressions than columns are configured
   */
  public void processFirstTuple(Object tuple)
  {
    final int numberOfColumns = columns.size();
    if (expressions.size() < numberOfColumns) {
      throw new IllegalStateException("expected at least " + numberOfColumns
              + " expressions (one per column) but found " + expressions.size());
    }

    // Only the column definitions of the result are needed, so do not ask for more than one row.
    com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT 1");
    final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
    final Class<?> fqcn = tuple.getClass();

    // Build into local lists first so that a failure half way through does not leave the
    // operator with partially filled caches that later tuples would mistake for a finished setup.
    final ArrayList<DataType> resolvedTypes = new ArrayList<DataType>(numberOfColumns);
    final ArrayList<Object> resolvedGetters = new ArrayList<Object>(numberOfColumns);
    for (int i = 0; i < numberOfColumns; i++) {
      // get the designated column's data type by name, not by result-set position.
      final DataType type = rsMetaData.getType(columns.get(i));
      resolvedTypes.add(type);
      resolvedGetters.add(createGetter(fqcn, expressions.get(i), type));
    }

    // Replace rather than append so that calling this method again does not duplicate entries.
    columnDataTypes.clear();
    columnDataTypes.addAll(resolvedTypes);
    getters.clear();
    getters.addAll(resolvedGetters);
  }

  /*
   * Creates the getter whose return type matches the given CQL type. Types without a dedicated
   * case get a generic Object getter; setStatementParameters rejects those types when binding.
   */
  private static Object createGetter(Class<?> fqcn, String getterExpr, DataType type)
  {
    switch (type.getName()) {
      case ASCII:
      case TEXT:
      case VARCHAR:
        return PojoUtils.createGetter(fqcn, getterExpr, String.class);
      case BOOLEAN:
        return PojoUtils.createGetterBoolean(fqcn, getterExpr);
      case INT:
        return PojoUtils.createGetterInt(fqcn, getterExpr);
      case BIGINT:
      case COUNTER:
        return PojoUtils.createGetterLong(fqcn, getterExpr);
      case FLOAT:
        return PojoUtils.createGetterFloat(fqcn, getterExpr);
      case DOUBLE:
        return PojoUtils.createGetterDouble(fqcn, getterExpr);
      case DECIMAL:
        return PojoUtils.createGetter(fqcn, getterExpr, BigDecimal.class);
      case SET:
        return PojoUtils.createGetter(fqcn, getterExpr, Set.class);
      case MAP:
        return PojoUtils.createGetter(fqcn, getterExpr, Map.class);
      case LIST:
        return PojoUtils.createGetter(fqcn, getterExpr, List.class);
      case TIMESTAMP:
        return PojoUtils.createGetter(fqcn, getterExpr, Date.class);
      case UUID:
        return PojoUtils.createGetter(fqcn, getterExpr, UUID.class);
      default:
        return PojoUtils.createGetter(fqcn, getterExpr, Object.class);
    }
  }

  /**
   * Builds and prepares the insert statement
   * {@code INSERT INTO <keyspace>.<tablename> (<columns>) VALUES (?,...);} with one bind marker
   * per configured column.
   *
   * @return the statement prepared on the store's session
   */
  @Override
  protected PreparedStatement getUpdateCommand()
  {
    final StringBuilder queryfields = new StringBuilder();
    final StringBuilder values = new StringBuilder();
    for (String column: columns) {
      if (queryfields.length() > 0) {
        queryfields.append(",");
        values.append(",");
      }
      queryfields.append(column);
      values.append("?");
    }
    String statement
            = "INSERT INTO " + store.keyspace + "."
            + tablename
            + " (" + queryfields.toString() + ") "
            + "VALUES (" + values.toString() + ");";
    LOG.debug("statement is {}", statement);
    return store.getSession().prepare(statement);
  }

  /**
   * Binds the values read from the tuple to the prepared insert statement. The type and getter
   * caches are initialised from the first tuple seen.
   *
   * @param updateCommand the prepared insert statement
   * @param tuple the POJO whose values are written
   * @return the bound statement
   * @throws DriverException declared by the overridden method
   * @throws RuntimeException if a column has a CQL type this operator has no bind case for
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException
  {
    if (getters.isEmpty()) {
      processFirstTuple(tuple);
    }
    final BoundStatement boundStmnt = new BoundStatement(updateCommand);
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
      final DataType type = columnDataTypes.get(i);
      // The casts below rely on processFirstTuple having created the getter for this same type.
      final Object getter = getters.get(i);
      switch (type.getName()) {
        case UUID:
          boundStmnt.setUUID(i, ((Getter<Object, UUID>)getter).get(tuple));
          break;
        case ASCII:
        case VARCHAR:
        case TEXT:
          boundStmnt.setString(i, ((Getter<Object, String>)getter).get(tuple));
          break;
        case BOOLEAN:
          boundStmnt.setBool(i, ((GetterBoolean<Object>)getter).get(tuple));
          break;
        case INT:
          boundStmnt.setInt(i, ((GetterInt<Object>)getter).get(tuple));
          break;
        case BIGINT:
        case COUNTER:
          boundStmnt.setLong(i, ((GetterLong<Object>)getter).get(tuple));
          break;
        case FLOAT:
          boundStmnt.setFloat(i, ((GetterFloat<Object>)getter).get(tuple));
          break;
        case DOUBLE:
          boundStmnt.setDouble(i, ((GetterDouble<Object>)getter).get(tuple));
          break;
        case DECIMAL:
          boundStmnt.setDecimal(i, ((Getter<Object, BigDecimal>)getter).get(tuple));
          break;
        case SET:
          final Set<?> set = ((Getter<Object, Set<?>>)getter).get(tuple);
          boundStmnt.setSet(i, set);
          break;
        case MAP:
          final Map<?,?> map = ((Getter<Object, Map<?,?>>)getter).get(tuple);
          boundStmnt.setMap(i, map);
          break;
        case LIST:
          final List<?> list = ((Getter<Object, List<?>>)getter).get(tuple);
          boundStmnt.setList(i, list);
          break;
        case TIMESTAMP:
          boundStmnt.setDate(i, ((Getter<Object, Date>)getter).get(tuple));
          break;
        default:
          throw new RuntimeException("unsupported data type " + type.getName());
      }
    }
    return boundStmnt;
  }

  private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
}
