[FLINK-28611] Add Transformer for ElementwiseProduct

This closes #135.
diff --git a/flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java b/flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java
new file mode 100644
index 0000000..7fbf32b
--- /dev/null
+++ b/flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
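+        // A dense vector is JSON-encoded as a map with a single "values" entry, while a sparse
+        // vector also carries "n" and "indices" entries.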
+        Map<String, Object> vecValues = (Map<String, Object>) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];
+            int[] indices = new int[indicesList.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = valuesList.get(i);
+                indices[i] = indicesList.get(i);
+            }
+            return new SparseVector(n, indices, values);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported JSON format for Vector parameter.");
+        }
+    }
+}
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java
index 21f4325..1622a35 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.ml.api;
 
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
 import org.apache.flink.ml.param.BooleanParam;
 import org.apache.flink.ml.param.DoubleArrayArrayParam;
 import org.apache.flink.ml.param.DoubleArrayParam;
@@ -34,6 +36,7 @@
 import org.apache.flink.ml.param.StringArrayArrayParam;
 import org.apache.flink.ml.param.StringArrayParam;
 import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.VectorParam;
 import org.apache.flink.ml.param.WithParams;
 import org.apache.flink.ml.util.ParamUtils;
 import org.apache.flink.ml.util.ReadWriteUtils;
@@ -113,6 +116,9 @@
                         "doubleArrayArrayParam",
                         "Description",
                         new Double[][] {new Double[] {14.0, 15.0}, new Double[] {16.0, 17.0}});
+
+        Param<Vector> VECTOR_PARAM =
+                new VectorParam("vectorParam", "Description", Vectors.dense(1.0, 2.0, 3.0));
     }
 
     /**
@@ -332,6 +338,9 @@
         stage.set(MyParams.STRING_ARRAY_PARAM, new String[] {"50", "51"});
         Assert.assertArrayEquals(new String[] {"50", "51"}, stage.get(MyParams.STRING_ARRAY_PARAM));
 
+        stage.set(MyParams.VECTOR_PARAM, Vectors.dense(1, 5, 3));
+        Assert.assertEquals(Vectors.dense(1, 5, 3), stage.get(MyParams.VECTOR_PARAM));
+
         stage.set(
                 MyParams.DOUBLE_ARRAY_ARRAY_PARAM,
                 new Double[][] {new Double[] {50.0, 51.0}, new Double[] {52.0, 53.0}});
@@ -373,6 +382,7 @@
         stage.set(MyParams.FLOAT_ARRAY_PARAM, new Float[] {50.0f, 51.0f});
         stage.set(MyParams.DOUBLE_ARRAY_PARAM, new Double[] {50.0, 51.0});
         stage.set(MyParams.STRING_ARRAY_PARAM, new String[] {"50", "51"});
+        stage.set(MyParams.VECTOR_PARAM, Vectors.dense(2, 3, 4));
         stage.set(
                 MyParams.DOUBLE_ARRAY_ARRAY_PARAM,
                 new Double[][] {new Double[] {50.0, 51.0}, new Double[] {52.0, 53.0}});
@@ -411,6 +421,7 @@
                 new String[] {"50", "51"}, stage.get(MyParams.STRING_ARRAY_ARRAY_PARAM)[0]);
         Assert.assertArrayEquals(
                 new String[] {"52", "53"}, stage.get(MyParams.STRING_ARRAY_ARRAY_PARAM)[1]);
+        Assert.assertEquals(Vectors.dense(2, 3, 4), loadedStage.get(MyParams.VECTOR_PARAM));
     }
 
     @Test
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/ElementwiseProductExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/ElementwiseProductExample.java
new file mode 100644
index 0000000..9271333
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/ElementwiseProductExample.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
+ */
+public class ElementwiseProductExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));
+
+        Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");
+
+        // Creates an ElementwiseProduct object and initializes its parameters.
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.1, 1.1));
+
+        // Transforms input data.
+        Table outputTable = elementwiseProduct.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
+            Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
+            System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java
new file mode 100644
index 0000000..5a49fb6
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that multiplies each input vector element-wise with a given scaling vector
+ * (Hadamard product).
+ *
+ * <p>If the size of the input vector does not equal the size of the scaling vector, the
+ * transformer throws an {@link IllegalArgumentException}.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
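+        // The output row type is the input row type with the output vector column appended.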
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new ElementwiseProductFunction(getInputCol(), getScalingVec()),
+                                outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class ElementwiseProductFunction implements MapFunction<Row, Row> {
+        private final String inputCol;
+        private final Vector scalingVec;
+
+        public ElementwiseProductFunction(String inputCol, Vector scalingVec) {
+            this.inputCol = inputCol;
+            this.scalingVec = scalingVec;
+        }
+
+        @Override
+        public Row map(Row value) {
+            Vector inputVec = value.getFieldAs(inputCol);
+            if (inputVec != null) {
+                if (scalingVec.size() != inputVec.size()) {
+                    throw new IllegalArgumentException(
+                            "The scaling vector size ("
+                                    + scalingVec.size()
+                                    + ") does not equal the input vector size ("
+                                    + inputVec.size()
+                                    + ").");
+                }
+                Vector retVec = inputVec.clone();
+                BLAS.hDot(scalingVec, retVec);
+                return Row.join(value, Row.of(retVec));
+            } else {
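+                // A null input vector is passed through as a null output value.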
+                return Row.join(value, Row.of((Object) null));
+            }
+        }
+    }
+
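+    // ElementwiseProduct has no model data, so save() and load() only handle parameter metadata.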
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static ElementwiseProduct load(StreamTableEnvironment env, String path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProductParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProductParams.java
new file mode 100644
index 0000000..4bf612c
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProductParams.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.VectorParam;
+
+/**
+ * Params of {@link ElementwiseProduct}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface ElementwiseProductParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+
+    Param<Vector> SCALING_VEC =
+            new VectorParam(
+                    "scalingVec",
+                    "The scaling vector to multiply with input vectors (Hadamard product).",
+                    null,
+                    ParamValidators.notNull());
+
+    default Vector getScalingVec() {
+        return get(SCALING_VEC);
+    }
+
+    default T setScalingVec(Vector value) {
+        set(SCALING_VEC, value);
+        return (T) this;
+    }
+}
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
new file mode 100644
index 0000000..cc2041f
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
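+            // The output rows can arrive in any order, so each row is matched by its id field.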
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 2) {
+                assertNull(result.getField(outputCol));
+            } else {
+                throw new UnsupportedOperationException("Unknown input data id.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        ElementwiseProduct elementwiseProduct = new ElementwiseProduct();
+        assertEquals("output", elementwiseProduct.getOutputCol());
+        assertEquals("input", elementwiseProduct.getInputCol());
+
+        elementwiseProduct
+                .setInputCol("vec")
+                .setOutputCol("outputVec")
+                .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        assertEquals("vec", elementwiseProduct.getInputCol());
+        assertEquals(Vectors.dense(1.0, 2.0, 3.0), elementwiseProduct.getScalingVec());
+        assertEquals("outputVec", elementwiseProduct.getOutputCol());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        Table output = elementwiseProduct.transform(inputDataTable)[0];
+        assertEquals(
+                Arrays.asList("id", "vec", "sparseVec", "outputVec"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testSaveLoadAndTransformDense() throws Exception {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.1, 1.1));
+        ElementwiseProduct loadedElementwiseProduct =
+                TestUtils.saveAndReload(
+                        tEnv, elementwiseProduct, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedElementwiseProduct.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedElementwiseProduct.getOutputCol(), false);
+    }
+
+    @Test
+    public void testVectorSizeNotEquals() {
+        try {
+            ElementwiseProduct elementwiseProduct =
+                    new ElementwiseProduct()
+                            .setInputCol("vec")
+                            .setOutputCol("outputVec")
+                            .setScalingVec(Vectors.dense(1.1, 1.1, 2.0));
+            Table output = elementwiseProduct.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            fail();
+        } catch (Exception e) {
+            assertEquals(
+                    "The scaling vector size (3) does not equal the input vector size (2).",
+                    ExceptionUtils.getRootCause(e).getMessage());
+        }
+    }
+
+    @Test
+    public void testSaveLoadAndTransformSparse() throws Exception {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("sparseVec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(
+                                Vectors.sparse(5, new int[] {0, 1}, new double[] {1.1, 1.1}));
+        ElementwiseProduct loadedElementwiseProduct =
+                TestUtils.saveAndReload(
+                        tEnv, elementwiseProduct, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedElementwiseProduct.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedElementwiseProduct.getOutputCol(), true);
+    }
+}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/elementwiseproduct_example.py b/flink-ml-python/pyflink/examples/ml/feature/elementwiseproduct_example.py
new file mode 100644
index 0000000..9b893c4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/elementwiseproduct_example.py
@@ -0,0 +1,64 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates an ElementwiseProduct instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1)),
+        (2, Vectors.dense(1.1, 3.3))
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create an elementwise product object and initialize its parameters
+elementwise_product = ElementwiseProduct() \
+    .set_input_col('vec') \
+    .set_output_col('output_vec') \
+    .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+# use the elementwise product object for feature engineering
+output = elementwise_product.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(elementwise_product.get_input_col())]
+    output_value = result[field_names.index(elementwise_product.get_output_col())]
+    print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/ml/core/param.py b/flink-ml-python/pyflink/ml/core/param.py
index 56e463a..269b413 100644
--- a/flink-ml-python/pyflink/ml/core/param.py
+++ b/flink-ml-python/pyflink/ml/core/param.py
@@ -18,6 +18,7 @@
 
 from abc import ABC, abstractmethod
 from typing import TypeVar, Generic, List, Dict, Any, Optional, Tuple, Union
+from pyflink.ml.core.linalg import Vector
 
 import jsonpickle
 
@@ -86,6 +87,8 @@
     @staticmethod
     def _is_compatible_type(param: 'Param[V]', value: V) -> bool:
         if value is not None and param.type != type(value):
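+            # DenseVector and SparseVector values are accepted for params typed as Vector.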
+            if type(value).__name__ == 'DenseVector' or type(value).__name__ == 'SparseVector':
+                return issubclass(type(value), param.type)
             return False
         if isinstance(value, list):
             for item in value:
@@ -378,3 +381,14 @@
                  validator: ParamValidator[Tuple[str, ...]] = ParamValidators.always_true()):
         super(StringArrayParam, self).__init__(name, tuple, "Tuple[str]", description,
                                                default_value, validator)
+
+
+class VectorParam(Param[Vector]):
+    """
+    Class for the vector parameter.
+    """
+
+    def __init__(self, name: str, description: str, default_value: Optional[Vector],
+                 validator: ParamValidator[Vector] = ParamValidators.always_true()):
+        super(VectorParam, self).__init__(name, Vector, "str", description, default_value,
+                                          validator)
diff --git a/flink-ml-python/pyflink/ml/core/tests/test_stage.py b/flink-ml-python/pyflink/ml/core/tests/test_stage.py
index ac9ffe6..45527c8 100644
--- a/flink-ml-python/pyflink/ml/core/tests/test_stage.py
+++ b/flink-ml-python/pyflink/ml/core/tests/test_stage.py
@@ -21,8 +21,9 @@
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.api import Stage
+from pyflink.ml.core.linalg import Vectors
 from pyflink.ml.core.param import ParamValidators, Param, BooleanParam, IntParam, \
-    FloatParam, StringParam, IntArrayParam, FloatArrayParam, StringArrayParam
+    FloatParam, StringParam, VectorParam, IntArrayParam, FloatArrayParam, StringArrayParam
 from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
 
 BOOLEAN_PARAM = BooleanParam("boolean_param", "Description", False)
@@ -32,6 +33,7 @@
 INT_ARRAY_PARAM = IntArrayParam("int_array_param", "Description", (6, 7))
 FLOAT_ARRAY_PARAM = FloatArrayParam("float_array_param", "Description", (10.0, 11.0))
 STRING_ARRAY_PARAM = StringArrayParam("string_array_param", "Description", ("14", "15"))
+VECTOR_PARAM = VectorParam('vector_param', "Description", Vectors.dense(1, 2, 3))
 EXTRA_INT_PARAM = IntParam("extra_int_param",
                            "Description",
                            20,
@@ -49,6 +51,17 @@
         stage.set(INT_PARAM, 2)
         self.assertEqual(2, stage.get(INT_PARAM))
 
+        dense_vec = Vectors.dense(2, 2)
+        stage.set(VECTOR_PARAM, dense_vec)
+        self.assertEqual(dense_vec.get(0), stage.get(VECTOR_PARAM).get(0))
+        self.assertEqual(dense_vec.get(1), stage.get(VECTOR_PARAM).get(1))
+
+        sparse_vec = Vectors.sparse(3, [0, 2], [2, 2])
+        stage.set(VECTOR_PARAM, sparse_vec)
+        self.assertEqual(sparse_vec.get(0), stage.get(VECTOR_PARAM).get(0))
+        self.assertEqual(sparse_vec.get(1), stage.get(VECTOR_PARAM).get(1))
+        self.assertEqual(sparse_vec.get(2), stage.get(VECTOR_PARAM).get(2))
+
         param = stage.get_param("int_param")
         stage.set(param, 3)
         self.assertEqual(3, stage.get(param))
@@ -87,6 +100,11 @@
                                  " the type of <class 'int'>"):
             stage.set(STRING_PARAM, 100)
 
+        with pytest.raises(TypeError,
+                           match="Parameter vector_param's type <class 'pyflink.ml.core.linalg"
+                                 ".Vector'> is incompatible with the type of <class 'int'>"):
+            stage.set(VECTOR_PARAM, 100)
+
     def test_param_set_valid_value(self):
         stage = MyStage()
 
@@ -102,6 +120,17 @@
         stage.set(STRING_PARAM, "50")
         self.assertEqual("50", stage.get(STRING_PARAM))
 
+        dense_vec = Vectors.dense(2, 2)
+        stage.set(VECTOR_PARAM, dense_vec)
+        self.assertEqual(dense_vec.get(0), stage.get(VECTOR_PARAM).get(0))
+        self.assertEqual(dense_vec.get(1), stage.get(VECTOR_PARAM).get(1))
+
+        sparse_vec = Vectors.sparse(3, [0, 2], [2, 2])
+        stage.set(VECTOR_PARAM, sparse_vec)
+        self.assertEqual(sparse_vec.get(0), stage.get(VECTOR_PARAM).get(0))
+        self.assertEqual(sparse_vec.get(1), stage.get(VECTOR_PARAM).get(1))
+        self.assertEqual(sparse_vec.get(2), stage.get(VECTOR_PARAM).get(2))
+
         stage.set(INT_ARRAY_PARAM, (50, 51))
         self.assertEqual((50, 51), stage.get(INT_ARRAY_PARAM))
 
@@ -193,6 +222,7 @@
         self._param_map[INT_PARAM] = INT_PARAM.default_value
         self._param_map[FLOAT_PARAM] = FLOAT_PARAM.default_value
         self._param_map[STRING_PARAM] = STRING_PARAM.default_value
+        self._param_map[VECTOR_PARAM] = VECTOR_PARAM.default_value
         self._param_map[INT_ARRAY_PARAM] = INT_ARRAY_PARAM.default_value
         self._param_map[FLOAT_ARRAY_PARAM] = FLOAT_ARRAY_PARAM.default_value
         self._param_map[STRING_ARRAY_PARAM] = STRING_ARRAY_PARAM.default_value
diff --git a/flink-ml-python/pyflink/ml/core/wrapper.py b/flink-ml-python/pyflink/ml/core/wrapper.py
index 77ca05f..3359ccc 100644
--- a/flink-ml-python/pyflink/ml/core/wrapper.py
+++ b/flink-ml-python/pyflink/ml/core/wrapper.py
@@ -27,8 +27,8 @@
 
 from pyflink.ml.core.api import Model, Transformer, AlgoOperator, Stage, Estimator
 from pyflink.ml.core.linalg import DenseVectorTypeInfo, SparseVectorTypeInfo, DenseMatrixTypeInfo, \
-    VectorTypeInfo
-from pyflink.ml.core.param import Param, WithParams, StringArrayParam, IntArrayParam, \
+    VectorTypeInfo, DenseVector
+from pyflink.ml.core.param import Param, WithParams, StringArrayParam, IntArrayParam, VectorParam, \
     FloatArrayParam, FloatArrayArrayParam
 
 _from_java_type_alias = _from_java_type
@@ -222,6 +222,15 @@
         return tuple(value[i] for i in range(len(value)))
 
 
+class VectorJavaParamConverter(JavaParamConverter):
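+    # Converts between a Python Vector and a Java DenseVector via a dense double array,
+    # so sparse vectors are densified when crossing the Python/Java boundary.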
+    def to_java(self, value):
+        jarray = to_jarray(get_gateway().jvm.double, value.to_array())
+        return get_gateway().jvm.org.apache.flink.ml.linalg.DenseVector(jarray)
+
+    def to_python(self, value):
+        return DenseVector(tuple(value.get(i) for i in range(value.size())))
+
+
 class StringArrayJavaParamConverter(JavaParamConverter):
     def to_java(self, value):
         return to_jarray(get_gateway().jvm.java.lang.String, value)
@@ -259,6 +268,7 @@
     FloatArrayParam: FloatArrayJavaPramConverter(),
     FloatArrayArrayParam: FloatArrayArrayJavaPramConverter(),
     StringArrayParam: StringArrayJavaParamConverter(),
+    VectorParam: VectorJavaParamConverter(),
     Param: default_converter
 }
 
diff --git a/flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py b/flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py
new file mode 100644
index 0000000..c182e44
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.ml.core.param import ParamValidators, Param, VectorParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
+from pyflink.ml.core.linalg import Vector
+
+
+class _ElementwiseProductParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`ElementwiseProduct`.
+    """
+
+    SCALING_VEC: Param[Vector] = VectorParam(
+        "scaling_vec",
+        "The scaling vector to multiply with input vectors (Hadamard product).",
+        None,
+        ParamValidators.not_null())
+
+    def __init__(self, java_params):
+        super(_ElementwiseProductParams, self).__init__(java_params)
+
+    def set_scaling_vec(self, value: Vector):
+        return self.set(self.SCALING_VEC, value)
+
+    def get_scaling_vec(self) -> Vector:
+        return self.get(self.SCALING_VEC)
+
+    @property
+    def scaling_vec(self) -> Vector:
+        return self.get_scaling_vec()
+
+
+class ElementwiseProduct(JavaFeatureTransformer, _ElementwiseProductParams):
+    """
+    A Transformer that multiplies each input vector element-wise with a given scaling vector
+    (Hadamard product).
+
+    If the size of the input vector does not equal the size of the scaling vector, the
+    transformer throws an IllegalArgumentException.
+    """
+
+    def __init__(self, java_model=None):
+        super(ElementwiseProduct, self).__init__(java_model)
+
+    @classmethod
+    def _java_transformer_package_name(cls) -> str:
+        return "elementwiseproduct"
+
+    @classmethod
+    def _java_transformer_class_name(cls) -> str:
+        return "ElementwiseProduct"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py
new file mode 100644
index 0000000..576c393
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py
@@ -0,0 +1,82 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ElementwiseProductTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(ElementwiseProductTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (0,
+                 Vectors.dense(2.1, 3.1)),
+                (1,
+                 Vectors.dense(1.1, 3.3)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.31, 3.41)
+        self.expected_output_data_2 = Vectors.dense(1.21, 3.63)
+
+    def test_param(self):
+        elementwise_product = ElementwiseProduct()
+
+        self.assertEqual('input', elementwise_product.get_input_col())
+        self.assertEqual('output', elementwise_product.get_output_col())
+
+        elementwise_product.set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        self.assertEqual('vec', elementwise_product.get_input_col())
+        self.assertEqual(Vectors.dense(1.1, 1.1), elementwise_product.get_scaling_vec())
+        self.assertEqual('output_vec', elementwise_product.get_output_col())
+
+    def test_save_load_transform(self):
+        elementwise_product = ElementwiseProduct() \
+            .set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_elementwise_product')
+        elementwise_product.save(path)
+        elementwise_product = ElementwiseProduct.load(self.t_env, path)
+
+        output_table = elementwise_product.transform(self.input_data_table)[0]
+        actual_outputs = [(result[0], result[2]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        self.assertEqual(2, len(actual_outputs))
+        for actual_output in actual_outputs:
+            self.assertEqual(2, len(actual_output[1]))
+            if actual_output[0] == 0:
+                for i in range(len(actual_output[1])):
+                    self.assertAlmostEqual(self.expected_output_data_1.get(i),
+                                           actual_output[1].get(i), delta=1e-7)
+            else:
+                for i in range(len(actual_output[1])):
+                    self.assertAlmostEqual(self.expected_output_data_2.get(i),
+                                           actual_output[1].get(i), delta=1e-7)