blob: 96b9b2814fdd8930c6971933305c16732c53a4a2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::error::Error;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures::future;
use futures::Future;
use tracing::{debug, info};
use dubbo_config::protocol::ProtocolRetrieve;
use dubbo_config::{get_global_config, RootConfig};
use crate::common::url::Url;
use crate::protocol::{BoxExporter, Protocol};
use crate::registry::protocol::RegistryProtocol;
use crate::registry::types::{Registries, RegistriesOperation};
use crate::registry::{BoxRegistry, Registry};
// Invoker是否可以基于hyper写一个通用的
#[derive(Default)]
pub struct Dubbo {
// cached protocols, key of map means protocol name eg. dubbo, triple, grpc
protocols: HashMap<String, Vec<Url>>,
registries: Option<Registries>,
service_registry: HashMap<String, Vec<Url>>, // registry: Urls
config: Option<RootConfig>,
}
impl Dubbo {
pub fn new() -> Dubbo {
tracing_subscriber::fmt::init();
Self {
protocols: HashMap::new(),
registries: None,
service_registry: HashMap::new(),
config: None,
}
}
pub fn with_config(mut self, c: RootConfig) -> Self {
self.config = Some(c);
self
}
pub fn add_registry(mut self, registry_key: &str, registry: BoxRegistry) -> Self {
if self.registries.is_none() {
self.registries = Some(Arc::new(Mutex::new(HashMap::new())));
}
self.registries
.as_ref()
.unwrap()
.insert(registry_key.to_string(), Arc::new(Mutex::new(registry)));
self
}
pub fn init(&mut self) -> Result<(), Box<dyn Error>> {
if self.config.is_none() {
self.config = Some(get_global_config())
}
let root_config = self.config.as_ref().unwrap();
debug!("global conf: {:?}", root_config);
// env::set_var("ZOOKEEPER_SERVERS",root_config);
for (_, service_config) in root_config.provider.services.iter() {
info!("init service name: {}", service_config.interface);
let url = if root_config
.protocols
.contains_key(service_config.protocol.as_str())
{
let protocol = root_config
.protocols
.get_protocol_or_default(service_config.protocol.as_str());
let protocol_url =
format!("{}/{}", protocol.to_url(), service_config.interface.clone(),);
info!("protocol_url: {:?}", protocol_url);
Url::from_url(&protocol_url)
} else {
return Err(format!("protocol {:?} not exists", service_config.protocol).into());
};
info!("url: {:?}", url);
if url.is_none() {
continue;
}
let u = url.unwrap();
if self.protocols.get(&service_config.protocol).is_some() {
self.protocols
.get_mut(&service_config.protocol)
.unwrap()
.push(u);
} else {
self.protocols
.insert(service_config.protocol.clone(), vec![u]);
}
}
Ok(())
}
pub async fn start(&mut self) {
self.init().unwrap();
info!("starting...");
// TODO: server registry
let mem_reg = Box::new(
RegistryProtocol::new()
.with_registries(self.registries.as_ref().unwrap().clone())
.with_services(self.service_registry.clone()),
);
let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
for (name, items) in self.protocols.iter() {
for url in items.iter() {
info!("protocol: {:?}, service url: {:?}", name, url);
let exporter = mem_reg.clone().export(url.to_owned());
async_vec.push(exporter);
//TODO multiple registry
if self.registries.is_some() {
self.registries
.as_ref()
.unwrap()
.default_registry()
.register(url.clone())
.unwrap();
}
}
}
let _res = future::join_all(async_vec).await;
}
}