blob: ce88814ce93782f91f923d060b0b25624803af40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;
import java.math.BigDecimal;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
import org.junit.After;
import org.junit.Test;
public class TestIcebergExpressions extends SparkExtensionsTestBase {
public TestIcebergExpressions(String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP VIEW IF EXISTS emp");
sql("DROP VIEW IF EXISTS v");
}
@Test
public void testTruncateExpressions() {
sql("CREATE TABLE %s ( " +
" int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING, binary_c BINARY " +
") USING iceberg", tableName);
sql("CREATE TEMPORARY VIEW emp " +
"AS SELECT * FROM VALUES (101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) " +
"AS EMP(int_c, long_c, dec_c, str_c, binary_c)");
sql("INSERT INTO %s SELECT * FROM emp", tableName);
Dataset<Row> df = spark.sql("SELECT * FROM " + tableName);
df.select(
new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c")
).createOrReplaceTempView("v");
assertEquals("Should have expected rows",
ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v"));
}
}