/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.io.IOException;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
/** Evaluates physical plans in Apache Flink stream mode */
public class FlinkStreaming extends FlinkEvaluator {
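/* the DataStreams created for the Stream bindings in a plan, and the plan variables they are bound to */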
private static ArrayList<DataStream<FData>> streams;
private static ArrayList<String> stream_names;
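/** A Flink source that scans a directory every time_window milliseconds and emits
* the contents of every file that is new or has been modified since the last scan.
* Example (the path and window size here are hypothetical):
* stream_env.addSource(new MRDataFileSourceFunction("/tmp/stream-input",1000,
* new FlinkBinaryInputFormat.FDataInputFormat()))
*/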
public static final class MRDataFileSourceFunction implements SourceFunction<FData> {
private static final long serialVersionUID = 1L;
final private FData container = new FData();
private HashMap<String,Long> file_modification_times;
final private String directory;
final private long time_window;
final private FileInputFormat<FData> input_format;
private volatile boolean is_running;  // written by cancel() from a different thread
public MRDataFileSourceFunction ( String directory, long time_window,
FileInputFormat<FData> input_format ) {
this.directory = directory;
this.time_window = time_window;
this.input_format = input_format;
this.is_running = true;  // run() emits data while is_running remains true
}
/** return the files in the directory that are new or have been modified since the last scan */
private ArrayList<String> new_files () {
try {
long ct = System.currentTimeMillis();
Path dpath = new Path(directory);
final FileSystem fs = dpath.getFileSystem(Plan.conf);
final FileStatus[] ds
= fs.listStatus(dpath,
new PathFilter () {
public boolean accept ( Path path ) {
return !path.getName().startsWith("_")
&& !path.getName().endsWith(".type");
}
});
ArrayList<String> s = new ArrayList<String>();
for ( FileStatus d: ds ) {
String name = d.getPath().toString();
Long last_scan = file_modification_times.get(name);
if (last_scan == null || d.getModificationTime() > last_scan) {
file_modification_times.put(name,ct);
s.add(name);
}
}
return s;
} catch (Exception ex) {
throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
}
}
@Override
public void run ( SourceContext<FData> context ) throws Exception {
long tick = System.currentTimeMillis();
file_modification_times = new HashMap<String,Long>();
while (is_running) {
for ( String path: new_files() ) {
input_format.setFilePath(path);
FileInputSplit[] splits = input_format.createInputSplits(1);
for ( FileInputSplit split: splits ) {
input_format.open(split);
try {
while (!input_format.reachedEnd()) {
FData fd = input_format.nextRecord(container);
if (fd != null)
context.collect(fd);
}
} finally {
// close the input format for this split before opening the next one
input_format.close();
}
}
}
try {
// sleep until the start of the next time window
long ct = System.currentTimeMillis();
tick += time_window;
if (tick > ct)
Thread.sleep(tick-ct);
else tick = ct;
} catch (InterruptedException ex) {
// the source thread was interrupted (eg, on cancellation); stop emitting
return;
}
}
}
@Override
public void cancel() {
is_running = false;
}
}
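/** translate a BinaryStream or ParsedStream plan node to a Flink DataStream source */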
private static DataStream<FData> stream_source ( Tree source, Environment env ) {
match source {
case BinaryStream(`file,_):
String path = absolute_path(((MR_string)evalE(file,null)).get());
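// the data source below is apparently constructed only for its side-effects; its value is unused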
new BinaryDataSource(path,Plan.conf);
return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
new FlinkBinaryInputFormat.FDataInputFormat()));
case ParsedStream(`parser,`file,...args):
String path = absolute_path(((MR_string)evalE(file,null)).get());
Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString());
if (parser_class == null)
throw new Error("Unknown parser: "+parser);
new FlinkParsedDataSource(path,parser_class,args);
return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
new FlinkParsedInputFormat.ParsedInputFormat(path)));
};
throw new Error("Unknown stream source: "+print_query(source));
}
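/** bind each Stream source in the plan to a Flink DataStream; applying the plan body
* to the data of each stream window is still unimplemented (see the sketch below) */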
private static void stream_processing ( final Tree plan, final Environment env,
final Function f, final DataStream<FData> stream ) {
final Evaluator ef = Evaluator.evaluator;
match plan {
case Stream(lambda(`v,`b),`source):
DataStream<FData> streamSource = stream_source(source,env);
streams.add(streamSource);
stream_names.add(v.toString());
stream_processing(b,env,f,streamSource);
case _:
/* Incomplete: this needs a function from List<DataSet> to DataSet (or void)
that is applied to the data of each Flink window:
stream.window().reduceGroup(new GroupReduceFunction<FData,FData>() {
public void reduce ( Iterable<FData> values, Collector<FData> out ) throws Exception {
f.eval(ef.eval(plan,env,"-"));
}
});
*/
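/* A possible shape for that step, sketched with a Flink 1.x-style window API
(windowAll, TumblingProcessingTimeWindows, and AllWindowFunction are assumptions
about the Flink version, not part of this code base):

stream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(Config.stream_window)))
.apply(new AllWindowFunction<FData,FData,TimeWindow>() {
public void apply ( TimeWindow window, Iterable<FData> values, Collector<FData> out ) {
// bind the stream variable to this batch of values in env, evaluate
// the plan over the batch, and pass the result to the continuation f
f.eval(ef.eval(plan,env,"-"));
}
});
*/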
}
}
/** evaluate the plan in stream mode: evaluate the plan on each batch of streaming data and apply the function f to the result */
final public static void evaluate ( Tree plan, final Environment env, final Function f ) {
try {
if (true) // guard until stream_processing is completed
throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
streams = new ArrayList<DataStream<FData>>();
stream_names = new ArrayList<String>();
stream_processing(plan,env,f,null);
stream_env.execute();
} catch (Exception ex) {
throw new Error("Error during Flink stream processing: "+ex);
}
}
}