blob: c4fbe0919e4c6dd107bc54bfc2d85b1efa18b477 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for the SeaTunnel client."""
import pytest
import httpx
from unittest.mock import patch, MagicMock
from src.seatunnel_mcp.client import SeaTunnelClient
@pytest.fixture
def client():
"""Create a client for testing."""
return SeaTunnelClient(base_url="http://localhost:8090", api_key="test_key")
def test_init(client):
"""Test client initialization."""
assert client.base_url == "http://localhost:8090"
assert client.api_key == "test_key"
assert client.headers == {
"Content-Type": "application/json",
"Authorization": "Bearer test_key",
}
def test_get_connection_settings(client):
"""Test get_connection_settings."""
settings = client.get_connection_settings()
assert settings == {
"url": "http://localhost:8090",
"has_api_key": True,
}
def test_update_connection_settings(client):
"""Test update_connection_settings."""
settings = client.update_connection_settings(
url="http://new-host:8090",
api_key="new_key",
)
assert client.base_url == "http://new-host:8090"
assert client.api_key == "new_key"
assert client.headers["Authorization"] == "Bearer new_key"
assert settings == {
"url": "http://new-host:8090",
"has_api_key": True,
}
@patch("httpx.Client")
def test_submit_job(mock_client, client):
"""Test submit_job."""
mock_response = MagicMock()
mock_response.json.return_value = {"jobId": "123"}
mock_response.raise_for_status.return_value = None
mock_client_instance = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client.return_value.__enter__.return_value = mock_client_instance
job_content = "env { job.mode = \"batch\" }"
result = client.submit_job(
job_content=job_content,
jobName="test_job",
format="hocon",
)
mock_client_instance.request.assert_called_once_with(
"POST",
"http://localhost:8090/submit-job",
headers={
"Content-Type": "text/plain",
"Authorization": "Bearer test_key",
},
params={"jobName": "test_job", "format": "hocon"},
content=job_content,
)
assert result == {"jobId": "123"}
@patch("httpx.Client")
def test_submit_jobs(mock_client, client):
"""Test submit_jobs."""
mock_response = MagicMock()
mock_response.json.return_value = {"jobIds": ["123", "456"]}
mock_response.raise_for_status.return_value = None
mock_client_instance = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client.return_value.__enter__.return_value = mock_client_instance
# 直接作为请求体的任意数据
request_body = [
{
"params": {"jobId": "123", "jobName": "job-1"},
"env": {"job.mode": "batch"},
"source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
"transform": [],
"sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
},
{
"params": {"jobId": "456", "jobName": "job-2"},
"env": {"job.mode": "batch"},
"source": [{"plugin_name": "FakeSource", "plugin_output": "fake"}],
"transform": [],
"sink": [{"plugin_name": "Console", "plugin_input": ["fake"]}]
}
]
result = client.submit_jobs(request_body=request_body)
mock_client_instance.request.assert_called_once_with(
"POST",
"http://localhost:8090/submit-jobs",
headers={
"Content-Type": "application/json",
"Authorization": "Bearer test_key",
},
json=request_body,
)
assert result == {"jobIds": ["123", "456"]}
@patch("httpx.Client")
def test_submit_job_upload(mock_client, client):
"""Test submit_job_upload."""
mock_response = MagicMock()
mock_response.json.return_value = {"jobId": "123"}
mock_response.raise_for_status.return_value = None
mock_client_instance = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client.return_value.__enter__.return_value = mock_client_instance
# Mock file-like object
config_file = MagicMock()
config_file.name = "test_job.conf"
result = client.submit_job_upload(
config_file=config_file,
jobName="test_job",
)
mock_client_instance.request.assert_called_once_with(
"POST",
"http://localhost:8090/submit-job/upload",
headers={
"Authorization": "Bearer test_key",
},
params={"jobName": "test_job"},
files={'config_file': config_file},
)
assert result == {"jobId": "123"}
@patch("httpx.Client")
def test_submit_job_upload_json(mock_client, client):
"""Test submit_job_upload with JSON file."""
mock_response = MagicMock()
mock_response.json.return_value = {"jobId": "123"}
mock_response.raise_for_status.return_value = None
mock_client_instance = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client.return_value.__enter__.return_value = mock_client_instance
# Mock file-like object with json extension
config_file = MagicMock()
config_file.name = "test_job.json"
result = client.submit_job_upload(
config_file=config_file,
jobName="test_job",
format="json", # Explicitly specify format
)
mock_client_instance.request.assert_called_once_with(
"POST",
"http://localhost:8090/submit-job/upload",
headers={
"Authorization": "Bearer test_key",
},
params={"jobName": "test_job", "format": "json"},
files={'config_file': config_file},
)
assert result == {"jobId": "123"}
@patch("httpx.Client")
@patch("builtins.open")
def test_submit_job_upload_path(mock_open, mock_client, client):
"""Test submit_job_upload with a file path."""
mock_response = MagicMock()
mock_response.json.return_value = {"jobId": "123"}
mock_response.raise_for_status.return_value = None
mock_client_instance = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client.return_value.__enter__.return_value = mock_client_instance
# Mock the file object returned by open()
mock_file = MagicMock()
mock_open.return_value = mock_file
file_path = "/path/to/test_job.conf"
result = client.submit_job_upload(
config_file=file_path,
jobName="test_job",
jobId="987654321",
)
# Check that open was called with the file path
mock_open.assert_called_once_with(file_path, 'rb')
# Check that the request was made with the proper parameters
mock_client_instance.request.assert_called_once_with(
"POST",
"http://localhost:8090/submit-job/upload",
headers={
"Authorization": "Bearer test_key",
},
params={"jobName": "test_job", "jobId": "987654321"},
files={'config_file': mock_file},
)
# Verify the mock file was closed after the request
mock_file.close.assert_called_once()
assert result == {"jobId": "123"}