blob: ec066fbc04c4b4f2b49462802c80e023c138ea10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.task;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
public abstract class TaskCallable<V> implements Callable<V> {
private static final Logger LOG = Log.logger(TaskCallable.class);
private static final String ERROR_COMMIT = "Failed to commit changes: ";
private static final Set<String> ERROR_MESSAGES = ImmutableSet.of(
/*
* "The max length of bytes is" exception message occurs when
* task input size exceeds TASK_INPUT_SIZE_LIMIT or task result size
* exceeds TASK_RESULT_SIZE_LIMIT
*/
"The max length of bytes is",
/*
* "Batch too large" exception message occurs when using
* cassandra store and task input size is in
* [batch_size_fail_threshold_in_kb, TASK_INPUT_SIZE_LIMIT) or
* task result size is in
* [batch_size_fail_threshold_in_kb, TASK_RESULT_SIZE_LIMIT)
*/
"Batch too large"
);
private HugeTask<V> task = null;
private HugeGraph graph = null;
private volatile long lastSaveTime = System.currentTimeMillis();
private volatile long saveInterval = 1000 * 30;
public TaskCallable() {
// pass
}
protected void done() {
this.closeTx();
}
protected void cancelled() {
// Do nothing, subclasses may override this method
}
protected void closeTx() {
Transaction tx = this.graph().tx();
if (tx.isOpen()) {
tx.close();
}
}
public void setMinSaveInterval(long seconds) {
E.checkArgument(seconds > 0,
"Must set interval > 0, but got '%s'", seconds);
this.saveInterval = seconds * 1000L;
}
public void updateProgress(int progress) {
HugeTask<V> task = this.task();
task.progress(progress);
long elapse = System.currentTimeMillis() - this.lastSaveTime;
if (elapse > this.saveInterval) {
this.save();
this.lastSaveTime = System.currentTimeMillis();
}
}
public int progress() {
HugeTask<V> task = this.task();
return task.progress();
}
protected void save() {
HugeTask<V> task = this.task();
task.updateTime(new Date());
try {
this.graph().taskScheduler().save(task);
} catch (Throwable e) {
if (task.completed()) {
/*
* Failed to save task and the status is stable(can't be update)
* just log the task, and try again.
*/
LOG.error("Failed to save task with error \"{}\": {}",
e, task.asMap(false));
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failToSave(e);
this.graph().taskScheduler().save(task);
return;
}
}
throw e;
}
}
protected void graph(HugeGraph graph) {
this.graph = graph;
}
public HugeGraph graph() {
E.checkState(this.graph != null,
"Can't call graph() before scheduling task");
return this.graph;
}
protected void task(HugeTask<V> task) {
this.task = task;
}
public HugeTask<V> task() {
E.checkState(this.task != null,
"Can't call task() before scheduling task");
return this.task;
}
@SuppressWarnings("unchecked")
public static <V> TaskCallable<V> fromClass(String className) {
try {
Class<?> clazz = Class.forName(className);
return (TaskCallable<V>) clazz.newInstance();
} catch (Exception e) {
throw new HugeException("Failed to load task: %s", e, className);
}
}
private static boolean needSaveWithEx(String message) {
for (String error : ERROR_MESSAGES) {
if (message.contains(error)) {
return true;
}
}
return false;
}
public static <V> TaskCallable<V> empty(Exception e) {
return new TaskCallable<V>() {
@Override
public V call() throws Exception {
throw e;
}
};
}
public abstract static class SysTaskCallable<V> extends TaskCallable<V> {
private HugeGraphParams params = null;
protected void params(HugeGraphParams params) {
this.params = params;
}
protected HugeGraphParams params() {
E.checkState(this.params != null,
"Can't call scheduler() before scheduling task");
return this.params;
}
}
}