blob: 2bcfb363f9d248f762574059e92d1e61e3c76cbb [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes
#' @include generics.R jobj.R
NULL
#' S4 class that represents a StreamingQuery
#'
#' StreamingQuery can be created by using read.stream() and write.stream()
#'
#' @rdname StreamingQuery
#' @seealso \link{read.stream}
#'
#' @param ssq A Java object reference to the backing Scala StreamingQuery
#' @note StreamingQuery since 2.2.0
#' @note experimental
setClass("StreamingQuery",
slots = list(ssq = "jobj"))
setMethod("initialize", "StreamingQuery", function(.Object, ssq) {
.Object@ssq <- ssq
.Object
})
streamingQuery <- function(ssq) {
stopifnot(class(ssq) == "jobj")
new("StreamingQuery", ssq)
}
#' @rdname show
#' @note show(StreamingQuery) since 2.2.0
setMethod("show", "StreamingQuery",
function(object) {
name <- callJMethod(object@ssq, "name")
if (!is.null(name)) {
cat(paste0("StreamingQuery '", name, "'\n"))
} else {
cat("StreamingQuery", "\n")
}
})
#' queryName
#'
#' Returns the user-specified name of the query. This is specified in
#' \code{write.stream(df, queryName = "query")}. This name, if set, must be unique across all active
#' queries.
#'
#' @param x a StreamingQuery.
#' @return The name of the query, or NULL if not specified.
#' @rdname queryName
#' @name queryName
#' @aliases queryName,StreamingQuery-method
#' @family StreamingQuery methods
#' @seealso \link{write.stream}
#' @examples
#' \dontrun{ queryName(sq) }
#' @note queryName(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("queryName",
signature(x = "StreamingQuery"),
function(x) {
callJMethod(x@ssq, "name")
})
#' @rdname explain
#' @name explain
#' @aliases explain,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ explain(sq) }
#' @note explain(StreamingQuery) since 2.2.0
setMethod("explain",
signature(x = "StreamingQuery"),
function(x, extended = FALSE) {
cat(callJMethod(x@ssq, "explainInternal", extended), "\n")
})
#' lastProgress
#'
#' Prints the most recent progress update of this streaming query in JSON format.
#'
#' @param x a StreamingQuery.
#' @rdname lastProgress
#' @name lastProgress
#' @aliases lastProgress,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ lastProgress(sq) }
#' @note lastProgress(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("lastProgress",
signature(x = "StreamingQuery"),
function(x) {
p <- callJMethod(x@ssq, "lastProgress")
if (is.null(p)) {
cat("Streaming query has no progress")
} else {
cat(callJMethod(p, "toString"), "\n")
}
})
#' status
#'
#' Prints the current status of the query in JSON format.
#'
#' @param x a StreamingQuery.
#' @rdname status
#' @name status
#' @aliases status,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ status(sq) }
#' @note status(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("status",
signature(x = "StreamingQuery"),
function(x) {
cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n")
})
#' isActive
#'
#' Returns TRUE if this query is actively running.
#'
#' @param x a StreamingQuery.
#' @return TRUE if query is actively running, FALSE if stopped.
#' @rdname isActive
#' @name isActive
#' @aliases isActive,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ isActive(sq) }
#' @note isActive(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("isActive",
signature(x = "StreamingQuery"),
function(x) {
callJMethod(x@ssq, "isActive")
})
#' awaitTermination
#'
#' Waits for the termination of the query, either by \code{stopQuery} or by an error.
#'
#' If the query has terminated, then all subsequent calls to this method will return TRUE
#' immediately.
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
#' is called or an error has occurred.
#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
#' specified.
#' @rdname awaitTermination
#' @name awaitTermination
#' @aliases awaitTermination,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ awaitTermination(sq, 10000) }
#' @note awaitTermination(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("awaitTermination",
signature(x = "StreamingQuery"),
function(x, timeout = NULL) {
if (is.null(timeout)) {
invisible(handledCallJMethod(x@ssq, "awaitTermination"))
} else {
handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
}
})
#' stopQuery
#'
#' Stops the execution of this query if it is running. This method blocks until the execution is
#' stopped.
#'
#' @param x a StreamingQuery.
#' @rdname stopQuery
#' @name stopQuery
#' @aliases stopQuery,StreamingQuery-method
#' @family StreamingQuery methods
#' @examples
#' \dontrun{ stopQuery(sq) }
#' @note stopQuery(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("stopQuery",
signature(x = "StreamingQuery"),
function(x) {
invisible(callJMethod(x@ssq, "stop"))
})