blob: b15895a93a1056449826b5a7c36b1bd192579b57 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.metron.indexing.dao;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.collections.MapUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
import static org.hamcrest.CoreMatchers.hasItem;
public abstract class UpdateIntegrationTest {
/**
* {
* "comment":"New Comment",
* "username":"test_user",
* "timestamp":1526401584951
* }
*/
@Multiline
protected String commentOne;
/**
* {
* "comment":"New Comment 2",
* "username":"test_user_2",
* "timestamp":1526401584952
* }
*/
@Multiline
protected String commentTwo;
protected static final int MAX_RETRIES = 10;
protected static final int SLEEP_MS = 500;
protected static final String SENSOR_NAME = "test";
private static final String CF = "p";
private MultiIndexDao dao;
@Test
public void testUpdate() throws Exception {
// create a document to update
final String guid = UUID.randomUUID().toString();
final Long timestamp = 1526306463050L;
Document toUpdate = createDocument(guid, timestamp);
{
// update the document and validate
Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME));
Assert.assertEquals(toUpdate, updated);
}
{
// ensure the document is updated in the index
Document indexed = findUpdatedDoc(toUpdate.getDocument(), guid, SENSOR_NAME);
Assert.assertEquals(toUpdate, indexed);
}
}
@Test
public void testBatchUpdate() throws Exception {
Map<Document, Optional<String>> toUpdate = new HashMap<>();
// create the first document to update
final String guid1 = UUID.randomUUID().toString();
final Long timestamp1 = 1526306463050L;
Document document1 = createDocument(guid1, timestamp1);
toUpdate.put(document1, Optional.of(SENSOR_NAME));
// create the second document to update
final String guid2 = UUID.randomUUID().toString();
final Long timestamp2 = 1526306463100L;
Document document2 = createDocument(guid2, timestamp2);
toUpdate.put(document2, Optional.of(SENSOR_NAME));
// create the third document to update
final String guid3 = UUID.randomUUID().toString();
final Long timestamp3 = 1526306463300L;
Document document3 = createDocument(guid3, timestamp3);
toUpdate.put(document3, Optional.of(SENSOR_NAME));
// update the documents as a batch and validate
Map<Document, Optional<String>> updated = getDao().batchUpdate(toUpdate);
Assert.assertThat(updated.keySet(), hasItem(document1));
Assert.assertThat(updated.keySet(), hasItem(document2));
Assert.assertThat(updated.keySet(), hasItem(document3));
// ensure the documents were written to the index
Assert.assertEquals(document1, findUpdatedDoc(document1.getDocument(), guid1, SENSOR_NAME));
Assert.assertEquals(document2, findUpdatedDoc(document2.getDocument(), guid2, SENSOR_NAME));
Assert.assertEquals(document3, findUpdatedDoc(document3.getDocument(), guid3, SENSOR_NAME));
}
@Test
public void testAddComment() throws Exception {
Document document = createAndIndexDocument("testAddCommentAndPatch");
// comment on the document
String commentText = "New Comment";
String commentUser = "test_user";
long commentTimestamp = 152630493050L;
Document withComment = addAlertComment(document.getGuid(), commentText, commentUser, commentTimestamp);
{
// validate that the comment was made on the returned document
List<AlertComment> comments = getComments(withComment);
Assert.assertEquals(1, comments.size());
Assert.assertEquals(commentText, comments.get(0).getComment());
Assert.assertEquals(commentUser, comments.get(0).getUsername());
Assert.assertEquals(commentTimestamp, comments.get(0).getTimestamp());
}
{
// validate that the comment was made on the indexed document
Document indexed = findUpdatedDoc(withComment.getDocument(), withComment.getGuid(), SENSOR_NAME);
List<AlertComment> comments = getComments(indexed);
Assert.assertEquals(1, comments.size());
Assert.assertEquals(commentText, comments.get(0).getComment());
Assert.assertEquals(commentUser, comments.get(0).getUsername());
Assert.assertEquals(commentTimestamp, comments.get(0).getTimestamp());
}
}
@Test
public void testPatchDocumentThatHasComment() throws Exception {
Document document = createAndIndexDocument("testPatchDocumentWithComment");
// comment on the document
String commentText = "New Comment";
String commentUser = "test_user";
long commentTimestamp = 152630493050L;
Document withComment = addAlertComment(document.getGuid(), commentText, commentUser, commentTimestamp);
// create a patch
List<Map<String, Object>> patches = new ArrayList<>();
Map<String, Object> patch = new HashMap<>();
patch.put("op", "add");
patch.put("path", "/project");
patch.put("value", "metron");
patches.add(patch);
PatchRequest pr = new PatchRequest();
pr.setGuid(withComment.getGuid());
pr.setIndex(SENSOR_NAME);
pr.setSensorType(SENSOR_NAME);
pr.setPatch(patches);
// patch the document that has been commented on
Document patched = getDao().patch(getDao(), pr, Optional.of(withComment.getTimestamp()));
Assert.assertEquals("metron", patched.getDocument().get("project"));
// ensure the patch was made on the indexed document
Document indexed = findUpdatedDoc(patched.getDocument(), patched.getGuid(), SENSOR_NAME);
Assert.assertEquals("metron", indexed.getDocument().get("project"));
}
@Test
public void testRemoveComments() throws Exception {
String guid = "testRemoveComments";
createAndIndexDocument(guid);
// add a comment on the document
Document withComments = addAlertComment(guid, "comment", "user1", 1526401584951L);
Assert.assertEquals(1, getComments(withComments).size());
// ensure the comment was added to the document in the index
Document indexedWithComments = findUpdatedDoc(withComments.getDocument(), withComments.getGuid(), withComments.getSensorType());
Assert.assertEquals(1, getComments(indexedWithComments).size());
// remove a comment from the document
AlertComment toRemove = getComments(withComments).get(0);
Document noComments = removeAlertComment(guid, toRemove.getComment(), toRemove.getUsername(), toRemove.getTimestamp());
Assert.assertEquals(0, getComments(noComments).size());
// ensure the comment was removed from the index
Document indexedNoComments = findUpdatedDoc(noComments.getDocument(), withComments.getGuid(), withComments.getSensorType());
Assert.assertEquals(0, getComments(indexedNoComments).size());
}
protected Document addAlertComment(String guid, String comment, String username, long timestamp)
throws IOException {
CommentAddRemoveRequest request = buildAlertRequest(guid, comment, username, timestamp);
return getDao().addCommentToAlert(request);
}
protected Document removeAlertComment(String guid, String comment, String username, long timestamp)
throws IOException {
CommentAddRemoveRequest request = buildAlertRequest(guid, comment, username, timestamp);
return getDao().removeCommentFromAlert(request);
}
private CommentAddRemoveRequest buildAlertRequest(String guid, String comment, String username,
long timestamp) {
CommentAddRemoveRequest request = new CommentAddRemoveRequest();
request.setGuid(guid);
request.setComment(comment);
request.setUsername(username);
request.setTimestamp(timestamp);
request.setSensorType(SENSOR_NAME);
return request;
}
private Document createAndIndexDocument(String guid) throws Exception {
// create the document
Long timestamp = 1526306463050L;
Document toCreate = createDocument(guid, timestamp);
// index the document
Document created = getDao().update(toCreate, Optional.of(SENSOR_NAME));
Assert.assertEquals(toCreate, created);
// ensure the document is indexed
return findUpdatedDoc(toCreate.getDocument(), guid, SENSOR_NAME);
}
protected Document createDocument(String guid, Long timestamp) {
Map<String, Object> message1 = new HashMap<>();
message1.put(Constants.GUID, guid);
message1.put(Constants.SENSOR_TYPE, SENSOR_NAME);
message1.put(Constants.Fields.TIMESTAMP.getName(), timestamp);
return new Document(message1, guid, SENSOR_NAME, timestamp);
}
private List<AlertComment> getComments(Document withComment) {
List<Map<String, Object>> commentsField = List.class.cast(withComment.getDocument().get(COMMENTS_FIELD));
List<AlertComment> comments = new ArrayList<>();
if(commentsField != null) {
comments = commentsField
.stream()
.map(map -> new AlertComment(map))
.collect(Collectors.toList());
}
return comments;
}
protected Document findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)
throws InterruptedException, IOException, OriginalNotFoundException {
for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
Document doc = getDao().getLatest(guid, sensorType);
if (doc != null && message0.equals(doc.getDocument())) {
return doc;
}
if (t == MAX_RETRIES -1) {
MapUtils.debugPrint(System.out, "Expected", message0);
MapUtils.debugPrint(System.out, "actual", doc.getDocument());
}
}
throw new OriginalNotFoundException("Count not find " + guid + " after " + MAX_RETRIES + " tries");
}
protected IndexDao getDao() {
return dao;
}
protected void setDao(MultiIndexDao dao) {
this.dao = dao;
}
protected abstract String getIndexName();
protected abstract MockHTable getMockHTable();
protected abstract void addTestData(String indexName, String sensorType, List<Map<String,Object>> docs) throws Exception;
protected abstract List<Map<String,Object>> getIndexedTestData(String indexName, String sensorType) throws Exception;
}