blob: 2a3a04c77328a559a44f1a53e2b2ef0eceb9a663 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.siddhi.schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.util.Preconditions;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import java.util.ArrayList;
import java.util.List;
/**
* Siddhi specific Stream Schema.
*
* @param <T> Siddhi stream element type
*/
public class SiddhiStreamSchema<T> extends StreamSchema<T> {
private static final String DEFINE_STREAM_TEMPLATE = "define stream %s (%s);";
public SiddhiStreamSchema(TypeInformation<T> typeInfo, String... fieldNames) {
super(typeInfo, fieldNames);
}
public SiddhiStreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) {
super(typeInfo, fieldIndexes, fieldNames);
}
public StreamDefinition getStreamDefinition(String streamId) {
StreamDefinition streamDefinition = StreamDefinition.id(streamId);
for (int i = 0; i < getFieldNames().length; i++) {
streamDefinition.attribute(getFieldNames()[i], SiddhiTypeFactory.getAttributeType(getFieldTypes()[i]));
}
return streamDefinition;
}
public String getStreamDefinitionExpression(StreamDefinition streamDefinition) {
List<String> columns = new ArrayList<>();
Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
for (Attribute attribute : streamDefinition.getAttributeList()) {
columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
}
return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
}
public String getStreamDefinitionExpression(String streamId) {
StreamDefinition streamDefinition = getStreamDefinition(streamId);
List<String> columns = new ArrayList<>();
Preconditions.checkNotNull(streamDefinition, "StreamDefinition is null");
for (Attribute attribute : streamDefinition.getAttributeList()) {
columns.add(String.format("%s %s", attribute.getName(), attribute.getType().toString().toLowerCase()));
}
return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getId(), StringUtils.join(columns, ","));
}
}