| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| # WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes |
| |
| #' @include generics.R jobj.R column.R |
| NULL |
| |
| #' S4 class that represents a WindowSpec |
| #' |
| #' WindowSpec can be created by using windowPartitionBy() or windowOrderBy() |
| #' |
| #' @rdname WindowSpec |
| #' @seealso \link{windowPartitionBy}, \link{windowOrderBy} |
| #' |
| #' @param sws A Java object reference to the backing Scala WindowSpec |
| #' @note WindowSpec since 2.0.0 |
| setClass("WindowSpec", |
| slots = list(sws = "jobj")) |
| |
| setMethod("initialize", "WindowSpec", function(.Object, sws) { |
| .Object@sws <- sws |
| .Object |
| }) |
| |
| windowSpec <- function(sws) { |
| stopifnot(class(sws) == "jobj") |
| new("WindowSpec", sws) |
| } |
| |
| #' @rdname show |
| #' @note show(WindowSpec) since 2.0.0 |
| setMethod("show", "WindowSpec", |
| function(object) { |
| cat("WindowSpec", callJMethod(object@sws, "toString"), "\n") |
| }) |
| |
| #' partitionBy |
| #' |
| #' Defines the partitioning columns in a WindowSpec. |
| #' |
| #' @param x a WindowSpec. |
| #' @param col a column to partition on (described by the name or Column). |
| #' @param ... additional column(s) to partition on. |
| #' @return A WindowSpec. |
| #' @rdname partitionBy |
| #' @name partitionBy |
| #' @aliases partitionBy,WindowSpec-method |
| #' @family windowspec_method |
| #' @examples |
| #' \dontrun{ |
| #' partitionBy(ws, "col1", "col2") |
| #' partitionBy(ws, df$col1, df$col2) |
| #' } |
| #' @note partitionBy(WindowSpec) since 2.0.0 |
| setMethod("partitionBy", |
| signature(x = "WindowSpec"), |
| function(x, col, ...) { |
| stopifnot(class(col) %in% c("character", "Column")) |
| |
| if (class(col) == "character") { |
| windowSpec(callJMethod(x@sws, "partitionBy", col, list(...))) |
| } else { |
| jcols <- lapply(list(col, ...), function(c) { |
| c@jc |
| }) |
| windowSpec(callJMethod(x@sws, "partitionBy", jcols)) |
| } |
| }) |
| |
| #' Ordering Columns in a WindowSpec |
| #' |
| #' Defines the ordering columns in a WindowSpec. |
| #' @param x a WindowSpec |
| #' @param col a character or Column indicating an ordering column |
| #' @param ... additional sorting fields |
| #' @return A WindowSpec. |
| #' @name orderBy |
| #' @rdname orderBy |
| #' @aliases orderBy,WindowSpec,character-method |
| #' @family windowspec_method |
| #' @seealso See \link{arrange} for use in sorting a SparkDataFrame |
| #' @examples |
| #' \dontrun{ |
| #' orderBy(ws, "col1", "col2") |
| #' orderBy(ws, df$col1, df$col2) |
| #' } |
| #' @note orderBy(WindowSpec, character) since 2.0.0 |
| setMethod("orderBy", |
| signature(x = "WindowSpec", col = "character"), |
| function(x, col, ...) { |
| windowSpec(callJMethod(x@sws, "orderBy", col, list(...))) |
| }) |
| |
| #' @rdname orderBy |
| #' @name orderBy |
| #' @aliases orderBy,WindowSpec,Column-method |
| #' @note orderBy(WindowSpec, Column) since 2.0.0 |
| setMethod("orderBy", |
| signature(x = "WindowSpec", col = "Column"), |
| function(x, col, ...) { |
| jcols <- lapply(list(col, ...), function(c) { |
| c@jc |
| }) |
| windowSpec(callJMethod(x@sws, "orderBy", jcols)) |
| }) |
| |
| #' rowsBetween |
| #' |
| #' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive). |
| #' |
| #' Both \code{start} and \code{end} are relative positions from the current row. For example, |
| #' "0" means "current row", while "-1" means the row before the current row, and "5" means the |
| #' fifth row after the current row. |
| #' |
| #' We recommend users use \code{Window.unboundedPreceding}, \code{Window.unboundedFollowing}, |
| #' and \code{Window.currentRow} to specify special boundary values, rather than using long values |
| #' directly. |
| #' |
| #' A row based boundary is based on the position of the row within the partition. |
| #' An offset indicates the number of rows above or below the current row, the frame for the |
| #' current row starts or ends. For instance, given a row based sliding frame with a lower bound |
| #' offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from |
| #' index 4 to index 6. |
| #' |
| #' @param x a WindowSpec |
| #' @param start boundary start, inclusive. |
| #' The frame is unbounded if this is the minimum long value. |
| #' @param end boundary end, inclusive. |
| #' The frame is unbounded if this is the maximum long value. |
| #' @return a WindowSpec |
| #' @rdname rowsBetween |
| #' @aliases rowsBetween,WindowSpec,numeric,numeric-method |
| #' @name rowsBetween |
| #' @family windowspec_method |
| #' @examples |
| #' \dontrun{ |
| #' id <- c(rep(1, 3), rep(2, 3), 3) |
| #' desc <- c('New', 'New', 'Good', 'New', 'Good', 'Good', 'New') |
| #' df <- data.frame(id, desc) |
| #' df <- createDataFrame(df) |
| #' w1 <- orderBy(windowPartitionBy('desc'), df$id) |
| #' w2 <- rowsBetween(w1, 0, 3) |
| #' df1 <- withColumn(df, "sum", over(sum(df$id), w2)) |
| #' head(df1) |
| #' } |
| #' @note rowsBetween since 2.0.0 |
| setMethod("rowsBetween", |
| signature(x = "WindowSpec", start = "numeric", end = "numeric"), |
| function(x, start, end) { |
| # "start" and "end" should be long, due to serde limitation, |
| # limit "start" and "end" as integer now |
| windowSpec(callJMethod(x@sws, "rowsBetween", as.integer(start), as.integer(end))) |
| }) |
| |
| #' rangeBetween |
| #' |
| #' Defines the frame boundaries, from \code{start} (inclusive) to \code{end} (inclusive). |
| #' |
| #' Both \code{start} and \code{end} are relative from the current row. For example, "0" means |
| #' "current row", while "-1" means one off before the current row, and "5" means the five off |
| #' after the current row. |
| #' |
| #' We recommend users use \code{Window.unboundedPreceding}, \code{Window.unboundedFollowing}, |
| #' and \code{Window.currentRow} to specify special boundary values, rather than using long values |
| #' directly. |
| #' |
| #' A range-based boundary is based on the actual value of the ORDER BY |
| #' expression(s). An offset is used to alter the value of the ORDER BY expression, |
| #' for instance if the current ORDER BY expression has a value of 10 and the lower bound offset |
| #' is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a |
| #' number of constraints on the ORDER BY expressions: there can be only one expression and this |
| #' expression must have a numerical data type. An exception can be made when the offset is |
| #' unbounded, because no value modification is needed, in this case multiple and non-numeric |
| #' ORDER BY expression are allowed. |
| #' |
| #' @param x a WindowSpec |
| #' @param start boundary start, inclusive. |
| #' The frame is unbounded if this is the minimum long value. |
| #' @param end boundary end, inclusive. |
| #' The frame is unbounded if this is the maximum long value. |
| #' @return a WindowSpec |
| #' @rdname rangeBetween |
| #' @aliases rangeBetween,WindowSpec,numeric,numeric-method |
| #' @name rangeBetween |
| #' @family windowspec_method |
| #' @examples |
| #' \dontrun{ |
| #' id <- c(rep(1, 3), rep(2, 3), 3) |
| #' desc <- c('New', 'New', 'Good', 'New', 'Good', 'Good', 'New') |
| #' df <- data.frame(id, desc) |
| #' df <- createDataFrame(df) |
| #' w1 <- orderBy(windowPartitionBy('desc'), df$id) |
| #' w2 <- rangeBetween(w1, 0, 3) |
| #' df1 <- withColumn(df, "sum", over(sum(df$id), w2)) |
| #' head(df1) |
| #' } |
| #' @note rangeBetween since 2.0.0 |
| setMethod("rangeBetween", |
| signature(x = "WindowSpec", start = "numeric", end = "numeric"), |
| function(x, start, end) { |
| # "start" and "end" should be long, due to serde limitation, |
| # limit "start" and "end" as integer now |
| windowSpec(callJMethod(x@sws, "rangeBetween", as.integer(start), as.integer(end))) |
| }) |
| |
| # Note that over is a method of Column class, but it is placed here to |
| # avoid Roxygen circular-dependency between class Column and WindowSpec. |
| |
| #' over |
| #' |
| #' Define a windowing column. |
| #' |
| #' @param x a Column, usually one returned by window function(s). |
| #' @param window a WindowSpec object. Can be created by \code{windowPartitionBy} or |
| #' \code{windowOrderBy} and configured by other WindowSpec methods. |
| #' @rdname over |
| #' @name over |
| #' @aliases over,Column,WindowSpec-method |
| #' @family column_func |
| #' @examples |
| #' \dontrun{ |
| #' df <- createDataFrame(mtcars) |
| #' |
| #' # Partition by am (transmission) and order by hp (horsepower) |
| #' ws <- orderBy(windowPartitionBy("am"), "hp") |
| #' |
| #' # Rank on hp within each partition |
| #' out <- select(df, over(rank(), ws), df$hp, df$am) |
| #' |
| #' # Lag mpg values by 1 row on the partition-and-ordered table |
| #' out <- select(df, over(lead(df$mpg), ws), df$mpg, df$hp, df$am) |
| #' } |
| #' @note over since 2.0.0 |
| setMethod("over", |
| signature(x = "Column", window = "WindowSpec"), |
| function(x, window) { |
| column(callJMethod(x@jc, "over", window@sws)) |
| }) |