/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop;

import org.apache.sqoop.testutil.ArgumentArrayBuilder;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.ParquetReader;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import parquet.hadoop.metadata.CompressionCodecName;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static parquet.hadoop.metadata.CompressionCodecName.GZIP;

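/**
 * Tests the merge step of lastmodified incremental Parquet imports: merging
 * new and updated rows into an existing dataset, merging when the incremental
 * run imports no rows, failing the merge on incompatible Avro schemas, and
 * the compression codec of the merged files.
 */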
public class TestParquetIncrementalImportMerge extends ImportJobTestCase {

  private static final String[] TEST_COLUMN_TYPES = {"INTEGER", "VARCHAR(32)", "CHAR(64)", "TIMESTAMP"};

  private static final String[] ALTERNATIVE_TEST_COLUMN_TYPES = {"INTEGER", "INTEGER", "INTEGER", "TIMESTAMP"};

  private static final String INITIAL_RECORDS_TIMESTAMP = "2018-06-14 15:00:00.000";

  private static final String NEW_RECORDS_TIMESTAMP = "2018-06-14 16:00:00.000";

  private static final List<List<Object>> INITIAL_RECORDS = Arrays.<List<Object>>asList(
      Arrays.<Object>asList(2006, "Germany", "Italy", INITIAL_RECORDS_TIMESTAMP),
      Arrays.<Object>asList(2014, "Brazil", "Hungary", INITIAL_RECORDS_TIMESTAMP)
  );

  private static final List<Object> ALTERNATIVE_INITIAL_RECORD =
      Arrays.<Object>asList(1, 2, 3, INITIAL_RECORDS_TIMESTAMP);

  private static final List<List<Object>> NEW_RECORDS = Arrays.<List<Object>>asList(
      Arrays.<Object>asList(2010, "South Africa", "Spain", NEW_RECORDS_TIMESTAMP),
      Arrays.<Object>asList(2014, "Brazil", "Germany", NEW_RECORDS_TIMESTAMP)
  );

  private static final List<String> EXPECTED_MERGED_RECORDS = asList(
      "2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
      "2010,South Africa,Spain," + timeFromString(NEW_RECORDS_TIMESTAMP),
      "2014,Brazil,Germany," + timeFromString(NEW_RECORDS_TIMESTAMP)
  );

  private static final List<String> EXPECTED_INITIAL_RECORDS = asList(
      "2006,Germany,Italy," + timeFromString(INITIAL_RECORDS_TIMESTAMP),
      "2014,Brazil,Hungary," + timeFromString(INITIAL_RECORDS_TIMESTAMP)
  );

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  @Override
  public void setUp() {
    super.setUp();
    createTableWithRecords(TEST_COLUMN_TYPES, INITIAL_RECORDS);
  }

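  /**
   * After the initial full import, the table contents are replaced with
   * NEW_RECORDS and imported incrementally. Merging on the first column should
   * keep the untouched 2006 row, add the new 2010 row and take the updated
   * 2014 row from the incremental import.
   */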
  @Test
  public void testSimpleMerge() throws Exception {
    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
    runImport(args);

    clearTable(getTableName());
    insertRecordsIntoTable(TEST_COLUMN_TYPES, NEW_RECORDS);
    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(),
        getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
    runImport(args);

    List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
    assertEquals(EXPECTED_MERGED_RECORDS, result);
  }

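  /**
   * The table is emptied before the incremental run, so the merge has nothing
   * new to apply and the dataset must still contain the initial records.
   */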
  @Test
  public void testMergeWhenTheIncrementalImportDoesNotImportAnyRows() throws Exception {
    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString()).build();
    runImport(args);

    clearTable(getTableName());
    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(),
        getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();
    runImport(args);

    List<String> result = new ParquetReader(getTablePath()).readAllInCsvSorted();
    assertEquals(EXPECTED_INITIAL_RECORDS, result);
  }

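  /**
   * The incremental import reads a second table with different column types,
   * so its Avro schema does not match the files already in the target
   * directory and the merge must fail.
   */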
  @Test
  public void testMergeWithIncompatibleSchemas() throws Exception {
    String targetDir = getWarehouseDir() + "/testMergeWithIncompatibleSchemas";
    String[] args = initialImportArgs(getConnectString(), getTableName(), targetDir).build();
    runImport(args);

    incrementTableNum();
    createTableWithColTypes(ALTERNATIVE_TEST_COLUMN_TYPES, ALTERNATIVE_INITIAL_RECORD);
    args = incrementalImportArgs(getConnectString(), getTableName(), targetDir,
        getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP).build();

    expectedException.expectMessage("Cannot merge files, the Avro schemas are not compatible.");
    runImportThrowingException(args);
  }

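  /**
   * The initial import writes Snappy-compressed files and the incremental
   * import requests gzip. The merged output is expected to carry the codec of
   * the incremental import, gzip, rather than the codec of the initial one.
   */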
  @Test
  public void testMergedFilesHaveCorrectCodec() throws Exception {
    String[] args = initialImportArgs(getConnectString(), getTableName(), getTablePath().toString())
        .withOption("compression-codec", "snappy")
        .build();
    runImport(args);

    args = incrementalImportArgs(getConnectString(), getTableName(), getTablePath().toString(),
        getColName(3), getColName(0), INITIAL_RECORDS_TIMESTAMP)
        .withOption("compression-codec", "gzip")
        .build();
    runImport(args);

    CompressionCodecName compressionCodec = new ParquetReader(getTablePath()).getCodec();
    assertEquals(GZIP, compressionCodec);
  }

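  /**
   * Builds the common arguments of a single-mapper Parquet import that uses
   * the Hadoop Parquet job configurator implementation.
   */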
  private ArgumentArrayBuilder initialImportArgs(String connectString, String tableName, String targetDir) {
    return new ArgumentArrayBuilder()
        .withProperty("parquetjob.configurator.implementation", "hadoop")
        .withOption("connect", connectString)
        .withOption("table", tableName)
        .withOption("num-mappers", "1")
        .withOption("target-dir", targetDir)
        .withOption("as-parquetfile");
  }

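  /**
   * Extends the initial import arguments with the options of a lastmodified
   * incremental import merging on the given key column.
   */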
  private ArgumentArrayBuilder incrementalImportArgs(String connectString, String tableName,
      String targetDir, String checkColumn, String mergeKey, String lastValue) {
    return initialImportArgs(connectString, tableName, targetDir)
        .withOption("incremental", "lastmodified")
        .withOption("check-column", checkColumn)
        .withOption("merge-key", mergeKey)
        .withOption("last-value", lastValue);
  }

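  /**
   * Parses a "yyyy-MM-dd HH:mm:ss.SSS" timestamp into epoch milliseconds,
   * the form in which timestamps appear in the CSV output of ParquetReader.
   */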
  private static long timeFromString(String timeStampString) {
    try {
      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      return format.parse(timeStampString).getTime();
    } catch (ParseException e) {
      throw new RuntimeException(e);
    }
  }
}