blob: fd90fd440c9d3f66b3773aa8788eb1ec27201e04 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.store.graph.v2.tasks.searchdownload;
import com.opencsv.CSVWriter;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.tasks.AbstractTask;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.BASIC;
import static org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType.DSL;
import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
public class SearchResultDownloadTask extends AbstractTask {
private static final Logger LOG = LoggerFactory.getLogger(SearchResultDownloadTask.class);
public static final String SEARCH_PARAMETERS_JSON_KEY = "search_parameters_json";
public static final String CSV_FILE_NAME_KEY = "csv_file_Name";
public static final String SEARCH_TYPE_KEY = "search_type";
public static final String ATTRIBUTE_LABEL_MAP_KEY = "attribute_label_map";
public static final String QUERY_KEY = "query";
public static final String TYPE_NAME_KEY = "type_name";
public static final String CLASSIFICATION_KEY = "classification";
public static final String LIMIT_KEY = "limit";
public static final String OFFSET_KEY = "offset";
public static final String CSV_FILE_EXTENSION = ".csv";
public static String DOWNLOAD_DIR_PATH;
private static final String EMPTY_STRING = "";
private static final String DOWNLOAD_DIR_PATH_KEY = "atlas.download.search.dir.path";
private static final String DOWNLOAD_DIR_PATH_DEFAULT = StringUtils.isEmpty(System.getProperty("atlas.home")) ? System.getProperty("user.dir") : System.getProperty("atlas.home");
private static final String CSV_DOWNLOAD_DIR = "search_result_downloads";
private static Configuration configuration;
static {
try {
configuration = ApplicationProperties.get();
} catch (AtlasException e) {
LOG.error("Failed to load application properties", e);
}
if (configuration != null) {
DOWNLOAD_DIR_PATH = configuration.getString(DOWNLOAD_DIR_PATH_KEY, DOWNLOAD_DIR_PATH_DEFAULT) + File.separator + CSV_DOWNLOAD_DIR;
} else {
DOWNLOAD_DIR_PATH = DOWNLOAD_DIR_PATH_DEFAULT + File.separator + CSV_DOWNLOAD_DIR;
}
}
private final AtlasDiscoveryService discoveryService;
private final AtlasTypeRegistry typeRegistry;
public SearchResultDownloadTask(AtlasTask task, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
super(task);
this.discoveryService = discoveryService;
this.typeRegistry = typeRegistry;
}
@Override
public AtlasTask.Status perform() throws Exception {
RequestContext.clear();
Map<String, Object> params = getTaskDef().getParameters();
if (MapUtils.isEmpty(params)) {
LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
return FAILED;
}
String userName = getTaskDef().getCreatedBy();
if (StringUtils.isEmpty(userName)) {
LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
return FAILED;
}
RequestContext.get().setUser(userName, null);
try {
run(params);
setStatus(COMPLETE);
} catch (Exception e) {
LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
setStatus(FAILED);
throw e;
} finally {
RequestContext.clear();
}
return getStatus();
}
protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException {
Map<String, String> attributeLabelMap;
AtlasSearchResult searchResult = null;
AtlasSearchResult.AtlasQueryType queryType = null;
if (parameters.get(SEARCH_TYPE_KEY) == BASIC) {
String searchParametersJson = (String) parameters.get(SEARCH_PARAMETERS_JSON_KEY);
SearchParameters searchParameters = AtlasJson.fromJson(searchParametersJson, SearchParameters.class);
searchParameters.setLimit(AtlasConfiguration.SEARCH_MAX_LIMIT.getInt());
searchResult = discoveryService.searchWithParameters(searchParameters);
queryType = BASIC;
} else if (parameters.get(SEARCH_TYPE_KEY) == DSL) {
String query = (String) parameters.get(QUERY_KEY);
String typeName = (String) parameters.get(TYPE_NAME_KEY);
String classification = (String) parameters.get(CLASSIFICATION_KEY);
int offset = (int) parameters.get(OFFSET_KEY);
String queryStr = discoveryService.getDslQueryUsingTypeNameClassification(query, typeName, classification);
searchResult = discoveryService.searchUsingDslQuery(queryStr, AtlasConfiguration.SEARCH_MAX_LIMIT.getInt(), offset);
queryType = DSL;
}
String attributeLabelMapJson = (String) parameters.get(ATTRIBUTE_LABEL_MAP_KEY);
attributeLabelMap = AtlasJson.fromJson(attributeLabelMapJson, Map.class);
generateCSVFileFromSearchResult(searchResult, attributeLabelMap, queryType);
}
private void generateCSVFileFromSearchResult(AtlasSearchResult searchResult, Map<String, String> attributeLabelMap, AtlasSearchResult.AtlasQueryType queryType) throws IOException {
List<AtlasEntityHeader> allEntityHeaders = searchResult.getEntities();
AtlasSearchResult.AttributeSearchResult attributeSearchResult = searchResult.getAttributes();
String fileName = (String) getTaskDef().getParameters().get(CSV_FILE_NAME_KEY);
if ((queryType == BASIC && CollectionUtils.isEmpty(allEntityHeaders))
|| (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult == null))) {
LOG.info("No result found. Not generating csv file: {}", fileName);
return;
}
File dir = new File(DOWNLOAD_DIR_PATH, RequestContext.getCurrentUser());
File csvFile;
if (!dir.exists()) {
dir.mkdirs();
}
csvFile = new File(dir, fileName);
try (FileWriter fileWriter = new FileWriter(csvFile);
CSVWriter csvWriter = new CSVWriter(fileWriter)) {
String[] defaultHeaders = new String[]{"Type name", "Name", "Classifications", "Terms"};
String[] attributeHeaders;
int attrSize = 0;
if (attributeLabelMap == null) {
attributeLabelMap = new HashMap<>();
}
attributeLabelMap.put("Owner", "owner");
attributeLabelMap.put("Description", "description");
Collection<String> attributeHeaderLabels = attributeLabelMap.keySet();
if (queryType == DSL && (CollectionUtils.isEmpty(allEntityHeaders) && attributeSearchResult != null)) {
attributeHeaderLabels = attributeSearchResult.getName();
defaultHeaders = new String[0];
}
attrSize = (attributeHeaderLabels == null) ? 0 : attributeHeaderLabels.size();
attributeHeaders = new String[attrSize];
if (attributeHeaderLabels != null) {
attributeHeaders = attributeHeaderLabels.toArray(attributeHeaders);
}
int headerSize = attrSize + defaultHeaders.length;
String[] headers = new String[headerSize];
System.arraycopy(defaultHeaders, 0, headers, 0, defaultHeaders.length);
if (ArrayUtils.isNotEmpty(attributeHeaders)) {
System.arraycopy(attributeHeaders, 0, headers, defaultHeaders.length, attrSize);
}
csvWriter.writeNext(headers);
String[] entityRecords = new String[headerSize];
if (CollectionUtils.isNotEmpty(allEntityHeaders)) {
for (AtlasEntityHeader entityHeader : allEntityHeaders) {
entityRecords[0] = entityHeader.getTypeName();
entityRecords[1] = entityHeader.getDisplayText() != null ? entityHeader.getDisplayText() : entityHeader.getGuid();
entityRecords[2] = String.join(",", entityHeader.getClassificationNames());
entityRecords[3] = String.join(",", entityHeader.getMeaningNames());
if (MapUtils.isNotEmpty(entityHeader.getAttributes())) {
for (int i = defaultHeaders.length; i < headerSize; i++) {
Object attrValue = entityHeader.getAttribute(attributeLabelMap.get(headers[i]));
if (attrValue instanceof AtlasObjectId) {
entityRecords[i] = String.valueOf(((AtlasObjectId) attrValue).getUniqueAttributes().get("qualifiedName"));
} else if (attrValue instanceof List) {
if (CollectionUtils.isNotEmpty((List<?>) attrValue)) {
List<String> valueList = new ArrayList<>();
for (Object attrVal : (List) attrValue) {
if (attrVal instanceof AtlasObjectId) {
String value = String.valueOf(((AtlasObjectId) attrVal).getUniqueAttributes().get("qualifiedName"));
valueList.add(value);
} else {
valueList.add(String.valueOf(attrVal));
}
}
entityRecords[i] = String.join(",", valueList);
}
} else {
entityRecords[i] = attrValue == null ? EMPTY_STRING : String.valueOf(attrValue);
}
}
}
csvWriter.writeNext(entityRecords);
}
}
if (queryType == DSL && attributeSearchResult != null) {
for (List<Object> resultSet : attributeSearchResult.getValues()) {
for (int i = defaultHeaders.length; i < headerSize; i++) {
entityRecords[i] = resultSet.get(i) == null ? EMPTY_STRING : String.valueOf(resultSet.get(i));
}
csvWriter.writeNext(entityRecords);
}
}
}
}
}