blob: 48b3533435ea2dd5127b0b5faac9f68cb023e818 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::errors::BallistaError;
use crate::utils::wait_for_future;
use ballista_core::config::TaskSchedulingPolicy;
use ballista_scheduler::cluster::BallistaCluster;
use ballista_scheduler::config::{ClusterStorageConfig, SchedulerConfig, TaskDistribution};
use ballista_scheduler::scheduler_process::start_server;
use log::info;
use pyo3::prelude::*;
/// Python wrapper around a scheduler, allowing users to run an scheduler within a Python process.
#[pyclass(name = "Scheduler", module = "ballista", subclass)]
pub struct PyScheduler {}
#[pymethods]
impl PyScheduler {
#[new]
#[pyo3(signature = (
bind_host,
bind_port,
external_host,
))]
fn new(bind_host: &str, bind_port: u16, external_host: &str, py: Python) -> PyResult<Self> {
env_logger::init();
info!("Starting scheduler on {bind_host}:{bind_port}");
let addr = format!("{}:{}", bind_host, bind_port);
let addr = addr.parse()?;
let config = SchedulerConfig {
namespace: "default".to_string(),
external_host: external_host.to_string(),
bind_port: bind_port,
scheduling_policy: TaskSchedulingPolicy::PullStaged,
event_loop_buffer_size: 1000,
finished_job_data_clean_up_interval_seconds: 60,
finished_job_state_clean_up_interval_seconds: 60,
advertise_flight_sql_endpoint: None,
cluster_storage: ClusterStorageConfig::Memory,
job_resubmit_interval_ms: None,
executor_termination_grace_period: 30000,
scheduler_event_expected_processing_duration: 1000,
task_distribution: TaskDistribution::RoundRobin,
};
let cluster = BallistaCluster::new_from_config(&config);
let cluster =
wait_for_future(py, cluster).map_err(|e| BallistaError::Common(format!("{}", e)))?;
let fut = start_server(cluster, addr, config);
let _ = wait_for_future(py, fut).map_err(|e| BallistaError::Common(format!("{}", e)))?;
Ok(Self {})
}
}