| /* |
| * Copyright 2019 WeBank |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.webank.wedatasphere.linkis.instance.label.service.impl; |
| |
| import com.webank.wedatasphere.linkis.common.ServiceInstance; |
| import com.webank.wedatasphere.linkis.instance.label.async.AsyncConsumerQueue; |
| import com.webank.wedatasphere.linkis.instance.label.async.GenericAsyncConsumerQueue; |
| import com.webank.wedatasphere.linkis.instance.label.conf.InsLabelConf; |
| import com.webank.wedatasphere.linkis.instance.label.dao.InsLabelRelationDao; |
| import com.webank.wedatasphere.linkis.instance.label.dao.InstanceInfoDao; |
| import com.webank.wedatasphere.linkis.instance.label.dao.InstanceLabelDao; |
| import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabel; |
| import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabelValue; |
| import com.webank.wedatasphere.linkis.instance.label.entity.InstanceInfo; |
| import com.webank.wedatasphere.linkis.instance.label.service.InsLabelAccessService; |
| import com.webank.wedatasphere.linkis.instance.label.service.annotation.AdapterMode; |
| import com.webank.wedatasphere.linkis.instance.label.vo.InsPersistenceLabelSearchVo; |
| import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactory; |
| import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext; |
| import com.webank.wedatasphere.linkis.manager.label.entity.Label; |
| import com.webank.wedatasphere.linkis.manager.label.utils.LabelUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.aop.framework.AopContext; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import javax.annotation.Resource; |
| import java.util.*; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| |
| @AdapterMode |
| public class DefaultInsLabelService implements InsLabelAccessService { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DefaultInsLabelService.class); |
| |
| @Resource |
| private InstanceLabelDao instanceLabelDao; |
| |
| @Resource |
| private InstanceInfoDao instanceDao; |
| |
| @Resource |
| private InsLabelRelationDao insLabelRelationDao; |
| |
| private AsyncConsumerQueue<InsPersistenceLabel> asyncRemoveLabelQueue; |
| |
| private InsLabelAccessService selfService; |
| |
| private AtomicBoolean asyncQueueInit = new AtomicBoolean(false); |
| /** |
| * init method |
| */ |
| private synchronized void initQueue(){ |
| if(!asyncQueueInit.get()) { |
| selfService = (InsLabelAccessService) AopContext.currentProxy(); |
| LOG.info("SelfService: [" + this.getClass().getName() + "]"); |
| asyncRemoveLabelQueue = new GenericAsyncConsumerQueue<>(InsLabelConf.ASYNC_QUEUE_CAPACITY.getValue()); |
| asyncRemoveLabelQueue.consumer(InsLabelConf.ASYNC_QUEUE_CONSUME_BATCH_SIZE.getValue(), |
| InsLabelConf.ASYNC_QUEUE_CONSUME_INTERVAL.getValue(), TimeUnit.SECONDS, insLabels -> { |
| selfService.removeLabelsIfNotRelation(insLabels); |
| }); |
| asyncQueueInit.set(true); |
| } |
| } |
| |
| @Override |
| @Transactional(rollbackFor = Exception.class) |
| public void attachLabelToInstance(Label<?> label, ServiceInstance serviceInstance) { |
| attachLabelsToInstance(Collections.singletonList(label), serviceInstance); |
| } |
| |
| @Override |
| @Transactional(rollbackFor = Exception.class) |
| public void attachLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) { |
| List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels); |
| List<InsPersistenceLabel> labelsNeedInsert = filterLabelNeededInsert(insLabels, true); |
| if(!labelsNeedInsert.isEmpty()) { |
| LOG.info("Persist labels: [" + LabelUtils.Jackson.toJson(labels, null) + "]"); |
| doInsertInsLabels(labelsNeedInsert); |
| } |
| LOG.info("Insert/Update service instance info: [" + serviceInstance + "]"); |
| doInsertInstance(serviceInstance); |
| List<Integer> insLabelIds = insLabels.stream().map(InsPersistenceLabel :: getId).collect(Collectors.toList()); |
| LOG.trace("Build relation between labels: " + |
| LabelUtils.Jackson.toJson(insLabelIds, null) |
| + " and instance: [" + serviceInstance.getInstance() + "]"); |
| batchOperation(insLabelIds, subInsLabelIds -> insLabelRelationDao.insertRelations(serviceInstance.getInstance(), subInsLabelIds), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue()); |
| } |
| |
| @Override |
| @Transactional(rollbackFor = Exception.class) |
| public void refreshLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) { |
| List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels); |
| //Label candidate to be removed |
| List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance()); |
| if(!labelsCandidateRemoved.isEmpty()){ |
| labelsCandidateRemoved.removeAll(insLabels); |
| } |
| LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]"); |
| insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance()); |
| //Attach labels to instance |
| attachLabelsToInstance(insLabels, serviceInstance); |
| // Async to delete labels that have no relationship |
| if(!labelsCandidateRemoved.isEmpty()){ |
| if(!asyncQueueInit.get()) { |
| initQueue(); |
| } |
| labelsCandidateRemoved.forEach( label -> { |
| if(!asyncRemoveLabelQueue.offer(label)){ |
| LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]"); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| @Transactional(rollbackFor=Exception.class) |
| public void removeLabelsFromInstance(ServiceInstance serviceInstance) { |
| //Label candidate to be removed |
| List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance()); |
| LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]"); |
| insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance()); |
| // Async to delete labels that have no relationship |
| if(!labelsCandidateRemoved.isEmpty()){ |
| labelsCandidateRemoved.forEach( label -> { |
| if(!asyncQueueInit.get()) { |
| initQueue(); |
| } |
| if(!asyncRemoveLabelQueue.offer(label)){ |
| LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]"); |
| } |
| }); |
| } |
| } |
| |
| |
| @Override |
| public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels) { |
| return searchInstancesByLabels(labels, Label.ValueRelation.ALL); |
| } |
| |
| @Override |
| public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels, Label.ValueRelation relation) { |
| List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels); |
| if (!insLabels.isEmpty()) { |
| List<Map<String, String>> valueContent = new ArrayList<>(); |
| AtomicBoolean searchByValues = new AtomicBoolean(false); |
| insLabels.forEach(insLabel -> { |
| //It means that the labels provided is not regular, |
| //so we should search instances by key-value map of labels |
| if (StringUtils.isBlank(insLabel.getStringValue())) { |
| searchByValues.set(true); |
| } |
| valueContent.add(insLabel.getValue()); |
| }); |
| List<InstanceInfo> instanceInfoList = new ArrayList<>(); |
| if ((relation != Label.ValueRelation.ALL || searchByValues.get()) && valueContent.size() > 0) { |
| instanceInfoList = insLabelRelationDao.searchInsDirectByValues(valueContent, relation.name()); |
| } else if (relation == Label.ValueRelation.ALL && !searchByValues.get()) { |
| instanceInfoList = insLabelRelationDao.searchInsDirectByLabels(insLabels); |
| } |
| return instanceInfoList.stream().map(instanceInfo -> (ServiceInstance)instanceInfo).collect(Collectors.toList()); |
| } |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public List<ServiceInstance> searchUnRelateInstances(ServiceInstance serviceInstance) { |
| if(null != serviceInstance) { |
| return insLabelRelationDao.searchUnRelateInstances(new InstanceInfo(serviceInstance)) |
| .stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList()); |
| } |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public List<ServiceInstance> searchLabelRelatedInstances(ServiceInstance serviceInstance) { |
| if(null != serviceInstance){ |
| return insLabelRelationDao.searchLabelRelatedInstances(new InstanceInfo(serviceInstance)) |
| .stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList()); |
| } |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| @Transactional(rollbackFor = Exception.class) |
| public void removeLabelsIfNotRelation(List<? extends Label<?>> labels) { |
| List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels); |
| insLabels.forEach(insLabel -> { |
| if(Optional.ofNullable(insLabel.getId()).isPresent()){ |
| insLabel = instanceLabelDao.selectForUpdate(insLabel.getId()); |
| }else{ |
| insLabel = instanceLabelDao.searchForUpdate(insLabel.getLabelKey(), insLabel.getStringValue()); |
| } |
| if(null != insLabel) { |
| Integer exist = insLabelRelationDao.existRelations(insLabel.getId()); |
| //Not exist |
| if(null == exist){ |
| LOG.info("Remove information of instance label: [" + insLabel.toString() + "]"); |
| instanceLabelDao.remove(insLabel); |
| instanceLabelDao.doRemoveKeyValues(insLabel.getId()); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Filter labels |
| * @param insLabels labels |
| * @return |
| */ |
| private List<InsPersistenceLabel> filterLabelNeededInsert(List<InsPersistenceLabel> insLabels, boolean needLock){ |
| List<InsPersistenceLabel> storedLabels = instanceLabelDao.search(insLabels.stream().map(InsPersistenceLabelSearchVo :: new).collect(Collectors.toList())); |
| if(!storedLabels.isEmpty()){ |
| List<InsPersistenceLabel> labelsNeedInsert = new ArrayList<>(insLabels); |
| labelsNeedInsert.removeIf(labelNeedInsert -> { |
| for(InsPersistenceLabel storedLabel : storedLabels){ |
| if(labelNeedInsert.equals(storedLabel)){ |
| Integer labelId = storedLabel.getId(); |
| labelNeedInsert.setId(labelId); |
| if(needLock) { |
| //Update to lock the record |
| return instanceLabelDao.updateForLock(labelId) >= 0; |
| } |
| return true; |
| } |
| } |
| return false; |
| }); |
| return labelsNeedInsert; |
| } |
| return insLabels; |
| } |
| |
| /** |
| * Operation of inserting labels |
| * @param insLabels labels |
| */ |
| private void doInsertInsLabels(List<InsPersistenceLabel> insLabels){ |
| //Try to insert, use ON DUPLICATE KEY on mysql |
| batchOperation(insLabels, subInsLabels -> instanceLabelDao.insertBatch(subInsLabels), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue()); |
| List<InsPersistenceLabelValue> labelValues = insLabels.stream().flatMap(insLabel -> { |
| if(!Optional.ofNullable(insLabel.getId()).isPresent()){ |
| throw new IllegalArgumentException("Cannot get the generated id from bulk insertion, please check your mybatis version"); |
| } |
| return insLabel.getValue().entrySet().stream().map( entry -> |
| new InsPersistenceLabelValue(insLabel.getId(), entry.getKey(), entry.getValue())); |
| }).collect(Collectors.toList()); |
| batchOperation(labelValues, subLabelValues -> instanceLabelDao.doInsertKeyValues(subLabelValues), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue()); |
| } |
| |
| /** |
| * Operation of inserting instances |
| * @param serviceInstance service instance |
| */ |
| private void doInsertInstance(ServiceInstance serviceInstance){ |
| //ON DUPLICATE KEY |
| instanceDao.insertOne(new InstanceInfo(serviceInstance)); |
| } |
| |
| /** |
| * Transform to <em>InsPersistenceLabel</em> |
| * @param labels |
| * @return |
| */ |
| private List<InsPersistenceLabel> toInsPersistenceLabels(List<? extends Label<?>> labels){ |
| LabelBuilderFactory builderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory(); |
| return labels.stream().map(label -> { |
| if(label instanceof InsPersistenceLabel){ |
| return (InsPersistenceLabel)label; |
| } |
| InsPersistenceLabel insLabel = builderFactory.convertLabel(label, InsPersistenceLabel.class); |
| insLabel.setStringValue(label.getStringValue()); |
| if(StringUtils.isNotBlank(insLabel.getStringValue())){ |
| insLabel.setLabelValueSize(insLabel.getValue().size()); |
| } |
| return insLabel; |
| }).collect(Collectors.toList()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <T extends List<?>>void batchOperation(T input, Consumer<T> batchFunc, int batchSize){ |
| int listLen = input.size(); |
| if(listLen > 0) { |
| for (int from = 0; from < listLen; ) { |
| int to = Math.min(from + batchSize, listLen); |
| T subInput = (T) input.subList(from, to); |
| batchFunc.accept(subInput); |
| from = to; |
| } |
| } |
| } |
| } |