blob: 9d591b503a90151f57a7e4c0897f94508484a6f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.dict.lookup;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.WriteConflictException;
import org.apache.kylin.metadata.TableMetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
/**
* @author yangli9
*/
public class SnapshotManager {
private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
private static final ConcurrentHashMap<String, Object> lockObjectMap = new ConcurrentHashMap<>();
private KylinConfig config;
// path ==> SnapshotTable
private LoadingCache<String, SnapshotTable> snapshotCache; // resource
// ============================================================================
private SnapshotManager(KylinConfig config) {
this.config = config;
this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() {
@Override
public void onRemoval(RemovalNotification<String, SnapshotTable> notification) {
SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}",
notification.getKey(), notification.getCause());
}
}).maximumSize(config.getCachedSnapshotMaxEntrySize())//
.expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, SnapshotTable>() {
@Override
public SnapshotTable load(String key) throws Exception {
SnapshotTable snapshotTable = SnapshotManager.this.load(key, true);
return snapshotTable;
}
});
}
public static SnapshotManager getInstance(KylinConfig config) {
return config.getManager(SnapshotManager.class);
}
// called by reflection
static SnapshotManager newInstance(KylinConfig config) throws IOException {
return new SnapshotManager(config);
}
private Object getConcurrentObject(String resPath) {
if (!lockObjectMap.containsKey(resPath)) {
addObject(resPath);
}
return lockObjectMap.get(resPath);
}
private synchronized void addObject(String resPath) {
if (!lockObjectMap.containsKey(resPath)) {
lockObjectMap.put(resPath, new Object());
}
}
public void wipeoutCache() {
snapshotCache.invalidateAll();
}
public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
try {
SnapshotTable r = snapshotCache.get(resourcePath);
if (r == null) {
r = loadAndUpdateLocalCache(resourcePath);
}
return r;
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
private SnapshotTable loadAndUpdateLocalCache(String snapshotResPath) throws IOException {
SnapshotTable snapshotTable = load(snapshotResPath, true);
snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
return snapshotTable;
}
public List<SnapshotTable> getSnapshots(String tableName, TableSignature sourceTableSignature) throws IOException {
List<SnapshotTable> result = Lists.newArrayList();
String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName);
ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
result.addAll(store.getAllResources(tableSnapshotsPath, SnapshotTableSerializer.INFO_SERIALIZER));
if (sourceTableSignature != null) {
String oldTableSnapshotsPath = SnapshotTable.getOldResourceDir(sourceTableSignature);
result.addAll(store.getAllResources(oldTableSnapshotsPath, SnapshotTableSerializer.INFO_SERIALIZER));
}
return result;
}
public void removeSnapshot(String resourcePath) throws IOException {
ResourceStore store = getStore();
store.deleteResource(resourcePath);
snapshotCache.invalidate(resourcePath);
}
public SnapshotTable buildSnapshot(IReadableTable table, TableDesc tableDesc, KylinConfig cubeConfig)
throws IOException {
SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
snapshot.updateRandomUuid();
synchronized (getConcurrentObject(tableDesc.getIdentity())) {
SnapshotTable reusableSnapshot = getReusableSnapShot(table, snapshot, tableDesc, cubeConfig);
if (reusableSnapshot != null)
return updateDictLastModifiedTime(reusableSnapshot.getResourcePath());
snapshot.takeSnapshot(table, tableDesc);
return trySaveNewSnapshot(snapshot);
}
}
public SnapshotTable rebuildSnapshot(IReadableTable table, TableDesc tableDesc, String overwriteUUID)
throws IOException {
SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
snapshot.setUuid(overwriteUUID);
snapshot.takeSnapshot(table, tableDesc);
try {
SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
snapshot.setLastModified(existing.getLastModified());
} catch (Exception ex) {
logger.error("Error reading {}, delete it and save rebuild", snapshot.getResourcePath(), ex);
}
save(snapshot);
snapshotCache.put(snapshot.getResourcePath(), snapshot);
return snapshot;
}
private SnapshotTable getReusableSnapShot(IReadableTable table, SnapshotTable snapshot, TableDesc tableDesc,
KylinConfig cubeConfig) throws IOException {
String dup = checkDupByInfo(snapshot);
if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
throw new IllegalStateException(
"Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
+ " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
}
if (dup != null) {
logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup);
return getSnapshotTable(dup);
} else {
return null;
}
}
public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
String dupTable = checkDupByContent(snapshotTable);
if (dupTable != null) {
logger.info("Identical snapshot content {}, reuse existing snapshot at {}", snapshotTable, dupTable);
return updateDictLastModifiedTime(dupTable);
}
save(snapshotTable);
snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
return snapshotTable;
}
private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
ResourceStore store = getStore();
String resourceDir = snapshot.getResourceDir();
NavigableSet<String> existings = store.listResources(resourceDir);
if (existings == null)
return null;
TableSignature sig = snapshot.getSignature();
for (String existing : existings) {
SnapshotTable existingTable = load(existing, false); // skip cache,
// direct load from store
if (existingTable != null && sig.equals(existingTable.getSignature()))
return existing;
}
return null;
}
private String checkDupByContent(SnapshotTable snapshot) throws IOException {
ResourceStore store = getStore();
String resourceDir = snapshot.getResourceDir();
NavigableSet<String> existings = store.listResources(resourceDir);
if (existings == null)
return null;
for (String existing : existings) {
SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store
if (existingTable != null && existingTable.equals(snapshot))
return existing;
}
return null;
}
private SnapshotTable updateDictLastModifiedTime(String snapshotPath) throws IOException {
ResourceStore store = getStore();
int retry = 7;
while (retry-- > 0) {
try {
long now = System.currentTimeMillis();
store.updateTimestamp(snapshotPath, now);
logger.info("Update snapshotTable {} lastModifiedTime to {}", snapshotPath, now);
return loadAndUpdateLocalCache(snapshotPath);
} catch (WriteConflictException e) {
if (retry <= 0) {
logger.error("Retry is out, till got error, abandoning...", e);
throw e;
}
logger.warn("Write conflict to update snapshotTable " + snapshotPath + " retry remaining " + retry
+ ", will retry...");
}
}
// update cache
return loadAndUpdateLocalCache(snapshotPath);
}
private void save(SnapshotTable snapshot) throws IOException {
ResourceStore store = getStore();
String path = snapshot.getResourcePath();
store.putBigResource(path, snapshot, System.currentTimeMillis(), SnapshotTableSerializer.FULL_SERIALIZER);
}
private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
logger.info("Loading snapshotTable from {}, with loadData: {}", resourcePath, loadData);
ResourceStore store = getStore();
SnapshotTable table = store.getResource(resourcePath,
loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER);
if (loadData)
logger.debug("Loaded snapshot at {}", resourcePath);
return table;
}
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
}