#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import inspect
import time
import unittest

import mock
import pytest
from mox import IgnoreArg, IsA, Mox
from requests.auth import AuthBase
from thrift.transport import TTransport
from twitter.common.quantity import Amount, Time
from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
from twitter.common.zookeeper.serverset.endpoint import ServiceInstance

import apache.aurora.client.api.scheduler_client as scheduler_client
from apache.aurora.common.auth.auth_module import AuthModule
from apache.aurora.common.cluster import Cluster
from apache.aurora.common.transport import TRequestsTransport

import gen.apache.aurora.api.AuroraAdmin as AuroraAdmin
import gen.apache.aurora.api.AuroraSchedulerManager as AuroraSchedulerManager
from gen.apache.aurora.api.constants import BYPASS_LEADER_REDIRECT_HEADER_NAME
from gen.apache.aurora.api.ttypes import (
    ExplicitReconciliationSettings,
    Hosts,
    JobConfiguration,
    JobKey,
    JobUpdateQuery,
    JobUpdateRequest,
    ResourceAggregate,
    Response,
    ResponseCode,
    ResponseDetail,
    ScheduleStatus,
    SlaPolicy,
    TaskQuery
)

ROLE = 'foorole'
JOB_NAME = 'barjobname'
JOB_ENV = 'devel'
JOB_KEY = JobKey(role=ROLE, environment=JOB_ENV, name=JOB_NAME)
DEFAULT_RESPONSE = Response()


def test_coverage():
  """Make sure a new thrift RPC doesn't get added without minimal test coverage."""
  for name, klass in inspect.getmembers(AuroraAdmin) + inspect.getmembers(AuroraSchedulerManager):
    if name.endswith('_args'):
      rpc_name = name[:-len('_args')]
      assert hasattr(TestSchedulerProxyAdminInjection, 'test_%s' % rpc_name), (
          'No test defined for RPC %s' % rpc_name)


class TestSchedulerProxyInjection(unittest.TestCase):
  def setUp(self):
    self.mox = Mox()

    self.mox.StubOutClassWithMocks(AuroraAdmin, 'Client')
    self.mox.StubOutClassWithMocks(scheduler_client, 'SchedulerClient')

    self.mock_scheduler_client = self.mox.CreateMock(scheduler_client.SchedulerClient)
    self.mock_thrift_client = self.mox.CreateMock(AuroraAdmin.Client)

    scheduler_client.SchedulerClient.get(IgnoreArg(), verbose=IgnoreArg()).AndReturn(
        self.mock_scheduler_client)
    self.mock_scheduler_client.get_thrift_client().AndReturn(self.mock_thrift_client)

  def tearDown(self):
    self.mox.UnsetStubs()
    self.mox.VerifyAll()

  def make_scheduler_proxy(self):
    return scheduler_client.SchedulerProxy(Cluster(name='local'))

  def test_startCronJob(self):
    self.mock_thrift_client.startCronJob(IsA(JobKey)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().startCronJob(JOB_KEY)

  def test_createJob(self):
    self.mock_thrift_client.createJob(
        IsA(JobConfiguration)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().createJob(JobConfiguration())

  def test_replaceCronTemplate(self):
    self.mock_thrift_client.replaceCronTemplate(
        IsA(JobConfiguration)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().replaceCronTemplate(JobConfiguration())

  def test_scheduleCronJob(self):
    self.mock_thrift_client.scheduleCronJob(
        IsA(JobConfiguration)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().scheduleCronJob(JobConfiguration())

  def test_descheduleCronJob(self):
    self.mock_thrift_client.descheduleCronJob(
        IsA(JobKey)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().descheduleCronJob(JOB_KEY)

  def test_populateJobConfig(self):
    self.mock_thrift_client.populateJobConfig(IsA(JobConfiguration)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().populateJobConfig(JobConfiguration())

  def test_restartShards(self):
    self.mock_thrift_client.restartShards(IsA(JobKey), IgnoreArg()).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().restartShards(JOB_KEY, {0})

  def test_getTasksStatus(self):
    self.mock_thrift_client.getTasksStatus(IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().getTasksStatus(TaskQuery())

  def test_getJobs(self):
    self.mock_thrift_client.getJobs(IgnoreArg()).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().getJobs(ROLE)

  def test_killTasks(self):
    self.mock_thrift_client.killTasks(IsA(JobKey), IgnoreArg(), None).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().killTasks(JobKey(), {0}, None)

  def test_getQuota(self):
    self.mock_thrift_client.getQuota(IgnoreArg()).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().getQuota(ROLE)

  def test_addInstances(self):
    self.mock_thrift_client.addInstances(IsA(JobKey), 1).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().addInstances(JobKey(), 1)

  def test_getJobUpdateSummaries(self):
    self.mock_thrift_client.getJobUpdateSummaries(IsA(JobUpdateQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().getJobUpdateSummaries(JobUpdateQuery())

  def test_getJobUpdateDetails(self):
    self.mock_thrift_client.getJobUpdateDetails('update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().getJobUpdateDetails('update_id')

  def test_startJobUpdate(self):
    self.mock_thrift_client.startJobUpdate(
        IsA(JobUpdateRequest)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().startJobUpdate(JobUpdateRequest())

  def test_pauseJobUpdate(self):
    self.mock_thrift_client.pauseJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().pauseJobUpdate('update_id')

  def test_resumeJobUpdate(self):
    self.mock_thrift_client.resumeJobUpdate(
        'update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().resumeJobUpdate('update_id')

  def test_abortJobUpdate(self):
    self.mock_thrift_client.abortJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().abortJobUpdate('update_id')

  def test_rollbackJobUpdate(self):
    self.mock_thrift_client.rollbackJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().rollbackJobUpdate('update_id')

  def test_pulseJobUpdate(self):
    self.mock_thrift_client.pulseJobUpdate('update_id').AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().pulseJobUpdate('update_id')

  def test_raise_auth_error(self):
    self.mock_thrift_client.killTasks(None, None, None).AndRaise(TRequestsTransport.AuthError())
    self.mock_scheduler_client.get_failed_auth_message().AndReturn('failed auth')
    self.mox.ReplayAll()
    with pytest.raises(scheduler_client.SchedulerProxy.AuthError):
      self.make_scheduler_proxy().killTasks(None, None, None)


class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
  def test_startMaintenance(self):
    self.mock_thrift_client.startMaintenance(
      IsA(Hosts)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().startMaintenance(Hosts())

  def test_drainHosts(self):
    self.mock_thrift_client.drainHosts(IsA(Hosts)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().drainHosts(Hosts())

  def test_slaDrainHosts(self):
    self.mock_thrift_client.slaDrainHosts(
      IsA(Hosts),
      IsA(SlaPolicy),
      10).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().slaDrainHosts(Hosts(), SlaPolicy(), 10)

  def test_maintenanceStatus(self):
    self.mock_thrift_client.maintenanceStatus(
      IsA(Hosts)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().maintenanceStatus(Hosts())

  def test_endMaintenance(self):
    self.mock_thrift_client.endMaintenance(IsA(Hosts)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().endMaintenance(Hosts())

  def test_setQuota(self):
    self.mock_thrift_client.setQuota(
        IgnoreArg(),
        IsA(ResourceAggregate)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().setQuota(ROLE, ResourceAggregate())

  def test_forceTaskState(self):
    self.mock_thrift_client.forceTaskState(
        'taskid',
        IgnoreArg()).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().forceTaskState('taskid', ScheduleStatus.LOST)

  def test_performBackup(self):
    self.mock_thrift_client.performBackup().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().performBackup()

  def test_listBackups(self):
    self.mock_thrift_client.listBackups().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().listBackups()

  def test_stageRecovery(self):
    self.mock_thrift_client.stageRecovery(
        IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().stageRecovery(TaskQuery())

  def test_queryRecovery(self):
    self.mock_thrift_client.queryRecovery(
      IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().queryRecovery(TaskQuery())

  def test_deleteRecoveryTasks(self):
    self.mock_thrift_client.deleteRecoveryTasks(
        IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().deleteRecoveryTasks(TaskQuery())

  def test_commitRecovery(self):
    self.mock_thrift_client.commitRecovery().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().commitRecovery()

  def test_unloadRecovery(self):
    self.mock_thrift_client.unloadRecovery().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().unloadRecovery()

  def test_snapshot(self):
    self.mock_thrift_client.snapshot().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().snapshot()

  def test_pruneTasks(self):
    t = TaskQuery()
    self.mock_thrift_client.pruneTasks(IsA(TaskQuery)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().pruneTasks(t)

  def test_triggerExplicitTaskReconciliation(self):
    self.mock_thrift_client.triggerExplicitTaskReconciliation(
      IsA(ExplicitReconciliationSettings)).AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().triggerExplicitTaskReconciliation(
      ExplicitReconciliationSettings(batchSize=None))

  def test_triggerImplicitTaskReconciliation(self):
    self.mock_thrift_client.triggerImplicitTaskReconciliation().AndReturn(DEFAULT_RESPONSE)
    self.mox.ReplayAll()
    self.make_scheduler_proxy().triggerImplicitTaskReconciliation()


def mock_auth():
  auth_mock = mock.create_autospec(spec=AuthModule, instance=True)
  auth_mock.auth.return_value = mock.create_autospec(AuthBase)
  return auth_mock


@pytest.mark.parametrize('scheme', ('http', 'https'))
def test_url_when_not_connected_and_cluster_has_no_proxy_url(scheme):
  host = 'some-host.example.com'
  port = 31181

  mock_zk = mock.create_autospec(spec=TwitterKazooClient, instance=True)

  service_json = '''{
    "additionalEndpoints": {
        "%(scheme)s": {
            "host": "%(host)s",
            "port": %(port)d
        }
    },
    "serviceEndpoint": {
        "host": "%(host)s",
        "port": %(port)d
    },
    "shard": 0,
    "status": "ALIVE"
  }''' % dict(host=host, port=port, scheme=scheme)

  service_endpoints = [ServiceInstance.unpack(service_json)]

  def make_mock_client(proxy_url):
    client = scheduler_client.ZookeeperSchedulerClient(
        Cluster(proxy_url=proxy_url),
        auth=None,
        user_agent='Some-User-Agent',
        _deadline=lambda x, **kws: x())
    client.get_scheduler_serverset = mock.MagicMock(return_value=(mock_zk, service_endpoints))
    client.SERVERSET_TIMEOUT = Amount(0, Time.SECONDS)
    client._connect_scheduler = mock.MagicMock()
    return client

  client = make_mock_client(proxy_url=None)
  assert client.url == '%s://%s:%d' % (scheme, host, port)
  assert client.url == client.raw_url
  client._connect_scheduler.assert_has_calls([])

  client = make_mock_client(proxy_url='https://scheduler.proxy')
  assert client.url == 'https://scheduler.proxy'
  assert client.raw_url == '%s://%s:%d' % (scheme, host, port)
  client._connect_scheduler.assert_has_calls([])

  client = make_mock_client(proxy_url=None)
  client.get_thrift_client()
  assert client.url == '%s://%s:%d' % (scheme, host, port)
  client._connect_scheduler.assert_has_calls([mock.call('%s://%s:%d/api' % (scheme, host, port))])
  client._connect_scheduler.reset_mock()
  client.get_thrift_client()
  client._connect_scheduler.assert_has_calls([])


class TestSchedulerClient(unittest.TestCase):

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_scheduler(self, mock_client):
    mock_client.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    client = scheduler_client.SchedulerClient(mock_auth(), 'Some-User-Agent', verbose=True)
    client._connect_scheduler('https://scheduler.example.com:1337', mock_time)

    assert mock_client.return_value.open.has_calls(mock.call(), mock.call())
    mock_time.sleep.assert_called_once_with(
        scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_scheduler_with_user_agent(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    auth = mock_auth()
    user_agent = 'Some-User-Agent'

    client = scheduler_client.SchedulerClient(auth, user_agent, verbose=True)

    uri = 'https://scheduler.example.com:1337'
    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_scheduler_without_bypass_leader_redirect(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    auth = mock_auth()
    user_agent = 'Some-User-Agent'

    client = scheduler_client.SchedulerClient(
        auth,
        user_agent,
        verbose=True,
        bypass_leader_redirect=False)

    uri = 'https://scheduler.example.com:1337'
    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

    _, _, kwargs = mock_transport.mock_calls[0]
    session = kwargs['session_factory']()
    assert session.headers.get(BYPASS_LEADER_REDIRECT_HEADER_NAME) is None

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_scheduler_with_bypass_leader_redirect(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    auth = mock_auth()
    user_agent = 'Some-User-Agent'

    client = scheduler_client.SchedulerClient(
        auth,
        user_agent,
        verbose=True,
        bypass_leader_redirect=True)

    uri = 'https://scheduler.example.com:1337'
    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

    _, _, kwargs = mock_transport.mock_calls[0]
    session = kwargs['session_factory']()
    assert session.headers[BYPASS_LEADER_REDIRECT_HEADER_NAME] == 'true'

  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
              spec=scheduler_client.SchedulerClient)
  @mock.patch('threading._Event.wait')
  def test_transient_error(self, _, client):
    mock_scheduler_client = mock.create_autospec(
        spec=scheduler_client.SchedulerClient,
        spec_set=False,
        instance=True)
    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
    mock_thrift_client.killTasks.side_effect = [
        Response(responseCode=ResponseCode.ERROR_TRANSIENT,
                 details=[ResponseDetail(message="message1"), ResponseDetail(message="message2")]),
        Response(responseCode=ResponseCode.ERROR_TRANSIENT),
        Response(responseCode=ResponseCode.OK)]

    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
    client.get.return_value = mock_scheduler_client

    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
    proxy.killTasks(JobKey(), None, None)

    assert mock_thrift_client.killTasks.call_count == 3

  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
              spec=scheduler_client.SchedulerClient)
  @mock.patch('threading._Event.wait')
  def test_performBackup_retriable_errors(self, mock_wait, mock_client):
    mock_scheduler_client = mock.create_autospec(
        spec=scheduler_client.SchedulerClient,
        spec_set=False,
        instance=True)
    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
    mock_thrift_client.performBackup.side_effect = [
      Response(responseCode=ResponseCode.ERROR_TRANSIENT),
      scheduler_client.SchedulerProxy.TimeoutError,
      Response(responseCode=ResponseCode.OK)]

    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
    mock_client.get.return_value = mock_scheduler_client

    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
    proxy.performBackup()

    assert mock_thrift_client.performBackup.call_count == 3
    assert mock_wait.call_count == 2

  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
              spec=scheduler_client.SchedulerClient)
  @mock.patch('threading._Event.wait')
  def test_performBackup_transport_exception(self, mock_wait, mock_client):
    mock_scheduler_client = mock.create_autospec(
      spec=scheduler_client.SchedulerClient,
      spec_set=False,
      instance=True)

    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
    mock_thrift_client.performBackup.side_effect = TTransport.TTransportException('error')
    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
    mock_client.get.return_value = mock_scheduler_client

    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
    with pytest.raises(scheduler_client.SchedulerProxy.NotRetriableError):
      proxy.performBackup()

    assert mock_thrift_client.performBackup.call_count == 1
    assert not mock_wait.called

  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
              spec=scheduler_client.SchedulerClient)
  @mock.patch('threading._Event.wait')
  def test_getTierConfigs_transport_exception(self, mock_wait, mock_client):
    mock_scheduler_client = mock.create_autospec(
      spec=scheduler_client.SchedulerClient,
      spec_set=False,
      instance=True)

    mock_thrift_client = mock.create_autospec(spec=AuroraAdmin.Client, instance=True)
    mock_thrift_client.getTierConfigs.side_effect = [
      TTransport.TTransportException('error'),
      Response(responseCode=ResponseCode.OK)
    ]
    mock_scheduler_client.get_thrift_client.return_value = mock_thrift_client
    mock_client.get.return_value = mock_scheduler_client

    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))
    proxy.getTierConfigs(retry=True)

    assert mock_thrift_client.getTierConfigs.call_count == 2
    assert mock_wait.call_count == 1

  @mock.patch('apache.aurora.client.api.scheduler_client.SchedulerClient',
              spec=scheduler_client.SchedulerClient)
  def test_unknown_connection_error(self, client):
    mock_scheduler_client = mock.create_autospec(spec=scheduler_client.SchedulerClient,
                                                 instance=True)
    client.get.return_value = mock_scheduler_client
    proxy = scheduler_client.SchedulerProxy(Cluster(name='local'))

    # unknown, transient connection error
    mock_scheduler_client.get_thrift_client.side_effect = RuntimeError
    with pytest.raises(Exception):
      proxy.client()

    # successful connection on re-attempt
    mock_scheduler_client.get_thrift_client.side_effect = None
    assert proxy.client() is not None

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_direct_scheduler_with_user_agent(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    auth = mock_auth()
    user_agent = 'Some-User-Agent'
    uri = 'https://scheduler.example.com:1337'

    client = scheduler_client.DirectSchedulerClient(
        uri,
        auth=auth,
        verbose=True,
        user_agent=user_agent)

    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_zookeeper_client_with_auth(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    user_agent = 'Some-User-Agent'
    uri = 'https://scheduler.example.com:1337'
    auth = mock_auth()
    cluster = Cluster(zk='zk', zk_port='2181')

    def auth_factory(_):
      return auth

    client = scheduler_client.SchedulerClient.get(
        cluster,
        auth_factory=auth_factory,
        user_agent=user_agent)

    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

  @mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport',
              spec=TRequestsTransport)
  def test_connect_direct_client_with_auth(self, mock_transport):
    mock_transport.return_value.open.side_effect = [TTransport.TTransportException, True]
    mock_time = mock.create_autospec(spec=time, instance=True)

    user_agent = 'Some-User-Agent'
    uri = 'https://scheduler.example.com:1337'
    auth = mock_auth()
    cluster = Cluster(scheduler_uri='uri')

    def auth_factory(_):
      return auth

    client = scheduler_client.SchedulerClient.get(
        cluster,
        auth_factory=auth_factory,
        user_agent=user_agent)

    client._connect_scheduler(uri, mock_time)

    mock_transport.assert_called_once_with(
        uri,
        auth=auth.auth(),
        user_agent=user_agent,
        session_factory=mock.ANY)

  def test_no_zk_or_scheduler_uri(self):
    cluster = None
    with self.assertRaises(TypeError):
      scheduler_client.SchedulerClient.get(cluster)
    cluster = Cluster()
    with self.assertRaises(ValueError):
      scheduler_client.SchedulerClient.get(cluster)

  def test__internal_connect(self):
    client = scheduler_client.SchedulerClient(mock_auth(), 'Some-User-Agent', verbose=True)
    self.assertIsNone(client._connect())
