blob: 91e02878aa017b47b4a2fb993ed993bc9cd7ad17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.rocksdb;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileManager;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.rocksdb.RocksDBIteratorPool.ReusedRocksIterator;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
public class OpenedRocksDB implements AutoCloseable {
private static final Logger LOG = Log.logger(OpenedRocksDB.class);
private final RocksDB rocksdb;
private final Map<String, CFHandle> cfHandles;
private final SstFileManager sstFileManager;
public OpenedRocksDB(RocksDB rocksdb, Map<String, CFHandle> cfHandles,
SstFileManager sstFileManager) {
this.rocksdb = rocksdb;
this.cfHandles = cfHandles;
this.sstFileManager = sstFileManager;
}
protected final RocksDB rocksdb() {
return this.rocksdb;
}
public Set<String> cfs() {
return this.cfHandles.keySet();
}
public CFHandle cf(String cfName) {
return this.cfHandles.get(cfName);
}
public void addCf(String cfName, CFHandle cfHandle) {
this.cfHandles.put(cfName, cfHandle);
}
public CFHandle removeCf(String cfName) {
return this.cfHandles.remove(cfName);
}
public boolean existCf(String cfName) {
return this.cfHandles.containsKey(cfName);
}
public boolean isOwningHandle() {
return this.rocksdb.isOwningHandle();
}
@Override
public void close() {
if (!this.isOwningHandle()) {
return;
}
for (CFHandle cf : this.cfHandles.values()) {
cf.close();
}
this.cfHandles.clear();
this.rocksdb.close();
}
public long totalSize() {
return this.sstFileManager.getTotalSize();
}
public void createCheckpoint(String targetPath) {
Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName();
assert parentName.toString().startsWith("snapshot") : targetPath;
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
String tempPath = targetPath + "_temp";
File tempFile = new File(tempPath);
FileUtils.deleteDirectory(tempFile);
LOG.debug("Deleted temp directory {}", tempFile);
FileUtils.forceMkdir(tempFile.getParentFile());
checkpoint.createCheckpoint(tempPath);
File snapshotFile = new File(targetPath);
FileUtils.deleteDirectory(snapshotFile);
LOG.debug("Deleted stale directory {}", snapshotFile);
if (!tempFile.renameTo(snapshotFile)) {
throw new IOException(String.format("Failed to rename %s to %s",
tempFile, snapshotFile));
}
} catch (Exception e) {
throw new BackendException("Failed to create checkpoint at path %s",
e, targetPath);
}
}
protected static final class CFHandle implements AutoCloseable {
private final ColumnFamilyHandle handle;
private final AtomicInteger refs;
private final RocksDBIteratorPool iterPool;
public CFHandle(RocksDB rocksdb, ColumnFamilyHandle handle) {
E.checkNotNull(handle, "handle");
this.handle = handle;
this.refs = new AtomicInteger(1);
this.iterPool = new RocksDBIteratorPool(rocksdb, this.handle);
}
public synchronized ColumnFamilyHandle get() {
E.checkState(this.handle.isOwningHandle(),
"It seems CF has been closed");
assert this.refs.get() >= 1;
return this.handle;
}
public synchronized ReusedRocksIterator newIterator() {
assert this.handle.isOwningHandle();
assert this.refs.get() >= 1;
return this.iterPool.newIterator();
}
public synchronized void open() {
this.refs.incrementAndGet();
}
@Override
public void close() {
if (this.refs.decrementAndGet() <= 0) {
this.iterPool.close();
this.handle.close();
}
}
public synchronized ColumnFamilyHandle waitForDrop() {
assert this.refs.get() >= 1;
// When entering this method, the refs won't increase any more
final long timeout = TimeUnit.MINUTES.toMillis(30L);
final long unit = 100L;
for (long i = 1; this.refs.get() > 1; i++) {
try {
Thread.sleep(unit);
} catch (InterruptedException ignored) {
// 30s rest api timeout may cause InterruptedException
}
if (i * unit > timeout) {
throw new BackendException("Timeout after %sms to drop CF",
timeout);
}
}
assert this.refs.get() == 1;
return this.handle;
}
public synchronized void destroy() {
this.close();
assert this.refs.get() == 0 && !this.handle.isOwningHandle();
}
}
}