/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
/** Estimates the number of rows in the text files that match a given file pattern. */
@AutoValue
public abstract class TextRowCountEstimator {
private static final long DEFAULT_NUM_BYTES_PER_FILE = 64 * 1024L;
private static final Compression DEFAULT_COMPRESSION = Compression.AUTO;
private static final FileIO.ReadMatches.DirectoryTreatment DEFAULT_DIRECTORY_TREATMENT =
FileIO.ReadMatches.DirectoryTreatment.SKIP;
private static final EmptyMatchTreatment DEFAULT_EMPTY_MATCH_TREATMENT =
EmptyMatchTreatment.DISALLOW;
private static final SamplingStrategy DEFAULT_SAMPLING_STRATEGY = new SampleAllFiles();
public abstract long getNumSampledBytesPerFile();
@Nullable
@SuppressWarnings("mutable")
public abstract byte[] getDelimiters();
public abstract String getFilePattern();
public abstract Compression getCompression();
public abstract SamplingStrategy getSamplingStrategy();
public abstract EmptyMatchTreatment getEmptyMatchTreatment();
public abstract FileIO.ReadMatches.DirectoryTreatment getDirectoryTreatment();
public static TextRowCountEstimator.Builder builder() {
return (new AutoValue_TextRowCountEstimator.Builder())
.setSamplingStrategy(DEFAULT_SAMPLING_STRATEGY)
.setNumSampledBytesPerFile(DEFAULT_NUM_BYTES_PER_FILE)
.setCompression(DEFAULT_COMPRESSION)
.setDirectoryTreatment(DEFAULT_DIRECTORY_TREATMENT)
.setEmptyMatchTreatment(DEFAULT_EMPTY_MATCH_TREATMENT);
}
/**
* Estimates the number of non-empty rows. It samples {@code getNumSampledBytesPerFile()} bytes
* from every file until the {@link SamplingStrategy} indicates that sampling should stop. It then
* computes the average line size of the sampled rows and divides the total size of all matched
* files by that average. If all the sampled rows are empty and not every line has been sampled
* (because of the sampling strategy), a {@link NoEstimationException} is thrown.
*
* @return the estimated number of rows.
* @throws org.apache.beam.sdk.io.TextRowCountEstimator.NoEstimationException if all the sampled
*     lines are empty and not all lines in the matched files have been read.
*/
public Double estimateRowCount(PipelineOptions pipelineOptions)
throws IOException, NoEstimationException {
long linesSize = 0;
int numberOfReadLines = 0;
long totalFileSizes = 0;
long totalSampledBytes = 0;
int numberOfReadFiles = 0;
boolean sampledEverything = true;
MatchResult match = FileSystems.match(getFilePattern(), getEmptyMatchTreatment());
for (MatchResult.Metadata metadata : match.metadata()) {
if (getSamplingStrategy().stopSampling(numberOfReadFiles, totalSampledBytes)) {
sampledEverything = false;
break;
}
if (FileIO.ReadMatches.shouldSkipDirectory(metadata, getDirectoryTreatment())) {
continue;
}
FileIO.ReadableFile file = FileIO.ReadMatches.matchToReadableFile(metadata, getCompression());
// We use readingWindowSize as an estimate of the total size of the sampled lines. Since the
// last sampled line may extend beyond this range, the average line size is slightly
// underestimated and the row count therefore slightly overestimated. (If every line is larger
// than readingWindowSize, we still read one line anyway, and that line is also the last one.)
long readingWindowSize = Math.min(getNumSampledBytesPerFile(), metadata.sizeBytes());
sampledEverything = metadata.sizeBytes() == readingWindowSize && sampledEverything;
OffsetRange range = new OffsetRange(0, readingWindowSize);
TextSource textSource =
new TextSource(
ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().toString()),
getEmptyMatchTreatment(),
getDelimiters());
FileBasedSource<String> source =
CompressedSource.from(textSource).withCompression(file.getCompression());
try (BoundedSource.BoundedReader<String> reader =
source
.createForSubrangeOfFile(file.getMetadata(), range.getFrom(), range.getTo())
.createReader(pipelineOptions)) {
int numberOfNonEmptyLines = 0;
for (boolean more = reader.start(); more; more = reader.advance()) {
numberOfNonEmptyLines += reader.getCurrent().trim().equals("") ? 0 : 1;
}
numberOfReadLines += numberOfNonEmptyLines;
linesSize += (numberOfNonEmptyLines == 0) ? 0 : readingWindowSize;
}
long fileSize = metadata.sizeBytes();
numberOfReadFiles += fileSize == 0 ? 0 : 1;
totalFileSizes += fileSize;
}
if (numberOfReadLines == 0 && sampledEverything) {
return 0d;
}
if (numberOfReadLines == 0) {
throw new NoEstimationException(
"Cannot estimate the row count. All the sampled lines are empty");
}
// Estimated rows = total size of all matched files / average non-empty line size
// (linesSize / numberOfReadLines).
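// For illustration (hypothetical numbers, not taken from this code): if the matched files
// total 1,000,000 bytes and sampling read 100 non-empty lines within 10,000 bytes of windows,
// the average line is ~100 bytes and the estimate is 1,000,000 * 100 / 10,000 = 10,000 rows.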
return (double) totalFileSizes * numberOfReadLines / linesSize;
}
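// A minimal usage sketch (the file pattern and options below are illustrative assumptions, not
// part of this file):
//
//   TextRowCountEstimator estimator =
//       TextRowCountEstimator.builder().setFilePattern("/path/to/input/*.txt").build();
//   try {
//     Double rows = estimator.estimateRowCount(PipelineOptionsFactory.create());
//   } catch (IOException | TextRowCountEstimator.NoEstimationException e) {
//     // Fall back to another estimate.
//   }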
/** Builder for {@link org.apache.beam.sdk.io.TextRowCountEstimator}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setNumSampledBytesPerFile(long numSampledBytes);
public abstract Builder setDirectoryTreatment(
FileIO.ReadMatches.DirectoryTreatment directoryTreatment);
public abstract Builder setCompression(Compression compression);
public abstract Builder setDelimiters(byte[] delimiters);
public abstract Builder setFilePattern(String filePattern);
public abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
public abstract Builder setSamplingStrategy(SamplingStrategy samplingStrategy);
public abstract TextRowCountEstimator build();
}
/**
* An exception thrown when the estimator cannot produce an estimate of the number of lines.
*/
public static class NoEstimationException extends Exception {
NoEstimationException(String message) {
super(message);
}
}
/** A sampling strategy determines when to stop reading further files. */
public interface SamplingStrategy {
boolean stopSampling(int numberOfFiles, long totalReadBytes);
}
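// Because SamplingStrategy has a single abstract method, a custom strategy can also be supplied
// as a lambda to a TextRowCountEstimator.Builder. A sketch (the thresholds are illustrative
// assumptions):
//
//   builder.setSamplingStrategy(
//       (numberOfFiles, totalReadBytes) -> numberOfFiles > 10 || totalReadBytes > 1024 * 1024);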
/** This strategy samples all the files. */
public static class SampleAllFiles implements SamplingStrategy {
@Override
public boolean stopSampling(int numberOfSampledFiles, long totalReadBytes) {
return false;
}
}
/** This strategy stops sampling after a given number of files have been sampled. */
public static class LimitNumberOfFiles implements SamplingStrategy {
int limit;
public LimitNumberOfFiles(int limit) {
this.limit = limit;
}
@Override
public boolean stopSampling(int numberOfFiles, long totalReadBytes) {
return numberOfFiles > limit;
}
}
/**
* This strategy stops sampling when the total number of sampled bytes exceeds a given threshold.
*/
public static class LimitNumberOfTotalBytes implements SamplingStrategy {
long limit;
public LimitNumberOfTotalBytes(long limit) {
this.limit = limit;
}
@Override
public boolean stopSampling(int numberOfFiles, long totalReadBytes) {
return totalReadBytes > limit;
}
}
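// For example (the 1 MiB threshold is an illustrative assumption), sampling can be capped at
// roughly 1 MiB in total:
//
//   builder.setSamplingStrategy(new TextRowCountEstimator.LimitNumberOfTotalBytes(1024 * 1024));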
}