/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.coordinator.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.ValidateState;
import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamColumn;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.service.IMetadataServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.SiddhiManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Validates alert metadata (topologies, data sources, streams, policies and
* publishments) for consistency and records any issues in a {@link ValidateState}.
*/
public class MetadataValidator {
private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
private static final Map<StreamColumn.Type, String> EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
static {
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING, "string");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT, "int");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG, "long");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT, "float");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE, "double");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL, "bool");
EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT, "object");
}
private IScheduleContext context;
private final ValidateState state;
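
/**
* Builds the validation context directly from the metadata service. The extra
* empty maps passed to {@link InMemScheduleConext} are placeholders for runtime
* state that the validations below never read.
*/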
public MetadataValidator(IMetadataServiceClient client) {
List<Topology> topologies = client.listTopologies();
List<Kafka2TupleMetadata> datasources = client.listDataSources();
List<StreamDefinition> streams = client.listStreams();
// TODO: disabled policies are not filtered out yet; everything listed is validated
List<PolicyDefinition> enabledPolicies = client.listPolicies();
List<Publishment> publishments = client.listPublishment();
context = new InMemScheduleConext(ScheduleContextBuilder.listToMap(topologies), new HashMap<>(),
ScheduleContextBuilder.listToMap(datasources),
ScheduleContextBuilder.listToMap(enabledPolicies),
ScheduleContextBuilder.listToMap(publishments),
ScheduleContextBuilder.listToMap(streams), new HashMap<>(), new HashMap<>());
this.state = new ValidateState();
}
public MetadataValidator(IScheduleContext context) {
this.context = context;
this.state = new ValidateState();
}
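
/**
* Runs all metadata validations and returns the accumulated result. A minimal
* usage sketch (the metadata client wiring is illustrative only):
* <pre>{@code
* ValidateState state = new MetadataValidator(metadataServiceClient).validate();
* }</pre>
*/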
public ValidateState validate() {
validateTopology();
validateDataSources();
validateStreams();
validatePolicies();
validatePublishments();
return state;
}
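
/**
* Validates each policy: it must be referenced by at least one publishment, all
* of its input streams must be known, and the stream definitions plus the
* policy body must compile under Siddhi.
*/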
private void validatePolicies() {
Collection<Publishment> pubs = context.getPublishments().values();
for (PolicyDefinition pd : context.getPolicies().values()) {
if (pubs.stream().noneMatch(p -> p.getPolicyIds().contains(pd.getName()))) {
state.appendUnPublishedPolicies(pd.getName());
}
boolean isStreamMiss = false;
StringBuilder builder = new StringBuilder();
for (String inputStream : pd.getInputStreams()) {
if (context.getStreamSchemas().get(inputStream) == null) {
state.appendPolicyValidation(pd.getName(), String.format("policy %s contains unknown stream %s!", pd.getName(), inputStream));
isStreamMiss = true;
break;
}
builder.append(buildStreamDefinition(context.getStreamSchemas().get(inputStream)));
builder.append("\n");
}
if (isStreamMiss) {
continue;
}
builder.append(pd.getDefinition().getValue());
// now compile the combined definition with Siddhi to surface syntax/schema errors
SiddhiManager sm = new SiddhiManager();
try {
sm.createExecutionPlanRuntime(builder.toString()).shutdown();
} catch (Exception e) {
LOG.error(String.format("siddhi creation failed! %s", builder), e);
state.appendPolicyValidation(pd.getName(), e.getMessage());
} finally {
sm.shutdown();
}
}
}
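
/**
* Renders a stream schema as a Siddhi stream definition, e.g. (stream and
* column names here are illustrative only):
* {@code define stream syslogStream( host string,msg string );}
*/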
private String buildStreamDefinition(StreamDefinition streamDefinition) {
List<String> columns = new ArrayList<>();
if (streamDefinition.getColumns() != null) {
for (StreamColumn column : streamDefinition.getColumns()) {
columns.add(String.format("%s %s", column.getName(), EAGLE_SIDDHI_TYPE_MAPPING.get(column.getType())));
}
} else {
LOG.warn("No columns found for stream {}" + streamDefinition.getStreamId());
}
return String.format("define stream %s( %s );", streamDefinition.getStreamId(), StringUtils.join(columns, ","));
}
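
/**
* Validates that each publishment references only known policies; type,
* serializer and dedup-field checks are still TODO.
*/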
private void validatePublishments() {
Collection<PolicyDefinition> definitions = context.getPolicies().values();
for (Publishment p : context.getPublishments().values()) {
//TODO: check type; check serializer types; check dedup fields existence; check extend deduplicator...
Set<String> unknown = p.getPolicyIds().stream().filter(pid -> definitions.stream().noneMatch(pd -> pd.getName().equals(pid))).collect(Collectors.toSet());
if (!unknown.isEmpty()) {
state.appendPublishemtnValidation(p.getName(), String.format("publishment %s references unknown/disabled policies %s!", p.getName(), unknown));
}
}
}
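
/**
* Validates each stream: its data source must exist, at least one policy should
* consume it, and it must declare columns.
*/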
private void validateStreams() {
Collection<Kafka2TupleMetadata> datasources = context.getDataSourceMetadata().values();
Collection<PolicyDefinition> definitions = context.getPolicies().values();
for (StreamDefinition sd : context.getStreamSchemas().values()) {
if (datasources.stream().noneMatch(d -> d.getName().equals(sd.getDataSource()))) {
state.appendStreamValidation(sd.getStreamId(), String.format("stream %s references unknown data source %s!", sd.getStreamId(), sd.getDataSource()));
}
if (definitions.stream().noneMatch(p -> p.getInputStreams().contains(sd.getStreamId()))) {
state.appendUnusedStreams(sd.getStreamId());
}
// column-level checks
if (sd.getColumns() == null || sd.getColumns().isEmpty()) {
state.appendStreamValidation(sd.getStreamId(), String.format("stream %s has no columns!", sd.getStreamId()));
}
}
}
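
/**
* Validates each data source: it should be consumed by some stream, must be of
* type KAFKA, and must define a codec; the scheme/selector class checks below
* are currently disabled.
*/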
private void validateDataSources() {
Collection<StreamDefinition> sds = context.getStreamSchemas().values();
for (Kafka2TupleMetadata ds : context.getDataSourceMetadata().values()) {
// linear scan against all streams for each data source (O(n*m) overall)
if (sds.stream().noneMatch(t -> t.getDataSource().equals(ds.getName()))) {
state.appendUnusedDatasource(ds.getName());
}
if (!"KAFKA".equalsIgnoreCase(ds.getType())) {
state.appendDataSourceValidation(ds.getName(), String.format(" unsupported data source type %s !", ds.getType()));
}
// scheme class validation (currently disabled)
// String schemeCls = ds.getSchemeCls();
// try {
// Object scheme = Class.forName(schemeCls).getConstructor(String.class, Map.class).newInstance(ds.getTopic(), new HashMap<>()); // could only mock an empty map
// if (!(scheme instanceof MultiScheme || scheme instanceof Scheme)) {
// throw new IllegalArgumentException(" scheme class not subclass of Scheme or MultiScheme !");
// }
// } catch (Exception e) {
// state.appendDataSourceValidation(ds.getName(), String.format("schemeCls %s expected to be qualified sub class name of %s or %s with given constructor signature!"
// +"Message: %s !",
// schemeCls, Scheme.class.getCanonicalName(), MultiScheme.class.getCanonicalName(), e.getMessage()));
// }
// codec
if (ds.getCodec() == null) {
state.appendDataSourceValidation(ds.getName(), String.format("codec of datasource must *not* be null!"));
continue;
}
// String selectCls = ds.getCodec().getStreamNameSelectorCls();
// try {
// StreamNameSelector cachedSelector = (StreamNameSelector) Class.forName(selectCls).getConstructor(Properties.class)
// .newInstance(ds.getCodec().getStreamNameSelectorProp());
// } catch (Exception e) {
// state.appendDataSourceValidation(ds.getName(), String.format("streamNameSelectorCls %s expected to be subclass of %s and with given constructor signature! Message: %s !",
// selectCls, StreamNameSelector.class.getCanonicalName(), e.getMessage()));
// }
}
}
private void validateTopology() {
// TODO: topology-level validation is not implemented yet
}
}