blob: c973d9cf56b8c620b33f2eea1bcb378d8c27b1e4 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import collections
import sys
from pyflink.table.descriptors import (Rowtime, Schema)
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import PyFlinkTestCase
class RowTimeDescriptorTests(PyFlinkTestCase):
def test_timestamps_from_field(self):
rowtime = Rowtime().timestamps_from_field("rtime")
properties = rowtime.to_properties()
expected = {'rowtime.timestamps.type': 'from-field', 'rowtime.timestamps.from': 'rtime'}
self.assertEqual(expected, properties)
def test_timestamps_from_source(self):
rowtime = Rowtime().timestamps_from_source()
properties = rowtime.to_properties()
expected = {'rowtime.timestamps.type': 'from-source'}
self.assertEqual(expected, properties)
def test_timestamps_from_extractor(self):
rowtime = Rowtime().timestamps_from_extractor(
"org.apache.flink.table.legacyutils.CustomExtractor")
properties = rowtime.to_properties()
expected = {
'rowtime.timestamps.type': 'custom',
'rowtime.timestamps.class':
'org.apache.flink.table.legacyutils.CustomExtractor',
'rowtime.timestamps.serialized':
'rO0ABXNyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUV4dHJhY3Rvctj'
'ZLTGK9XvxAgABTAAFZmllbGR0ABJMamF2YS9sYW5nL1N0cmluZzt4cgA-b3JnLmFwYWNoZS5mbGluay'
'50YWJsZS5zb3VyY2VzLnRzZXh0cmFjdG9ycy5UaW1lc3RhbXBFeHRyYWN0b3Jf1Y6piFNsGAIAAHhwd'
'AACdHM'}
self.assertEqual(expected, properties)
def test_watermarks_periodic_ascending(self):
rowtime = Rowtime().watermarks_periodic_ascending()
properties = rowtime.to_properties()
expected = {'rowtime.watermarks.type': 'periodic-ascending'}
self.assertEqual(expected, properties)
def test_watermarks_periodic_bounded(self):
rowtime = Rowtime().watermarks_periodic_bounded(1000)
properties = rowtime.to_properties()
expected = {'rowtime.watermarks.type': 'periodic-bounded',
'rowtime.watermarks.delay': '1000'}
self.assertEqual(expected, properties)
def test_watermarks_from_source(self):
rowtime = Rowtime().watermarks_from_source()
properties = rowtime.to_properties()
expected = {'rowtime.watermarks.type': 'from-source'}
self.assertEqual(expected, properties)
def test_watermarks_from_strategy(self):
rowtime = Rowtime().watermarks_from_strategy(
"org.apache.flink.table.legacyutils.CustomAssigner")
properties = rowtime.to_properties()
expected = {
'rowtime.watermarks.type': 'custom',
'rowtime.watermarks.class':
'org.apache.flink.table.legacyutils.CustomAssigner',
'rowtime.watermarks.serialized':
'rO0ABXNyADFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmxlZ2FjeXV0aWxzLkN1c3RvbUFzc2lnbmVyu_8'
'TLNBQBsACAAB4cgBHb3JnLmFwYWNoZS5mbGluay50YWJsZS5zb3VyY2VzLndtc3RyYXRlZ2llcy5QdW'
'5jdHVhdGVkV2F0ZXJtYXJrQXNzaWduZXKBUc57oaWu9AIAAHhyAD1vcmcuYXBhY2hlLmZsaW5rLnRhY'
'mxlLnNvdXJjZXMud21zdHJhdGVnaWVzLldhdGVybWFya1N0cmF0ZWd53nt-g2OWaT4CAAB4cA'}
self.assertEqual(expected, properties)
class SchemaDescriptorTests(PyFlinkTestCase):
def test_field(self):
schema = Schema()\
.field("int_field", DataTypes.INT())\
.field("long_field", DataTypes.BIGINT())\
.field("string_field", DataTypes.STRING())\
.field("timestamp_field", DataTypes.TIMESTAMP(3))\
.field("time_field", DataTypes.TIME())\
.field("date_field", DataTypes.DATE())\
.field("double_field", DataTypes.DOUBLE())\
.field("float_field", DataTypes.FLOAT())\
.field("byte_field", DataTypes.TINYINT())\
.field("short_field", DataTypes.SMALLINT())\
.field("boolean_field", DataTypes.BOOLEAN())
properties = schema.to_properties()
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'long_field',
'schema.1.data-type': 'BIGINT',
'schema.2.name': 'string_field',
'schema.2.data-type': 'VARCHAR(2147483647)',
'schema.3.name': 'timestamp_field',
'schema.3.data-type': 'TIMESTAMP(3)',
'schema.4.name': 'time_field',
'schema.4.data-type': 'TIME(0)',
'schema.5.name': 'date_field',
'schema.5.data-type': 'DATE',
'schema.6.name': 'double_field',
'schema.6.data-type': 'DOUBLE',
'schema.7.name': 'float_field',
'schema.7.data-type': 'FLOAT',
'schema.8.name': 'byte_field',
'schema.8.data-type': 'TINYINT',
'schema.9.name': 'short_field',
'schema.9.data-type': 'SMALLINT',
'schema.10.name': 'boolean_field',
'schema.10.data-type': 'BOOLEAN'}
self.assertEqual(expected, properties)
def test_fields(self):
fields = collections.OrderedDict([
("int_field", DataTypes.INT()),
("long_field", DataTypes.BIGINT()),
("string_field", DataTypes.STRING()),
("timestamp_field", DataTypes.TIMESTAMP(3)),
("time_field", DataTypes.TIME()),
("date_field", DataTypes.DATE()),
("double_field", DataTypes.DOUBLE()),
("float_field", DataTypes.FLOAT()),
("byte_field", DataTypes.TINYINT()),
("short_field", DataTypes.SMALLINT()),
("boolean_field", DataTypes.BOOLEAN())
])
schema = Schema().fields(fields)
properties = schema.to_properties()
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'long_field',
'schema.1.data-type': 'BIGINT',
'schema.2.name': 'string_field',
'schema.2.data-type': 'VARCHAR(2147483647)',
'schema.3.name': 'timestamp_field',
'schema.3.data-type': 'TIMESTAMP(3)',
'schema.4.name': 'time_field',
'schema.4.data-type': 'TIME(0)',
'schema.5.name': 'date_field',
'schema.5.data-type': 'DATE',
'schema.6.name': 'double_field',
'schema.6.data-type': 'DOUBLE',
'schema.7.name': 'float_field',
'schema.7.data-type': 'FLOAT',
'schema.8.name': 'byte_field',
'schema.8.data-type': 'TINYINT',
'schema.9.name': 'short_field',
'schema.9.data-type': 'SMALLINT',
'schema.10.name': 'boolean_field',
'schema.10.data-type': 'BOOLEAN'}
self.assertEqual(expected, properties)
if sys.version_info[:2] <= (3, 5):
fields = {
"int_field": DataTypes.INT(),
"long_field": DataTypes.BIGINT(),
"string_field": DataTypes.STRING(),
"timestamp_field": DataTypes.TIMESTAMP(3),
"time_field": DataTypes.TIME(),
"date_field": DataTypes.DATE(),
"double_field": DataTypes.DOUBLE(),
"float_field": DataTypes.FLOAT(),
"byte_field": DataTypes.TINYINT(),
"short_field": DataTypes.SMALLINT(),
"boolean_field": DataTypes.BOOLEAN()
}
self.assertRaises(TypeError, Schema().fields, fields)
def test_field_in_string(self):
schema = Schema()\
.field("int_field", 'INT')\
.field("long_field", 'BIGINT')\
.field("string_field", 'VARCHAR')\
.field("timestamp_field", 'SQL_TIMESTAMP')\
.field("time_field", 'SQL_TIME')\
.field("date_field", 'SQL_DATE')\
.field("double_field", 'DOUBLE')\
.field("float_field", 'FLOAT')\
.field("byte_field", 'TINYINT')\
.field("short_field", 'SMALLINT')\
.field("boolean_field", 'BOOLEAN')
properties = schema.to_properties()
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'long_field',
'schema.1.data-type': 'BIGINT',
'schema.2.name': 'string_field',
'schema.2.data-type': 'VARCHAR',
'schema.3.name': 'timestamp_field',
'schema.3.data-type': 'TIMESTAMP(3)',
'schema.4.name': 'time_field',
'schema.4.data-type': 'TIME(0)',
'schema.5.name': 'date_field',
'schema.5.data-type': 'DATE',
'schema.6.name': 'double_field',
'schema.6.data-type': 'DOUBLE',
'schema.7.name': 'float_field',
'schema.7.data-type': 'FLOAT',
'schema.8.name': 'byte_field',
'schema.8.data-type': 'TINYINT',
'schema.9.name': 'short_field',
'schema.9.data-type': 'SMALLINT',
'schema.10.name': 'boolean_field',
'schema.10.data-type': 'BOOLEAN'}
self.assertEqual(expected, properties)
def test_from_origin_field(self):
schema = Schema()\
.field("int_field", DataTypes.INT())\
.field("long_field", DataTypes.BIGINT()).from_origin_field("origin_field_a")\
.field("string_field", DataTypes.STRING())
properties = schema.to_properties()
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'long_field',
'schema.1.data-type': 'BIGINT',
'schema.1.from': 'origin_field_a',
'schema.2.name': 'string_field',
'schema.2.data-type': 'VARCHAR(2147483647)'}
self.assertEqual(expected, properties)
def test_proctime(self):
schema = Schema()\
.field("int_field", DataTypes.INT())\
.field("ptime", DataTypes.BIGINT()).proctime()\
.field("string_field", DataTypes.STRING())
properties = schema.to_properties()
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'ptime',
'schema.1.data-type': 'BIGINT',
'schema.1.proctime': 'true',
'schema.2.name': 'string_field',
'schema.2.data-type': 'VARCHAR(2147483647)'}
self.assertEqual(expected, properties)
def test_rowtime(self):
schema = Schema()\
.field("int_field", DataTypes.INT())\
.field("long_field", DataTypes.BIGINT())\
.field("rtime", DataTypes.BIGINT())\
.rowtime(
Rowtime().timestamps_from_field("long_field").watermarks_periodic_bounded(5000))\
.field("string_field", DataTypes.STRING())
properties = schema.to_properties()
print(properties)
expected = {'schema.0.name': 'int_field',
'schema.0.data-type': 'INT',
'schema.1.name': 'long_field',
'schema.1.data-type': 'BIGINT',
'schema.2.name': 'rtime',
'schema.2.data-type': 'BIGINT',
'schema.2.rowtime.timestamps.type': 'from-field',
'schema.2.rowtime.timestamps.from': 'long_field',
'schema.2.rowtime.watermarks.type': 'periodic-bounded',
'schema.2.rowtime.watermarks.delay': '5000',
'schema.3.name': 'string_field',
'schema.3.data-type': 'VARCHAR(2147483647)'}
self.assertEqual(expected, properties)
def test_schema(self):
table_schema = TableSchema(["a", "b"], [DataTypes.INT(), DataTypes.STRING()])
schema = Schema().schema(table_schema)
properties = schema.to_properties()
expected = {'schema.0.name': 'a',
'schema.0.data-type': 'INT',
'schema.1.name': 'b',
'schema.1.data-type': 'VARCHAR(2147483647)'}
self.assertEqual(expected, properties)
if __name__ == '__main__':
import unittest
try:
import xmlrunner
testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)