/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.io.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.dstream.*;
import scala.collection.Seq;
import scala.collection.JavaConversions;
import scala.runtime.AbstractFunction2;
/** Evaluates physical plans in Apache Spark stream mode */
public class SparkStreaming extends SparkEvaluator {
    private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
    /** the Spark input streams used by the current query */
    private static ArrayList<JavaInputDStream<MRData>> streams;
    /** the variables bound to the input streams */
    private static ArrayList<String> stream_names;
    /** number of batches processed so far (used in performance experiments) */
    private static int tries = 0;
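    /** translate a stream source plan (BinaryStream, ParsedStream, or SocketStream)
     *  into a Spark input DStream of MRData */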
    private static JavaInputDStream<MRData> stream_source ( Tree source, Environment env ) {
        match source {
        case BinaryStream(`file,_):
            String path = ((MR_string)evalE(file,env)).get();
            // constructing the data source registers it in the data-source directory
            new BinaryDataSource(path,Plan.conf);
            return new SparkFileInputStream(stream_context,path,true);
        case ParsedStream(`parser,`file,...args):
            String path = ((MR_string)evalE(file,env)).get();
            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
            if (p == null)
                throw new Error("Unknown parser: "+parser);
            // constructing the data source registers it in the data-source directory
            new ParsedDataSource(path,p,args,Plan.conf);
            try {
                dump_source_dir();
            } catch (IOException ex) {
                throw new Error("Cannot dump source directory: "+ex);
            };
            return new SparkFileInputStream(stream_context,path,false);
        case SocketStream(`parser,`hostname,`port,...args):
            String h = ((MR_string)evalE(hostname,env)).get();
            int p = ((MR_int)evalE(port,env)).get();
            return new SparkSocketStream(stream_context,h,p,parser.toString(),args);
        };
        throw new Error("Unknown stream source: "+print_query(source));
    }
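    /** strip the Stream bindings off the plan, collecting each input stream
     *  and the variable bound to it; returns the remaining plan body */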
    private static Tree get_streams ( Tree plan, Environment env ) {
        match plan {
        case Stream(lambda(`v,`b),`source):
            streams.add(stream_source(source,env));
            stream_names.add(v.toString());
            return get_streams(b,env);
        };
        return plan;
    }
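    /** the evaluator used to evaluate the query plan on each batch */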
    private final static Evaluator ef = Evaluator.evaluator;

    /** bind the pattern variables to values */
    private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
        match pattern {
        case tuple(...ps):
            int i = 0;
            match src {
            case tuple(...es):
                // bind the components of a tuple pattern position by position
                for ( Tree p: ps )
                    env = bind_list(p,es.nth(i++),env);
                return env;
            }
        };
        // a simple pattern variable: evaluate the source and bind the variable to it
        MRData value = ef.evalE(src,env);
        env.replace(pattern.toString(),value);
        if (repeat_variables.member(pattern))
            Interpreter.new_distributed_binding(pattern.toString(),value);
        return env;
    }
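    /** register a transformation over all input DStreams that evaluates the plan
     *  on every batch interval and passes each batch result to the continuation f */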
    private static void stream_processing ( final Tree plan, final Environment env, final Function f ) {
        if (streams.isEmpty())
            throw new Error("No input streams in query");
        ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
        for ( JavaDStream<MRData> jd: streams )
            rdds.add(jd.dstream());
        final ArrayList<String> vars = stream_names;
        // the transformation applied to each batch of stream data
        final AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>> fnc =
            new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
                public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
                    long t = System.currentTimeMillis();
                    Environment new_env = env;
                    int i = 0;
                    // bind each stream variable to the RDD of the current batch;
                    // skip the batch if some input stream is empty or unavailable
                    for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
                        try {
                            if (rdd.count() == 0)
                                return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                        } catch (Exception ex) {
                            return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                        };
                        new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
                    };
                    match plan {
                    case lambda(`pat,`e):
                        // incremental streaming: bind the pattern variables before calling f
                        new_env = bind_list(pat,e,new_env);
                        f.eval(null);
                    case _: // non-incremental streaming
                        f.eval(ef.evalE(plan,new_env));
                    };
                    if (!Config.quiet_execution)
                        System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
                    if (++tries >= Config.stream_tries) { // for performance experiments only
                        tries = 0;
                        SparkEvaluator.stream_context.stop(true,true);
                        System.exit(0);
                    };
                    return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                }
            };
        new TransformedDStream<MRData>(JavaConversions.asScalaBuffer(rdds).toSeq(),fnc,classtag).register();
    }
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
    final public static void evaluate ( Tree plan, Environment env, Function f ) {
        streams = new ArrayList<JavaInputDStream<MRData>>();
        stream_names = new ArrayList<String>();
        match plan {
        case lambda(`p,`b):
            // incremental streaming: keep the lambda but strip the stream bindings from its body
            b = get_streams(b,env);
            stream_processing(#<lambda(`p,`b)>,env,f);
        case _: // non-incremental streaming
            plan = get_streams(plan,env);
            stream_processing(plan,env,f);
        };
        stream_context.start();
        stream_context.awaitTermination();    // block until the stream context is stopped
    }
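
    /* Hypothetical usage sketch (the callback below is illustrative, not part of
     * this class): the MRQL interpreter drives streaming evaluation by passing a
     * continuation that consumes the result of each batch, for example
     *
     *   SparkStreaming.evaluate(plan, env,
     *       new Function() {
     *           public MRData eval ( MRData result ) {
     *               System.out.println(result);    // emit each batch result
     *               return null;
     *           }
     *       });
     *
     * where evaluate() blocks in awaitTermination() until the stream context stops. */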
}