Remove Channel from interface Connector
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
index 69be278..8627403 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/Connector.java
@@ -21,7 +21,6 @@
import org.apache.airavata.mft.core.bufferedImpl.ConnectorConfig;
-import java.nio.channels.Channel;
import java.util.Properties;
/**
@@ -47,5 +46,5 @@
* @param channel
* @throws Exception
*/
- void closeChannel(Channel channel) throws Exception;
+ void closeChannel(ConnectorChannel channel) throws Exception;
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
index 4d2eb65..dd02e10 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/SinkConnector.java
@@ -19,7 +19,6 @@
package org.apache.airavata.mft.core.api;
-import java.nio.channels.Channel;
import java.util.Properties;
/**
@@ -39,5 +38,5 @@
* @param channel
* @return true if succes else false
*/
- boolean verifyUpload(Channel channel);
+ boolean verifyUpload(ConnectorChannel channel);
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
index 20e5989..8849aac 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/AbstractConnector.java
@@ -20,42 +20,24 @@
package org.apache.airavata.mft.core.bufferedImpl;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.core.api.ConnectorChannel;
import org.apache.airavata.mft.core.api.SinkConnector;
import org.apache.airavata.mft.core.api.SourceConnector;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.channels.Channel;
-import java.util.concurrent.ConcurrentHashMap;
+
/**
* Common class for {@link SinkConnector} and {@link SourceConnector} implementations
* This overrides closeChannel method and release all resources allocated to releasing channel
*/
public abstract class AbstractConnector implements Connector {
- ConcurrentHashMap<Channel, Object> channelMap = new ConcurrentHashMap<>();
@Override
- public void closeChannel(Channel channel) throws ConnectorException {
- OutChannel outChannel;
- InChannel inChannel;
+ public void closeChannel(ConnectorChannel channel) throws ConnectorException {
try {
- if (channelMap.get(channel) instanceof OutputStream) {
- outChannel = (OutChannel) channelMap.get(channel);
- if (outChannel != null) {
- outChannel.closeChannel();
- channelMap.remove(channel);
- }
-
- } else if (channelMap.get(channel) instanceof InputStream) {
- inChannel = (InChannel) channelMap.get(channel);
- if (inChannel != null) {
- inChannel.closeChannel();
- channelMap.remove(channel);
- }
- }
+ channel.closeChannel();
} catch (IOException e) {
throw new ConnectorException("Error occurred while closing stream", e);
@@ -63,24 +45,4 @@
}
- /**
- * Cache the operating channel
- * @param channel
- * @param obj
- */
- public void cacheChannel(Channel channel, Object obj) {
- channelMap.put(channel, obj);
- }
-
-
- /**
- * get the channel
- * @param channel
- * @return
- */
- public Object getConnectorChannel(Channel channel) {
- return channelMap.get(channel);
- }
-
-
}
diff --git a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
index f25348c..af9b663 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/bufferedImpl/PassthroughMediator.java
@@ -51,7 +51,7 @@
Object obj = dst.getChannelAttribute(Constants.CONNECTOR);
if (obj != null && obj instanceof SinkConnector) {
SinkConnector connector = (SinkConnector) obj;
- boolean success = connector.verifyUpload(dChannel);
+ boolean success = connector.verifyUpload(dst);
if (success) {
callback.onComplete("Successfully uploaded", null);
} else {
@@ -69,8 +69,8 @@
Connector sourceConnector = (Connector) src.getChannelAttribute(Constants.CONNECTOR);
Connector sinkConnector = (Connector) dst.getChannelAttribute(Constants.CONNECTOR);
try {
- sourceConnector.closeChannel(rChannel);
- sinkConnector.closeChannel(dChannel);
+ sourceConnector.closeChannel(src);
+ sinkConnector.closeChannel(dst);
} catch (Exception ex) {
String msg = "Error occurred while closing channels";
ConnectorException connectorException = new ConnectorException(msg, ex);
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
index 8f3b959..b2f647c 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/Main.java
@@ -42,17 +42,17 @@
public void execute() {
Properties connectorConfProp = new Properties();
- connectorConfProp.put(S3Constants.ACCESS_KEY, "XXXX");
- connectorConfProp.put(S3Constants.SECRET_KEY, "YYY");
+ connectorConfProp.put(S3Constants.ACCESS_KEY, "AKIA2SKSIFUH7QDAROSR");
+ connectorConfProp.put(S3Constants.SECRET_KEY, "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");
connectorConfProp.put(S3Constants.REGION, "us-east-2");
Properties srcProp = new Properties();
- srcProp.put(S3Constants.BUCKET, "test");
+ srcProp.put(S3Constants.BUCKET, "blimpit-test");
srcProp.put(S3Constants.REMOTE_FILE, "test.pdf");
Properties dstProp = new Properties();
dstProp.put(S3Constants.BUCKET, "blimpit-test");
- dstProp.put(S3Constants.REMOTE_FILE, "meemure.pdf");
+ dstProp.put(S3Constants.REMOTE_FILE, "teemure.pdf");
try {
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
index c2cb503..3ee2bae 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SinkConnector.java
@@ -33,7 +33,6 @@
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.nio.channels.Channel;
import java.util.Properties;
/**
@@ -70,13 +69,12 @@
OutChannel outChannel = new OutChannel(outputStream);
outChannel.addChannelAttribute(S3Constants.HTTP_CONNECTION, connection);
outChannel.addChannelAttribute(Constants.CONNECTOR, this);
- cacheChannel(outChannel.getChannel(), outChannel);
return outChannel;
}
@Override
- public boolean verifyUpload(Channel channel) {
- OutChannel outChannel = (OutChannel) getConnectorChannel(channel);
+ public boolean verifyUpload(ConnectorChannel channel) {
+ OutChannel outChannel = (OutChannel) channel;
HttpURLConnection connection = (HttpURLConnection) outChannel.getChannelAttribute(S3Constants.HTTP_CONNECTION);
try {
if (connection.getResponseCode() == S3Constants.HTTP_SUCCESS_RESPONSE_CODE) {
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
index ce66c0c..6539d04 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3SourceConnector.java
@@ -55,7 +55,6 @@
inputStream = s3object.getObjectContent();
InChannel inChannel = new InChannel(inputStream);
inChannel.addChannelAttribute(Constants.CONNECTOR, this);
- cacheChannel(inChannel.getChannel(), inChannel);
return inChannel;
}
return null;