[EAGLE-1044] Support policy Import using a policy prototype
https://issues.apache.org/jira/browse/EAGLE-1044'
Provided APIs:
* load policies to new site "sandbox" from policy prototypes by `POST /rest/policyProto/loadToSite/sandbox`
* create a new policy prototype with an existing policy by `POST /rest/policyProto/saveAsProto`
* update or create a policy prototype by by `POST /rest/policyProto`
* get all policy prototypes by `GET /rest/policyProto`
* delete a policy prototype by `DELETE /rest/policyProto/{uuid}`
Author: Zhao, Qingwen <qingwzhao@apache.org>
Closes #952 from qingwen220/EAGLE-1044.
diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql
index 3312576..fa2ba15 100644
--- a/eagle-assembly/src/main/doc/metadata-ddl.sql
+++ b/eagle-assembly/src/main/doc/metadata-ddl.sql
@@ -57,6 +57,17 @@
UNIQUE INDEX `name_UNIQUE` (`name` ASC))
COMMENT = 'eagle dashboard metadata';
+CREATE TABLE IF NOT EXISTS `policy_prototype` (
+ `uuid` VARCHAR(50) NOT NULL,
+ `name` VARCHAR(200) NOT NULL,
+ `definition` longtext NOT NULL,
+ `alertPublisherIds` VARCHAR(500) NULL,
+ `modifiedtime` BIGINT(20) NOT NULL,
+ `createdtime` BIGINT(20) NOT NULL,
+ PRIMARY KEY (`uuid`),
+ UNIQUE INDEX `policy_proto_UNIQUE` (`name` ASC))
+COMMENT = 'eagle policy prototype metadata';
+
-- eagle security module metadata
CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
index 88274c2..246efa2 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
@@ -42,7 +42,7 @@
</dependency>
<dependency>
<groupId>org.apache.eagle</groupId>
- <artifactId>alert-metadata</artifactId>
+ <artifactId>alert-metadata-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/PolicyEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/PolicyEntity.java
new file mode 100644
index 0000000..cf2eab7
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/PolicyEntity.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.model;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.hibernate.validator.constraints.Length;
+
+import java.util.*;
+
+public class PolicyEntity extends PersistenceEntity {
+ @Length(min = 1, max = 50, message = "length should between 1 and 50")
+ private String name;
+ private PolicyDefinition definition;
+ private List<String> alertPublishmentIds = new ArrayList<>();
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public PolicyDefinition getDefinition() {
+ return definition;
+ }
+
+ public void setDefinition(PolicyDefinition definition) {
+ this.definition = definition;
+ }
+
+ public List<String> getAlertPublishmentIds() {
+ return alertPublishmentIds;
+ }
+
+ public void setAlertPublishmentIds(List<String> alertPublishmentIds) {
+ this.alertPublishmentIds = alertPublishmentIds;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(name)
+ .append(definition)
+ .append(alertPublishmentIds)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+
+ if (!(that instanceof PolicyEntity)) {
+ return false;
+ }
+
+ PolicyEntity another = (PolicyEntity) that;
+
+ return Objects.equals(another.name, this.name)
+ && Objects.equals(another.definition, this.definition)
+ && CollectionUtils.isEqualCollection(another.getAlertPublishmentIds(), alertPublishmentIds);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("{name=\"%s\",definition=%s}", this.name, this.getDefinition() == null ? "null" : this.getDefinition().getDefinition().toString());
+ }
+
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
new file mode 100644
index 0000000..ce900fd
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/resource/PolicyResource.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.resource;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.apache.eagle.common.rest.RESTResponse;
+import org.apache.eagle.metadata.model.PolicyEntity;
+import org.apache.eagle.metadata.service.PolicyEntityService;
+import org.apache.eagle.metadata.utils.PolicyIdConversions;
+import org.apache.eagle.metadata.utils.StreamIdConversions;
+import org.apache.eagle.service.metadata.resource.MetadataResource;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@Path("/policyProto")
+public class PolicyResource {
+ private final PolicyEntityService policyEntityService;
+ private final MetadataResource metadataResource;
+
+ @Inject
+ public PolicyResource(PolicyEntityService policyEntityService, MetadataResource metadataResource) {
+ this.policyEntityService = policyEntityService;
+ this.metadataResource = metadataResource;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Collection<PolicyEntity>> getAllPolicyProto() {
+ return RESTResponse.async(policyEntityService::getAllPolicyProto).get();
+ }
+
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<PolicyEntity> createOrUpdatePolicyProto(PolicyEntity policyProto) {
+ return RESTResponse.async(() -> policyEntityService.createOrUpdatePolicyProto(policyProto)).get();
+ }
+
+ @POST
+ @Path("/import")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<PolicyEntity> saveAsPolicyProto(PolicyEntity policyEntity) {
+ Preconditions.checkNotNull(policyEntity, "policyProto should not be null");
+ Preconditions.checkNotNull(policyEntity.getDefinition(), "policyDefinition should not be null");
+ Preconditions.checkNotNull(policyEntity.getAlertPublishmentIds(), "alert publisher list should not be null");
+
+ PolicyDefinition policyDefinition = policyEntity.getDefinition();
+ List<String> inputStreamType = new ArrayList<>();
+ String newDefinition = policyDefinition.getDefinition().getValue();
+ for (String inputStream : policyDefinition.getInputStreams()) {
+ String streamDef = StreamIdConversions.parseStreamTypeId(policyDefinition.getSiteId(), inputStream);
+ inputStreamType.add(streamDef);
+ newDefinition = newDefinition.replaceAll(inputStream, streamDef);
+ }
+ policyDefinition.setInputStreams(inputStreamType);
+ policyDefinition.getDefinition().setValue(newDefinition);
+ policyDefinition.setName(PolicyIdConversions.parsePolicyId(policyDefinition.getSiteId(), policyDefinition.getName()));
+ policyDefinition.setSiteId(null);
+ policyEntity.setDefinition(policyDefinition);
+
+ return createOrUpdatePolicyProto(policyEntity);
+ }
+
+ @POST
+ @Path("/export/{site}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<List<PolicyDefinition>> loadPolicyDefinition(List<PolicyEntity> policyProtoList, @PathParam("site") String site) {
+ return RESTResponse.async(() -> exportPolicyDefinition(policyProtoList, site)).get();
+ }
+
+ @DELETE
+ @Path("/{uuid}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Boolean> deletePolicyProto(@PathParam("uuid") String uuid) {
+ return RESTResponse.async(() -> policyEntityService.deletePolicyProtoByUUID(uuid)).get();
+ }
+
+ private List<PolicyDefinition> exportPolicyDefinition(List<PolicyEntity> policyProtoList, String site) {
+ Preconditions.checkNotNull(site, "site should not be null");
+ if (policyProtoList == null || policyProtoList.isEmpty()) {
+ throw new IllegalArgumentException("policy prototype list is empty or null");
+ }
+ List<PolicyDefinition> policies = new ArrayList<>();
+ for (PolicyEntity policyProto : policyProtoList) {
+ PolicyDefinition policyDefinition = policyProto.getDefinition();
+ List<String> inputStreams = new ArrayList<>();
+ String newDefinition = policyDefinition.getDefinition().getValue();
+ for (String inputStreamType : policyDefinition.getInputStreams()) {
+ String streamId = StreamIdConversions.formatSiteStreamId(site, inputStreamType);
+ inputStreams.add(streamId);
+ newDefinition = newDefinition.replaceAll(inputStreamType, streamId);
+ }
+ policyDefinition.setInputStreams(inputStreams);
+ policyDefinition.getDefinition().setValue(newDefinition);
+ policyDefinition.setSiteId(site);
+ policyDefinition.setName(PolicyIdConversions.generateUniquePolicyId(site, policyProto.getDefinition().getName()));
+ PolicyValidationResult validationResult = metadataResource.validatePolicy(policyDefinition);
+ if (!validationResult.isSuccess() || validationResult.getException() != null) {
+ throw new IllegalArgumentException(validationResult.getException());
+ }
+ OpResult result = metadataResource.addPolicy(policyDefinition);
+ if (result.code != 200) {
+ throw new IllegalArgumentException("fail to create policy: " + result.message);
+ }
+ if (policyProto.getAlertPublishmentIds() != null && !policyProto.getAlertPublishmentIds().isEmpty()) {
+ result = metadataResource.addPublishmentsToPolicy(policyDefinition.getName(), policyProto.getAlertPublishmentIds());
+ if (result.code != 200) {
+ throw new IllegalArgumentException("fail to create policy publisherments: " + result.message);
+ }
+ }
+ policies.add(policyDefinition);
+ }
+ return policies;
+ }
+
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/PolicyEntityService.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/PolicyEntityService.java
new file mode 100644
index 0000000..3e03251
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/PolicyEntityService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.service;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.metadata.model.PolicyEntity;
+
+import java.util.Collection;
+
+
+public interface PolicyEntityService {
+
+ Collection<PolicyEntity> getAllPolicyProto();
+
+ PolicyEntity getPolicyProtoByUUID(String uuid);
+
+ boolean deletePolicyProtoByUUID(String uuid);
+
+ default PolicyEntity createOrUpdatePolicyProto(PolicyEntity policyProto) {
+ Preconditions.checkNotNull(policyProto, "PolicyProto should not be null");
+ Preconditions.checkNotNull(policyProto.getDefinition(), "PolicyProto definition should not be null");
+ if (policyProto.getName() == null) {
+ policyProto.setName(String.format("[%s]%s", policyProto.getDefinition().getAlertCategory(),
+ policyProto.getDefinition().getName()));
+ }
+ if (policyProto.getUuid() == null) {
+ return create(policyProto);
+ } else {
+ return update(policyProto);
+ }
+ }
+
+ PolicyEntity create(PolicyEntity policyEntity);
+
+ PolicyEntity update(PolicyEntity policyEntity);
+
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
index 7cc076a..e42b92c 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
@@ -22,6 +22,7 @@
import org.apache.eagle.metadata.persistence.MetadataStore;
import org.apache.eagle.metadata.service.ApplicationEntityService;
import org.apache.eagle.metadata.service.DashboardEntityService;
+import org.apache.eagle.metadata.service.PolicyEntityService;
import org.apache.eagle.metadata.service.SiteEntityService;
public class MemoryMetadataStore extends MetadataStore {
@@ -31,5 +32,6 @@
bind(ApplicationEntityService.class).to(ApplicationEntityServiceMemoryImpl.class).in(Singleton.class);
bind(IMetadataDao.class).to(InMemMetadataDaoImpl.class).in(Singleton.class);
bind(DashboardEntityService.class).to(DashboardEntityServiceMemoryImpl.class).in(Singleton.class);
+ bind(PolicyEntityService.class).to(PolicyEntityServiceMemoryImpl.class).in(Singleton.class);
}
}
\ No newline at end of file
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/PolicyEntityServiceMemoryImpl.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/PolicyEntityServiceMemoryImpl.java
new file mode 100644
index 0000000..511b648
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/PolicyEntityServiceMemoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.service.memory;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.metadata.model.PolicyEntity;
+import org.apache.eagle.metadata.service.PolicyEntityService;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+public class PolicyEntityServiceMemoryImpl implements PolicyEntityService {
+ private HashMap<String, PolicyEntity> policyProtoMap = new HashMap<>();
+
+ @Override
+ public Collection<PolicyEntity> getAllPolicyProto() {
+ return policyProtoMap.values();
+ }
+
+ @Override
+ public PolicyEntity getPolicyProtoByUUID(String uuid) {
+ return policyProtoMap.get(uuid);
+ }
+
+ @Override
+ public boolean deletePolicyProtoByUUID(String uuid) {
+ policyProtoMap.remove(uuid);
+ return true;
+ }
+
+
+ @Override
+ public PolicyEntity create(PolicyEntity entity) {
+ Preconditions.checkNotNull(entity, "entity is null: " + entity);
+ entity.ensureDefault();
+ policyProtoMap.put(entity.getUuid(), entity);
+ return entity;
+ }
+
+ @Override
+ public PolicyEntity update(PolicyEntity policyEntity) {
+ Preconditions.checkNotNull(policyEntity, "entity is null: " + policyEntity);
+ Preconditions.checkNotNull(policyEntity.getUuid(), "uuid is null: " + policyEntity.getUuid());
+ policyProtoMap.put(policyEntity.getUuid(), policyEntity);
+ return policyEntity;
+ }
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
new file mode 100644
index 0000000..c9ccadc
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/utils/PolicyIdConversions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.utils;
+
+import com.google.common.base.Preconditions;
+
+public class PolicyIdConversions {
+
+ public static String generateUniquePolicyId(String siteId, String policyName) {
+ return String.format("%s_%s", policyName, siteId);
+ }
+
+ public static String parsePolicyId(String siteId, String generatedUniquePolicyId) {
+ String subffix = String.format("_%s", siteId);
+ if (generatedUniquePolicyId.endsWith(subffix)) {
+ int streamTypeIdLength = generatedUniquePolicyId.length() - subffix.length();
+ Preconditions.checkArgument(streamTypeIdLength > 0, "Invalid policyId: " + generatedUniquePolicyId + ", policyId is empty");
+ return generatedUniquePolicyId.substring(0, streamTypeIdLength);
+ } else {
+ return generatedUniquePolicyId;
+ }
+ }
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/service/TestPolicyEntityServiceMemoryImpl.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/service/TestPolicyEntityServiceMemoryImpl.java
new file mode 100644
index 0000000..bfcc1cd
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/service/TestPolicyEntityServiceMemoryImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.service;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.metadata.model.PolicyEntity;
+import org.apache.eagle.metadata.service.memory.PolicyEntityServiceMemoryImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class TestPolicyEntityServiceMemoryImpl {
+
+ private PolicyEntityService policyEntityService = new PolicyEntityServiceMemoryImpl();
+
+ @Test
+ public void test() {
+ // define a prototype policy without site info
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("policy1");
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition("siddhi",
+ "from STREAM select * insert into out");
+ policyDefinition.setDefinition(definition);
+ policyDefinition.setInputStreams(Arrays.asList("STREAM"));
+ policyDefinition.setOutputStreams(Arrays.asList("out"));
+ // define publisher list
+ List<String> alertPublisherIds = Arrays.asList("slack");
+
+ PolicyEntity policyEntity = new PolicyEntity();
+ policyEntity.setDefinition(policyDefinition);
+ policyEntity.setAlertPublishmentIds(alertPublisherIds);
+ PolicyEntity res = policyEntityService.createOrUpdatePolicyProto(policyEntity);
+ Assert.assertTrue(res.getDefinition().equals(policyDefinition));
+ Assert.assertTrue(CollectionUtils.isEqualCollection(res.getAlertPublishmentIds(), alertPublisherIds));
+
+ Collection<PolicyEntity> policies = policyEntityService.getAllPolicyProto();
+ Assert.assertTrue(policies.size() == 1);
+
+ PolicyEntity entity = policyEntityService.getPolicyProtoByUUID(policies.iterator().next().getUuid());
+ Assert.assertTrue(entity.equals(policies.iterator().next()));
+
+ // test update
+ entity.getDefinition().setName("policy2");
+ PolicyEntity updatedEntity = policyEntityService.update(entity);
+ Assert.assertTrue(updatedEntity.getDefinition().getName().equals("policy2"));
+
+
+ // test delete
+ //Assert.assertTrue(policyEntityService.deletePolicyProtoByUUID(entity.getUuid()));
+ policyEntityService.deletePolicyProtoByUUID(entity.getUuid());
+ Assert.assertTrue(policyEntityService.getAllPolicyProto().size() == 0);
+ }
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/utils/PolicyIdConversionsTest.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/utils/PolicyIdConversionsTest.java
new file mode 100644
index 0000000..bc0aa8b
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/test/java/org/apache/eagle/metadata/utils/PolicyIdConversionsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PolicyIdConversionsTest {
+
+ @Test
+ public void testGenerateUniquePolicyId() {
+ Assert.assertEquals("mock_policy_test", PolicyIdConversions.generateUniquePolicyId("test", "mock_policy"));
+ }
+
+ @Test
+ public void testParsePolicyId() {
+ Assert.assertEquals("mock_policy", PolicyIdConversions.parsePolicyId("test", "mock_policy_test"));
+ }
+
+ @Test
+ public void testParsePolicyId2() {
+ Assert.assertEquals("mock_policy", PolicyIdConversions.parsePolicyId("test", "mock_policy"));
+ }
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataMetadataStoreServiceImpl.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataMetadataStoreServiceImpl.java
index e7a50cd..73e678b 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataMetadataStoreServiceImpl.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataMetadataStoreServiceImpl.java
@@ -41,10 +41,11 @@
public boolean execute(String sql) throws SQLException {
Connection connection = null;
Statement statement = null;
+ boolean success = false;
try {
connection = dataSource.getConnection();
statement = connection.createStatement();
- return statement.execute(sql);
+ success = statement.execute(sql);
} catch (SQLException e) {
throw e;
} finally {
@@ -63,6 +64,7 @@
}
}
}
+ return success;
}
@Override
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataStore.java
index 1af8e78..2edb679 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataStore.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/JDBCMetadataStore.java
@@ -21,14 +21,17 @@
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.MetadataUtils;
import org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl;
+import org.apache.eagle.metadata.model.PolicyEntity;
import org.apache.eagle.metadata.persistence.MetadataStore;
import org.apache.eagle.metadata.service.ApplicationEntityService;
import org.apache.eagle.metadata.service.DashboardEntityService;
+import org.apache.eagle.metadata.service.PolicyEntityService;
import org.apache.eagle.metadata.service.SiteEntityService;
import org.apache.eagle.metadata.store.jdbc.provider.JDBCDataSourceProvider;
import org.apache.eagle.metadata.store.jdbc.provider.JDBCMetadataStoreConfigProvider;
import org.apache.eagle.metadata.store.jdbc.service.ApplicationEntityServiceJDBCImpl;
import org.apache.eagle.metadata.store.jdbc.service.DashboardEntityServiceJDBCImpl;
+import org.apache.eagle.metadata.store.jdbc.service.PolicyEntityServiceJDBCImpl;
import org.apache.eagle.metadata.store.jdbc.service.SiteEntityServiceJDBCImpl;
import javax.sql.DataSource;
@@ -43,5 +46,6 @@
bind(ApplicationEntityService.class).to(ApplicationEntityServiceJDBCImpl.class).in(Singleton.class);
bind(SiteEntityService.class).to(SiteEntityServiceJDBCImpl.class).in(Singleton.class);
bind(DashboardEntityService.class).to(DashboardEntityServiceJDBCImpl.class).in(Singleton.class);
+ bind(PolicyEntityService.class).to(PolicyEntityServiceJDBCImpl.class).in(Singleton.class);
}
}
\ No newline at end of file
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/PolicyEntityServiceJDBCImpl.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/PolicyEntityServiceJDBCImpl.java
new file mode 100644
index 0000000..65e3757
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/main/java/org/apache/eagle/metadata/store/jdbc/service/PolicyEntityServiceJDBCImpl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.store.jdbc.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import com.google.common.base.Preconditions;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.common.function.ThrowableConsumer2;
+import org.apache.eagle.common.function.ThrowableFunction;
+import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
+import org.apache.eagle.metadata.model.PolicyEntity;
+import org.apache.eagle.metadata.service.PolicyEntityService;
+import org.apache.eagle.metadata.store.jdbc.JDBCMetadataQueryService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class PolicyEntityServiceJDBCImpl implements PolicyEntityService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PolicyEntityServiceJDBCImpl.class);
+
+ private static final String selectSql = "SELECT * FROM policy_prototype";
+ private static final String queryByUUID = "SELECT * FROM policy_prototype where uuid = '%s'";
+ private static final String deleteSqlByUUID = "DELETE FROM policy_prototype where uuid = '%s'";
+ private static final String updateSqlByUUID = "UPDATE policy_prototype SET name = ?, definition = ? , alertPublisherIds = ? , createdtime = ? , modifiedtime = ? where uuid = ?";
+ private static final String insertSql = "INSERT INTO policy_prototype (name, definition, alertPublisherIds, createdtime, modifiedtime, uuid) VALUES (?, ?, ?, ?, ?, ?)";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @Inject
+ JDBCMetadataQueryService queryService;
+
+ @Override
+ public Collection<PolicyEntity> getAllPolicyProto() {
+ try {
+ return queryService.query(selectSql, policyEntityMapper);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public PolicyEntity getPolicyProtoByUUID(String uuid) {
+ Preconditions.checkNotNull(uuid, "uuid should not be null");
+ try {
+ return queryService.query(String.format(queryByUUID, uuid), policyEntityMapper).stream()
+ .findAny().orElseThrow(() -> new EntityNotFoundException("policyProto is not found by uuid"));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public boolean deletePolicyProtoByUUID(String uuid) {
+ String sql = String.format(deleteSqlByUUID, uuid);
+ try {
+ return queryService.execute(sql);
+ } catch (Exception e) {
+ LOGGER.error("Error to execute {}: {}", sql, e);
+ throw new IllegalArgumentException("SQL execution error: " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public PolicyEntity update(PolicyEntity policyProto) {
+ Preconditions.checkNotNull(policyProto, "Entity should not be null");
+ Preconditions.checkNotNull(policyProto.getUuid(), "uuid should not be null");
+ PolicyEntity current = getPolicyProtoByUUID(policyProto.getUuid());
+
+ if (policyProto.getName() != null) {
+ current.setName(policyProto.getName());
+ }
+ if (policyProto.getAlertPublishmentIds() != null) {
+ current.setAlertPublishmentIds(policyProto.getAlertPublishmentIds());
+ }
+ if (policyProto.getDefinition() != null) {
+ current.setDefinition(policyProto.getDefinition());
+ }
+ current.ensureDefault();
+
+ try {
+ if (!queryService.execute(updateSqlByUUID, current, policyEntityWriter)) {
+ throw new IllegalArgumentException("Failed to update policyProto");
+ }
+ } catch (SQLException e) {
+ LOGGER.error("Error to execute {}: {}", updateSqlByUUID, policyProto, e);
+ throw new IllegalArgumentException("SQL execution error: " + e.getMessage(), e);
+ }
+ return current;
+ }
+
+
+
+ @Override
+ public PolicyEntity create(PolicyEntity entity) {
+ Preconditions.checkNotNull(entity, "PolicyEntity should not be null");
+ entity.ensureDefault();
+ try {
+ int retCode = queryService.insert(insertSql, Collections.singletonList(entity), policyEntityWriter);
+ if (retCode > 0) {
+ return entity;
+ } else {
+ throw new SQLException("Insertion result: " + retCode);
+ }
+ } catch (SQLException e) {
+ LOGGER.error("Error to insert entity {} (entity: {}): {}", insertSql, entity.toString(), e.getMessage(), e);
+ throw new IllegalArgumentException("SQL execution error:" + e.getMessage(), e);
+ }
+ }
+
+ private ThrowableFunction<ResultSet, PolicyEntity, SQLException> policyEntityMapper = resultSet -> {
+ PolicyEntity entity = new PolicyEntity();
+ entity.setName(resultSet.getString("name"));
+ entity.setUuid(resultSet.getString("uuid"));
+ String policyStr = resultSet.getString("definition");
+ if (policyStr != null) {
+ try {
+ PolicyDefinition policyDefinition = OBJECT_MAPPER.readValue(policyStr, PolicyDefinition.class);
+ entity.setDefinition(policyDefinition);
+ } catch (Exception e) {
+ throw new SQLException("Error to deserialize JSON as {}", PolicyDefinition.class.getCanonicalName(), e);
+ }
+ }
+ String list = resultSet.getString("alertPublisherIds");
+ if (list != null) {
+ try {
+ List<String> alertPublisherIds = OBJECT_MAPPER.readValue(list, List.class);
+ entity.setAlertPublishmentIds(alertPublisherIds);
+ } catch (Exception e) {
+ throw new SQLException("Error to deserialize JSON as AlertPublisherIds list", e);
+ }
+ }
+ entity.setCreatedTime(resultSet.getLong("createdtime"));
+ entity.setModifiedTime(resultSet.getLong("modifiedtime"));
+ return entity;
+ };
+
+ private ThrowableConsumer2<PreparedStatement, PolicyEntity, SQLException> policyEntityWriter = (statement, policyEntity) -> {
+ policyEntity.ensureDefault();
+
+ statement.setString(1, policyEntity.getName());
+ if (policyEntity.getDefinition() != null) {
+ try {
+ statement.setString(2, OBJECT_MAPPER.writeValueAsString(policyEntity.getDefinition()));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+ if (policyEntity.getAlertPublishmentIds() != null) {
+ try {
+ statement.setString(3, OBJECT_MAPPER.writeValueAsString(policyEntity.getAlertPublishmentIds()));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+ statement.setLong(4, policyEntity.getCreatedTime());
+ statement.setLong(5, policyEntity.getModifiedTime());
+ statement.setString(6, policyEntity.getUuid());
+ };
+
+
+}
+
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/java/org/apache/eagle/metadata/store/jdbc/PolicyEntityServiceJDBCImplTest.java b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/java/org/apache/eagle/metadata/store/jdbc/PolicyEntityServiceJDBCImplTest.java
new file mode 100644
index 0000000..d68635e
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/java/org/apache/eagle/metadata/store/jdbc/PolicyEntityServiceJDBCImplTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.store.jdbc;
+
+import com.google.inject.Inject;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.metadata.model.PolicyEntity;
+import org.apache.eagle.metadata.service.PolicyEntityService;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+public class PolicyEntityServiceJDBCImplTest extends JDBCMetadataTestBase {
+
+ @Inject
+ private PolicyEntityService policyEntityService;
+
+ @Test
+ public void test() {
+ // define a prototype policy without site info
+ PolicyDefinition policyDefinition = new PolicyDefinition();
+ policyDefinition.setName("policy1");
+ PolicyDefinition.Definition definition = new PolicyDefinition.Definition("siddhi",
+ "from STREAM select * insert into out");
+ policyDefinition.setDefinition(definition);
+ policyDefinition.setInputStreams(Arrays.asList("STREAM"));
+ policyDefinition.setOutputStreams(Arrays.asList("out"));
+ // define publisher list
+ List<String> alertPublisherIds = Arrays.asList("slack");
+
+ PolicyEntity policyEntity = new PolicyEntity();
+ policyEntity.setDefinition(policyDefinition);
+ policyEntity.setAlertPublishmentIds(alertPublisherIds);
+ PolicyEntity res = policyEntityService.createOrUpdatePolicyProto(policyEntity);
+ Assert.assertTrue(res != null);
+ Assert.assertTrue(res.getDefinition().equals(policyDefinition));
+ Assert.assertTrue(CollectionUtils.isEqualCollection(res.getAlertPublishmentIds(), alertPublisherIds));
+
+ Collection<PolicyEntity> policies = policyEntityService.getAllPolicyProto();
+ Assert.assertTrue(policies.size() == 1);
+
+ PolicyEntity entity = policyEntityService.getPolicyProtoByUUID(policies.iterator().next().getUuid());
+ Assert.assertTrue(entity.equals(policies.iterator().next()));
+
+ // test update
+ entity.getDefinition().setName("policy2");
+ PolicyEntity updatedEntity = policyEntityService.update(entity);
+ Assert.assertTrue(updatedEntity.getDefinition().getName().equals("policy2"));
+
+
+ // test delete
+ //Assert.assertTrue(policyEntityService.deletePolicyProtoByUUID(entity.getUuid()));
+ policyEntityService.deletePolicyProtoByUUID(entity.getUuid());
+ Assert.assertTrue(policyEntityService.getAllPolicyProto().size() == 0);
+ }
+
+}
diff --git a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/resources/init.sql b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/resources/init.sql
index 1168473..bcf6f61 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/resources/init.sql
+++ b/eagle-core/eagle-metadata/eagle-metadata-jdbc/src/test/resources/init.sql
@@ -53,4 +53,15 @@
PRIMARY KEY (`uuid`),
UNIQUE INDEX `uuid_UNIQUE` (`uuid` ASC),
UNIQUE INDEX `name_UNIQUE` (`name` ASC))
-COMMENT = 'eagle dashboard metadata';
\ No newline at end of file
+COMMENT = 'eagle dashboard metadata';
+
+CREATE TABLE IF NOT EXISTS `policy_prototype` (
+ `uuid` VARCHAR(50) NOT NULL,
+ `name` VARCHAR(200) NOT NULL,
+ `definition` longtext NOT NULL,
+ `alertPublisherIds` VARCHAR(500) NULL,
+ `modifiedtime` BIGINT(20) NOT NULL,
+ `createdtime` BIGINT(20) NOT NULL,
+ PRIMARY KEY (`uuid`),
+ UNIQUE INDEX `policy_proto_UNIQUE` (`name` ASC))
+COMMENT = 'eagle policy prototype metadata';
\ No newline at end of file