/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.kafka;

import org.apache.drill.common.FunctionNames;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
import org.apache.drill.common.expression.ValueExpressions.DateExpression;
import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
import org.apache.drill.common.expression.ValueExpressions.IntExpression;
import org.apache.drill.common.expression.ValueExpressions.LongExpression;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

class KafkaNodeProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {

  private String functionName;
  private Boolean success;
  private Long value;
  private String path;

  public KafkaNodeProcessor(String functionName) {
    this.functionName = functionName;
    this.success = false;
  }

  public static boolean isPushdownFunction(String functionName) {
    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
  }

  @Override
  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
    return false;
  }

  public static KafkaNodeProcessor process(FunctionCall call) {
    String functionName = call.getName();
    LogicalExpression nameArg = call.arg(0);
    LogicalExpression valueArg = call.argCount() >= 2? call.arg(1) : null;
    KafkaNodeProcessor evaluator = new KafkaNodeProcessor(functionName);

    if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
      LogicalExpression swapArg = valueArg;
      valueArg = nameArg;
      nameArg = swapArg;
      evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
    }
    evaluator.success = nameArg.accept(evaluator, valueArg);
    return evaluator;
  }

  public boolean isSuccess() {
    // TODO Auto-generated method stub
    return success;
  }

  public String getPath() {
    return path;
  }

  public Long getValue() {
    return value;
  }

  public String getFunctionName() {
    return functionName;
  }

  @Override
  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
    this.path = path.getRootSegmentPath();

    if(valueArg == null) {
      return false;
    }

    switch (this.path) {
      case "kafkaMsgOffset":
        /*
         * Do not pushdown FunctionNames.NE on kafkaMsgOffset.
         */
        if(functionName.equals(FunctionNames.NE)) {
          return false;
        }
      case "kafkaPartitionId":
        if(valueArg instanceof IntExpression) {
          value = (long) ((IntExpression) valueArg).getInt();
          return true;
        }

        if(valueArg instanceof LongExpression) {
          value = ((LongExpression) valueArg).getLong();
          return true;
        }
        break;
      case "kafkaMsgTimestamp":
        /*
        Only pushdown FunctionNames.EQ, FunctionNames.GT, "greater_than_or_equal" on kafkaMsgTimestamp
         */
        if(!functionName.equals(FunctionNames.EQ) && !functionName.equals(FunctionNames.GT)
               && !functionName.equals(FunctionNames.GE)) {
          return false;
        }

        if(valueArg instanceof LongExpression) {
          value = ((LongExpression) valueArg).getLong();
          return true;
        }

        if (valueArg instanceof DateExpression) {
          value = ((DateExpression)valueArg).getDate();
          return true;
        }

        if (valueArg instanceof TimeExpression) {
          value = (long) ((TimeExpression)valueArg).getTime();
          return true;
        }

        if (valueArg instanceof TimeStampExpression) {
          value = ((TimeStampExpression) valueArg).getTimeStamp();
          return true;
        }

        if(valueArg instanceof IntExpression) {
          value = (long) ((IntExpression) valueArg).getInt();
          return true;
        }
        break;
    }
    return false;
  }

  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
  static {
    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
    VALUE_EXPRESSION_CLASSES = builder
                                   .add(BooleanExpression.class)
                                   .add(DateExpression.class)
                                   .add(DoubleExpression.class)
                                   .add(FloatExpression.class)
                                   .add(IntExpression.class)
                                   .add(LongExpression.class)
                                   .add(QuotedString.class)
                                   .add(TimeExpression.class)
                                   .build();
  }

  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
  static {
    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
                                          .put(FunctionNames.EQ, FunctionNames.EQ)
                                          .put(FunctionNames.NE, FunctionNames.NE)
                                          .put(FunctionNames.GE, FunctionNames.LE)
                                          .put(FunctionNames.GT, FunctionNames.LT)
                                          .put(FunctionNames.LE, FunctionNames.GE)
                                          .put(FunctionNames.LT, FunctionNames.GT)
                                          .build();
  }
}


