blob: 5956c8f40b691992a6e80c39d81e7fc5d8d142f7 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import ConfigParser
from hadoop.cloud import VERSION
from hadoop.cloud.cluster import get_cluster
from hadoop.cloud.service import get_service
from hadoop.cloud.service import InstanceTemplate
from hadoop.cloud.service import NAMENODE
from hadoop.cloud.service import SECONDARY_NAMENODE
from hadoop.cloud.service import JOBTRACKER
from hadoop.cloud.service import DATANODE
from hadoop.cloud.service import TASKTRACKER
from hadoop.cloud.util import merge_config_with_options
from hadoop.cloud.util import xstr
import logging
from optparse import OptionParser
from optparse import make_option
import os
import sys
DEFAULT_SERVICE_NAME = 'hadoop'
DEFAULT_CLOUD_PROVIDER = 'ec2'
DEFAULT_CONFIG_DIR_NAME = '.hadoop-cloud'
DEFAULT_CONFIG_DIR = os.path.join(os.environ['HOME'], DEFAULT_CONFIG_DIR_NAME)
CONFIG_FILENAME = 'clusters.cfg'
CONFIG_DIR_OPTION = \
make_option("--config-dir", metavar="CONFIG-DIR",
help="The configuration directory.")
PROVIDER_OPTION = \
make_option("--cloud-provider", metavar="PROVIDER",
help="The cloud provider, e.g. 'ec2' for Amazon EC2.")
BASIC_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
]
LAUNCH_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
make_option("-a", "--ami", metavar="AMI",
help="The AMI ID of the image to launch. (Amazon EC2 only. Deprecated, use \
--image-id.)"),
make_option("-e", "--env", metavar="ENV", action="append",
help="An environment variable to pass to instances. \
(May be specified multiple times.)"),
make_option("-f", "--user-data-file", metavar="URL",
help="The URL of the file containing user data to be made available to \
instances."),
make_option("--image-id", metavar="ID",
help="The ID of the image to launch."),
make_option("-k", "--key-name", metavar="KEY-PAIR",
help="The key pair to use when launching instances. (Amazon EC2 only.)"),
make_option("-p", "--user-packages", metavar="PACKAGES",
help="A space-separated list of packages to install on instances on start \
up."),
make_option("-t", "--instance-type", metavar="TYPE",
help="The type of instance to be launched. One of m1.small, m1.large, \
m1.xlarge, c1.medium, or c1.xlarge."),
make_option("-z", "--availability-zone", metavar="ZONE",
help="The availability zone to run the instances in."),
make_option("--auto-shutdown", metavar="TIMEOUT_MINUTES",
help="The time in minutes after launch when an instance will be \
automatically shut down."),
make_option("--client-cidr", metavar="CIDR", action="append",
help="The CIDR of the client, which is used to allow access through the \
firewall to the master node. (May be specified multiple times.)"),
make_option("--security-group", metavar="SECURITY_GROUP", action="append",
default=[], help="Additional security groups within which the instances \
should be run. (Amazon EC2 only.) (May be specified multiple times.)"),
make_option("--public-key", metavar="FILE",
help="The public key to authorize on launching instances. (Non-EC2 \
providers only.)"),
make_option("--private-key", metavar="FILE",
help="The private key to use when connecting to instances. (Non-EC2 \
providers only.)"),
]
SNAPSHOT_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
make_option("-k", "--key-name", metavar="KEY-PAIR",
help="The key pair to use when launching instances."),
make_option("-z", "--availability-zone", metavar="ZONE",
help="The availability zone to run the instances in."),
make_option("--ssh-options", metavar="SSH-OPTIONS",
help="SSH options to use."),
]
PLACEMENT_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
make_option("-z", "--availability-zone", metavar="ZONE",
help="The availability zone to run the instances in."),
]
FORCE_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
make_option("--force", action="store_true", default=False,
help="Do not ask for confirmation."),
]
SSH_OPTIONS = [
CONFIG_DIR_OPTION,
PROVIDER_OPTION,
make_option("--ssh-options", metavar="SSH-OPTIONS",
help="SSH options to use."),
]
def print_usage(script):
print """Usage: %(script)s COMMAND [OPTIONS]
where COMMAND and [OPTIONS] may be one of:
list [CLUSTER] list all running Hadoop clusters
or instances in CLUSTER
launch-master CLUSTER launch or find a master in CLUSTER
launch-slaves CLUSTER NUM_SLAVES launch NUM_SLAVES slaves in CLUSTER
launch-cluster CLUSTER (NUM_SLAVES| launch a master and NUM_SLAVES slaves or
N ROLE [N ROLE ...]) N instances in ROLE in CLUSTER
create-formatted-snapshot CLUSTER create an empty, formatted snapshot of
SIZE size SIZE GiB
list-storage CLUSTER list storage volumes for CLUSTER
create-storage CLUSTER ROLE create volumes for NUM_INSTANCES instances
NUM_INSTANCES SPEC_FILE in ROLE for CLUSTER, using SPEC_FILE
attach-storage ROLE attach storage volumes for ROLE to CLUSTER
login CLUSTER log in to the master in CLUSTER over SSH
proxy CLUSTER start a SOCKS proxy on localhost into the
CLUSTER
push CLUSTER FILE scp FILE to the master in CLUSTER
exec CLUSTER CMD execute CMD on the master in CLUSTER
terminate-cluster CLUSTER terminate all instances in CLUSTER
delete-cluster CLUSTER delete the group information for CLUSTER
delete-storage CLUSTER delete all storage volumes for CLUSTER
update-slaves-file CLUSTER update the slaves file on the CLUSTER
master
Use %(script)s COMMAND --help to see additional options for specific commands.
""" % locals()
def print_deprecation(script, replacement):
print "Deprecated. Use '%(script)s %(replacement)s'." % locals()
def parse_options_and_config(command, option_list=[], extra_arguments=(),
unbounded_args=False):
"""
Parse the arguments to command using the given option list, and combine with
any configuration parameters.
If unbounded_args is true then there must be at least as many extra arguments
as specified by extra_arguments (the first argument is always CLUSTER).
Otherwise there must be exactly the same number of arguments as
extra_arguments.
"""
expected_arguments = ["CLUSTER",]
expected_arguments.extend(extra_arguments)
(options_dict, args) = parse_options(command, option_list, expected_arguments,
unbounded_args)
config_dir = get_config_dir(options_dict)
config_files = [os.path.join(config_dir, CONFIG_FILENAME)]
if 'config_dir' not in options_dict:
# if config_dir not set, then also search in current directory
config_files.insert(0, CONFIG_FILENAME)
config = ConfigParser.ConfigParser()
read_files = config.read(config_files)
logging.debug("Read %d configuration files: %s", len(read_files),
", ".join(read_files))
cluster_name = args[0]
opt = merge_config_with_options(cluster_name, config, options_dict)
logging.debug("Options: %s", str(opt))
service_name = get_service_name(opt)
cloud_provider = get_cloud_provider(opt)
cluster = get_cluster(cloud_provider)(cluster_name, config_dir)
service = get_service(service_name, cloud_provider)(cluster)
return (opt, args, service)
def parse_options(command, option_list=[], expected_arguments=(),
unbounded_args=False):
"""
Parse the arguments to command using the given option list.
If unbounded_args is true then there must be at least as many extra arguments
as specified by extra_arguments (the first argument is always CLUSTER).
Otherwise there must be exactly the same number of arguments as
extra_arguments.
"""
config_file_name = "%s/%s" % (DEFAULT_CONFIG_DIR_NAME, CONFIG_FILENAME)
usage = """%%prog %s [options] %s
Options may also be specified in a configuration file called
%s located in the user's home directory.
Options specified on the command line take precedence over any in the
configuration file.""" % (command, " ".join(expected_arguments),
config_file_name)
parser = OptionParser(usage=usage, version="%%prog %s" % VERSION,
option_list=option_list)
parser.disable_interspersed_args()
(options, args) = parser.parse_args(sys.argv[2:])
if unbounded_args:
if len(args) < len(expected_arguments):
parser.error("incorrect number of arguments")
elif len(args) != len(expected_arguments):
parser.error("incorrect number of arguments")
return (vars(options), args)
def get_config_dir(options_dict):
config_dir = options_dict.get('config_dir')
if not config_dir:
config_dir = DEFAULT_CONFIG_DIR
return config_dir
def get_service_name(options_dict):
service_name = options_dict.get("service", None)
if service_name is None:
service_name = DEFAULT_SERVICE_NAME
return service_name
def get_cloud_provider(options_dict):
provider = options_dict.get("cloud_provider", None)
if provider is None:
provider = DEFAULT_CLOUD_PROVIDER
return provider
def check_options_set(options, option_names):
for option_name in option_names:
if options.get(option_name) is None:
print "Option '%s' is missing. Aborting." % option_name
sys.exit(1)
def check_launch_options_set(cluster, options):
if cluster.get_provider_code() == 'ec2':
if options.get('ami') is None and options.get('image_id') is None:
print "One of ami or image_id must be specified. Aborting."
sys.exit(1)
check_options_set(options, ['key_name'])
else:
check_options_set(options, ['image_id', 'public_key'])
def get_image_id(cluster, options):
if cluster.get_provider_code() == 'ec2':
return options.get('image_id', options.get('ami'))
else:
return options.get('image_id')
def main():
# Use HADOOP_CLOUD_LOGGING_LEVEL=DEBUG to enable debugging output.
logging.basicConfig(level=getattr(logging,
os.getenv("HADOOP_CLOUD_LOGGING_LEVEL",
"INFO")))
if len(sys.argv) < 2:
print_usage(sys.argv[0])
sys.exit(1)
command = sys.argv[1]
if command == 'list':
(opt, args) = parse_options(command, BASIC_OPTIONS, unbounded_args=True)
if len(args) == 0:
service_name = get_service_name(opt)
cloud_provider = get_cloud_provider(opt)
service = get_service(service_name, cloud_provider)(None)
service.list_all(cloud_provider)
else:
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
service.list()
elif command == 'launch-master':
(opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS)
check_launch_options_set(service.cluster, opt)
config_dir = get_config_dir(opt)
template = InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group'))
service.launch_master(template, config_dir, opt.get('client_cidr'))
elif command == 'launch-slaves':
(opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
("NUM_SLAVES",))
number_of_slaves = int(args[1])
check_launch_options_set(service.cluster, opt)
template = InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group'))
service.launch_slaves(template)
elif command == 'launch-cluster':
(opt, args, service) = parse_options_and_config(command, LAUNCH_OPTIONS,
("NUM_SLAVES",),
unbounded_args=True)
check_launch_options_set(service.cluster, opt)
config_dir = get_config_dir(opt)
instance_templates = []
if len(args) == 2:
number_of_slaves = int(args[1])
print_deprecation(sys.argv[0], 'launch-cluster %s 1 nn,snn,jt %s dn,tt' %
(service.cluster.name, number_of_slaves))
instance_templates = [
InstanceTemplate((NAMENODE, SECONDARY_NAMENODE, JOBTRACKER), 1,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group')),
InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves,
get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'), opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group')),
]
elif len(args) > 2 and len(args) % 2 == 0:
print_usage(sys.argv[0])
sys.exit(1)
else:
for i in range(len(args) / 2):
number = int(args[2 * i + 1])
roles = args[2 * i + 2].split(",")
instance_templates.append(
InstanceTemplate(roles, number, get_image_id(service.cluster, opt),
opt.get('instance_type'), opt.get('key_name'),
opt.get('public_key'), opt.get('private_key'),
opt.get('user_data_file'),
opt.get('availability_zone'),
opt.get('user_packages'),
opt.get('auto_shutdown'), opt.get('env'),
opt.get('security_group')))
service.launch_cluster(instance_templates, config_dir,
opt.get('client_cidr'))
elif command == 'login':
(opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
service.login(opt.get('ssh_options'))
elif command == 'proxy':
(opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
service.proxy(opt.get('ssh_options'))
elif command == 'push':
(opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
("FILE",))
service.push(opt.get('ssh_options'), args[1])
elif command == 'exec':
(opt, args, service) = parse_options_and_config(command, SSH_OPTIONS,
("CMD",), True)
service.execute(opt.get('ssh_options'), args[1:])
elif command == 'terminate-cluster':
(opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
service.terminate_cluster(opt["force"])
elif command == 'delete-cluster':
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
service.delete_cluster()
elif command == 'create-formatted-snapshot':
(opt, args, service) = parse_options_and_config(command, SNAPSHOT_OPTIONS,
("SIZE",))
size = int(args[1])
check_options_set(opt, ['availability_zone', 'key_name'])
ami_ubuntu_intrepid_x86 = 'ami-ec48af85' # use a general AMI
service.create_formatted_snapshot(size,
opt.get('availability_zone'),
ami_ubuntu_intrepid_x86,
opt.get('key_name'),
xstr(opt.get('ssh_options')))
elif command == 'list-storage':
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS)
service.list_storage()
elif command == 'create-storage':
(opt, args, service) = parse_options_and_config(command, PLACEMENT_OPTIONS,
("ROLE", "NUM_INSTANCES",
"SPEC_FILE"))
role = args[1]
number_of_instances = int(args[2])
spec_file = args[3]
check_options_set(opt, ['availability_zone'])
service.create_storage(role, number_of_instances,
opt.get('availability_zone'), spec_file)
elif command == 'attach-storage':
(opt, args, service) = parse_options_and_config(command, BASIC_OPTIONS,
("ROLE",))
service.attach_storage(args[1])
elif command == 'delete-storage':
(opt, args, service) = parse_options_and_config(command, FORCE_OPTIONS)
service.delete_storage(opt["force"])
elif command == 'update-slaves-file':
(opt, args, service) = parse_options_and_config(command, SSH_OPTIONS)
check_options_set(opt, ['private_key'])
ssh_options = xstr(opt.get('ssh_options'))
config_dir = get_config_dir(opt)
service.update_slaves_file(config_dir, ssh_options, opt.get('private_key'))
else:
print "Unrecognized command '%s'" % command
print_usage(sys.argv[0])
sys.exit(1)