blob: 32afcbbed10741dd80d935e6a28ace0f835f35df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.scp;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Hashtable;
import java.util.List;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.UIKeyboardInteractive;
import com.jcraft.jsch.UserInfo;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.remote.RemoteFileConfiguration;
import org.apache.camel.component.file.remote.RemoteFileOperations;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SCP remote file operations
*/
public class ScpOperations implements RemoteFileOperations<ScpFile> {
private static final Logger LOG = LoggerFactory.getLogger(ScpOperations.class);
private ScpEndpoint endpoint;
private Session session;
private ChannelExec channel;
private String userKnownHostFile;
@Override
public void setEndpoint(GenericFileEndpoint<ScpFile> endpoint) {
this.endpoint = (ScpEndpoint)endpoint;
}
@Override
public boolean deleteFile(String name) throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'delete' not supported by the scp: protocol");
}
@Override
public boolean existsFile(String name) throws GenericFileOperationFailedException {
// maybe... cannot determine using the scp: protocol
return false;
}
@Override
public boolean renameFile(String from, String to) throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'rename' not supported by the scp: protocol");
}
@Override
public boolean buildDirectory(String directory, boolean absolute) throws GenericFileOperationFailedException {
// done by the server
return true;
}
@Override
public boolean retrieveFile(String name, Exchange exchange) throws GenericFileOperationFailedException {
// TODO: implement
return false;
}
@Override
public void releaseRetreivedFileResources(Exchange exchange) throws GenericFileOperationFailedException {
// No-op
}
@Override
public boolean storeFile(String name, Exchange exchange) throws GenericFileOperationFailedException {
ObjectHelper.notNull(session, "session");
ScpConfiguration cfg = endpoint.getConfiguration();
int timeout = cfg.getConnectTimeout();
if (LOG.isTraceEnabled()) {
LOG.trace("Opening channel to {} with {} timeout...", cfg.remoteServerInformation(),
timeout > 0 ? (Integer.toString(timeout) + " ms") : "no");
}
String file = getRemoteFile(name, cfg);
InputStream is = null;
if (exchange.getIn().getBody() == null) {
// Do an explicit test for a null body and decide what to do
if (endpoint.isAllowNullBody()) {
LOG.trace("Writing empty file.");
is = new ByteArrayInputStream(new byte[]{});
} else {
throw new GenericFileOperationFailedException("Cannot write null body to file: " + name);
}
}
try {
channel = (ChannelExec) session.openChannel("exec");
channel.setCommand(getScpCommand(cfg, file));
channel.connect(timeout);
LOG.trace("Channel connected to {}", cfg.remoteServerInformation());
try {
if (is == null) {
is = exchange.getIn().getMandatoryBody(InputStream.class);
}
write(channel, file, is, cfg);
} catch (InvalidPayloadException e) {
throw new GenericFileOperationFailedException("Cannot store file: " + name, e);
} catch (IOException e) {
throw new GenericFileOperationFailedException("Failed to write file " + file, e);
} finally {
// must close stream after usage
IOHelper.close(is);
}
} catch (JSchException e) {
throw new GenericFileOperationFailedException("Failed to write file " + file, e);
} finally {
if (channel != null) {
LOG.trace("Disconnecting 'exec' scp channel");
channel.disconnect();
channel = null;
LOG.trace("Channel disconnected from {}", cfg.remoteServerInformation());
}
}
return true;
}
@Override
public String getCurrentDirectory() throws GenericFileOperationFailedException {
return endpoint.getConfiguration().getDirectory();
}
@Override
public void changeCurrentDirectory(String path) throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'cd " + path + "' not supported by the scp: protocol");
}
@Override
public void changeToParentDirectory() throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'cd ..' not supported by the scp: protocol");
}
@Override
public List<ScpFile> listFiles() throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'ls' not supported by the scp: protocol");
}
@Override
public List<ScpFile> listFiles(String path) throws GenericFileOperationFailedException {
throw new GenericFileOperationFailedException("Operation 'ls " + path + "' not supported by the scp: protocol");
}
@Override
public boolean connect(RemoteFileConfiguration configuration) throws GenericFileOperationFailedException {
if (!isConnected()) {
session = createSession(configuration instanceof ScpConfiguration ? (ScpConfiguration)configuration : null);
// TODO: deal with reconnection attempts
if (!isConnected()) {
session = null;
throw new GenericFileOperationFailedException("Failed to connect to " + configuration.remoteServerInformation());
}
}
return true;
}
@Override
public boolean isConnected() throws GenericFileOperationFailedException {
return session != null && session.isConnected();
}
@Override
public void disconnect() throws GenericFileOperationFailedException {
if (isConnected()) {
session.disconnect();
}
session = null;
}
@Override
public boolean sendNoop() throws GenericFileOperationFailedException {
// not supported by scp:
return true;
}
@Override
public boolean sendSiteCommand(String command) throws GenericFileOperationFailedException {
return true;
}
private Session createSession(ScpConfiguration config) {
ObjectHelper.notNull(config, "ScpConfiguration");
try {
final JSch jsch = new JSch();
// get from configuration
if (ObjectHelper.isNotEmpty(config.getCiphers())) {
LOG.trace("Using ciphers: {}", config.getCiphers());
Hashtable<String, String> ciphers = new Hashtable<String, String>();
ciphers.put("cipher.s2c", config.getCiphers());
ciphers.put("cipher.c2s", config.getCiphers());
JSch.setConfig(ciphers);
}
if (ObjectHelper.isNotEmpty(config.getPrivateKeyFile())) {
LOG.trace("Using private keyfile: {}", config.getPrivateKeyFile());
String pkfp = config.getPrivateKeyFilePassphrase();
jsch.addIdentity(config.getPrivateKeyFile(), ObjectHelper.isNotEmpty(pkfp) ? pkfp : null);
}
String knownHostsFile = config.getKnownHostsFile();
if (knownHostsFile == null && config.isUseUserKnownHostsFile()) {
if (userKnownHostFile == null) {
userKnownHostFile = System.getProperty("user.home") + "/.ssh/known_hosts";
LOG.info("Known host file not configured, using user known host file: " + userKnownHostFile);
}
knownHostsFile = userKnownHostFile;
}
jsch.setKnownHosts(ObjectHelper.isEmpty(knownHostsFile) ? null : knownHostsFile);
session = jsch.getSession(config.getUsername(), config.getHost(), config.getPort());
session.setTimeout(config.getTimeout());
session.setUserInfo(new SessionUserInfo(config));
if (ObjectHelper.isNotEmpty(config.getStrictHostKeyChecking())) {
LOG.trace("Using StrickHostKeyChecking: {}", config.getStrictHostKeyChecking());
session.setConfig("StrictHostKeyChecking", config.getStrictHostKeyChecking());
}
int timeout = config.getConnectTimeout();
LOG.debug("Connecting to {} with {} timeout...", config.remoteServerInformation(),
timeout > 0 ? (Integer.toString(timeout) + " ms") : "no");
if (timeout > 0) {
session.connect(timeout);
} else {
session.connect();
}
} catch (JSchException e) {
session = null;
LOG.warn("Could not create ssh session for " + config.remoteServerInformation(), e);
}
return session;
}
private void write(ChannelExec c, String name, InputStream data, ScpConfiguration cfg) throws IOException {
OutputStream os = c.getOutputStream();
InputStream is = c.getInputStream();
try {
writeFile(name, data, os, is, cfg);
} finally {
IOHelper.close(is, os);
}
}
private void writeFile(String filename, InputStream data, OutputStream os, InputStream is, ScpConfiguration cfg) throws IOException {
final int lineFeed = '\n';
String bytes;
int pos = filename.indexOf('/');
if (pos >= 0) {
// write to child directory
String dir = filename.substring(0, pos);
bytes = "D0775 0 " + dir;
LOG.trace("[scp:sink] {}", bytes);
os.write(bytes.getBytes());
os.write(lineFeed);
os.flush();
readAck(is, false);
writeFile(filename.substring(pos + 1), data, os, is, cfg);
bytes = "E";
LOG.trace("[scp:sink] {}", bytes);
os.write(bytes.getBytes());
os.write(lineFeed);
os.flush();
readAck(is, false);
} else {
int count = 0;
int read;
int size = endpoint.getBufferSize();
byte[] reply = new byte[size];
// figure out the stream size as we need to pass it in the header
BufferedInputStream buffer = new BufferedInputStream(data, size);
try {
buffer.mark(Integer.MAX_VALUE);
while ((read = buffer.read(reply)) != -1) {
count += read;
}
// send the header
bytes = "C0" + cfg.getChmod() + " " + count + " " + filename;
LOG.trace("[scp:sink] {}", bytes);
os.write(bytes.getBytes());
os.write(lineFeed);
os.flush();
readAck(is, false);
// now send the stream
buffer.reset();
while ((read = buffer.read(reply)) != -1) {
os.write(reply, 0, read);
}
writeAck(os);
readAck(is, false);
} finally {
IOHelper.close(buffer);
}
}
}
private void writeAck(OutputStream os) throws IOException {
os.write(0);
os.flush();
}
private int readAck(InputStream is, boolean failOnEof) throws IOException {
String message;
int answer = is.read();
switch (answer) {
case -1:
if (failOnEof) {
message = "[scp] Unexpected end of stream";
throw new EOFException(message);
}
break;
case 1:
message = "[scp] WARN " + readLine(is);
LOG.warn(message);
break;
case 2:
message = "[scp] NACK " + readLine(is);
throw new IOException(message);
default:
// case 0:
break;
}
return answer;
}
private String readLine(InputStream is) throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
int c;
do {
c = is.read();
if (c == '\n') {
return bytes.toString();
}
bytes.write(c);
} while (c != -1);
} finally {
IOHelper.close(bytes);
}
String message = "[scp] Unexpected end of stream";
throw new IOException(message);
}
private static String getRemoteTarget(ScpConfiguration config) {
// use current dir (".") if target directory not specified in uri
return config.getDirectory().isEmpty() ? "." : config.getDirectory();
}
private static String getRemoteFile(String name, ScpConfiguration config) {
String dir = config.getDirectory();
dir = dir.endsWith("/") ? dir : dir + "/";
return name.startsWith(dir) ? name.substring(dir.length()) : name;
}
private static boolean isRecursiveScp(String name) {
return name.indexOf('/') > 0;
}
private static String getScpCommand(ScpConfiguration config, String name) {
StringBuilder cmd = new StringBuilder();
cmd.append("scp ");
// TODO: need config for scp *-p* (preserves modification times, access times, and modes from the original file)
// String command="scp " + (ptimestamp ? "-p " : "") + "-t " + configuration.getDirectory();
// TODO: refactor to use generic command
cmd.append(isRecursiveScp(name) ? "-r " : "");
cmd.append("-t ");
cmd.append(getRemoteTarget(config));
return cmd.toString();
}
protected static final class SessionUserInfo implements UserInfo, UIKeyboardInteractive {
private final ScpConfiguration config;
public SessionUserInfo(ScpConfiguration config) {
ObjectHelper.notNull(config, "config");
this.config = config;
}
@Override
public String getPassphrase() {
LOG.warn("Private Key authentication not supported");
return null;
}
@Override
public String getPassword() {
LOG.debug("Providing password for ssh authentication of user '{}'", config.getUsername());
return config.getPassword();
}
@Override
public boolean promptPassword(String message) {
LOG.debug(message);
return true;
}
@Override
public boolean promptPassphrase(String message) {
LOG.debug(message);
return true;
}
@Override
public boolean promptYesNo(String message) {
LOG.debug(message);
return false;
}
@Override
public void showMessage(String message) {
LOG.debug(message);
}
@Override
public String[] promptKeyboardInteractive(String destination, String name,
String instruction, String[] prompt, boolean[] echo) {
LOG.debug(instruction);
// Called for either SSH_MSG_USERAUTH_INFO_REQUEST or SSH_MSG_USERAUTH_PASSWD_CHANGEREQ
// The most secure choice (especially for the second case) is to return null
return null;
}
}
}