blob: eb2af1d268a4070117c40cd993736341207b74d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.repl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClient;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerPolicy;
import org.apache.hadoop.hive.ql.exec.repl.ranger.NoOpRangerRestClient;
import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* RangerDumpTask.
*
* Exports the Ranger security policies to the staging directory.
**/
public class RangerDumpTask extends Task<RangerDumpWork> implements Serializable {
  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LoggerFactory.getLogger(RangerDumpTask.class);

  /**
   * Failures whose error code is strictly greater than this value cause a non recoverable
   * marker to be written next to the dump directory and the stage to end as FAILED_ADMIN.
   */
  private static final int NON_RECOVERABLE_ERROR_CODE_THRESHOLD = 40000;

  // Both collaborators are transient: they are (re)created inside execute() when absent.
  private transient RangerRestClient rangerRestClient;
  private transient ReplLogger replLogger;

  /**
   * Creates a task without a REST client; the client is chosen lazily in {@link #execute()}.
   */
  public RangerDumpTask() {
    super();
  }

  /**
   * Creates a task with an explicitly supplied REST client, configuration and work.
   *
   * @param rangerRestClient client used to talk to Ranger instead of the lazily created one
   * @param conf configuration used by this task
   * @param work description of the dump to perform
   */
  @VisibleForTesting
  RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
    this.conf = conf;
    this.work = work;
    this.rangerRestClient = rangerRestClient;
  }

  /**
   * @return the fixed stage name {@code "RANGER_DUMP"}, also used as the metric stage name
   */
  @Override
  public String getName() {
    return "RANGER_DUMP";
  }

  /**
   * Exports the Ranger policies for the work's database and saves them under the current dump path.
   *
   * <p>The Ranger configuration resource is added to {@code conf}, the endpoint is validated and
   * checked for reachability, the policies are exported, multi resource policies are removed and
   * the remaining policies are written to a file. An export that yields no policies is logged and
   * still reported as a successful stage with a count of zero.
   *
   * @return {@code 0} on success, otherwise the error code resolved from the failure's message
   */
  @Override
  public int execute() {
    try {
      long exportCount = 0;
      Path filePath = null;
      LOG.info("Exporting Ranger Metadata");
      SecurityUtils.reloginExpiringKeytabUser();

      // Register the stage with an initial policy count of zero before doing any work.
      Map<String, Long> metricMap = new HashMap<>();
      metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L);
      work.getMetricCollector().reportStageStart(getName(), metricMap);
      replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
      replLogger.startLog();

      if (rangerRestClient == null) {
        rangerRestClient = getRangerRestClient();
      }

      URL url = work.getRangerConfigResource();
      if (url == null) {
        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
            .format("Ranger configuration is not valid "
                + ReplUtils.RANGER_CONFIGURATION_RESOURCE_NAME, ReplUtils.REPL_RANGER_SERVICE));
      }
      // The service name and endpoint are read from conf only after the resource has been added.
      conf.addResource(url);
      String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
      String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
      if (StringUtils.isEmpty(rangerEndpoint)) {
        throw new SemanticException(ErrorMsg.REPL_INVALID_CONFIG_FOR_SERVICE
            .format("Ranger endpoint is not valid "
                + rangerEndpoint, ReplUtils.REPL_RANGER_SERVICE));
      }
      if (!rangerRestClient.checkConnection(rangerEndpoint, conf)) {
        throw new SemanticException(ErrorMsg.REPL_EXTERNAL_SERVICE_CONNECTION_ERROR.format(ReplUtils
                .REPL_RANGER_SERVICE,
            "Ranger endpoint is not reachable " + rangerEndpoint));
      }

      RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
          work.getDbName(), rangerHiveServiceName, conf);
      // Defensive: a missing result or a missing policy collection is handled like an empty
      // export (as the log message below describes) instead of failing with a NullPointerException.
      List<RangerPolicy> rangerPolicies =
          rangerExportPolicyList == null ? null : rangerExportPolicyList.getPolicies();
      if (CollectionUtils.isEmpty(rangerPolicies)) {
        LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
      } else {
        rangerPolicies = rangerRestClient.removeMultiResourcePolicies(rangerPolicies);
        // Nothing is written when the filtering step leaves no policies behind.
        if (!CollectionUtils.isEmpty(rangerPolicies)) {
          rangerExportPolicyList.setPolicies(rangerPolicies);
          filePath = rangerRestClient.saveRangerPoliciesToFile(rangerExportPolicyList,
              work.getCurrentDumpPath(), ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME, conf);
          if (filePath != null) {
            LOG.info("Ranger policy export finished successfully");
            exportCount = rangerExportPolicyList.getListSize();
            work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(),
                exportCount);
          }
        }
      }

      replLogger.endLog(exportCount);
      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS);
      LOG.debug("Ranger policy export filePath:{}", filePath);
      LOG.info("Number of ranger policies exported {}", exportCount);
      return 0;
    } catch (Exception e) {
      LOG.error("failed", e);
      setException(e);
      int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
      try {
        if (errorCode > NON_RECOVERABLE_ERROR_CODE_THRESHOLD) {
          //Create non recoverable marker at top level
          Path nonRecoverableMarker = new Path(work.getCurrentDumpPath().getParent(),
              ReplAck.NON_RECOVERABLE_MARKER.toString());
          Utils.writeStackTrace(e, nonRecoverableMarker, conf);
          work.getMetricCollector().reportStageEnd(getName(), Status.FAILED_ADMIN, nonRecoverableMarker.toString());
        } else {
          work.getMetricCollector().reportStageEnd(getName(), Status.FAILED);
        }
      } catch (Exception ex) {
        // Best effort: a failure while reporting must not mask the original error code.
        LOG.error("Failed to collect Metrics ", ex);
      }
      return errorCode;
    }
  }

  /**
   * Chooses the REST client implementation based on the test flags in {@code conf}.
   *
   * @return a {@link NoOpRangerRestClient} when either {@code HIVE_IN_TEST_REPL} or
   *         {@code HIVE_IN_TEST} is enabled, otherwise a {@link RangerRestClientImpl}
   */
  private RangerRestClient getRangerRestClient() {
    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
      return new NoOpRangerRestClient();
    }
    return new RangerRestClientImpl();
  }

  /**
   * @return {@link StageType#RANGER_DUMP}
   */
  @Override
  public StageType getType() {
    return StageType.RANGER_DUMP;
  }

  /**
   * @return always {@code false}
   */
  @Override
  public boolean canExecuteInParallel() {
    return false;
  }
}