[WAYANG-multi-plugin] modification and test for multiplugin
Signed-off-by: bertty <bertty@apache.org>
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
index 966ecc5..1c695a4 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/api/Configuration.java
@@ -682,6 +682,11 @@
}
public CollectionProvider<Class<PlanEnumerationPruningStrategy>> getPruningStrategyClassProvider() {
+ this.pruningStrategyClassProvider
+ .provideAll()
+ .stream()
+ .map(obj -> obj.getSimpleName())
+ .forEach(System.out::println);
return this.pruningStrategyClassProvider;
}
diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
index 75e0cc0..1387286 100644
--- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
+++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/plan/executionplan/ExecutionTask.java
@@ -182,7 +182,7 @@
@Override
public String toString() {
- return "T[" + this.operator + ']';
+ return "T[" + this.operator+ " Platform: {"+this.stage.getPlatformExecution()+ "}]";
// return this.getClass().getSimpleName() + "[" + this.operator + ']';
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
index 3342290..cc0daf3 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/channels/ChannelConversions.java
@@ -126,7 +126,7 @@
}
public static Collection<Supplier<ChannelConversion>> SUPPLIERS = Arrays.asList(
- UNCACHED_RDD_TO_CACHED_RDD(),
+ //UNCACHED_RDD_TO_CACHED_RDD(),
COLLECTION_TO_BROADCAST(),
COLLECTION_TO_UNCACHED_RDD(),
UNCACHED_RDD_TO_COLLECTION(),
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
index 203de34..363998d 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/CartesianMapping.java
@@ -36,6 +36,8 @@
*/
public class CartesianMapping implements Mapping {
+
+
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
index eeac994..3ae8cfc 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/FilterMapping.java
@@ -25,6 +25,7 @@
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.operators.SparkFilterOperator;
import org.apache.wayang.spark.platform.SparkPlatform;
@@ -38,12 +39,30 @@
@SuppressWarnings("unchecked")
public class FilterMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public FilterMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public FilterMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -55,7 +74,7 @@
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<FilterOperator>(
- (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkFilterOperator<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
index 301246b..5d78903 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/LocalCallbackSinkMapping.java
@@ -36,12 +36,31 @@
*/
public class LocalCallbackSinkMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public LocalCallbackSinkMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public LocalCallbackSinkMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
+
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -54,7 +73,7 @@
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<LocalCallbackSink>(
- (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkLocalCallbackSink<>(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
index 4e26640..4cd51e2 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/Mappings.java
@@ -65,11 +65,22 @@
new PageRankMapping()
);
- public static Collection<Mapping> getBasicMappings(){
+ public static Collection<Mapping> getBasicMappings(String name, String conf){
return BASIC_MAPPINGS.stream()
.map(mapping -> {
try {
- return mapping.getClass().getConstructor().newInstance();
+ Class<Mapping> clazz = (Class<Mapping>) mapping.getClass();
+ if(
+ clazz.getSimpleName().contains("Filter") ||
+ clazz.getSimpleName().contains("LocalCallbackSink") ||
+ clazz.getSimpleName().contains("TextFileSource")
+ ){
+ return clazz.getConstructor(String.class, String.class).newInstance(
+ name, conf
+ );
+ }
+
+ return clazz.getConstructor().newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
index 09063e1..e78ed52 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/mapping/TextFileSourceMapping.java
@@ -37,12 +37,30 @@
*/
public class TextFileSourceMapping implements Mapping {
+ private String name;
+ private String conf;
+
+ public TextFileSourceMapping(){
+ this.conf = null;
+ this.name = null;
+ }
+
+ public TextFileSourceMapping(String name, String conf){
+ this.conf = conf;
+ this.name = name;
+ }
+
+ public SparkPlatform getPlatformInstance(){
+ return (this.name == null)?
+ SparkPlatform.getInstance():
+ SparkPlatform.getInstance(this.name, this.conf);
+ }
@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
- SparkPlatform.getInstance()
+ this.getPlatformInstance()
));
}
@@ -55,7 +73,7 @@
private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<TextFileSource>(
- (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).at(epoch)
+ (matchedOperator, epoch) -> new SparkTextFileSource(matchedOperator).setPlatform(this.getPlatformInstance()).at(epoch)
);
}
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
index 80fb5ab..9910f7b 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkExecutionOperator.java
@@ -22,7 +22,9 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.execution.SparkExecutor;
@@ -39,6 +41,7 @@
default SparkPlatform getPlatform() {
return SparkPlatform.getInstance();
}
+ default Operator setPlatform(SparkPlatform sparkPlatform){return null;}
/**
* Evaluates this operator. Takes a set of {@link ChannelInstance}s according to the operator inputs and manipulates
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
index 737061e..852aea8 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkFilterOperator.java
@@ -41,6 +41,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Spark implementation of the {@link FilterOperator}.
@@ -49,6 +50,8 @@
extends FilterOperator<Type>
implements SparkExecutionOperator {
+ SparkPlatform platform;
+
/**
* Creates a new instance.
*
@@ -120,4 +123,17 @@
return false;
}
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkFilterOperator<Type> setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
index c909d29..837a5b2 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkLocalCallbackSink.java
@@ -35,11 +35,14 @@
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Implementation of the {@link LocalCallbackSink} operator for the Spark platform.
*/
public class SparkLocalCallbackSink<T extends Serializable> extends LocalCallbackSink<T> implements SparkExecutionOperator {
+
+ SparkPlatform platform;
/**
* Creates a new instance.
*
@@ -101,4 +104,17 @@
return true;
}
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkLocalCallbackSink<T> setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
index acf9f1d..f5bff17 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/operators/SparkTextFileSource.java
@@ -34,12 +34,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import org.apache.wayang.spark.platform.SparkPlatform;
/**
* Provides a {@link Collection} to a Spark job.
*/
public class SparkTextFileSource extends TextFileSource implements SparkExecutionOperator {
-
+ SparkPlatform platform;
public SparkTextFileSource(String inputUrl, String encoding) {
super(inputUrl, encoding);
}
@@ -109,4 +110,18 @@
return false;
}
+
+ @Override
+ public SparkPlatform getPlatform() {
+ if(this.platform == null) {
+ return SparkExecutionOperator.super.getPlatform();
+ }
+ return this.platform;
+ }
+
+ @Override
+ public SparkTextFileSource setPlatform(SparkPlatform sparkPlatform) {
+ this.platform = sparkPlatform;
+ return this;
+ }
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
index d3c7b79..f0b7442 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/platform/SparkPlatform.java
@@ -107,8 +107,6 @@
return instances.get("default");
} else {
String first = instances.keySet().stream().findFirst().get();
- System.out.println("first");
- System.out.println(first);
return instances.get(first);
}
@@ -200,6 +198,7 @@
@Override
public void configureCustom(Configuration configuration, String config) {
+ System.out.println(config);
configuration.load(ReflectionUtils.loadResource(config));
}
diff --git a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
index 180acdb..7272530 100644
--- a/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
+++ b/wayang-platforms/wayang-spark/code/main/java/org/apache/wayang/spark/plugin/SparkMultiPlugin.java
@@ -25,14 +25,14 @@
@Override
public Collection<Mapping> getMappings() {
- return Mappings.getBasicMappings();
+ return Mappings.getBasicMappings(this.name, this.config);
// return Mappings.BASIC_MAPPINGS;
}
@Override
public Collection<ChannelConversion> getChannelConversions() {
-
- return ChannelConversions.ALL;
+ System.out.println("here in getChannelConversiona");
+ return ChannelConversions.getALL();
}
@Override
diff --git a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
index cf39e24..2781b55 100644
--- a/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
+++ b/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/test/DoubleSpark.java
@@ -7,6 +7,7 @@
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.ReflectionUtils;
+import org.apache.wayang.java.Java;
import org.apache.wayang.java.platform.JavaPlatform;
import org.apache.wayang.spark.Spark;
@@ -16,12 +17,14 @@
public class DoubleSpark {
public static void main(String[] args) {
List<String> collector = new LinkedList<>();
- TextFileSource textFileSource = new TextFileSource("file:///Users/rodrigopardomeza/files/demo.txt");
+ TextFileSource textFileSource = new TextFileSource("file:///Users/bertty/databloom/incubator-wayang/pom.xml");
textFileSource.setName("Load file");
+ textFileSource.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
FilterOperator<String> filterOperator = new FilterOperator<>(str -> !str.isEmpty(), String.class);
filterOperator.setName("Filter empty words");
- filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+ // filterOperator.addTargetPlatform(Spark.platform("sparky", "wayang-sparky-default.properties"));
+ filterOperator.addTargetPlatform(Java.platform());
// write results to a sink
LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(
@@ -29,15 +32,16 @@
DataSetType.createDefaultUnchecked(String.class)
);
sink.setName("Collect result");
+ sink.addTargetPlatform(Spark.platform("other", "wayang-spark-defaults.properties"));
// Build Rheem plan by connecting operators
textFileSource.connectTo(0, filterOperator, 0);
filterOperator.connectTo(0, sink, 0);
WayangContext wayangContext = new WayangContext();
- //wayangContext.register(Java.basicPlugin());
+ wayangContext.register(Java.basicPlugin());
wayangContext.register(Spark.multiPlugin("sparky", "wayang-sparky-default.properties"));
-// wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
+ wayangContext.register(Spark.multiPlugin("other", "wayang-spark-defaults.properties"));
// wayangContext.register(Spark.basicPlugin());
System.out.println("here");
@@ -45,6 +49,6 @@
// collector.sort((t1 , t2) -> Integer.compare(t2.field1, t1.field1));
System.out.printf("Found %d words:\n", collector.size());
- collector.forEach(wc -> System.out.printf("%s\n", wc));
+ // collector.forEach(wc -> System.out.printf("%s\n", wc));
}
}
diff --git a/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
new file mode 100644
index 0000000..cc70338
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/log4j2.properties
@@ -0,0 +1,36 @@
+status = warn
+
+appender.console.type = Console
+appender.console.name = LogToConsole
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+#appender.file.type = File
+#appender.file.name = LogToFile
+#appender.file.fileName=logs/app.log
+#appender.file.layout.type=PatternLayout
+#appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
+
+# Rotate log file
+appender.rolling.type = RollingFile
+appender.rolling.name = LogToRollingFile
+appender.rolling.fileName = logs/app.log
+appender.rolling.filePattern = logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = %d %p %C{1.} [%t] %m%n
+appender.rolling.policies.type = Policies
+appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=10MB
+appender.rolling.strategy.type = DefaultRolloverStrategy
+appender.rolling.strategy.max = 10
+
+# Log to console and rolling file
+logger.app.name = com.mkyong
+logger.app.level = debug
+logger.app.additivity = false
+logger.app.appenderRef.rolling.ref = LogToRollingFile
+logger.app.appenderRef.console.ref = LogToConsole
+
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = LogToConsole
\ No newline at end of file
diff --git a/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
new file mode 100644
index 0000000..922aae4
--- /dev/null
+++ b/wayang-platforms/wayang-spark/code/test/resources/simplelogger.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.slf4j.simpleLogger.logFile = System.out
+org.slf4j.simpleLogger.defaultLogLevel = info
+org.slf4j.simpleLogger.showThreadName = false
+org.slf4j.simpleLogger.showShortName = true
+org.slf4j.simpleLogger.levelInBrackets = true
+org.slf4j.simpleLogger.log.org.apache = warn
+org.slf4j.simpleLogger.log.org.spark-project = warn