blob: 2704d1c4801da1d8b24b918b0cb197fa8fb534c0 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import with_statement
import os
import sys
import tempfile
from libcloud import _init_once
from libcloud.test import LibcloudTestCase
from libcloud.test import unittest
from libcloud.compute.ssh import ParamikoSSHClient
from libcloud.compute.ssh import ShellOutSSHClient
from libcloud.compute.ssh import have_paramiko
from libcloud.utils.py3 import StringIO
from libcloud.utils.py3 import u
from libcloud.utils.py3 import assertRaisesRegex
from mock import patch, Mock, MagicMock
# Resolve the paramiko module and its version string once, at import time.
if have_paramiko:
    import paramiko
    paramiko_version = paramiko.__version__
else:
    # paramiko is not installed: blank out the client class and fall back to
    # a placeholder version string so the module can still be imported.
    ParamikoSSHClient = None  # NOQA
    paramiko_version = '0.0.0'
@unittest.skipIf(not have_paramiko, 'Skipping because paramiko is not available')
class ParamikoSSHClientTests(LibcloudTestCase):
    """
    Tests for ParamikoSSHClient.

    Most tests replace ``paramiko.SSHClient`` with ``Mock`` so that no real
    network connection is attempted; they then assert on the arguments the
    client forwards to the mocked paramiko object and on the messages
    written to the debug log file configured in ``setUp``.
    """

    # Numeric (major, minor) of the installed paramiko. Version strings must
    # not be compared lexicographically ('2.10.0' < '2.7.0' as strings), so
    # the leading components are converted to integers first.
    _PARAMIKO_MAJOR_MINOR = tuple(
        int(part) for part in paramiko_version.split('.')[:2]
        if part.isdigit())

    @patch('paramiko.SSHClient', Mock)
    def setUp(self):
        """
        Creates the object patching the actual connection.

        Also points LIBCLOUD_DEBUG at a fresh temporary file so that
        ``assertLogMsg`` can inspect what was logged during the test.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'port': 8822,
                       'username': 'ubuntu',
                       'key': '~/.ssh/ubuntu_ssh',
                       'timeout': '600'}

        # mkstemp() hands back an already-open OS-level descriptor; only the
        # path is needed here, so close it right away instead of leaking it.
        fd, self.tmp_file = tempfile.mkstemp()
        os.close(fd)

        # Set the variable through patch.dict so the previous environment is
        # restored once the test finishes (even if it fails).
        # NOTE(review): the temporary file itself is not removed; the logging
        # configured by _init_once() presumably still has it open - confirm
        # before adding removal.
        env_patcher = patch.dict(os.environ,
                                 {'LIBCLOUD_DEBUG': self.tmp_file})
        env_patcher.start()
        self.addCleanup(env_patcher.stop)

        _init_once()
        self.ssh_cli = ParamikoSSHClient(**conn_params)

    def _read_misc_fixture(self, name):
        """
        Return the text content of ``fixtures/misc/<name>`` located next to
        this test module.
        """
        path = os.path.join(os.path.dirname(__file__),
                            'fixtures', 'misc', name)
        with open(path, 'r') as fp:
            return fp.read()

    @patch('paramiko.SSHClient', Mock)
    def test_create_with_password(self):
        """
        Password only: connect() forwards the password and disables agent
        and key lookup.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'password': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        expected_conn = {'username': 'ubuntu',
                         'password': 'ubuntu',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)
        self.assertLogMsg('Connecting to server')

    @patch('paramiko.SSHClient', Mock)
    def test_deprecated_key_argument(self):
        """
        The deprecated ``key`` argument ends up as ``key_filename``.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key': 'id_rsa'}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        expected_conn = {'username': 'ubuntu',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'key_filename': 'id_rsa',
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)
        self.assertLogMsg('Connecting to server')

    def test_key_files_and_key_material_arguments_are_mutual_exclusive(self):
        """
        Passing both ``key_files`` and ``key_material`` raises ValueError.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key_files': 'id_rsa',
                       'key_material': 'key'}

        expected_msg = ('key_files and key_material arguments are mutually '
                        'exclusive')
        assertRaisesRegex(self, ValueError, expected_msg,
                          ParamikoSSHClient, **conn_params)

    @patch('paramiko.SSHClient', Mock)
    def test_key_material_argument(self):
        """
        Valid ``key_material`` is parsed and forwarded as ``pkey``.
        """
        private_key = self._read_misc_fixture('test_rsa.key')

        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key_material': private_key}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        pkey = paramiko.RSAKey.from_private_key(StringIO(private_key))
        expected_conn = {'username': 'ubuntu',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'pkey': pkey,
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)
        self.assertLogMsg('Connecting to server')

    @patch('paramiko.SSHClient', Mock)
    def test_key_material_argument_invalid_key(self):
        """
        ``key_material`` which is not a key makes connect() raise
        SSHException.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key_material': 'id_rsa'}

        client = ParamikoSSHClient(**conn_params)

        expected_msg = 'Invalid or unsupported key type'
        assertRaisesRegex(self, paramiko.ssh_exception.SSHException,
                          expected_msg, client.connect)

    @patch('paramiko.SSHClient', Mock)
    @unittest.skipIf(_PARAMIKO_MAJOR_MINOR >= (2, 7),
                     'New versions of paramiko support OPENSSH key format')
    def test_key_file_non_pem_format_error(self):
        """
        A non-PEM key makes connect() raise SSHException (only run on
        paramiko older than 2.7).
        """
        # Supplied as key_material
        private_key = self._read_misc_fixture('test_rsa_non_pem_format.key')

        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key_material': private_key}

        client = ParamikoSSHClient(**conn_params)

        expected_msg = 'Invalid or unsupported key type'
        assertRaisesRegex(self, paramiko.ssh_exception.SSHException,
                          expected_msg, client.connect)

    def test_key_material_valid_pem_keys_invalid_header_auto_conversion(self):
        # Test a scenario where valid PEM keys with invalid headers which is
        # not recognized by paramiko are automatically converted in a format
        # which is recognized by paramiko
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)

        # RSA, DSA and ECDSA keys whose header is not supported by paramiko,
        # paired with the key class each one is expected to load as.
        fixtures = (
            ('test_rsa_non_paramiko_recognized_header.key',
             paramiko.RSAKey),
            ('test_dsa_non_paramiko_recognized_header.key',
             paramiko.DSSKey),
            ('test_ecdsa_non_paramiko_recognized_header.key',
             paramiko.ECDSAKey),
        )

        for file_name, key_cls in fixtures:
            private_key = self._read_misc_fixture(file_name)
            pkey = client._get_pkey_object(key=private_key)
            self.assertTrue(pkey)
            self.assertIsInstance(pkey, key_cls)

    def test_key_material_valid_pem_keys(self):
        """
        Regular RSA, DSA and ECDSA fixture keys load as the matching
        paramiko key class.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)

        # Regular key fixtures paired with the expected key class.
        fixtures = (
            ('test_rsa.key', paramiko.RSAKey),
            ('test_dsa.key', paramiko.DSSKey),
            ('test_ecdsa.key', paramiko.ECDSAKey),
        )

        for file_name, key_cls in fixtures:
            private_key = self._read_misc_fixture(file_name)
            pkey = client._get_pkey_object(key=private_key)
            self.assertTrue(pkey)
            self.assertIsInstance(pkey, key_cls)

    @patch('paramiko.SSHClient', Mock)
    def test_create_with_key(self):
        """
        ``key_files`` is forwarded to connect() as ``key_filename``.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'key_files': 'id_rsa'}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        expected_conn = {'username': 'ubuntu',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'key_filename': 'id_rsa',
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)
        self.assertLogMsg('Connecting to server')

    @patch('paramiko.SSHClient', Mock)
    def test_create_with_password_and_key(self):
        """
        Password and key together: both are forwarded to connect().
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu',
                       'password': 'ubuntu',
                       'key': 'id_rsa'}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        expected_conn = {'username': 'ubuntu',
                         'password': 'ubuntu',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'key_filename': 'id_rsa',
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)
        self.assertLogMsg('Connecting to server')

    @patch('paramiko.SSHClient', Mock)
    def test_create_without_credentials(self):
        """
        Initialize object with no credentials.

        Just to have better coverage, initialize the object
        with neither 'password' nor 'key'.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)
        client.connect()

        expected_conn = {'username': 'ubuntu',
                         'hostname': 'dummy.host.org',
                         'allow_agent': True,
                         'look_for_keys': True,
                         'port': 22}
        client.client.connect.assert_called_once_with(**expected_conn)

    @patch.object(ParamikoSSHClient, '_consume_stdout',
                  MagicMock(return_value=StringIO('')))
    @patch.object(ParamikoSSHClient, '_consume_stderr',
                  MagicMock(return_value=StringIO('')))
    def test_basic_usage_absolute_path(self):
        """
        Basic execution.
        """
        client = self.ssh_cli
        # script to execute
        sd = "/root/random_script.sh"

        # Connect behavior
        client.connect()
        mock_cli = client.client  # The actual mocked object: SSHClient
        expected_conn = {'username': 'ubuntu',
                         'key_filename': '~/.ssh/ubuntu_ssh',
                         'allow_agent': False,
                         'hostname': 'dummy.host.org',
                         'look_for_keys': False,
                         'timeout': '600',
                         'port': 8822}
        mock_cli.connect.assert_called_once_with(**expected_conn)

        client.put(sd)
        # Make assertions over 'put' method
        mock_cli.open_sftp().chdir.assert_called_with('root')
        mock_cli.open_sftp().file.assert_called_once_with('random_script.sh',
                                                          mode='w')

        client.run(sd)
        # Make assertions over 'run' method
        mock_cli.get_transport().open_session().exec_command \
            .assert_called_once_with(sd)
        self.assertLogMsg('Executing command (cmd=/root/random_script.sh)')
        self.assertLogMsg('Command finished')

        client.close()

    def test_delete_script(self):
        """
        Provide a basic test with 'delete' action.
        """
        client = self.ssh_cli
        # script to execute
        sd = '/root/random_script.sh'

        client.connect()

        client.delete(sd)
        # Make assertions over the 'delete' method
        client.client.open_sftp().unlink.assert_called_with(sd)
        self.assertLogMsg('Deleting file')

        client.close()
        self.assertLogMsg('Closing server connection')

    def assertLogMsg(self, expected_msg):
        """
        Assert that ``expected_msg`` occurs somewhere in the debug log file
        created in ``setUp``.
        """
        with open(self.tmp_file, 'r') as fp:
            content = fp.read()

        # assertIn reports both values on failure, unlike assertTrue(...).
        self.assertIn(expected_msg, content)

    def test_consume_stdout(self):
        """
        Chunks received on stdout are concatenated in order.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        scenarios = (
            (['123', '456'], '123456'),
            (['987', '6543210'], '9876543210'),
        )

        for chunks, expected in scenarios:
            client = ParamikoSSHClient(**conn_params)
            client.CHUNK_SIZE = 1024

            chan = Mock()
            # Data is reported as ready twice, then the stream is drained.
            chan.recv_ready.side_effect = [True, True, False]
            chan.recv.side_effect = chunks

            stdout = client._consume_stdout(chan).getvalue()
            self.assertEqual(u(expected), stdout)
            self.assertEqual(len(stdout), len(expected))

    def test_consume_stderr(self):
        """
        Chunks received on stderr are concatenated in order.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        scenarios = (
            (['123', '456'], '123456'),
            (['987', '6543210'], '9876543210'),
        )

        for chunks, expected in scenarios:
            client = ParamikoSSHClient(**conn_params)
            client.CHUNK_SIZE = 1024

            chan = Mock()
            # Data is reported as ready twice, then the stream is drained.
            chan.recv_stderr_ready.side_effect = [True, True, False]
            chan.recv_stderr.side_effect = chunks

            stderr = client._consume_stderr(chan).getvalue()
            self.assertEqual(u(expected), stderr)
            self.assertEqual(len(stderr), len(expected))

    def test_consume_stdout_chunk_contains_part_of_multi_byte_utf8_character(self):
        """
        A multi-byte character delivered one byte per chunk on stdout is
        reassembled.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)
        # One byte per read so the character is split across four chunks.
        client.CHUNK_SIZE = 1

        chan = Mock()
        chan.recv_ready.side_effect = [True, True, True, True, False]
        chan.recv.side_effect = ['\xF0', '\x90', '\x8D', '\x88']

        stdout = client._consume_stdout(chan).getvalue()
        self.assertEqual('\xf0\x90\x8d\x88', stdout.encode('utf-8'))
        self.assertIn(len(stdout), [1, 2])

    def test_consume_stderr_chunk_contains_part_of_multi_byte_utf8_character(self):
        """
        A multi-byte character delivered one byte per chunk on stderr is
        reassembled.
        """
        conn_params = {'hostname': 'dummy.host.org',
                       'username': 'ubuntu'}
        client = ParamikoSSHClient(**conn_params)
        # One byte per read so the character is split across four chunks.
        client.CHUNK_SIZE = 1

        chan = Mock()
        chan.recv_stderr_ready.side_effect = [True, True, True, True, False]
        chan.recv_stderr.side_effect = ['\xF0', '\x90', '\x8D', '\x88']

        stderr = client._consume_stderr(chan).getvalue()
        self.assertEqual('\xf0\x90\x8d\x88', stderr.encode('utf-8'))
        self.assertIn(len(stderr), [1, 2])
class ShellOutSSHClientTests(LibcloudTestCase):
    """
    Tests for ShellOutSSHClient: constructor validation and the ssh command
    line it builds.
    """

    def test_password_auth_not_supported(self):
        """
        Passing a password raises ValueError.
        """
        # Same helper the ParamikoSSHClient tests use for expected errors.
        assertRaisesRegex(self, ValueError,
                          'ShellOutSSHClient only supports key auth',
                          ShellOutSSHClient, hostname='localhost',
                          username='foo', password='bar')

    def test_ssh_executable_not_available(self):
        """
        A child process reporting return code 127 makes the constructor
        raise ValueError.
        """
        class MockChild(object):
            returncode = 127

            def communicate(*args, **kwargs):
                pass

        def mock_popen(*args, **kwargs):
            return MockChild()

        # Every subprocess spawned while the patch is active is replaced by
        # the stub child above.
        with patch('subprocess.Popen', mock_popen):
            assertRaisesRegex(self, ValueError,
                              'ssh client is not available',
                              ShellOutSSHClient, hostname='localhost',
                              username='foo')

    def test_connect_success(self):
        """
        connect() returns a truthy value.
        """
        client = ShellOutSSHClient(hostname='localhost', username='root')
        self.assertTrue(client.connect())

    def test_close_success(self):
        """
        close() returns a truthy value.
        """
        client = ShellOutSSHClient(hostname='localhost', username='root')
        self.assertTrue(client.close())

    def test_get_base_ssh_command(self):
        """
        The base command gains ``-i <key>`` and ``-oConnectTimeout=<n>``
        when a key and a timeout are supplied.
        """
        client1 = ShellOutSSHClient(hostname='localhost', username='root')
        client2 = ShellOutSSHClient(hostname='localhost', username='root',
                                    key='/home/my.key')
        client3 = ShellOutSSHClient(hostname='localhost', username='root',
                                    key='/home/my.key', timeout=5)

        self.assertEqual(client1._get_base_ssh_command(),
                         ['ssh', 'root@localhost'])
        self.assertEqual(client2._get_base_ssh_command(),
                         ['ssh', '-i', '/home/my.key',
                          'root@localhost'])
        self.assertEqual(client3._get_base_ssh_command(),
                         ['ssh', '-i', '/home/my.key',
                          '-oConnectTimeout=5', 'root@localhost'])
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    sys.exit(unittest.main())