blob: 904a18495d3e321b086dffcc14f1f318ea6733ad [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.instance.label.service.impl;
import com.webank.wedatasphere.linkis.common.ServiceInstance;
import com.webank.wedatasphere.linkis.instance.label.async.AsyncConsumerQueue;
import com.webank.wedatasphere.linkis.instance.label.async.GenericAsyncConsumerQueue;
import com.webank.wedatasphere.linkis.instance.label.conf.InsLabelConf;
import com.webank.wedatasphere.linkis.instance.label.dao.InsLabelRelationDao;
import com.webank.wedatasphere.linkis.instance.label.dao.InstanceInfoDao;
import com.webank.wedatasphere.linkis.instance.label.dao.InstanceLabelDao;
import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabel;
import com.webank.wedatasphere.linkis.instance.label.entity.InsPersistenceLabelValue;
import com.webank.wedatasphere.linkis.instance.label.entity.InstanceInfo;
import com.webank.wedatasphere.linkis.instance.label.service.InsLabelAccessService;
import com.webank.wedatasphere.linkis.instance.label.service.annotation.AdapterMode;
import com.webank.wedatasphere.linkis.instance.label.vo.InsPersistenceLabelSearchVo;
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactory;
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext;
import com.webank.wedatasphere.linkis.manager.label.entity.Label;
import com.webank.wedatasphere.linkis.manager.label.utils.LabelUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopContext;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@AdapterMode
public class DefaultInsLabelService implements InsLabelAccessService {
private static final Logger LOG = LoggerFactory.getLogger(DefaultInsLabelService.class);
@Resource
private InstanceLabelDao instanceLabelDao;
@Resource
private InstanceInfoDao instanceDao;
@Resource
private InsLabelRelationDao insLabelRelationDao;
private AsyncConsumerQueue<InsPersistenceLabel> asyncRemoveLabelQueue;
private InsLabelAccessService selfService;
private AtomicBoolean asyncQueueInit = new AtomicBoolean(false);
/**
* init method
*/
private synchronized void initQueue(){
if(!asyncQueueInit.get()) {
selfService = (InsLabelAccessService) AopContext.currentProxy();
LOG.info("SelfService: [" + this.getClass().getName() + "]");
asyncRemoveLabelQueue = new GenericAsyncConsumerQueue<>(InsLabelConf.ASYNC_QUEUE_CAPACITY.getValue());
asyncRemoveLabelQueue.consumer(InsLabelConf.ASYNC_QUEUE_CONSUME_BATCH_SIZE.getValue(),
InsLabelConf.ASYNC_QUEUE_CONSUME_INTERVAL.getValue(), TimeUnit.SECONDS, insLabels -> {
selfService.removeLabelsIfNotRelation(insLabels);
});
asyncQueueInit.set(true);
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void attachLabelToInstance(Label<?> label, ServiceInstance serviceInstance) {
attachLabelsToInstance(Collections.singletonList(label), serviceInstance);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void attachLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) {
List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
List<InsPersistenceLabel> labelsNeedInsert = filterLabelNeededInsert(insLabels, true);
if(!labelsNeedInsert.isEmpty()) {
LOG.info("Persist labels: [" + LabelUtils.Jackson.toJson(labels, null) + "]");
doInsertInsLabels(labelsNeedInsert);
}
LOG.info("Insert/Update service instance info: [" + serviceInstance + "]");
doInsertInstance(serviceInstance);
List<Integer> insLabelIds = insLabels.stream().map(InsPersistenceLabel :: getId).collect(Collectors.toList());
LOG.trace("Build relation between labels: " +
LabelUtils.Jackson.toJson(insLabelIds, null)
+ " and instance: [" + serviceInstance.getInstance() + "]");
batchOperation(insLabelIds, subInsLabelIds -> insLabelRelationDao.insertRelations(serviceInstance.getInstance(), subInsLabelIds), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
}
@Override
@Transactional(rollbackFor = Exception.class)
public void refreshLabelsToInstance(List<? extends Label<?>> labels, ServiceInstance serviceInstance) {
List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
//Label candidate to be removed
List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance());
if(!labelsCandidateRemoved.isEmpty()){
labelsCandidateRemoved.removeAll(insLabels);
}
LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]");
insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance());
//Attach labels to instance
attachLabelsToInstance(insLabels, serviceInstance);
// Async to delete labels that have no relationship
if(!labelsCandidateRemoved.isEmpty()){
if(!asyncQueueInit.get()) {
initQueue();
}
labelsCandidateRemoved.forEach( label -> {
if(!asyncRemoveLabelQueue.offer(label)){
LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]");
}
});
}
}
@Override
@Transactional(rollbackFor=Exception.class)
public void removeLabelsFromInstance(ServiceInstance serviceInstance) {
//Label candidate to be removed
List<InsPersistenceLabel> labelsCandidateRemoved = insLabelRelationDao.searchLabelsByInstance(serviceInstance.getInstance());
LOG.info("Drop relationships related by instance: [" + serviceInstance.getInstance() + "]");
insLabelRelationDao.dropRelationsByInstance(serviceInstance.getInstance());
// Async to delete labels that have no relationship
if(!labelsCandidateRemoved.isEmpty()){
labelsCandidateRemoved.forEach( label -> {
if(!asyncQueueInit.get()) {
initQueue();
}
if(!asyncRemoveLabelQueue.offer(label)){
LOG.warn("Async queue for removing labels maybe full. current size: [" + asyncRemoveLabelQueue.size() + "]");
}
});
}
}
@Override
public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels) {
return searchInstancesByLabels(labels, Label.ValueRelation.ALL);
}
@Override
public List<ServiceInstance> searchInstancesByLabels(List<? extends Label<?>> labels, Label.ValueRelation relation) {
List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
if (!insLabels.isEmpty()) {
List<Map<String, String>> valueContent = new ArrayList<>();
AtomicBoolean searchByValues = new AtomicBoolean(false);
insLabels.forEach(insLabel -> {
//It means that the labels provided is not regular,
//so we should search instances by key-value map of labels
if (StringUtils.isBlank(insLabel.getStringValue())) {
searchByValues.set(true);
}
valueContent.add(insLabel.getValue());
});
List<InstanceInfo> instanceInfoList = new ArrayList<>();
if ((relation != Label.ValueRelation.ALL || searchByValues.get()) && valueContent.size() > 0) {
instanceInfoList = insLabelRelationDao.searchInsDirectByValues(valueContent, relation.name());
} else if (relation == Label.ValueRelation.ALL && !searchByValues.get()) {
instanceInfoList = insLabelRelationDao.searchInsDirectByLabels(insLabels);
}
return instanceInfoList.stream().map(instanceInfo -> (ServiceInstance)instanceInfo).collect(Collectors.toList());
}
return Collections.emptyList();
}
@Override
public List<ServiceInstance> searchUnRelateInstances(ServiceInstance serviceInstance) {
if(null != serviceInstance) {
return insLabelRelationDao.searchUnRelateInstances(new InstanceInfo(serviceInstance))
.stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList());
}
return Collections.emptyList();
}
@Override
public List<ServiceInstance> searchLabelRelatedInstances(ServiceInstance serviceInstance) {
if(null != serviceInstance){
return insLabelRelationDao.searchLabelRelatedInstances(new InstanceInfo(serviceInstance))
.stream().map(instanceInfo -> (ServiceInstance) instanceInfo).collect(Collectors.toList());
}
return Collections.emptyList();
}
@Override
@Transactional(rollbackFor = Exception.class)
public void removeLabelsIfNotRelation(List<? extends Label<?>> labels) {
List<InsPersistenceLabel> insLabels = toInsPersistenceLabels(labels);
insLabels.forEach(insLabel -> {
if(Optional.ofNullable(insLabel.getId()).isPresent()){
insLabel = instanceLabelDao.selectForUpdate(insLabel.getId());
}else{
insLabel = instanceLabelDao.searchForUpdate(insLabel.getLabelKey(), insLabel.getStringValue());
}
if(null != insLabel) {
Integer exist = insLabelRelationDao.existRelations(insLabel.getId());
//Not exist
if(null == exist){
LOG.info("Remove information of instance label: [" + insLabel.toString() + "]");
instanceLabelDao.remove(insLabel);
instanceLabelDao.doRemoveKeyValues(insLabel.getId());
}
}
});
}
/**
* Filter labels
* @param insLabels labels
* @return
*/
private List<InsPersistenceLabel> filterLabelNeededInsert(List<InsPersistenceLabel> insLabels, boolean needLock){
List<InsPersistenceLabel> storedLabels = instanceLabelDao.search(insLabels.stream().map(InsPersistenceLabelSearchVo :: new).collect(Collectors.toList()));
if(!storedLabels.isEmpty()){
List<InsPersistenceLabel> labelsNeedInsert = new ArrayList<>(insLabels);
labelsNeedInsert.removeIf(labelNeedInsert -> {
for(InsPersistenceLabel storedLabel : storedLabels){
if(labelNeedInsert.equals(storedLabel)){
Integer labelId = storedLabel.getId();
labelNeedInsert.setId(labelId);
if(needLock) {
//Update to lock the record
return instanceLabelDao.updateForLock(labelId) >= 0;
}
return true;
}
}
return false;
});
return labelsNeedInsert;
}
return insLabels;
}
/**
* Operation of inserting labels
* @param insLabels labels
*/
private void doInsertInsLabels(List<InsPersistenceLabel> insLabels){
//Try to insert, use ON DUPLICATE KEY on mysql
batchOperation(insLabels, subInsLabels -> instanceLabelDao.insertBatch(subInsLabels), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
List<InsPersistenceLabelValue> labelValues = insLabels.stream().flatMap(insLabel -> {
if(!Optional.ofNullable(insLabel.getId()).isPresent()){
throw new IllegalArgumentException("Cannot get the generated id from bulk insertion, please check your mybatis version");
}
return insLabel.getValue().entrySet().stream().map( entry ->
new InsPersistenceLabelValue(insLabel.getId(), entry.getKey(), entry.getValue()));
}).collect(Collectors.toList());
batchOperation(labelValues, subLabelValues -> instanceLabelDao.doInsertKeyValues(subLabelValues), InsLabelConf.DB_PERSIST_BATCH_SIZE.getValue());
}
/**
* Operation of inserting instances
* @param serviceInstance service instance
*/
private void doInsertInstance(ServiceInstance serviceInstance){
//ON DUPLICATE KEY
instanceDao.insertOne(new InstanceInfo(serviceInstance));
}
/**
* Transform to <em>InsPersistenceLabel</em>
* @param labels
* @return
*/
private List<InsPersistenceLabel> toInsPersistenceLabels(List<? extends Label<?>> labels){
LabelBuilderFactory builderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory();
return labels.stream().map(label -> {
if(label instanceof InsPersistenceLabel){
return (InsPersistenceLabel)label;
}
InsPersistenceLabel insLabel = builderFactory.convertLabel(label, InsPersistenceLabel.class);
insLabel.setStringValue(label.getStringValue());
if(StringUtils.isNotBlank(insLabel.getStringValue())){
insLabel.setLabelValueSize(insLabel.getValue().size());
}
return insLabel;
}).collect(Collectors.toList());
}
@SuppressWarnings("unchecked")
private <T extends List<?>>void batchOperation(T input, Consumer<T> batchFunc, int batchSize){
int listLen = input.size();
if(listLen > 0) {
for (int from = 0; from < listLen; ) {
int to = Math.min(from + batchSize, listLen);
T subInput = (T) input.subList(from, to);
batchFunc.accept(subInput);
from = to;
}
}
}
}