blob: 1b165f26bb0c592c54d8485c7495c7b17b8b57ab [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Connection sub-commands"""
import reprlib
from urllib.parse import urlunparse
from sqlalchemy.orm import exc
from tabulate import tabulate
from airflow.models import Connection
from airflow.utils import cli as cli_utils
from airflow.utils.cli import alternative_conn_specs
from airflow.utils.session import create_session
def connections_list(args):
    """Print a table of every stored connection.

    One row is printed per ``Connection`` record. Every cell is passed
    through ``reprlib.repr`` before rendering, and the table layout is
    selected by ``args.output``, which is forwarded to ``tabulate`` as
    ``tablefmt``.
    """
    headers = ['Conn Id', 'Conn Type', 'Host', 'Port',
               'Is Encrypted', 'Is Extra Encrypted', 'Extra']
    # Queried columns, in the same order as ``headers``.
    columns = (
        Connection.conn_id,
        Connection.conn_type,
        Connection.host,
        Connection.port,
        Connection.is_encrypted,
        Connection.is_extra_encrypted,
        Connection.extra,
    )
    with create_session() as session:
        records = session.query(*columns).all()
        table = [[reprlib.repr(cell) for cell in record]
                 for record in records]
        print(tabulate(table, headers, tablefmt=args.output))
@cli_utils.action_logging
def connections_add(args):
    """Create a connection from the CLI arguments and store it.

    The connection is described either by ``args.conn_uri`` or by
    ``args.conn_type`` together with the other ``args.conn_*`` fields.
    ``args.conn_extra`` is applied in both cases when it is not ``None``.
    If a connection with the same ``conn_id`` is already stored, nothing is
    added and a message saying so is printed instead.

    :raises SystemExit: if neither ``conn_uri`` nor ``conn_type`` is given,
        or if ``conn_uri`` is combined with any argument listed in
        ``alternative_conn_specs``.
    """
    if args.conn_uri:
        # A URI may not be mixed with the arguments it replaces.
        conflicting = [name for name in alternative_conn_specs
                       if getattr(args, name) is not None]
        if conflicting:
            template = ('The following args are not compatible with the ' +
                        '--add flag and --conn_uri flag: {invalid!r}')
            raise SystemExit(template.format(invalid=conflicting))
        new_conn = Connection(conn_id=args.conn_id, uri=args.conn_uri)
    else:
        if not args.conn_type:
            required = ['conn_uri or conn_type']
            raise SystemExit(
                'The following args are required to add a connection:' +
                ' {missing!r}'.format(missing=required))
        new_conn = Connection(
            conn_id=args.conn_id,
            conn_type=args.conn_type,
            host=args.conn_host,
            login=args.conn_login,
            password=args.conn_password,
            schema=args.conn_schema,
            port=args.conn_port,
        )

    if args.conn_extra is not None:
        new_conn.set_extra(args.conn_extra)

    with create_session() as session:
        existing = (session.query(Connection)
                    .filter(Connection.conn_id == new_conn.conn_id)
                    .first())
        if existing:
            template = '\n\tA connection with `conn_id`={conn_id} already exists\n'
            print(template.format(conn_id=new_conn.conn_id))
            return

        session.add(new_conn)

        # Echo the URI that was given, or assemble one from the individual
        # fields with the password replaced by a fixed mask.
        shown_uri = args.conn_uri
        if not shown_uri:
            netloc = '{login}:{password}@{host}:{port}'.format(
                login=args.conn_login or '',
                password='******' if args.conn_password else '',
                host=args.conn_host or '',
                port=args.conn_port or '',
            )
            url_parts = (args.conn_type, netloc,
                         args.conn_schema or '', '', '', '')
            shown_uri = urlunparse(url_parts)

        template = '\n\tSuccessfully added `conn_id`={conn_id} : {uri}\n'
        print(template.format(conn_id=new_conn.conn_id, uri=shown_uri))
@cli_utils.action_logging
def connections_delete(args):
    """Delete the stored connection whose id equals ``args.conn_id``.

    Exactly one matching record must exist. When none is found, or more
    than one is found, a message is printed and nothing is deleted.
    """
    with create_session() as session:
        matches = (session
                   .query(Connection)
                   .filter(Connection.conn_id == args.conn_id))
        try:
            # ``one()`` raises unless there is exactly one matching row.
            target = matches.one()
        except exc.NoResultFound:
            template = '\n\tDid not find a connection with `conn_id`={conn_id}\n'
            print(template.format(conn_id=args.conn_id))
            return
        except exc.MultipleResultsFound:
            template = ('\n\tFound more than one connection with ' +
                        '`conn_id`={conn_id}\n')
            print(template.format(conn_id=args.conn_id))
            return

        # Read the id before deleting the record, for the confirmation message.
        removed_id = target.conn_id
        session.delete(target)
        template = '\n\tSuccessfully deleted `conn_id`={conn_id}\n'
        print(template.format(conn_id=removed_id))