blob: b56df4d0415c3f6dc6d3fa942c85e04a9da24ae0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.descriptors
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.RowtimeValidator.{normalizeTimestampExtractor, normalizeWatermarkStrategy}
import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp, TimestampExtractor}
import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
/**
* Rowtime descriptor for describing an event time attribute in the schema.
*/
class Rowtime extends Descriptor {
private var timestampExtractor: Option[TimestampExtractor] = None
private var watermarkStrategy: Option[WatermarkStrategy] = None
/**
* Sets a built-in timestamp extractor that converts an existing [[Long]] or
* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
*
* @param fieldName The field to convert into a rowtime attribute.
*/
def timestampsFromField(fieldName: String): Rowtime = {
timestampExtractor = Some(new ExistingField(fieldName))
this
}
/**
* Sets a built-in timestamp extractor that converts the assigned timestamps from
* a DataStream API record into the rowtime attribute and thus preserves the assigned
* timestamps from the source.
*
* Note: This extractor only works in streaming environments.
*/
def timestampsFromSource(): Rowtime = {
timestampExtractor = Some(new StreamRecordTimestamp)
this
}
/**
* Sets a custom timestamp extractor to be used for the rowtime attribute.
*
* @param extractor The [[TimestampExtractor]] to extract the rowtime attribute
* from the physical type.
*/
def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
timestampExtractor = Some(extractor)
this
}
/**
* Sets a built-in watermark strategy for ascending rowtime attributes.
*
* Emits a watermark of the maximum observed timestamp so far minus 1.
* Rows that have a timestamp equal to the max timestamp are not late.
*/
def watermarksPeriodicAscending(): Rowtime = {
watermarkStrategy = Some(new AscendingTimestamps)
this
}
/**
* Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded
* time interval.
*
* Emits watermarks which are the maximum observed timestamp minus the specified delay.
*
* @param delay delay in milliseconds
*/
def watermarksPeriodicBounded(delay: Long): Rowtime = {
watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
this
}
/**
* Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
* underlying DataStream API and thus preserves the assigned watermarks from the source.
*/
def watermarksFromSource(): Rowtime = {
watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
this
}
/**
* Sets a custom watermark strategy to be used for the rowtime attribute.
*/
def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
watermarkStrategy = Some(strategy)
this
}
/**
* Converts this descriptor into a set of properties.
*/
final override def toProperties: java.util.Map[String, String] = {
val properties = new DescriptorProperties()
timestampExtractor.foreach(normalizeTimestampExtractor(_)
.foreach(e => properties.putString(e._1, e._2)))
watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
.foreach(e => properties.putString(e._1, e._2)))
properties.asMap()
}
}
/**
* Rowtime descriptor for describing an event time attribute in the schema.
*/
object Rowtime {
/**
* Rowtime descriptor for describing an event time attribute in the schema.
*/
def apply(): Rowtime = new Rowtime()
}