blob: f760938ac14953fac8a00d15799214bc4bdd036e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.siddhi.operator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.util.Preconditions;
import org.wso2.siddhi.core.SiddhiManager;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* SiddhiCEP Operator Context Metadata including input/output stream (streamId, TypeInformation) as well execution plan query,
* and execution environment context like TimeCharacteristic and ExecutionConfig.
*/
public class SiddhiOperatorContext implements Serializable {
private ExecutionConfig executionConfig;
private Map<String, SiddhiStreamSchema<?>> inputStreamSchemas;
private final Map<String, Class<?>> siddhiExtensions;
private String outputStreamId;
private TypeInformation outputStreamType;
private TimeCharacteristic timeCharacteristic;
private String name;
private String executionPlan;
public SiddhiOperatorContext() {
inputStreamSchemas = new HashMap<>();
siddhiExtensions = new HashMap<>();
}
/**
* @param extensions siddhi extensions to register
*/
public void setExtensions(Map<String, Class<?>> extensions) {
Preconditions.checkNotNull(extensions,"extensions");
siddhiExtensions.putAll(extensions);
}
/**
* @return registered siddhi extensions
*/
public Map<String, Class<?>> getExtensions() {
return siddhiExtensions;
}
/**
* @return Siddhi Stream Operator Name in format of "Siddhi: execution query ... (query length)"
*/
public String getName() {
if (this.name == null) {
if (executionPlan.length() > 100) {
return String.format("Siddhi: %s ... (%s)", executionPlan.substring(0, 100), executionPlan.length() - 100);
} else {
return String.format("Siddhi: %s", executionPlan);
}
} else {
return this.name;
}
}
/**
* @return Source siddhi stream IDs
*/
public List<String> getInputStreams() {
Object[] keys = this.inputStreamSchemas.keySet().toArray();
List<String> result = new ArrayList<>(keys.length);
for (Object key : keys) {
result.add((String) key);
}
return result;
}
/**
* @return Siddhi CEP cql-like execution plan
*/
public String getExecutionPlan() {
return executionPlan;
}
/**
* Stream definition + execution expression
*/
public String getFinalExecutionPlan() {
Preconditions.checkNotNull(executionPlan, "Execution plan is not set");
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, SiddhiStreamSchema<?>> entry : inputStreamSchemas.entrySet()) {
sb.append(entry.getValue().getStreamDefinitionExpression(entry.getKey()));
}
sb.append(this.getExecutionPlan());
return sb.toString();
}
/**
* @return Siddhi Stream Operator output type information
*/
public TypeInformation getOutputStreamType() {
return outputStreamType;
}
/**
* @return Siddhi output streamId for callback
*/
public String getOutputStreamId() {
return outputStreamId;
}
/**
* @param inputStreamId Siddhi streamId
* @return StreamSchema for given siddhi streamId
*
* @throws UndefinedStreamException throws if stream is not defined
*/
@SuppressWarnings("unchecked")
public <IN> StreamSchema<IN> getInputStreamSchema(String inputStreamId) {
Preconditions.checkNotNull(inputStreamId,"inputStreamId");
if (!inputStreamSchemas.containsKey(inputStreamId)) {
throw new UndefinedStreamException("Input stream: " + inputStreamId + " is not found");
}
return (StreamSchema<IN>) inputStreamSchemas.get(inputStreamId);
}
/**
* @param outputStreamId Siddhi output streamId, which must exist in siddhi execution plan
*/
public void setOutputStreamId(String outputStreamId) {
Preconditions.checkNotNull(outputStreamId,"outputStreamId");
this.outputStreamId = outputStreamId;
}
/**
* @param outputStreamType Output stream TypeInformation
*/
public void setOutputStreamType(TypeInformation outputStreamType) {
Preconditions.checkNotNull(outputStreamType,"outputStreamType");
this.outputStreamType = outputStreamType;
}
/**
* @return Returns execution environment TimeCharacteristic
*/
public TimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}
public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
Preconditions.checkNotNull(timeCharacteristic,"timeCharacteristic");
this.timeCharacteristic = timeCharacteristic;
}
/**
* @param executionPlan Siddhi SQL-Like exeuction plan query
*/
public void setExecutionPlan(String executionPlan) {
Preconditions.checkNotNull(executionPlan,"executionPlan");
this.executionPlan = executionPlan;
}
/**
* @return Returns input stream ID and schema mapping
*/
public Map<String, SiddhiStreamSchema<?>> getInputStreamSchemas() {
return inputStreamSchemas;
}
/**
* @param inputStreamSchemas input stream ID and schema mapping
*/
public void setInputStreamSchemas(Map<String, SiddhiStreamSchema<?>> inputStreamSchemas) {
Preconditions.checkNotNull(inputStreamSchemas,"inputStreamSchemas");
this.inputStreamSchemas = inputStreamSchemas;
}
public void setName(String name) {
Preconditions.checkNotNull(name,"name");
this.name = name;
}
/**
* @return Created new SiddhiManager instance with registered siddhi extensions
*/
public SiddhiManager createSiddhiManager() {
SiddhiManager siddhiManager = new SiddhiManager();
for (Map.Entry<String, Class<?>> entry : getExtensions().entrySet()) {
siddhiManager.setExtension(entry.getKey(), entry.getValue());
}
return siddhiManager;
}
/**
* @return StreamExecutionEnvironment ExecutionConfig
*/
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}
/**
* @param executionConfig StreamExecutionEnvironment ExecutionConfig
*/
public void setExecutionConfig(ExecutionConfig executionConfig) {
Preconditions.checkNotNull(executionConfig,"executionConfig");
this.executionConfig = executionConfig;
}
}