Merge pull request #256 from apache/s3

Support for S3 file locations
diff --git a/.dlc.json b/.dlc.json
index ac31a3e..1fbc9bd 100644
--- a/.dlc.json
+++ b/.dlc.json
@@ -8,6 +8,12 @@
     },
     {
       "pattern": "^https://github.com/"
+    },
+    {
+      "pattern": "^https://www.linkedin.com/"
+    },
+    {
+      "pattern": "[a-z0-9]+@[a-z0-9]+"
     }
   ],
   "timeout": "10s",
diff --git a/.gitignore b/.gitignore
index 3a9f55c..210c8e3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 .idea
 .idea/
 **/*.iml
+**/maven-wrapper.jar
 
 # Maven
 **/target/
diff --git a/README.md b/README.md
index 52b0f61..57ede60 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,48 @@
 - [Postgres](http://www.postgresql.org)
 - [SQLite](https://www.sqlite.org/)
 
+## Installing Wayang
+
+You can download Wayang from here; to install it, follow these steps:
+
+```shell
+tar -xvf wayang-0.6.1-snapshot.tar.gz
+cd wayang-0.6.1-snapshot
+```
+
+In Linux
+```shell 
+echo "export WAYANG_HOME=$(pwd)" >> ~/.bashrc
+echo "export PATH=${PATH}:${WAYANG_HOME}/bin" >> ~/.bashrc
+source ~/.bashrc
+```
+In macOS
+```shell 
+echo "export WAYANG_HOME=$(pwd)" >> ~/.zshrc
+echo "export PATH=${PATH}:${WAYANG_HOME}/bin" >> ~/.zshrc
+source ~/.zshrc
+```
+
+### Requirements at Runtime
+
+Apache Wayang (incubating) is not an execution engine itself; rather, it manages the execution engines for
+you. For this reason, it is important to have the following requirements installed:
+
+- Java: versions from 8 are supported, but the Wayang team recommends Java 11. Do not forget to
+   declare the variable `JAVA_HOME`
+- Apache Spark: you need to install Apache Spark version 3 or later. Do not forget to
+   declare the variable `SPARK_HOME`
+- Apache Hadoop: you need to install Apache Hadoop version 3 or later. Do not forget to
+  declare the variable `HADOOP_HOME`
+
+### Validating the installation
+
+To run your first program with Wayang, execute the following command:
+
+```shell
+wayang-submit org.apache.wayang.apps.wordcount.Main java file://$(pwd)/README.md
+```
+
 ## Getting Started
 
 Wayang is available via Maven Central. To use it with Maven, include the following into your POM file:
diff --git a/bin/wayang-submit b/bin/wayang-submit
new file mode 100755
index 0000000..a0ef256
--- /dev/null
+++ b/bin/wayang-submit
@@ -0,0 +1,105 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+CLASS=$1
+
+
+if [ -z "${CLASS}" ]; then
+    echo "Target Class for execution was not provided"
+    exit 1
+fi
+
+# Obtain the realpath of file, even if called using a symlink
+get_realpath() {
+    CANDIDATE_SRC="$1"
+    while [ -L "${CANDIDATE_SRC}" ]; do # resolve ${CANDIDATE_SRC} until it is no longer a symlink
+      DIR=$( cd -P "$( dirname "${CANDIDATE_SRC}" )" >/dev/null 2>&1 && pwd )
+      CANDIDATE_SRC=$(readlink "${CANDIDATE_SRC}")
+      # if ${CANDIDATE_SRC} was a relative symlink, resolve it relative to the symlink's directory
+      [[ ${CANDIDATE_SRC} != /* ]] && CANDIDATE_SRC=$DIR/${CANDIDATE_SRC}
+    done
+    cd -P "$( dirname "${CANDIDATE_SRC}" )/.." >/dev/null 2>&1 && pwd
+}
+
+# Short circuit if the user already has this set.
+if [ -z "${WAYANG_HOME}" ]; then
+  export WAYANG_HOME=$( get_realpath ${BASH_SOURCE[0]} )
+fi
+
+if [ -z "${SPARK_HOME}" ]; then
+  echo "The variable SPARK_HOME it needs to be setup" >&2
+  exit 1
+fi
+
+#if [ -z "${FLINK_HOME}" ]; then
+#  echo "The variable FLINK_HOME it needs to be setup" >&2
+#  exit 1
+#fi
+
+if [ -z "${HADOOP_HOME}" ]; then
+  echo "The variable HADOOP_HOME it needs to be setup" >&2
+  exit 1
+fi
+
+# Find the java binary
+if [ -n "${JAVA_HOME}" ]; then
+  RUNNER="${JAVA_HOME}/bin/java"
+else
+  if [ "$(command -v java)" ]; then
+    RUNNER="java"
+  else
+    echo "JAVA_HOME is not set" >&2
+    exit 1
+  fi
+fi
+
+# Find Spark jars.
+if [ -d "${SPARK_HOME}" ]; then
+  SPARK_JARS_DIR="${SPARK_HOME}/jars"
+fi
+
+# Find Hadoop jars.
+if [ -d "${HADOOP_HOME}" ]; then
+  HADOOP_JARS_DIR="${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/common/lib/*"
+fi
+
+
+WAYANG_CODE="${WAYANG_HOME}/lib"
+WAYANG_CONF="${WAYANG_HOME}/conf"
+
+# Bootstrap the classpath.
+WAYANG_CLASSPATH="${WAYANG_CONF}:${WAYANG_CODE}/*"
+
+WAYANG_CLASSPATH="${WAYANG_CLASSPATH}:${SPARK_JARS_DIR}/*:${HADOOP_JARS_DIR}"
+
+FLAGS=""
+if [ "${FLAG_LOG}" = "true" ]; then
+	FLAGS="${FLAGS} -Dlog4j.configuration=file://${WAYANG_CONF}/log4j.properties"
+fi
+
+if [ "${FLAG_WAYANG}" = "true" ]; then
+	FLAGS="${FLAGS} -Dwayang.configuration=file://${WAYANG_CONF}/wayang.properties"
+fi
+
+if [ -n "${OTHER_FLAGS}" ]; then
+	FLAGS="${FLAGS} ${OTHER_FLAGS}"
+fi
+
+echo "$RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${@:2}"
+eval "$RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${@:2}"
+
diff --git a/bin/build.sh b/build/build.sh
similarity index 100%
rename from bin/build.sh
rename to build/build.sh
diff --git a/bin/change-scala-version.sh b/build/change-scala-version.sh
similarity index 100%
rename from bin/change-scala-version.sh
rename to build/change-scala-version.sh
diff --git a/bin/check-license.sh b/build/check-license.sh
similarity index 97%
rename from bin/check-license.sh
rename to build/check-license.sh
index f38f89d..4e7ccbf 100755
--- a/bin/check-license.sh
+++ b/build/check-license.sh
@@ -48,10 +48,13 @@
 rat_opts+=("-e" "^.*.md")
 rat_opts+=("-e" "^.*.iml")
 rat_opts+=("-e" "^.*_pb2.py") # code generated by protocol buffer
+rat_opts+=("-e" "^.*.properties")
 rat_opts+=("-e" "Gemfile.lock")
 rat_opts+=("-e" ".gitignore")
 rat_opts+=("-e" ".gitmodules")
 rat_opts+=("-e" ".rat-excludes")
+
+
 if [[ ! -z "$RAT_EXCLUSION" ]]; then
    rat_opts+=("-E")
    rat_opts+=($RAT_EXCLUSION)
diff --git a/bin/check-release.sh b/build/check-release.sh
similarity index 100%
rename from bin/check-release.sh
rename to build/check-release.sh
diff --git a/bin/contains-scala-dependencies.sh b/build/contains-scala-dependencies.sh
similarity index 100%
rename from bin/contains-scala-dependencies.sh
rename to build/contains-scala-dependencies.sh
diff --git a/bin/create_scala_structure.sh b/build/create_scala_structure.sh
similarity index 100%
rename from bin/create_scala_structure.sh
rename to build/create_scala_structure.sh
diff --git a/bin/detect-scala-dependent-projects.sh b/build/detect-scala-dependent-projects.sh
similarity index 100%
rename from bin/detect-scala-dependent-projects.sh
rename to build/detect-scala-dependent-projects.sh
diff --git a/bin/pyplangenerator.sh b/build/pyplangenerator.sh
similarity index 100%
rename from bin/pyplangenerator.sh
rename to build/pyplangenerator.sh
diff --git a/bin/rename_org.sh b/build/rename_org.sh
similarity index 100%
rename from bin/rename_org.sh
rename to build/rename_org.sh
diff --git a/bin/rm-pom-backups.sh b/build/rm-pom-backups.sh
similarity index 100%
rename from bin/rm-pom-backups.sh
rename to build/rm-pom-backups.sh
diff --git a/bin/rollback_release.sh b/build/rollback_release.sh
similarity index 100%
rename from bin/rollback_release.sh
rename to build/rollback_release.sh
diff --git a/conf/flink/default.properties b/conf/flink/default.properties
new file mode 100644
index 0000000..196b54f
--- /dev/null
+++ b/conf/flink/default.properties
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cluster configuration
+#wayang.flink.mode.run = distribution
+#wayang.flink.master =
+#wayang.flink.port =
+#wayang.flink.paralelism =
+
+# Local distribute
+#wayang.flink.mode.run = local
+#wayang.flink.paralelism = 1
+
+# collection mode
+wayang.flink.mode.run = collection
+wayang.flink.paralelism = 1
diff --git a/conf/spark/default.properties b/conf/spark/default.properties
new file mode 100644
index 0000000..9b0a98b
--- /dev/null
+++ b/conf/spark/default.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+spark.master = local[1]
+spark.app.name = Wayang App
+spark.ui.showConsoleProgress = false
+spark.driver.allowMultipleContexts=true
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 36ab3ed..f342983 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,7 +114,7 @@
         <mockito.version>3.5.10</mockito.version>
         <mockk.version>1.10.0</mockk.version>
         <external.platforms.scope>provided</external.platforms.scope>
-        <hadoop.version>2.7.7</hadoop.version>
+        <hadoop.version>3.1.2</hadoop.version>
         <!-- To be overridden by individual modules -->
         <java-module-name>org.apache.wayang.default</java-module-name>
         <code.coverage.project.folder>${basedir}/</code.coverage.project.folder>
@@ -1253,6 +1253,7 @@
                         <exclude>**/*pb2.py</exclude>
                         <exclude>**/.rat-excludes</exclude>                      
                         <exclude>**/*.csv</exclude>
+                        <exclude>**/*.properties</exclude>
                     </excludes>
                 </configuration>
             </plugin>
@@ -1517,5 +1518,6 @@
         <module>wayang-plugins</module>
         <module>wayang-resources</module>
         <module>wayang-benchmark</module>
+        <module>wayang-assembly</module>
     </modules>
 </project>
diff --git a/wayang-assembly/README.md b/wayang-assembly/README.md
new file mode 100644
index 0000000..4f78e1d
--- /dev/null
+++ b/wayang-assembly/README.md
@@ -0,0 +1,23 @@
+# Wayang Assembly
+This is the assembly module for the Apache Wayang (incubating) project.
+
+It creates a single tar.gz file that includes all needed dependencies of the project
+except for the jars in the following list:
+
+- org.apache.hadoop.*, those are supposed to be available from the deployed Hadoop cluster.
+
+> Note: This module is off by default. To activate it, specify the profile
+> `-Pdistribution` on the command line.
+
+> Note: If you need to build an assembly for a different version of Hadoop, the
+> `hadoop.version` system property needs to be set, as in this example: `-Dhadoop.version=2.7.4` at the 
+> maven command line
+
+
+# Execution Profile Assembly
+
+To execute the Wayang Assembly you need to execute the following command in the project root
+
+```shell
+./mvnw clean package -pl :wayang-assembly -Pdistribution
+```
\ No newline at end of file
diff --git a/wayang-assembly/pom.xml b/wayang-assembly/pom.xml
new file mode 100644
index 0000000..c2050ca
--- /dev/null
+++ b/wayang-assembly/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>wayang</artifactId>
+    <groupId>org.apache.wayang</groupId>
+    <version>0.6.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>wayang-assembly</artifactId>
+  <name>Wayang Project Assembly</name>
+  <url>https://wayang.apache.org/</url>
+  <packaging>pom</packaging>
+
+  <properties>
+    <sbt.project.name>assembly</sbt.project.name>
+    <build.testJarPhase>none</build.testJarPhase>
+    <build.copyDependenciesPhase>package</build.copyDependenciesPhase>
+    <wayang.name>wayang-${project.version}</wayang.name>
+  </properties>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-core</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-basic</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-java</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-spark_2.12</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-api-scala-java_2.12</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-benchmark_2.12</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-flink_2.12</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-jdbc-template</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.wayang</groupId>
+      <artifactId>wayang-postgres</artifactId>
+      <version>0.6.1-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-install-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>distribution</id>
+      <!-- This profile uses the assembly plugin to create a special "distribution" package
+           that contains the Wayang jars but not the Hadoop JARs they depend on. -->
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <version>3.1.0</version>
+            <executions>
+              <execution>
+                <id>dist</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+                <configuration>
+                  <descriptors>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                  </descriptors>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>
\ No newline at end of file
diff --git a/wayang-assembly/src/main/assembly/assembly.xml b/wayang-assembly/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..73b43d1
--- /dev/null
+++ b/wayang-assembly/src/main/assembly/assembly.xml
@@ -0,0 +1,87 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<assembly>
+  <id>dist</id>
+  <formats>
+    <format>tar.gz</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <fileSets>
+    <fileSet>
+      <includes>
+        <include>README.md</include>
+      </includes>
+      <outputDirectory>${wayang.name}/</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>
+        ${project.parent.basedir}/bin/
+      </directory>
+      <outputDirectory>${wayang.name}/bin</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>
+        ${project.parent.basedir}/conf/
+      </directory>
+      <outputDirectory>${wayang.name}/conf</outputDirectory>
+      <includes>
+        <include>**/*</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>
+        ${project.parent.basedir}/assembly/target/
+      </directory>
+      <outputDirectory>${wayang.name}/jars</outputDirectory>
+      <includes>
+        <include>*</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+
+  <dependencySets>
+    <dependencySet>
+      <includes>
+        <include>org.apache.wayang:*:jar</include>
+      </includes>
+      <excludes>
+        <exclude>org.apache.wayang:wayang-assembly:jar</exclude>
+      </excludes>
+    </dependencySet>
+    <dependencySet>
+      <outputDirectory>${wayang.name}/lib</outputDirectory>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <excludes>
+        <exclude>org.apache.hadoop:*:jar</exclude>
+        <exclude>org.apache.spark:*:jar</exclude>
+        <exclude>org.apache.flink:*:jar</exclude>
+        <exclude>org.apache.zookeeper:*:jar</exclude>
+        <exclude>org.apache.avro:*:jar</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+
+</assembly>
diff --git a/wayang-benchmark/README.md b/wayang-benchmark/README.md
index 99d0cb7..015d05d 100644
--- a/wayang-benchmark/README.md
+++ b/wayang-benchmark/README.md
@@ -45,7 +45,7 @@
 * `wayang.apps.tpch.csv.orders`: URL to the `ORDERS` file
 * `wayang.apps.tpch.csv.lineitem`: URL to the `LINEITEM` file
 
-**Datasets.** The datasets for this app can be generated with the [TPC-H tools](http://www.tpc.org/tpch/). The generated datasets can then be either put into a database and/or a filesystem.
+**Datasets.** The datasets for this app can be generated with the [TPC-H tools](https://www.tpc.org/tpch/). The generated datasets can then be either put into a database and/or a filesystem.
 
 ### SINDY
 
@@ -58,7 +58,7 @@
 Even though this app is written in Scala, you can launch it in a regular JVM. Run the app without parameters to get a description of the required parameters.
 
 **Datasets.** Find below a list of datasets that can be used to benchmark Apache Wayang (incubating) in combination with this app:
-* CSV files generated with the [TPC-H tools](http://www.tpc.org/tpch/)
+* CSV files generated with the [TPC-H tools](https://www.tpc.org/tpch/)
 * [other CSV files](https://hpi.de/naumann/projects/repeatability/data-profiling/metanome-ind-algorithms.html)
 
 ### SGD
diff --git a/wayang-benchmark/code/main/java/org/apache/wayang/apps/sgd/SGDImpl.java b/wayang-benchmark/code/main/java/org/apache/wayang/apps/sgd/SGDImpl.java
index d5331c6..7585e73 100644
--- a/wayang-benchmark/code/main/java/org/apache/wayang/apps/sgd/SGDImpl.java
+++ b/wayang-benchmark/code/main/java/org/apache/wayang/apps/sgd/SGDImpl.java
@@ -119,15 +119,16 @@
 
     @Override
     public double[] apply(String line) {
-        String[] pointStr = line.split(" ");
+        String[] pointStr = line.split(",");
         double[] point = new double[features + 1];
         point[0] = Double.parseDouble(pointStr[0]);
         for (int i = 1; i < pointStr.length; i++) {
-            if (pointStr[i].equals("")) {
+/*            if (pointStr[i].equals("")) {
                 continue;
             }
             String kv[] = pointStr[i].split(":", 2);
-            point[Integer.parseInt(kv[0]) - 1] = Double.parseDouble(kv[1]);
+            point[Integer.parseInt(kv[0]) - 1] = Double.parseDouble(kv[1]);*/
+            point[i] = Double.parseDouble(pointStr[i]);
         }
         return point;
     }
diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml
index f230ee0..5cace14 100644
--- a/wayang-benchmark/pom.xml
+++ b/wayang-benchmark/pom.xml
@@ -59,6 +59,21 @@
       <artifactId>wayang-sqlite3</artifactId>
       <version>0.6.1-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>3.1.2</version>
+    </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>3.1.2</version>
+        </dependency>
   </dependencies>
 
   <modules>
diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml
index fdb0e20..2994108 100644
--- a/wayang-commons/pom.xml
+++ b/wayang-commons/pom.xml
@@ -81,6 +81,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
@@ -165,6 +169,10 @@
                         <groupId>com.thoughtworks.paranamer</groupId>
                         <artifactId>paranamer</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>commons-httpclient</groupId>
+                        <artifactId>commons-httpclient</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml
index 780c04a..b170a49 100644
--- a/wayang-commons/wayang-core/pom.xml
+++ b/wayang-commons/wayang-core/pom.xml
@@ -37,7 +37,17 @@
     <properties>
         <java-module-name>org.apache.wayang.core</java-module-name>
     </properties>
-
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>1.12.253</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
     <dependencies>
         <dependency>
             <groupId>org.yaml</groupId>
@@ -93,6 +103,16 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>1.12.253</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
index 21ae249..087a64d 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java
@@ -45,7 +45,8 @@
 
     private static Collection<FileSystem> registeredFileSystems = Arrays.asList(
             new LocalFileSystem(),
-            new HadoopFileSystem()
+            new HadoopFileSystem(),
+            new S3FileSystem()
     );
 
     private FileSystems() {
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
new file mode 100644
index 0000000..1b89e46
--- /dev/null
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/S3FileSystem.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.core.util.fs;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.wayang.core.api.exception.WayangException;
+
+public class S3FileSystem implements FileSystem {
+
+  private AmazonS3 s3;
+
+  final Map<String, S3Pair> pairs = new HashMap<>();
+
+  public S3FileSystem(){}
+
+  public static void main(String... args) throws IOException {
+    S3FileSystem s3 = new S3FileSystem();
+    //String url = "s3://blossom-benchmark/HIGGS.csv";
+   // String url = "s3://blossom-benchmark/README.md";
+    String url = "s3://blossom-benchmark/lulu/lolo/lala";
+    System.out.println(url);
+    System.out.println(s3.getS3Pair(url).getBucket());
+    System.out.println(s3.getS3Pair(url).getKey());
+    System.out.println(s3.preFoldersExits(s3.getS3Pair(url)));
+
+   // System.out.println(s3.getFileSize(url));
+//    InputStream content = s3.open(url);
+//    new BufferedReader(new InputStreamReader(content)).lines().forEach(System.out::println);
+//    System.out.println(s3.listChildren(url));
+//    System.out.println(s3.isDirectory(url));
+    OutputStream output = s3.create(url, true);
+    byte[] bytes = "lala".getBytes();
+    output.write(bytes);
+    output.flush();
+    output.close();
+  }
+
+  private AmazonS3 getS3Client(){
+    if(this.s3 == null){
+      if(
+          System.getProperties().contains("fs.s3.awsAccessKeyId") &&
+          System.getProperties().contains("fs.s3.awsSecretAccessKey")
+      ){
+        BasicAWSCredentials awsCreds = new BasicAWSCredentials(
+            System.getProperty("fs.s3.awsAccessKeyId"),
+            System.getProperty("fs.s3.awsSecretAccessKey")
+        );
+        this.s3 = AmazonS3ClientBuilder.standard()
+            .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+            .build();
+      }else{
+        this.s3 = AmazonS3ClientBuilder.defaultClient();
+      }
+    }
+    return this.s3;
+  }
+
+  class S3Pair{
+
+    private final String bucket;
+    private final String key;
+
+    public S3Pair(S3FileSystem s3Client, String url){
+      if( ! s3Client.canHandle(url)){
+        throw new WayangException("The files can not be handle by "+this.getClass().getSimpleName());
+      }
+      String[] parts = url.split("/", 4);
+      String key_tmp = "";
+      if(parts.length == 4) {
+        key_tmp = parts[3];
+      }
+      this.bucket = parts[2];
+      this.key = key_tmp;
+    }
+
+    public S3Pair(String bucket, String key){
+      this.bucket = bucket;
+      this.key = key;
+    }
+
+    public String getBucket() {
+      return bucket;
+    }
+
+    public String getKey() {
+      return key;
+    }
+  }
+
+  private S3Pair getS3Pair(String url){
+    S3Pair pair = this.pairs.get(url);
+    if(pair == null){
+      pair = new S3Pair(this, url);
+      this.pairs.put(url, pair);
+    }
+    return pair;
+  }
+
+  @Override
+  public long getFileSize(String fileUrl) throws FileNotFoundException {
+    return this.getFileSize(this.getS3Pair(fileUrl));
+  }
+
+  private long getFileSize(S3Pair pair) throws FileNotFoundException {
+    return this.getS3Client().getObjectMetadata(pair.getBucket(), pair.getKey()).getContentLength();
+  }
+
+  @Override
+  public boolean canHandle(String url) {
+    String url_lower = url.substring(0, 5).toLowerCase();
+    return url_lower.startsWith("s3a:/");
+  }
+
+  @Override
+  public InputStream open(String url) throws IOException {
+    return this.open(this.getS3Pair(url));
+  }
+
+  private InputStream open(S3Pair pair) throws IOException {
+    return this.getS3Client().getObject(pair.getBucket(), pair.getKey()).getObjectContent();
+  }
+
+  @Override
+  public OutputStream create(String url) throws IOException {
+    return this.create(this.getS3Pair(url));
+  }
+
+  private OutputStream create(S3Pair pair) throws IOException {
+    return this.create(pair, false);
+  }
+
+  @Override
+  public OutputStream create(String url, Boolean forceCreateParentDirs) throws IOException {
+    return this.create(this.getS3Pair(url), forceCreateParentDirs);
+  }
+
+  private OutputStream create(S3Pair pair, Boolean forceCreateParentDirs) throws IOException {
+    if( ! forceCreateParentDirs ){
+      if ( ! this.preFoldersExits(pair) )
+        throw new IOException(
+            String.format(
+              "The folder '%s' does not exist in the bucket '%s'",
+              pair.getKey(),
+              pair.getBucket()
+            )
+        );
+    }
+
+    PipedInputStream in = new PipedInputStream();
+    final PipedOutputStream out = new PipedOutputStream(in);
+
+    ObjectMetadata metadata = new ObjectMetadata();
+    metadata.setContentType("text/plain");
+    AmazonS3 s3Client = this.getS3Client();
+    new Thread(new Runnable() {
+      public void run() {
+        PutObjectResult result = s3Client.putObject(pair.getBucket(), pair.getKey(), in, metadata);
+      }
+    }).start();
+    return out;
+  }
+
+  public boolean bucketExits(S3Pair pair){
+    return this.getS3Client().doesBucketExistV2(pair.getBucket());
+  }
+
+  public boolean preFoldersExits(S3Pair pair){
+    if( ! this.getS3Client().doesBucketExistV2(pair.getBucket()) ) return false;
+    String[] keys = pair.getKey().split("/");
+    String aggregated = "";
+    for(int i = 0; i < keys.length; i++){
+      aggregated = aggregated + "/" +keys[i];
+      if( ! isDirectory(new S3Pair(pair.getBucket(), aggregated)) ){
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isDirectory(String url) {
+    return this.isDirectory(this.getS3Pair(url));
+  }
+
+  private boolean isDirectory(S3Pair pair) {
+    if( ! this.bucketExits(pair) ) return false;
+
+    String key = pair.getKey();
+    long size = listChildren(pair).stream().filter(name -> ! name.equals(key)).count();
+    if(size > 0){
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public Collection<String> listChildren(String url) {
+    return this.listChildren(this.getS3Pair(url));
+  }
+
+  private Collection<String> listChildren(S3Pair pair) {
+    ObjectListing listing = this.getS3Client().listObjects(pair.getBucket(), pair.getKey());
+    return listing.getObjectSummaries().stream()
+        .map(obj -> obj.getKey())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public boolean delete(String url, boolean isRecursiveDelete) throws IOException {
+    return this.delete(this.getS3Pair(url), isRecursiveDelete);
+  }
+
+  private boolean delete(S3Pair pair, boolean isRecursiveDelete) throws IOException {
+    if(!isRecursiveDelete){
+      if(isDirectory(pair)){
+        throw new IOException("the path correspond to a directory");
+      }
+    }
+    this.getS3Client().deleteObject(pair.getBucket(), pair.getKey());
+    return true;
+  }
+}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index 9b574af..471275a 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -83,7 +83,12 @@
             "spark.io.compression.codec",
             "spark.driver.memory",
             "spark.executor.heartbeatInterval",
-            "spark.network.timeout"
+            "spark.network.timeout",
+    };
+
+    private static final String[] OPTIONAL_HADOOP_PROPERTIES = {
+        "fs.s3.awsAccessKeyId",
+        "fs.s3.awsSecretAccessKey"
     };
 
     /**
@@ -121,6 +126,7 @@
                     "There is already a SparkContext (master: {}): , which will be reused. " +
                             "Not all settings might be effective.", sparkContext.getConf().get("spark.master"));
             sparkConf = sparkContext.getConf();
+
         } else {
             sparkConf = new SparkConf(true);
         }
@@ -133,6 +139,7 @@
                     value -> sparkConf.set(property, value)
             );
         }
+
         if (job.getName() != null) {
             sparkConf.set("spark.app.name", job.getName());
         }
@@ -142,6 +149,14 @@
         }
         final JavaSparkContext sparkContext = this.sparkContextReference.get();
 
+        org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration();
+        for (String property: OPTIONAL_HADOOP_PROPERTIES){
+            System.out.println(property);
+            configuration.getOptionalStringProperty(property).ifPresent(
+                value -> hadoopconf.set(property, value)
+            );
+        }
+
         // Set up the JAR files.
         //sparkContext.clearJars();
         if (!sparkContext.isLocal()) {
diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml
index 32c3625..2fcfbfc 100644
--- a/wayang-platforms/wayang-spark/pom.xml
+++ b/wayang-platforms/wayang-spark/pom.xml
@@ -71,7 +71,17 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
-            <version>2.7.7</version>
+            <version>3.1.2</version>
+        </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-s3</artifactId>
+          <version>1.12.253</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>3.1.2</version>
         </dependency>
     </dependencies>