blob: f1c788e4fafd4b75403356f60153a778ccfa5845 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.ml.preprocessing
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{DataSet, _}
import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
import org.apache.flink.ml.math.{Vector, VectorBuilder}
import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree
import scala.reflect.ClassTag
/** Maps a vector into the polynomial feature space.
*
* This transformer takes a vector of values `(x, y, z, ...)` and maps it into the
* polynomial feature space of degree `d`. That is to say, it calculates the following
* representation:
*
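* `(x^d, x^(d-1)y, x^(d-1)z, ..., z^d, ..., x^2, xy, xz, y^2, yz, z^2, x, y, z)^T`
*
* The monomials of the highest degree come first, the ones of degree 1 come last.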
*
* This transformer can be prepended to all [[org.apache.flink.ml.pipeline.Transformer]] and
* [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect an input of
* [[LabeledVector]].
*
* @example
* {{{
* val trainingDS: DataSet[LabeledVector] = ...
*
* val polyFeatures = PolynomialFeatures()
* .setDegree(3)
*
* val mlr = MultipleLinearRegression()
*
* val pipeline = polyFeatures.chainPredictor(mlr)
*
* pipeline.fit(trainingDS)
* }}}
*
* =Parameters=
*
* - [[org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree]]: Maximum polynomial degree
*/
class PolynomialFeatures extends Transformer[PolynomialFeatures] {

  /** Sets the maximum degree of the polynomial feature space.
    *
    * @param degree Maximum polynomial degree; must not be negative
    * @return This transformer instance, so that setter calls can be chained
    * @throws IllegalArgumentException if `degree` is negative
    */
  def setDegree(degree: Int): PolynomialFeatures = {
    // A negative degree has no meaning for the polynomial expansion, so reject it right here
    // instead of letting it reach the transformation.
    require(degree >= 0, s"The polynomial degree must be non-negative, but was $degree.")
    parameters.add(Degree, degree)
    this
  }
}
object PolynomialFeatures {

  // ====================================== Parameters =============================================

  /** Maximum degree of the generated polynomial features. The default value is 1. */
  case object Degree extends Parameter[Int] {
    override val defaultValue: Option[Int] = Some(1)
  }

  // =================================== Factory methods ===========================================

  /** Creates a new [[PolynomialFeatures]] transformer without any explicitly set parameters. */
  def apply(): PolynomialFeatures = {
    new PolynomialFeatures()
  }

  // ====================================== Operations =============================================

  /** The [[PolynomialFeatures]] transformer does not need a fitting phase.
    *
    * @tparam T The fitting works with arbitrary input types
    * @return [[FitOperation]] whose `fit` method does nothing
    */
  implicit def fitNoOp[T]: FitOperation[PolynomialFeatures, T] = {
    new FitOperation[PolynomialFeatures, T] {
      override def fit(
          instance: PolynomialFeatures,
          fitParameters: ParameterMap,
          input: DataSet[T])
        : Unit = {}
    }
  }

  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[Vector]] into the
    * polynomial feature space.
    *
    * @tparam T Subclass of [[Vector]]
    * @return Operation which replaces every input vector by its polynomial features
    */
  implicit def transformVectorIntoPolynomialBase[
      T <: Vector : VectorBuilder: TypeInformation: ClassTag
    ]: TransformDataSetOperation[PolynomialFeatures, T, T] = {
    new TransformDataSetOperation[PolynomialFeatures, T, T] {
      override def transformDataSet(
          instance: PolynomialFeatures,
          transformParameters: ParameterMap,
          input: DataSet[T])
        : DataSet[T] = {
        val degree = resolveDegree(instance, transformParameters)

        input.map { vector => calculatePolynomial(degree, vector) }
      }
    }
  }

  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[LabeledVector]] into the
    * polynomial feature space. The label is kept, only the vector is transformed.
    */
  implicit val transformLabeledVectorIntoPolynomialBase
    : TransformDataSetOperation[PolynomialFeatures, LabeledVector, LabeledVector] =
    new TransformDataSetOperation[PolynomialFeatures, LabeledVector, LabeledVector] {
      override def transformDataSet(
          instance: PolynomialFeatures,
          transformParameters: ParameterMap,
          input: DataSet[LabeledVector])
        : DataSet[LabeledVector] = {
        val degree = resolveDegree(instance, transformParameters)

        input.map { labeledVector =>
          val transformedVector = calculatePolynomial(degree, labeledVector.vector)
          LabeledVector(labeledVector.label, transformedVector)
        }
      }
    }

  /** Determines the polynomial degree to use for a transformation.
    *
    * The parameters of the transformer instance are combined with the parameters given to the
    * transform call (`instance.parameters ++ transformParameters`) and the [[Degree]] value is
    * read from the combined map.
    *
    * @param instance Transformer whose parameters are used
    * @param transformParameters Parameters passed to the transform operation
    * @return The configured, non-negative polynomial degree
    * @throws IllegalArgumentException if the configured degree is negative
    */
  private def resolveDegree(
      instance: PolynomialFeatures,
      transformParameters: ParameterMap)
    : Int = {
    val resultingParameters = instance.parameters ++ transformParameters
    val degree = resultingParameters(Degree)

    // Fail before the data set operation is built instead of failing for every single element.
    require(degree >= 0, s"The polynomial degree must be non-negative, but was $degree.")

    degree
  }

  /** Maps a single vector into the polynomial feature space and builds a vector of the same
    * type from the result.
    *
    * @param degree Maximum degree of polynomial
    * @param vector Values of the polynomial variables
    * @tparam T Subclass of [[Vector]] for which a [[VectorBuilder]] is available
    * @return Vector containing the polynomial features
    */
  private def calculatePolynomial[T <: Vector: VectorBuilder](degree: Int, vector: T): T = {
    val builder = implicitly[VectorBuilder[T]]
    builder.build(calculateCombinedCombinations(degree, vector))
  }

  /** Calculates for a given vector its representation in the polynomial feature space.
    *
    * The values are ordered by descending degree: first all monomials of degree `degree`, then
    * the ones of degree `degree - 1` and so on down to degree 1. A degree smaller than 1 yields
    * an empty list.
    *
    * @param degree Maximum degree of polynomial
    * @param vector Values of the polynomial variables
    * @return List of polynomial values
    */
  private def calculateCombinedCombinations(degree: Int, vector: Vector): List[Double] = {
    // Iterating over the degrees (instead of recursing down to 0) terminates for every input.
    (degree to 1 by -1).toList.flatMap { currentDegree =>
      calculateCombinations(vector.size, currentDegree).map { exponents =>
        // The i-th exponent belongs to the i-th vector component.
        exponents.zipWithIndex.map {
          case (exp, idx) => math.pow(vector(idx), exp)
        }.product
      }
    }
  }

  /** Calculates all possible combinations of a polynomial of degree `value`, whereas the
    * polynomial can consist of up to `length` factors. The return value is the list of the
    * exponents of the individual factors.
    *
    * Every returned list has `length` entries which sum up to `value`.
    *
    * @param length maximum number of factors
    * @param value degree of polynomial
    * @return List of lists which contain the exponents of the individual factors
    */
  private def calculateCombinations(length: Int, value: Int): List[List[Int]] = {
    if (length == 0) {
      List()
    } else if (length == 1) {
      // A single factor has to carry the complete degree.
      List(List(value))
    } else {
      // Give the first factor every exponent from `value` down to 0 and distribute the
      // remaining degree among the other factors.
      (value to 0 by -1).toList.flatMap { v =>
        calculateCombinations(length - 1, value - v).map(v :: _)
      }
    }
  }
}