blob: 32e0ad91b44b0b18a84a34406c969a46e933b341 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.tasks;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.utils.AtlasJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.repository.Constants.TASK_GUID;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
@Component
public class TaskRegistry {
private static final Logger LOG = LoggerFactory.getLogger(TaskRegistry.class);
private AtlasGraph graph;
@Inject
public TaskRegistry(AtlasGraph graph) {
this.graph = graph;
}
@GraphTransaction
public AtlasTask save(AtlasTask task) {
AtlasVertex vertex = createVertex(task);
return toAtlasTask(vertex);
}
@GraphTransaction
public List<AtlasTask> getPendingTasks() {
List<AtlasTask> ret = new ArrayList<>();
try {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
Iterator<AtlasVertex> results = query.vertices().iterator();
while (results.hasNext()) {
AtlasVertex vertex = results.next();
ret.add(toAtlasTask(vertex));
}
} catch (Exception exception) {
LOG.error("Error fetching pending tasks!", exception);
} finally {
graph.commit();
}
return ret;
}
@GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {
return;
}
setEncodedProperty(taskVertex, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(taskVertex, Constants.TASK_STATUS, task.getStatus().toString());
setEncodedProperty(taskVertex, Constants.TASK_UPDATED_TIME, System.currentTimeMillis());
setEncodedProperty(taskVertex, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
}
@GraphTransaction
public void deleteByGuid(String guid) throws AtlasBaseException {
try {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(TASK_GUID, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
if (results.hasNext()) {
graph.removeVertex(results.next());
}
} catch (Exception exception) {
LOG.error("Error: deletingByGuid: {}", guid);
throw new AtlasBaseException(exception);
}
}
@GraphTransaction
public void deleteComplete(AtlasVertex taskVertex, AtlasTask task) {
updateStatus(taskVertex, task);
deleteVertex(taskVertex);
}
@GraphTransaction
public AtlasTask getById(String guid) {
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.has(TASK_GUID, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
return results.hasNext() ? toAtlasTask(results.next()) : null;
}
@GraphTransaction
public AtlasVertex getVertex(String taskGuid) {
AtlasGraphQuery query = graph.query().has(Constants.TASK_GUID, taskGuid);
Iterator<AtlasVertex> results = query.vertices().iterator();
return results.hasNext() ? results.next() : null;
}
@GraphTransaction
public List<AtlasTask> getAll() {
List<AtlasTask> ret = new ArrayList<>();
AtlasGraphQuery query = graph.query()
.has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
.orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
Iterator<AtlasVertex> results = query.vertices().iterator();
while (results.hasNext()) {
ret.add(toAtlasTask(results.next()));
}
return ret;
}
public void commit() {
this.graph.commit();
}
public AtlasTask createVertex(String taskType, String createdBy, Map<String, Object> parameters) {
AtlasTask ret = new AtlasTask(taskType, createdBy, parameters);
createVertex(ret);
return ret;
}
private void deleteVertex(AtlasVertex taskVertex) {
if (taskVertex == null) {
return;
}
graph.removeVertex(taskVertex);
}
private AtlasTask toAtlasTask(AtlasVertex v) {
AtlasTask ret = new AtlasTask();
ret.setGuid(v.getProperty(Constants.TASK_GUID, String.class));
ret.setType(v.getProperty(Constants.TASK_TYPE, String.class));
ret.setStatus(v.getProperty(Constants.TASK_STATUS, String.class));
ret.setCreatedBy(v.getProperty(Constants.TASK_CREATED_BY, String.class));
Long createdTime = v.getProperty(Constants.TASK_CREATED_TIME, Long.class);
if (createdTime != null) {
ret.setCreatedTime(new Date(createdTime));
}
Long updatedTime = v.getProperty(Constants.TASK_UPDATED_TIME, Long.class);
if (updatedTime != null) {
ret.setUpdatedTime(new Date(updatedTime));
}
Long startTime = v.getProperty(Constants.TASK_START_TIME, Long.class);
if (startTime != null) {
ret.setStartTime(new Date(startTime));
}
Long endTime = v.getProperty(Constants.TASK_END_TIME, Long.class);
if (endTime != null) {
ret.setEndTime(new Date(endTime));
}
String parametersJson = v.getProperty(Constants.TASK_PARAMETERS, String.class);
ret.setParameters(AtlasType.fromJson(parametersJson, Map.class));
ret.setAttemptCount(v.getProperty(Constants.TASK_ATTEMPT_COUNT, Integer.class));
ret.setErrorMessage(v.getProperty(Constants.TASK_ERROR_MESSAGE, String.class));
return ret;
}
private AtlasVertex createVertex(AtlasTask task) {
AtlasVertex ret = graph.addVertex();
setEncodedProperty(ret, Constants.TASK_GUID, task.getGuid());
setEncodedProperty(ret, Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME);
setEncodedProperty(ret, Constants.TASK_STATUS, task.getStatus().toString());
setEncodedProperty(ret, Constants.TASK_TYPE, task.getType());
setEncodedProperty(ret, Constants.TASK_CREATED_BY, task.getCreatedBy());
setEncodedProperty(ret, Constants.TASK_CREATED_TIME, task.getCreatedTime());
setEncodedProperty(ret, Constants.TASK_UPDATED_TIME, task.getUpdatedTime());
if (task.getStartTime() != null) {
setEncodedProperty(ret, Constants.TASK_START_TIME, task.getStartTime().getTime());
}
if (task.getEndTime() != null) {
setEncodedProperty(ret, Constants.TASK_END_TIME, task.getEndTime().getTime());
}
setEncodedProperty(ret, Constants.TASK_PARAMETERS, AtlasJson.toJson(task.getParameters()));
setEncodedProperty(ret, Constants.TASK_ATTEMPT_COUNT, task.getAttemptCount());
setEncodedProperty(ret, Constants.TASK_ERROR_MESSAGE, task.getErrorMessage());
return ret;
}
}