| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| |
| # The following S3 methods are registered on load if dplyr is present |
| |
| filter.arrow_dplyr_query <- function(.data, ..., .preserve = FALSE) { |
| # TODO something with the .preserve argument |
| filts <- quos(...) |
| if (length(filts) == 0) { |
| # Nothing to do |
| return(.data) |
| } |
| |
| .data <- arrow_dplyr_query(.data) |
| # tidy-eval the filter expressions inside an Arrow data_mask |
| filters <- lapply(filts, arrow_eval, arrow_mask(.data)) |
| bad_filters <- map_lgl(filters, ~inherits(., "try-error")) |
| if (any(bad_filters)) { |
| # This is similar to abandon_ship() except that the filter eval is |
| # vectorized, and we apply filters that _did_ work before abandoning ship |
| # with the rest |
| expr_labs <- map_chr(filts[bad_filters], as_label) |
| if (query_on_dataset(.data)) { |
| # Abort. We don't want to auto-collect if this is a Dataset because that |
| # could blow up, too big. |
| stop( |
| "Filter expression not supported for Arrow Datasets: ", |
| oxford_paste(expr_labs, quote = FALSE), |
| "\nCall collect() first to pull data into R.", |
| call. = FALSE |
| ) |
| } else { |
| arrow_errors <- map2_chr( |
| filters[bad_filters], expr_labs, |
| handle_arrow_not_supported |
| ) |
| if (length(arrow_errors) == 1) { |
| msg <- paste0(arrow_errors, "; ") |
| } else { |
| msg <- paste0("* ", arrow_errors, "\n", collapse = "") |
| } |
| warning( |
| msg, "pulling data into R", |
| immediate. = TRUE, |
| call. = FALSE |
| ) |
| # Set any valid filters first, then collect and then apply the invalid ones in R |
| .data <- set_filters(.data, filters[!bad_filters]) |
| return(dplyr::filter(dplyr::collect(.data), !!!filts[bad_filters])) |
| } |
| } |
| |
| set_filters(.data, filters) |
| } |
| filter.Dataset <- filter.ArrowTabular <- filter.arrow_dplyr_query |
| |
| set_filters <- function(.data, expressions) { |
| if (length(expressions)) { |
| # expressions is a list of Expressions. AND them together and set them on .data |
| new_filter <- Reduce("&", expressions) |
| if (isTRUE(.data$filtered_rows)) { |
| # TRUE is default (i.e. no filter yet), so we don't need to & with it |
| .data$filtered_rows <- new_filter |
| } else { |
| .data$filtered_rows <- .data$filtered_rows & new_filter |
| } |
| } |
| .data |
| } |