| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.siddhi.extension; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import org.wso2.siddhi.core.config.SiddhiAppContext; |
| import org.wso2.siddhi.core.exception.SiddhiAppCreationException; |
| import org.wso2.siddhi.core.executor.ExpressionExecutor; |
| import org.wso2.siddhi.core.executor.function.FunctionExecutor; |
| import org.wso2.siddhi.core.util.config.ConfigReader; |
| import org.wso2.siddhi.query.api.definition.Attribute; |
| |
| public class CustomPlusFunctionExtension extends FunctionExecutor { |
| private Attribute.Type returnType; |
| |
| /** |
| * The initialization method for FunctionExecutor, this method will be called before the other methods |
| */ |
| @Override |
| protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) { |
| for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) { |
| Attribute.Type attributeType = expressionExecutor.getReturnType(); |
| if (attributeType == Attribute.Type.DOUBLE) { |
| returnType = attributeType; |
| |
| } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { |
| throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool"); |
| } else { |
| returnType = Attribute.Type.LONG; |
| } |
| } |
| } |
| |
| /** |
| * The main execution method which will be called upon event arrival |
| * when there are more then one function parameter |
| * |
| * @param data the runtime values of function parameters |
| * @return the function result |
| */ |
| @Override |
| protected Object execute(Object[] data) { |
| if (returnType == Attribute.Type.DOUBLE) { |
| double total = 0; |
| for (Object aObj : data) { |
| total += Double.parseDouble(String.valueOf(aObj)); |
| } |
| |
| return total; |
| } else { |
| long total = 0; |
| for (Object aObj : data) { |
| total += Long.parseLong(String.valueOf(aObj)); |
| } |
| return total; |
| } |
| } |
| |
| /** |
| * The main execution method which will be called upon event arrival |
| * when there are zero or one function parameter |
| * |
| * @param data null if the function parameter count is zero or |
| * runtime data value of the function parameter |
| * @return the function result |
| */ |
| @Override |
| protected Object execute(Object data) { |
| if (returnType == Attribute.Type.DOUBLE) { |
| return Double.parseDouble(String.valueOf(data)); |
| } else { |
| return Long.parseLong(String.valueOf(data)); |
| } |
| } |
| |
| @Override |
| public Attribute.Type getReturnType() { |
| return returnType; |
| } |
| |
| @Override |
| public Map<String, Object> currentState() { |
| return new HashMap<>(); |
| } |
| |
| @Override |
| public void restoreState(Map<String, Object> map) { |
| |
| } |
| } |