ATLAS-4733 : Download Basic and DSL search results
Signed-off-by: Mandar Ambawane <mandar.ambawane@freestoneinfotech.com>
diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
index 6083424..21ac7f7 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java
@@ -219,6 +219,7 @@
GLOSSARY_CATEGORY_ALREADY_EXISTS(409, "ATLAS-409-00-00A", "Glossary category with qualifiedName {0} already exists"),
GLOSSARY_IMPORT_FAILED(409, "ATLAS-409-00-011", "Glossary import failed"),
METRICSSTAT_ALREADY_EXISTS(409, "ATLAS-409-00-012", "Metric Statistics already collected at {0}"),
+ PENDING_TASKS_ALREADY_IN_PROGRESS(409, "ATLAS-409-00-013", "There are already {0} pending tasks in queue"),
// All internal errors go here
INTERNAL_ERROR(500, "ATLAS-500-00-001", "Internal server error {0}"),
diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java
new file mode 100644
index 0000000..f2f73e6
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResultDownloadStatus.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.discovery;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.tasks.AtlasTask;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.Date;
+import java.util.List;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasSearchResultDownloadStatus implements Serializable {
+
+ private List<AtlasSearchDownloadRecord> searchDownloadRecords;
+
+ public List<AtlasSearchDownloadRecord> getSearchDownloadRecords() {
+ return searchDownloadRecords;
+ }
+
+ public void setSearchDownloadRecords(List<AtlasSearchDownloadRecord> searchDownloadRecords) {
+ this.searchDownloadRecords = searchDownloadRecords;
+ }
+
+ @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ @XmlRootElement
+ @XmlAccessorType(XmlAccessType.PROPERTY)
+ public static class AtlasSearchDownloadRecord implements Serializable {
+ private AtlasTask.Status status;
+ private String fileName;
+ private String createdBy;
+ private Date createdTime;
+ private Date startTime;
+
+
+ public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime, Date startTime) {
+ this.status = status;
+ this.fileName = fileName;
+ this.createdBy = createdBy;
+ this.createdTime = createdTime;
+ this.startTime = startTime;
+ }
+
+ public AtlasSearchDownloadRecord(AtlasTask.Status status, String fileName, String createdBy, Date createdTime) {
+ this(status, fileName, createdBy, createdTime, null);
+ }
+
+ public AtlasTask.Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(AtlasTask.Status status) {
+ this.status = status;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ public String getCreatedBy() {
+ return createdBy;
+ }
+
+ public void setCreatedBy(String createdBy) {
+ this.createdBy = createdBy;
+ }
+
+ public Date getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(Date createdTime) {
+ this.createdTime = createdTime;
+ }
+
+ public Date getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Date startTime) {
+ this.startTime = startTime;
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+ sb.append("AtlasSearchDownloadRecord{");
+ sb.append("status=").append(status);
+ sb.append(", fileName=").append(fileName);
+ sb.append(", createdBy=").append(createdBy);
+ sb.append(", createTime=").append(createdTime);
+ sb.append(", startTime=").append(startTime);
+ sb.append("}");
+ return sb;
+ }
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+ }
+}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index d941100..f8e55b8 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -19,13 +19,13 @@
package org.apache.atlas.discovery;
-import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.*;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
+import java.io.IOException;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
public interface AtlasDiscoveryService {
/**
@@ -160,4 +160,17 @@
* @return top 5 suggestion strings for the given prefix.
*/
AtlasSuggestionsResult getSuggestions(String prefixString, String fieldName);
+
+ /**
+ * Creates task to search and download the results of Basic and DSL search
+ * @param taskParams parameters of AtlasTask
+ */
+ void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException;
+
+ /**
+ *
+ * @return AtlasSearchResultDownloadStatus
+ * @throws IOException
+ */
+ AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException;
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 582d975..5b43953 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -22,6 +22,7 @@
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -32,6 +33,8 @@
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasFullTextResult;
import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType;
+import org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus;
+import org.apache.atlas.model.discovery.AtlasSearchResultDownloadStatus.AtlasSearchDownloadRecord;
import org.apache.atlas.model.discovery.AtlasSuggestionsResult;
import org.apache.atlas.model.discovery.QuickSearchParameters;
import org.apache.atlas.model.discovery.RelationshipSearchParameters;
@@ -41,6 +44,7 @@
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelationshipHeader;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
+import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.query.QueryParams;
import org.apache.atlas.query.executors.DSLQueryExecutor;
import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
@@ -56,7 +60,10 @@
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
+import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory;
import org.apache.atlas.repository.userprofile.UserProfileService;
+import org.apache.atlas.tasks.TaskManagement;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
import org.apache.atlas.type.AtlasClassificationType;
@@ -83,15 +90,21 @@
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.SortOrder.ASCENDING;
@@ -120,13 +133,15 @@
private final UserProfileService userProfileService;
private final SuggestionsProvider suggestionsProvider;
private final DSLQueryExecutor dslQueryExecutor;
+ private final TaskManagement taskManagement;
@Inject
EntityDiscoveryService(AtlasTypeRegistry typeRegistry,
AtlasGraph graph,
GraphBackedSearchIndexer indexer,
SearchTracker searchTracker,
- UserProfileService userProfileService) throws AtlasException {
+ UserProfileService userProfileService,
+ TaskManagement taskManagement) throws AtlasException {
this.graph = graph;
this.entityRetriever = new EntityGraphRetriever(this.graph, typeRegistry);
this.indexer = indexer;
@@ -142,6 +157,7 @@
this.dslQueryExecutor = AtlasConfiguration.DSL_EXECUTOR_TRAVERSAL.getBoolean()
? new TraversalBasedExecutor(typeRegistry, graph, entityRetriever)
: new ScriptEngineBasedExecutor(typeRegistry, graph, entityRetriever);
+ this.taskManagement = taskManagement;
LOG.info("DSL Executor: {}", this.dslQueryExecutor.getClass().getSimpleName());
}
@@ -453,6 +469,48 @@
@Override
@GraphTransaction
+ public void createAndQueueSearchResultDownloadTask(Map<String, Object> taskParams) throws AtlasBaseException {
+
+ List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD);
+ if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() > SearchResultDownloadTaskFactory.MAX_PENDING_TASKS_ALLOWED) {
+ throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS, String.valueOf(pendingTasks.size()));
+ }
+ AtlasTask task = taskManagement.createTask(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD, RequestContext.getCurrentUser(), taskParams);
+ RequestContext.get().queueTask(task);
+ }
+
+ @Override
+ public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException {
+ List<AtlasTask> pendingTasks = taskManagement.getPendingTasksByType(SearchResultDownloadTaskFactory.SEARCH_RESULT_DOWNLOAD);
+ List<AtlasTask> currentUserPendingTasks = pendingTasks.stream().filter(task -> task.getCreatedBy()
+ .equals(RequestContext.getCurrentUser())).collect(Collectors.toList());
+
+ List<AtlasSearchDownloadRecord> searchDownloadRecords = new ArrayList<>();
+ for (AtlasTask pendingTask : currentUserPendingTasks) {
+ String fileName = (String) pendingTask.getParameters().get(SearchResultDownloadTask.CSV_FILE_NAME_KEY);
+ AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(pendingTask.getStatus(), fileName, pendingTask.getCreatedBy(), pendingTask.getCreatedTime(), pendingTask.getStartTime());
+ searchDownloadRecords.add(searchDownloadRecord);
+ }
+
+ File fileDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
+ if (fileDir.exists()) {
+ File[] currentUserFiles = fileDir.listFiles();
+ if (currentUserFiles != null) {
+ for (File file : currentUserFiles) {
+ BasicFileAttributes attr = Files.readAttributes(file.toPath(), BasicFileAttributes.class);
+ Date createdTime = new Date(attr.creationTime().toMillis());
+ AtlasSearchDownloadRecord searchDownloadRecord = new AtlasSearchDownloadRecord(AtlasTask.Status.COMPLETE, file.getName(), RequestContext.getCurrentUser(), createdTime);
+ searchDownloadRecords.add(searchDownloadRecord);
+ }
+ }
+ }
+ AtlasSearchResultDownloadStatus result = new AtlasSearchResultDownloadStatus();
+ result.setSearchDownloadRecords(searchDownloadRecords);
+ return result;
+ }
+
+ @Override
+ @GraphTransaction
public AtlasSearchResult searchRelationsWithParameters(RelationshipSearchParameters searchParameters) throws AtlasBaseException {
SearchContext searchContext = new SearchContext(createSearchParameters(searchParameters),
typeRegistry,
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java
new file mode 100644
index 0000000..fd90fd4
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTask.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload;
+
+import com.opencsv.CSVWriter;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+
+public class SearchResultDownloadTask extends AbstractTask {
+ private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTask.class);
+
+ public static final String SEARCH_PARAMETERS_JSON_KEY = "search_parameters_json";
+ public static final String CSV_FILE_NAME_KEY = "csv_file_Name";
+ public static final String SEARCH_TYPE_KEY = "search_type";
+ public static final String ATTRIBUTE_LABEL_MAP_KEY = "attribute_label_map";
+ public static final String QUERY_KEY = "query";
+ public static final String TYPE_NAME_KEY = "type_name";
+ public static final String CLASSIFICATION_KEY = "classification";
+ public static final String LIMIT_KEY = "limit";
+ public static final String OFFSET_KEY = "offset";
+ public static final String CSV_FILE_EXTENSION = ".csv";
+ public static String DOWNLOAD_DIR_PATH;
+ private static final String EMPTY_STRING = "";
+ private static final String DOWNLOAD_DIR_PATH_KEY = "atlas.download.search.dir.path";
+ private static final String DOWNLOAD_DIR_PATH_DEFAULT = StringUtils.isEmpty(System.getProperty("atlas.home")) ? System.getProperty("user.dir") : System.getProperty("atlas.home");
+ private static final String CSV_DOWNLOAD_DIR = "search_result_downloads";
+
+ private static Configuration configuration;
+
+ static {
+ try {
+ configuration = ApplicationProperties.get();
+ } catch (AtlasException e) {
+ LOG.error("Failed to load application properties", e);
+ }
+ if (configuration != null) {
+ DOWNLOAD_DIR_PATH = configuration.getString(DOWNLOAD_DIR_PATH_KEY, DOWNLOAD_DIR_PATH_DEFAULT) + File.separator + CSV_DOWNLOAD_DIR;
+ } else {
+ DOWNLOAD_DIR_PATH = DOWNLOAD_DIR_PATH_DEFAULT + File.separator + CSV_DOWNLOAD_DIR;
+ }
+ }
+
+ private final AtlasDiscoveryService discoveryService;
+ private final AtlasTypeRegistry typeRegistry;
+
+ public SearchResultDownloadTask(AtlasTask task, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+ super(task);
+ this.discoveryService = discoveryService;
+ this.typeRegistry = typeRegistry;
+ }
+
+ @Override
+ public AtlasTask.Status perform() throws Exception {
+ RequestContext.clear();
+ Map<String, Object> params = getTaskDef().getParameters();
+
+ if (MapUtils.isEmpty(params)) {
+ LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
+
+ return FAILED;
+ }
+
+ String userName = getTaskDef().getCreatedBy();
+
+ if (StringUtils.isEmpty(userName)) {
+ LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
+
+ return FAILED;
+ }
+
+ RequestContext.get().setUser(userName, null);
+
+ try {
+ run(params);
+
+ setStatus(COMPLETE);
+ } catch (Exception e) {
+ LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+ setStatus(FAILED);
+
+ throw e;
+ } finally {
+ RequestContext.clear();
+ }
+
+ return getStatus();
+ }
+
+ protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException {
+ Map<String, String> attributeLabelMap;
+ AtlasSearchResult searchResult = null;
+ AtlasSearchResult.AtlasQueryType queryType = null;
+
+ if (parameters.get(SEARCH_TYPE_KEY) == BASIC) {
+ String searchParametersJson = (String) parameters.get(SEARCH_PARAMETERS_JSON_KEY);
+ SearchParameters searchParameters = AtlasJson.fromJson(searchParametersJson, SearchParameters.class);
+ searchParameters.setLimit(AtlasConfiguration.SEARCH_MAX_LIMIT.getInt());
+ searchResult = discoveryService.searchWithParameters(searchParameters);
+ queryType = BASIC;
+
+ } else if (parameters.get(SEARCH_TYPE_KEY) == DSL) {
+ String query = (String) parameters.get(QUERY_KEY);
+ String typeName = (String) parameters.get(TYPE_NAME_KEY);
+ String classification = (String) parameters.get(CLASSIFICATION_KEY);
+ int offset = (int) parameters.get(OFFSET_KEY);
+ String queryStr = discoveryService.getDslQueryUsingTypeNameClassification(query, typeName, classification);
+ searchResult = discoveryService.searchUsingDslQuery(queryStr, AtlasConfiguration.SEARCH_MAX_LIMIT.getInt(), offset);
+ queryType = DSL;
+ }
+
+ String attributeLabelMapJson = (String) parameters.get(ATTRIBUTE_LABEL_MAP_KEY);
+ attributeLabelMap = AtlasJson.fromJson(attributeLabelMapJson, Map.class);
+
+ generateCSVFileFromSearchResult(searchResult, attributeLabelMap, queryType);
+ }
+
+ private void generateCSVFileFromSearchResult(AtlasSearchResult searchResult, Map<String, String> attributeLabelMap, AtlasSearchResult.AtlasQueryType queryType) throws IOException {
+
+ List<AtlasEntityHeader> allEntityHeaders = searchResult.getEntities();
+ AtlasSearchResult.AttributeSearchResult attributeSearchResult = searchResult.getAttributes();
+ String fileName = (String) getTaskDef().getParameters().get(CSV_FILE_NAME_KEY);
+
+ if ((queryType == BASIC && CollectionUtils.isEmpty(allEntityHeaders))
+ || (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult == null))) {
+ LOG.info("No result found. Not generating csv file: {}", fileName);
+ return;
+ }
+
+ File dir = new File(DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
+ File csvFile;
+ if (!dir.exists()) {
+ dir.mkdirs();
+ }
+
+ csvFile = new File(dir, fileName);
+ try (FileWriter fileWriter = new FileWriter(csvFile);
+ CSVWriter csvWriter = new CSVWriter(fileWriter)) {
+
+ String[] defaultHeaders = new String[]{"Type name", "Name", "Classifications", "Terms"};
+ String[] attributeHeaders;
+ int attrSize = 0;
+
+ if (attributeLabelMap == null) {
+ attributeLabelMap = new HashMap<>();
+ }
+ attributeLabelMap.put("Owner", "owner");
+ attributeLabelMap.put("Description", "description");
+
+ Collection<String> attributeHeaderLabels = attributeLabelMap.keySet();
+
+ if (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult != null)) {
+ attributeHeaderLabels = attributeSearchResult.getName();
+ defaultHeaders = new String[0];
+ }
+
+ attrSize = (attributeHeaderLabels == null) ? 0 : attributeHeaderLabels.size();
+ attributeHeaders = new String[attrSize];
+ if (attributeHeaderLabels != null) {
+ attributeHeaders = attributeHeaderLabels.toArray(attributeHeaders);
+ }
+
+ int headerSize = attrSize + defaultHeaders.length;
+ String[] headers = new String[headerSize];
+ System.arraycopy(defaultHeaders, 0, headers, 0, defaultHeaders.length);
+ if (ArrayUtils.isNotEmpty(attributeHeaders)) {
+ System.arraycopy(attributeHeaders, 0, headers, defaultHeaders.length, attrSize);
+ }
+
+ csvWriter.writeNext(headers);
+
+ String[] entityRecords = new String[headerSize];
+ if (CollectionUtils.isNotEmpty(allEntityHeaders)) {
+ for (AtlasEntityHeader entityHeader : allEntityHeaders) {
+
+ entityRecords[0] = entityHeader.getTypeName();
+ entityRecords[1] = entityHeader.getDisplayText() != null ? entityHeader.getDisplayText() : entityHeader.getGuid();
+ entityRecords[2] = String.join(",", entityHeader.getClassificationNames());
+ entityRecords[3] = String.join(",", entityHeader.getMeaningNames());
+
+ if (MapUtils.isNotEmpty(entityHeader.getAttributes())) {
+
+ for (int i = defaultHeaders.length; i < headerSize; i++) {
+
+ Object attrValue = entityHeader.getAttribute(attributeLabelMap.get(headers[i]));
+ if (attrValue instanceof AtlasObjectId) {
+ entityRecords[i] = String.valueOf(((AtlasObjectId) attrValue).getUniqueAttributes().get("qualifiedName"));
+
+ } else if (attrValue instanceof List) {
+
+ if (CollectionUtils.isNotEmpty((List<?>) attrValue)) {
+ List<String> valueList = new ArrayList<>();
+ for (Object attrVal : (List) attrValue) {
+ if (attrVal instanceof AtlasObjectId) {
+ String value = String.valueOf(((AtlasObjectId) attrVal).getUniqueAttributes().get("qualifiedName"));
+ valueList.add(value);
+ } else {
+ valueList.add(String.valueOf(attrVal));
+ }
+ }
+ entityRecords[i] = String.join(",", valueList);
+ }
+ } else {
+ entityRecords[i] = attrValue == null ? EMPTY_STRING : String.valueOf(attrValue);
+ }
+ }
+ }
+ csvWriter.writeNext(entityRecords);
+ }
+ }
+
+ if (queryType == DSL && attributeSearchResult != null) {
+ for (List<Object> resultSet : attributeSearchResult.getValues()) {
+ for (int i = defaultHeaders.length; i < headerSize; i++) {
+ entityRecords[i] = resultSet.get(i) == null ? EMPTY_STRING : String.valueOf(resultSet.get(i));
+ }
+ csvWriter.writeNext(entityRecords);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java
new file mode 100644
index 0000000..dda6948
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/searchdownload/SearchResultDownloadTaskFactory.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.tasks.TaskFactory;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Singleton
+@Service
+@EnableScheduling
+public class SearchResultDownloadTaskFactory implements TaskFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTaskFactory.class);
+
+ public static final String SEARCH_RESULT_DOWNLOAD = "SEARCH_RESULT_DOWNLOAD";
+ public static int MAX_PENDING_TASKS_ALLOWED;
+ private static final int MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50;
+ private static final String MAX_PENDING_TASKS_ALLOWED_KEY = "atlas.download.search.max.pending.tasks";
+ private static final String FILES_CLEANUP_INTERVAL = "0 0/1 * * * *";
+ private static final long FILE_EXP_DURATION_IN_MILLIS_DEFAULT = 24 * 60 * 60 * 1000;
+ private static long FILE_EXP_DURATION_IN_MILLIS;
+ private static final String FILE_EXP_DURATION_IN_MILLIS_KEY = "atlas.download.search.file.expiry.millis";
+ private static Configuration configuration;
+
+ static {
+ try {
+ configuration = ApplicationProperties.get();
+ } catch (Exception e) {
+ LOG.info("Failed to load application properties", e);
+ }
+ if (configuration != null) {
+ MAX_PENDING_TASKS_ALLOWED = configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT);
+ FILE_EXP_DURATION_IN_MILLIS = configuration.getLong(FILE_EXP_DURATION_IN_MILLIS_KEY, FILE_EXP_DURATION_IN_MILLIS_DEFAULT);
+ } else {
+ MAX_PENDING_TASKS_ALLOWED = MAX_PENDING_TASKS_ALLOWED_DEFAULT;
+ FILE_EXP_DURATION_IN_MILLIS = FILE_EXP_DURATION_IN_MILLIS_DEFAULT;
+ }
+ }
+
+ private static final List<String> supportedTypes = new ArrayList<String>() {{
+ add(SEARCH_RESULT_DOWNLOAD);
+ }};
+
+ private final AtlasDiscoveryService discoveryService;
+ private final AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ public SearchResultDownloadTaskFactory(AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+ this.discoveryService = discoveryService;
+ this.typeRegistry = typeRegistry;
+ }
+
+ @Override
+ public AbstractTask create(AtlasTask task) {
+ String taskType = task.getType();
+ String taskGuid = task.getGuid();
+
+ switch (taskType) {
+ case SEARCH_RESULT_DOWNLOAD:
+ return new SearchResultDownloadTask(task, discoveryService, typeRegistry);
+
+ default:
+ LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, taskGuid);
+ return null;
+ }
+ }
+
+ @Override
+ public List<String> getSupportedTypes() {
+ return this.supportedTypes;
+ }
+
+ @Scheduled(cron = "#{getCronExpressionForCleanup}")
+ public void cleanupExpiredFiles() {
+ File csvDir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH);
+ deleteFiles(csvDir);
+ }
+
+ @Bean
+ private String getCronExpressionForCleanup() {
+ return FILES_CLEANUP_INTERVAL;
+ }
+
+ private void deleteFiles(File downloadDir) {
+
+ File[] subDirs = downloadDir.listFiles();
+
+ if (ArrayUtils.isNotEmpty(subDirs)) {
+ for (File subDir : subDirs) {
+ File[] csvFiles = subDir.listFiles();
+
+ if (ArrayUtils.isNotEmpty(csvFiles)) {
+ for (File csv : csvFiles) {
+ BasicFileAttributes attr;
+
+ try {
+ attr = Files.readAttributes(csv.toPath(), BasicFileAttributes.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ Date now = new Date();
+ long fileAgeInMillis = now.getTime() - attr.creationTime().toMillis();
+
+ if (FILE_EXP_DURATION_IN_MILLIS < fileAgeInMillis) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleting file: {}", csv.getName());
+ }
+ csv.delete();
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
index 5b4bf71..a8a1b9e 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskManagement.java
@@ -120,6 +120,10 @@
return this.registry.getPendingTasks();
}
+ public List<AtlasTask> getPendingTasksByType(String type) {
+ return this.registry.getPendingTasksByType(type);
+ }
+
public List<AtlasTask> getAll() {
return this.registry.getAll();
}
diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
index 6f770ed..5d1f50f 100644
--- a/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
+++ b/repository/src/main/java/org/apache/atlas/tasks/TaskRegistry.java
@@ -85,6 +85,31 @@
}
@GraphTransaction
+ public List<AtlasTask> getPendingTasksByType(String type) {
+ List<AtlasTask> ret = new ArrayList<>();
+
+ try {
+ AtlasGraphQuery query = graph.query()
+ .has(Constants.TASK_TYPE_PROPERTY_KEY, Constants.TASK_TYPE_NAME)
+ .has(Constants.TASK_STATUS, AtlasTask.Status.PENDING)
+ .has(Constants.TASK_TYPE, type)
+ .orderBy(Constants.TASK_CREATED_TIME, AtlasGraphQuery.SortOrder.ASC);
+
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ while (results.hasNext()) {
+ AtlasVertex vertex = results.next();
+
+ ret.add(toAtlasTask(vertex));
+ }
+ } catch (Exception exception) {
+ LOG.error("Error fetching pending tasks by type!", exception);
+ }
+
+ return ret;
+ }
+
+ @GraphTransaction
public void updateStatus(AtlasVertex taskVertex, AtlasTask task) {
if (taskVertex == null) {
return;
diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
index a6ca04f..d1d1907 100644
--- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
+++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java
@@ -19,6 +19,7 @@
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.Timed;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
@@ -29,9 +30,11 @@
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.util.Servlets;
import org.apache.commons.collections.CollectionUtils;
@@ -54,10 +57,21 @@
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
+import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
+import static org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask.*;
+
/**
* REST interface for data discovery using dsl or full text search
*/
@@ -107,18 +121,10 @@
@QueryParam("classification") String classification,
@QueryParam("limit") int limit,
@QueryParam("offset") int offset) throws AtlasBaseException {
- Servlets.validateQueryParamLength("typeName", typeName);
- Servlets.validateQueryParamLength("classification", classification);
- if (StringUtils.isNotEmpty(query)) {
- if (query.length() > maxDslQueryLength) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
- }
- query = Servlets.decodeQueryString(query);
- }
+ validateDSLSearchParameters(query, typeName, classification);
AtlasPerfTracer perf = null;
-
try {
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchUsingDSL(" + query + "," + typeName
@@ -134,6 +140,23 @@
}
+ /**
+ *
+ * @param parameterMap
+ * @throws AtlasBaseException
+ */
+ @POST
+ @Timed
+ @Path("dsl/download/create_file")
+ public void dslSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
+ SearchParameters parameters = AtlasJson.fromLinkedHashMap(parameterMap.get("searchParameters"), SearchParameters.class);
+
+ validateDSLSearchParameters(parameters.getQuery(), parameters.getTypeName(), parameters.getClassification());
+
+ Map<String, Object> taskParams = populateTaskParams(parameters.getQuery(), parameters.getTypeName(), parameters.getClassification(), parameters.getLimit(), parameters.getOffset());
+
+ discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
+ }
/**
* Retrieve data for the specified fulltext query
@@ -330,24 +353,7 @@
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters(" + parameters + ")");
}
- if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
- throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
- }
-
- if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
- throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
- }
-
- if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
- throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
- }
-
- if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
- StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
- }
-
- validateSearchParameters(parameters);
+ validateBasicSearchParameters(parameters);
return discoveryService.searchWithParameters(parameters);
} finally {
@@ -356,6 +362,67 @@
}
/**
+ *
+ * @param parameterMap
+ * @throws AtlasBaseException
+ */
+ @POST
+ @Timed
+ @Path("basic/download/create_file")
+ public void basicSearchCreateFile(Map<String, Object> parameterMap) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+
+ try {
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.basicSearchCreateFile(" + parameterMap + ")");
+ }
+
+ Map<String, String> attributeLabelMap = (Map<String, String>) parameterMap.get("attributeLabelMap");
+ SearchParameters parameters = AtlasJson.fromLinkedHashMap(parameterMap.get("searchParameters"), SearchParameters.class);
+
+ validateBasicSearchParameters(parameters);
+
+ Map<String, Object> taskParams = populateTaskParams(parameters, attributeLabelMap);
+
+ discoveryService.createAndQueueSearchResultDownloadTask(taskParams);
+
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ @GET
+ @Timed
+ @Path("download/status")
+ public AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws IOException {
+ return discoveryService.getSearchResultDownloadStatus();
+ }
+
+ /**
+ *
+ * @param fileName
+ * @return
+ */
+ @GET
+ @Timed
+ @Path("download/{filename}")
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ public Response downloadSearchResultFile(@PathParam("filename") String fileName) {
+
+ File dir = new File(SearchResultDownloadTask.DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
+ File csvFile = new File(dir, fileName);
+
+ if (!csvFile.exists()) {
+ return Response.noContent().build();
+ }
+
+ Response.ResponseBuilder response = Response.ok(csvFile);
+ response.header("Content-Disposition", "attachment; filename=\"" + fileName + "\"");
+
+ return response.build();
+ }
+
+ /**
* Relationship search to search for relations(edges)
*
* @param parameters RelationshipSearchParameters
@@ -827,4 +894,78 @@
validateSearchParameters(EntityDiscoveryService.createSearchParameters(parameters));
}
}
-}
+
+ private void validateBasicSearchParameters(SearchParameters parameters) throws AtlasBaseException {
+
+ if (parameters.getLimit() < 0 || parameters.getOffset() < 0) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative");
+ }
+
+ if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name");
+ }
+
+ if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) {
+ throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name");
+ }
+
+ if (StringUtils.isEmpty(parameters.getTypeName()) && StringUtils.isEmpty(parameters.getClassification()) &&
+ StringUtils.isEmpty(parameters.getQuery()) && StringUtils.isEmpty(parameters.getTermName())) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_SEARCH_PARAMS);
+ }
+
+ validateSearchParameters(parameters);
+ }
+
+ private void validateDSLSearchParameters(String query, String typeName, String classification) throws AtlasBaseException {
+
+ Servlets.validateQueryParamLength("typeName", typeName);
+ Servlets.validateQueryParamLength("classification", classification);
+
+ if (StringUtils.isNotEmpty(query)) {
+ if (query.length() > maxDslQueryLength) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_QUERY_LENGTH, Constants.MAX_DSL_QUERY_STR_LENGTH);
+ }
+ Servlets.decodeQueryString(query);
+ }
+ }
+
+ private Map<String, Object> populateTaskParams(SearchParameters parameters, Map<String, String> attributeLabelMap) {
+
+ String searchParametersJson = AtlasJson.toJson(parameters);
+ String attrLabelMapJson = AtlasJson.toJson(attributeLabelMap);
+
+ Map<String, Object> taskParams = new HashMap<>();
+ taskParams.put(SEARCH_TYPE_KEY, BASIC);
+ taskParams.put(SEARCH_PARAMETERS_JSON_KEY, searchParametersJson);
+ taskParams.put(ATTRIBUTE_LABEL_MAP_KEY, attrLabelMapJson);
+
+ String csvFileName = RequestContext.getCurrentUser() + "_" + BASIC + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
+ taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
+
+ return taskParams;
+ }
+
+ private Map<String, Object> populateTaskParams(String query, String typeName, String classification, int limit, int offset) {
+
+ Map<String, Object> taskParams = new HashMap<>();
+ taskParams.put(SEARCH_TYPE_KEY, DSL);
+ taskParams.put(QUERY_KEY, query);
+ taskParams.put(TYPE_NAME_KEY, typeName);
+ taskParams.put(CLASSIFICATION_KEY, classification);
+ taskParams.put(LIMIT_KEY, limit);
+ taskParams.put(OFFSET_KEY, offset);
+
+ String csvFileName = RequestContext.getCurrentUser() + "_" + DSL + "_" + getDateTimeString() + CSV_FILE_EXTENSION;
+ taskParams.put(CSV_FILE_NAME_KEY, csvFileName);
+
+ return taskParams;
+ }
+
+ private String getDateTimeString() {
+ LocalDateTime localDateTime = LocalDateTime.now();
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss.SSS");
+
+ return formatter.format(localDateTime);
+ }
+}
\ No newline at end of file