blob: 617b4f0151bb017b597791862ccbb377ed656ffc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.service.metadata.resource;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.engine.coordinator.*;
import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
import org.apache.eagle.alert.engine.interpreter.PolicyParseResult;
import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
import org.apache.eagle.alert.engine.model.AlertPublishEvent;
import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
import org.apache.eagle.alert.metadata.resource.Models;
import org.apache.eagle.alert.metadata.resource.OpResult;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import javax.validation.Valid;
import javax.ws.rs.*;
/**
* @since Apr 11, 2016.
*/
@Path("/metadata")
@Produces("application/json")
@Consumes("application/json")
public class MetadataResource {
private static final Logger LOG = LoggerFactory.getLogger(MetadataResource.class);
// private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
private final IMetadataDao dao;
public MetadataResource() {
this.dao = MetadataDaoFactory.getInstance().getMetadataDao();
}
@Inject
public MetadataResource(IMetadataDao dao) {
this.dao = dao;
}
@Path("/clusters")
@GET
public List<StreamingCluster> listClusters() {
return dao.listClusters();
}
@Path("/clear")
@POST
public OpResult clear() {
return dao.clear();
}
@Path("/clear/schedulestates")
@POST
public OpResult clearScheduleStates(int capacity) {
return dao.clearScheduleState(capacity);
}
@Path("/export")
@POST
public Models export() {
return dao.export();
}
@Path("/import")
@POST
public OpResult importModels(Models model) {
return dao.importModels(model);
}
@Path("/clusters")
@POST
public OpResult addCluster(StreamingCluster cluster) {
return dao.addCluster(cluster);
}
@Path("/clusters/batch")
@POST
public List<OpResult> addClusters(List<StreamingCluster> clusters) {
List<OpResult> results = new LinkedList<>();
for (StreamingCluster cluster : clusters) {
results.add(dao.addCluster(cluster));
}
return results;
}
@Path("/clusters/{clusterId}")
@DELETE
public OpResult removeCluster(@PathParam("clusterId") String clusterId) {
return dao.removeCluster(clusterId);
}
@Path("/clusters")
@DELETE
public List<OpResult> removeClusters(List<String> clusterIds) {
List<OpResult> results = new LinkedList<>();
for (String cluster : clusterIds) {
results.add(dao.removeCluster(cluster));
}
return results;
}
@Path("/streams")
@GET
public List<StreamDefinition> listStreams() {
return dao.listStreams();
}
@Path("/streams")
@POST
public OpResult createStream(StreamDefinition stream) {
return dao.createStream(stream);
}
@Path("/streams/batch")
@POST
public List<OpResult> addStreams(List<StreamDefinition> streams) {
List<OpResult> results = new LinkedList<>();
for (StreamDefinition stream : streams) {
results.add(dao.createStream(stream));
}
return results;
}
@Path("/streams/{streamId}")
@DELETE
public OpResult removeStream(@PathParam("streamId") String streamId) {
return dao.removeStream(streamId);
}
@Path("/streams")
@DELETE
public List<OpResult> removeStreams(List<String> streamIds) {
List<OpResult> results = new LinkedList<>();
for (String streamId : streamIds) {
results.add(dao.removeStream(streamId));
}
return results;
}
@Path("/datasources")
@GET
public List<Kafka2TupleMetadata> listDataSources() {
return dao.listDataSources();
}
@Path("/datasources")
@POST
public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
return dao.addDataSource(dataSource);
}
@Path("/datasources/batch")
@POST
public List<OpResult> addDataSources(List<Kafka2TupleMetadata> datasources) {
List<OpResult> results = new LinkedList<>();
for (Kafka2TupleMetadata ds : datasources) {
results.add(dao.addDataSource(ds));
}
return results;
}
@Path("/datasources/{datasourceId}")
@DELETE
public OpResult removeDataSource(@PathParam("datasourceId") String datasourceId) {
return dao.removeDataSource(datasourceId);
}
@Path("/datasources")
@DELETE
public List<OpResult> removeDataSources(List<String> datasourceIds) {
List<OpResult> results = new LinkedList<>();
for (String ds : datasourceIds) {
results.add(dao.removeDataSource(ds));
}
return results;
}
@Path("/policies")
@GET
public List<PolicyDefinition> listPolicies() {
return dao.listPolicies();
}
@Path("/policies")
@POST
public OpResult addPolicy(@Valid PolicyDefinition policy) {
PolicyValidationResult validationResult = this.validatePolicy(policy);
if (validationResult.isSuccess()) {
return dao.addPolicy(policy);
} else {
return OpResult.fail(validationResult.getMessage());
}
}
@Path("/policies/validate")
@POST
public PolicyValidationResult validatePolicy(PolicyDefinition policy) {
Map<String, StreamDefinition> allDefinitions = new HashMap<>();
for (StreamDefinition definition : dao.listStreams()) {
allDefinitions.put(definition.getStreamId(), definition);
}
return PolicyInterpreter.validate(policy, allDefinitions);
}
@Path("/policies/parse")
@POST
public PolicyParseResult parsePolicy(String policyDefinition) {
return PolicyInterpreter.parse(policyDefinition);
}
@Path("/policies/batch")
@POST
public List<OpResult> addPolicies(List<PolicyDefinition> policies) {
List<OpResult> results = new LinkedList<>();
for (PolicyDefinition policy : policies) {
results.add(dao.addPolicy(policy));
}
return results;
}
@Path("/policies/{policyId}")
@DELETE
public OpResult removePolicy(@PathParam("policyId") String policyId) {
return dao.removePolicy(policyId);
}
@Path("/policies/{policyId}/alerts")
@GET
public List<AlertPublishEvent> getAlertPublishEventByPolicyId(@PathParam("policyId") String policyId,
@QueryParam("size") int size) {
return dao.getAlertPublishEventsByPolicyId(policyId, size);
}
@Path("/policies/{policyId}/publishments")
@GET
public List<Publishment> getPolicyPublishments(@PathParam("policyId") String policyId) {
return dao.getPublishmentsByPolicyId(policyId);
}
@Path("/policies/{policyId}/publishments")
@POST
public OpResult addPublishmentsToPolicy(@PathParam("policyId") String policyId, List<String> publishmentIds) {
return dao.addPublishmentsToPolicy(policyId, publishmentIds);
}
@Path("/policies/{policyId}")
@GET
public PolicyDefinition getPolicyById(@PathParam("policyId") String policyId) {
Preconditions.checkNotNull(policyId, "policyId");
return dao.getPolicyById(policyId);
}
@Path("/policies/{policyId}/status/{status}")
@POST
public OpResult updatePolicyStatusByID(@PathParam("policyId") String policyId, @PathParam("status") PolicyDefinition.PolicyStatus status) {
OpResult result = new OpResult();
try {
PolicyDefinition policyDefinition = getPolicyById(policyId);
policyDefinition.setPolicyStatus(status);
OpResult updateResult = addPolicy(policyDefinition);
result.code = updateResult.code;
if (result.code == OpResult.SUCCESS) {
result.message = "Successfully updated status of " + policyId + " as " + status;
LOG.info(result.message);
} else {
result.message = updateResult.message;
LOG.error(result.message);
}
} catch (Exception e) {
LOG.error("Error: " + e.getMessage(),e);
result.code = OpResult.FAILURE;
result.message = e.getMessage();
}
return result;
}
@Path("/policies")
@DELETE
public List<OpResult> removePolicies(List<String> policies) {
List<OpResult> results = new LinkedList<>();
for (String policy : policies) {
results.add(dao.removePolicy(policy));
}
return results;
}
@Path("/publishments")
@GET
public List<Publishment> listPublishment() {
return dao.listPublishment();
}
@Path("/publishments")
@POST
public OpResult addPublishment(Publishment publishment) {
return dao.addPublishment(publishment);
}
@Path("/publishments/batch")
@POST
public List<OpResult> addPublishments(List<Publishment> publishments) {
List<OpResult> results = new LinkedList<>();
for (Publishment publishment : publishments) {
results.add(dao.addPublishment(publishment));
}
return results;
}
@Path("/publishments/{name}")
@DELETE
public OpResult removePublishment(@PathParam("name") String pubId) {
return dao.removePublishment(pubId);
}
@Path("/publishments")
@DELETE
public List<OpResult> removePublishments(List<String> pubIds) {
List<OpResult> results = new LinkedList<>();
for (String pub : pubIds) {
results.add(dao.removePublishment(pub));
}
return results;
}
@Path("/publishmentTypes")
@GET
public List<PublishmentType> listPublishmentType() {
return dao.listPublishmentType();
}
@Path("/publishmentTypes")
@POST
public OpResult addPublishmentType(PublishmentType publishmentType) {
return dao.addPublishmentType(publishmentType);
}
@Path("/publishmentTypes/batch")
@POST
public List<OpResult> addPublishmentTypes(List<PublishmentType> publishmentTypes) {
List<OpResult> results = new LinkedList<>();
for (PublishmentType pubType : publishmentTypes) {
results.add(dao.addPublishmentType(pubType));
}
return results;
}
@Path("/publishmentTypes/{name}")
@DELETE
public OpResult removePublishmentType(@PathParam("name") String name) {
return dao.removePublishmentType(name);
}
@Path("/publishmentTypes")
@DELETE
public List<OpResult> removePublishmentTypes(List<String> pubTypes) {
List<OpResult> results = new LinkedList<>();
for (String pubType : pubTypes) {
results.add(dao.removePublishmentType(pubType));
}
return results;
}
@Path("/schedulestates/{versionId}")
@GET
public ScheduleState listScheduleState(@PathParam("versionId") String versionId) {
return dao.getScheduleState(versionId);
}
@Path("/schedulestates")
@GET
public ScheduleState latestScheduleState() {
return dao.getScheduleState();
}
@Path("/schedulestates")
@POST
public OpResult addScheduleState(ScheduleState state) {
return dao.addScheduleState(state);
}
@Path("/assignments")
@GET
public List<PolicyAssignment> listAssignmenets() {
return dao.listAssignments();
}
@Path("/assignments")
@POST
public OpResult addAssignmenet(PolicyAssignment pa) {
return dao.addAssignment(pa);
}
@Path("/topologies")
@GET
public List<Topology> listTopologies() {
return dao.listTopologies();
}
@Path("/topologies")
@POST
public OpResult addTopology(Topology t) {
return dao.addTopology(t);
}
@Path("/topologies/batch")
@POST
public List<OpResult> addTopologies(List<Topology> topologies) {
List<OpResult> results = new LinkedList<>();
for (Topology t : topologies) {
results.add(dao.addTopology(t));
}
return results;
}
@Path("/alerts")
@POST
public OpResult addAlertPublishEvent(AlertPublishEvent event) {
return dao.addAlertPublishEvent(event);
}
@Path("/alerts/batch")
@POST
public List<OpResult> addAlertPublishEvents(List<AlertPublishEvent> events) {
List<OpResult> results = new LinkedList<>();
for (AlertPublishEvent e : events) {
results.add(dao.addAlertPublishEvent(e));
}
return results;
}
@Path("/alerts")
@GET
public List<AlertPublishEvent> listAlertPublishEvents(@QueryParam("size") int size) {
return dao.listAlertPublishEvent(size);
}
@Path("/alerts/{alertId}")
@GET
public AlertPublishEvent getAlertPublishEvent(@PathParam("alertId") String alertId) {
return dao.getAlertPublishEvent(alertId);
}
@Path("/topologies/{topologyName}")
@DELETE
public OpResult removeTopology(@PathParam("topologyName") String topologyName) {
return dao.removeTopology(topologyName);
}
@Path("/topologies")
@DELETE
public List<OpResult> removeTopologies(List<String> topologies) {
List<OpResult> results = new LinkedList<>();
for (String t : topologies) {
results.add(dao.removeTopology(t));
}
return results;
}
}