blob: 75c50190477001a785d289e7636e4793db245d2f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.batch;
//import org.apache.flink.api.common.typeinfo.Types;
//import org.apache.flink.api.java.DataSet;
//import org.apache.flink.api.java.ExecutionEnvironment;
//import org.apache.flink.formats.avro.generated.Address;
//import org.apache.flink.formats.avro.generated.Colors;
//import org.apache.flink.formats.avro.generated.Fixed16;
//import org.apache.flink.formats.avro.generated.Fixed2;
//import org.apache.flink.formats.avro.generated.User;
//import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
//import org.apache.flink.table.api.Table;
//import org.apache.flink.table.api.TableEnvironment;
//import org.apache.flink.table.api.java.BatchTableEnvironment;
//import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
//import org.apache.flink.test.util.TestBaseUtils;
//import org.apache.flink.types.Row;
//import org.apache.avro.util.Utf8;
//import org.joda.time.DateTime;
//import org.joda.time.LocalDate;
//import org.joda.time.LocalTime;
//import org.junit.Ignore;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.junit.runners.Parameterized;
//import java.math.BigDecimal;
//import java.nio.ByteBuffer;
//import java.util.ArrayList;
//import java.util.Arrays;
//import java.util.Collections;
//import java.util.HashMap;
//import java.util.List;
//import static org.junit.Assert.assertEquals;
/**
* Tests for interoperability with Avro types.
*/
//@Ignore // TODO: should support this if we support transition from table to DataSet.
//@RunWith(Parameterized.class)
//public class AvroTypesITCase extends TableProgramsClusterTestBase {
//
// private static final User USER_1 = User.newBuilder()
// .setName("Charlie")
// .setFavoriteColor("blue")
// .setFavoriteNumber(null)
// .setTypeBoolTest(false)
// .setTypeDoubleTest(1.337d)
// .setTypeNullTest(null)
// .setTypeLongTest(1337L)
// .setTypeArrayString(new ArrayList<>())
// .setTypeArrayBoolean(new ArrayList<>())
// .setTypeNullableArray(null)
// .setTypeEnum(Colors.RED)
// .setTypeMap(new HashMap<>())
// .setTypeFixed(null)
// .setTypeUnion(null)
// .setTypeNested(
// Address.newBuilder()
// .setNum(42)
// .setStreet("Bakerstreet")
// .setCity("Berlin")
// .setState("Berlin")
// .setZip("12049").build())
// .setTypeBytes(ByteBuffer.allocate(10))
// .setTypeDate(LocalDate.parse("2014-03-01"))
// .setTypeTimeMillis(LocalTime.parse("12:12:12"))
// .setTypeTimeMicros(123456)
// .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
// .setTypeTimestampMicros(123456L)
// .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .build();
//
// private static final User USER_2 = User.newBuilder()
// .setName("Whatever")
// .setFavoriteNumber(null)
// .setFavoriteColor("black")
// .setTypeLongTest(42L)
// .setTypeDoubleTest(0.0)
// .setTypeNullTest(null)
// .setTypeBoolTest(true)
// .setTypeArrayString(Collections.singletonList("hello"))
// .setTypeArrayBoolean(Collections.singletonList(true))
// .setTypeEnum(Colors.GREEN)
// .setTypeMap(new HashMap<>())
// .setTypeFixed(new Fixed16())
// .setTypeUnion(null)
// .setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
// .setTypeBytes(ByteBuffer.allocate(10))
// .setTypeTimeMillis(LocalTime.parse("12:12:12"))
// .setTypeTimeMicros(123456)
// .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
// .setTypeTimestampMicros(123456L)
// .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .build();
//
// private static final User USER_3 = User.newBuilder()
// .setName("Terminator")
// .setFavoriteNumber(null)
// .setFavoriteColor("yellow")
// .setTypeLongTest(1L)
// .setTypeDoubleTest(0.0)
// .setTypeNullTest(null)
// .setTypeBoolTest(false)
// .setTypeArrayString(Collections.singletonList("world"))
// .setTypeArrayBoolean(Collections.singletonList(false))
// .setTypeEnum(Colors.GREEN)
// .setTypeMap(new HashMap<>())
// .setTypeFixed(new Fixed16())
// .setTypeUnion(null)
// .setTypeNested(null)
// .setTypeBytes(ByteBuffer.allocate(10))
// .setTypeDate(LocalDate.parse("2014-03-01"))
// .setTypeTimeMillis(LocalTime.parse("12:12:12"))
// .setTypeTimeMicros(123456)
// .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
// .setTypeTimestampMicros(123456L)
// .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// .build();
//
// public AvroTypesITCase(
// TestExecutionMode executionMode,
// TableConfigMode tableConfigMode) {
// super(executionMode, tableConfigMode);
// }
//
// @Test
// public void testAvroToRow() throws Exception {
// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
// env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
// BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
//
// Table t = tEnv.fromDataSet(testData(env));
// Table result = t.select("*");
//
// List<Row> results = tEnv.toDataSet(result, Row.class).collect();
// String expected =
// "black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
// "2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
// "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
// "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
// "blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
// "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
// "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
// "\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
// "2014-03-01T12:12:12.321Z,null\n" +
// "yellow,null,Terminator,[false],[world],false," +
// "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
// "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
// "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
// "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
// TestBaseUtils.compareResultAsText(results, expected);
// }
//
// @Test
// public void testAvroStringAccess() throws Exception {
// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
//
// Table t = tEnv.fromDataSet(testData(env));
// Table result = t.select("name");
// List<Utf8> results = tEnv.toDataSet(result, Types.GENERIC(Utf8.class)).collect();
// String expected = "Charlie\n" +
// "Terminator\n" +
// "Whatever";
// TestBaseUtils.compareResultAsText(results, expected);
// }
//
// @Test
// public void testAvroObjectAccess() throws Exception {
// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
//
// Table t = tEnv.fromDataSet(testData(env));
// Table result = t
// .filter("type_nested.isNotNull")
// .select("type_nested.flatten()").as("city, num, state, street, zip");
//
// List<Address> results = tEnv.toDataSet(result, Types.POJO(Address.class)).collect();
// String expected = USER_1.getTypeNested().toString();
// TestBaseUtils.compareResultAsText(results, expected);
// }
//
// @Test
// public void testAvroToAvro() throws Exception {
// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
//
// Table t = tEnv.fromDataSet(testData(env));
// Table result = t.select("*");
//
// List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
// List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
// assertEquals(expected, results);
// }
//
// private DataSet<User> testData(ExecutionEnvironment env) {
// List<User> data = new ArrayList<>(3);
// data.add(USER_1);
// data.add(USER_2);
// data.add(USER_3);
// return env.fromCollection(data);
// }
//}