/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.audit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.core.annotation.Order;
import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* HBase based repository for entity audit events.
* <p>
* Table -> ATLAS_ENTITY_AUDIT_EVENTS (one table) <br>
* Key -> entity id + timestamp <br>
* Column family -> dt (one family) <br>
* Columns -> action, user, detail <br>
* Versions -> 1 <br>
* <p>
* Note: the timestamp in the key is assumed to be in milliseconds. Since the key is
* entity id + timestamp, and only one version is kept, there can be at most one audit event per
* entity id + timestamp. This is fine for a single Atlas server; with more than one Atlas
* server, a server id should also be included in the key.
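* <p>
* Example row key (illustrative): {@code 826afbc5-0a38-4f5c-9d45-2cf5f3b0a6a2:1622547800000}
* <p>
* Illustrative usage, a minimal sketch assuming the repository has been started and wired in
* (the {@code guid}, {@code user} and {@code details} values are placeholders):
* <pre>{@code
* EntityAuditEventV2 event = new EntityAuditEventV2();
* event.setEntityId(guid);
* event.setTimestamp(System.currentTimeMillis());
* event.setAction(EntityAuditActionV2.ENTITY_CREATE);
* event.setUser(user);
* event.setDetails(details);
* auditRepository.putEventsV2(event);
*
* // latest 25 events for the entity, newest first
* List<EntityAuditEventV2> events = auditRepository.listEventsV2(guid, null, null, (short) 25);
* }</pre>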
*/
@Singleton
@Component
@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true)
@Order(0)
public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditRepository {
private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class);
public static final String CONFIG_PREFIX = "atlas.audit";
public static final String CONFIG_TABLE_NAME = CONFIG_PREFIX + ".hbase.tablename";
public static final String DEFAULT_TABLE_NAME = "ATLAS_ENTITY_AUDIT_EVENTS";
public static final String CONFIG_PERSIST_ENTITY_DEFINITION = CONFIG_PREFIX + ".persistEntityDefinition";
public static final byte[] COLUMN_FAMILY = Bytes.toBytes("dt");
public static final byte[] COLUMN_ACTION = Bytes.toBytes("a");
public static final byte[] COLUMN_DETAIL = Bytes.toBytes("d");
public static final byte[] COLUMN_USER = Bytes.toBytes("u");
public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("f");
private static final String AUDIT_REPOSITORY_MAX_SIZE_PROPERTY = "atlas.hbase.client.keyvalue.maxsize";
private static final String AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY = "atlas.audit.hbase.entity";
private static final String FIELD_SEPARATOR = ":";
private static final long ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE = 1024 * 1024;
private static Configuration APPLICATION_PROPERTIES = null;
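// Number of rows each scanner fetches per remote call (passed to Scan#setCaching)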
private static final int DEFAULT_CACHING = 200;
private static boolean persistEntityDefinition;
private Map<String, List<String>> auditExcludedAttributesCache = new HashMap<>();
static {
try {
persistEntityDefinition = ApplicationProperties.get().getBoolean(CONFIG_PERSIST_ENTITY_DEFINITION, false);
} catch (AtlasException e) {
throw new RuntimeException(e);
}
}
private TableName tableName;
private Connection connection;
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
@Override
public void putEventsV1(EntityAuditEvent... events) throws AtlasException {
putEventsV1(Arrays.asList(events));
}
/**
* Add events to the event repository
* @param events events to be added
* @throws AtlasException
*/
@Override
public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {
if (LOG.isDebugEnabled()) {
LOG.debug("Putting {} events", events.size());
}
Table table = null;
try {
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.size());
for (int index = 0; index < events.size(); index++) {
EntityAuditEvent event = events.get(index);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding entity audit event {}", event);
}
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index));
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
addColumn(put, COLUMN_DETAIL, event.getDetails());
if (persistEntityDefinition) {
addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString());
}
puts.add(put);
}
table.put(puts);
} catch (IOException e) {
throw new AtlasException(e);
} finally {
close(table);
}
}
@Override
public void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException {
putEventsV2(Arrays.asList(events));
}
@Override
public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Putting {} events", events.size());
}
Table table = null;
try {
table = connection.getTable(tableName);
List<Put> puts = new ArrayList<>(events.size());
for (int index = 0; index < events.size(); index++) {
EntityAuditEventV2 event = events.get(index);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding entity audit event {}", event);
}
Put put = new Put(getKey(event.getEntityId(), event.getTimestamp(), index));
addColumn(put, COLUMN_ACTION, event.getAction());
addColumn(put, COLUMN_USER, event.getUser());
addColumn(put, COLUMN_DETAIL, event.getDetails());
if (persistEntityDefinition) {
addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString());
}
puts.add(put);
}
table.put(puts);
} catch (IOException e) {
throw new AtlasBaseException(e);
} finally {
try {
close(table);
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
}
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for entity id {}, operation {}, starting key {}, maximum result count {}", entityId, auditAction, startKey, maxResultCount);
}
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listSortedEventsV2");
Table table = null;
ResultScanner scanner = null;
try {
table = connection.getTable(tableName);
/*
 * Scan details:
 * In HBase, the events are stored in increasing order of timestamp. So a reverse scan is done to get the latest event first.
 * A PageFilter is set to limit the number of results returned, if needed.
 * The stop row is set to the entity id to avoid scanning past the current entity.
 * A SingleColumnValueFilter is used to match the operation at COLUMN_FAMILY->COLUMN_ACTION.
 * Small is set to true to optimise RPC calls as the scanner is created per request.
 * setCaching(DEFAULT_CACHING) increases the payload size to DEFAULT_CACHING rows per remote call;
 * both types of next() take this setting into account.
 */
Scan scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true);
//Combine the page and action filters with MUST_PASS_ALL, so setting one doesn't silently replace the other
List<Filter> filters = new ArrayList<>();
if (maxResultCount > -1) {
filters.add(new PageFilter(maxResultCount));
}
if (auditAction != null) {
filters.add(new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString()))));
}
if (!filters.isEmpty()) {
scan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, filters));
}
if(StringUtils.isNotBlank(entityId)) {
scan.setStopRow(Bytes.toBytes(entityId));
}
if (StringUtils.isEmpty(startKey)) {
//Set start row to entity id + max long value
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE);
scan = scan.setStartRow(entityBytes);
} else {
scan = scan.setStartRow(Bytes.toBytes(startKey));
}
scanner = table.getScanner(scan);
List<EntityAuditEventV2> events = new ArrayList<>();
Result result;
//PageFilter doesn't guarantee that at most maxResultCount results are returned: the filter is applied per region server.
//So an extra check on maxResultCount is added here.
while ((result = scanner.next()) != null && (maxResultCount == -1 || events.size() < maxResultCount)) {
EntityAuditEventV2 event = fromKeyV2(result.getRow());
//In case the user sets random start key, guarding against random events if entityId is provided
if (StringUtils.isNotBlank(entityId) && !event.getEntityId().equals(entityId)) {
continue;
}
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
event.setDetails(getResultString(result, COLUMN_DETAIL));
if (persistEntityDefinition) {
String colDef = getResultString(result, COLUMN_DEFINITION);
if (colDef != null) {
event.setEntityDefinition(colDef);
}
}
events.add(event);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got events for entity id {}, operation {}, starting key {}, maximum result count {}, #records returned {}",
entityId, auditAction, startKey, maxResultCount, events.size());
}
return events;
} catch (IOException e) {
throw new AtlasBaseException(e);
} finally {
try {
close(scanner);
close(table);
RequestContext.get().endMetricRecord(metric);
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
}
}
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})", entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit);
}
AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("listEventsV2");
if (sortByColumn == null) {
sortByColumn = EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
}
if (offset < 0) {
offset = 0;
}
if (limit < 0) {
limit = 100;
}
try (Table table = connection.getTable(tableName)) {
/*
 * HBase does not support queries with sorted results, so to support this API an in-memory sort has to be performed.
 * An audit entry can potentially have the entire entity dumped into it, so loading full audit entries for an entity
 * can be memory intensive. Therefore audit entries are first loaded with a limited set of columns, the sort is
 * performed on this lightweight list, and the relevant section is extracted by applying the offset and limit. From
 * this reduced list a MultiRowRangeFilter is created, and the table is scanned again, this time fetching all columns.
 */
Scan scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.addColumn(COLUMN_FAMILY, COLUMN_ACTION)
.addColumn(COLUMN_FAMILY, COLUMN_USER);
if (auditAction != null) {
Filter filterAction = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(auditAction.toString())));
scan.setFilter(filterAction);
}
List<EntityAuditEventV2> events = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result = scanner.next(); result != null; result = scanner.next()) {
EntityAuditEventV2 event = fromKeyV2(result.getRow());
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
events.add(event);
}
}
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
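// Page the sorted lightweight list: skip 'offset' entries and keep at most 'limit'.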
events = events.subList(Math.min(events.size(), offset), Math.min(events.size(), offset + limit));
if (events.size() > 0) {
List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
events.forEach(e -> {
ranges.add(new MultiRowRangeFilter.RowRange(e.getEventKey(), true, e.getEventKey(), true));
});
scan = new Scan().setReversed(true)
.setCaching(DEFAULT_CACHING)
.setSmall(true)
.setStopRow(Bytes.toBytes(entityId))
.setStartRow(getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE))
.setFilter(new MultiRowRangeFilter(ranges));
try (ResultScanner scanner = table.getScanner(scan)) {
events = new ArrayList<>();
for (Result result = scanner.next(); result != null; result = scanner.next()) {
EntityAuditEventV2 event = fromKeyV2(result.getRow());
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditActionV2.fromString(getResultString(result, COLUMN_ACTION)));
event.setDetails(getResultString(result, COLUMN_DETAIL));
if (persistEntityDefinition) {
String colDef = getResultString(result, COLUMN_DEFINITION);
if (colDef != null) {
event.setEntityDefinition(colDef);
}
}
events.add(event);
}
}
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}", entityId, auditAction, sortByColumn, sortOrderDesc, offset, limit, events.size());
}
return events;
} catch (IOException e) {
throw new AtlasBaseException(e);
} finally {
RequestContext.get().endMetricRecord(metric);
}
}
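/**
* Lists events for the entity, preferring V2 events and falling back to V1 events
* (written by older versions of Atlas) when no V2 events are found.
*/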
@Override
public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException {
List ret = listEventsV2(entityId, null, startKey, maxResults);
try {
if (CollectionUtils.isEmpty(ret)) {
ret = listEventsV1(entityId, startKey, maxResults);
}
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
return ret;
}
private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
if (columnValue != null && !columnValue.toString().isEmpty()) {
put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString()));
}
}
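/**
* Builds a row key of the form {@code <entity id>:<timestamp>}.
*/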
private byte[] getKey(String id, Long ts) {
assert id != null : "entity id can't be null";
assert ts != null : "timestamp can't be null";
String keyStr = id + FIELD_SEPARATOR + ts;
return Bytes.toBytes(keyStr);
}
/**
* List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns at most n results.
* @param entityId entity id
* @param startKey key of the first event to be returned, used for pagination
* @param n maximum number of events to be returned
* @return list of events
* @throws AtlasException
*/
public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n)
throws AtlasException {
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n);
}
Table table = null;
ResultScanner scanner = null;
try {
table = connection.getTable(tableName);
/*
 * Scan details:
 * In HBase, the events are stored in increasing order of timestamp. So a reverse scan is done to get the latest event first.
 * A PageFilter is set to limit the number of results returned.
 * The stop row is set to the entity id to avoid scanning past the current entity.
 * Small is set to true to optimise RPC calls as the scanner is created per request.
 */
Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n))
.setStopRow(Bytes.toBytes(entityId))
.setCaching(n)
.setSmall(true);
if (StringUtils.isEmpty(startKey)) {
//Set start row to entity id + max long value
byte[] entityBytes = getKey(entityId, Long.MAX_VALUE, Integer.MAX_VALUE);
scan = scan.setStartRow(entityBytes);
} else {
scan = scan.setStartRow(Bytes.toBytes(startKey));
}
scanner = table.getScanner(scan);
Result result;
List<EntityAuditEvent> events = new ArrayList<>();
//PageFilter doesn't guarantee that at most n results are returned: the filter is applied per region server.
//So an extra check on n is added here.
while ((result = scanner.next()) != null && events.size() < n) {
EntityAuditEvent event = fromKey(result.getRow());
//In case the user sets random start key, guarding against random events
if (!event.getEntityId().equals(entityId)) {
continue;
}
event.setUser(getResultString(result, COLUMN_USER));
event.setAction(EntityAuditEvent.EntityAuditAction.fromString(getResultString(result, COLUMN_ACTION)));
event.setDetails(getResultString(result, COLUMN_DETAIL));
if (persistEntityDefinition) {
String colDef = getResultString(result, COLUMN_DEFINITION);
if (colDef != null) {
event.setEntityDefinition(colDef);
}
}
events.add(event);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size());
}
return events;
} catch (IOException e) {
throw new AtlasException(e);
} finally {
close(scanner);
close(table);
}
}
@Override
public long repositoryMaxSize() {
long ret;
initApplicationProperties();
if (APPLICATION_PROPERTIES == null) {
ret = ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE;
} else {
ret = APPLICATION_PROPERTIES.getLong(AUDIT_REPOSITORY_MAX_SIZE_PROPERTY, ATLAS_HBASE_KEYVALUE_DEFAULT_SIZE);
}
return ret;
}
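/**
* Returns the attributes configured to be excluded from audit for the given entity type,
* e.g. {@code atlas.audit.hbase.entity.hive_table.attributes.exclude=columns} (the entity type
* and attribute name here are illustrative). The result is cached per entity type.
*/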
@Override
public List<String> getAuditExcludeAttributes(String entityType) {
List<String> ret = null;
initApplicationProperties();
if (auditExcludedAttributesCache.containsKey(entityType)) {
ret = auditExcludedAttributesCache.get(entityType);
} else if (APPLICATION_PROPERTIES != null) {
String[] excludeAttributes = APPLICATION_PROPERTIES.getStringArray(AUDIT_EXCLUDE_ATTRIBUTE_PROPERTY + "." +
entityType + "." + "attributes.exclude");
if (excludeAttributes != null) {
ret = Arrays.asList(excludeAttributes);
}
auditExcludedAttributesCache.put(entityType, ret);
}
return ret;
}
private String getResultString(Result result, byte[] columnName) {
byte[] rawValue = result.getValue(COLUMN_FAMILY, columnName);
if (rawValue != null) {
return Bytes.toString(rawValue);
}
return null;
}
private EntityAuditEvent fromKey(byte[] keyBytes) {
String key = Bytes.toString(keyBytes);
EntityAuditEvent event = new EntityAuditEvent();
if (StringUtils.isNotEmpty(key)) {
String[] parts = key.split(FIELD_SEPARATOR);
event.setEntityId(parts[0]);
event.setTimestamp(Long.valueOf(parts[1]));
event.setEventKey(key);
}
return event;
}
private EntityAuditEventV2 fromKeyV2(byte[] keyBytes) {
String key = Bytes.toString(keyBytes);
EntityAuditEventV2 event = new EntityAuditEventV2();
if (StringUtils.isNotEmpty(key)) {
String[] parts = key.split(FIELD_SEPARATOR);
event.setEntityId(parts[0]);
event.setTimestamp(Long.valueOf(parts[1]));
event.setEventKey(key);
}
return event;
}
private void close(Closeable closeable) throws AtlasException {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
throw new AtlasException(e);
}
}
}
/**
* Converts Atlas' application properties to a Hadoop configuration for HBase.
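* <p>
* Properties under the {@code atlas.audit} prefix are copied with the prefix stripped; for example
* (illustrative values) {@code atlas.audit.hbase.zookeeper.quorum=localhost} becomes
* {@code hbase.zookeeper.quorum=localhost}.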
* @param atlasConf Atlas application properties
* @return the derived HBase configuration
* @throws AtlasException
*/
public static org.apache.hadoop.conf.Configuration getHBaseConfiguration(Configuration atlasConf) throws AtlasException {
Properties properties = ApplicationProperties.getSubsetAsProperties(atlasConf, CONFIG_PREFIX);
org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create();
for (String key : properties.stringPropertyNames()) {
String value = properties.getProperty(key);
LOG.info("adding HBase configuration: {}={}", key, value);
hbaseConf.set(key, value);
}
return hbaseConf;
}
private void createTableIfNotExists() throws AtlasException {
Admin admin = null;
try {
admin = connection.getAdmin();
LOG.info("Checking if table {} exists", tableName.getNameAsString());
if (!admin.tableExists(tableName)) {
LOG.info("Creating table {}", tableName.getNameAsString());
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily = new HColumnDescriptor(COLUMN_FAMILY);
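// A single version per cell is enough: the row key already encodes entity id + timestamp.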
columnFamily.setMaxVersions(1);
columnFamily.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
columnFamily.setCompressionType(Compression.Algorithm.GZ);
columnFamily.setBloomFilterType(BloomType.ROW);
tableDescriptor.addFamily(columnFamily);
admin.createTable(tableDescriptor);
} else {
LOG.info("Table {} exists", tableName.getNameAsString());
}
} catch (IOException e) {
throw new AtlasException(e);
} finally {
close(admin);
}
}
@Override
public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException {
final String classificationUpdatesAction = "CLASSIFICATION_";
if (LOG.isDebugEnabled()) {
LOG.debug("Listing events for fromTimestamp {}, toTimestamp {}, action prefix {}", fromTimestamp, toTimestamp, classificationUpdatesAction);
}
Table table = null;
ResultScanner scanner = null;
try {
Set<String> guids = new HashSet<>();
table = connection.getTable(tableName);
byte[] filterValue = Bytes.toBytes(classificationUpdatesAction);
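// Prefix match: selects all audit actions starting with "CLASSIFICATION_", i.e. tag add/update/delete events.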
BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator(filterValue);
SingleColumnValueFilter filter = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, binaryPrefixComparator);
Scan scan = new Scan().setFilter(filter).setTimeRange(fromTimestamp, toTimestamp);
Result result;
scanner = table.getScanner(scan);
while ((result = scanner.next()) != null) {
EntityAuditEvent event = fromKey(result.getRow());
if (event == null) {
continue;
}
guids.add(event.getEntityId());
}
return guids;
} catch (IOException e) {
throw new AtlasBaseException(e);
} finally {
try {
close(scanner);
close(table);
} catch (AtlasException e) {
throw new AtlasBaseException(e);
}
}
}
@Override
public void start() throws AtlasException {
Configuration configuration = ApplicationProperties.get();
startInternal(configuration, getHBaseConfiguration(configuration));
}
@VisibleForTesting
void startInternal(Configuration atlasConf,
org.apache.hadoop.conf.Configuration hbaseConf) throws AtlasException {
String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME);
tableName = TableName.valueOf(tableNameStr);
try {
connection = createConnection(hbaseConf);
} catch (IOException e) {
throw new AtlasException(e);
}
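// With HA enabled, table creation is deferred until this instance becomes active (see instanceIsActive()).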
if (!HAConfiguration.isHAEnabled(atlasConf)) {
LOG.info("HA is disabled. Hence creating table on startup.");
createTableIfNotExists();
}
}
@VisibleForTesting
protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) throws IOException {
return ConnectionFactory.createConnection(hbaseConf);
}
@Override
public void stop() throws AtlasException {
close(connection);
}
@Override
public void instanceIsActive() throws AtlasException {
LOG.info("Reacting to active: Creating HBase table for Audit if required.");
createTableIfNotExists();
}
@Override
public void instanceIsPassive() {
LOG.info("Reacting to passive: No action for now.");
}
@Override
public int getHandlerOrder() {
return HandlerOrder.AUDIT_REPOSITORY.getOrder();
}
}