blob: 5e6059f7a83bdb3ec5d5a2882c1570a05e0db343 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.airavata.helix.adaptor;
import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.connection.ConnectionException;
import net.schmizz.sshj.sftp.*;
import net.schmizz.sshj.userauth.keyprovider.KeyProvider;
import net.schmizz.sshj.userauth.method.AuthKeyboardInteractive;
import net.schmizz.sshj.userauth.method.AuthMethod;
import net.schmizz.sshj.userauth.method.AuthPublickey;
import net.schmizz.sshj.userauth.method.ChallengeResponseProvider;
import net.schmizz.sshj.userauth.password.PasswordFinder;
import net.schmizz.sshj.userauth.password.PasswordUtils;
import net.schmizz.sshj.userauth.password.Resource;
import net.schmizz.sshj.xfer.FilePermission;
import net.schmizz.sshj.xfer.LocalDestFile;
import net.schmizz.sshj.xfer.LocalFileFilter;
import net.schmizz.sshj.xfer.LocalSourceFile;
import org.apache.airavata.agents.api.*;
import org.apache.airavata.helix.adaptor.wrapper.SCPFileTransferWrapper;
import org.apache.airavata.helix.adaptor.wrapper.SFTPClientWrapper;
import org.apache.airavata.helix.adaptor.wrapper.SessionWrapper;
import org.apache.airavata.helix.agent.ssh.StandardOutReader;
import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SSHJAgentAdaptor implements AgentAdaptor {
private final static Logger logger = LoggerFactory.getLogger(SSHJAgentAdaptor.class);
private PoolingSSHJClient sshjClient;
/**
 * Builds the {@code PoolingSSHJClient} stored in {@code sshjClient} and registers its
 * authentication methods.
 *
 * NOTE(review): this copy of the file has lost lines (at least the closing braces of the
 * anonymous class and of the method); further statements may be missing as well. Restore
 * from version control before relying on it.
 *
 * @param user       login name handed to {@code sshjClient.auth}
 * @param host       remote host the client is created for
 * @param port       remote port; {@code 0} is replaced by 22
 * @param publicKey  public key passed to {@code sshjClient.loadKeys}
 * @param privateKey private key passed to {@code sshjClient.loadKeys}
 * @param passphrase passphrase of the private key, or {@code null} for no password finder
 * @throws IOException declared by the method; presumably raised by key loading or auth (confirm)
 */
protected void createPoolingSSHJClient(String user, String host, int port, String publicKey, String privateKey, String passphrase) throws IOException {
DefaultConfig defaultConfig = new DefaultConfig();
sshjClient = new PoolingSSHJClient(defaultConfig, host, port == 0 ? 22 : port);
// SECURITY NOTE(review): this verifier returns true for every host key, so the identity
// of the remote host is never checked.
sshjClient.addHostKeyVerifier((h, p, key) -> true);
// A one-off password finder is only created when a passphrase was supplied.
PasswordFinder passwordFinder = passphrase != null ? PasswordUtils.createOneOff(passphrase.toCharArray()) : null;
KeyProvider keyProvider = sshjClient.loadKeys(privateKey, publicKey, passwordFinder);
// Authentication methods in the order they are registered: public key, then keyboard-interactive.
final List<AuthMethod> am = new LinkedList<>();
am.add(new AuthPublickey(keyProvider));
// The keyboard-interactive provider below offers no sub-methods, answers every prompt
// with an empty response and never asks for a retry.
am.add(new AuthKeyboardInteractive(new ChallengeResponseProvider() {
public List<String> getSubmethods() {
return new ArrayList<>();
public void init(Resource resource, String name, String instruction) {
public char[] getResponse(String prompt, boolean echo) {
return new char[0];
public boolean shouldRetry() {
return false;
sshjClient.auth(user, am);
public void init(String user, String host, int port, String publicKey, String privateKey, String passphrase) throws AgentException {
try {
createPoolingSSHJClient(user, host, port, publicKey, privateKey, passphrase);
} catch (IOException e) {
logger.error("Error while initializing sshj agent for user " + user + " host " + host + " for key starting with " + publicKey.substring(0,10), e);
throw new AgentException("Error while initializing sshj agent for user " + user + " host " + host + " for key starting with " + publicKey.substring(0,10), e);
/**
 * Initializes this adaptor for a registered compute resource: looks up the resource and its
 * SSH job submission interface, fetches the SSH credential for {@code token}, and creates the
 * pooled client with the selected host, port and key material.
 *
 * NOTE(review): several string expressions below ("Initializing ...", "Fetching ...",
 * "Description for token ...") have no enclosing call in this copy of the file. They look like
 * arguments of a dropped logging call; closing braces are missing too. Restore from version
 * control before relying on it.
 *
 * @param computeResource id passed to {@code getComputeResource}
 * @param gatewayId       gateway id passed to {@code getSSHCredential}
 * @param userId          login name passed to {@code createPoolingSSHJClient}
 * @param token           credential store token passed to {@code getSSHCredential}
 * @throws AgentException on any failure; the caught exception is kept as the cause
 */
public void init(String computeResource, String gatewayId, String userId, String token) throws AgentException {
try {"Initializing Compute Resource SSH Adaptor for compute resource : "+ computeResource + ", gateway : " +
gatewayId +", user " + userId + ", token : " + token);
ComputeResourceDescription computeResourceDescription = AgentUtils.getRegistryServiceClient().getComputeResource(computeResource);"Fetching job submission interfaces for compute resource " + computeResource);
// Only the first interface whose protocol is SSH is used.
Optional<JobSubmissionInterface> jobSubmissionInterfaceOp = computeResourceDescription.getJobSubmissionInterfaces()
.stream().filter(iface -> iface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH).findFirst();
JobSubmissionInterface sshInterface = jobSubmissionInterfaceOp
.orElseThrow(() -> new AgentException("Could not find a SSH interface for compute resource " + computeResource));
SSHJobSubmission sshJobSubmission = AgentUtils.getRegistryServiceClient().getSSHJobSubmission(sshInterface.getJobSubmissionInterfaceId());"Fetching credentials for cred store token " + token);
SSHCredential sshCredential = AgentUtils.getCredentialClient().getSSHCredential(token, gatewayId);
if (sshCredential == null) {
throw new AgentException("Null credential for token " + token);
}"Description for token : " + token + " : " + sshCredential.getDescription());
// The alternative SSH host name wins when it is set and non-empty; otherwise the host name
// of the compute resource description is used.
String alternateHostName = sshJobSubmission.getAlternativeSSHHostName();
String selectedHostName = (alternateHostName == null || "".equals(alternateHostName))?
computeResourceDescription.getHostName() : alternateHostName;
// A configured port of 0 is replaced by 22.
int selectedPort = sshJobSubmission.getSshPort() == 0 ? 22 : sshJobSubmission.getSshPort();
createPoolingSSHJClient(userId, selectedHostName, selectedPort,
sshCredential.getPublicKey(), sshCredential.getPrivateKey(), sshCredential.getPassphrase());
// NOTE(review): this catch-all presumably also catches the AgentExceptions thrown above and
// wraps them in a second AgentException - confirm this is intended.
} catch (Exception e) {
logger.error("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
throw new AgentException("Error while initializing ssh agent for compute resource " + computeResource + " to token " + token, e);
/**
 * Shuts this adaptor down. An {@code IOException} raised while doing so is logged at warn
 * level (message only, no stack trace) and not propagated.
 *
 * NOTE(review): the body of the null check is empty in this copy of the file; the statement
 * that actually stops the client, and the closing braces, appear to be missing. Restore from
 * version control.
 */
public void destroy() {
try {
// Nothing to release when no client was ever created.
if (sshjClient != null) {
} catch (IOException e) {
logger.warn("Failed to stop sshj client for host " + sshjClient.getHost() + " and user " +
sshjClient.getUsername() + " due to : " + e.getMessage());
// ignore
/**
 * Runs {@code command} on the remote host through a session obtained from {@code sshjClient}
 * and returns a {@code StandardOutReader} carrying the exit code.
 *
 * NOTE(review): the body of the inner try, the body of the try in the outer finally, and the
 * closing braces are missing in this copy of the file. Presumably the inner try read the
 * command output and the finally closed the session - confirm against version control.
 *
 * @param command          command line to execute
 * @param workingDirectory directory to {@code cd} into first, or {@code null} to skip the {@code cd}
 * @return the reader populated for this execution
 * @throws AgentException wrapping any exception raised while executing the command
 */
public CommandOutput executeCommand(String command, String workingDirectory) throws AgentException {
SessionWrapper session = null;
try {
session = sshjClient.startSessionWrapper();
// The working directory and the command are concatenated into one shell line without any
// quoting or escaping.
Session.Command exec = session.exec((workingDirectory != null ? "cd " + workingDirectory + "; " : "") + command);
StandardOutReader standardOutReader = new StandardOutReader();
try {
} finally {
exec.close(); // closing the channel before getting the exit status
// A null exit status is turned into an exception rather than a default exit code.
standardOutReader.setExitCode(Optional.ofNullable(exec.getExitStatus()).orElseThrow(() -> new Exception("Exit status received as null")));
return standardOutReader;
} catch (Exception e) {
// Connection-level failures flag the session wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(session).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
Optional.ofNullable(session).ifPresent(ss -> {
try {
} catch (IOException e) {
public void createDirectory(String path) throws AgentException {
createDirectory(path, false);
/**
 * Creates the remote directory {@code path} through an SFTP client wrapper obtained from
 * {@code sshjClient}.
 *
 * NOTE(review): both branches of the {@code recursive} check, the body of the try in the
 * finally block, and the closing braces are empty or missing in this copy of the file.
 * Restore from version control before relying on it.
 *
 * @param path      remote directory to create
 * @param recursive selects between the two (missing) creation branches
 * @throws AgentException wrapping any exception raised while creating the directory
 */
public void createDirectory(String path, boolean recursive) throws AgentException {
SFTPClientWrapper sftpClient = null;
try {
sftpClient = sshjClient.newSFTPClientWrapper();
if (recursive) {
} else {
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(sftpClient).ifPresent(client -> {
try {
} catch (IOException e) {
/**
 * Uploads the local file {@code localFile} to {@code remoteFile} through an SCP file transfer
 * wrapper obtained from {@code sshjClient}.
 *
 * NOTE(review): the body of the try in the finally block and the closing braces are missing
 * in this copy of the file. Presumably the wrapper is closed there - confirm against version
 * control.
 *
 * @param localFile  path of the file to send
 * @param remoteFile destination path on the remote host
 * @throws AgentException wrapping any exception raised during the upload
 */
public void uploadFile(String localFile, String remoteFile) throws AgentException {
SCPFileTransferWrapper fileTransfer = null;
try {
fileTransfer = sshjClient.newSCPFileTransferWrapper();
fileTransfer.upload(localFile, remoteFile);
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
try {
} catch (IOException e) {
/**
 * Uploads the content of {@code localInStream} to {@code remoteFile} by presenting the stream
 * to the SCP transfer as a single regular file described by {@code metadata}.
 *
 * NOTE(review): the body of the try in the finally block and the closing braces are missing
 * in this copy of the file. Presumably the wrapper is closed there - confirm against version
 * control.
 *
 * @param localInStream stream supplying the file content; returned as-is by {@code getInputStream}
 * @param metadata      supplies the reported name and length of the file
 * @param remoteFile    destination path on the remote host
 * @throws AgentException wrapping any exception raised during the upload
 */
public void uploadFile(InputStream localInStream, FileMetadata metadata, String remoteFile) throws AgentException {
SCPFileTransferWrapper fileTransfer = null;
try {
fileTransfer = sshjClient.newSCPFileTransferWrapper();
// Adapter that makes the stream look like one plain file without children or timestamps.
fileTransfer.upload(new LocalSourceFile() {
public String getName() {
return metadata.getName();
public long getLength() {
return metadata.getSize();
public InputStream getInputStream() throws IOException {
return localInStream;
public int getPermissions() throws IOException {
// 420 decimal equals 0644 octal; the permissions carried by metadata are not used.
return 420; //metadata.getPermissions();
public boolean isFile() {
return true;
public boolean isDirectory() {
return false;
public Iterable<? extends LocalSourceFile> getChildren(LocalFileFilter filter) throws IOException {
return null;
// No access or modification times are provided, so both getters return 0.
public boolean providesAtimeMtime() {
return false;
public long getLastAccessTime() throws IOException {
return 0;
public long getLastModifiedTime() throws IOException {
return 0;
}, remoteFile);
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
try {
} catch (IOException e) {
/**
 * Downloads {@code remoteFile} to the local path {@code localFile} through an SCP file
 * transfer wrapper obtained from {@code sshjClient}.
 *
 * NOTE(review): the statement after the wrapper is obtained is truncated in this copy of the
 * file (only ", localFile);" is left, presumably the tail of the download call). The body of
 * the try in the finally block and the closing braces are missing as well. Restore from
 * version control.
 *
 * @param remoteFile path of the file on the remote host
 * @param localFile  local destination path
 * @throws AgentException wrapping any exception raised during the download
 */
public void downloadFile(String remoteFile, String localFile) throws AgentException {
SCPFileTransferWrapper fileTransfer = null;
try {
fileTransfer = sshjClient.newSCPFileTransferWrapper();, localFile);
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
try {
} catch (IOException e) {
/**
 * Downloads {@code remoteFile} into {@code localOutStream} by handing the SCP transfer a
 * {@code LocalDestFile} adapter that writes to that stream.
 *
 * NOTE(review): the statement after the wrapper is obtained is truncated in this copy of the
 * file (only ", new LocalDestFile() {" is left, presumably the tail of the download call).
 * The end of that call, the body of the try in the finally block and the closing braces are
 * missing as well. Restore from version control.
 *
 * @param remoteFile     path of the file on the remote host
 * @param localOutStream stream receiving the file content; returned as-is by {@code getOutputStream}
 * @param metadata       not referenced in the visible part of this method
 * @throws AgentException wrapping any exception raised during the download
 */
public void downloadFile(String remoteFile, OutputStream localOutStream, FileMetadata metadata) throws AgentException {
SCPFileTransferWrapper fileTransfer = null;
try {
fileTransfer = sshjClient.newSCPFileTransferWrapper();, new LocalDestFile() {
public OutputStream getOutputStream() throws IOException {
return localOutStream;
// Children and target directories are not supported by this adapter: both return null.
public LocalDestFile getChild(String name) {
return null;
// Whatever file name is requested, the adapter itself is the target.
public LocalDestFile getTargetFile(String filename) throws IOException {
return this;
public LocalDestFile getTargetDirectory(String dirname) throws IOException {
return null;
// The three setters below have no visible body in this copy of the file.
public void setPermissions(int perms) throws IOException {
public void setLastAccessedTime(long t) throws IOException {
public void setLastModifiedTime(long t) throws IOException {
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(fileTransfer).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(fileTransfer).ifPresent(scpFileTransferWrapper -> {
try {
} catch (IOException e) {
/**
 * Lists the remote directory {@code path} through an SFTP client wrapper obtained from
 * {@code sshjClient}.
 *
 * NOTE(review): the right-hand side of the {@code ls} assignment and the return statement are
 * missing in this copy of the file, as are the body of the try in the finally block and the
 * closing braces. How the {@code RemoteResourceInfo} entries are turned into the returned
 * strings cannot be told from here. Restore from version control.
 *
 * @param path remote directory to list
 * @return a list of strings, one per directory entry (exact content not visible here)
 * @throws AgentException wrapping any exception raised while listing
 */
public List<String> listDirectory(String path) throws AgentException {
SFTPClientWrapper sftpClient = null;
try {
sftpClient = sshjClient.newSFTPClientWrapper();
List<RemoteResourceInfo> ls =;
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(sftpClient).ifPresent(client -> {
try {
} catch (IOException e) {
/**
 * Reports whether {@code filePath} exists on the remote host, using
 * {@code statExistence} on an SFTP client wrapper obtained from {@code sshjClient}.
 *
 * NOTE(review): the body of the try in the finally block and the closing braces are missing
 * in this copy of the file. Presumably the wrapper is closed there - confirm against version
 * control.
 *
 * @param filePath remote path to probe
 * @return {@code true} when {@code statExistence} returns a non-null result, {@code false} otherwise
 * @throws AgentException wrapping any exception raised while probing
 */
public Boolean doesFileExist(String filePath) throws AgentException {
SFTPClientWrapper sftpClient = null;
try {
sftpClient = sshjClient.newSFTPClientWrapper();
return sftpClient.statExistence(filePath) != null;
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(sftpClient).ifPresent(client -> {
try {
} catch (IOException e) {
public List<String> getFileNameFromExtension(String fileName, String parentPath) throws AgentException {
/*try (SFTPClient sftpClient = sshjClient.newSFTPClientWrapper()) {
List<RemoteResourceInfo> ls =, resource -> isMatch(resource.getName(), fileName));
} catch (Exception e) {
throw new AgentException(e);
if (fileName.endsWith("*")) {
throw new AgentException("Wildcards that ends with * does not support for security reasons. Specify an extension");
CommandOutput commandOutput = executeCommand("ls " + fileName, parentPath); // This has a risk of returning folders also
String[] filesTmp = commandOutput.getStdOut().split("\n");
List<String> files = new ArrayList<>();
for (String f: filesTmp) {
if (!f.isEmpty()) {
return files;
/**
 * Builds a {@code FileMetadata} for {@code remoteFile} after querying its attributes through
 * an SFTP client wrapper obtained from {@code sshjClient}.
 *
 * NOTE(review): in this copy of the file only the name is copied into the metadata and the
 * {@code stat} result is otherwise unused. Statements that transfer further attributes may
 * have been lost, along with the body of the try in the finally block and the closing braces.
 * Restore from version control.
 *
 * @param remoteFile path of the file on the remote host
 * @return metadata whose name is the last path segment of {@code remoteFile}
 * @throws AgentException wrapping any exception raised while querying the file
 */
public FileMetadata getFileMetadata(String remoteFile) throws AgentException {
SFTPClientWrapper sftpClient = null;
try {
sftpClient = sshjClient.newSFTPClientWrapper();
FileAttributes stat = sftpClient.stat(remoteFile);
FileMetadata metadata = new FileMetadata();
// java.io.File is used here only to extract the last segment of the remote path.
metadata.setName(new File(remoteFile).getName());
return metadata;
} catch (Exception e) {
// Connection-level failures flag the wrapper as errored before rethrowing.
if (e instanceof ConnectionException) {
Optional.ofNullable(sftpClient).ifPresent(ft -> ft.setErrored(true));
throw new AgentException(e);
} finally {
// Runs only when a wrapper was actually obtained.
Optional.ofNullable(sftpClient).ifPresent(scpFileTransferWrapper -> {
try {
} catch (IOException e) {
private boolean isMatch(String s, String p) {
int i = 0;
int j = 0;
int starIndex = -1;
int iIndex = -1;
while (i < s.length()) {
if (j < p.length() && (p.charAt(j) == '?' || p.charAt(j) == s.charAt(i))) {
} else if (j < p.length() && p.charAt(j) == '*') {
starIndex = j;
iIndex = i;
} else if (starIndex != -1) {
j = starIndex + 1;
i = iIndex+1;
} else {
return false;
while (j < p.length() && p.charAt(j) == '*') {
return j == p.length();