blob: c20bcfab80d25c52a43f955d6373269bf43b33c9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ranger.patch.cliutil;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.Arrays;
import org.apache.log4j.Logger;
import org.apache.ranger.db.RangerDaoManager;
import org.apache.ranger.entity.XXAccessAudit;
import org.apache.ranger.entity.XXAccessAuditBase;
import org.apache.ranger.entity.XXAccessAuditV4;
import org.apache.ranger.entity.XXAccessAuditV5;
import org.apache.ranger.patch.BaseLoader;
import org.apache.ranger.solr.SolrAccessAuditsService;
import org.apache.ranger.audit.utils.InMemoryJAASConfiguration;
import org.apache.ranger.authorization.utils.StringUtil;
import org.apache.ranger.biz.RangerBizUtil;
import org.apache.ranger.common.AppConstants;
import org.apache.ranger.common.DateUtil;
import org.apache.ranger.common.PropertiesUtil;
import org.apache.ranger.util.CLIUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.Krb5HttpClientBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@Component
public class DbToSolrMigrationUtil extends BaseLoader {
private static final Logger logger = Logger.getLogger(DbToSolrMigrationUtil.class);
private final static String CHECK_FILE_NAME = "migration_check_file.txt";
private final static Charset ENCODING = StandardCharsets.UTF_8;
public static SolrClient solrClient = null;
public final static String SOLR_URLS_PROP = "ranger.audit.solr.urls";
public final static String SOLR_ZK_HOSTS = "ranger.audit.solr.zookeepers";
public final static String SOLR_COLLECTION_NAME = "ranger.audit.solr.collection.name";
public final static String PROP_JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
public final static String DEFAULT_COLLECTION_NAME = "ranger_audits";
@Autowired
RangerDaoManager daoManager;
@Autowired
SolrAccessAuditsService solrAccessAuditsService;
public static void main(String[] args) throws Exception {
logger.info("main()");
logger.info("Note: If migrating to Secure Solr, make sure SolrClient JAAS Properites are configured in ranger-admin-site.xml");
try {
DbToSolrMigrationUtil loader = (DbToSolrMigrationUtil) CLIUtil
.getBean(DbToSolrMigrationUtil.class);
loader.init();
while (loader.isMoreToProcess()) {
loader.load();
}
logger.info("Load complete. Exiting!!!");
System.exit(0);
} catch (Exception e) {
logger.error("Error loading", e);
System.exit(1);
} finally {
if (solrClient != null) {
solrClient.close();
}
}
}
@Override
public void init() throws Exception {
logger.info("==> DbToSolrMigrationUtil.init() Start.");
solrClient = createSolrClient();
logger.info("<== DbToSolrMigrationUtil.init() End.");
}
@Override
public void execLoad() {
logger.info("==> DbToSolrMigrationUtil.execLoad() Start.");
migrateAuditDbLogsToSolr();
logger.info("<== DbToSolrMigrationUtil.execLoad() End.");
}
public void migrateAuditDbLogsToSolr() {
System.out.println("Migration process is started..");
long maxXXAccessAuditID = daoManager.getXXAccessAudit().getMaxIdOfXXAccessAudit();
if(maxXXAccessAuditID==0){
logger.info("Access Audit log does not exist.");
System.out.println("Access Audit log does not exist in db.");
return;
}
long maxMigratedID=0;
try {
maxMigratedID = readMigrationStatusFile(CHECK_FILE_NAME);
} catch (IOException ex) {
logger.error("Failed to read migration status from file " + CHECK_FILE_NAME, ex);
}
logger.info("ID of the last available audit log: "+ maxXXAccessAuditID);
if(maxMigratedID > 0) {
logger.info("ID of the last migrated audit log: "+ maxMigratedID);
}
if(maxMigratedID>=maxXXAccessAuditID){
logger.info("No more DB Audit logs to migrate. Last migrated audit log ID: " + maxMigratedID);
System.out.println("No more DB Audit logs to migrate. Last migrated audit log ID: " + maxMigratedID);
return;
}
String db_flavor=AppConstants.getLabelFor_DatabaseFlavor(RangerBizUtil.getDBFlavor());
logger.info("DB flavor: " + db_flavor);
List<String> columnList=daoManager.getXXAccessAudit().getColumnNames(db_flavor);
int auditTableVersion=4;
if(columnList!=null){
if(columnList.contains("tags")){
auditTableVersion=6;
}else if(columnList.contains("seq_num") && columnList.contains("event_count") && columnList.contains("event_dur_ms")){
auditTableVersion=5;
}
}
logger.info("Columns Name:"+columnList);
long maxRowsPerBatch=10000;
//To ceil the actual division result i.e noOfBatches=maxXXAccessAuditID/maxRowsPerBatch
long noOfBatches=((maxXXAccessAuditID-maxMigratedID)+maxRowsPerBatch-1)/maxRowsPerBatch;
long rangeStart=maxMigratedID;
long rangeEnd=maxXXAccessAuditID-maxMigratedID<=maxRowsPerBatch ? maxXXAccessAuditID : rangeStart+maxRowsPerBatch;
long startTimeInMS=0;
long timeTaken=0;
long lastMigratedID=0;
long totalMigratedLogs=0;
for(long index=1;index<=noOfBatches;index++){
logger.info("Batch "+ index+" of total "+noOfBatches);
System.out.println("Processing batch "+ index+" of total "+noOfBatches);
startTimeInMS=System.currentTimeMillis();
//rangeStart and rangeEnd both exclusive, if we add +1 in maxRange
if(auditTableVersion==4){
List<XXAccessAuditV4> xXAccessAuditV4List=daoManager.getXXAccessAudit().getByIdRangeV4(rangeStart,rangeEnd+1);
if(!CollectionUtils.isEmpty(xXAccessAuditV4List)){
for(XXAccessAuditV4 xXAccessAudit:xXAccessAuditV4List){
if(xXAccessAudit!=null){
try {
send2solr(xXAccessAudit);
lastMigratedID=xXAccessAudit.getId();
totalMigratedLogs++;
} catch (Throwable e) {
logger.error("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.", e);
writeMigrationStatusFile(lastMigratedID,CHECK_FILE_NAME);
logger.info("Stopping migration process!");
System.out.println("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.");
System.out.println("Migration process failed, Please refer ranger_db_patch.log file.");
return;
}
}
}
}
}else if(auditTableVersion==5){
List<XXAccessAuditV5> xXAccessAuditV5List=daoManager.getXXAccessAudit().getByIdRangeV5(rangeStart,rangeEnd+1);
if(!CollectionUtils.isEmpty(xXAccessAuditV5List)){
for(XXAccessAuditV5 xXAccessAudit:xXAccessAuditV5List){
if(xXAccessAudit!=null){
try {
send2solr(xXAccessAudit);
lastMigratedID=xXAccessAudit.getId();
totalMigratedLogs++;
} catch (Throwable e) {
logger.error("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.", e);
writeMigrationStatusFile(lastMigratedID,CHECK_FILE_NAME);
logger.info("Stopping migration process!");
System.out.println("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.");
System.out.println("Migration process failed, Please refer ranger_db_patch.log file.");
return;
}
}
}
}
}
else if(auditTableVersion==6){
List<XXAccessAudit> xXAccessAuditV6List=daoManager.getXXAccessAudit().getByIdRangeV6(rangeStart,rangeEnd+1);
if(!CollectionUtils.isEmpty(xXAccessAuditV6List)){
for(XXAccessAudit xXAccessAudit:xXAccessAuditV6List){
if(xXAccessAudit!=null){
try {
send2solr(xXAccessAudit);
lastMigratedID=xXAccessAudit.getId();
totalMigratedLogs++;
} catch (Throwable e) {
logger.error("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.", e);
writeMigrationStatusFile(lastMigratedID,CHECK_FILE_NAME);
logger.info("Stopping migration process!");
System.out.println("Error while writing audit log id '"+xXAccessAudit.getId()+"' to Solr.");
System.out.println("Migration process failed, Please refer ranger_db_patch.log file.");
return;
}
}
}
}
}
timeTaken=(System.currentTimeMillis()-startTimeInMS);
logger.info("Batch #" + index + ": time taken:"+timeTaken+" ms");
if(rangeEnd<maxXXAccessAuditID){
writeMigrationStatusFile(rangeEnd,CHECK_FILE_NAME);
}else{
writeMigrationStatusFile(maxXXAccessAuditID,CHECK_FILE_NAME);
}
rangeStart=rangeEnd;
rangeEnd=rangeEnd+maxRowsPerBatch;
}
if(totalMigratedLogs>0){
System.out.println("Total Number of Migrated Audit logs:"+totalMigratedLogs);
logger.info("Total Number of Migrated Audit logs:"+totalMigratedLogs);
}
if(solrClient!=null){
try {
solrClient.close();
} catch (IOException e) {
logger.error("Error while closing solr connection", e);
}finally{
solrClient=null;
}
}
System.out.println("Migration process finished!!");
}
public void send2solr(XXAccessAuditV4 xXAccessAudit) throws Throwable {
SolrInputDocument document = new SolrInputDocument();
toSolrDocument(xXAccessAudit,document);
UpdateResponse response = solrClient.add(document);
if (response.getStatus() != 0) {
logger.info("Response=" + response.toString() + ", status= "
+ response.getStatus() + ", event=" + xXAccessAudit.toString());
throw new Exception("Failed to send audit event ID=" + xXAccessAudit.getId());
}
}
public void send2solr(XXAccessAuditV5 xXAccessAudit) throws Throwable {
SolrInputDocument document = new SolrInputDocument();
toSolrDocument(xXAccessAudit,document);
UpdateResponse response = solrClient.add(document);
if (response.getStatus() != 0) {
logger.info("Response=" + response.toString() + ", status= "
+ response.getStatus() + ", event=" + xXAccessAudit.toString());
throw new Exception("Failed to send audit event ID=" + xXAccessAudit.getId());
}
}
public void send2solr(XXAccessAudit xXAccessAudit) throws Throwable {
SolrInputDocument document = new SolrInputDocument();
toSolrDocument(xXAccessAudit,document);
UpdateResponse response = solrClient.add(document);
if (response.getStatus() != 0) {
logger.info("Response=" + response.toString() + ", status= "
+ response.getStatus() + ", event=" + xXAccessAudit.toString());
throw new Exception("Failed to send audit event ID=" + xXAccessAudit.getId());
}
}
private void toSolrDocument(XXAccessAuditBase xXAccessAudit, SolrInputDocument document) {
// add v4 fields
document.addField("id", xXAccessAudit.getId());
document.addField("access", xXAccessAudit.getAccessType());
document.addField("enforcer", xXAccessAudit.getAclEnforcer());
document.addField("agent", xXAccessAudit.getAgentId());
document.addField("repo", xXAccessAudit.getRepoName());
document.addField("sess", xXAccessAudit.getSessionId());
document.addField("reqUser", xXAccessAudit.getRequestUser());
document.addField("reqData", xXAccessAudit.getRequestData());
document.addField("resource", xXAccessAudit.getResourcePath());
document.addField("cliIP", xXAccessAudit.getClientIP());
document.addField("logType", "RangerAudit");
document.addField("result", xXAccessAudit.getAccessResult());
document.addField("policy", xXAccessAudit.getPolicyId());
document.addField("repoType", xXAccessAudit.getRepoType());
document.addField("resType", xXAccessAudit.getResourceType());
document.addField("reason", xXAccessAudit.getResultReason());
document.addField("action", xXAccessAudit.getAction());
document.addField("evtTime", DateUtil.getLocalDateForUTCDate(xXAccessAudit.getEventTime()));
SolrInputField idField = document.getField("id");
boolean uidIsString = true;
if( idField == null) {
Object uid = null;
if(uidIsString) {
uid = UUID.randomUUID().toString();
}
document.setField("id", uid);
}
}
private void toSolrDocument(XXAccessAuditV5 xXAccessAudit, SolrInputDocument document) {
toSolrDocument((XXAccessAuditBase)xXAccessAudit, document);
// add v5 fields
document.addField("seq_num", xXAccessAudit.getSequenceNumber());
document.addField("event_count", xXAccessAudit.getEventCount());
document.addField("event_dur_ms", xXAccessAudit.getEventDuration());
}
private void toSolrDocument(XXAccessAudit xXAccessAudit,SolrInputDocument document) {
toSolrDocument((XXAccessAuditBase)xXAccessAudit, document);
// add v6 fields
document.addField("seq_num", xXAccessAudit.getSequenceNumber());
document.addField("event_count", xXAccessAudit.getEventCount());
document.addField("event_dur_ms", xXAccessAudit.getEventDuration());
document.addField("tags", xXAccessAudit.getTags());
}
private Long readMigrationStatusFile(String aFileName) throws IOException {
Long migratedDbID=0L;
Path path = Paths.get(aFileName);
if (Files.exists(path) && Files.isRegularFile(path)) {
List<String> fileContents=Files.readAllLines(path, ENCODING);
if(fileContents!=null && fileContents.size()>=1){
String line=fileContents.get(fileContents.size()-1).trim();
if(!StringUtil.isEmpty(line)){
try{
migratedDbID=Long.parseLong(line);
}catch(Exception ex){
}
}
}
}
return migratedDbID;
}
private void writeMigrationStatusFile(Long DbID, String aFileName) {
try{
Path path = Paths.get(aFileName);
List<String> fileContents=new ArrayList<String>();
fileContents.add(String.valueOf(DbID));
Files.write(path, fileContents, ENCODING);
}catch(IOException ex){
logger.error("Failed to update migration status to file " + CHECK_FILE_NAME, ex);
}catch(Exception ex){
logger.error("Error while updating migration status to file " + CHECK_FILE_NAME, ex);
}
}
@Override
public void printStats() {
}
private SolrClient createSolrClient() throws Exception {
SolrClient solrClient = null;
registerSolrClientJAAS();
String zkHosts = PropertiesUtil
.getProperty(SOLR_ZK_HOSTS);
if (zkHosts == null) {
zkHosts = PropertiesUtil
.getProperty("ranger.audit.solr.zookeeper");
}
if (zkHosts == null) {
zkHosts = PropertiesUtil
.getProperty("ranger.solr.zookeeper");
}
String solrURL = PropertiesUtil
.getProperty(SOLR_URLS_PROP);
if (solrURL == null) {
// Try with url
solrURL = PropertiesUtil
.getProperty("ranger.audit.solr.url");
}
if (solrURL == null) {
// Let's try older property name
solrURL = PropertiesUtil
.getProperty("ranger.solr.url");
}
if (zkHosts != null && !"".equals(zkHosts.trim())
&& !"none".equalsIgnoreCase(zkHosts.trim())) {
zkHosts = zkHosts.trim();
String collectionName = PropertiesUtil
.getProperty(SOLR_COLLECTION_NAME);
if (collectionName == null
|| "none".equalsIgnoreCase(collectionName)) {
collectionName = DEFAULT_COLLECTION_NAME;
}
logger.info("Solr zkHosts=" + zkHosts
+ ", collectionName=" + collectionName);
try {
// Instantiate
Krb5HttpClientBuilder krbBuild = new Krb5HttpClientBuilder();
SolrHttpClientBuilder kb = krbBuild.getBuilder();
HttpClientUtil.setHttpClientBuilder(kb);
final List<String> zkhosts = new ArrayList<String>(Arrays.asList(zkHosts.split(",")));
CloudSolrClient solrCloudClient = new CloudSolrClient.Builder(zkhosts, null).build();
solrCloudClient
.setDefaultCollection(collectionName);
return solrCloudClient;
} catch (Exception e) {
logger.fatal(
"Can't connect to Solr server. ZooKeepers="
+ zkHosts + ", collection="
+ collectionName, e);
throw e;
}
} else {
if (solrURL == null || solrURL.isEmpty()
|| "none".equalsIgnoreCase(solrURL)) {
logger.fatal("Solr ZKHosts and URL for Audit are empty. Please set property "
+ SOLR_ZK_HOSTS
+ " or "
+ SOLR_URLS_PROP);
} else {
try {
Krb5HttpClientBuilder krbBuild = new Krb5HttpClientBuilder();
SolrHttpClientBuilder kb = krbBuild.getBuilder();
HttpClientUtil.setHttpClientBuilder(kb);
HttpSolrClient.Builder builder = new HttpSolrClient.Builder();
builder.withBaseSolrUrl(solrURL);
builder.allowCompression(true);
builder.withConnectionTimeout(1000);
HttpSolrClient httpSolrClient = builder.build();
httpSolrClient.setRequestWriter(new BinaryRequestWriter());
solrClient = httpSolrClient;
} catch (Exception e) {
logger.fatal(
"Can't connect to Solr server. URL="
+ solrURL, e);
throw e;
}
}
}
return solrClient;
}
private void registerSolrClientJAAS() {
logger.info("==> createSolrClient.registerSolrClientJAAS()" );
Properties props = PropertiesUtil.getProps();
try {
// SolrJ requires "java.security.auth.login.config" property to be set to identify itself that it is kerberized. So using a dummy property for it
// Acutal solrclient JAAS configs are read from the ranger-admin-site.xml in ranger admin config folder and set by InMemoryJAASConfiguration
// Refer InMemoryJAASConfiguration doc for JAAS Configuration
if ( System.getProperty(PROP_JAVA_SECURITY_AUTH_LOGIN_CONFIG) == null ) {
System.setProperty(PROP_JAVA_SECURITY_AUTH_LOGIN_CONFIG, "/dev/null");
}
logger.info("Loading SolrClient JAAS config from Ranger audit config if present...");
InMemoryJAASConfiguration.init(props);
} catch (Exception e) {
logger.error("ERROR: Unable to load SolrClient JAAS config from ranger admin config file. Audit migration to Secure Solr will fail...",e);
}
logger.info("<==createSolrClient.registerSolrClientJAAS()" );
}
}