/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eventmesh.dashboard.console.function.metadata;

import org.apache.eventmesh.dashboard.common.model.metadata.MetadataConfig;
import org.apache.eventmesh.dashboard.console.entity.base.BaseEntity;
import org.apache.eventmesh.dashboard.console.function.metadata.MetadataServiceWrapper.SingleMetadataServiceWrapper;
import org.apache.eventmesh.dashboard.console.function.metadata.handler.MetadataHandlerWrapper;
import org.apache.eventmesh.dashboard.console.function.metadata.syncservice.SyncDataServiceWrapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.validation.constraints.NotNull;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
 * MetadataManager synchronizes metadata between the cluster services and the database. The database is expected to be empty when this
 * manager boots.
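 *
 * <p>A minimal usage sketch (the wrapper instances are assumed to be built elsewhere; the delay values are illustrative):
 * <pre>{@code
 * MetadataManager manager = new MetadataManager();
 * manager.setUpSyncMetadataManager(syncDataServiceWrapper, metadataHandlerWrapper);
 * manager.init(10, 30); // first sync after 10 seconds, then every 30 seconds
 * }</pre>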
*/
@Slf4j
public class MetadataManager {

    @Setter
    private Boolean toDbSync = true;

    private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(2);

    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32, 32, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
        new ThreadFactory() {
            final AtomicInteger counter = new AtomicInteger(0);

            @Override
            public Thread newThread(@NotNull Runnable r) {
                return new Thread(r, "metadata-manager-" + counter.incrementAndGet());
            }
        });

    /**
     * Id generator for the service wrapper map. The id is incremented even when the cache is not enabled.
     */
    private static final AtomicLong staticServiceId = new AtomicLong(0);

    private static final ConcurrentHashMap<Long, Boolean> firstRunToDb = new ConcurrentHashMap<>();

    private final Map<Long, MetadataServiceWrapper> metaDataServiceWrapperMap = new ConcurrentHashMap<>();

    private final Map<Long, List<Object>> cacheData = new ConcurrentHashMap<>();

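    /**
     * Starts the periodic sync task.
     *
     * @param initialDelay seconds to wait before the first sync run
     * @param period       seconds between successive sync runs
     */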
    public void init(Integer initialDelay, Integer period) {
        scheduledExecutorService.scheduleAtFixedRate(() -> MetadataManager.this.run(toDbSync, true), initialDelay, period, TimeUnit.SECONDS);
    }

    /**
     * Registers a metadata service wrapper and assigns it a cache id.
     *
     * @param metaDataServiceWrapper the service wrapper to register
     */
    public void addMetadataService(MetadataServiceWrapper metaDataServiceWrapper) {
        Long cacheId = staticServiceId.incrementAndGet();
        metaDataServiceWrapper.setCacheId(cacheId);
        metaDataServiceWrapperMap.put(cacheId, metaDataServiceWrapper);
    }

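    /**
     * Entrance of a sync scheduled task: runs one sync pass over all registered wrappers in both directions.
     */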
    public void run() {
        metaDataServiceWrapperMap.forEach(this::handlers);
    }

    public void run(Boolean toDbOn, Boolean toServiceOn) {
        try {
            metaDataServiceWrapperMap.forEach((cacheId, metaDataServiceWrapper) -> handlers(cacheId, metaDataServiceWrapper, toDbOn, toServiceOn));
        } catch (Exception e) {
            log.error("metadata manager run error", e);
        }
    }

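    /**
     * Dispatches the enabled sync directions for one service wrapper onto the worker pool.
     */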
    public void handlers(Long cacheId, MetadataServiceWrapper metaDataServiceWrapper, Boolean toDbOn, Boolean toServiceOn) {
        this.threadPoolExecutor.execute(() -> {
            try {
                if (toDbOn) {
                    this.handler(cacheId, metaDataServiceWrapper.getDbToService(), true);
                }
                if (toServiceOn) {
                    this.handler(cacheId, metaDataServiceWrapper.getServiceToDb(), false);
                }
            } catch (Throwable e) {
                log.error("metadata manager handler error", e);
            }
        });
    }

    public void handlers(Long cacheId, MetadataServiceWrapper metaDataServiceWrapper) {
        handlers(cacheId, metaDataServiceWrapper, true, true);
    }

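    /**
     * Syncs one direction for a single service wrapper. When the cache is enabled, the previous snapshot is diffed against the fresh one by
     * unique key to produce insert, update, and delete sets; otherwise the target data is replaced wholesale.
     *
     * @param cacheID                      id of the cached snapshot
     * @param singleMetadataServiceWrapper the direction-specific wrapper, may be null
     * @param isDbToService                true when syncing from the database to the service
     */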
    @SuppressWarnings("unchecked")
    public void handler(Long cacheID, SingleMetadataServiceWrapper singleMetadataServiceWrapper, boolean isDbToService) {
        if (singleMetadataServiceWrapper == null) {
            return;
        }
        try {
            List<Object> newObjectList = (List<Object>) singleMetadataServiceWrapper.getSyncService().getData();
            if (newObjectList == null || newObjectList.isEmpty()) {
                return;
            }
            // if the cache is disabled, skip the diff and replace the target data wholesale (full-volume update)
            if (!singleMetadataServiceWrapper.getCache()) {
                singleMetadataServiceWrapper.getHandler()
                    .replaceMetadata(newObjectList);
                return;
            }
            List<Object> cacheDataList = cacheData.get(cacheID);
            // replace the old cache with the fresh snapshot
            cacheData.put(cacheID, newObjectList);
            Map<String, Object> newObjectMap = getUniqueKeyMap(newObjectList);
            Map<String, Object> oldObjectMap = getUniqueKeyMap(cacheDataList);
            // these three lists hold objects of the target type
            List<Object> toUpdate = new ArrayList<>();
            List<Object> toDelete = new ArrayList<>();
            List<Object> toInsert;
            for (Entry<String, Object> entry : oldObjectMap.entrySet()) {
                Object serviceObject = newObjectMap.remove(entry.getKey());
                // if the new data no longer contains a key from the old map, the object was deleted
                if (serviceObject == null) {
                    toDelete.add(entry.getValue());
                } else {
                    // primary id, create time, and update time should not be compared;
                    // if not equal, we need to update all fields except the unique key (which is equal by construction);
                    // because the entry comes from the old map, it already contains the primary key
                    if (!serviceObject.equals(entry.getValue())) {
                        toUpdate.add(entry.getValue());
                    }
                }
            }
            // whatever is left in newObjectMap has no counterpart in the old snapshot, so it is an insert
            toInsert = new ArrayList<>(newObjectMap.values());
            // if the target is the database, we go through the handler so it can provide a transaction
            if (!isDbToService) {
                firstRunToDb.putIfAbsent(cacheID, false);
                singleMetadataServiceWrapper.getHandler().handleAllObject(toInsert, toUpdate, toDelete);
                // if the target is EventMesh, we just use the three basic methods
            } else {
                toInsert.forEach(singleMetadataServiceWrapper.getHandler()::addMetadataObject);
                toUpdate.forEach(singleMetadataServiceWrapper.getHandler()::updateMetadataObject);
                toDelete.forEach(singleMetadataServiceWrapper.getHandler()::deleteMetadataObject);
            }
        } catch (Throwable e) {
            log.error("metadata manager handler error", e);
        }
    }

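    /**
     * Convenience registration for the runtime service-to-database sync direction.
     */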
    public void setUpSyncMetadataManager(SyncDataServiceWrapper syncDataServiceWrapper, MetadataHandlerWrapper metadataHandlerWrapper) {
        MetadataServiceWrapper metadataServiceWrapper = new MetadataServiceWrapper();
        SingleMetadataServiceWrapper singleMetadataServiceWrapper = SingleMetadataServiceWrapper.builder()
            .syncService(syncDataServiceWrapper.getRuntimeSyncFromClusterService())
            .handler(metadataHandlerWrapper.getRuntimeMetadataHandlerToDb()).build();
        metadataServiceWrapper.setServiceToDb(singleMetadataServiceWrapper);
        this.addMetadataService(metadataServiceWrapper);
    }

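    /**
     * Indexes a list by each element's unique key. Returns an empty map for a null or empty list. Only {@link MetadataConfig} elements are
     * currently supported; see the TODO below for entities.
     */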
    private Map<String, Object> getUniqueKeyMap(List<Object> list) {
        Map<String, Object> map = new HashMap<>();
        if (Objects.nonNull(list) && !list.isEmpty()) {
            Object firstItem = list.get(0);
            if (firstItem instanceof MetadataConfig) {
                for (Object item : list) {
                    MetadataConfig metadataItem = (MetadataConfig) item;
                    map.put(metadataItem.getUnique(), metadataItem);
                }
            } else if (firstItem instanceof BaseEntity) {
                for (Object item : list) {
                    BaseEntity baseEntityItem = (BaseEntity) item;
                    //TODO we don't have a db-to-service method yet, and getUniqueKeyMap from an entity is not used
                    // map.put(baseEntityItem.getUniqueKey(), baseEntityItem);
                }
            }
        }
        return map;
    }

    //TODO if the database is modified by another service, we need to update the cache
}