Refactoring
diff --git a/airavata-kubernetes/modules/task-api/pom.xml b/airavata-kubernetes/modules/helix-task-api/pom.xml
similarity index 76%
rename from airavata-kubernetes/modules/task-api/pom.xml
rename to airavata-kubernetes/modules/helix-task-api/pom.xml
index d2621fc..e2363e6 100644
--- a/airavata-kubernetes/modules/task-api/pom.xml
+++ b/airavata-kubernetes/modules/helix-task-api/pom.xml
@@ -10,32 +10,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>task-api</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>api-resource</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>compute-resource-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-freemarker</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
+ <artifactId>helix-task-api</artifactId>
<build>
<plugins>
@@ -51,4 +26,32 @@
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ <version>3.0.2.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>api-resource</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>compute-resource-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.1</version>
+ </dependency>
+ </dependencies>
+
</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
similarity index 98%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
rename to airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
index 8e581ea..e7290c2 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/AbstractTask.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/AbstractTask.java
@@ -1,4 +1,4 @@
-package org.apache.airavata.helix.tasks;
+package org.apache.airavata.helix.api;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
similarity index 94%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
rename to airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
index cbbc300..77073ff 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixParticipant.java
+++ b/airavata-kubernetes/modules/helix-task-api/src/main/java/org/apache/airavata/helix/api/HelixParticipant.java
@@ -1,8 +1,5 @@
-package org.apache.airavata.helix;
+package org.apache.airavata.helix.api;
-import org.apache.airavata.helix.tasks.command.CommandTask;
-import org.apache.airavata.helix.tasks.DataCollectingTask;
-import org.apache.airavata.helix.tasks.DataPushingTask;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.helix.InstanceType;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
@@ -13,15 +10,12 @@
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.web.client.RestTemplate;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
deleted file mode 100644
index 2b96328..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixCluster.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.airavata.helix;
-
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class HelixCluster {
-
- private static final Logger logger = LogManager.getLogger(HelixCluster.class);
-
- private String zkAddress;
- private String clusterName;
- private int numPartitions;
-
- private ZkClient zkClient;
- private ZKHelixAdmin zkHelixAdmin;
-
- public HelixCluster(String zkAddress, String clusterName, int numPartitions) {
- this.zkAddress = zkAddress;
- this.clusterName = clusterName;
- this.numPartitions = numPartitions;
-
- zkClient = new ZkClient(this.zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
- zkHelixAdmin = new ZKHelixAdmin(zkClient);
- }
-
- public void setup() {
- zkHelixAdmin.addCluster(clusterName, true);
- zkHelixAdmin.addStateModelDef(clusterName, OnlineOfflineSMD.name, OnlineOfflineSMD.build());
- logger.info("Cluster: " + clusterName + ", has been added.");
- }
-
- public void disconnect() {
- if (zkClient != null) {
- zkClient.close();
- }
- }
-}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
deleted file mode 100644
index e09f307..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixManager.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.apache.airavata.helix;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class HelixManager {
- public static void main(String args[]) {
- HelixCluster helixCluster = new HelixCluster("localhost:2199", "AiravataDemoCluster", 1);
- helixCluster.setup();
- }
-}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
deleted file mode 100644
index 6866af3..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/WorkflowManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.airavata.helix;
-
-import org.apache.airavata.helix.tasks.command.CommandTask;
-import org.apache.airavata.helix.tasks.DataCollectingTask;
-import org.apache.airavata.helix.tasks.DataPushingTask;
-import org.apache.helix.*;
-import org.apache.helix.task.*;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class WorkflowManager {
-
- private static final Logger logger = LogManager.getLogger(WorkflowManager.class);
-
-
- public static void main(String args[]) {
- Workflow workflow = createWorkflow().build();
-
- org.apache.helix.HelixManager helixManager = HelixManagerFactory.getZKHelixManager("AiravataDemoCluster", "Admin",
- InstanceType.SPECTATOR, "localhost:2199");
-
- try {
- helixManager.connect();
- TaskDriver taskDriver = new TaskDriver(helixManager);
-
- Runtime.getRuntime().addShutdownHook(
- new Thread() {
- @Override
- public void run() {
- helixManager.disconnect();
- }
- }
- );
-
- taskDriver.start(workflow);
- logger.info("Started workflow");
- TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED);
- System.out.println("Task state " + taskState.name());
-
- } catch (Exception ex) {
- logger.error("Error in connect() for Admin, reason: " + ex, ex);
- }
- }
-
- private static Workflow.Builder createWorkflow() {
- List<TaskConfig> downloadDataTasks = new ArrayList<>();
- downloadDataTasks.add(new TaskConfig.Builder().setTaskId("Download_Task").setCommand(DataCollectingTask.NAME).build());
-
- List<TaskConfig> commandExecuteTasks = new ArrayList<>();
- commandExecuteTasks.add(new TaskConfig.Builder().setTaskId("Command_Task").setCommand(CommandTask.NAME).build());
-
- List<TaskConfig> pushDataTasks = new ArrayList<>();
- pushDataTasks.add(new TaskConfig.Builder().setTaskId("Push_Task").setCommand(DataPushingTask.NAME).build());
-
- JobConfig.Builder downloadDataJob = new JobConfig.Builder()
- .addTaskConfigs(downloadDataTasks)
- .setMaxAttemptsPerTask(3).setInstanceGroupTag("p1");
-
- JobConfig.Builder commandExecuteJob = new JobConfig.Builder()
- .addTaskConfigs(commandExecuteTasks)
- .setMaxAttemptsPerTask(3).setInstanceGroupTag("p2");
-
- JobConfig.Builder dataPushJob = new JobConfig.Builder()
- .addTaskConfigs(pushDataTasks)
- .setMaxAttemptsPerTask(3).setInstanceGroupTag("p3");
-
- Workflow.Builder workflow = new Workflow.Builder("Airavata_Workflow3").setExpiry(0);
- workflow.addJob("downloadDataJob", downloadDataJob);
- workflow.addJob("commandExecuteJob", commandExecuteJob);
- workflow.addJob("dataPushJob", dataPushJob);
-
- workflow.addParentChildDependency("downloadDataJob", "commandExecuteJob");
- workflow.addParentChildDependency("downloadDataJob", "dataPushJob");
-
- return workflow;
- }
-}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
deleted file mode 100644
index 594ce35..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataCollectingTask.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.airavata.helix.tasks;
-
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.UserContentStore;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class DataCollectingTask extends UserContentStore implements Task {
-
- public static final String NAME = "DATA_COLLECTING";
-
- public DataCollectingTask(TaskCallbackContext callbackContext) {
- }
-
- public TaskResult run() {
- System.out.println("Executing data collecting");
- putUserContent("Key", "Hooo", Scope.WORKFLOW);
-
- return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
-
- }
-
- public void cancel() {
-
- }
-}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java b/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
deleted file mode 100644
index fc7ee8b..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/DataPushingTask.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.airavata.helix.tasks;
-
-import org.apache.helix.task.Task;
-import org.apache.helix.task.TaskCallbackContext;
-import org.apache.helix.task.TaskResult;
-import org.apache.helix.task.UserContentStore;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class DataPushingTask extends UserContentStore implements Task {
-
- public static final String NAME = "DATA_PUSHING";
-
- public DataPushingTask(TaskCallbackContext callbackContext) {
- }
-
- public TaskResult run() {
- System.out.println("Executing data pushing");
- try {
- Thread.currentThread().sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("Continuing");
- String key2 = getUserContent("Key2", Scope.WORKFLOW);
-
- System.out.println(key2);
- return new TaskResult(TaskResult.Status.COMPLETED, "HelixTaskB completed!");
-
- }
-
- public void cancel() {
-
- }
-}
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties b/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
deleted file mode 100644
index 5e31e3c..0000000
--- a/airavata-kubernetes/modules/helix-tasks/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,9 +0,0 @@
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=INFO, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/pom.xml b/airavata-kubernetes/modules/microservices/helix-controller/pom.xml
similarity index 82%
rename from airavata-kubernetes/modules/helix-tasks/pom.xml
rename to airavata-kubernetes/modules/microservices/helix-controller/pom.xml
index 568c922..9fe9f06 100644
--- a/airavata-kubernetes/modules/helix-tasks/pom.xml
+++ b/airavata-kubernetes/modules/microservices/helix-controller/pom.xml
@@ -6,11 +6,11 @@
<artifactId>airavata-kubernetes</artifactId>
<groupId>org.apache.airavata</groupId>
<version>1.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
+ <relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>helix-tasks</artifactId>
+ <artifactId>helix-controller</artifactId>
<dependencies>
<dependency>
@@ -72,18 +72,4 @@
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java b/airavata-kubernetes/modules/microservices/helix-controller/src/main/java/org/apache/airavata/helix/HelixController.java
similarity index 98%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
rename to airavata-kubernetes/modules/microservices/helix-controller/src/main/java/org/apache/airavata/helix/HelixController.java
index 8237d43..50fd82b 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/HelixController.java
+++ b/airavata-kubernetes/modules/microservices/helix-controller/src/main/java/org/apache/airavata/helix/HelixController.java
@@ -1,6 +1,5 @@
package org.apache.airavata.helix;
-import org.apache.helix.*;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -81,5 +80,4 @@
HelixController helixController = new HelixController("localhost:2199", "AiravataDemoCluster", "AiravataController");
helixController.start();
}
-
}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/pom.xml
deleted file mode 100644
index eff73f5..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>airavata-kubernetes</artifactId>
- <groupId>org.apache.airavata</groupId>
- <version>1.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>command-executing-task</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>compute-resource-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>api-resource</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>task-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-freemarker</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
-
- <profile>
- <id>jar</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <artifact-packaging>jar</artifact-packaging>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <executions>
- <execution>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- Create a docker image that runs the executable jar-->
- <plugin>
- <groupId>com.spotify</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>1.0.0</version>
- <configuration>
- <imageName>${docker.image.prefix}/job-submission-task</imageName>
- <baseImage>java:openjdk-8-jdk-alpine</baseImage>
- <entryPoint>["java","-jar","/${project.build.finalName}.jar"]</entryPoint>
- <resources>
- <resource>
- <targetPath>/</targetPath>
- <directory>${project.build.directory}</directory>
- <include>${project.build.finalName}.jar</include>
- </resource>
- </resources>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
- <id>war</id>
- <properties>
- <artifact-packaging>war</artifact-packaging>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <configuration>
- <!-- this will get rid of version info from war file name -->
- <finalName>job-submission</finalName>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/Application.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/Application.java
deleted file mode 100644
index d11a7c5..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/Application.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.job;
-
-import org.apache.airavata.k8s.task.api.messaging.ReceiverConfig;
-import org.apache.airavata.k8s.task.api.messaging.SenderConfig;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@SpringBootApplication
-@Configuration
-@ComponentScan(basePackages = "org.apache.airavata.*")
-@EnableAutoConfiguration
-@Import({ReceiverConfig.class, SenderConfig.class})
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-
- @Bean
- public RestTemplate restTemplate(RestTemplateBuilder builder) {
- return builder.build();
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/CommandTaskInfo.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/CommandTaskInfo.java
deleted file mode 100644
index cbffce3..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/CommandTaskInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.airavata.k8s.task.job;
-
-import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-
-import java.util.Arrays;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class CommandTaskInfo {
-
- public static final String COMMAND = "command";
- public static final String ARGUMENTS = "arguments";
- public static final String STD_OUT_PATH = "std_out_path";
- public static final String STD_ERR_PATH = "std_err_path";
- public static final String COMPUTE_RESOURCE = "compute_resource";
-
- public static TaskTypeResource getTaskType() {
- TaskTypeResource taskTypeResource = new TaskTypeResource();
- taskTypeResource.setName("Command Execute");
- taskTypeResource.setTopicName("airavata-command");
- taskTypeResource.setIcon("assets/icons/ssh.png");
- taskTypeResource.getInputTypes().addAll(
- Arrays.asList(
- new TaskInputTypeResource()
- .setName(COMMAND)
- .setType("String")
- .setDefaultValue(""),
- new TaskInputTypeResource()
- .setName(ARGUMENTS)
- .setType("String"),
- new TaskInputTypeResource()
- .setName(COMPUTE_RESOURCE)
- .setType("Long"),
- new TaskInputTypeResource()
- .setName(STD_OUT_PATH)
- .setType("String"),
- new TaskInputTypeResource()
- .setName(STD_ERR_PATH)
- .setType("String")));
-
- taskTypeResource.getOutPorts().addAll(
- Arrays.asList(
- new TaskOutPortTypeResource()
- .setName("Out")
- .setOrder(0),
- new TaskOutPortTypeResource()
- .setName("Error")
- .setOrder(1))
- );
-
- return taskTypeResource;
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/config/RestConfig.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/config/RestConfig.java
deleted file mode 100644
index acbf74f..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/config/RestConfig.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.airavata.k8s.task.job.config;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class RestConfig {
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
deleted file mode 100644
index 2fdb337..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/java/org/apache/airavata/k8s/task/job/service/TaskExecutionService.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.job.service;
-
-import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.compute.api.ExecutionResult;
-import org.apache.airavata.k8s.task.api.AbstractTaskExecutionService;
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.messaging.KafkaSender;
-import org.apache.airavata.k8s.task.job.CommandTaskInfo;
-import org.springframework.stereotype.Service;
-import org.springframework.web.client.RestTemplate;
-
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Service
-public class TaskExecutionService extends AbstractTaskExecutionService {
-
- public TaskExecutionService(RestTemplate restTemplate, KafkaSender kafkaSender) {
- super(restTemplate, null, 10);
- }
-
- @Override
- public TaskTypeResource getType() {
- return CommandTaskInfo.getTaskType();
- }
-
- @Override
- public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
-
- taskContext.getLocalContext().put(CommandTaskInfo.COMMAND, findInput(taskContext, taskResource, CommandTaskInfo.COMMAND, false));
- taskContext.getLocalContext().put(CommandTaskInfo.ARGUMENTS, findInput(taskContext, taskResource, CommandTaskInfo.ARGUMENTS, true));
- taskContext.getLocalContext().put(CommandTaskInfo.STD_OUT_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_OUT_PATH, false));
- taskContext.getLocalContext().put(CommandTaskInfo.STD_ERR_PATH, findInput(taskContext, taskResource, CommandTaskInfo.STD_ERR_PATH, false));
-
- String computeId = findInput(taskContext, taskResource, CommandTaskInfo.COMPUTE_RESOURCE, false);
- taskContext.getLocalContext().put(CommandTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
- + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
-
- }
-
- @Override
- public void executeTask(TaskResource taskResource, TaskContext taskContext) {
-
- try {
- String command = (String) taskContext.getLocalContext().get(CommandTaskInfo.COMMAND);
- String arguments = (String) taskContext.getLocalContext().get(CommandTaskInfo.ARGUMENTS);
- ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(CommandTaskInfo.COMPUTE_RESOURCE);
- String stdOutPath = (String) taskContext.getLocalContext().get(CommandTaskInfo.STD_OUT_PATH);
- String stdErrPath = (String) taskContext.getLocalContext().get(CommandTaskInfo.STD_ERR_PATH);
- String stdOutSuffix = " > " + stdOutPath + " 2> " + stdErrPath;
-
- publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
-
- String finalCommand = command + (arguments != null ? arguments : "") + stdOutSuffix;
-
- System.out.println("Executing command " + finalCommand);
-
- ExecutionResult executionResult = fetchComputeResourceOperation(computeResource).executeCommand(finalCommand);
-
- if (executionResult.getExitStatus() == 0) {
- finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
- } else if (executionResult.getExitStatus() == -1) {
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process didn't exit successfully");
- } else {
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, "Process exited with error status " + executionResult.getExitStatus());
- }
-
- } catch (Exception e) {
-
- e.printStackTrace();
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
- }
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
deleted file mode 100644
index 188693e..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-server.port = 8491
-api.server.url = api-server.default.svc.cluster.local:8080
-task.group.name = command-execution
-task.event.topic.name = airavata-task-event
-task.read.topic.name = airavata-command
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.yml
deleted file mode 100644
index 069dd61..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/command-executing-task/src/main/resources/application.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-kafka:
- bootstrap-servers: kafka.default.svc.cluster.local:9092
- topic:
- helloworld: helloworld.t
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/command-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/command-task/pom.xml
new file mode 100644
index 0000000..a08c014
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>command-task</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
similarity index 97%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
rename to airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
index 5aae5b7..16c600e 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/CommandTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/CommandTask.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.tasks.command;
+package org.apache.airavata.helix.task.command;
-import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.helix.api.AbstractTask;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
similarity index 87%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
rename to airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
index f852d8f..742ed88 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/command/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/command-task/src/main/java/org/apache/airavata/helix/task/command/Participant.java
@@ -1,8 +1,6 @@
-package org.apache.airavata.helix.tasks.command;
+package org.apache.airavata.helix.task.command;
-import org.apache.airavata.helix.HelixParticipant;
-import org.apache.airavata.helix.tasks.DataCollectingTask;
-import org.apache.airavata.helix.tasks.DataPushingTask;
+import org.apache.airavata.helix.api.HelixParticipant;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/pom.xml
deleted file mode 100644
index fe50fe8..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>airavata-kubernetes</artifactId>
- <groupId>org.apache.airavata</groupId>
- <version>1.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>data-collecting-task</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>compute-resource-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>api-resource</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>task-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-freemarker</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
-
- <profile>
- <id>jar</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <artifact-packaging>jar</artifact-packaging>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <executions>
- <execution>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- Create a docker image that runs the executable jar-->
- <plugin>
- <groupId>com.spotify</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>1.0.0</version>
- <configuration>
- <imageName>${docker.image.prefix}/egress-staging-task</imageName>
- <baseImage>java:openjdk-8-jdk-alpine</baseImage>
- <entryPoint>["java","-jar","/${project.build.finalName}.jar"]</entryPoint>
- <resources>
- <resource>
- <targetPath>/</targetPath>
- <directory>${project.build.directory}</directory>
- <include>${project.build.finalName}.jar</include>
- </resource>
- </resources>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
- <id>war</id>
- <properties>
- <artifact-packaging>war</artifact-packaging>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <configuration>
- <!-- this will get rid of version info from war file name -->
- <finalName>egress-staging</finalName>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/Application.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/Application.java
deleted file mode 100644
index 1642d18..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/Application.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apacher.airavata.k8s.task.egress;
-
-import org.apache.airavata.k8s.task.api.messaging.ReceiverConfig;
-import org.apache.airavata.k8s.task.api.messaging.SenderConfig;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@SpringBootApplication
-@Configuration
-@ComponentScan
-@EnableAutoConfiguration
-@Import({ReceiverConfig.class, SenderConfig.class})
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-
- @Bean
- public RestTemplate restTemplate(RestTemplateBuilder builder) {
- return builder.build();
- }
-}
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/DataCollectingTaskInfo.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/DataCollectingTaskInfo.java
deleted file mode 100644
index 73a354f..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/DataCollectingTaskInfo.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apacher.airavata.k8s.task.egress;
-
-import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskOutPortTypeResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-
-import java.util.Arrays;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class DataCollectingTaskInfo {
- public static final String REMOTE_SOURCE_PATH = "remote_source_path";
- public static final String COMPUTE_RESOURCE = "compute_resource";
- public static final String IDENTIFIER = "identifier";
-
- public static TaskTypeResource getTaskType() {
- TaskTypeResource taskTypeResource = new TaskTypeResource();
- taskTypeResource.setName("Data Collect");
- taskTypeResource.setTopicName("airavata-data-collect");
- taskTypeResource.setIcon("assets/icons/copy.png");
- taskTypeResource.getInputTypes().addAll(
- Arrays.asList(
- new TaskInputTypeResource()
- .setName(REMOTE_SOURCE_PATH)
- .setType("String")
- .setDefaultValue(""),
- new TaskInputTypeResource()
- .setName(IDENTIFIER)
- .setType("String"),
- new TaskInputTypeResource()
- .setName(COMPUTE_RESOURCE)
- .setType("Long")));
-
- taskTypeResource.getOutPorts().addAll(
- Arrays.asList(
- new TaskOutPortTypeResource()
- .setName("Out")
- .setOrder(0),
- new TaskOutPortTypeResource()
- .setName("Error")
- .setOrder(1))
- );
-
- return taskTypeResource;
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
deleted file mode 100644
index c3ed302..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/java/org/apacher/airavata/k8s/task/egress/service/TaskExecutionService.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apacher.airavata.k8s.task.egress.service;
-
-import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.task.api.AbstractTaskExecutionService;
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.messaging.KafkaSender;
-import org.apacher.airavata.k8s.task.egress.DataCollectingTaskInfo;
-import org.springframework.core.io.FileSystemResource;
-import org.springframework.http.*;
-import org.springframework.stereotype.Service;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.web.client.RestTemplate;
-
-import java.util.UUID;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Service
-public class TaskExecutionService extends AbstractTaskExecutionService {
-
- public TaskExecutionService(RestTemplate restTemplate, KafkaSender kafkaSender) {
- super(restTemplate, kafkaSender, 10);
- }
-
- @Override
- public TaskTypeResource getType() {
- return DataCollectingTaskInfo.getTaskType();
- }
-
- @Override
- public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
-
- taskContext.getLocalContext().put(DataCollectingTaskInfo.REMOTE_SOURCE_PATH, findInput(taskContext, taskResource, DataCollectingTaskInfo.REMOTE_SOURCE_PATH, false));
- taskContext.getLocalContext().put(DataCollectingTaskInfo.IDENTIFIER, findInput(taskContext, taskResource, DataCollectingTaskInfo.IDENTIFIER, false));
-
- String computeId = findInput(taskContext, taskResource, DataCollectingTaskInfo.COMPUTE_RESOURCE, false);
- taskContext.getLocalContext().put(DataCollectingTaskInfo.COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
- + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
-
- }
-
- public void executeTask(TaskResource taskResource, TaskContext taskContext) {
-
- try {
-
- ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(DataCollectingTaskInfo.COMPUTE_RESOURCE);
- String identifier = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.IDENTIFIER);
- String remoteSourcePath = (String) taskContext.getLocalContext().get(DataCollectingTaskInfo.REMOTE_SOURCE_PATH);
-
- publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
-
- String temporaryFile = "/tmp/" + UUID.randomUUID().toString();
- System.out.println("Downloading " + remoteSourcePath + " to " + temporaryFile + " from compute resource "
- + computeResource.getName());
-
- fetchComputeResourceOperation(computeResource).transferDataOut(remoteSourcePath, temporaryFile, "SCP");
-
- LinkedMultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
- map.add("file", new FileSystemResource(temporaryFile));
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.MULTIPART_FORM_DATA);
-
- HttpEntity<LinkedMultiValueMap<String, Object>> requestEntity = new HttpEntity<>(map, headers);
-
- System.out.println("Uploading data file with task id " + taskResource.getId() + " and identifier "
- + identifier + " to data store");
-
- getRestTemplate().exchange("http://" + getApiServerUrl() + "/data/" + taskResource.getId()+ "/"
- + identifier + "/upload", HttpMethod.POST, requestEntity, Long.class);
-
- finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
-
- } catch (Exception e) {
- e.printStackTrace();
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED, e.getMessage());
-
- }
- }
-
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.properties
deleted file mode 100644
index 3b443c2..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-server.port = 8691
-api.server.url = api-server.default.svc.cluster.local:8080
-task.group.name = egress-staging
-task.event.topic.name = airavata-task-event
-task.read.topic.name = airavata-task-egress-staging
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.yml
deleted file mode 100644
index 069dd61..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-collecting-task/src/main/resources/application.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-kafka:
- bootstrap-servers: kafka.default.svc.cluster.local:9092
- topic:
- helloworld: helloworld.t
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-in-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/data-in-task/pom.xml
new file mode 100644
index 0000000..866ef14
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>data-in-task</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
similarity index 97%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
rename to airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
index 9d4a27e..00aeadc 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/DataInputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/DataInputTask.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.tasks.datain;
+package org.apache.airavata.helix.task.datain;
-import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.helix.api.AbstractTask;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
similarity index 90%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
rename to airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
index f06f56b..cfbe86a 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/datain/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-in-task/src/main/java/org/apache/airavata/helix/task/datain/Participant.java
@@ -1,7 +1,6 @@
-package org.apache.airavata.helix.tasks.datain;
+package org.apache.airavata.helix.task.datain;
-import org.apache.airavata.helix.HelixParticipant;
-import org.apache.airavata.helix.tasks.command.CommandTask;
+import org.apache.airavata.helix.api.HelixParticipant;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-out-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/data-out-task/pom.xml
new file mode 100644
index 0000000..e88a3a0
--- /dev/null
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>airavata-kubernetes</artifactId>
+ <groupId>org.apache.airavata</groupId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>data-out-task</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>helix-task-api</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.helix</groupId>
+ <artifactId>helix-core</artifactId>
+ <version>0.6.7</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/DataOutputTask.java
similarity index 97%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
rename to airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/DataOutputTask.java
index fb696b2..8426c90 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/DataOutputTask.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/DataOutputTask.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.tasks.dataout;
+package org.apache.airavata.helix.task.datain.dataout;
-import org.apache.airavata.helix.tasks.AbstractTask;
+import org.apache.airavata.helix.api.AbstractTask;
import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
import org.apache.airavata.k8s.api.resources.task.type.TaskInputTypeResource;
diff --git a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/Participant.java
similarity index 92%
rename from airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
rename to airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/Participant.java
index b226511..8177eb4 100644
--- a/airavata-kubernetes/modules/helix-tasks/src/main/java/org/apache/airavata/helix/tasks/dataout/Participant.java
+++ b/airavata-kubernetes/modules/microservices/tasks/data-out-task/src/main/java/org/apache/airavata/helix/task/datain/dataout/Participant.java
@@ -1,6 +1,6 @@
-package org.apache.airavata.helix.tasks.dataout;
+package org.apache.airavata.helix.task.datain.dataout;
-import org.apache.airavata.helix.HelixParticipant;
+import org.apache.airavata.helix.api.HelixParticipant;
import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/pom.xml b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/pom.xml
deleted file mode 100644
index 533914f..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/pom.xml
+++ /dev/null
@@ -1,161 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>airavata-kubernetes</artifactId>
- <groupId>org.apache.airavata</groupId>
- <version>1.0-SNAPSHOT</version>
- <relativePath>../../../../pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>data-pushing-task</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>compute-resource-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>api-resource</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>task-api</artifactId>
- <version>1.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-freemarker</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>${java.version}</source>
- <target>${java.version}</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
-
- <profile>
- <id>jar</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <artifact-packaging>jar</artifact-packaging>
- </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <executions>
- <execution>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- Create a docker image that runs the executable jar-->
- <plugin>
- <groupId>com.spotify</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>1.0.0</version>
- <configuration>
- <imageName>${docker.image.prefix}/ingress-staging-task</imageName>
- <baseImage>java:openjdk-8-jdk-alpine</baseImage>
- <entryPoint>["java","-jar","/${project.build.finalName}.jar"]</entryPoint>
- <resources>
- <resource>
- <targetPath>/</targetPath>
- <directory>${project.build.directory}</directory>
- <include>${project.build.finalName}.jar</include>
- </resource>
- </resources>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
- <id>war</id>
- <properties>
- <artifact-packaging>war</artifact-packaging>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-tomcat</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>1.4.3.RELEASE</version>
- <configuration>
- <!-- this will get rid of version info from war file name -->
- <finalName>ingress-staging</finalName>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-</project>
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/Application.java b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/Application.java
deleted file mode 100644
index b51e719..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/Application.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.ingress;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.web.client.RestTemplateBuilder;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@SpringBootApplication
-@Configuration
-@ComponentScan
-public class Application {
-
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
-
- @Bean
- public RestTemplate restTemplate(RestTemplateBuilder builder) {
- return builder.build();
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
deleted file mode 100644
index 88de3a5..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/java/org/apache/airavata/k8s/task/ingress/service/TaskExecutionService.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.ingress.service;
-
-import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.task.api.AbstractTaskExecutionService;
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.messaging.KafkaSender;
-import org.springframework.stereotype.Service;
-import org.springframework.web.client.RestTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Service
-public class TaskExecutionService extends AbstractTaskExecutionService {
-
- private final String COMPUTE_RESOURCE = "compute_resource";
- private final String REMOTE_TARGET_PATH = "remote_target_path";
- private final String DATA_LOCATION_ID = "data_location_id";
-
- public TaskExecutionService(RestTemplate restTemplate, KafkaSender kafkaSender) {
- super(restTemplate, kafkaSender, 10);
- }
-
- @Override
- public TaskTypeResource getType() {
- return null;
- }
-
- @Override
- public void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception {
-
- taskContext.getLocalContext().put(DATA_LOCATION_ID, findInput(taskContext, taskResource, DATA_LOCATION_ID, false));
- taskContext.getLocalContext().put(REMOTE_TARGET_PATH, findInput(taskContext, taskResource, REMOTE_TARGET_PATH, false));
-
- String computeId = findInput(taskContext, taskResource, COMPUTE_RESOURCE, false);
- taskContext.getLocalContext().put(COMPUTE_RESOURCE, this.getRestTemplate().getForObject("http://" + this.getApiServerUrl()
- + "/compute/" + Long.parseLong(computeId), ComputeResource.class));
- }
-
- @Override
- public void executeTask(TaskResource taskResource, TaskContext taskContext) {
-
- String remoteTargetPath = (String) taskContext.getLocalContext().get(REMOTE_TARGET_PATH);
- String dataLocationId = (String) taskContext.getLocalContext().get(DATA_LOCATION_ID);
- ComputeResource computeResource = (ComputeResource) taskContext.getLocalContext().get(COMPUTE_RESOURCE);
-
- try {
- publishTaskStatus(taskContext, TaskStatusResource.State.EXECUTING);
- fetchComputeResourceOperation(computeResource).transferDataIn(dataLocationId, remoteTargetPath, "SCP");
- finishTaskExecution(taskContext, taskResource, "Out", TaskStatusResource.State.COMPLETED, "Task completed");
-
-
- } catch (Exception e) {
-
- e.printStackTrace();
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED);
- }
- }
-}
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.properties
deleted file mode 100644
index 6ec9766..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-server.port = 8291
-api.server.url = api-server.default.svc.cluster.local:8080
-task.group.name = ingress-staging
-task.event.topic.name = airavata-task-event
-task.read.topic.name = airavata-task-ingress-staging
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.yml
deleted file mode 100644
index 069dd61..0000000
--- a/airavata-kubernetes/modules/microservices/tasks/data-pushing-task/src/main/resources/application.yml
+++ /dev/null
@@ -1,4 +0,0 @@
-kafka:
- bootstrap-servers: kafka.default.svc.cluster.local:9092
- topic:
- helloworld: helloworld.t
\ No newline at end of file
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml b/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
similarity index 98%
rename from airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
index 8f502c8..245716a 100644
--- a/airavata-kubernetes/modules/microservices/task-scheduler/pom.xml
+++ b/airavata-kubernetes/modules/microservices/workflow-scheduler/pom.xml
@@ -31,7 +31,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>task-scheduler</artifactId>
+ <artifactId>workflow-scheduler</artifactId>
<dependencies>
<dependency>
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/Application.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/Application.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/Application.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/Application.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/HelixWorkflowManager.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/core/ProcessLifeCycleManager.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaReceiver.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/KafkaSender.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/ReceiverConfig.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/messaging/SenderConfig.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/HelixWorkflowService.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/java/org/apache/airavata/k8s/gfac/service/WorkerService.java
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/application.properties
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.properties
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/application.properties
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.yml b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/application.yml
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/application.yml
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/application.yml
diff --git a/airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties b/airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/log4j.properties
similarity index 100%
rename from airavata-kubernetes/modules/microservices/task-scheduler/src/main/resources/log4j.properties
rename to airavata-kubernetes/modules/microservices/workflow-scheduler/src/main/resources/log4j.properties
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
deleted file mode 100644
index 69837a6..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/AbstractTaskExecutionService.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.apache.airavata.k8s.task.api;
-
-import org.apache.airavata.k8s.api.resources.compute.ComputeResource;
-import org.apache.airavata.k8s.api.resources.task.TaskInputResource;
-import org.apache.airavata.k8s.api.resources.task.TaskOutPortResource;
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-import org.apache.airavata.k8s.api.resources.task.type.TaskTypeResource;
-import org.apache.airavata.k8s.compute.api.ComputeOperations;
-import org.apache.airavata.k8s.compute.impl.MockComputeOperation;
-import org.apache.airavata.k8s.compute.impl.SSHComputeOperations;
-import org.apache.airavata.k8s.task.api.messaging.KafkaSender;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.web.client.RestTemplate;
-
-import javax.annotation.PostConstruct;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public abstract class AbstractTaskExecutionService {
-
- private final ExecutorService executorService;
-
- private final RestTemplate restTemplate;
- private final KafkaSender kafkaSender;
-
- @Value("${api.server.url}")
- private String apiServerUrl;
-
- @Value("${task.event.topic.name}")
- private String taskEventPublishTopic;
-
- public AbstractTaskExecutionService(RestTemplate restTemplate, KafkaSender kafkaSender, int concurrentTasks) {
- this.restTemplate = restTemplate;
- this.kafkaSender = kafkaSender;
- executorService = Executors.newFixedThreadPool(concurrentTasks);
- }
-
- @PostConstruct
- public void init() {
- getRestTemplate().postForObject("http://" + apiServerUrl + "/taskType", getType(), Long.class);
- }
-
- public abstract TaskTypeResource getType();
-
- public void executeTaskAsync(TaskContext taskContext) {
-
- System.out.println("Executing task " + taskContext.getTaskId());
- TaskResource taskResource = this.restTemplate.getForObject("http://" + apiServerUrl + "/task/" + taskContext.getTaskId(), TaskResource.class);
-
- publishTaskStatus(taskContext, TaskStatusResource.State.SCHEDULED);
-
- this.executorService.execute(() -> {
- try {
- initializeParameters(taskResource, taskContext);
- executeTask(taskResource, taskContext);
- } catch (Exception e) {
- e.printStackTrace();
- // Ignore silently as this is already handled
- // TODO add a new exception type
- }
- });
- }
-
- public ComputeOperations fetchComputeResourceOperation(ComputeResource computeResource) throws Exception {
- ComputeOperations operations;
- if ("SSH".equals(computeResource.getCommunicationType())) {
- operations = new SSHComputeOperations(computeResource.getHost(), computeResource.getUserName(), computeResource.getPassword());
- } else if ("Mock".equals(computeResource.getCommunicationType())) {
- operations = new MockComputeOperation(computeResource.getHost());
- } else {
- throw new Exception("No compatible communication method {" + computeResource.getCommunicationType() + "} not found for compute resource " + computeResource.getName());
- }
- return operations;
- }
-
- public String findInput(TaskContext taskContext, TaskResource taskResource, String name, boolean optional) throws Exception {
-
- Optional<TaskInputResource> inputResource = taskResource.getInputs()
- .stream()
- .filter(input -> name.equals(input.getValue()))
- .findFirst();
-
- if (inputResource.isPresent()) {
- return inputResource.get().getValue();
-
- } else {
- if (!optional) {
- publishTaskStatus(taskContext, TaskStatusResource.State.FAILED,
- name + " is not available in inputs");
- throw new Exception(name + " is not available in inputs");
- } else {
- return null;
- }
- }
- }
-
- public abstract void initializeParameters(TaskResource taskResource, TaskContext taskContext) throws Exception;
- public abstract void executeTask(TaskResource taskResource, TaskContext taskContext);
-
- public void publishTaskStatus(TaskContext taskContext, int status) {
- publishTaskStatus(taskContext, status, "");
- }
-
- public void publishTaskStatus(TaskContext taskContext, int status, String reason) {
- taskContext.setStatus(status);
- taskContext.setReason(reason);
-
- this.kafkaSender.send(this.taskEventPublishTopic, taskContext);
- }
-
- public void finishTaskExecution(TaskContext taskContext, TaskResource task, String outPortName, int status, String reason) throws Exception {
- Optional<TaskOutPortResource> selectedOutPort = task.getOutPorts().stream().filter(outPort -> outPort.getName().equals(outPortName)).findFirst();
- if (!selectedOutPort.isPresent()) {
- throw new Exception("Selected out port " + outPortName + " does not exist in the task " + task.getName());
- }
-
- taskContext.setStatus(status);
- taskContext.setReason(reason);
- taskContext.setOutPortId(selectedOutPort.get().getId());
- }
-
- public RestTemplate getRestTemplate() {
- return restTemplate;
- }
-
- public String getApiServerUrl() {
- return apiServerUrl;
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
deleted file mode 100644
index e94beab..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContext.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package org.apache.airavata.k8s.task.api;
-
-import org.apache.airavata.k8s.api.resources.task.TaskResource;
-import org.apache.airavata.k8s.api.resources.task.TaskStatusResource;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class TaskContext implements Serializable {
-
- private long processId;
- private long taskId;
- private int status;
- private String reason;
-
- public long getOutPortId() {
- return outPortId;
- }
-
- public TaskContext setOutPortId(long outPortId) {
- this.outPortId = outPortId;
- return this;
- }
-
- private long outPortId;
- private Map<String, String> contextVariableParams = new HashMap<>();
- private Map<String, String> contextDataParams = new HashMap<>();
- private transient Map<String, Object> localContext = new HashMap<>();
-
- private void resetStatus() {
- setStatus(-1);
- setReason("");
- setOutPortId(-1);
- setProcessId(-1);
- setTaskId(-1);
- }
-
- public void assignTask(TaskResource taskResource) {
- resetStatus();
- setTaskId(taskResource.getId());
- setProcessId(taskResource.getParentProcessId());
- setStatus(TaskStatusResource.State.SCHEDULED);
- }
-
- public void resetPublicContext() {
- this.contextVariableParams = new HashMap<>();
- this.contextDataParams = new HashMap<>();
- }
-
- public void resetLocalContext() {
- this.localContext = new HashMap<>();
- }
-
- public long getTaskId() {
- return taskId;
- }
-
- public TaskContext setTaskId(long taskId) {
- this.taskId = taskId;
- return this;
- }
-
- public Map<String, String> getContextVariableParams() {
- return contextVariableParams;
- }
-
- public TaskContext setContextVariableParams(Map<String, String> contextVariableParams) {
- this.contextVariableParams = contextVariableParams;
- return this;
- }
-
- public Map<String, String> getContextDataParams() {
- return contextDataParams;
- }
-
- public TaskContext setContextDataParams(Map<String, String> contextDataParams) {
- this.contextDataParams = contextDataParams;
- return this;
- }
-
- public Map<String, Object> getLocalContext() {
- return localContext;
- }
-
- public TaskContext setLocalContext(Map<String, Object> localContext) {
- this.localContext = localContext;
- return this;
- }
-
- public long getProcessId() {
- return processId;
- }
-
- public TaskContext setProcessId(long processId) {
- this.processId = processId;
- return this;
- }
-
- public int getStatus() {
- return status;
- }
-
- public TaskContext setStatus(int status) {
- this.status = status;
- return this;
- }
-
- public String getReason() {
- return reason;
- }
-
- public TaskContext setReason(String reason) {
- this.reason = reason;
- return this;
- }
-
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
deleted file mode 100644
index b826d5b..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextDeserializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.airavata.k8s.task.api;
-
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.io.*;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class TaskContextDeserializer implements Deserializer<TaskContext> {
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
-
- }
-
- @Override
- public TaskContext deserialize(String topic, byte[] data) {
- ByteArrayInputStream bis = new ByteArrayInputStream(data);
- ObjectInput in = null;
- try {
- in = new ObjectInputStream(bis);
- return(TaskContext)in.readObject();
- } catch (IOException e) {
- // ignore exception
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } finally {
- try {
- if (in != null) {
- in.close();
- }
- } catch (IOException ex) {
- // ignore close exception
- }
- }
- return null;
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
deleted file mode 100644
index 0edac4b..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/TaskContextSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.airavata.k8s.task.api;
-
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class TaskContextSerializer implements Serializer<TaskContext> {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
-
- }
-
- @Override
- public byte[] serialize(String topic, TaskContext data) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutput out = null;
- try {
- out = new ObjectOutputStream(bos);
- out.writeObject(data);
- out.flush();
- return bos.toByteArray();
- } catch (IOException e) {
- // ignore catch
- } finally {
- try {
- bos.close();
- } catch (IOException ex) {
- // ignore close exception
- }
- }
- return null;
- }
-
- @Override
- public void close() {
-
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaReceiver.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaReceiver.java
deleted file mode 100644
index c3597dc..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaReceiver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.api.messaging;
-
-import org.apache.airavata.k8s.task.api.AbstractTaskExecutionService;
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.Acknowledgment;
-
-import javax.annotation.Resource;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class KafkaReceiver {
-
- @Resource
- private AbstractTaskExecutionService taskExecutionService;
-
- @KafkaListener(topics = "${task.read.topic.name}")
- public void receiveTasks(TaskContext taskContext, Acknowledgment ack) {
- System.out.println("received task=" + taskContext.toString());
- taskExecutionService.executeTaskAsync(taskContext);
- ack.acknowledge();
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
deleted file mode 100644
index b584833..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/KafkaSender.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.api.messaging;
-
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-public class KafkaSender {
-
- @Autowired
- private KafkaTemplate<String, TaskContext> kafkaTemplate;
-
- public void send(String topic, TaskContext taskContext) {
- kafkaTemplate.send(topic, taskContext);
- }
-
- public void send(String topic, String key, TaskContext taskContext) {
- kafkaTemplate.send(topic, key, taskContext);
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
deleted file mode 100644
index 8a09a4e..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/ReceiverConfig.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.api.messaging;
-
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.TaskContextDeserializer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.AbstractMessageListenerContainer;
-import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Configuration
-@ComponentScan
-@EnableKafka
-public class ReceiverConfig {
-
- @Value("${kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${task.group.name}")
- private String taskGroupName;
-
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TaskContextDeserializer.class);
- // allows a pool of processes to divide the work of consuming and processing records
- props.put(ConsumerConfig.GROUP_ID_CONFIG, taskGroupName);
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- return props;
- }
-
- @Bean
- public ConsumerFactory<String, TaskContext> consumerFactory() {
- return new DefaultKafkaConsumerFactory<String, TaskContext>(consumerConfigs());
- }
-
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, TaskContext>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, TaskContext> factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
- return factory;
- }
-
- @Bean
- public KafkaReceiver receiver() {
- return new KafkaReceiver();
- }
-}
diff --git a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java b/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
deleted file mode 100644
index e66e1fd..0000000
--- a/airavata-kubernetes/modules/task-api/src/main/java/org/apache/airavata/k8s/task/api/messaging/SenderConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.airavata.k8s.task.api.messaging;
-
-import org.apache.airavata.k8s.task.api.TaskContext;
-import org.apache.airavata.k8s.task.api.TaskContextSerializer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * TODO: Class level comments please
- *
- * @author dimuthu
- * @since 1.0.0-SNAPSHOT
- */
-@Configuration
-public class SenderConfig {
- @Value("${kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Bean
- public Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<>();
- // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, TaskContextSerializer.class);
- return props;
- }
-
- @Bean
- public ProducerFactory<String, TaskContext> producerFactory() {
- return new DefaultKafkaProducerFactory<String, TaskContext>(producerConfigs());
- }
-
- @Bean
- public KafkaTemplate<String, TaskContext> kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
-
- @Bean
- public KafkaSender kafkaSender() {
- return new KafkaSender();
- }
-}
diff --git a/airavata-kubernetes/pom.xml b/airavata-kubernetes/pom.xml
index 222fffe..30439d3 100644
--- a/airavata-kubernetes/pom.xml
+++ b/airavata-kubernetes/pom.xml
@@ -32,15 +32,15 @@
<modules>
<module>modules/api-resource</module>
<module>modules/compute-resource-api</module>
+ <module>modules/helix-task-api</module>
<module>modules/microservices/api-server</module>
- <module>modules/microservices/workflow-generator</module>
- <module>modules/microservices/task-scheduler</module>
<module>modules/microservices/event-sink</module>
- <module>modules/microservices/tasks/command-executing-task</module>
- <module>modules/microservices/tasks/data-pushing-task</module>
- <module>modules/microservices/tasks/data-collecting-task</module>
- <module>modules/task-api</module>
- <module>modules/helix-tasks</module>
+ <module>modules/microservices/helix-controller</module>
+ <module>modules/microservices/workflow-generator</module>
+ <module>modules/microservices/workflow-scheduler</module>
+ <module>modules/microservices/tasks/command-task</module>
+ <module>modules/microservices/tasks/data-in-task</module>
+ <module>modules/microservices/tasks/data-out-task</module>
</modules>
<dependencyManagement>
diff --git a/airavata-kubernetes/readme.txt b/airavata-kubernetes/readme.txt
index 194d2f2..1b4f6b4 100644
--- a/airavata-kubernetes/readme.txt
+++ b/airavata-kubernetes/readme.txt
@@ -18,4 +18,8 @@
When running as docker containers, pass following environment variables to api-server container
spring_datasource_username=<db user>
-spring_datasource_password=<db password>
\ No newline at end of file
+spring_datasource_password=<db password>
+
+Create Helix cluster
+
+./helix-admin.sh --zkSvr localhost:2199 --addCluster AiravataDemoCluster
\ No newline at end of file