blob: c1d03955002eaf713014957c4086e7bff3d82786 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub mod registry_extension;
use crate::{
extension::registry_extension::proxy::RegistryProxy, registry::registry::StaticRegistry,
};
use dubbo_base::{extension_param::ExtensionType, url::UrlParam, StdError, Url};
use dubbo_logger::tracing::{error, info};
use std::{future::Future, pin::Pin, sync::Arc};
use thiserror::Error;
use tokio::sync::{oneshot, RwLock};
pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
#[derive(Default)]
struct ExtensionDirectory {
registry_extension_loader: registry_extension::RegistryExtensionLoader,
}
impl ExtensionDirectory {
fn init() -> ExtensionDirectoryCommander {
let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);
tokio::spawn(async move {
let mut extension_directory = ExtensionDirectory::default();
// register static registry extension
let _ = extension_directory.register(
StaticRegistry::name(),
StaticRegistry::convert_to_extension_factories(),
ExtensionType::Registry,
);
while let Some(extension_opt) = rx.recv().await {
match extension_opt {
ExtensionOpt::Register(
extension_name,
extension_factories,
extension_type,
tx,
) => {
let result = extension_directory.register(
extension_name,
extension_factories,
extension_type,
);
let _ = tx.send(result);
}
ExtensionOpt::Remove(extension_name, extension_type, tx) => {
let result = extension_directory.remove(extension_name, extension_type);
let _ = tx.send(result);
}
ExtensionOpt::Load(url, extension_type, tx) => {
let _ = extension_directory.load(url, extension_type, tx);
}
}
}
});
ExtensionDirectoryCommander { sender: tx }
}
fn register(
&mut self,
extension_name: String,
extension_factories: ExtensionFactories,
extension_type: ExtensionType,
) -> Result<(), StdError> {
match extension_type {
ExtensionType::Registry => match extension_factories {
ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
self.registry_extension_loader
.register(extension_name, registry_extension_factory);
Ok(())
}
},
}
}
fn remove(
&mut self,
extension_name: String,
extension_type: ExtensionType,
) -> Result<(), StdError> {
match extension_type {
ExtensionType::Registry => {
self.registry_extension_loader.remove(extension_name);
Ok(())
}
}
}
fn load(
&mut self,
url: Url,
extension_type: ExtensionType,
callback: oneshot::Sender<Result<Extensions, StdError>>,
) {
match extension_type {
ExtensionType::Registry => {
let extension = self.registry_extension_loader.load(url);
match extension {
Ok(mut extension) => {
tokio::spawn(async move {
let extension = extension.resolve().await;
match extension {
Ok(extension) => {
let _ = callback.send(Ok(Extensions::Registry(extension)));
}
Err(err) => {
error!("load extension failed: {}", err);
let _ = callback.send(Err(err));
}
}
});
}
Err(err) => {
error!("load extension failed: {}", err);
let _ = callback.send(Err(err));
}
}
}
}
}
}
type ExtensionCreator<T> = Box<
dyn Fn(Url) -> Pin<Box<dyn Future<Output = Result<T, StdError>> + Send + 'static>>
+ Send
+ Sync
+ 'static,
>;
pub(crate) struct ExtensionPromiseResolver<T> {
resolved_data: Option<T>,
creator: ExtensionCreator<T>,
url: Url,
}
impl<T> ExtensionPromiseResolver<T>
where
T: Send + Clone + 'static,
{
fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
ExtensionPromiseResolver {
resolved_data: None,
creator,
url,
}
}
fn resolved_data(&self) -> Option<T> {
self.resolved_data.clone()
}
async fn resolve(&mut self) -> Result<T, StdError> {
match (self.creator)(self.url.clone()).await {
Ok(data) => {
self.resolved_data = Some(data.clone());
Ok(data)
}
Err(err) => {
error!("create extension failed: {}", err);
Err(LoadExtensionError::new(
"load extension failed, create extension occur an error".to_string(),
)
.into())
}
}
}
}
pub(crate) struct LoadExtensionPromise<T> {
resolver: Arc<RwLock<ExtensionPromiseResolver<T>>>,
}
impl<T> LoadExtensionPromise<T>
where
T: Send + Clone + 'static,
{
pub(crate) fn new(creator: ExtensionCreator<T>, url: Url) -> Self {
let resolver = ExtensionPromiseResolver::new(creator, url);
LoadExtensionPromise {
resolver: Arc::new(RwLock::new(resolver)),
}
}
pub(crate) async fn resolve(&mut self) -> Result<T, StdError> {
// get read lock
let resolver_read_lock = self.resolver.read().await;
// if extension is not None, return it
if let Some(extension) = resolver_read_lock.resolved_data() {
return Ok(extension);
}
drop(resolver_read_lock);
let mut write_lock = self.resolver.write().await;
match write_lock.resolved_data() {
Some(extension) => Ok(extension),
None => {
let extension = write_lock.resolve().await;
extension
}
}
}
}
impl<T> Clone for LoadExtensionPromise<T> {
fn clone(&self) -> Self {
LoadExtensionPromise {
resolver: self.resolver.clone(),
}
}
}
pub struct ExtensionDirectoryCommander {
sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
}
impl ExtensionDirectoryCommander {
#[allow(private_bounds)]
pub async fn register<T>(&self) -> Result<(), StdError>
where
T: Extension,
T: ExtensionMetaInfo,
T: ConvertToExtensionFactories,
{
let extension_name = T::name();
let extension_factories = T::convert_to_extension_factories();
let extension_type = T::extension_type();
info!(
"register extension: {}, type: {}",
extension_name,
extension_type.as_str()
);
let (tx, rx) = oneshot::channel();
let send = self
.sender
.send(ExtensionOpt::Register(
extension_name.clone(),
extension_factories,
extension_type,
tx,
))
.await;
let Ok(_) = send else {
let err_msg = format!("register extension {} failed", extension_name);
return Err(RegisterExtensionError::new(err_msg).into());
};
let ret = rx.await;
match ret {
Ok(_) => Ok(()),
Err(_) => {
let err_msg = format!("register extension {} failed", extension_name);
Err(RegisterExtensionError::new(err_msg).into())
}
}
}
#[allow(private_bounds)]
pub async fn remove<T>(&self) -> Result<(), StdError>
where
T: Extension,
T: ExtensionMetaInfo,
{
let extension_name = T::name();
let extension_type = T::extension_type();
info!(
"remove extension: {}, type: {}",
extension_name,
extension_type.as_str()
);
let (tx, rx) = oneshot::channel();
let send = self
.sender
.send(ExtensionOpt::Remove(
extension_name.clone(),
extension_type,
tx,
))
.await;
let Ok(_) = send else {
let err_msg = format!("remove extension {} failed", extension_name);
return Err(RemoveExtensionError::new(err_msg).into());
};
let ret = rx.await;
match ret {
Ok(_) => Ok(()),
Err(_) => {
let err_msg = format!("remove extension {} failed", extension_name);
Err(RemoveExtensionError::new(err_msg).into())
}
}
}
pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, StdError> {
let url_str = url.to_string();
info!("load registry extension: {}", url_str);
let (tx, rx) = oneshot::channel();
let send = self
.sender
.send(ExtensionOpt::Load(url, ExtensionType::Registry, tx))
.await;
let Ok(_) = send else {
let err_msg = format!("load registry extension failed: {}", url_str);
return Err(LoadExtensionError::new(err_msg).into());
};
let extensions = rx.await;
let Ok(extension) = extensions else {
let err_msg = format!("load registry extension failed: {}", url_str);
return Err(LoadExtensionError::new(err_msg).into());
};
let Ok(extensions) = extension else {
let err_msg = format!("load registry extension failed: {}", url_str);
return Err(LoadExtensionError::new(err_msg).into());
};
match extensions {
Extensions::Registry(proxy) => Ok(proxy),
}
}
}
enum ExtensionOpt {
Register(
String,
ExtensionFactories,
ExtensionType,
oneshot::Sender<Result<(), StdError>>,
),
Remove(String, ExtensionType, oneshot::Sender<Result<(), StdError>>),
Load(
Url,
ExtensionType,
oneshot::Sender<Result<Extensions, StdError>>,
),
}
pub(crate) trait Sealed {}
#[allow(private_bounds)]
#[async_trait::async_trait]
pub trait Extension: Sealed {
type Target;
fn name() -> String;
async fn create(url: Url) -> Result<Self::Target, StdError>;
}
#[allow(private_bounds)]
pub(crate) trait ExtensionMetaInfo {
fn extension_type() -> ExtensionType;
}
pub(crate) enum Extensions {
Registry(RegistryProxy),
}
pub(crate) enum ExtensionFactories {
RegistryExtensionFactory(registry_extension::RegistryExtensionFactory),
}
#[allow(private_bounds)]
pub(crate) trait ConvertToExtensionFactories {
fn convert_to_extension_factories() -> ExtensionFactories;
}
#[derive(Error, Debug)]
#[error("{0}")]
pub(crate) struct RegisterExtensionError(String);
impl RegisterExtensionError {
pub fn new(msg: String) -> Self {
RegisterExtensionError(msg)
}
}
#[derive(Error, Debug)]
#[error("{0}")]
pub struct RemoveExtensionError(String);
impl RemoveExtensionError {
pub fn new(msg: String) -> Self {
RemoveExtensionError(msg)
}
}
#[derive(Error, Debug)]
#[error("{0}")]
pub struct LoadExtensionError(String);
impl LoadExtensionError {
pub fn new(msg: String) -> Self {
LoadExtensionError(msg)
}
}