| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.io.IOException; |
| import org.apache.flink.api.java.DataSet; |
| import org.apache.flink.api.common.functions.FlatMapFunction; |
| import org.apache.flink.api.common.functions.GroupReduceFunction; |
| import org.apache.flink.api.common.io.FileInputFormat; |
| import org.apache.flink.util.Collector; |
| import org.apache.flink.streaming.api.datastream.*; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction; |
| import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; |
| import org.apache.flink.core.fs.FileInputSplit; |
| import org.apache.hadoop.fs.*; |
| import org.apache.hadoop.io.*; |
| |
| |
| /** Evaluates physical plans in Apache Flink stream mode */ |
| public class FlinkStreaming extends FlinkEvaluator { |
| private static ArrayList<DataStream<FData>> streams; |
| private static ArrayList<String> stream_names; |
| |
| public static final class MRDataFileSourceFunction implements SourceFunction<FData> { |
| private static final long serialVersionUID = 1L; |
| final private FData container = new FData(); |
| private HashMap<String,Long> file_modification_times; |
| |
| final private String directory; |
| final private long time_window; |
| final private FileInputFormat<FData> input_format; |
| private boolean is_running; |
| |
| public MRDataFileSourceFunction ( String directory, long time_window, |
| FileInputFormat<FData> input_format ) { |
| this.directory = directory; |
| this.time_window = time_window; |
| this.input_format = input_format; |
| this.is_running = false; |
| } |
| |
| /** return the files within the directory that have been created within the last time window */ |
| private ArrayList<String> new_files () { |
| try { |
| long ct = System.currentTimeMillis(); |
| Path dpath = new Path(directory); |
| final FileSystem fs = dpath.getFileSystem(Plan.conf); |
| final FileStatus[] ds |
| = fs.listStatus(dpath, |
| new PathFilter () { |
| public boolean accept ( Path path ) { |
| return !path.getName().startsWith("_") |
| && !path.getName().endsWith(".type"); |
| } |
| }); |
| ArrayList<String> s = new ArrayList<String>(); |
| for ( FileStatus d: ds ) { |
| String name = d.getPath().toString(); |
| if (file_modification_times.get(name) == null |
| || d.getModificationTime() > file_modification_times.get(name)) { |
| file_modification_times.put(name,new Long(ct)); |
| s.add(name); |
| } |
| }; |
| return s; |
| } catch (Exception ex) { |
| throw new Error("Cannot open a new file from the directory "+directory+": "+ex); |
| } |
| } |
| |
| @Override |
| public void run ( SourceContext<FData> context ) throws Exception { |
| long tick = System.currentTimeMillis(); |
| file_modification_times = new HashMap<String,Long>(); |
| while (is_running) { |
| for ( String path: new_files() ) { |
| input_format.setFilePath(path); |
| FileInputSplit[] splits = input_format.createInputSplits(1); |
| for ( FileInputSplit split: splits ) { |
| input_format.open(split); |
| while (!input_format.reachedEnd()) { |
| FData fd = input_format.nextRecord(container); |
| if (fd != null) |
| context.collect(fd); |
| } |
| }; |
| input_format.close(); |
| }; |
| try { |
| long ct = System.currentTimeMillis(); |
| tick += time_window; |
| if (tick > ct) |
| Thread.sleep(tick-ct); |
| else tick = ct; |
| } catch (Exception ex) { |
| return; |
| } |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| is_running = false; |
| } |
| } |
| |
| private static DataStream<FData> stream_source ( Tree source, Environment env ) { |
| match source { |
| case BinaryStream(`file,_): |
| String path = absolute_path(((MR_string)evalE(file,null)).get()); |
| new BinaryDataSource(path,Plan.conf); |
| return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window, |
| new FlinkBinaryInputFormat.FDataInputFormat())); |
| case ParsedStream(`parser,`file,...args): |
| String path = absolute_path(((MR_string)evalE(file,null)).get()); |
| Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString()); |
| if (parser_class == null) |
| throw new Error("Unknown parser: "+parser); |
| new FlinkParsedDataSource(path,parser_class,args); |
| return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window, |
| new FlinkParsedInputFormat.ParsedInputFormat(path))); |
| }; |
| throw new Error("Unknown stream source: "+print_query(source)); |
| } |
| |
| private static void stream_processing ( final Tree plan, final Environment env, |
| final Function f, final DataStream<FData> stream ) { |
| final Evaluator ef = Evaluator.evaluator; |
| match plan { |
| case Stream(lambda(`v,`b),`source): |
| DataStream<FData> streamSource = stream_source(source,env); |
| streams.add(streamSource); |
| stream_names.add(v.toString()); |
| stream_processing(b,env,f,streamSource); |
| case _: |
| /** Incomplete: need a function from List<DataSet> to DataSet (or void) on each Flink window |
| stream.window().reduceGroup(new GroupReduceFunction<FData,FData>() { |
| public void reduce ( Iterable<FData> values, Collector<FData> out ) throws Exception { |
| f.eval(ef.eval(plan,env,"-")); |
| } |
| }); |
| */ |
| } |
| } |
| |
| /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */ |
| final public static void evaluate ( Tree plan, final Environment env, final Function f ) { |
| try { |
| if (true) // needs more work |
| throw new Error("MRQL Streaming is not supported in this evaluation mode yet"); |
| streams = new ArrayList<DataStream<FData>>(); |
| stream_names = new ArrayList<String>(); |
| stream_processing(plan,env,f,null); |
| stream_env.execute(); |
| } catch (Exception ex) { |
| throw new Error("Error during Flink stream processing: "+ex); |
| } |
| } |
| } |