SQOOP-3068: Enhance error (tool.ImportTool:
Encountered IOException running import job:
java.io.IOException: Expected schema)
to suggest workaround (--map-column-java)
(Szabolcs Vasas via Attila Szabo)
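
The suggested workaround in practice — a minimal sketch of an import command that forces string mappings via --map-column-java (connection string, table, and column names are illustrative, not taken from this patch):

    sqoop import \
      --connect jdbc:mysql://db.example.com/exampledb \
      --table example_table \
      --hive-import \
      --as-parquetfile \
      --map-column-java date_field1=String,timestamp_field1=String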
diff --git a/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
new file mode 100644
index 0000000..4070627
--- /dev/null
+++ b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * Thrown when Sqoop writes to a dataset and the Avro schema the dataset
+ * was created with does not match the actual schema Sqoop uses during
+ * the write operation.
+ */
+public class AvroSchemaMismatchException extends RuntimeException {
+
+ static final String MESSAGE_TEMPLATE = "%s%nExpected schema: %s%nActual schema: %s";
+
+ private final Schema writtenWithSchema;
+
+ private final Schema actualSchema;
+
+ public AvroSchemaMismatchException(String message, Schema writtenWithSchema, Schema actualSchema) {
+ super(String.format(MESSAGE_TEMPLATE, message, writtenWithSchema.toString(), actualSchema.toString()));
+ this.writtenWithSchema = writtenWithSchema;
+ this.actualSchema = actualSchema;
+ }
+
+ public Schema getWrittenWithSchema() {
+ return writtenWithSchema;
+ }
+
+ public Schema getActualSchema() {
+ return actualSchema;
+ }
+
+}
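
For illustration, a minimal sketch of how the new exception renders its message (record and field names are hypothetical; the SchemaBuilder usage mirrors the test further below):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.sqoop.avro.AvroSchemaMismatchException;

    Schema writtenWith = SchemaBuilder.record("t").fields().requiredString("c0").endRecord();
    Schema actual = SchemaBuilder.record("t").fields().requiredLong("c0").endRecord();
    // getMessage() fills MESSAGE_TEMPLATE: the caller-supplied text, then
    // "Expected schema: <writtenWith>" and "Actual schema: <actual>" on their own lines.
    throw new AvroSchemaMismatchException("Schemas differ.", writtenWith, actual);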
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index b077d9b..4604773 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
import org.apache.sqoop.hive.HiveConfig;
import org.kitesdk.data.CompressionType;
import org.kitesdk.data.Dataset;
@@ -48,12 +49,23 @@
public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+
public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
// Purposefully choosing the same token alias as the one Oozie chooses.
// Make sure we don't generate a new delegation token if oozie
// has already generated one.
public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
+ public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
+
+ public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
+ "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
+ " but it is possible that date/timestamp types were mapped to strings during table" +
+ " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
+ " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
+
+ private static final String HIVE_URI_PREFIX = "dataset:hive";
+
private ParquetJob() {
}
@@ -91,7 +103,7 @@
Dataset dataset;
// Add hive delegation token only if we don't already have one.
- if (uri.startsWith("dataset:hive")) {
+ if (isHiveImport(uri)) {
Configuration hiveConf = HiveConfig.getHiveConf(conf);
if (isSecureMetastore(hiveConf)) {
// Copy hive configs to job config
@@ -111,9 +123,8 @@
dataset = Datasets.load(uri);
Schema writtenWith = dataset.getDescriptor().getSchema();
if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
- throw new IOException(
- String.format("Expected schema: %s%nActual schema: %s",
- writtenWith, schema));
+ String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
+ throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
}
} else {
dataset = createDataset(schema, getCompressionType(conf), uri);
@@ -131,7 +142,11 @@
}
}
- private static Dataset createDataset(Schema schema,
+ private static boolean isHiveImport(String importUri) {
+ return importUri.startsWith(HIVE_URI_PREFIX);
+ }
+
+ public static Dataset createDataset(Schema schema,
CompressionType compressionType, String uri) {
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(schema)
@@ -191,4 +206,15 @@
throw new RuntimeException("Couldn't fetch delegation token.", ex);
}
}
+
+ private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
+ String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
+
+ if (hiveImport) {
+ exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
+ }
+
+ return exceptionMessage;
+ }
+
}
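
For reference, the complete message of a failed Hive Parquet import is the concatenation of the two public constants above; the new test in TestHiveImport asserts exactly this:

    // Sketch of the composed Hive-import error message
    String hiveMsg = ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG
        + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;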
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index ed951ea..258ef79 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -52,7 +52,7 @@
import com.cloudera.sqoop.metastore.JobStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.manager.SupportedManagers;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
import static org.apache.sqoop.manager.SupportedManagers.MYSQL;
@@ -63,6 +63,8 @@
public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());
+ private static final String IMPORT_FAILED_ERROR_MSG = "Import failed: ";
+
private CodeGenTool codeGenerator;
// true if this is an all-tables import. Set by a subclass which
@@ -81,8 +83,12 @@
}
public ImportTool(String toolName, boolean allTables) {
+ this(toolName, new CodeGenTool(), allTables);
+ }
+
+ public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
super(toolName);
- this.codeGenerator = new CodeGenTool();
+ this.codeGenerator = codeGenerator;
this.allTables = allTables;
}
@@ -616,18 +622,21 @@
// Import a single table (or query) the user specified.
importTable(options, options.getTableName(), hiveImport);
} catch (IllegalArgumentException iea) {
- LOG.error("Imported Failed: " + iea.getMessage());
+ LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
rethrowIfRequired(options, iea);
return 1;
} catch (IOException ioe) {
- LOG.error("Encountered IOException running import job: "
- + StringUtils.stringifyException(ioe));
+ LOG.error(IMPORT_FAILED_ERROR_MSG + StringUtils.stringifyException(ioe));
rethrowIfRequired(options, ioe);
return 1;
} catch (ImportException ie) {
- LOG.error("Error during import: " + ie.toString());
+ LOG.error(IMPORT_FAILED_ERROR_MSG + ie.toString());
rethrowIfRequired(options, ie);
return 1;
+ } catch (AvroSchemaMismatchException e) {
+ LOG.error(IMPORT_FAILED_ERROR_MSG, e);
+ rethrowIfRequired(options, e);
+ return 1;
} finally {
destroy(options);
}
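
Since AvroSchemaMismatchException is a RuntimeException, the dedicated catch block above is what turns it into the logged "Import failed: ..." plus exit code 1. A brief sketch of how a caller opts into propagation instead (setThrowOnError is the existing SqoopOptions flag the new Hive test uses; everything else is illustrative):

    SqoopOptions options = new SqoopOptions();
    // With the flag set, rethrowIfRequired propagates the exception to the caller;
    // without it, ImportTool.run logs the error and returns 1.
    options.setThrowOnError(true);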
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index 6f488ab..1253e8d 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -26,13 +26,18 @@
import java.util.Arrays;
import java.util.List;
+import com.cloudera.sqoop.Sqoop;
import junit.framework.JUnit4TestAdapter;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.mapreduce.ParquetJob;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -54,6 +59,8 @@
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.spi.DefaultConfiguration;
/**
* Test HiveImport capability after an import to HDFS.
@@ -388,6 +395,42 @@
verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
}
+ @Test
+ public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
+ final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
+ setCurTableName(TABLE_NAME);
+ setNumCols(3);
+
+ String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
+ String [] vals = { "'test'", "42", "'2009-12-31'" };
+ String [] extraArgs = {"--as-parquetfile"};
+
+ createHiveDataSet(TABLE_NAME);
+
+ createTableWithColTypes(types, vals);
+
+ thrown.expect(AvroSchemaMismatchException.class);
+ thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
+
+ SqoopOptions sqoopOptions = getSqoopOptions(getConf());
+ sqoopOptions.setThrowOnError(true);
+ Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
+ sqoop.run(getArgv(false, extraArgs));
+
+ }
+
+ private void createHiveDataSet(String tableName) {
+ Schema dataSetSchema = SchemaBuilder
+ .record(tableName)
+ .fields()
+ .name(getColName(0)).type().nullable().stringType().noDefault()
+ .name(getColName(1)).type().nullable().stringType().noDefault()
+ .name(getColName(2)).type().nullable().stringType().noDefault()
+ .endRecord();
+ String dataSetUri = "dataset:hive:/default/" + tableName;
+ ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
+ }
+
/**
* Test that records are appended to an existing table.
*/
diff --git a/src/test/org/apache/sqoop/tool/TestImportTool.java b/src/test/org/apache/sqoop/tool/TestImportTool.java
index 4136e9f..7e11f54 100644
--- a/src/test/org/apache/sqoop/tool/TestImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestImportTool.java
@@ -20,11 +20,25 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.sql.Connection;
+import com.cloudera.sqoop.hive.HiveImport;
+import org.apache.avro.Schema;
import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
@@ -41,6 +55,9 @@
{"TRANSACTION_SERIALIZABLE",Connection.TRANSACTION_SERIALIZABLE}
};
+ @Rule
+ public ExpectedLogMessage logMessage = new ExpectedLogMessage();
+
@Theory
public void esnureTransactionIsolationLevelsAreMappedToTheRightValues(Object[] values)
throws Exception {
@@ -50,4 +67,30 @@
assertThat(options.getMetadataTransactionIsolationLevel(), is(equalTo(values[1])));
}
+ @Test
+ public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Exception {
+ final String writtenWithSchemaString = "writtenWithSchema";
+ final String actualSchemaString = "actualSchema";
+ final String errorMessage = "Import failed";
+
+ ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false));
+
+ doReturn(true).when(importTool).init(any(com.cloudera.sqoop.SqoopOptions.class));
+
+ Schema writtenWithSchema = mock(Schema.class);
+ when(writtenWithSchema.toString()).thenReturn(writtenWithSchemaString);
+ Schema actualSchema = mock(Schema.class);
+ when(actualSchema.toString()).thenReturn(actualSchemaString);
+
+ AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
+ doThrow(expectedException).when(importTool).importTable(any(com.cloudera.sqoop.SqoopOptions.class), anyString(), any(HiveImport.class));
+
+ com.cloudera.sqoop.SqoopOptions sqoopOptions = mock(com.cloudera.sqoop.SqoopOptions.class);
+ when(sqoopOptions.doHiveImport()).thenReturn(true);
+
+ logMessage.expectError(expectedException.getMessage());
+ int result = importTool.run(sqoopOptions);
+ assertEquals(1, result);
+ }
+
}
diff --git a/src/test/org/apache/sqoop/util/ExpectedLogMessage.java b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
new file mode 100644
index 0000000..0372fe2
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.commons.lang.StringUtils.EMPTY;
+import static org.apache.commons.lang.StringUtils.contains;
+import static org.apache.commons.lang.StringUtils.defaultString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class ExpectedLogMessage implements TestRule {
+
+ private static class LoggingEventMatcher extends TypeSafeMatcher<LoggingEvent> {
+
+ private final String msg;
+
+ private final Level level;
+
+ private LoggingEventMatcher(String msg, Level level) {
+ this.msg = msg;
+ this.level = level;
+ }
+
+ @Override
+ public boolean matchesSafely(LoggingEvent o) {
+ return contains(extractEventMessage(o), msg) && level.equals(o.getLevel());
+ }
+
+ @Override
+ public void describeTo(org.hamcrest.Description description) {
+ description.appendText(eventToString(msg, level));
+ }
+
+ @Override
+ protected void describeMismatchSafely(LoggingEvent item, org.hamcrest.Description mismatchDescription) {
+ mismatchDescription.appendText(eventToString(extractEventMessage(item), item.getLevel()));
+ }
+
+ private String extractEventMessage(LoggingEvent item) {
+ final String eventMsg = item.getRenderedMessage();
+ final String exceptionMessage = extractExceptionMessage(item.getThrowableInformation());
+
+ return eventMsg + exceptionMessage;
+ }
+
+ private String extractExceptionMessage(ThrowableInformation throwableInfo) {
+ if (throwableInfo == null) {
+ return EMPTY;
+ }
+
+ Throwable throwable = throwableInfo.getThrowable();
+ if (throwable == null) {
+ return EMPTY;
+ }
+
+ return defaultString(throwable.getMessage());
+ }
+
+ private String eventToString(String msg, Level level) {
+ return "Log entry [ " + msg + ", " + level + " ]";
+ }
+
+ }
+
+ private Matcher<LoggingEvent> loggingEventMatcher;
+
+ @Override
+ public Statement apply(final Statement base, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+
+ Logger rootLogger = Logger.getRootLogger();
+ Appender mockAppender = mock(Appender.class);
+ rootLogger.addAppender(mockAppender);
+
+ try {
+ base.evaluate();
+ if (loggingEventMatcher != null) {
+ ArgumentCaptor<LoggingEvent> argumentCaptor = ArgumentCaptor.forClass(LoggingEvent.class);
+ verify(mockAppender, atMost(Integer.MAX_VALUE)).doAppend(argumentCaptor.capture());
+ assertThat(argumentCaptor.getAllValues(), hasItem(loggingEventMatcher));
+ }
+ } finally {
+ rootLogger.removeAppender(mockAppender);
+ loggingEventMatcher = null;
+ }
+ }
+ };
+ }
+
+ public void expectFatal(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.FATAL);
+ }
+
+ public void expectError(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.ERROR);
+ }
+
+ public void expectWarn(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.WARN);
+ }
+
+ public void expectInfo(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.INFO);
+ }
+
+ public void expectDebug(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.DEBUG);
+ }
+
+ public void expectTrace(String msg) {
+ loggingEventMatcher = new LoggingEventMatcher(msg, Level.TRACE);
+ }
+
+}
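
A minimal usage sketch of the new rule (class name and message are illustrative): ExpectedLogMessage registers a mock appender on the root log4j logger, runs the test body, and then asserts that some captured logging event matches the expected message and level.

    import org.apache.log4j.Logger;
    import org.apache.sqoop.util.ExpectedLogMessage;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExampleTest {
      @Rule
      public ExpectedLogMessage logMessage = new ExpectedLogMessage();

      @Test
      public void testLogsError() {
        logMessage.expectError("something went wrong");
        // The rule listens on the root logger, so any log4j-backed logger qualifies.
        Logger.getLogger(ExampleTest.class).error("something went wrong");
      }
    }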