With the development of business and the update and iteration of community products, we found that Linkis1 There are too many .X services, and services can be merged appropriately to reduce the number of services and facilitate deployment and debugging. At present, Linkis services are mainly divided into three categories, including computing governance services (CG: entrance/ecp/ecm/linkismanager), public enhancement services (PS: publicservice/datasource/cs) and microservice governance services (MG: Gateway/Eureka) . There are too many sub-services extended by these three types of services, and services can be merged, so that all PS services can be merged, CG services can be merged, and ecm services can be separated out.
The main changes of this service merge are as follows:
//Override before changing def route(gatewayContext: GatewayContext): ServiceInstance = { if (gatewayContext.getGatewayRoute.getRequestURI.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR) || gatewayContext.getGatewayRoute.getRequestURI.contains(HaContextGatewayRouter.OLD_CONTEXT_SERVICE_PREFIX)){ val params: util.HashMap[String, String] = gatewayContext.getGatewayRoute.getParams if (!gatewayContext.getRequest.getQueryParams.isEmpty) { for ((k, vArr) <- gatewayContext.getRequest.getQueryParams) { if (vArr.nonEmpty) { params.putIfAbsent(k, vArr.head) } } } if (gatewayContext.getRequest.getHeaders.containsKey(ContextHTTPConstant.CONTEXT_ID_STR)) { params.putIfAbsent(ContextHTTPConstant.CONTEXT_ID_STR, gatewayContext.getRequest.getHeaders.get(ContextHTTPConstant.CONTEXT_ID_STR)(0)) } if (null == params || params.isEmpty) { dealContextCreate(gatewayContext) } else { var contextId : String = null for ((key, value) <- params) { if (key.equalsIgnoreCase(ContextHTTPConstant.CONTEXT_ID_STR)) { contextId = value } } if (StringUtils.isNotBlank(contextId)) { dealContextAccess(contextId.toString, gatewayContext) } else { dealContextCreate(gatewayContext) } } }else{ null } } //after modification override def route(gatewayContext: GatewayContext): ServiceInstance = { if ( gatewayContext.getGatewayRoute.getRequestURI.contains( RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX ) ) { val params: util.HashMap[String, String] = gatewayContext.getGatewayRoute.getParams if (!gatewayContext.getRequest.getQueryParams.isEmpty) { for ((k, vArr) <- gatewayContext.getRequest.getQueryParams.asScala) { if (vArr.nonEmpty) { params.putIfAbsent(k, vArr.head) } } } if (gatewayContext.getRequest.getHeaders.containsKey(ContextHTTPConstant.CONTEXT_ID_STR)) { params.putIfAbsent( ContextHTTPConstant.CONTEXT_ID_STR, gatewayContext.getRequest.getHeaders.get(ContextHTTPConstant.CONTEXT_ID_STR)(0) ) } if (null == params || params.isEmpty) { dealContextCreate(gatewayContext) } else { var contextId: String = null for ((key, value) <- params.asScala) { if (key.equalsIgnoreCase(ContextHTTPConstant.CONTEXT_ID_STR)) { contextId = value } } if (StringUtils.isNotBlank(contextId)) { dealContextAccess(contextId, gatewayContext) } else { dealContextCreate(gatewayContext) } } } else { null } } // before modification def dealContextCreate(gatewayContext:GatewayContext):ServiceInstance = { val serviceId = findService(HaContextGatewayRouter.CONTEXT_SERVICE_STR, list => { val services = list.filter(_.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR)) services.headOption }) val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull) if (serviceInstances.size > 0) { val index = new Random().nextInt(serviceInstances.size) serviceInstances(index) } else { logger.error(s"No valid instance for service : " + serviceId.orNull) null } } //after modification def dealContextCreate(gatewayContext: GatewayContext): ServiceInstance = { val serviceId = findService( RPCConfiguration.CONTEXT_SERVICE_NAME, list => { val services = list.filter(_.contains(RPCConfiguration.CONTEXT_SERVICE_NAME)) services.headOption } ) val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull) if (serviceInstances.size > 0) { val index = new Random().nextInt(serviceInstances.size) serviceInstances(index) } else { logger.error(s"No valid instance for service : " + serviceId.orNull) null } } // before modification def dealContextAccess(contextIdStr:String, gatewayContext: GatewayContext):ServiceInstance = { val contextId : String = { var tmpId : String = null if (serializationHelper.accepts(contextIdStr)) { val contextID : ContextID = serializationHelper.deserialize(contextIdStr).asInstanceOf[ContextID] if (null != contextID) { tmpId = contextID.getContextId } else { logger.error(s"Deserializate contextID null. contextIDStr : " + contextIdStr) } } else { logger.error(s"ContxtIDStr cannot be deserialized. contextIDStr : " + contextIdStr) } if (null == tmpId) { contextIdStr } else { tmpId } } val instances = contextIDParser.parse(contextId) var serviceId:Option[String] = None serviceId = findService(HaContextGatewayRouter.CONTEXT_SERVICE_STR, list => { val services = list.filter(_.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR)) services.headOption }) val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull) if (instances.size() > 0) { serviceId.map(ServiceInstance(_, instances.get(0))).orNull } else if (serviceInstances.size > 0) { serviceInstances(0) } else { logger.error(s"No valid instance for service : " + serviceId.orNull) null } } } //after modification def dealContextAccess(contextIdStr: String, gatewayContext: GatewayContext): ServiceInstance = { val contextId: String = { var tmpId: String = null if (serializationHelper.accepts(contextIdStr)) { val contextID: ContextID = serializationHelper.deserialize(contextIdStr).asInstanceOf[ContextID] if (null != contextID) { tmpId = contextID.getContextId } else { logger.error(s"Deserializate contextID null. contextIDStr : " + contextIdStr) } } else { logger.error(s"ContxtIDStr cannot be deserialized. contextIDStr : " + contextIdStr) } if (null == tmpId) { contextIdStr } else { tmpId } } val instances = contextIDParser.parse(contextId) var serviceId: Option[String] = None serviceId = findService( RPCConfiguration.CONTEXT_SERVICE_NAME, list => { val services = list.filter(_.contains(RPCConfiguration.CONTEXT_SERVICE_NAME)) services.headOption } ) val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull) if (instances.size() > 0) { serviceId.map(ServiceInstance(_, instances.get(0))).orNull } else if (serviceInstances.size > 0) { serviceInstances(0) } else { logger.error(s"No valid instance for service : " + serviceId.orNull) null } } // before modification object HaContextGatewayRouter{ val CONTEXT_ID_STR:String = "contextId" val CONTEXT_SERVICE_STR:String = "ps-cs" @Deprecated val OLD_CONTEXT_SERVICE_PREFIX = "contextservice" val CONTEXT_REGEX: Regex = (normalPath(API_URL_PREFIX) + "rest_[a-zA-Z][a-zA-Z_0-9]*/(v\\d+)/contextservice/" + ".+").r } //after modification object HaContextGatewayRouter { val CONTEXT_ID_STR: String = "contextId" @deprecated("please use RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX") val CONTEXT_SERVICE_REQUEST_PREFIX = RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX @deprecated("please use RPCConfiguration.CONTEXT_SERVICE_NAME") val CONTEXT_SERVICE_NAME: String = if ( RPCConfiguration.ENABLE_PUBLIC_SERVICE.getValue && RPCConfiguration.PUBLIC_SERVICE_LIST .exists(_.equalsIgnoreCase(RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX)) ) { RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue } else { RPCConfiguration.CONTEXT_SERVICE_APPLICATION_NAME.getValue } val CONTEXT_REGEX: Regex = (normalPath(API_URL_PREFIX) + "rest_[a-zA-Z][a-zA-Z_0-9]*/(v\\d+)/contextservice/" + ".+").r }
//before modification val BDP_RPC_BROADCAST_THREAD_SIZE: CommonVars[Integer] = CommonVars("wds.linkis.rpc.broadcast.thread.num", new Integer(25)) //after modification val BDP_RPC_BROADCAST_THREAD_SIZE: CommonVars[Integer] = CommonVars("wds.linkis.rpc.broadcast.thread.num", 25) //before modification val PUBLIC_SERVICE_LIST: Array[String] = CommonVars("wds.linkis.gateway.conf.publicservice.list", "query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue .split(",") //after change val PUBLIC_SERVICE_LIST: Array[String] = CommonVars("wds.linkis.gateway.conf.publicservice.list", "cs,contextservice,data-source-manager,metadataquery,metadatamanager, query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue.split(",")
##Remove part #Delete the following configuration files linkis-dist/package/conf/linkis-ps-cs.properties linkis-dist/package/conf/linkis-ps-data-source-manager.properties linkis-dist/package/conf/linkis-ps-metadataquery.properties ##modified part #modify linkis-dist/package/conf/linkis-ps-publicservice.properties #restful before modification wds.linkis.server.restful.scan.packages=org.apache.linkis.jobhistory.restful,org.apache.linkis.variable.restful,org.apache.linkis.configuration.restful,org.apache.linkis.udf.api,org.apache.linkis.filesystem.restful,org.apache.linkis.filesystem.restful,org.apache.linkis.instance.label.restful,org.apache.linkis.metadata.restful.api,org.apache.linkis.cs.server.restful,org.apache.linkis.bml.restful,org.apache.linkis.errorcode.server.restful #restful after modification wds.linkis.server.restful.scan.packages=org.apache.linkis.cs.server.restful,org.apache.linkis.datasourcemanager.core.restful,org.apache.linkis.metadata.query.server.restful,org.apache.linkis.jobhistory.restful,org.apache.linkis.variable.restful,org.apache.linkis.configuration.restful,org.apache.linkis.udf.api,org.apache.linkis.filesystem.restful,org.apache.linkis.filesystem.restful,org.apache.linkis.instance.label.restful,org.apache.linkis.metadata.restful.api,org.apache.linkis.cs.server.restful,org.apache.linkis.bml.restful,org.apache.linkis.errorcode.server.restful #mybatis before modification wds.linkis.server.mybatis.mapperLocations=classpath:org/apache/linkis/jobhistory/dao/impl/*.xml,classpath:org/apache/linkis/variable/dao/impl/*.xml,classpath:org/apache/linkis/configuration/dao/impl/*.xml,classpath:org/apache/linkis/udf/dao/impl/*.xml,classpath:org/apache/linkis/instance/label/dao/impl/*.xml,classpath:org/apache/linkis/metadata/hive/dao/impl/*.xml,org/apache/linkis/metadata/dao/impl/*.xml,classpath:org/apache/linkis/bml/dao/impl/*.xml wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.configuration.entity,org.apache.linkis.jobhistory.entity,org.apache.linkis.udf.entity,org.apache.linkis.variable.entity,org.apache.linkis.instance.label.entity,org.apache.linkis.manager.entity,org.apache.linkis.metadata.domain,org.apache.linkis.bml.entity wds.linkis.server.mybatis.BasePackage=org.apache.linkis.jobhistory.dao,org.apache.linkis.variable.dao,org.apache.linkis.configuration.dao,org.apache.linkis.udf.dao,org.apache.linkis.instance.label.dao,org.apache.linkis.metadata.hive.dao,org.apache.linkis.metadata.dao,org.apache.linkis.bml.dao,org.apache.linkis.errorcode.server.dao,org.apache.linkis.publicservice.common.lock.dao #mybatis after modification wds.linkis.server.mybatis.mapperLocations=classpath*:org/apache/linkis/cs/persistence/dao/impl/*.xml,classpath:org/apache/linkis/datasourcemanager/core/dao/mapper/*.xml,classpath:org/apache/linkis/jobhistory/dao/impl/*.xml,classpath:org/apache/linkis/variable/dao/impl/*.xml,classpath:org/apache/linkis/configuration/dao/impl/*.xml,classpath:org/apache/linkis/udf/dao/impl/*.xml,classpath:org/apache/linkis/instance/label/dao/impl/*.xml,classpath:org/apache/linkis/metadata/hive/dao/impl/*.xml,org/apache/linkis/metadata/dao/impl/*.xml,classpath:org/apache/linkis/bml/dao/impl/*.xml wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.cs.persistence.entity,org.apache.linkis.datasourcemanager.common.domain,org.apache.linkis.datasourcemanager.core.vo,org.apache.linkis.configuration.entity,org.apache.linkis.jobhistory.entity,org.apache.linkis.udf.entity,org.apache.linkis.variable.entity,org.apache.linkis.instance.label.entity,org.apache.linkis.manager.entity,org.apache.linkis.metadata.domain,org.apache.linkis.bml.entity wds.linkis.server.mybatis.BasePackage=org.apache.linkis.cs.persistence.dao,org.apache.linkis.datasourcemanager.core.dao,org.apache.linkis.jobhistory.dao,org.apache.linkis. variable.dao,org.apache.linkis.configuration.dao,org.apache.linkis.udf.dao,org.apache.linkis.instance.label.dao,org.apache.linkis.metadata.hive.dao,org. apache.linkis.metadata.dao,org.apache.linkis.bml.dao,org.apache.linkis.errorcode.server.dao,org.apache.linkis.publicservice.common.lock.dao
startup script remove the following part #linkis-ps-cs SERVER_NAME="ps-cs" SERVER_IP=$CS_INSTALL_IP startApp if [ "$ENABLE_METADATA_QUERY" == "true" ]; then #linkis-ps-data-source-manager SERVER_NAME="ps-data-source-manager" SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP startApp #linkis-ps-metadataquery SERVER_NAME="ps-metadataquery" SERVER_IP=$METADATA_QUERY_INSTALL_IP startApp fi #linkis-ps-cs SERVER_NAME="ps-cs" SERVER_IP=$CS_INSTALL_IP checkServer if [ "$ENABLE_METADATA_QUERY" == "true" ]; then #linkis-ps-data-source-manager SERVER_NAME="ps-data-source-manager" SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP checkServer #linkis-ps-metadataquery SERVER_NAME="ps-metadataquery" SERVER_IP=$METADATA_QUERY_INSTALL_IP checkServer fi #Service stop script remove the following part #linkis-ps-cs SERVER_NAME="ps-cs" SERVER_IP=$CS_INSTALL_IP stopApp if [ "$ENABLE_METADATA_QUERY" == "true" ]; then #linkis-ps-data-source-manager SERVER_NAME ="ps-data-source-manager" SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP stopApp #linkis-ps-metadataquery SERVER_NAME="ps-metadataquery" SERVER_IP=$METADATA_QUERY_INSTALL_IP stopApp fi
For more details on service merge changes, see: https://github.com/apache/linkis/pull/2927/files