blob: f4e6f7d971fecaad1332daacba49700320d7059a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import debug from 'debug'
import compose from 'koa-compose'
import Queue from './queue'
import config from './config'
import Context from './context'
import { go, util } from 'apache-dubbo-common'
import Scheduler from './scheduler'
import qs from 'querystring'
import ip from 'ip'
import * as s from './dubbo-setting'
import {
IDubboProps,
IDubboProvider,
Middleware,
TDubboInterface,
TDubboService,
TDubboUrl
} from './types'
const log = debug('dubbo:bootstrap')
const packageVersion = require('../package.json').version
log('dubbo-js version :=> %s', packageVersion)
/**
* Dubbo
*
* 1. Connect to the registration center zookeeper
* 2. Initiate method call of remote dubbo service
* 3. Serialization/deserialization of dubbo protocol
* 4. Manage tcp connection and heartbeat
* 5. The corresponding method of automatically proxying interface through proxy mechanism
* 6. Provide quick test interface in direct connection mode
* 7. Middleware
* 8. Full link tracking of dubbo calls can be realized through zone-context
* 9. Centralized message management
*/
export default class Dubbo<TService = Object> {
private readonly queue: Queue
private readonly dubboSetting: s.DubboSetting
private readonly props: IDubboProps
private readonly middlewares: Array<Middleware<Context>>
private readonly consumers: Array<{
dubboServiceInterface: TDubboInterface
dubboServiceUrl: TDubboUrl
}>
private scheduler: Scheduler
public readonly service: TDubboService<TService>
constructor(props: IDubboProps) {
this.props = props
// check dubbo register
if (!util.isObj(this.props.registry)) {
throw new Error('please specify registry instance')
}
this.consumers = []
this.middlewares = []
this.queue = Queue.init()
this.dubboSetting = props.dubboSetting || s.Setting()
// init service
this.service = <TDubboService<TService>>{}
this.consumeService(this.props.services)
log('consumerService: %O', this.consumers)
//Initialize config
//Global timeout (maximum fusing time) similar to <dubbo:consumer timeout="sometime"/>
//For the consumer client, if the user sets the interface level timeout time, the interface level is used
//If the user does not set the user level, the default is the maximum timeout
const { dubboInvokeTimeout } = props
config.dubboInvokeTimeout =
dubboInvokeTimeout ||
this.dubboSetting.maxTimeout ||
config.dubboInvokeTimeout
log('config:|> %O', config)
this.init().catch((err) => console.log(err))
}
// ========================private method=======================
private async init() {
await this.props.registry.ready()
//create scheduler
this.scheduler = Scheduler.from(this.props.registry, this.queue)
await this.props.registry.registerConsumers(this.consumers)
}
/**
* registry consume service
* service style:
* {[key: string]: (dubbo): T => dubbo.proxyService<T>({...})}
* @param services
*/
private consumeService(services: Object) {
for (let [shortName, serviceProxy] of Object.entries(services)) {
const service = serviceProxy(this) as IDubboProvider
const meta = this.dubboSetting.getDubboSetting({
dubboServiceShortName: shortName,
dubboServiceInterface: service.dubboInterface
})
service.group = meta.group
service.version = meta.version
this.service[shortName] = this.composeService(service)
}
}
private composeService = (provider: IDubboProvider) => {
const { application, isSupportedDubbox } = this.props
const { dubboInterface, methods, timeout, group, version } = provider
const proxyObj = Object.create(null)
this.consumers.push({
dubboServiceInterface: dubboInterface,
dubboServiceUrl: `consumer://${ip.address()}/${dubboInterface}?${qs.stringify(
{
application: this.props.application.name,
interface: dubboInterface,
category: 'consumers',
method: '',
revision: version,
version: version,
group: group,
timeout: timeout,
side: 'consumer',
check: false,
pid: process.pid
}
)}`
})
//proxy methods
Object.keys(methods).forEach((name) => {
proxyObj[name] = async (...args: any[]) => {
log('%s create context', name)
//创建dubbo调用的上下文
const ctx = Context.init()
ctx.application = application
ctx.isSupportedDubbox = isSupportedDubbox
// set dubbo version
ctx.dubboVersion = this.props.dubboVersion
const method = methods[name]
ctx.methodName = name
ctx.methodArgs = method.call(provider, ...args) || []
ctx.dubboInterface = dubboInterface
ctx.version = version
ctx.timeout = timeout
ctx.group = group
const self = this
const middlewares = [
...this.middlewares, //handle request middleware
async function handleRequest(ctx) {
log('start middleware handle dubbo request')
ctx.body = await go(self.queue.push(ctx))
log('end handle dubbo request')
}
]
log('middleware->', middlewares)
const fn = compose(middlewares)
try {
await fn(ctx)
} catch (err) {
log(err)
}
return ctx.body
}
})
return proxyObj
}
//========================public method===================
/**
* static factory method
* @param props
*/
static from(props: IDubboProps) {
return new Dubbo(props)
}
/**
* 代理dubbo的服务
*/
proxyService<T = any>(provider: IDubboProvider): T {
return provider as any
}
/**
* extends middleware, api: the same as koa
* @param fn
*/
use(fn: Middleware<Context>) {
if (typeof fn != 'function') {
throw new TypeError('middleware must be a function')
}
log('use middleware %s', (fn as any)._name || fn.name || '-')
this.middlewares.push(fn)
return this
}
/**
* The connection of dubbo is asynchronous. Whether the connection is successful or not is usually known at runtime.
* At this time, it may give us some trouble, we must send a request to know the status of dubbo
* Based on this scenario, we provide a method to tell the outside whether dubbo is initialized successfully,
* In this way, we will know the connection status of dubbo during node startup, if we can't connect, we can
* Timely fixed
*
* For example, in conjunction with egg, egg provides a beforeStart method
* Wait for the successful initialization of dubbo through the ready method
*
* //app.js
* export default (app: EggApplication) => {
* const dubbo = Dubbo.from({....})
* app.beforeStart(async () => {
* await dubbo.ready();
* console.log('dubbo was ready...');
* })
*}
*
* Other frameworks are similar
*/
ready() {
return this.props.registry.ready()
}
/**
* close dubbo consumer, usually used in test suite
*/
close() {
this.props.registry.close()
this.scheduler.close()
}
}