package org.example.textclassification

import org.apache.predictionio.controller.P2LAlgorithm
import org.apache.predictionio.controller.Params
import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import grizzled.slf4j.Logger
import org.apache.spark.mllib.regression.LabeledPoint

case class LRAlgorithmParams(regParam: Double) extends Params

class LRAlgorithm(val ap: LRAlgorithmParams)
  extends P2LAlgorithm[PreparedData, LRModel, Query, PredictedResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def train(sc: SparkContext, pd: PreparedData): LRModel = {

    // Import SQLContext for creating DataFrame.
    val sql: SQLContext = new SQLContext(sc)
    import sql.implicits._

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setThreshold(0.5)
      .setRegParam(ap.regParam)

    val labels: Seq[Double] = pd.categoryMap.keys.toSeq

    val data = labels.foldLeft(pd.transformedData.map { case LabeledPoint(label, feature) =>
      // Convert old LabeledPoint to ML's LabeledPoint
      new org.apache.spark.ml.feature.LabeledPoint(label, feature.asML)
    }.toDF)( //transform to Spark DataFrame
      // Add the different binary columns for each label.
      (data: DataFrame, label: Double) => {
        // function: multiclass labels --> binary labels
        val f: UserDefinedFunction = functions.udf((e : Double) => if (e == label) 1.0 else 0.0)

        data.withColumn(label.toInt.toString, f(data("label")))
      }
    )

    // Create a logistic regression model for each class.
    val lrModels : Seq[(Double, LREstimate)] = labels.map(
      label => {
        val lab = label.toInt.toString

        val fit = lr.setLabelCol(lab).fit(
          data.select(lab, "features")
        )

        // Return (label, feature coefficients, and intercept term.
        (label, LREstimate(fit.coefficients.toArray, fit.intercept))

      }
    )

    new LRModel(
      tfIdf = pd.tfIdf,
      categoryMap = pd.categoryMap,
      lrModels = lrModels
    )
  }

  override
  def predict(model: LRModel, query: Query): PredictedResult = {
    model.predict(query.text)
  }
}

case class LREstimate (
  coefficients : Array[Double],
  intercept : Double
)

class LRModel(
  val tfIdf: TFIDFModel,
  val categoryMap: Map[Double, String],
  val lrModels: Seq[(Double, LREstimate)]) extends Serializable {

  /** Enable vector inner product for prediction. */
  private def innerProduct (x : Array[Double], y : Array[Double]) : Double = {
    x.zip(y).map(e => e._1 * e._2).sum
  }

  /** Define prediction rule. */
  def predict(text: String): PredictedResult = {
    val x: Array[Double] = tfIdf.transform(text).toArray

    // Logistic Regression binary formula for positive probability.
    // According to MLLib documentation, class labeled 0 is used as pivot.
    // Thus, we are using:
    // log(p1/p0) = log(p1/(1 - p1)) = b0 + xTb =: z
    // p1 = exp(z) * (1 - p1)
    // p1 * (1 + exp(z)) = exp(z)
    // p1 = exp(z)/(1 + exp(z))
    val pred = lrModels.map(
      e => {
        val z = scala.math.exp(innerProduct(e._2.coefficients, x) + e._2.intercept)
        (e._1, z / (1 + z))
      }
    ).maxBy(_._2)

    PredictedResult(categoryMap(pred._1), pred._2)
  }

  override def toString = s"LR model"
}
