/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.eagle.alert.metadata.impl;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.PublishmentType;
import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.resource.OpResult;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
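
/**
 * Smoke tests for {@link JdbcMetadataDaoImpl} covering CRUD operations on each
 * metadata entity: publishments, topologies, assignments, clusters, data
 * sources, policies, publishment types, schedule states and alert publish
 * events. The tests assert exact list sizes, so they expect to run against a
 * freshly initialized metadata store.
 */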
public class JdbcImplTest {
private static final Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
static IMetadataDao dao;
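
/**
 * Builds one shared DAO for the whole class from application-jdbc.conf on the
 * test classpath. A minimal sketch of what that file might contain, assuming
 * an embedded H2 database (the key names here are illustrative, not taken
 * from the project):
 *
 * <pre>
 * metadata {
 *   jdbc {
 *     driverClassName = "org.h2.Driver"
 *     connection = "jdbc:h2:mem:metadata"
 *     username = "sa"
 *     password = ""
 *   }
 * }
 * </pre>
 */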
@BeforeClass
public static void setup() {
ConfigFactory.invalidateCaches();
Config config = ConfigFactory.load("application-jdbc.conf");
dao = new JdbcMetadataDaoImpl(config);
}
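
/**
 * Closes the shared DAO so the underlying JDBC connection is released after
 * all tests have run.
 */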
@AfterClass
public static void teardown() {
if (dao != null) {
try {
dao.close();
} catch (IOException e) {
LOG.error("failed to close the metadata dao", e);
}
}
}
private static final String TOPO_NAME = "topoName";
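
/**
 * Exercises add/list/remove for each metadata entity type and checks the
 * OpResult codes and resulting list sizes.
 */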
@Test
public void test_apis() {
// publishment
{
Publishment publishment = new Publishment();
publishment.setName("pub-");
OpResult result = dao.addPublishment(publishment);
Assert.assertEquals(200, result.code);
List<Publishment> publishments = dao.listPublishment();
Assert.assertEquals(1, publishments.size());
result = dao.removePublishment("pub-");
Assert.assertEquals(200, result.code);
}
// topology
{
OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
LOG.info(result.message);
Assert.assertEquals(200, result.code);
List<Topology> topos = dao.listTopologies();
Assert.assertEquals(1, topos.size());
// add again: replace existing one
dao.addTopology(new Topology(TOPO_NAME, 4, 5));
topos = dao.listTopologies();
Assert.assertEquals(1, topos.size());
Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
}
// assignment
{
PolicyAssignment assignment = new PolicyAssignment();
assignment.setPolicyName("policy1");
OpResult result = dao.addAssignment(assignment);
Assert.assertEquals(200, result.code);
List<PolicyAssignment> assigns = dao.listAssignments();
Assert.assertEquals(1, assigns.size());
}
// cluster
{
StreamingCluster cluster = new StreamingCluster();
cluster.setName("dd");
OpResult result = dao.addCluster(cluster);
Assert.assertEquals(200, result.code);
List<StreamingCluster> clusters = dao.listClusters();
Assert.assertEquals(1, clusters.size());
dao.removeCluster("dd");
Assert.assertEquals(0, dao.listClusters().size());
}
// data source
{
Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
dataSource.setName("ds");
OpResult result = dao.addDataSource(dataSource);
Assert.assertEquals(200, result.code);
List<Kafka2TupleMetadata> dataSources = dao.listDataSources();
Assert.assertEquals(1, dataSources.size());
}
// policy
{
PolicyDefinition policy = new PolicyDefinition();
policy.setName("ds");
OpResult result = dao.addPolicy(policy);
Assert.assertEquals(200, result.code);
List<PolicyDefinition> policies = dao.listPolicies();
Assert.assertEquals(1, policies.size());
}
// publishmentType
{
PublishmentType publishmentType = new PublishmentType();
publishmentType.setName("KAFKA");
publishmentType.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
List<Map<String, String>> fields = new ArrayList<>();
Map<String, String> field1 = new HashMap<>();
field1.put("name", "kafka_broker");
field1.put("value", "sandbox.hortonworks.com:6667");
Map<String, String> field2 = new HashMap<>();
field2.put("name", "topic");
fields.add(field1);
fields.add(field2);
publishmentType.setFields(fields);
OpResult result = dao.addPublishmentType(publishmentType);
Assert.assertEquals(200, result.code);
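// The sizes asserted below assume five publishment types are preloaded by the
// JDBC init script, so adding KAFKA yields six and removing it leaves five.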
List<PublishmentType> types = dao.listPublishmentType();
Assert.assertEquals(6, types.size());
dao.removePublishmentType("KAFKA");
types = dao.listPublishmentType();
Assert.assertEquals(5, types.size());
}
}
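
/**
 * Persists a schedule state under a unique version id and verifies it becomes
 * the current state.
 */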
@Test
public void test_addstate() {
ScheduleState state = new ScheduleState();
String versionId = "state-" + System.currentTimeMillis();
state.setVersion(versionId);
state.setGenerateTime(String.valueOf(new Date().getTime()));
OpResult result = dao.addScheduleState(state);
Assert.assertEquals(200, result.code);
state = dao.getScheduleState();
Assert.assertEquals(versionId, state.getVersion());
}
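
/**
 * Reads back the latest schedule state written by test_addstate and logs it.
 */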
@Test
public void test_readCurrentState() {
test_addstate();
ScheduleState state = dao.getScheduleState();
Assert.assertNotNull(state);
LOG.debug(state.getVersion());
LOG.debug(state.getGenerateTime());
}
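
/**
 * Writes ten schedule states, trims the store down to maxCapacity, and
 * verifies that only the newest versions survive.
 */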
@Test
public void test_clearScheduleState() {
int maxCapacity = 4;
List<String> reservedOnes = new ArrayList<>();
for (int i = 0; i < 10; i++) {
ScheduleState state = new ScheduleState();
String versionId = "state-" + (System.currentTimeMillis() + i);
state.setVersion(versionId);
state.setGenerateTime(String.valueOf(new Date().getTime()));
dao.addScheduleState(state);
if (i >= 10 - maxCapacity) {
reservedOnes.add(versionId);
}
}
dao.clearScheduleState(maxCapacity);
List<ScheduleState> scheduleStates = dao.listScheduleStates();
Assert.assertEquals(maxCapacity, scheduleStates.size());
List<String> targetOnes = new ArrayList<>();
scheduleStates.forEach(state -> targetOnes.add(state.getVersion()));
LOG.info("reservedOnes={}", reservedOnes);
LOG.info("targetOnes={}", targetOnes);
Assert.assertTrue(CollectionUtils.isEqualCollection(reservedOnes, targetOnes));
}
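
/**
 * Verifies that re-adding a publishment with the same name acts as an update,
 * that removal empties the list, and that an alert publish event round-trips
 * with its appIds and alertData intact.
 */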
@Test
public void testUpdate() throws SQLException {
OpResult updateResult;
// update
Publishment publishment = new Publishment();
publishment.setName("pub-");
publishment.setType("type1");
updateResult = dao.addPublishment(publishment);
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
publishment.setType("type2");
updateResult = dao.addPublishment(publishment);
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
Assert.assertEquals("type2", dao.listPublishment().get(0).getType());
// remove
updateResult = dao.removePublishment("pub-");
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
Assert.assertEquals(0, dao.listPublishment().size());
// update alert event
AlertPublishEvent alert = new AlertPublishEvent();
String alertId = UUID.randomUUID().toString();
alert.setAlertTimestamp(System.currentTimeMillis());
alert.setAlertId(alertId);
alert.setPolicyId("policyId");
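// policyValue holds the raw policy text; this sample is a SiddhiQL query from
// the HDFS audit log monitoring example.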
alert.setPolicyValue("from HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX[str:contains(src,'/tmp/test') and ((cmd=='rename' and str:contains(dst, '.Trash')) or cmd=='delete')] select * insert into hdfs_audit_log_enriched_stream_out");
Map<String, Object> alertData = new HashMap<>();
alertData.put("siteId", "sandbox");
alertData.put("policyId", "sample");
alert.setAlertData(alertData);
List<String> appIds = new ArrayList<>();
appIds.add("app1");
appIds.add("app2");
alert.setAppIds(appIds);
updateResult = dao.addAlertPublishEvent(alert);
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
AlertPublishEvent event = dao.getAlertPublishEvent(alertId);
Assert.assertTrue(CollectionUtils.isEqualCollection(appIds, event.getAppIds()));
Assert.assertEquals(alertData, event.getAlertData());
}
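
/**
 * Attaches two publishments to a policy and verifies they can be fetched back
 * by policy id.
 */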
@Test
public void testUpdatePublishmentsByPolicyId() {
OpResult updateResult;
// add publishment
String policyId = "policy";
Publishment pub1 = new Publishment();
pub1.setName("pub1");
pub1.setType("type1");
updateResult = dao.addPublishment(pub1);
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
Publishment pub2 = new Publishment();
pub2.setName("pub2");
pub2.setType("type2");
updateResult = dao.addPublishment(pub2);
Assert.assertEquals(OpResult.SUCCESS, updateResult.code);
// add policy
PolicyDefinition policy = new PolicyDefinition();
policy.setName(policyId);
OpResult result = dao.addPolicy(policy);
Assert.assertEquals(200, result.code);
// get publishments by policyId
List<String> publishmentIds = new ArrayList<>();
publishmentIds.add("pub1");
publishmentIds.add("pub2");
dao.addPublishmentsToPolicy(policyId, publishmentIds);
List<Publishment> publishments = dao.getPublishmentsByPolicyId(policyId);
Assert.assertEquals(2, publishments.size());
publishments = dao.listPublishment();
Assert.assertEquals(2, publishments.size());
}
}