| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.flink.table.api.scala |
| |
| import java.math.{BigDecimal => JBigDecimal} |
| import java.sql.{Date, Time, Timestamp} |
| |
| import org.apache.calcite.avatica.util.DateTimeUtils._ |
| import org.apache.flink.api.common.typeinfo.TypeInformation |
| import org.apache.flink.table.api.{CurrentRange, CurrentRow, TableException, UnboundedRange, UnboundedRow} |
| import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, toMilliInterval, toMonthInterval, toRangeInterval, toRowInterval} |
| import org.apache.flink.table.api.Table |
| import org.apache.flink.table.api.functions.AggregateFunction |
| import org.apache.flink.table.api.types.{DataTypes, InternalType} |
| import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit |
| import org.apache.flink.table.expressions.{Atan2, Cosh, DistinctAgg, Hex, Literal, RegexpExtract, RegexpReplace, Repeat, Replace, TimestampDiff, _} |
| import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit |
| import org.apache.flink.table.codegen.expr.DistinctAggregateFunction |
| |
| import scala.language.implicitConversions |
| |
| /** |
| * These are all the operations that can be used to construct an [[Expression]] AST for expression |
| * operations. |
| * |
| * These operations must be kept in sync with the parser in |
| * [[org.apache.flink.table.expressions.ExpressionParser]]. |
| */ |
| trait ImplicitExpressionOperations { |
| private[flink] def expr: Expression |
| |
| /** |
| * Enables literals on left side of binary expressions. |
| * |
| * e.g. 12.toExpr % 'a |
| * |
| * @return expression |
| */ |
| def toExpr: Expression = expr |
| |
| /** |
| * Boolean AND in three-valued logic. |
| */ |
| def && (other: Expression) = And(expr, other) |
| |
| /** |
| * Boolean OR in three-valued logic. |
| */ |
| def || (other: Expression) = Or(expr, other) |
| |
| /** |
| * Greater than. |
| */ |
| def > (other: Expression) = GreaterThan(expr, other) |
| |
| /** |
| * Greater than or equal. |
| */ |
| def >= (other: Expression) = GreaterThanOrEqual(expr, other) |
| |
| /** |
| * Less than. |
| */ |
| def < (other: Expression) = LessThan(expr, other) |
| |
| /** |
| * Less than or equal. |
| */ |
| def <= (other: Expression) = LessThanOrEqual(expr, other) |
| |
| /** |
| * Equals. |
| */ |
| def === (other: Expression) = EqualTo(expr, other) |
| |
| /** |
| * Not equal. |
| */ |
| def !== (other: Expression) = NotEqualTo(expr, other) |
| |
| /** |
| * Whether boolean expression is not true; returns null if boolean is null. |
| */ |
| def unary_! = Not(expr) |
| |
| /** |
| * Returns negative numeric. |
| */ |
| def unary_- = UnaryMinus(expr) |
| |
| /** |
| * Returns numeric. |
| */ |
| def unary_+ = expr |
| |
| /** |
| * Returns true if the given expression is null. |
| */ |
| def isNull = IsNull(expr) |
| |
| /** |
| * Returns true if the given expression is not null. |
| */ |
| def isNotNull = IsNotNull(expr) |
| |
| /** |
| * Returns true if given boolean expression is true. False otherwise (for null and false). |
| */ |
| def isTrue = IsTrue(expr) |
| |
| /** |
| * Returns true if given boolean expression is false. False otherwise (for null and true). |
| */ |
| def isFalse = IsFalse(expr) |
| |
| /** |
| * Returns true if given boolean expression is not true (for null and false). False otherwise. |
| */ |
| def isNotTrue = IsNotTrue(expr) |
| |
| /** |
| * Returns true if given boolean expression is not false (for null and true). False otherwise. |
| */ |
| def isNotFalse = IsNotFalse(expr) |
| |
| /** |
| * Returns left plus right. |
| */ |
| def + (other: Expression) = Plus(expr, other) |
| |
| /** |
| * Returns left minus right. |
| */ |
| def - (other: Expression) = Minus(expr, other) |
| |
| /** |
| * Returns left divided by right. |
| */ |
| def / (other: Expression) = Div(expr, other) |
| |
| /** |
| * Returns left multiplied by right. |
| */ |
| def * (other: Expression) = Mul(expr, other) |
| |
| /** |
| * Returns the remainder (modulus) of left divided by right. |
| * The result is negative only if left is negative. |
| */ |
| def % (other: Expression) = mod(other) |
| |
| /** |
| * Returns the sum of the numeric field across all input values. |
| * If all values are null, null is returned. |
| */ |
| def sum = Sum(expr) |
| |
| /** |
| * Returns the sum of the numeric field across all input values. |
| * If all values are null, 0 is returned. |
| */ |
| def sum0 = Sum0(expr) |
| |
| /** |
| * Returns the sum of the numeric field across all input values. |
| * If all values are null, null is returned. |
| * And its modified monotonicity is increasing. |
| */ |
| def incr_sum = IncrSum(expr) |
| |
| /** |
| * Returns the a string that stitching all of the input values. |
| * If all values are null, null is returned. |
| */ |
| def concat_agg(separator: Expression) = ConcatAgg(expr, separator) |
| |
| /** |
| * Returns the minimum value of field across all input values. |
| */ |
| def min = Min(expr) |
| |
| /** |
| * Returns the maximum value of field across all input values. |
| */ |
| def max = Max(expr) |
| |
| /** |
| * Returns the number of input rows for which the field is not null. |
| */ |
| def count = Count(expr) |
| |
| /** |
| * Returns the average (arithmetic mean) of the numeric field across all input values. |
| */ |
| def avg = Avg(expr) |
| |
| /** |
| * The Rank function computes the rank of a value in a group of values. |
| * The result is one plus the number of rows preceding or equal to the current row |
| * in the ordering of the partition. The values will produce gaps in the sequence. |
| */ |
| def rank = Rank() |
| |
| /** |
| * Computes the rank of a value in a group of values. The result is one plus the previously |
| * assigned rank value. Unlike the function rank, |
| * dense_rank will not produce gaps in the ranking sequence. |
| */ |
| def denseRank = DenseRank() |
| |
| /** |
| * Assigns a unique, sequential number to each row, starting with one, |
| * according to the ordering of rows within the window partition. |
| */ |
| def rowNumber = RowNumber() |
| |
| /** |
| * Returns the first value in an ordered set of the inputs. |
| * If all values are null, null is returned. |
| */ |
| def first_value = FirstValue(expr) |
| |
| /** |
| * Returns the last value in an ordered set of the inputs. |
| * If all values are null, null is returned. |
| */ |
| def last_value = LastValue(expr) |
| |
| /** |
| * Returns the population standard deviation of an expression (the square root of varPop()). |
| */ |
| def stddevPop = StddevPop(expr) |
| |
| /** |
| * Returns the sample standard deviation of an expression (the square root of varSamp()). |
| */ |
| def stddevSamp = StddevSamp(expr) |
| |
| /** |
| * Returns the standard deviation of an expression (the square root of variance()). |
| */ |
| def stddev = Stddev(expr) |
| |
| /** |
| * Returns the population standard variance of an expression. |
| */ |
| def varPop = VarPop(expr) |
| |
| /** |
| * Returns the sample variance of a given expression. |
| */ |
| def varSamp = VarSamp(expr) |
| |
| /** |
| * Returns the variance of a given expression. |
| */ |
| def variance = Variance(expr) |
| |
| /** |
| * Returns multiset aggregate of a given expression. |
| */ |
| def collect = Collect(expr) |
| |
| /** |
| * Converts a value to a given type. |
| * |
| * e.g. "42".cast(DataTypes.INT) leads to 42. |
| * |
| * @return casted expression |
| */ |
| def cast(toType: InternalType) = Cast(expr, toType) |
| |
| /** |
| * Reinterpret a value to a given type. |
| * When the physical storage of two types is the same, this operator may be used to |
| * reinterpret values of one type as the other. This operator is similar to |
| * a cast, except that it does not alter the data value. It |
| * performs an overflow check if it has <i>any</i> second operand, whether |
| * true or not. |
| * |
| * @return Reinterpreted expression |
| */ |
| def reinterpret(toType: InternalType, checkOverflow: Boolean) = |
| Reinterpret(expr, toType, checkOverflow) |
| |
| /** |
| * Specifies a name for an expression i.e. a field. |
| * |
| * @param name name for one field |
| * @param extraNames additional names if the expression expands to multiple fields |
| * @return field with an alias |
| */ |
| def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name)) |
| |
| /** |
| * Specifies ascending order of an expression i.e. a field for orderBy call. |
| * |
| * @return ascend expression |
| */ |
| def asc = Asc(expr) |
| |
| /** |
| * Specifies descending order of an expression i.e. a field for orderBy call. |
| * |
| * @return descend expression |
| */ |
| def desc = Desc(expr) |
| |
| def nullsFirst = NullsFirst(expr) |
| def nullsLast = NullsLast(expr) |
| |
| /** |
| * Returns true if an expression exists in a given list of expressions. This is a shorthand |
| * for multiple OR conditions. |
| * |
| * If the testing set contains null, the result will be null if the element can not be found |
| * and true if it can be found. If the element is null, the result is always null. |
| * |
| * e.g. "42".in(1, 2, 3) leads to false. |
| */ |
| def in(elements: Expression*) = In(expr, elements) |
| |
| /** |
| * Returns true if an expression exists in a given table sub-query. The sub-query table |
| * must consist of one column. This column must have the same data type as the expression. |
| * |
| * Note: This operation is not supported in a streaming environment yet. |
| */ |
| def in(table: Table) = In(expr, Seq(TableReference(table.toString, table))) |
| |
| /** |
| * Returns the start time (inclusive) of a window when applied on a window reference. |
| */ |
| def start = WindowStart(expr) |
| |
| /** |
| * Returns the end time (exclusive) of a window when applied on a window reference. |
| * |
| * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000. |
| */ |
| def end = WindowEnd(expr) |
| |
| /** |
| * Ternary conditional operator that decides which of two other expressions should be evaluated |
| * based on a evaluated boolean condition. |
| * |
| * e.g. (42 > 5).?("A", "B") leads to "A" |
| * |
| * @param ifTrue expression to be evaluated if condition holds |
| * @param ifFalse expression to be evaluated if condition does not hold |
| */ |
| def ?(ifTrue: Expression, ifFalse: Expression) = { |
| If(expr, ifTrue, ifFalse) |
| } |
| |
| // scalar functions |
| |
| /** |
| * Calculates the remainder of division the given number by another one. |
| */ |
| def mod(other: Expression) = Mod(expr, other) |
| |
| /** |
| * Calculates the Euler's number raised to the given power. |
| */ |
| def exp() = Exp(expr) |
| |
| /** |
| * Calculates the base 10 logarithm of the given value. |
| */ |
| def log10() = Log10(expr) |
| |
| /** |
| * Calculates the base 2 logarithm of the given value. |
| */ |
| def log2() = Log2(expr) |
| |
| /** |
| * Calculates the natural logarithm of the given value. |
| */ |
| def ln() = Ln(expr) |
| |
| /** |
| * Calculates the natural logarithm of the given value. |
| */ |
| def log() = Log(null, expr) |
| |
| /** |
| * Calculates the logarithm of the given value to the given base. |
| */ |
| def log(base: Expression) = Log(base, expr) |
| |
| /** |
| * Calculates the given number raised to the power of the other value. |
| */ |
| def power(other: Expression) = Power(expr, other) |
| |
| /** |
| * Calculates the hyperbolic cosine of a given value. |
| */ |
| def cosh() = Cosh(expr) |
| |
| /** |
| * Calculates the square root of a given value. |
| */ |
| def sqrt() = Sqrt(expr) |
| |
| /** |
| * Calculates the absolute value of given value. |
| */ |
| def abs() = Abs(expr) |
| |
| /** |
| * Calculates the largest integer less than or equal to a given number. |
| */ |
| def floor() = Floor(expr) |
| |
| /** |
| * Calculates the hyperbolic sine of a given value. |
| */ |
| def sinh() = Sinh(expr) |
| |
| /** |
| * Calculates the smallest integer greater than or equal to a given number. |
| */ |
| def ceil() = Ceil(expr) |
| |
| /** |
| * Calculates the sine of a given number. |
| */ |
| def sin() = Sin(expr) |
| |
| /** |
| * Calculates the cosine of a given number. |
| */ |
| def cos() = Cos(expr) |
| |
| /** |
| * Calculates the tangent of a given number. |
| */ |
| def tan() = Tan(expr) |
| |
| /** |
| * Calculates the cotangent of a given number. |
| */ |
| def cot() = Cot(expr) |
| |
| /** |
| * Calculates the arc sine of a given number. |
| */ |
| def asin() = Asin(expr) |
| |
| /** |
| * Calculates the arc cosine of a given number. |
| */ |
| def acos() = Acos(expr) |
| |
| /** |
| * Calculates the arc tangent of a given number. |
| */ |
| def atan() = Atan(expr) |
| |
| /** |
| * Calculates the hyperbolic tangent of a given number. |
| */ |
| def tanh() = Tanh(expr) |
| |
| /** |
| * Converts numeric from radians to degrees. |
| */ |
| def degrees() = Degrees(expr) |
| |
| /** |
| * Converts numeric from degrees to radians. |
| */ |
| def radians() = Radians(expr) |
| |
| /** |
| * Calculates the signum of a given number. |
| */ |
| def sign() = Sign(expr) |
| |
| /** |
| * Rounds the given number to integer places right to the decimal point. |
| */ |
| def round(places: Expression) = Round(expr, places) |
| |
| /** |
| * Returns a string representation of an integer numeric value in binary format. Returns null if |
| * numeric is null. E.g. "4" leads to "100", "12" leads to "1100". |
| */ |
| def bin() = Bin(expr) |
| |
| /** |
| * Returns a string representation of an integer numeric value or a string in hex format. Returns |
| * null if numeric or string is null. |
| * |
| * E.g. a numeric 20 leads to "14", a numeric 100 leads to "64", and a string "hello,world" leads |
| * to "68656c6c6f2c776f726c64". |
| */ |
| def hex() = Hex(expr) |
| |
| // String operations |
| |
| /** |
| * Creates a substring of the given string at given index for a given length. |
| * |
| * @param beginIndex first character of the substring (starting at 1, inclusive) |
| * @param length number of characters of the substring |
| * @return substring |
| */ |
| def substring(beginIndex: Expression, length: Expression) = |
| Substring(expr, beginIndex, length) |
| |
| /** |
| * Creates a substring of the given string beginning at the given index to the end. |
| * |
| * @param beginIndex first character of the substring (starting at 1, inclusive) |
| * @return substring |
| */ |
| def substring(beginIndex: Expression) = |
| new Substring(expr, beginIndex) |
| |
| /** |
| * Creates a subString of the given string at the leftmost n characters. |
| * If length <= 0, return empty string |
| * |
| * @param length number of characters of the substring |
| * @return the leftmost n characters from the given string. |
| */ |
| def left(length: Expression) = Left(expr, length) |
| |
| /** |
| * Creates a subString of the given string at the rightmost n characters. |
| * If length <= 0, return empty string |
| * |
| * @param length number of characters of the substring |
| * @return the rightmost n characters from the given string. |
| */ |
| def right(length: Expression) = Right(expr, length) |
| |
| /** |
| * Removes leading and/or trailing characters from the given string. |
| * |
| * @param removeLeading if true, remove leading characters (default: true) |
| * @param removeTrailing if true, remove trailing characters (default: true) |
| * @param character string containing the character (default: " ") |
| * @return trimmed string |
| */ |
| def trim( |
| removeLeading: Boolean = true, |
| removeTrailing: Boolean = true, |
| character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = { |
| if (removeLeading && removeTrailing) { |
| Trim(TrimMode.BOTH, character, expr) |
| } else if (removeLeading) { |
| Trim(TrimMode.LEADING, character, expr) |
| } else if (removeTrailing) { |
| Trim(TrimMode.TRAILING, character, expr) |
| } else { |
| expr |
| } |
| } |
| |
| /** |
| * Removes leading characters from the given string. |
| * |
| * @param character string containing the character (default: " ") |
| * @return trimmed string |
| */ |
| def ltrim(character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = Ltrim(expr, character) |
| |
| /** |
| * Removes trailing characters from the given string. |
| * |
| * @param character string containing the character (default: " ") |
| * @return trimmed string |
| */ |
| def rtrim(character: Expression = TrimConstants.TRIM_DEFAULT_CHAR) = Rtrim(expr, character) |
| |
| /** |
| * Returns a string that repeats the base string n times. |
| */ |
| def repeat(n: Expression) = Repeat(expr, n) |
| |
| /** |
| * Returns a new string which replaces all the occurrences of the search target |
| * with the replacement string (non-overlapping). |
| */ |
| def replace(search: Expression, replacement: Expression) = |
| Replace(expr, search, replacement) |
| |
| /** |
| * Returns the length of a string. |
| */ |
| def charLength() = CharLength(expr) |
| |
| /** |
| * Returns the length of a string. |
| */ |
| def length() = CharLength(expr) |
| |
| /** |
| * Returns all of the characters in a string in upper case using the rules of |
| * the default locale. |
| */ |
| def upperCase() = Upper(expr) |
| |
| /** |
| * Returns all of the characters in a string in lower case using the rules of |
| * the default locale. |
| */ |
| def lowerCase() = Lower(expr) |
| |
| /** |
| * Converts the initial letter of each word in a string to uppercase. |
| * Assumes a string containing only [A-Za-z0-9], everything else is treated as whitespace. |
| */ |
| def initCap() = InitCap(expr) |
| |
| /** |
| * Returns true, if a string matches the specified LIKE pattern. |
| * |
| * e.g. "Jo_n%" matches all strings that start with "Jo(arbitrary letter)n" |
| */ |
| def like(pattern: Expression) = Like(expr, pattern) |
| |
| /** |
| * Returns true, if a string matches the specified SQL regex pattern. |
| * |
| * e.g. "A+" matches all strings that consist of at least one A |
| */ |
| def similar(pattern: Expression) = Similar(expr, pattern) |
| |
| /** |
| * Returns the position of string in an other string starting at 1. |
| * Returns 0 if string could not be found. |
| * |
| * e.g. "a".position("bbbbba") leads to 6 |
| */ |
| def position(haystack: Expression) = Position(expr, haystack) |
| |
| /** |
| * Returns a string left-padded with the given pad string to a length of len characters. If |
| * the string is longer than len, the return value is shortened to len characters. |
| * |
| * e.g. "hi".lpad(4, '??') returns "??hi", "hi".lpad(1, '??') returns "h" |
| */ |
| def lpad(len: Expression, pad: Expression) = Lpad(expr, len, pad) |
| |
| /** |
| * Returns a string right-padded with the given pad string to a length of len characters. If |
| * the string is longer than len, the return value is shortened to len characters. |
| * |
| * e.g. "hi".rpad(4, '??') returns "hi??", "hi".rpad(1, '??') returns "h" |
| */ |
| def rpad(len: Expression, pad: Expression) = Rpad(expr, len, pad) |
| |
| /** |
| * Returns the position of the nth appearance occurrence of the substring within the string |
| * starting at given start position. |
| * When start position is negative, then INSTR counts and searches backward from the end of |
| * string. |
| * Returns 0 if string could not be found. |
| * |
| * e.g. |
| * "Corporate Floor".instr("or", 3, 2) leads to 14 |
| * "Corporate Floor".instr("or", -3, 2) leads to 2 |
| */ |
| def instr(subString: Expression, |
| startPosition: Expression = Literal(1), |
| nthAppearance: Expression = Literal(1)) = |
| new Instr(expr, subString, startPosition, nthAppearance) |
| |
| /** |
| * For windowing function to config over window |
| * e.g.: |
| * table |
| * .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) |
| * .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) |
| */ |
| def over(alias: Expression): Expression = { |
| expr match { |
| case _: Aggregation => UnresolvedOverCall( |
| expr.asInstanceOf[Aggregation], |
| alias) |
| case _ => throw new TableException( |
| "The over method can only using with aggregation expression.") |
| } |
| } |
| |
| /** |
| * Returns a string extracted with a specified regular expression and a regex match group index. |
| */ |
| def regexpExtract(regex: Expression, extractIndex: Expression) = |
| RegexpExtract(expr, regex, extractIndex) |
| |
| /** |
| * Returns a string extracted with a specified regular expression. |
| */ |
| def regexpExtract(regex: Expression) = |
| RegexpExtract(expr, regex, null) |
| |
| /** |
| * Replaces a substring of string with a string starting at a position (starting at 1). |
| * |
| * e.g. "xxxxxtest".overlay("xxxx", 6) leads to "xxxxxxxxx" |
| */ |
| def overlay(newString: Expression, starting: Expression) = new Overlay(expr, newString, starting) |
| |
| /** |
| * Replaces a substring of string with a string starting at a position (starting at 1). |
| * The length specifies how many characters should be removed. |
| * |
| * e.g. "xxxxxtest".overlay("xxxx", 6, 2) leads to "xxxxxxxxxst" |
| */ |
| def overlay(newString: Expression, starting: Expression, length: Expression) = |
| Overlay(expr, newString, starting, length) |
| |
| /** |
| * Returns the base string decoded with base64. |
| */ |
| def fromBase64() = FromBase64(expr) |
| |
| /** |
| * Returns the base64-encoded result of the input string. |
| */ |
| def toBase64() = ToBase64(expr) |
| |
| /** |
| * Returns a string with all substrings that match the regular expression consecutively |
| * being replaced. |
| */ |
| def regexpReplace(regex: Expression, replacement: Expression) = |
| RegexpReplace(expr, regex, replacement) |
| |
| /** |
| * Returns the position of string in an other string starting at 1. |
| * Returns 0 if string could not be found. |
| * |
| * e.g. "st".locate("myteststring") leads to 5 |
| */ |
| def locate(haystack: Expression) = new Locate(expr, haystack) |
| |
| /** |
| * Returns the position of string in an other string starting at a position. |
| * Returns 0 if string could not be found. |
| * |
| * e.g. "st".locate("myteststring", 6) leads to 7 |
| */ |
| def locate(haystack: Expression, starting: Expression) = Locate(expr, haystack, starting) |
| |
| // Temporal operations |
| |
| /** |
| * Parses a date string in the form "yy-mm-dd" to a SQL Date. |
| */ |
| def toDate = Cast(expr, DataTypes.DATE) |
| |
| /** |
| * Parses a time string in the form "hh:mm:ss" to a SQL Time. |
| */ |
| def toTime = Cast(expr, DataTypes.TIME) |
| |
| /** |
| * Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. |
| */ |
| def toTimestamp = Cast(expr, DataTypes.TIMESTAMP) |
| |
| /** |
| * Extracts parts of a time point or time interval. Returns the part as a long value. |
| * |
| * e.g. "2006-06-05".toDate.extract(DAY) leads to 5 |
| */ |
| def extract(timeIntervalUnit: TimeIntervalUnit) = Extract(timeIntervalUnit, expr) |
| |
| /** |
| * Rounds down a time point to the given unit. |
| * |
| * e.g. "12:44:31".toDate.floor(MINUTE) leads to 12:44:00 |
| */ |
| def floor(timeIntervalUnit: TimeIntervalUnit) = TemporalFloor(timeIntervalUnit, expr) |
| |
| /** |
| * Rounds up a time point to the given unit. |
| * |
| * e.g. "12:44:31".toDate.ceil(MINUTE) leads to 12:45:00 |
| */ |
| def ceil(timeIntervalUnit: TimeIntervalUnit) = TemporalCeil(timeIntervalUnit, expr) |
| |
| // Interval types |
| |
| /** |
| * Creates an interval of the given number of years. |
| * |
| * @return interval of months |
| */ |
| def year: Expression = toMonthInterval(expr, 12) |
| |
| /** |
| * Creates an interval of the given number of years. |
| * |
| * @return interval of months |
| */ |
| def years: Expression = year |
| |
| /** |
| * Creates an interval of the given number of quarters. |
| * |
| * @return interval of months |
| */ |
| def quarter: Expression = toMonthInterval(expr, 3) |
| |
| /** |
| * Creates an interval of the given number of quarters. |
| * |
| * @return interval of months |
| */ |
| def quarters: Expression = quarter |
| |
| /** |
| * Creates an interval of the given number of months. |
| * |
| * @return interval of months |
| */ |
| def month: Expression = toMonthInterval(expr, 1) |
| |
| /** |
| * Creates an interval of the given number of months. |
| * |
| * @return interval of months |
| */ |
| def months: Expression = month |
| |
| /** |
| * Creates an interval of the given number of weeks. |
| * |
| * @return interval of milliseconds |
| */ |
| def week: Expression = toMilliInterval(expr, 7 * MILLIS_PER_DAY) |
| |
| /** |
| * Creates an interval of the given number of weeks. |
| * |
| * @return interval of milliseconds |
| */ |
| def weeks: Expression = week |
| |
| /** |
| * Creates an interval of the given number of days. |
| * |
| * @return interval of milliseconds |
| */ |
| def day: Expression = toMilliInterval(expr, MILLIS_PER_DAY) |
| |
| /** |
| * Creates an interval of the given number of days. |
| * |
| * @return interval of milliseconds |
| */ |
| def days: Expression = day |
| |
| /** |
| * Creates an interval of the given number of hours. |
| * |
| * @return interval of milliseconds |
| */ |
| def hour: Expression = toMilliInterval(expr, MILLIS_PER_HOUR) |
| |
| /** |
| * Creates an interval of the given number of hours. |
| * |
| * @return interval of milliseconds |
| */ |
| def hours: Expression = hour |
| |
| /** |
| * Creates an interval of the given number of minutes. |
| * |
| * @return interval of milliseconds |
| */ |
| def minute: Expression = toMilliInterval(expr, MILLIS_PER_MINUTE) |
| |
| /** |
| * Creates an interval of the given number of minutes. |
| * |
| * @return interval of milliseconds |
| */ |
| def minutes: Expression = minute |
| |
| /** |
| * Creates an interval of the given number of seconds. |
| * |
| * @return interval of milliseconds |
| */ |
| def second: Expression = toMilliInterval(expr, MILLIS_PER_SECOND) |
| |
| /** |
| * Creates an interval of the given number of seconds. |
| * |
| * @return interval of milliseconds |
| */ |
| def seconds: Expression = second |
| |
| /** |
| * Creates an interval of the given number of milliseconds. |
| * |
| * @return interval of milliseconds |
| */ |
| def milli: Expression = toMilliInterval(expr, 1) |
| |
| /** |
| * Creates an interval of the given number of milliseconds. |
| * |
| * @return interval of milliseconds |
| */ |
| def millis: Expression = milli |
| |
| // Row interval type |
| |
| /** |
| * Creates an interval of rows. |
| * |
| * @return interval of rows |
| */ |
| def rows: Expression = toRowInterval(expr) |
| |
| // Range interval type |
| |
| /** |
| * Creates an interval of range. |
| * |
| * @return interval of range |
| */ |
| def range = toRangeInterval(expr) |
| |
| // Advanced type helper functions |
| |
| /** |
| * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and |
| * returns it's value. |
| * |
| * @param name name of the field (similar to Flink's field expressions) |
| * @return value of the field |
| */ |
| def get(name: String) = GetCompositeField(expr, name) |
| |
| /** |
| * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and |
| * returns it's value. |
| * |
| * @param index position of the field |
| * @return value of the field |
| */ |
| def get(index: Int) = GetCompositeField(expr, index) |
| |
| /** |
| * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes |
| * into a flat representation where every subtype is a separate field. |
| */ |
| def flatten() = Flattening(expr) |
| |
| /** |
| * Accesses the element of an array based on an index (starting at 1). |
| * Or accesses the element of a map based on key. |
| * |
| * @param index position of the element (starting at 1), or key of a map |
| * @return value of the element |
| */ |
| def at(index: Expression) = ItemAt(expr, index) |
| |
| /** |
| * Returns the number of elements of an array or a map. |
| * |
| * @return number of elements |
| */ |
| def cardinality() = Cardinality(expr) |
| |
| /** |
| * Returns the sole element of an array with a single element. Returns null if the array is |
| * empty. Throws an exception if the array has more than one element. |
| * |
| * @return the first and only element of an array with a single element |
| */ |
| def element() = ArrayElement(expr) |
| |
| // Time definition |
| |
| /** |
| * Declares a field as the rowtime attribute for indicating, accessing, and working in |
| * Flink's event time. |
| */ |
| def rowtime = RowtimeAttribute(expr) |
| |
| /** |
| * Declares a field as the proctime attribute for indicating, accessing, and working in |
| * Flink's processing time. |
| */ |
| def proctime = ProctimeAttribute(expr) |
| |
| /** |
| * Returns the ASCII code of the left-most character of a string. |
| * Returns 0 if the string is empty. Returns null if the string is Null. |
| * Note that if the leftmost character is out of the range (0,127), the behavior is depending on |
| * the RDMB's implemantation of the function. In Blink, returns the the numeric value of the |
| * first byte of the left-most character. |
| */ |
| def ascii() = Ascii(expr) |
| |
| /** |
| * Encodes the given string into Binary using the given charset. |
| * Returns null if the given string is null. |
| * Returns null if the given charset is null. |
| */ |
| def encode(charset: Expression) = Encode(expr, charset) |
| |
| /** |
| * Decodes the given binary into String using the given charset. |
| * Returns null if the given binary is null. |
| * Returns null if the given charset is null. |
| */ |
| def decode(charset: Expression) = Decode(expr, charset) |
| |
| // Hash functions |
| |
| /** |
| * Returns the MD5 hash of the string argument; null if string is null. |
| * |
| * @return string of 32 hexadecimal digits or null |
| */ |
| def md5() = Md5(expr) |
| |
| /** |
| * Returns the SHA-1 hash of the string argument; null if string is null. |
| * |
| * @return string of 40 hexadecimal digits or null |
| */ |
| def sha1() = Sha1(expr) |
| |
| /** |
| * Returns the SHA-224 hash of the string argument; null if string is null. |
| * |
| * @return string of 56 hexadecimal digits or null |
| */ |
| def sha224() = Sha224(expr) |
| |
| /** |
| * Returns the SHA-256 hash of the string argument; null if string is null. |
| * |
| * @return string of 64 hexadecimal digits or null |
| */ |
| def sha256() = Sha256(expr) |
| |
| /** |
| * Returns the SHA-384 hash of the string argument; null if string is null. |
| * |
| * @return string of 96 hexadecimal digits or null |
| */ |
| def sha384() = Sha384(expr) |
| |
| /** |
| * Returns the SHA-512 hash of the string argument; null if string is null. |
| * |
| * @return string of 128 hexadecimal digits or null |
| */ |
| def sha512() = Sha512(expr) |
| |
| /** |
| * Returns the hash for the given string expression using the SHA-2 family of hash |
| * functions (SHA-224, SHA-256, SHA-384, or SHA-512). |
| * |
| * @param hashLength bit length of the result (either 224, 256, 384, or 512) |
| * @return string or null if one of the arguments is null. |
| */ |
| def sha2(hashLength: Expression) = Sha2(expr, hashLength) |
| |
| /** |
| * Returns the Between with lower bound included and upper bound included. |
| * @param lowerBound |
| * @param upperBound |
| * @return Returns the Between with lower bound included and upper bound included |
| */ |
| def between(lowerBound: Expression, upperBound: Expression) = |
| Between(expr, lowerBound, upperBound) |
| |
| /** |
| * Returns the not Between with lower bound included and upper bound included. |
| * @param lowerBound |
| * @param upperBound |
| * @return Returns the NotBetween with lower bound included and upper bound included |
| */ |
| def notBetween(lowerBound: Expression, upperBound: Expression) = |
| NotBetween(expr, lowerBound, upperBound) |
| } |
| |
| /** |
| * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]] |
| * to [[ImplicitExpressionOperations]]. |
| */ |
| trait ImplicitExpressionConversions { |
| |
| implicit val UNBOUNDED_ROW = UnboundedRow() |
| implicit val UNBOUNDED_RANGE = UnboundedRange() |
| |
| implicit val CURRENT_ROW = CurrentRow() |
| implicit val CURRENT_RANGE = CurrentRange() |
| |
| implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations { |
| def expr = e |
| } |
| |
| implicit class UnresolvedFieldExpression(s: Symbol) extends ImplicitExpressionOperations { |
| def expr = UnresolvedFieldReference(s.name) |
| } |
| |
| implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations { |
| def expr = Literal(l) |
| } |
| |
| implicit class LiteralByteExpression(b: Byte) extends ImplicitExpressionOperations { |
| def expr = Literal(b) |
| } |
| |
| implicit class LiteralShortExpression(s: Short) extends ImplicitExpressionOperations { |
| def expr = Literal(s) |
| } |
| |
| implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations { |
| def expr = Literal(i) |
| } |
| |
| implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations { |
| def expr = Literal(f) |
| } |
| |
| implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations { |
| def expr = Literal(d) |
| } |
| |
| implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations { |
| def expr = Literal(str) |
| } |
| |
| implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations { |
| def expr = Literal(bool) |
| } |
| |
| implicit class LiteralJavaDecimalExpression(javaDecimal: java.math.BigDecimal) |
| extends ImplicitExpressionOperations { |
| def expr = Literal(javaDecimal) |
| } |
| |
| implicit class LiteralScalaDecimalExpression(scalaDecimal: scala.math.BigDecimal) |
| extends ImplicitExpressionOperations { |
| def expr = Literal(scalaDecimal.bigDecimal) |
| } |
| |
| implicit class LiteralSqlDateExpression(sqlDate: Date) extends ImplicitExpressionOperations { |
| def expr = Literal(sqlDate) |
| } |
| |
| implicit class LiteralSqlTimeExpression(sqlTime: Time) extends ImplicitExpressionOperations { |
| def expr = Literal(sqlTime) |
| } |
| |
| implicit class LiteralSqlTimestampExpression(sqlTimestamp: Timestamp) |
| extends ImplicitExpressionOperations { |
| def expr = Literal(sqlTimestamp) |
| } |
| |
| implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) |
| implicit def byte2Literal(b: Byte): Literal = Literal(b) |
| implicit def short2Literal(s: Short): Literal = Literal(s) |
| implicit def int2Literal(i: Int): Literal = Literal(i) |
| implicit def long2Literal(l: Long): Literal = Literal(l) |
| implicit def double2Literal(d: Double): Literal = Literal(d) |
| implicit def float2Literal(d: Float): Literal = Literal(d) |
| implicit def string2Literal(str: String): Literal = Literal(str) |
| implicit def boolean2Literal(bool: Boolean): Literal = Literal(bool) |
| implicit def javaDec2Literal(javaDec: JBigDecimal): Literal = Literal(javaDec) |
| implicit def scalaDec2Literal(scalaDec: BigDecimal): Literal = |
| Literal(scalaDec.bigDecimal) |
| implicit def sqlDate2Literal(sqlDate: Date): Literal = Literal(sqlDate) |
| implicit def sqlTime2Literal(sqlTime: Time): Literal = Literal(sqlTime) |
| implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Literal = |
| Literal(sqlTimestamp) |
| implicit def array2ArrayConstructor(array: Array[_]): Expression = convertArray(array) |
| implicit def userDefinedAggFunctionConstructor[T: TypeInformation, ACC: TypeInformation] |
| (udagg: AggregateFunction[T, ACC]): UDAGGExpression[T, ACC] = UDAGGExpression(udagg) |
| implicit def toDistinct(agg: Aggregation): DistinctAgg = DistinctAgg(agg) |
| implicit def toDistinct[T: TypeInformation, ACC: TypeInformation] |
| (agg: AggregateFunction[T, ACC]): DistinctAggregateFunction[T, ACC] = |
| DistinctAggregateFunction(agg) |
| } |
| |
| // ------------------------------------------------------------------------------------------------ |
| // Expressions with no parameters |
| // ------------------------------------------------------------------------------------------------ |
| |
| // we disable the object checker here as it checks for capital letters of objects |
| // but we want that objects look like functions in certain cases e.g. array(1, 2, 3) |
| // scalastyle:off object.name |
| |
| /** |
| * Returns the current SQL date in UTC time zone. |
| */ |
| object currentDate { |
| |
| /** |
| * Returns the current SQL date in UTC time zone. |
| */ |
| def apply(): Expression = { |
| CurrentDate() |
| } |
| } |
| |
| /** |
| * Returns the current SQL time in UTC time zone. |
| */ |
| object currentTime { |
| |
| /** |
| * Returns the current SQL time in UTC time zone. |
| */ |
| def apply(): Expression = { |
| CurrentTime() |
| } |
| } |
| |
| /** |
| * Returns the current SQL timestamp in UTC time zone. |
| */ |
| object currentTimestamp { |
| |
| /** |
| * Returns the current SQL timestamp in UTC time zone. |
| */ |
| def apply(): Expression = { |
| CurrentTimestamp() |
| } |
| } |
| |
| /** |
| * Returns the current SQL time in local time zone. |
| */ |
| object localTime { |
| |
| /** |
| * Returns the current SQL time in local time zone. |
| */ |
| def apply(): Expression = { |
| LocalTime() |
| } |
| } |
| |
| /** |
| * Returns the current SQL timestamp in local time zone. |
| */ |
| object localTimestamp { |
| |
| /** |
| * Returns the current SQL timestamp in local time zone. |
| */ |
| def apply(): Expression = { |
| LocalTimestamp() |
| } |
| } |
| |
| /** |
| * Determines whether two anchored time intervals overlap. Time point and temporal are |
| * transformed into a range defined by two time points (start, end). The function |
| * evaluates <code>leftEnd >= rightStart && rightEnd >= leftStart</code>. |
| * |
| * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart |
| * |
| * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true |
| */ |
| object temporalOverlaps { |
| |
| /** |
| * Determines whether two anchored time intervals overlap. Time point and temporal are |
| * transformed into a range defined by two time points (start, end). |
| * |
| * It evaluates: leftEnd >= rightStart && rightEnd >= leftStart |
| * |
| * e.g. temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hour) leads to true |
| */ |
| def apply( |
| leftTimePoint: Expression, |
| leftTemporal: Expression, |
| rightTimePoint: Expression, |
| rightTemporal: Expression): Expression = { |
| TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) |
| } |
| } |
| |
| /** |
| * Formats a timestamp as a string using a specified format. |
| * The format must be compatible with MySQL's date formatting syntax as used by the |
| * date_parse function. |
| * |
| * For example <code>dataFormat('time, "%Y, %d %M")</code> results in strings |
| * formatted as "2017, 05 May". |
| */ |
| object dateFormat { |
| |
| /** |
| * Formats a timestamp as a string using a specified format. |
| * The format must be compatible with MySQL's date formatting syntax as used by the |
| * date_parse function. |
| * |
| * For example dataFormat('time, "%Y, %d %M") results in strings formatted as "2017, 05 May". |
| * |
| * @param timestamp The timestamp to format as string. |
| * @param format The format of the string. |
| * @return The formatted timestamp as string. |
| */ |
| def apply( |
| timestamp: Expression, |
| format: Expression |
| ): Expression = { |
| DateFormat(timestamp, format) |
| } |
| } |
| |
| /** |
| * Creates a row of expressions. |
| */ |
| object row { |
| |
| /** |
| * Creates a row of expressions. |
| */ |
| def apply(head: Expression, tail: Expression*): Expression = { |
| RowConstructor(head +: tail.toSeq) |
| } |
| } |
| |
| /** |
| * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2. |
| * |
| * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads |
| * to 3. |
| */ |
| object timestampDiff { |
| |
| /** |
| * Returns the (signed) number of [[TimePointUnit]] between timePoint1 and timePoint2. |
| * |
| * For example, timestampDiff(TimePointUnit.DAY, '2016-06-15'.toDate, '2016-06-18'.toDate leads |
| * to 3. |
| * |
| * @param timePointUnit The unit to compute diff. |
| * @param timePoint1 The first point in time. |
| * @param timePoint2 The second point in time. |
| * @return The number of intervals as integer value. |
| */ |
| def apply( |
| timePointUnit: TimePointUnit, |
| timePoint1: Expression, |
| timePoint2: Expression) |
| : Expression = { |
| TimestampDiff(timePointUnit, timePoint1, timePoint2) |
| } |
| } |
| |
| /** |
| * Creates an array of expressions. The array will be an array of objects (not primitives). |
| */ |
| object array { |
| |
| /** |
| * Creates an array of expressions. The array will be an array of objects (not primitives). |
| */ |
| def apply(head: Expression, tail: Expression*): Expression = { |
| ArrayConstructor(head +: tail.toSeq) |
| } |
| } |
| |
| /** |
| * Creates a map of expressions. The map will be a map between two objects (not primitives). |
| */ |
| object map { |
| |
| /** |
| * Creates a map of expressions. The map will be a map between two objects (not primitives). |
| */ |
| def apply(key: Expression, value: Expression, tail: Expression*): Expression = { |
| MapConstructor(Seq(key, value) ++ tail.toSeq) |
| } |
| } |
| |
| /** |
| * Returns a value that is closer than any other value to pi. |
| */ |
| object pi { |
| |
| /** |
| * Returns a value that is closer than any other value to pi. |
| */ |
| def apply(): Expression = { |
| Pi() |
| } |
| } |
| |
| /** |
| * Returns a value that is closer than any other value to e. |
| */ |
| object e { |
| |
| /** |
| * Returns a value that is closer than any other value to e. |
| */ |
| def apply(): Expression = { |
| E() |
| } |
| } |
| |
| /** |
| * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). |
| */ |
| object rand { |
| |
| /** |
| * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). |
| */ |
| def apply(): Expression = { |
| new Rand() |
| } |
| |
| /** |
| * Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with a |
| * initial seed. Two rand() functions will return identical sequences of numbers if they |
| * have same initial seed. |
| */ |
| def apply(seed: Expression): Expression = { |
| Rand(seed) |
| } |
| } |
| |
| /** |
| * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified |
| * value (exclusive). |
| */ |
| object randInteger { |
| |
| /** |
| * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified |
| * value (exclusive). |
| */ |
| def apply(bound: Expression): Expression = { |
| new RandInteger(bound) |
| } |
| |
| /** |
| * Returns a pseudorandom integer value between 0.0 (inclusive) and the specified value |
| * (exclusive) with a initial seed. Two randInteger() functions will return identical sequences |
| * of numbers if they have same initial seed and same bound. |
| */ |
| def apply(seed: Expression, bound: Expression): Expression = { |
| RandInteger(seed, bound) |
| } |
| } |
| |
| /** |
| * Returns the string that results from concatenating the arguments. |
| * Returns NULL if any argument is NULL. |
| */ |
| object concat { |
| def apply(string: Expression, strings: Expression*): Expression = { |
| new Concat(Seq(string) ++ strings) |
| } |
| } |
| |
| /** |
| * Returns the string that results from concatenating the arguments and separator. |
| * Returns NULL If the separator is NULL. |
| * |
| * Note: this user-defined function does not skip empty strings. However, it does skip any NULL |
| * values after the separator argument. |
| **/ |
| object concat_ws { |
| def apply(separator: Expression, string: Expression, strings: Expression*): Expression = { |
| new ConcatWs(separator, Seq(string) ++ strings) |
| } |
| } |
| |
| /** |
| * Calculates the arc tangent of a given coordinate. |
| */ |
| object atan2 { |
| |
| /** |
| * Calculates the arc tangent of a given coordinate. |
| */ |
| def apply(y: Expression, x: Expression): Expression = { |
| Atan2(y, x) |
| } |
| } |
| |
| object proctime { |
| def apply(): Expression = Proctime() |
| } |
| |
| object log { |
| def apply(base: Expression, antilogarithm: Expression): Expression = { |
| Log(base, antilogarithm) |
| } |
| def apply(antilogarithm: Expression): Expression = { |
| new Log(antilogarithm) |
| } |
| } |
| |
| /** |
| * Returns an UUID (Universally Unique Identifier) string (e.g., |
| * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly |
| * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number |
| * generator. |
| */ |
| object uuid { |
| |
| /** |
| * Returns an UUID (Universally Unique Identifier) string (e.g., |
| * "3d3c68f7-f608-473f-b60c-b0c44ad4cc4e") according to RFC 4122 type 4 (pseudo randomly |
| * generated) UUID. The UUID is generated using a cryptographically strong pseudo random number |
| * generator. |
| */ |
| def apply(): Expression = { |
| UUID() |
| } |
| } |
| |
| // scalastyle:on object.name |