blob: d5468b6ca29e195e02f60e42461878544a9ed677 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.loader.progress;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.loader.exception.LoadException;
import org.apache.hugegraph.loader.util.JsonUtil;
import org.apache.hugegraph.loader.util.LoadUtil;
import org.apache.hugegraph.loader.constant.Constants;
import org.apache.hugegraph.loader.executor.LoadContext;
import org.apache.hugegraph.loader.executor.LoadOptions;
import org.apache.hugegraph.loader.mapping.InputStruct;
import org.apache.hugegraph.util.E;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * LoadProgress records the progress of a load, so that loading can be
 * continued when the previous run was interrupted halfway.
 *
 * <p>The progress is persisted as JSON by {@link #write(LoadContext)} and
 * restored by {@link #parse(LoadOptions)} / {@link #read(File)}.
 *
 * <p>A LoadProgress will only be operated by a single thread; it performs
 * no synchronization of its own.
 */
public final class LoadProgress {

    @JsonProperty("vertex_progress")
    private long vertexLoaded;
    @JsonProperty("edge_progress")
    private long edgeLoaded;
    // Keyed by InputStruct id; a LinkedHashMap keeps the insertion order
    @JsonProperty("input_progress")
    private final Map<String, InputProgress> inputProgress;

    /**
     * Creates an empty progress: zero vertices, zero edges and no inputs.
     */
    public LoadProgress() {
        this.vertexLoaded = 0L;
        this.edgeLoaded = 0L;
        this.inputProgress = new LinkedHashMap<>();
    }

    /**
     * @return the accumulated vertex counter
     */
    public long vertexLoaded() {
        return this.vertexLoaded;
    }

    /**
     * Adds {@code count} to the vertex counter.
     *
     * @param count the amount to add
     */
    public void plusVertexLoaded(long count) {
        this.vertexLoaded += count;
    }

    /**
     * @return the accumulated edge counter
     */
    public long edgeLoaded() {
        return this.edgeLoaded;
    }

    /**
     * Adds {@code count} to the edge counter.
     *
     * @param count the amount to add
     */
    public void plusEdgeLoaded(long count) {
        this.edgeLoaded += count;
    }

    /**
     * @return the live (not copied) map from input struct id to its progress
     */
    public Map<String, InputProgress> inputProgress() {
        return this.inputProgress;
    }

    /**
     * Sums the offsets of all items of all inputs: every already loaded
     * item plus, when present, the item that is currently being loaded.
     *
     * @return the sum of the item offsets over all inputs
     */
    public long totalInputRead() {
        long count = 0L;
        for (InputProgress progress : this.inputProgress.values()) {
            Set<InputItemProgress> loadedItems = progress.loadedItems();
            for (InputItemProgress loadedItem : loadedItems) {
                count += loadedItem.offset();
            }
            // An input may have no item in flight
            if (progress.loadingItem() != null) {
                count += progress.loadingItem().offset();
            }
        }
        return count;
    }

    /**
     * Registers a fresh {@link InputProgress} for the given struct under
     * the struct's id, replacing any progress previously stored for that id.
     *
     * @param struct the input struct to track, must not be null
     * @return the newly created progress
     */
    public InputProgress addStruct(InputStruct struct) {
        E.checkNotNull(struct, "input struct");
        InputProgress progress = new InputProgress(struct);
        this.inputProgress.put(struct.id(), progress);
        // Return the instance just stored instead of looking it up again
        return progress;
    }

    /**
     * @param id the input struct id
     * @return the progress stored for {@code id}, or null if there is none
     */
    public InputProgress get(String id) {
        return this.inputProgress.get(id);
    }

    /**
     * Delegates to {@link InputProgress#markLoaded(boolean)} of the progress
     * registered for the given struct.
     *
     * @param struct  the input struct whose progress is updated, must not
     *                be null and must have been registered before
     * @param markAll passed through unchanged to the input progress
     */
    public void markLoaded(InputStruct struct, boolean markAll) {
        // Fail with a clear message rather than a bare NPE on struct.id()
        E.checkNotNull(struct, "input struct");
        InputProgress progress = this.inputProgress.get(struct.id());
        E.checkArgumentNotNull(progress, "Invalid mapping '%s'", struct);
        progress.markLoaded(markAll);
    }

    /**
     * Serializes this progress to JSON and writes it to the file named by
     * {@link #format(LoadOptions, String)} for the context's options and
     * timestamp. Existing content of that file is overwritten.
     *
     * @param context supplies the load options and the timestamp
     * @throws IOException if the file can't be written
     */
    public void write(LoadContext context) throws IOException {
        String fileName = format(context.options(), context.timestamp());
        File file = FileUtils.getFile(fileName);
        String json = JsonUtil.toJson(this);
        // append = false: replace the previous snapshot
        FileUtils.write(file, json, Constants.CHARSET, false);
    }

    /**
     * Reads a progress from the JSON content of the given file.
     *
     * @param file the progress file to read
     * @return the deserialized progress
     * @throws IOException if the file can't be read
     */
    public static LoadProgress read(File file) throws IOException {
        String json = FileUtils.readFileToString(file, Constants.CHARSET);
        return JsonUtil.fromJson(json, LoadProgress.class);
    }

    /**
     * Restores the progress to start from.
     *
     * <p>An empty progress is returned when incremental mode is off, when
     * the struct directory doesn't exist, or when it holds no progress
     * file. Otherwise the progress file whose name sorts last is read.
     *
     * @param options the load options
     * @return the restored progress, or an empty one
     * @throws LoadException if the selected progress file can't be read
     */
    public static LoadProgress parse(LoadOptions options) {
        if (!options.incrementalMode) {
            return new LoadProgress();
        }

        String dir = LoadUtil.getStructDirPrefix(options);
        File dirFile = FileUtils.getFile(dir);
        if (!dirFile.exists()) {
            return new LoadProgress();
        }

        // Only regular files qualify: a directory that merely shares the
        // prefix can't be read as a progress file
        File[] subFiles = dirFile.listFiles((d, name) -> {
            return name.startsWith(Constants.LOAD_PROGRESS) &&
                   new File(d, name).isFile();
        });
        // listFiles() returns null if dirFile isn't a directory or on error
        if (subFiles == null || subFiles.length == 0) {
            return new LoadProgress();
        }

        /*
         * Sort progress files by name, then take the last one. The name ends
         * with the timestamp (see format()), so this is meant to select the
         * most recent file; it assumes timestamps sort lexicographically.
         */
        List<File> progressFiles = Arrays.asList(subFiles);
        progressFiles.sort(Comparator.comparing(File::getName));
        File lastProgressFile = progressFiles.get(progressFiles.size() - 1);
        try {
            return LoadProgress.read(lastProgressFile);
        } catch (IOException e) {
            throw new LoadException("Failed to read progress file '" +
                                    lastProgressFile + "'", e);
        }
    }

    /**
     * Builds the path of the progress file for the given timestamp: the
     * struct directory of {@code options} joined with the name
     * {@code LOAD_PROGRESS + UNDERLINE_STR + timestamp}.
     *
     * @param options   the load options used to locate the struct directory
     * @param timestamp the timestamp suffix of the file name
     * @return the path of the progress file
     */
    public static String format(LoadOptions options, String timestamp) {
        String dir = LoadUtil.getStructDirPrefix(options);
        String name = Constants.LOAD_PROGRESS + Constants.UNDERLINE_STR +
                      timestamp;
        return Paths.get(dir, name).toString();
    }
}