blob: 3a658473a86e6a23396610a40926321215fe5822 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.repl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* Atlas Metadata Replication Load Task.
**/
public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
private static final long serialVersionUID = 1L;
private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
public AtlasLoadTask() {
super();
}
@VisibleForTesting
AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work) {
this.conf = conf;
this.work = work;
}
@Override
public int execute() {
try {
SecurityUtils.reloginExpiringKeytabUser();
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.ENTITIES.name(), 0L);
work.getMetricCollector().reportStageStart(getName(), metricMap);
LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getStagingDir().toString());
replLogger.startLog();
int importCount = importAtlasMetadata(atlasReplInfo);
replLogger.endLog(importCount);
work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.ENTITIES.name(), importCount);
LOG.info("Atlas entities import count {}", importCount);
work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
return 0;
} catch (Exception e) {
LOG.error("Exception while loading atlas metadata", e);
setException(e);
int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
try {
if (errorCode > 40000) {
//Create non recoverable marker at top level
Path nonRecoverableMarker = new Path(work.getStagingDir().getParent(),
ReplAck.NON_RECOVERABLE_MARKER.toString());
Utils.writeStackTrace(e, nonRecoverableMarker, conf);
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString());
} else {
work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
}
} catch (Exception ex) {
LOG.error("Failed to collect Metrics ", ex);
}
return errorCode;
}
}
AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat))
.toString();
String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat);
String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat);
AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(),
srcCluster, tgtCluster, work.getStagingDir(), conf);
atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
return atlasReplInfo;
}
private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
Retryable retryable = Retryable.builder()
.withHiveConf(conf)
.withRetryOnException(IOException.class).build();
try {
return retryable.executeCallable(() -> {
BufferedReader br = null;
try {
FileSystem fs = metadataPath.getFileSystem(conf);
br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset()));
String line = br.readLine();
if (line == null) {
throw new SemanticException(ErrorMsg.REPL_INVALID_INTERNAL_CONFIG_FOR_SERVICE.format("Could not read stored " +
"src FS Uri from atlas metadata file", ReplUtils.REPL_ATLAS_SERVICE));
}
String[] lineContents = line.split("\t", 5);
return lineContents[0];
} finally {
if (br != null) {
br.close();
}
}
});
} catch (Exception e) {
throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
}
}
private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getSrcCluster(), atlasReplInfo.getTgtCluster(),
atlasReplInfo.getSrcFsUri(), atlasReplInfo.getTgtFsUri());
AtlasImportResult result = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint())
.getClient(atlasReplInfo.getConf()).importData(importRequest, atlasReplInfo);
if (result == null || result.getProcessedEntities() == null) {
LOG.info("No Atlas entity found");
return 0;
}
return result.getProcessedEntities().size();
}
@Override
public StageType getType() {
return StageType.ATLAS_LOAD;
}
@Override
public String getName() {
return "ATLAS_LOAD";
}
@Override
public boolean canExecuteInParallel() {
return false;
}
}