blob: c08ed938b42978560cc622276fe90c17252caf8a [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.manager;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import org.apache.commons.io.FileUtils;
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.base.Printer;
import com.baidu.hugegraph.base.ToolClient;
import com.baidu.hugegraph.cmd.SubCommands;
import com.baidu.hugegraph.driver.TraverserManager;
import com.baidu.hugegraph.exception.ToolsException;
import com.baidu.hugegraph.structure.constant.HugeType;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.Edges;
import com.baidu.hugegraph.structure.graph.Shard;
import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.structure.graph.Vertices;
import com.baidu.hugegraph.structure.schema.EdgeLabel;
import com.baidu.hugegraph.structure.schema.IndexLabel;
import com.baidu.hugegraph.structure.schema.PropertyKey;
import com.baidu.hugegraph.structure.schema.VertexLabel;
import com.baidu.hugegraph.util.E;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
public class BackupManager extends BackupRestoreBaseManager {
private static final String SHARDS_SUFFIX = "_shards";
private static final String ALL_SHARDS = "_all" + SHARDS_SUFFIX;
private static final String TIMEOUT_SHARDS = "_timeout" + SHARDS_SUFFIX;
private static final String LIMIT_EXCEED_SHARDS = "_limit_exceed" + SHARDS_SUFFIX;
private static final String FAILED_SHARDS = "_failed" + SHARDS_SUFFIX;
public static final int BACKUP_DEFAULT_TIMEOUT = 120;
private static final String BACKEND = "backend";
private static final Set<String> BACKENDS_NO_PAGING =
ImmutableSet.of("memory");
private static final String PAGE_NONE = "";
private static final AtomicInteger nextId = new AtomicInteger(0);
private static final ThreadLocal<Integer> suffix =
ThreadLocal.withInitial(nextId::getAndIncrement);
private long splitSize;
private String backend;
private boolean compress;
private String format;
private String label;
private boolean allProperties;
private List<String> properties;
public BackupManager(ToolClient.ConnectionInfo info) {
super(info, "backup");
this.backend = this.client.graphs().getGraph(this.graph()).get(BACKEND);
}
public void init(SubCommands.Backup backup) {
super.init(backup);
this.removeShardsFilesIfExists();
this.ensureDirectoryExist(true);
this.splitSize(backup.splitSize());
this.compress = backup.compress;
this.format = backup.format;
if (backup.label != null) {
E.checkArgument(backup.types().size() == 1 &&
(backup.types().get(0) == HugeType.VERTEX ||
backup.types().get(0) == HugeType.EDGE),
"The label can only be set when " +
"backup type is vertex or edge");
}
this.label = backup.label;
this.allProperties = backup.allProperties;
this.properties = backup.properties;
}
public void splitSize(long splitSize) {
E.checkArgument(splitSize >= 1024 * 1024,
"Split size must >= 1M, but got %s", splitSize);
this.splitSize = splitSize;
}
public long splitSize() {
return this.splitSize;
}
public void backup(List<HugeType> types) {
try {
this.doBackup(types);
} catch (Throwable e) {
throw e;
} finally {
this.shutdown(this.type());
}
}
public void doBackup(List<HugeType> types) {
this.startTimer();
for (HugeType type : types) {
switch (type) {
case VERTEX:
this.backupVertices();
break;
case EDGE:
this.backupEdges();
break;
case PROPERTY_KEY:
this.backupPropertyKeys();
break;
case VERTEX_LABEL:
this.backupVertexLabels();
break;
case EDGE_LABEL:
this.backupEdgeLabels();
break;
case INDEX_LABEL:
this.backupIndexLabels();
break;
default:
throw new AssertionError(String.format(
"Bad backup type: %s", type));
}
}
this.printSummary();
}
protected void backupVertices() {
Printer.print("Vertices backup started");
Printer.printInBackward("Vertices has been backup: ");
List<Shard> shards = retry(() ->
this.client.traverser().vertexShards(splitSize()),
"querying shards of vertices");
this.writeShards(this.allShardsLog(HugeType.VERTEX), shards);
for (Shard shard : shards) {
this.backupVertexShardAsync(shard);
}
this.awaitTasks();
this.postProcessFailedShard(HugeType.VERTEX);
Printer.print("%d", this.vertexCounter.get());
Printer.print("Vertices backup finished: %d",
this.vertexCounter.get());
}
protected void backupEdges() {
Printer.print("Edges backup started");
Printer.printInBackward("Edges has been backup: ");
List<Shard> shards = retry(() ->
this.client.traverser().edgeShards(splitSize()),
"querying shards of edges");
this.writeShards(this.allShardsLog(HugeType.EDGE), shards);
for (Shard shard : shards) {
this.backupEdgeShardAsync(shard);
}
this.awaitTasks();
this.postProcessFailedShard(HugeType.EDGE);
Printer.print("%d", this.edgeCounter.get());
Printer.print("Edges backup finished: %d", this.edgeCounter.get());
}
protected void backupPropertyKeys() {
Printer.print("Property key backup started");
List<PropertyKey> pks = this.client.schema().getPropertyKeys();
this.propertyKeyCounter.getAndAdd(pks.size());
this.backup(HugeType.PROPERTY_KEY, pks);
Printer.print("Property key backup finished: %d",
this.propertyKeyCounter.get());
}
protected void backupVertexLabels() {
Printer.print("Vertex label backup started");
List<VertexLabel> vls = this.client.schema().getVertexLabels();
this.vertexLabelCounter.getAndAdd(vls.size());
this.backup(HugeType.VERTEX_LABEL, vls);
Printer.print("Vertex label backup finished: %d",
this.vertexLabelCounter.get());
}
protected void backupEdgeLabels() {
Printer.print("Edge label backup started");
List<EdgeLabel> els = this.client.schema().getEdgeLabels();
this.edgeLabelCounter.getAndAdd(els.size());
this.backup(HugeType.EDGE_LABEL, els);
Printer.print("Edge label backup finished: %d",
this.edgeLabelCounter.get());
}
protected void backupIndexLabels() {
Printer.print("Index label backup started");
List<IndexLabel> ils = this.client.schema().getIndexLabels();
this.indexLabelCounter.getAndAdd(ils.size());
this.backup(HugeType.INDEX_LABEL, ils);
Printer.print("Index label backup finished: %d",
this.indexLabelCounter.get());
}
private void backupVertexShardAsync(Shard shard) {
this.submit(() -> {
try {
backupVertexShard(shard);
} catch (Throwable e) {
this.logExceptionWithShard(e, HugeType.VERTEX, shard);
}
});
}
private void backupEdgeShardAsync(Shard shard) {
this.submit(() -> {
try {
backupEdgeShard(shard);
} catch (Throwable e) {
this.logExceptionWithShard(e, HugeType.EDGE, shard);
}
});
}
private void backupVertexShard(Shard shard) {
String desc = String.format("backing up vertices[shard:%s]", shard);
Vertices vertices = null;
String page = this.initPage();
TraverserManager g = client.traverser();
do {
String p = page;
try {
if (page == null) {
vertices = retry(() -> g.vertices(shard), desc);
} else {
vertices = retry(() -> g.vertices(shard, p), desc);
}
} catch (ToolsException e) {
this.exceptionHandler(e, HugeType.VERTEX, shard);
}
if (vertices == null) {
return;
}
List<Vertex> vertexList = vertices.results();
if (vertexList == null || vertexList.isEmpty()) {
return;
}
long count = this.backup(HugeType.VERTEX, suffix.get(), vertexList);
this.vertexCounter.getAndAdd(count);
Printer.printInBackward(this.vertexCounter.get());
} while ((page = vertices.page()) != null);
}
private void backupEdgeShard(Shard shard) {
String desc = String.format("backing up edges[shard %s]", shard);
Edges edges = null;
String page = this.initPage();
TraverserManager g = client.traverser();
do {
try {
String p = page;
if (page == null) {
edges = retry(() -> g.edges(shard), desc);
} else {
edges = retry(() -> g.edges(shard, p), desc);
}
} catch (ToolsException e) {
this.exceptionHandler(e, HugeType.EDGE, shard);
}
if (edges == null) {
return;
}
List<Edge> edgeList = edges.results();
if (edgeList == null || edgeList.isEmpty()) {
return;
}
long count = this.backup(HugeType.EDGE, suffix.get(), edgeList);
this.edgeCounter.getAndAdd(count);
Printer.printInBackward(this.edgeCounter.get());
} while ((page = edges.page()) != null);
}
private void backup(HugeType type, List<?> list) {
String file = type.string();
this.write(file, type, list, this.compress);
}
private long backup(HugeType type, int number, List<?> list) {
String file = type.string() + number;
int size = list.size();
long count = 0L;
for (int start = 0; start < size; start += BATCH) {
int end = Math.min(start + BATCH, size);
count += this.write(file, type, list.subList(start, end),
this.compress, this.format, this.label,
this.allProperties, this.properties);
}
return count;
}
private String initPage() {
return BACKENDS_NO_PAGING.contains(this.backend) ? null : PAGE_NONE;
}
private void exceptionHandler(ToolsException e, HugeType type,
Shard shard) {
String message = e.getMessage();
switch (type) {
case VERTEX:
E.checkState(message.contains("backing up vertices"),
"Unexpected exception %s", e);
break;
case EDGE:
E.checkState(message.contains("backing up edges"),
"Unexpected exception %s", e);
break;
default:
throw new AssertionError(String.format(
"Only VERTEX or EDGE exception is expected, " +
"but got '%s' exception", type));
}
if (isLimitExceedException(e)) {
this.logLimitExceedShard(type, shard);
} else if (isTimeoutException(e)) {
this.logTimeoutShard(type, shard);
} else {
this.logExceptionWithShard(e, type, shard);
}
}
private void logTimeoutShard(HugeType type, Shard shard) {
String file = type.string() + TIMEOUT_SHARDS;
this.writeShard(Paths.get(this.logDir(), file).toString(), shard);
}
private void logLimitExceedShard(HugeType type, Shard shard) {
String file = type.string() + LIMIT_EXCEED_SHARDS;
this.writeShard(Paths.get(this.logDir(), file).toString(), shard);
}
private void logExceptionWithShard(Object e, HugeType type, Shard shard) {
String fileName = type.string() + FAILED_SHARDS;
String filePath = Paths.get(this.logDir(), fileName).toString();
try (FileWriter writer = new FileWriter(filePath, true)) {
writer.write(shard.toString() + "\n");
writer.write(exceptionStackTrace(e) + "\n");
} catch (IOException e1) {
Printer.print("Failed to write shard '%s' with exception '%s'",
shard, e);
}
}
private void postProcessFailedShard(HugeType type) {
this.processTimeoutShards(type);
this.processLimitExceedShards(type);
}
private void processTimeoutShards(HugeType type) {
Path path = Paths.get(this.logDir(), type.string() + TIMEOUT_SHARDS);
File shardFile = path.toFile();
if (!shardFile.exists() || shardFile.isDirectory()) {
return;
}
Printer.print("Timeout occurs when backup %s shards in file '%s', " +
"try to use global option --timeout to increase " +
"connection timeout(default is 120s for backup) or use " +
"option --split-size to decrease split size",
type, shardFile);
}
private void processLimitExceedShards(HugeType type) {
Path path = Paths.get(this.logDir(),
type.string() + LIMIT_EXCEED_SHARDS);
File shardFile = path.toFile();
if (!shardFile.exists() || shardFile.isDirectory()) {
return;
}
Printer.print("Limit exceed occurs when backup %s shards in file '%s'",
type, shardFile);
}
private List<Shard> readShards(File file) {
E.checkArgument(file.exists() && file.isFile() && file.canRead(),
"Need to specify a readable filter file rather than:" +
" %s", file.toString());
List<Shard> shards = new ArrayList<>();
try (InputStream is = new FileInputStream(file);
InputStreamReader isr = new InputStreamReader(is, API.CHARSET);
BufferedReader reader = new BufferedReader(isr)) {
String line;
while ((line = reader.readLine()) != null) {
shards.addAll(this.readList("shards", Shard.class, line));
}
} catch (IOException e) {
throw new ToolsException("IOException occur while reading %s",
e, file.getName());
}
return shards;
}
private void writeShard(String file, Shard shard) {
this.writeShards(file, ImmutableList.of(shard));
}
private void writeShards(String file, List<Shard> shards) {
this.writeLog(file, "shards", shards);
}
private void writeLog(String file, String type, List<?> list) {
Lock lock = locks.lock(file);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(LBUF_SIZE);
FileOutputStream fos = new FileOutputStream(file, false)) {
String key = String.format("{\"%s\": ", type);
baos.write(key.getBytes(API.CHARSET));
this.client.mapper().writeValue(baos, list);
baos.write("}\n".getBytes(API.CHARSET));
fos.write(baos.toByteArray());
} catch (Exception e) {
Printer.print("Failed to serialize %s: %s", type, e);
} finally {
lock.unlock();
}
}
private String allShardsLog(HugeType type) {
String shardsFile = type.string() + ALL_SHARDS;
return Paths.get(this.logDir(), shardsFile).toString();
}
protected void removeShardsFilesIfExists() {
File logDir = new File(this.logDir());
E.checkArgument(logDir.exists() && logDir.isDirectory(),
"The log directory '%s' not exists or is file",
logDir);
for (File file : logDir.listFiles()) {
if (file.getName().endsWith(SHARDS_SUFFIX)) {
try {
FileUtils.forceDelete(file);
} catch (IOException e) {
throw new ToolsException("Failed to delete shard file " +
"'%s'", file);
}
}
}
}
private static boolean isTimeoutException(ToolsException e) {
return e.getCause() != null && e.getCause().getCause() != null &&
e.getCause().getCause().getMessage().contains("Read timed out");
}
private static boolean isLimitExceedException(ToolsException e) {
return e.getCause() != null &&
e.getCause().getMessage().contains("Too many records");
}
private static String exceptionStackTrace(Object e) {
if (!(e instanceof Throwable)) {
return e.toString();
}
Throwable t = (Throwable) e;
StringBuilder sb = new StringBuilder();
sb.append(t.getMessage()).append("\n");
if (t.getCause() != null) {
sb.append(t.getCause().toString()).append("\n");
}
for (StackTraceElement element : t.getStackTrace()) {
sb.append(element).append("\n");
}
return sb.toString();
}
}