/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.spark.tsfile

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.iotdb.hadoop.fileSystem.HDFSInput
import org.apache.iotdb.spark.constant.TestConstant
import org.apache.iotdb.spark.tool.TsFileWriteTool
import org.apache.tsfile.common.constant.QueryConstant
import org.apache.tsfile.read.{TsFileSequenceReader, common}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.junit.Assert
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import java.io.{ByteArrayOutputStream, File}
import java.net.URI

class TSFileSuit extends FunSuite with BeforeAndAfterAll {
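// Test fixtures: sample TsFiles are generated under baseFolder in beforeAll and the
// whole directory tree is removed again in afterAll.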
private val baseFolder = TestConstant.BASE_OUTPUT_PATH + "suitTestBaseDir"
private val tsfileFolder1 = baseFolder + "/tsfileFolder1"
private val tsfile1 = tsfileFolder1 + "/test1.tsfile"
private val tsfile2 = tsfileFolder1 + "/test2.tsfile"
private val tsfileFolder2 = baseFolder + "/tsfileFolder2"
private val tsfile3 = tsfileFolder2 + "/test.tsfile"
private val tsfileFolder3 = baseFolder + "/tsfileFolder3"
private val tsfile4 = tsfileFolder3 + "/test.tsfile"
private val outputPath = baseFolder + "/output"
private val outputPathFile = outputPath + "/part-m-00000"
private val outputPath2 = baseFolder + "/output2"
private val outputPathFile2 = outputPath2 + "/part-m-00000"
private val outputPath3 = baseFolder + "/output3"
// the narrow-form output is read back as a whole directory rather than a single part file
private val outputPathFile3 = outputPath3
private val outputHDFSPath = "hdfs://localhost:9000/usr/hadoop/output"
private val outputHDFSPathFile = outputHDFSPath + "/part-m-00000"
private var spark: SparkSession = _
override protected def beforeAll(): Unit = {
super.beforeAll()
val base = new File(baseFolder)
if (!base.exists()) {
  base.mkdirs()
}
val tsfile_folder1 = new File(tsfileFolder1)
if (tsfile_folder1.exists()) {
deleteDir(tsfile_folder1)
}
tsfile_folder1.mkdirs()
val tsfile_folder2 = new File(tsfileFolder2)
if (tsfile_folder2.exists()) {
deleteDir(tsfile_folder2)
}
tsfile_folder2.mkdirs()
val tsfile_folder3 = new File(tsfileFolder3)
if (tsfile_folder3.exists()) {
deleteDir(tsfile_folder3)
}
tsfile_folder3.mkdirs()
new TsFileWriteTool().create1(tsfile1)
new TsFileWriteTool().create2(tsfile2)
new TsFileWriteTool().create3(tsfile3)
new TsFileWriteTool().create4(tsfile4)
val output = new File(outputPath)
if (output.exists()) {
  deleteDir(output)
}
val output2 = new File(outputPath2)
if (output2.exists()) {
  deleteDir(output2)
}
val output3 = new File(outputPath3)
if (output3.exists()) {
  deleteDir(output3)
}
spark = SparkSession
.builder()
.config("spark.master", "local")
.appName("TSFile test")
.getOrCreate()
}
override protected def afterAll(): Unit = {
val folder1 = new File(tsfileFolder1)
deleteDir(folder1)
val folder2 = new File(tsfileFolder2)
deleteDir(folder2)
val folder3 = new File(tsfileFolder3)
deleteDir(folder3)
val out = new File(outputPath)
deleteDir(out)
val out2 = new File(outputPath2)
deleteDir(out2)
val base = new File(baseFolder)
deleteDir(base)
try {
spark.stop()
} finally {
super.afterAll()
}
}
def deleteDir(dir: File): Unit = {
  if (dir.isDirectory) {
    // File.list() returns null on I/O errors, so guard before recursing
    Option(dir.list()).getOrElse(Array.empty[String]).foreach { f =>
      deleteDir(new File(dir, f))
    }
  }
  dir.delete()
}
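// Round-trip tests: read a TsFile into a DataFrame, write it back with df.write.tsfile,
// and check that re-reading the output yields the same rows.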
test("test write 1") {
val df = spark.read.tsfile(tsfile1)
df.write.tsfile(outputPath)
val newDf = spark.read.tsfile(outputPathFile)
Assert.assertEquals(newDf.collectAsList(), df.collectAsList())
}
test("test write 2") {
val df = spark.read.tsfile(tsfile2)
df.write.tsfile(outputPath2)
val newDf = spark.read.tsfile(outputPathFile2)
Assert.assertEquals(newDf.collectAsList(), df.collectAsList())
}
test("test write 3") {
val df = spark.read.tsfile(tsfile3, isNarrowForm = true)
df.write.tsfile(outputPath3, isNarrowForm = true)
val newDf = spark.read.tsfile(outputPathFile3, isNarrowForm = true)
Assert.assertEquals(newDf.collectAsList(), df.collectAsList())
}
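// SQL tests: register the DataFrame as a temp view and query it through Spark SQL.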
test("testSelect * from tsfile1") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
val count = newDf.count()
Assert.assertEquals(7, count)
}
test("testSelect * from tsfile2") {
val df = spark.read.tsfile(tsfile2)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
val count = newDf.count()
Assert.assertEquals(TsFileWriteTool.largeNum, count)
}
test("testSelect * from tsfile2 in part") {
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 256)
val df = spark.read.tsfile(tsfile2)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
val count = newDf.count()
Assert.assertEquals(TsFileWriteTool.largeNum, count)
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 128)
}
test("testCount") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
Assert.assertEquals(7, newDf.head().apply(0).asInstanceOf[Long])
}
test("testSelect time") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time from tsfile_table")
Assert.assertEquals(7, newDf.count())
}
test("testSelectString") {
val df = spark.read.tsfile(tsfile3)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"`device_1.sensor_2` = \"Monday\"")
Assert.assertEquals(1, newDf.count())
}
test("testSelectBoolean") {
val df = spark.read.tsfile(tsfile3)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"`device_1.sensor_1` = true")
Assert.assertEquals(2, newDf.count())
}
test("testSelectWithFilterAnd") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"`device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
Assert.assertEquals(5, newDf.count())
}
test("testSelectWithFilterOr") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"`device_1.sensor_1`>0 or `device_1.sensor_2` < 22")
Assert.assertEquals(7, newDf.count())
}
test("testSelectComplex") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"(`device_1.sensor_1`>0 or `device_1.sensor_2` < 22) and time < 4")
Assert.assertEquals(3, newDf.count())
}
test("testMultiFiles") {
val df = spark.read.tsfile(tsfileFolder1)
df.createOrReplaceTempView("tsfile_table")
Assert.assertEquals(TsFileWriteTool.largeNum + 7, df.count())
}
test("testMultiFilesWithFilter1") {
val df = spark.read.tsfile(tsfileFolder1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1` >0 " +
"and `device_1.sensor_1` <10 or `device_1.sensor_2` >0")
Assert.assertEquals(16, newDf.count())
}
test("testQuerySchema") {
val df = spark.read.format("org.apache.iotdb.spark.tsfile").load(tsfile1)
val expected = StructType(Seq(
StructField(QueryConstant.RESERVED_TIME, LongType, nullable = true),
StructField("device_1.sensor_3", IntegerType, nullable = true),
StructField("device_1.sensor_1", FloatType, nullable = true),
StructField("device_1.sensor_2", IntegerType, nullable = true),
StructField("device_2.sensor_3", IntegerType, nullable = true),
StructField("device_2.sensor_1", FloatType, nullable = true),
StructField("device_2.sensor_2", IntegerType, nullable = true)
))
Assert.assertEquals(expected, df.schema)
}
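// In the tests below, the second argument of tsfile(path, true) requests the narrow
// form, where device_name becomes a column and sensor columns are shared across devices.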
test("testTransform1") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table " +
"where (`device_1.sensor_1`>0 or `device_1.sensor_2` < 22) and time < 4")
Assert.assertEquals(3, newDf.count())
}
test("testTransform2") {
val df = spark.read.tsfile(tsfile1, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table " +
"where `device_name` = 'device_1' and (`sensor_1`>0 or `sensor_2` < 22) and time < 4")
Assert.assertEquals(3, newDf.count())
}
test("testTransform3") {
val df = spark.read.tsfile(tsfile1, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table " +
"where (`sensor_1`>0 or `sensor_2` < 22) and time < 4")
Assert.assertEquals(5, newDf.count())
}
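/*
 * Every partition test below saves spark.sql.files.maxPartitionBytes, overrides it,
 * and restores it afterwards. A minimal sketch of that pattern factored into a helper
 * (the name withMaxPartitionBytes is ours; the existing tests keep the explicit
 * save/restore and do not call it):
 */
private def withMaxPartitionBytes[T](bytes: Long)(body: => T): T = {
  val key = "spark.sql.files.maxPartitionBytes"
  val previous = spark.conf.get(key) // remember the current setting
  spark.conf.set(key, bytes)
  try body
  finally spark.conf.set(key, previous) // restore even if the body throws
}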
/*
device_1: 400000 rows, time range [0,399999], interval 1
device_2: 400000 rows, time range [0,799998], interval 2
narrow table: 800000 rows in total
*/
test("partition test: narrow table, no filter") {
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 128)
val df = spark.read.tsfile(tsfile4, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
Assert.assertEquals(800000, newDf.count())
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
/*
device_1 & device_2: 400000 rows, time range [0,399999], interval 1
device_2 only: 200000 additional rows, time range [400000,799998], interval 2
wide table: 600000 rows in total
*/
test("partition test: wide table, no filter") {
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 128)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
Assert.assertEquals(600000, newDf.count())
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
/*
sql: select * from tsfile_table where time > 131040 and time < 131050:
involves device_1's chunkGroup1&2 and device_2's chunkGroup1.
maxPartitionBytes makes device_1's chunkGroup1 fall into the first partition, while
device_2's chunkGroup1 and device_1's chunkGroup2 fall into the following partitions.
Note that in this test the results from device_1's two chunk groups are NOT presented
together, because they are retrieved by different tasks.
*/
test("partition test: narrow table, global time filter, small partition size") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(1).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup - 100)
val df = spark.read.tsfile(tsfile4, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"time > 131040 and time < 131050")
val outCapture = new ByteArrayOutputStream
Console.withOut(outCapture) {
newDf.show(newDf.count.toInt, false)
}
val actual = outCapture.toByteArray.map(_.toChar)
val expect =
"+------+-----------+--------+--------+--------+\n" +
"|time |device_name|sensor_3|sensor_1|sensor_2|\n" +
"+------+-----------+--------+--------+--------+\n" +
"|131042|device_2 |true |131042 |131042.0|\n" +
"|131044|device_2 |true |131044 |131044.0|\n" +
"|131046|device_2 |true |131046 |131046.0|\n" +
"|131048|device_2 |true |131048 |131048.0|\n" +
"|131041|device_1 |null |131041 |null |\n" +
"|131042|device_1 |null |131042 |null |\n" +
"|131043|device_1 |null |131043 |null |\n" +
"|131044|device_1 |null |131044 |null |\n" +
"|131045|device_1 |null |131045 |null |\n" +
"|131046|device_1 |null |131046 |null |\n" +
"|131047|device_1 |null |131047 |null |\n" +
"|131048|device_1 |null |131048 |null |\n" +
"|131049|device_1 |null |131049 |null |\n" +
"+------+-----------+--------+--------+--------+"
println("???" + util.Arrays.toString(actual))
Assert.assertArrayEquals(expect.toCharArray, actual.dropRight(2))
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
/*
sql: select * from tsfile_table where time > 131040 and time < 131050:
involves device_1's chunkGroup1&2 and device_2's chunkGroup1.
maxPartitionBytes makes device_1's chunkGroup1, device_2's chunkGroup1 and
device_1's chunkGroup2 all fall into the first partition.
Thus, compared to the previous test ("partition test: narrow table, global time
filter, small partition size"), this test differs in two ways:
1) the results from device_1's two chunk groups are presented together, as they are
retrieved in the same task;
2) device_2's data is presented first, because the dataset at the end of the list is
retrieved first by the code.
*/
test("partition test: narrow table, global time filter, larger partition size") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"time > 131040 and time < 131050")
val outCapture = new ByteArrayOutputStream
Console.withOut(outCapture) {
newDf.show(newDf.count.toInt, false)
}
val actual = outCapture.toByteArray.map(_.toChar)
val expect =
"+------+-----------+--------+--------+--------+\n" +
"|time |device_name|sensor_3|sensor_1|sensor_2|\n" +
"+------+-----------+--------+--------+--------+\n" +
"|131042|device_2 |true |131042 |131042.0|\n" +
"|131044|device_2 |true |131044 |131044.0|\n" +
"|131046|device_2 |true |131046 |131046.0|\n" +
"|131048|device_2 |true |131048 |131048.0|\n" +
"|131041|device_1 |null |131041 |null |\n" +
"|131042|device_1 |null |131042 |null |\n" +
"|131043|device_1 |null |131043 |null |\n" +
"|131044|device_1 |null |131044 |null |\n" +
"|131045|device_1 |null |131045 |null |\n" +
"|131046|device_1 |null |131046 |null |\n" +
"|131047|device_1 |null |131047 |null |\n" +
"|131048|device_1 |null |131048 |null |\n" +
"|131049|device_1 |null |131049 |null |\n" +
"+------+-----------+--------+--------+--------+"
Assert.assertArrayEquals(expect.toCharArray, actual.dropRight(2))
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
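// The wide-table variants below reuse the same chunk-group-based partition size but
// only assert row counts, since the wide form has one column per device/sensor pair.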
test("partition test: wide table, global time filter 1") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_1.sensor_1` from tsfile_table where " +
"time > 131040 and time < 131050")
Assert.assertEquals(9, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
test("partition test: wide table, global time filter 2") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_2.sensor_1` from tsfile_table where " +
"time > 131040 and time < 131050")
Assert.assertEquals(4, newDf.rdd.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
test("partition test: wide table, global time filter 3") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table where " +
"time > 131040 and time < 131050")
Assert.assertEquals(9, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
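// Value filters: as the expected output above shows, device_2's sensor_1 and sensor_2
// track the timestamp, so sensor_1 > 262080 and sensor_2 <= 600000 select the even
// timestamps in (262080, 600000], i.e. 168960 rows.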
test("partition test: narrow table, value filter") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time from tsfile_table where " +
"`device_name` = 'device_2' and `sensor_1` > 262080 and `sensor_2` <= 600000")
Assert.assertEquals(168960, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
test("partition test: wide table, value filter 1") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_2.sensor_3` from tsfile_table where " +
"`device_2.sensor_1` > 262080 and `device_2.sensor_2` <= 600000")
Assert.assertEquals(168960, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
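// Adding `device_1.sensor_1` < 400000 additionally requires device_1 data at the same
// timestamp; since device_1 only covers [0, 399999], this restricts the result to the
// even timestamps in (262080, 400000), i.e. 68959 rows.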
test("partition test: wide table, value filter 2") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_2.sensor_3` from tsfile_table where " +
"`device_2.sensor_1` > 262080 and `device_2.sensor_2` <= 600000 and " +
"`device_1.sensor_1` < 400000")
Assert.assertEquals(68959, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
test("partition test: wide table, value filter 3") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_1.sensor_1` from tsfile_table where " +
"`device_2.sensor_1` > 262080 and `device_2.sensor_2` <= 600000 and " +
"`device_1.sensor_1` < 400000")
Assert.assertEquals(68959, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
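// With only the device_1 predicate, every device_1 timestamp in [0, 399999] matches,
// giving 400000 rows; device_2's sensor_1 is null at timestamps where it has no data.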
test("partition test: wide table, value filter 4") {
val conf: Configuration = spark.sparkContext.hadoopConfiguration
val in = new HDFSInput(new Path(new URI(tsfile4)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val chunkMetadataList = reader.getChunkMetadataList(new common.Path("device_1", "sensor_1", true))
val endOffsetOfChunkGroup = chunkMetadataList.get(2).getOffsetOfChunkHeader
val tmp = spark.conf.get("spark.sql.files.maxPartitionBytes")
spark.conf.set("spark.sql.files.maxPartitionBytes", endOffsetOfChunkGroup + 100)
val df = spark.read.tsfile(tsfile4, false)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select time,`device_2.sensor_1` from tsfile_table where " +
"`device_1.sensor_1` < 400000")
Assert.assertEquals(400000, newDf.count())
reader.close() // DO NOT FORGET THIS
spark.conf.set("spark.sql.files.maxPartitionBytes", tmp)
}
}