[GEARPUMP-366] Create JarStore dir if not exists on job submit

Author: manuzhang <owenzhang1990@gmail.com>

Closes #239 from manuzhang/fix_job_submit.
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
index 0a11c03..d14677b 100644
--- a/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
@@ -39,7 +39,7 @@
   override def init(config: Config): Unit = {
     rootPath = Util.asSubDirOfGearpumpHome(
       config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
-    FileUtils.forceMkdir(rootPath)
+    createDirIfNotExists(rootPath)
   }
 
   /**
@@ -49,6 +49,7 @@
    * @return OutputStream returns a stream into which the data can be written.
    */
   override def createFile(fileName: String): OutputStream = {
+    createDirIfNotExists(rootPath)
     val localFile = new File(rootPath, fileName)
     new FileOutputStream(localFile)
   }
@@ -70,4 +71,10 @@
     }
     is
   }
+
+  private def createDirIfNotExists(file: File): Unit = {
+    if (!file.exists()) {
+      FileUtils.forceMkdir(file)
+    }
+  }
 }
\ No newline at end of file
diff --git a/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala b/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
index ae4cf46..cd1a13f 100644
--- a/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
+++ b/gearpump-hadoop/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStore.scala
@@ -29,15 +29,12 @@
  * DFSJarStore store the uploaded jar on HDFS
  */
 class DFSJarStore extends JarStore {
-  private var rootPath: Path = null
+  private var rootPath: Path = _
   override val scheme: String = "hdfs"
 
   override def init(config: Config): Unit = {
     rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
-    val fs = rootPath.getFileSystem(new Configuration())
-    if (!fs.exists(rootPath)) {
-      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
-    }
+    createDirIfNotExists(rootPath)
   }
 
   /**
@@ -47,6 +44,7 @@
    * @return OutputStream returns a stream into which the data can be written.
    */
   override def createFile(fileName: String): OutputStream = {
+    createDirIfNotExists(rootPath)
     val filePath = new Path(rootPath, fileName)
     val fs = filePath.getFileSystem(new Configuration())
     fs.create(filePath)
@@ -63,4 +61,11 @@
     val fs = filePath.getFileSystem(new Configuration())
     fs.open(filePath)
   }
+
+  private def createDirIfNotExists(path: Path): Unit = {
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.exists(path)) {
+      fs.mkdirs(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    }
+  }
 }