blob: 73faedd370a9e0f92d7332f60b133e97d1bd55a1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import socket
import unittest
from unittest import mock
from urllib3.connection import HTTPConnection, HTTPSConnection
from airflow.kubernetes.kube_client import RefreshConfiguration, _enable_tcp_keepalive, get_kube_client
class TestClient(unittest.TestCase):
    """Tests for the helpers imported from ``airflow.kubernetes.kube_client``."""

    @mock.patch('airflow.kubernetes.kube_client.config')
    def test_load_cluster_config(self, _):
        """With ``in_cluster=True`` the client configuration is not a ``RefreshConfiguration``."""
        client = get_kube_client(in_cluster=True)
        assert not isinstance(client.api_client.configuration, RefreshConfiguration)

    @mock.patch('airflow.kubernetes.kube_client.config')
    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
    def test_load_file_config(self, _, _2):
        """With ``in_cluster=False`` the client configuration is a ``RefreshConfiguration``."""
        client = get_kube_client(in_cluster=False)
        assert isinstance(client.api_client.configuration, RefreshConfiguration)

    def test_enable_tcp_keepalive(self):
        """``_enable_tcp_keepalive`` appends the keepalive options to both connection classes."""
        socket_options = [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120),
            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30),
            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6),
        ]

        # ``default_socket_options`` is a class-level attribute shared by the
        # whole process, and the assertions below expect the call under test to
        # change it. Swap in copies for the duration of the test so the extra
        # options do not leak into other tests. Copies (rather than the live
        # lists) keep the isolation intact even if the helper mutates in place.
        # Cleanups run in reverse order, so the HTTPS patch is undone before
        # the HTTP patch it may inherit from.
        for connection_cls in (HTTPConnection, HTTPSConnection):
            patcher = mock.patch.object(
                connection_cls,
                'default_socket_options',
                list(connection_cls.default_socket_options),
            )
            patcher.start()
            self.addCleanup(patcher.stop)

        expected_http_connection_options = HTTPConnection.default_socket_options + socket_options
        expected_https_connection_options = HTTPSConnection.default_socket_options + socket_options

        _enable_tcp_keepalive()

        self.assertEqual(HTTPConnection.default_socket_options, expected_http_connection_options)
        self.assertEqual(HTTPSConnection.default_socket_options, expected_https_connection_options)