blob: 8fa2e26edfedf4043029c82e4f6177681b115877 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.manager;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.base.Directory;
import com.baidu.hugegraph.base.HdfsDirectory;
import com.baidu.hugegraph.base.LocalDirectory;
import com.baidu.hugegraph.base.Printer;
import com.baidu.hugegraph.base.RetryManager;
import com.baidu.hugegraph.base.ToolClient;
import com.baidu.hugegraph.cmd.SubCommands;
import com.baidu.hugegraph.concurrent.KeyLock;
import com.baidu.hugegraph.exception.ToolsException;
import com.baidu.hugegraph.structure.GraphElement;
import com.baidu.hugegraph.structure.constant.HugeType;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.util.E;
import com.google.common.collect.ImmutableMap;
public class BackupRestoreBaseManager extends RetryManager {
public static final int BATCH = 500;
protected static final int LBUF_SIZE = 1024;
protected AtomicLong propertyKeyCounter = new AtomicLong(0);
protected AtomicLong vertexLabelCounter = new AtomicLong(0);
protected AtomicLong edgeLabelCounter = new AtomicLong(0);
protected AtomicLong indexLabelCounter = new AtomicLong(0);
protected AtomicLong vertexCounter = new AtomicLong(0);
protected AtomicLong edgeCounter = new AtomicLong(0);
private long startTime = 0L;
protected static KeyLock locks = new KeyLock();
private String logDir;
protected Directory directory;
private Map<String, OutputStream> outputStreams;
private Map<String, InputStream> inputStreams;
public BackupRestoreBaseManager(ToolClient.ConnectionInfo info,
String type) {
super(info, type);
this.outputStreams = new ConcurrentHashMap<>();
this.inputStreams = new ConcurrentHashMap<>();
}
public void init(SubCommands.BackupRestore cmd) {
this.threadsNum(cmd.threadsNum());
assert cmd.retry() > 0;
this.retry(cmd.retry());
LocalDirectory.ensureDirectoryExist(cmd.logDir());
this.logDir(cmd.logDir());
this.directory(cmd.directory(), cmd.hdfsConf());
this.initExecutors();
}
public void logDir(String logDir) {
this.logDir = logDir;
}
public String logDir() {
return this.logDir;
}
public Directory directory() {
return this.directory;
}
protected void ensureDirectoryExist(boolean create) {
this.directory.ensureDirectoryExist(create);
}
protected void removeDirectory() {
this.directory.removeDirectory();
}
protected long write(String path, HugeType type,
List<?> list, boolean compress) {
OutputStream os = this.outputStream(path, compress);
ByteArrayOutputStream baos = new ByteArrayOutputStream(LBUF_SIZE);
try {
String key = String.format("{\"%s\": ", type.string());
baos.write(key.getBytes(API.CHARSET));
this.client.mapper().writeValue(baos, list);
baos.write("}\n".getBytes(API.CHARSET));
os.write(baos.toByteArray());
} catch (Throwable e) {
throw new ToolsException("Failed to serialize %s to %s",
e, type, path);
}
return list.size();
}
protected long write(String path, HugeType type, List<?> list,
boolean compress, String format,
String label, boolean allProperties,
List<String> properties) {
// format == null when called by dump
if (format == null || format.equals("json")) {
return this.write(path, type, list, compress);
}
assert format.equals("text");
return this.writeText(path, type, list, compress, label,
allProperties, properties);
}
protected long writeText(String path, HugeType type, List<?> list,
boolean compress, String label,
boolean allProperties, List<String> properties) {
OutputStream os = this.outputStream(path, compress);
ByteArrayOutputStream baos = new ByteArrayOutputStream(LBUF_SIZE);
StringBuilder builder = new StringBuilder(LBUF_SIZE);
long count = 0L;
try {
for (Object e : list) {
GraphElement element = (GraphElement) e;
if (label != null && !label.equals(element.label())) {
continue;
}
count++;
if (type == HugeType.VERTEX) {
builder.append(element.id()).append("\t");
} else {
Edge edge = (Edge) e;
builder.append(edge.sourceId()).append("\t")
.append(edge.targetId()).append("\t");
}
if (allProperties) {
for (Object value : element.properties().values()) {
builder.append(value).append(",");
}
} else {
for (String property : properties) {
builder.append(element.property(property)).append(",");
}
}
builder.setCharAt(builder.length() - 1, '\n');
}
baos.write(builder.toString().getBytes(API.CHARSET));
os.write(baos.toByteArray());
} catch (Throwable e) {
throw new ToolsException("Failed to serialize %s to %s",
e, type, path);
}
return count;
}
protected void read(String file, HugeType type,
BiConsumer<String, String> consumer) {
InputStream is = this.inputStream(file);
try (InputStreamReader isr = new InputStreamReader(is, API.CHARSET);
BufferedReader reader = new BufferedReader(isr)) {
String line;
while ((line = reader.readLine()) != null) {
consumer.accept(type.string(), line);
}
} catch (IOException e) {
throw new ToolsException("Failed to deserialize %s from %s",
e, type, file);
}
}
protected OutputStream outputStream(String file, boolean compress) {
OutputStream os = this.outputStreams.get(file);
if (os != null) {
return os;
}
os = this.directory.outputStream(file, compress, true);
OutputStream prev = this.outputStreams.putIfAbsent(file, os);
if (prev != null) {
Directory.closeAndIgnoreException(os);
os = prev;
}
return os;
}
protected InputStream inputStream(String file) {
InputStream is = this.inputStreams.get(file);
if (is != null) {
return is;
}
is = this.directory.inputStream(file);
InputStream prev = this.inputStreams.putIfAbsent(file, is);
if (prev != null) {
Directory.closeAndIgnoreException(is);
is = prev;
}
return is;
}
protected void directory(String dir, Map<String, String> hdfsConf) {
if (hdfsConf == null || hdfsConf.isEmpty()) {
// Local FS directory
this.directory = LocalDirectory.constructDir(dir, this.graph());
} else {
// HDFS directory
this.directory = HdfsDirectory.constructDir(dir, this.graph(),
hdfsConf);
}
}
protected String fileWithPrefix(HugeType type) {
List<String> files = this.filesWithPrefix(type);
E.checkState(files.size() == 1,
"There should be only one file of '%s', but got '%s'",
type, files.size());
return files.get(0);
}
protected List<String> filesWithPrefix(HugeType type) {
List<String> files = new ArrayList<>();
for (String file : this.directory.files()) {
if (file.startsWith(type.string())) {
files.add(file);
}
}
return files;
}
public void startTimer() {
this.startTime = System.currentTimeMillis();
}
public long elapseSeconds() {
E.checkState(this.startTime != 0,
"Must call startTimer() to set start time, " +
"before call elapse()");
return (System.currentTimeMillis() - this.startTime) / 1000;
}
protected void printSummary() {
this.printSummary(this.type());
}
protected void printSummary(String type) {
Printer.print("===============================================");
Map<String, Long> summary = ImmutableMap.<String, Long>builder()
.put("property key number", this.propertyKeyCounter.longValue())
.put("vertex label number", this.vertexLabelCounter.longValue())
.put("edge label number", this.edgeLabelCounter.longValue())
.put("index label number", this.indexLabelCounter.longValue())
.put("vertex number", this.vertexCounter.longValue())
.put("edge number", this.edgeCounter.longValue()).build();
Printer.printMap(type + " summary", summary);
Printer.printKV("cost time(s)", this.elapseSeconds());
}
@Override
public void shutdown(String taskType) {
super.shutdown(taskType);
for (Map.Entry<String, OutputStream> e : this.outputStreams.entrySet()) {
try {
OutputStream os = e.getValue();
os.close();
} catch (IOException exception) {
Printer.print("Failed to close file '%s'", e.getKey());
}
}
for (Map.Entry<String, InputStream> e : this.inputStreams.entrySet()) {
try {
InputStream is = e.getValue();
is.close();
} catch (IOException exception) {
Printer.print("Failed to close file '%s'", e.getKey());
}
}
}
}