SQOOP-3068: Enhance error (tool.ImportTool:
Encountered IOException running import job:
java.io.IOException: Expected schema)
to suggest workaround (--map-column-java)

(Szabolcs Vasas via Attila Szabo)
diff --git a/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
new file mode 100644
index 0000000..4070627
--- /dev/null
+++ b/src/java/org/apache/sqoop/avro/AvroSchemaMismatchException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * This exception will be thrown when Sqoop tries to write to a dataset
+ * and the Avro schema which was used when the dataset was created does not match
+ * the actual schema which is used by Sqoop during the write operation.
+ */
+public class AvroSchemaMismatchException extends RuntimeException {
+
+  static final String MESSAGE_TEMPLATE = "%s%nExpected schema: %s%nActual schema: %s";
+
+  private final Schema writtenWithSchema;
+
+  private final Schema actualSchema;
+
+  public AvroSchemaMismatchException(String message, Schema writtenWithSchema, Schema actualSchema) {
+    super(String.format(MESSAGE_TEMPLATE, message, writtenWithSchema.toString(), actualSchema.toString()));
+    this.writtenWithSchema = writtenWithSchema;
+    this.actualSchema = actualSchema;
+  }
+
+  public Schema getWrittenWithSchema() {
+    return writtenWithSchema;
+  }
+
+  public Schema getActualSchema() {
+    return actualSchema;
+  }
+
+}
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index b077d9b..4604773 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
 import org.apache.sqoop.hive.HiveConfig;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
@@ -48,12 +49,23 @@
   public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
 
   public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+
   public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
   // Purposefully choosing the same token alias as the one Oozie chooses.
   // Make sure we don't generate a new delegation token if oozie
   // has already generated one.
   public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
 
+  public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
+
+  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
+      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
+      " but it is possible that date/timestamp types were mapped to strings during table" +
+      " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
+      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
+
+  private static final String HIVE_URI_PREFIX = "dataset:hive";
+
   private ParquetJob() {
   }
 
@@ -91,7 +103,7 @@
     Dataset dataset;
 
     // Add hive delegation token only if we don't already have one.
-    if (uri.startsWith("dataset:hive")) {
+    if (isHiveImport(uri)) {
       Configuration hiveConf = HiveConfig.getHiveConf(conf);
       if (isSecureMetastore(hiveConf)) {
         // Copy hive configs to job config
@@ -111,9 +123,8 @@
       dataset = Datasets.load(uri);
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
-        throw new IOException(
-            String.format("Expected schema: %s%nActual schema: %s",
-                writtenWith, schema));
+        String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
+        throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
       }
     } else {
       dataset = createDataset(schema, getCompressionType(conf), uri);
@@ -131,7 +142,11 @@
     }
   }
 
-  private static Dataset createDataset(Schema schema,
+  private static boolean isHiveImport(String importUri) {
+    return importUri.startsWith(HIVE_URI_PREFIX);
+  }
+
+  public static Dataset createDataset(Schema schema,
       CompressionType compressionType, String uri) {
     DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
         .schema(schema)
@@ -191,4 +206,15 @@
       throw new RuntimeException("Couldn't fetch delegation token.", ex);
     }
   }
+
+  private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
+    String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
+
+    if (hiveImport) {
+      exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
+    }
+
+    return exceptionMessage;
+  }
+
 }
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index ed951ea..258ef79 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -52,7 +52,7 @@
 import com.cloudera.sqoop.metastore.JobStorageFactory;
 import com.cloudera.sqoop.util.AppendUtils;
 import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.manager.SupportedManagers;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
 
 import static org.apache.sqoop.manager.SupportedManagers.MYSQL;
 
@@ -63,6 +63,8 @@
 
   public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());
 
+  private static final String IMPORT_FAILED_ERROR_MSG = "Import failed: ";
+
   private CodeGenTool codeGenerator;
 
   // true if this is an all-tables import. Set by a subclass which
@@ -81,8 +83,12 @@
   }
 
   public ImportTool(String toolName, boolean allTables) {
+    this(toolName, new CodeGenTool(), allTables);
+  }
+
+  public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
     super(toolName);
-    this.codeGenerator = new CodeGenTool();
+    this.codeGenerator = codeGenerator;
     this.allTables = allTables;
   }
 
@@ -616,18 +622,21 @@
       // Import a single table (or query) the user specified.
       importTable(options, options.getTableName(), hiveImport);
     } catch (IllegalArgumentException iea) {
-        LOG.error("Imported Failed: " + iea.getMessage());
+        LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
       rethrowIfRequired(options, iea);
       return 1;
     } catch (IOException ioe) {
-      LOG.error("Encountered IOException running import job: "
-          + StringUtils.stringifyException(ioe));
+      LOG.error(IMPORT_FAILED_ERROR_MSG + StringUtils.stringifyException(ioe));
       rethrowIfRequired(options, ioe);
       return 1;
     } catch (ImportException ie) {
-      LOG.error("Error during import: " + ie.toString());
+      LOG.error(IMPORT_FAILED_ERROR_MSG + ie.toString());
       rethrowIfRequired(options, ie);
       return 1;
+    } catch (AvroSchemaMismatchException e) {
+      LOG.error(IMPORT_FAILED_ERROR_MSG, e);
+      rethrowIfRequired(options, e);
+      return 1;
     } finally {
       destroy(options);
     }
diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
index 6f488ab..1253e8d 100644
--- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
+++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java
@@ -26,13 +26,18 @@
 import java.util.Arrays;
 import java.util.List;
 
+import com.cloudera.sqoop.Sqoop;
 import junit.framework.JUnit4TestAdapter;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.mapreduce.ParquetJob;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -54,6 +59,8 @@
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.spi.DefaultConfiguration;
 
 /**
  * Test HiveImport capability after an import to HDFS.
@@ -388,6 +395,42 @@
     verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
   }
 
+  @Test
+  public void testHiveImportAsParquetWhenTableExistsWithIncompatibleSchema() throws Exception {
+    final String TABLE_NAME = "HIVE_IMPORT_AS_PARQUET_EXISTING_TABLE";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+
+    String [] types = { "VARCHAR(32)", "INTEGER", "DATE" };
+    String [] vals = { "'test'", "42", "'2009-12-31'" };
+    String [] extraArgs = {"--as-parquetfile"};
+
+    createHiveDataSet(TABLE_NAME);
+
+    createTableWithColTypes(types, vals);
+
+    thrown.expect(AvroSchemaMismatchException.class);
+    thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
+
+    SqoopOptions sqoopOptions = getSqoopOptions(getConf());
+    sqoopOptions.setThrowOnError(true);
+    Sqoop sqoop = new Sqoop(new ImportTool(), getConf(), sqoopOptions);
+    sqoop.run(getArgv(false, extraArgs));
+
+  }
+
+  private void createHiveDataSet(String tableName) {
+    Schema dataSetSchema = SchemaBuilder
+        .record(tableName)
+            .fields()
+            .name(getColName(0)).type().nullable().stringType().noDefault()
+            .name(getColName(1)).type().nullable().stringType().noDefault()
+            .name(getColName(2)).type().nullable().stringType().noDefault()
+            .endRecord();
+    String dataSetUri = "dataset:hive:/default/" + tableName;
+    ParquetJob.createDataset(dataSetSchema, Formats.PARQUET.getDefaultCompressionType(), dataSetUri);
+  }
+
   /**
    * Test that records are appended to an existing table.
    */
diff --git a/src/test/org/apache/sqoop/tool/TestImportTool.java b/src/test/org/apache/sqoop/tool/TestImportTool.java
index 4136e9f..7e11f54 100644
--- a/src/test/org/apache/sqoop/tool/TestImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestImportTool.java
@@ -20,11 +20,25 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.sql.Connection;
 
+import com.cloudera.sqoop.hive.HiveImport;
+import org.apache.avro.Schema;
 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.theories.DataPoints;
 import org.junit.experimental.theories.Theories;
 import org.junit.experimental.theories.Theory;
@@ -41,6 +55,9 @@
     {"TRANSACTION_SERIALIZABLE",Connection.TRANSACTION_SERIALIZABLE}
   };
 
+  @Rule
+  public ExpectedLogMessage logMessage = new ExpectedLogMessage();
+
   @Theory
   public void esnureTransactionIsolationLevelsAreMappedToTheRightValues(Object[] values)
       throws Exception {
@@ -50,4 +67,30 @@
     assertThat(options.getMetadataTransactionIsolationLevel(), is(equalTo(values[1])));
   }
 
+  @Test
+  public void testImportToolHandlesAvroSchemaMismatchExceptionProperly() throws Exception {
+    final String writtenWithSchemaString = "writtenWithSchema";
+    final String actualSchemaString = "actualSchema";
+    final String errorMessage = "Import failed";
+
+    ImportTool importTool = spy(new ImportTool("import", mock(CodeGenTool.class), false));
+
+    doReturn(true).when(importTool).init(any(com.cloudera.sqoop.SqoopOptions.class));
+
+    Schema writtenWithSchema = mock(Schema.class);
+    when(writtenWithSchema.toString()).thenReturn(writtenWithSchemaString);
+    Schema actualSchema = mock(Schema.class);
+    when(actualSchema.toString()).thenReturn(actualSchemaString);
+
+    AvroSchemaMismatchException expectedException = new AvroSchemaMismatchException(errorMessage, writtenWithSchema, actualSchema);
+    doThrow(expectedException).when(importTool).importTable(any(com.cloudera.sqoop.SqoopOptions.class), anyString(), any(HiveImport.class));
+
+    com.cloudera.sqoop.SqoopOptions sqoopOptions = mock(com.cloudera.sqoop.SqoopOptions.class);
+    when(sqoopOptions.doHiveImport()).thenReturn(true);
+
+    logMessage.expectError(expectedException.getMessage());
+    int result = importTool.run(sqoopOptions);
+    assertEquals(1, result);
+  }
+
 }
diff --git a/src/test/org/apache/sqoop/util/ExpectedLogMessage.java b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
new file mode 100644
index 0000000..0372fe2
--- /dev/null
+++ b/src/test/org/apache/sqoop/util/ExpectedLogMessage.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.commons.lang.StringUtils.EMPTY;
+import static org.apache.commons.lang.StringUtils.contains;
+import static org.apache.commons.lang.StringUtils.defaultString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class ExpectedLogMessage implements TestRule {
+
+  private static class LoggingEventMatcher extends TypeSafeMatcher<LoggingEvent> {
+
+    private final String msg;
+
+    private final Level level;
+
+    private LoggingEventMatcher(String msg, Level level) {
+      this.msg = msg;
+      this.level = level;
+    }
+
+    @Override
+    public boolean matchesSafely(LoggingEvent o) {
+      return contains(extractEventMessage(o), msg) && level.equals(o.getLevel());
+    }
+
+    @Override
+    public void describeTo(org.hamcrest.Description description) {
+      description.appendText(eventToString(msg, level));
+    }
+
+    @Override
+    protected void describeMismatchSafely(LoggingEvent item, org.hamcrest.Description mismatchDescription) {
+      mismatchDescription.appendText(eventToString(extractEventMessage(item), item.getLevel()));
+    }
+
+    private String extractEventMessage(LoggingEvent item) {
+      final String eventMsg = item.getRenderedMessage();
+      final String exceptionMessage = extractExceptionMessage(item.getThrowableInformation());
+
+      return eventMsg + exceptionMessage;
+    }
+
+    private String extractExceptionMessage(ThrowableInformation throwableInfo) {
+      if (throwableInfo == null) {
+        return EMPTY;
+      }
+
+      Throwable throwable = throwableInfo.getThrowable();
+      if (throwable == null) {
+        return EMPTY;
+      }
+
+      return defaultString(throwable.getMessage());
+    }
+
+    private String eventToString(String msg, Level level) {
+      return "Log entry [ " + msg + ", " + level + " ]";
+    }
+
+  }
+
+  private Matcher<LoggingEvent> loggingEventMatcher;
+
+  @Override
+  public Statement apply(final Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+
+        Logger rootLogger = Logger.getRootLogger();
+        Appender mockAppender = mock(Appender.class);
+        rootLogger.addAppender(mockAppender);
+
+        try {
+          base.evaluate();
+          if (loggingEventMatcher != null) {
+            ArgumentCaptor<LoggingEvent> argumentCaptor = ArgumentCaptor.forClass(LoggingEvent.class);
+            verify(mockAppender, atMost(Integer.MAX_VALUE)).doAppend(argumentCaptor.capture());
+            assertThat(argumentCaptor.getAllValues(), hasItem(loggingEventMatcher));
+          }
+        } finally {
+          rootLogger.removeAppender(mockAppender);
+          loggingEventMatcher = null;
+        }
+      }
+    };
+  }
+
+  public void expectFatal(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.FATAL);
+  }
+
+  public void expectError(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.ERROR);
+  }
+
+  public void expectWarn(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.WARN);
+  }
+
+  public void expectInfo(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.INFO);
+  }
+
+  public void expectDebug(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.DEBUG);
+  }
+
+  public void expectTrace(String msg) {
+    loggingEventMatcher = new LoggingEventMatcher(msg, Level.TRACE);
+  }
+
+}