blob: 74cfab321935d2018988a15e0693b9b4ee4d6c14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.HashMap;
import java.util.ArrayList;
import scala.Option;
import scala.Some;
import scala.None;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.dstream.*;
/** A Spark InputDStream for a stream socket */
public class SparkSocketStream extends JavaInputDStream<MRData> {
    // ClassTag needed by the Scala InputDStream machinery to materialize MRData RDDs
    private final static scala.reflect.ClassTag<MRData> classtag
        = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);

    /** The Scala-level InputDStream that pulls MRData batches from a stream socket
     *  through an MRQL parser, one batch per stream window. */
    public static final class MRDataSocketStream extends InputDStream<MRData> {
        private final String host;
        private final int port;
        private Parser parser;
        private final JavaStreamingContext stream_context;

        /** Connect to host:port using the named MRQL parser.
         *  Note: the socket is opened eagerly here, at construction time, not in start().
         *  @param stream_context  the Spark streaming context
         *  @param host            the socket host to read stream data from
         *  @param port            the socket port to read stream data from
         *  @param parser_name     name of a parser registered in DataSource.parserDirectory
         *  @param args            parser initialization arguments
         *  @throws Error if the parser name is unknown or the parser cannot be created/opened
         */
        MRDataSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser_name, Trees args ) {
            super(stream_context.ssc(),classtag);
            this.host = host;
            this.port = port;
            this.stream_context = stream_context;
            Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser_name);
            if (parser_class == null)
                throw new Error("Unknown parser: "+parser_name);
            try {
                // getDeclaredConstructor().newInstance() replaces the deprecated
                // Class.newInstance(), which silently rethrows checked exceptions
                // thrown by the constructor; both failure modes are caught below
                parser = parser_class.getDeclaredConstructor().newInstance();
                parser.initialize(args);
                parser.open(host,port);
            } catch (Exception ex) {
                // preserve the cause so the real failure (missing no-arg constructor,
                // socket error, parser initialization error, ...) is not lost
                throw new Error("Cannot create the parser",ex);
            }
        }

        /** No-op: the socket was already opened in the constructor. */
        @Override
        public void start () {}

        /** No-op: the parser/socket lifetime is not managed here. */
        @Override
        public void stop () {}

        /** The window length of this stream, taken from the MRQL configuration. */
        @Override
        public Duration slideDuration () {
            return new Duration(Config.stream_window);
        }

        /** This is a source DStream: it depends on no other DStreams.
         *  (Raw List type is required to match the Scala superclass signature.) */
        @Override
        public List dependencies () {
            return Nil$.MODULE$;
        }

        /** Collect all data that arrive on the socket within one stream window
         *  and wrap them into an RDD.
         *  @param validTime  the batch time (unused; the window length comes from Config)
         *  @return an RDD containing the MRData parsed during one window period
         */
        @Override
        public Option<RDD<MRData>> compute ( Time validTime ) {
            // window length in milliseconds (Config.stream_window, not the batch time)
            final long duration = Config.stream_window;
            final long start_time = System.currentTimeMillis();
            ArrayList<MRData> result = new ArrayList<MRData>();
            // keep slicing and parsing socket input until the window expires
            while ( System.currentTimeMillis() - start_time < duration ) {
                String data = parser.slice();
                for ( MRData x: parser.parse(data) )
                    result.add(x);
            }
            return new Some<RDD<MRData>>(SparkEvaluator.spark_context.parallelize(result).rdd());
        }
    }

    /** Create a Java-facing socket DStream backed by an MRDataSocketStream.
     *  @param stream_context  the Spark streaming context
     *  @param host            the socket host to read stream data from
     *  @param port            the socket port to read stream data from
     *  @param parser          name of a parser registered in DataSource.parserDirectory
     *  @param args            parser initialization arguments
     */
    SparkSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser, Trees args ) {
        super(new MRDataSocketStream(stream_context,host,port,parser,args),classtag);
    }
}