/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.spark.function.reader;
import org.apache.metron.profiler.spark.reader.TelemetryReaders;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.Rule;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.util.Properties;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Tests the {@link org.apache.metron.profiler.spark.reader.TextEncodedTelemetryReader} class.
*/
public class TextEncodedTelemetryReaderTest {

  // JUnit 5 native temp-directory injection; replaces the JUnit 4
  // TemporaryFolder @Rule, which previously required the
  // @EnableRuleMigrationSupport shim. A fresh directory is injected
  // before each test and cleaned up afterwards, matching the old rule's
  // per-test lifecycle.
  @TempDir
  public File tempFolder;

  // Shared across all tests in this class; created once in setupSpark().
  private static SparkSession spark;

  // Reset before each test in setup().
  private Properties profilerProperties;
  private Properties readerProperties;

  /**
   * Starts a local, single-JVM Spark session shared by all tests in this class.
   */
  @BeforeAll
  public static void setupSpark() {
    SparkConf conf = new SparkConf()
            .setMaster("local")
            .setAppName("BatchProfilerIntegrationTest")
            // keep the shuffle small; this is a tiny, local test workload
            .set("spark.sql.shuffle.partitions", "8");
    spark = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();
  }

  /**
   * Stops the shared Spark session, if one was started.
   */
  @AfterAll
  public static void tearDownSpark() {
    if(spark != null) {
      spark.close();
    }
  }

  /**
   * Gives each test a clean set of profiler and reader properties.
   */
  @BeforeEach
  public void setup() {
    readerProperties = new Properties();
    profilerProperties = new Properties();
  }

  /**
   * The TEXT reader should load CSV-formatted telemetry when the input
   * format is "csv" and a header record is expected.
   */
  @Test
  public void testCSV() {
    // re-write the test data as a CSV with a header record
    String pathToCSV = tempFolder.getAbsolutePath();
    spark.read()
            .format("text")
            .load("src/test/resources/telemetry.json")
            .as(Encoders.STRING())
            .write()
            .mode("overwrite")
            .option("header", "true")
            .format("csv")
            .save(pathToCSV);

    // tell the profiler to use the CSV input data
    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), pathToCSV);
    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "csv");

    // set a reader property; tell the reader to expect a header
    readerProperties.put("header", "true");

    // there should be 100 valid JSON records
    Dataset<String> telemetry = TelemetryReaders.TEXT.read(spark, profilerProperties, readerProperties);
    assertEquals(100, telemetry.filter(new IsValidJSON()).count());
  }

  /**
   * The TEXT reader should load raw, line-oriented JSON when the input
   * format is "text".
   */
  @Test
  public void testJSON() {
    // use the test telemetry that is stored as raw json
    profilerProperties.put(TELEMETRY_INPUT_PATH.getKey(), "src/test/resources/telemetry.json");
    profilerProperties.put(TELEMETRY_INPUT_FORMAT.getKey(), "text");

    // set a reader property; tell the reader to expect a header
    readerProperties.put("header", "true");

    // there should be 100 valid JSON records
    Dataset<String> telemetry = TelemetryReaders.TEXT.read(spark, profilerProperties, readerProperties);
    assertEquals(100, telemetry.filter(new IsValidJSON()).count());
  }
}