/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;

import org.apache.mrql.gen.*;
import java.util.ArrayList;
import java.io.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.dstream.*;
import scala.collection.Seq;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction2;

/** Evaluates physical plans in Apache Spark stream mode */
public class SparkStreaming extends SparkEvaluator {
    private final static scala.reflect.ClassTag<MRData> classtag
        = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
    /** the input DStreams of the current query, one per stream source */
    private static ArrayList<JavaInputDStream<MRData>> streams;
    /** the variable names bound to the streams above (parallel to streams) */
    private static ArrayList<String> stream_names;
    /** Translate a stream source in the physical plan into a Spark input DStream
     *  that watches the source directory for new files */
    private static JavaInputDStream<MRData> stream_source ( Tree source, Environment env ) {
        match source {
        case BinaryStream(`file,_):
            String path = ((MR_string)evalE(file,env)).get();
            // register the binary data source (constructor side-effect)
            new BinaryDataSource(path,Plan.conf);
            return new SparkFileInputStream(stream_context,path,true);
        case ParsedStream(`parser,`file,...args):
            String path = ((MR_string)evalE(file,env)).get();
            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
            if (p == null)
                throw new Error("Unknown parser: "+parser);
            // register the parsed data source (constructor side-effect)
            new ParsedDataSource(path,p,args,Plan.conf);
            try {
                dump_source_dir();
            } catch (IOException ex) {
                throw new Error("Cannot dump source directory");
            };
            return new SparkFileInputStream(stream_context,path,false);
        };
        throw new Error("Unknown stream source: "+print_query(source));
    }
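
    /* Illustrative sketch only, not part of the original file: stream_source
     * consumes plan nodes of the two shapes matched above,
     *
     *   BinaryStream(<path-expression>,_)
     *   ParsedStream(<parser-name>,<path-expression>,...args)
     *
     * where the path expression must evaluate to an MR_string naming a
     * directory that Spark Streaming will watch for new files. With a made-up
     * directory, a call could look like:
     *
     *   JavaInputDStream<MRData> in
     *       = stream_source(#<BinaryStream("/tmp/events",binary)>,env);
     *
     * (#<...> is the Gen quasi-quotation that builds a Tree; whether a bare
     * string literal is accepted as the path child, rather than a quoted
     * expression, is an assumption of this sketch.)
     */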
    /** Collect the stream sources of the plan, then register a transformation
     *  that evaluates the rest of the plan on every batch of stream data */
    private static void stream_processing ( final Tree plan, final Environment env, final DataSetFunction f ) {
        final Evaluator ef = Evaluator.evaluator;
        match plan {
        case Stream(lambda(`v,`b),`source):
            // bind the stream variable v to the input DStream of its source
            streams.add(stream_source(source,env));
            stream_names.add(v.toString());
            stream_processing(b,env,f);
        case _:
            ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
            for ( JavaDStream<MRData> jd: streams )
                rdds.add(jd.dstream());
            final ArrayList<String> vars = stream_names;
            // the function applied to the batch RDDs of all streams at every time interval
            final AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>> fnc =
                new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
                    public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
                        Environment new_env = env;
                        int i = 0;
                        // bind each stream variable to the RDD of the current batch
                        for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
                            try {
                                // force evaluation; normalize an empty batch to a fresh empty RDD
                                if (rdd.count() == 0)
                                    rdd = SparkEvaluator.spark_context.emptyRDD().rdd();
                            } catch (Exception ex) {
                                // the batch cannot be read; skip this time interval
                                return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                            };
                            new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
                        };
                        // evaluate the plan over the current batches and pass the result to the sink f
                        f.eval(ef.eval(plan,new_env,"-"));
                        // the output of the transformed stream itself is never consumed
                        return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                    }
                };
            new TransformedDStream<MRData>(JavaConversions.asScalaBuffer(rdds).toSeq(),fnc,classtag).register();
        }
    }
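
    /* Per-batch mechanics, sketched (illustrative comment, not original code):
     * at every batch interval, Spark Streaming calls fnc with one RDD per
     * registered stream, in the order the streams were collected. For a query
     * over two streams bound to X and Y, each call effectively evaluates
     *
     *   new_env = { X -> MR_rdd(batchX), Y -> MR_rdd(batchY) } extending env
     *   f.eval(Evaluator.evaluator.eval(plan,new_env,"-"));
     *
     * so the sink f receives one complete query result per batch interval.
     */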
    /** Evaluate a plan in stream mode: evaluate each batch of data and apply the function f */
    final public static void evaluate ( Tree plan, Environment env, DataSetFunction f ) {
        streams = new ArrayList<JavaInputDStream<MRData>>();
        stream_names = new ArrayList<String>();
        stream_processing(plan,env,f);
        stream_context.start();
        // block until the streaming computation is stopped or fails
        stream_context.awaitTermination();
    }
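
    /* Illustrative usage sketch (assumptions: DataSetFunction exposes a single
     * eval(DataSet) callback, as its call site in stream_processing suggests,
     * and a compiled stream plan and environment are already at hand):
     *
     *   SparkStreaming.evaluate(plan,env,
     *       new DataSetFunction() {
     *           public void eval ( DataSet ds ) {
     *               // hypothetical sink: print or store each batch result
     *               System.out.println(ds);
     *           }
     *       });
     */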
}