blob: 67ddf49e425034e138f88fccb1dee5dd9eeaf487 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.basic.operators;
import org.apache.commons.lang3.Validate;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.UnarySource;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.util.LimitedInputStream;
import org.apache.wayang.core.util.fs.FileSystem;
import org.apache.wayang.core.util.fs.FileSystems;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
/**
* This source reads a text file and outputs the lines as data units.
*/
public class TextFileSource extends UnarySource<String> {

    private final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * URL of the text file that this source reads.
     */
    private final String inputUrl;

    /**
     * Name of the character encoding that is used to decode the file.
     */
    private final String encoding;

    /**
     * Creates a new instance that decodes the file as {@code UTF-8}.
     *
     * @param inputUrl URL of the text file to read
     */
    public TextFileSource(String inputUrl) {
        this(inputUrl, "UTF-8");
    }

    /**
     * Creates a new instance.
     *
     * @param inputUrl URL of the text file to read
     * @param encoding name of the character encoding of the file
     */
    public TextFileSource(String inputUrl, String encoding) {
        super(DataSetType.createDefault(String.class));
        this.inputUrl = inputUrl;
        this.encoding = encoding;
    }

    /**
     * Copies an instance (exclusive of broadcasts).
     *
     * @param that that should be copied
     */
    public TextFileSource(TextFileSource that) {
        super(that);
        this.inputUrl = that.getInputUrl();
        this.encoding = that.getEncoding();
    }

    /**
     * @return the URL of the text file that this source reads
     */
    public String getInputUrl() {
        return this.inputUrl;
    }

    /**
     * Provides a {@link TextFileSource.CardinalityEstimator} for the output of this source.
     *
     * @param outputIndex   index of the output to be estimated; must be a valid output index of this operator
     * @param configuration is not used by this implementation
     * @return an {@link Optional} that always contains a new {@link TextFileSource.CardinalityEstimator}
     */
    @Override
    public Optional<org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator> createCardinalityEstimator(
            final int outputIndex,
            final Configuration configuration) {
        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
        return Optional.of(new TextFileSource.CardinalityEstimator());
    }

    /**
     * @return the name of the character encoding that is used to decode the file
     */
    public String getEncoding() {
        return this.encoding;
    }

    /**
     * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link TextFileSource}s.
     * It samples the beginning of the file to determine the average line size and extrapolates the number of lines
     * from the overall file size.
     */
    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {

        /**
         * Estimate that is delivered when the file size or the average line size cannot be determined.
         */
        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);

        /**
         * Correctness probability that is attached to extrapolated estimates.
         */
        public static final double CORRECTNESS_PROBABILITY = 0.95d;

        /**
         * We expect the extrapolated number of lines to be correct within a relative deviation
         * of {@value #EXPECTED_ESTIMATE_DEVIATION}.
         */
        public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05;

        /**
         * Estimates the number of lines in the file of the enclosing {@link TextFileSource}. The time spent is
         * recorded in the stop watch of the current job; the measurement is stopped on every exit path,
         * including job cache hits and exceptions.
         *
         * @param optimizationContext provides the job, its stop watch, and the job cache
         * @param inputEstimates      the input estimates; their number must match the number of inputs of the
         *                            enclosing operator
         * @return the cached, extrapolated, or fallback {@link CardinalityEstimate}
         */
        @Override
        public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... inputEstimates) {
            Validate.isTrue(TextFileSource.this.getNumInputs() == inputEstimates.length);

            // see Job for StopWatch measurements
            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"
            );
            try {
                return this.estimateWithJobCache(optimizationContext);
            } finally {
                // Stop in a finally block, so that no exit path (e.g., a job cache hit) leaves the measurement running.
                timeMeasurement.stop();
            }
        }

        /**
         * Looks up the estimate in the job cache and calculates (and caches) it on a miss.
         * Fallback estimates and the estimate for empty files are not put into the job cache.
         *
         * @param optimizationContext provides access to the job cache
         * @return the cached, extrapolated, or fallback {@link CardinalityEstimate}
         */
        private CardinalityEstimate estimateWithJobCache(OptimizationContext optimizationContext) {
            // Query the job cache first to see if there is already an estimate.
            final String jobCacheKey = String.format(
                    "%s.estimate(%s)", this.getClass().getCanonicalName(), TextFileSource.this.inputUrl
            );
            final CardinalityEstimate cachedEstimate =
                    optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class);
            if (cachedEstimate != null) {
                return cachedEstimate;
            }

            // Otherwise calculate the cardinality.
            // First, inspect the size of the file and its line sizes.
            final OptionalLong fileSize = FileSystems.getFileSize(TextFileSource.this.inputUrl);
            if (!fileSize.isPresent()) {
                TextFileSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
                        TextFileSource.this.inputUrl);
                return this.FALLBACK_ESTIMATE;
            }
            if (fileSize.getAsLong() == 0L) {
                // An empty file contains no lines for sure.
                return new CardinalityEstimate(0L, 0L, 1d);
            }

            final OptionalDouble bytesPerLine = this.estimateBytesPerLine();
            if (!bytesPerLine.isPresent()) {
                TextFileSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.",
                        TextFileSource.this.inputUrl);
                return this.FALLBACK_ESTIMATE;
            }

            // Extrapolate a cardinality estimate for the complete file.
            final double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble();
            final double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION;
            final CardinalityEstimate cardinalityEstimate = new CardinalityEstimate(
                    (long) (numEstimatedLines - expectedDeviation),
                    (long) (numEstimatedLines + expectedDeviation),
                    CORRECTNESS_PROBABILITY
            );

            // Cache the result, so that it will not be recalculated again.
            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
            return cardinalityEstimate;
        }

        /**
         * Estimate the number of bytes that are in each line of a given file. To that end, at most the first MiB
         * of the file is read and the number of read bytes is divided by the number of encountered line feeds.
         *
         * @return the average number of bytes per line if it could be determined; empty if there is no
         * {@link FileSystem} for the input URL, the sample contains no line feed, or reading fails
         */
        private OptionalDouble estimateBytesPerLine() {
            final Optional<FileSystem> fileSystem = FileSystems.getFileSystem(TextFileSource.this.inputUrl);
            if (!fileSystem.isPresent()) {
                return OptionalDouble.empty();
            }

            // Construct a limited reader for the first x KiB of the file.
            final int KiB = 1024;
            final int MiB = 1024 * KiB;
            try (LimitedInputStream lis = new LimitedInputStream(fileSystem.get().open(TextFileSource.this.inputUrl), 1 * MiB)) {
                // The reader is not closed separately: closing the underlying limited stream releases the file.
                final BufferedReader bufferedReader = new BufferedReader(
                        new InputStreamReader(lis, TextFileSource.this.encoding)
                );

                // Read as much as possible.
                final char[] cbuf = new char[1024];
                int numReadChars;
                int numLineFeeds = 0;
                while ((numReadChars = bufferedReader.read(cbuf)) != -1) {
                    for (int i = 0; i < numReadChars; i++) {
                        if (cbuf[i] == '\n') {
                            numLineFeeds++;
                        }
                    }
                }

                if (numLineFeeds == 0) {
                    TextFileSource.this.logger.warn("Could not find any newline character in {}.", TextFileSource.this.inputUrl);
                    return OptionalDouble.empty();
                }
                return OptionalDouble.of((double) lis.getNumReadBytes() / numLineFeeds);
            } catch (IOException e) {
                // Best effort: the caller will deliver a fallback estimate instead.
                TextFileSource.this.logger.error("Could not estimate bytes per line of an input file.", e);
            }

            return OptionalDouble.empty();
        }
    }
}