blob: 71d05444c92d2cffb1da5711473f676472349c36 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.indexing.dao;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.common.utils.KeyUtil;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
/**
* The HBaseDao is an index dao which only supports the following actions:
* * Update
* * Get document
*
* The mechanism here is that updates to documents will be added to a HBase Table as a write-ahead log.
* The Key for a row supporting a given document will be the GUID plus the sensor type, which should be sufficiently distributed.
* Every new update will have a column added (column qualifier will be the timestamp of the update).
* Upon retrieval, the most recent column will be returned.
*
*/
public class HBaseDao implements IndexDao {
public static String HBASE_TABLE = "update.hbase.table";
public static String HBASE_CF = "update.hbase.cf";
private HTableInterface tableInterface;
private byte[] cf;
private AccessConfig config;
/**
* Implements the HBaseDao row key and exposes convenience methods for serializing/deserializing the row key.
* The row key is made of a GUID and sensor type along with a prefix to ensure data is distributed evenly.
*/
public static class Key {
private String guid;
private String sensorType;
public Key(String guid, String sensorType) {
this.guid = guid;
this.sensorType = sensorType;
}
public String getGuid() {
return guid;
}
public String getSensorType() {
return sensorType;
}
public static Key fromBytes(byte[] buffer) throws IOException {
ByteArrayInputStream baos = new ByteArrayInputStream(buffer);
DataInputStream w = new DataInputStream(baos);
baos.skip(KeyUtil.HASH_PREFIX_SIZE);
return new Key(w.readUTF(), w.readUTF());
}
public byte[] toBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
if(getGuid() == null || getSensorType() == null) {
throw new IllegalStateException("Guid and sensor type must not be null: guid = " + getGuid() + ", sensorType = " + getSensorType());
}
DataOutputStream w = new DataOutputStream(baos);
w.writeUTF(getGuid());
w.writeUTF(getSensorType());
w.flush();
byte[] key = baos.toByteArray();
byte[] prefix = KeyUtil.INSTANCE.getPrefix(key);
return KeyUtil.INSTANCE.merge(prefix, key);
}
public static byte[] toBytes(Key k) throws IOException {
return k.toBytes();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Key key = (Key) o;
if (getGuid() != null ? !getGuid().equals(key.getGuid()) : key.getGuid() != null) return false;
return getSensorType() != null ? getSensorType().equals(key.getSensorType()) : key.getSensorType() == null;
}
@Override
public int hashCode() {
int result = getGuid() != null ? getGuid().hashCode() : 0;
result = 31 * result + (getSensorType() != null ? getSensorType().hashCode() : 0);
return result;
}
}
public HBaseDao() {
}
@Override
public synchronized SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
return null;
}
@Override
public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
return null;
}
@Override
public synchronized void init(AccessConfig config) {
if(this.tableInterface == null) {
this.config = config;
Map<String, Object> globalConfig = config.getGlobalConfigSupplier().get();
if(globalConfig == null) {
throw new IllegalStateException("Cannot find the global config.");
}
String table = (String)globalConfig.get(HBASE_TABLE);
String cf = (String) config.getGlobalConfigSupplier().get().get(HBASE_CF);
if(table == null || cf == null) {
throw new IllegalStateException("You must configure " + HBASE_TABLE + " and " + HBASE_CF + " in the global config.");
}
try {
tableInterface = config.getTableProvider().getTable(HBaseConfiguration.create(), table);
this.cf = cf.getBytes();
} catch (IOException e) {
throw new IllegalStateException("Unable to initialize HBaseDao: " + e.getMessage(), e);
}
}
}
public HTableInterface getTableInterface() {
if(tableInterface == null) {
init(config);
}
return tableInterface;
}
@Override
public synchronized Document getLatest(String guid, String sensorType) throws IOException {
Key k = new Key(guid, sensorType);
Get get = new Get(Key.toBytes(k));
get.addFamily(cf);
Result result = getTableInterface().get(get);
return getDocumentFromResult(result);
}
@Override
public Iterable<Document> getAllLatest(
List<GetRequest> getRequests) throws IOException {
List<Get> gets = new ArrayList<>();
for (GetRequest getRequest: getRequests) {
gets.add(buildGet(getRequest));
}
Result[] results = getTableInterface().get(gets);
List<Document> allLatest = new ArrayList<>();
for (Result result: results) {
Document d = getDocumentFromResult(result);
if (d != null) {
allLatest.add(d);
}
}
return allLatest;
}
private Document getDocumentFromResult(Result result) throws IOException {
NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
if(columns == null || columns.size() == 0) {
return null;
}
Map.Entry<byte[], byte[]> entry= columns.lastEntry();
Long ts = Bytes.toLong(entry.getKey());
if(entry.getValue()!= null) {
Map<String, Object> json = JSONUtils.INSTANCE.load(new String(entry.getValue()),
JSONUtils.MAP_SUPPLIER);
// Make sure comments are in the proper format
@SuppressWarnings("unchecked")
List<Map<String, Object>> commentsMap = (List<Map<String, Object>>) json.get(COMMENTS_FIELD);
try {
if (commentsMap != null) {
List<AlertComment> comments = new ArrayList<>();
for (Map<String, Object> commentMap : commentsMap) {
comments.add(new AlertComment(commentMap));
}
if (comments.size() > 0) {
json.put(COMMENTS_FIELD,
comments.stream().map(AlertComment::asMap).collect(Collectors.toList()));
}
}
Key k = Key.fromBytes(result.getRow());
return new Document(json, k.getGuid(), k.getSensorType(), ts);
} catch (IOException e) {
throw new RuntimeException("Unable to convert row key to a document", e);
}
}
else {
return null;
}
}
@Override
public synchronized Document update(Document update, Optional<String> index) throws IOException {
Put put = buildPut(update);
getTableInterface().put(put);
return update;
}
@Override
public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
List<Put> puts = new ArrayList<>();
for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
Document update = updateEntry.getKey();
Put put = buildPut(update);
puts.add(put);
}
getTableInterface().put(puts);
return updates;
}
protected Get buildGet(GetRequest getRequest) throws IOException {
Key k = new Key(getRequest.getGuid(), getRequest.getSensorType());
Get get = new Get(Key.toBytes(k));
get.addFamily(cf);
return get;
}
protected Put buildPut(Document update) throws IOException {
Key k = new Key(update.getGuid(), update.getSensorType());
Put put = new Put(Key.toBytes(k));
long ts = update.getTimestamp() == null || update.getTimestamp() == 0 ? System.currentTimeMillis() : update.getTimestamp();
byte[] columnQualifier = Bytes.toBytes(ts);
byte[] doc = JSONUtils.INSTANCE.toJSONPretty(update.getDocument());
put.addColumn(cf, columnQualifier, doc);
return put;
}
@Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
return null;
}
@Override
@SuppressWarnings("unchecked")
public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
Document latest = getLatest(request.getGuid(), request.getSensorType());
return addCommentToAlert(request, latest);
}
@Override
@SuppressWarnings("unchecked")
public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
if (latest == null || latest.getDocument() == null) {
throw new IOException(String.format("Unable to add comment. Document with guid %s cannot be found.",
request.getGuid()));
}
List<Map<String, Object>> comments = (List<Map<String, Object>>) latest.getDocument()
.getOrDefault(COMMENTS_FIELD, new ArrayList<>());
List<Map<String, Object>> originalComments = new ArrayList<>(comments);
// Convert all comments back to raw JSON before updating.
List<Map<String, Object>> commentsMap = new ArrayList<>();
for (Map<String, Object> comment : originalComments) {
commentsMap.add(new AlertComment(comment).asMap());
}
commentsMap.add(new AlertComment(
request.getComment(),
request.getUsername(),
request.getTimestamp())
.asMap());
Document newVersion = new Document(latest);
newVersion.getDocument().put(COMMENTS_FIELD, commentsMap);
return update(newVersion, Optional.empty());
}
@Override
@SuppressWarnings("unchecked")
public Document removeCommentFromAlert(CommentAddRemoveRequest request)
throws IOException {
Document latest = getLatest(request.getGuid(), request.getSensorType());
return removeCommentFromAlert(request, latest);
}
@Override
@SuppressWarnings("unchecked")
public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
throws IOException {
if (latest == null || latest.getDocument() == null) {
throw new IOException(String.format("Unable to remove comment. Document with guid %s cannot be found.",
request.getGuid()));
}
List<Map<String, Object>> commentMap = (List<Map<String, Object>>) latest.getDocument().get(COMMENTS_FIELD);
// Can't remove anything if there's nothing there
if (commentMap == null) {
throw new IOException(String.format("Unable to remove comment. Document with guid %s has no comments.",
request.getGuid()));
}
List<Map<String, Object>> originalComments = new ArrayList<>(commentMap);
List<AlertComment> comments = new ArrayList<>();
for (Map<String, Object> commentStr : originalComments) {
comments.add(new AlertComment(commentStr));
}
comments.remove(new AlertComment(request.getComment(), request.getUsername(), request.getTimestamp()));
Document newVersion = new Document(latest);
if (comments.size() > 0) {
List<Map<String, Object>> commentsAsMap = comments.stream().map(AlertComment::asMap)
.collect(Collectors.toList());
newVersion.getDocument().put(COMMENTS_FIELD, commentsAsMap);
update(newVersion, Optional.empty());
} else {
newVersion.getDocument().remove(COMMENTS_FIELD);
}
return update(newVersion, Optional.empty());
}
}