blob: 5c54ad8d838390181db55d1c552e2f5b5f7fbbcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.siddhi.operator;
import java.io.IOException;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
* Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
*/
public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
super(siddhiPlan);
}
@Override
protected StreamElementSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
TypeInformation<Tuple2<String, IN>> tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation((TypeInformation<IN>) streamSchema.getTypeInfo());
return new StreamElementSerializer<>(tuple2TypeInformation.createSerializer(executionConfig));
}
@Override
protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
send(value.f0, getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
}
@Override
public String getStreamId(Tuple2<String, IN> record) {
return record.f0;
}
@Override
protected void snapshotQueueState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
dataOutputView.writeInt(queue.size());
for (StreamRecord<Tuple2<String, IN>> record : queue) {
String streamId = record.getValue().f0;
dataOutputView.writeUTF(streamId);
this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
}
}
@Override
protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
int sizeOfQueue = dataInputView.readInt();
PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
for (int i = 0; i < sizeOfQueue; i++) {
String streamId = dataInputView.readUTF();
StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
}
return priorityQueue;
}
}