/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.select;
import java.io.IOException;
import java.util.List;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;
import static org.apache.hadoop.fs.s3a.select.SelectConstants.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
/**
* Test the S3 Select feature with the Landsat dataset.
*
* This helps explore larger datasets, compression and the like.
*
* This suite is only executed if the destination store declares its support for
* the feature and the test CSV file configuration option points to the
* standard landsat GZip file. That's because these tests require the specific
* format of the landsat file.
*
* Normally working with the landsat file is a scale test.
* Here, because of the select operations, there's a lot less data
* to download.
* For this to work, write aggressive select calls: filter rows, use LIMIT,
* and project down to a few columns; see the example query below.
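*
* A minimal sketch of such a query (this is the
* {@link #SELECT_ENTITY_ID_ALL_CLOUDS} query used by these tests):
* <code>
*   SELECT s.entityId FROM S3OBJECT s WHERE s."cloudCover" = '100.0' LIMIT 250
* </code>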
*
* For the structure, see
* <a href="https://docs.opendata.aws/landsat-pds/readme.html">Landsat on AWS</a>
*
* <code>
* entityId: String LC80101172015002LGN00
* acquisitionDate: String 2015-01-02 15:49:05.571384
* cloudCover: Float (possibly negative) 80.81
* processingLevel: String L1GT
* path: Int 10
* row: Int 117
* min_lat: Float -79.09923
* min_lon: Float -139.66082
* max_lat: Float -77.7544
* max_lon: Float -125.09297
* download_url: HTTPS URL https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/117/LC80101172015002LGN00/index.html
* </code>
* Ranges
* <ol>
* <li>Latitude should range in -90 <= lat <= 90</li>
* <li>Longitude in -180 <= lon <= 180</li>
* <li>Standard Greenwich meridian (not the Paris meridian, which
* still surfaces in some datasets)</li>
* <li>Cloud cover <i>Should</i> be 0-100, but there are some negative ones.</li>
* </ol>
*
* Head of the file:
* <code>
* entityId,acquisitionDate,cloudCover,processingLevel,path,row,min_lat,min_lon,max_lat,max_lon,download_url
* LC80101172015002LGN00,2015-01-02 15:49:05.571384,80.81,L1GT,10,117,-79.09923,-139.66082,-77.7544,-125.09297,https://s3-us-west-2.amazonaws.com/landsat-pds/L8/010/117/LC80101172015002LGN00/index.html
* LC80260392015002LGN00,2015-01-02 16:56:51.399666,90.84,L1GT,26,39,29.23106,-97.48576,31.36421,-95.16029,https://s3-us-west-2.amazonaws.com/landsat-pds/L8/026/039/LC80260392015002LGN00/index.html
* LC82270742015002LGN00,2015-01-02 13:53:02.047000,83.44,L1GT,227,74,-21.28598,-59.27736,-19.17398,-57.07423,https://s3-us-west-2.amazonaws.com/landsat-pds/L8/227/074/LC82270742015002LGN00/index.html
* LC82270732015002LGN00,2015-01-02 13:52:38.110317,52.29,L1T,227,73,-19.84365,-58.93258,-17.73324,-56.74692,https://s3-us-west-2.amazonaws.com/landsat-pds/L8/227/073/LC82270732015002LGN00/index.html
* </code>
*
* For the curious, this is the Scala/Spark declaration of the schema.
* <code>
* def addLandsatColumns(csv: DataFrame): DataFrame = {
* csv
* .withColumnRenamed("entityId", "id")
* .withColumn("acquisitionDate",
* csv.col("acquisitionDate").cast(TimestampType))
* .withColumn("cloudCover", csv.col("cloudCover").cast(DoubleType))
* .withColumn("path", csv.col("path").cast(IntegerType))
* .withColumn("row", csv.col("row").cast(IntegerType))
* .withColumn("min_lat", csv.col("min_lat").cast(DoubleType))
* .withColumn("min_lon", csv.col("min_lon").cast(DoubleType))
* .withColumn("max_lat", csv.col("max_lat").cast(DoubleType))
* .withColumn("max_lon", csv.col("max_lon").cast(DoubleType))
* .withColumn("year",
* year(col("acquisitionDate")))
* .withColumn("month",
* month(col("acquisitionDate")))
* .withColumn("day",
* month(col("acquisitionDate")))
* }
* </code>
*/
public class ITestS3SelectLandsat extends AbstractS3SelectTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3SelectLandsat.class);
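/**
* Job configuration for the select calls; built up in {@link #setup()}.
*/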
private JobConf selectConf;
/**
* Normal limit for select operations.
* Value: {@value}.
*/
public static final int SELECT_LIMIT = 250;
/**
* That select limit, as a SQL LIMIT clause.
* Value: {@value}.
*/
public static final String LIMITED = " LIMIT " + SELECT_LIMIT;
/**
* Select days with 100% cloud cover, limited to {@link #SELECT_LIMIT}.
* Value: {@value}.
*/
public static final String SELECT_ENTITY_ID_ALL_CLOUDS =
"SELECT\n"
+ "s.entityId from\n"
+ "S3OBJECT s WHERE\n"
+ "s.\"cloudCover\" = '100.0'\n"
+ LIMITED;
/**
* Select sunny days. The query itself imposes no limit on the returned
* rows, so set one, except when running a scale test.
* Value: {@value}.
*/
public static final String SELECT_SUNNY_ROWS_NO_LIMIT
= "SELECT * FROM S3OBJECT s WHERE s.cloudCover = '0.0'";
/**
* A select call which always returns nothing: no cloudCover value
* ever matches the string 'sunny'.
* Value: {@value}.
*/
public static final String SELECT_NOTHING
= "SELECT * FROM S3OBJECT s WHERE s.cloudCover = 'sunny'";
/**
* Select the processing level; no limit.
* Value: {@value}.
*/
public static final String SELECT_PROCESSING_LEVEL_NO_LIMIT =
"SELECT\n"
+ "s.processingLevel from\n"
+ "S3OBJECT s";
@Override
public void setup() throws Exception {
super.setup();
selectConf = new JobConf(false);
// file is compressed.
selectConf.set(SELECT_INPUT_COMPRESSION, COMPRESSION_OPT_GZIP);
// and has a header
selectConf.set(CSV_INPUT_HEADER, CSV_HEADER_OPT_USE);
selectConf.setBoolean(SELECT_ERRORS_INCLUDE_SQL, true);
inputMust(selectConf, CSV_INPUT_HEADER, CSV_HEADER_OPT_USE);
inputMust(selectConf, SELECT_INPUT_FORMAT, SELECT_FORMAT_CSV);
inputMust(selectConf, SELECT_OUTPUT_FORMAT, SELECT_FORMAT_CSV);
inputMust(selectConf, SELECT_INPUT_COMPRESSION, COMPRESSION_OPT_GZIP);
// disable the gzip codec, so that the record readers do not
// get confused
enablePassthroughCodec(selectConf, ".gz");
ChangeDetectionPolicy changeDetectionPolicy =
getLandsatFS().getChangeDetectionPolicy();
Assume.assumeFalse("the standard landsat bucket doesn't have versioning",
changeDetectionPolicy.getSource() == Source.VersionId
&& changeDetectionPolicy.isRequireVersion());
}
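/**
* The maximum number of lines to read in tests: twice the select limit,
* so that an over-long result set is detected rather than truncated.
*/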
protected int getMaxLines() {
return SELECT_LIMIT * 2;
}
@Test
public void testSelectCloudcoverIgnoreHeader() throws Throwable {
describe("select ignoring the header");
selectConf.set(CSV_INPUT_HEADER, CSV_HEADER_OPT_IGNORE);
String sql = "SELECT\n"
+ "* from\n"
+ "S3OBJECT s WHERE\n"
+ "s._3 = '0.0'\n"
+ LIMITED;
List<String> list = selectLandsatFile(selectConf, sql);
LOG.info("Line count: {}", list.size());
verifySelectionCount(1, SELECT_LIMIT, sql, list);
}
@Test
public void testSelectCloudcoverUseHeader() throws Throwable {
describe("select 100% cover using the header, "
+ "+ verify projection and incrementing select statistics");
S3ATestUtils.MetricDiff selectCount = new S3ATestUtils.MetricDiff(
getLandsatFS(),
Statistic.OBJECT_SELECT_REQUESTS);
List<String> list = selectLandsatFile(selectConf,
SELECT_ENTITY_ID_ALL_CLOUDS);
LOG.info("Line count: {}", list.size());
verifySelectionCount(1, SELECT_LIMIT, SELECT_ENTITY_ID_ALL_CLOUDS, list);
String line1 = list.get(0);
assertThat("no column filtering from " + SELECT_ENTITY_ID_ALL_CLOUDS,
line1, not(containsString("100.0")));
selectCount.assertDiffEquals("select count", 1);
}
@Test
public void testFileContextIntegration() throws Throwable {
describe("Test that select works through FileContext");
FileContext fc = S3ATestUtils.createTestFileContext(getConfiguration());
// there's a limit on the number of rows to read; this is larger
// than SELECT_LIMIT, to catch any failure where more rows than
// that are returned, newline parsing fails, etc.
List<String> list = parseToLines(
select(fc, getLandsatGZ(), selectConf, SELECT_ENTITY_ID_ALL_CLOUDS),
SELECT_LIMIT * 2);
LOG.info("Line count: {}", list.size());
verifySelectionCount(1, SELECT_LIMIT, SELECT_ENTITY_ID_ALL_CLOUDS, list);
}
@Test
public void testReadLandsatRecords() throws Throwable {
describe("Use a record reader to read the records");
inputMust(selectConf, CSV_OUTPUT_FIELD_DELIMITER, "\\t");
inputMust(selectConf, CSV_OUTPUT_QUOTE_CHARACTER, "'");
inputMust(selectConf, CSV_OUTPUT_QUOTE_FIELDS,
CSV_OUTPUT_QUOTE_FIELDS_AS_NEEEDED);
inputMust(selectConf, CSV_OUTPUT_RECORD_DELIMITER, "\n");
List<String> records = readRecords(
selectConf,
getLandsatGZ(),
SELECT_ENTITY_ID_ALL_CLOUDS,
createLineRecordReader(),
SELECT_LIMIT);
verifySelectionCount(1, SELECT_LIMIT, SELECT_ENTITY_ID_ALL_CLOUDS, records);
}
@Test
public void testReadLandsatRecordsNoMatch() throws Throwable {
describe("Verify the v2 record reader does not fail"
+ " when there are no results");
verifySelectionCount(0, 0, SELECT_NOTHING,
readRecords(
selectConf,
getLandsatGZ(),
SELECT_NOTHING,
createLineRecordReader(),
SELECT_LIMIT));
}
@Test
public void testReadLandsatRecordsGZipEnabled() throws Throwable {
describe("Verify that by default, the gzip codec is connected to .gz"
+ " files, and so fails");
// implicitly re-enable the gzip codec.
selectConf.unset(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY);
intercept(IOException.class, "gzip",
() -> readRecords(
selectConf,
getLandsatGZ(),
SELECT_ENTITY_ID_ALL_CLOUDS,
createLineRecordReader(),
SELECT_LIMIT));
}
@Test
public void testReadLandsatRecordsV1() throws Throwable {
describe("Use a record reader to read the records");
verifySelectionCount(1, SELECT_LIMIT, SELECT_ENTITY_ID_ALL_CLOUDS,
readRecords(
selectConf,
getLandsatGZ(),
SELECT_ENTITY_ID_ALL_CLOUDS,
createLineRecordReader(),
SELECT_LIMIT));
}
@Test
public void testReadLandsatRecordsV1NoResults() throws Throwable {
describe("verify that a select with no results is not an error");
verifySelectionCount(0, 0, SELECT_NOTHING,
readRecords(
selectConf,
getLandsatGZ(),
SELECT_NOTHING,
createLineRecordReader(),
SELECT_LIMIT));
}
/**
* Select from the landsat file.
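* Example (illustrative; the SQL template is expanded with the
* arguments, String.format-style):
* <code>
*   selectLandsatFile(selectConf,
*       "SELECT s.entityId FROM S3OBJECT s LIMIT %d", 10);
* </code>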
* @param conf config for the select call.
* @param sql template for a formatted SQL request.
* @param args arguments for the formatted request.
* @return the lines selected
* @throws Exception failure
*/
private List<String> selectLandsatFile(
final Configuration conf,
final String sql,
final Object... args)
throws Exception {
// there's a limit on the number of rows to read; this is larger
// than SELECT_LIMIT, to catch any failure where more rows than
// that are returned, newline parsing fails, etc.
return parseToLines(
select(getLandsatFS(), getLandsatGZ(), conf, sql, args));
}
/**
* This is a larger-scale version of {@link ITestS3Select#testSelectSeek()}.
*/
@Test
public void testSelectSeekFullLandsat() throws Throwable {
describe("Verify forward seeks work, not others");
boolean enabled = getTestPropertyBool(
getConfiguration(),
KEY_SCALE_TESTS_ENABLED,
DEFAULT_SCALE_TESTS_ENABLED);
assume("Scale test disabled", enabled);
// start: read in the full data through the initial select
// this makes asserting that contents match possible
final Path path = getLandsatGZ();
S3AFileSystem fs = getLandsatFS();
int len = (int) fs.getFileStatus(path).getLen();
byte[] dataset = new byte[4 * _1MB];
int actualLen;
try (DurationInfo ignored =
new DurationInfo(LOG, "Initial read of %s", path);
FSDataInputStream sourceStream =
select(fs, path,
selectConf,
SELECT_EVERYTHING)) {
// read it in
actualLen = IOUtils.read(sourceStream, dataset);
}
int seekRange = 16 * _1KB;
try (FSDataInputStream seekStream =
select(fs, path,
selectConf,
SELECT_EVERYTHING)) {
SelectInputStream sis
= (SelectInputStream) seekStream.getWrappedStream();
S3AInputStreamStatistics streamStats
= sis.getS3AStreamStatistics();
// lazy seek doesn't raise a problem here
seekStream.seek(0);
assertEquals("first byte read", dataset[0], seekStream.read());
// and now the pos has moved, again, seek will be OK
seekStream.seek(1);
seekStream.seek(1);
// but trying to seek elsewhere now fails
intercept(PathIOException.class,
SelectInputStream.SEEK_UNSUPPORTED,
() -> seekStream.seek(0));
// positioned reads from the current location work.
byte[] buffer = new byte[1];
seekStream.readFully(seekStream.getPos(), buffer);
// but positioned backwards fail.
intercept(PathIOException.class,
SelectInputStream.SEEK_UNSUPPORTED,
() -> seekStream.readFully(0, buffer));
// forward seeks are implemented as one or more skip() calls
long target = seekStream.getPos() + seekRange;
seek(seekStream, target);
assertEquals("Seek position in " + seekStream,
target, seekStream.getPos());
// now do a read and compare values
assertEquals("byte at seek position",
dataset[(int) seekStream.getPos()], seekStream.read());
assertEquals("Seek bytes skipped in " + streamStats,
seekRange, streamStats.getBytesSkippedOnSeek());
long offset;
long increment = 64 * _1KB;
// seek forward, comparing bytes
for(offset = 32 * _1KB; offset < _1MB; offset += increment) {
seek(seekStream, offset);
assertEquals("Seek position in " + seekStream,
offset, seekStream.getPos());
// now do a read and compare values
assertEquals("byte at seek position",
dataset[(int) seekStream.getPos()], seekStream.read());
}
// there's no knowledge of how much data is left, but with Gzip
// involved there can be a lot. To keep the test duration down,
// this test, unlike the simpler one, doesn't try to read past the
// EOF. Know this: it will be slow.
LOG.info("Seek statistics {}", streamStats);
}
}
}