| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.sqoop.mapreduce.parquet.kite; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.sqoop.avro.AvroSchemaMismatchException; |
| import org.apache.sqoop.hive.HiveConfig; |
| import org.kitesdk.data.CompressionType; |
| import org.kitesdk.data.Dataset; |
| import org.kitesdk.data.DatasetDescriptor; |
| import org.kitesdk.data.Datasets; |
| import org.kitesdk.data.Formats; |
| import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat; |
| import org.kitesdk.data.spi.SchemaValidationUtil; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| |
| import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY; |
| import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY; |
| |
| /** |
| * Helper class using the Kite Dataset API for setting up a Parquet MapReduce job. |
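 *
 * <p>A minimal usage sketch ({@code schemaJson} and the Hive dataset URI are
 * illustrative):
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * Schema schema = new Schema.Parser().parse(schemaJson);
 * KiteParquetUtils.configureImportJob(jobConf, schema,
 *     "dataset:hive:default/target_table", KiteParquetUtils.WriteMode.DEFAULT);
 * }</pre>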
| */ |
| public final class KiteParquetUtils { |
| |
| public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName()); |
| |
| public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient"; |
| |
| public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled"; |
  // Purposely use the same token alias as the one Oozie chooses, so that
  // we do not generate a new delegation token if Oozie has already
  // generated one.
| public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token"; |
| |
| public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. "; |
| |
  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import into an already existing Hive table in " +
      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import," +
      " but it is possible that date/timestamp types were mapped to strings during table" +
      " creation. Consider using the Sqoop option --map-column-java to resolve the mismatch" +
      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
| |
| private static final String HIVE_URI_PREFIX = "dataset:hive"; |
| |
| private KiteParquetUtils() { |
| } |
| |
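  /**
   * Write disposition for the target dataset: {@code DEFAULT} fails if the
   * dataset already exists, {@code APPEND} adds records to it, and
   * {@code OVERWRITE} replaces its contents.
   */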
| public enum WriteMode { |
| DEFAULT, APPEND, OVERWRITE |
| }; |
| |
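  /**
   * Resolves the Parquet compression codec from the job configuration,
   * falling back to Kite's default codec for Parquet when the configured
   * name is not one Kite recognizes.
   *
   * <p>A short sketch (the codec name {@code "snappy"} is illustrative):
   * <pre>{@code
   * conf.set(ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY, "snappy");
   * CompressionType codec = KiteParquetUtils.getCompressionType(conf);
   * }</pre>
   */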
| public static CompressionType getCompressionType(Configuration conf) { |
| CompressionType defaults = Formats.PARQUET.getDefaultCompressionType(); |
| String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName()); |
| try { |
| return CompressionType.forName(codec); |
| } catch (IllegalArgumentException ex) { |
      LOG.warn(String.format(
          "Unsupported compression type '%s'. Falling back to '%s'.",
          codec, defaults.getName()));
| } |
| return defaults; |
| } |
| |
| /** |
   * Configures the import job. Internally, the import process writes data
   * records into Parquet format through a Kite dataset. The input key class is
   * {@link org.apache.sqoop.lib.SqoopRecord}; the output key class is
   * {@link org.apache.avro.generic.GenericRecord}.
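   *
   * @param conf the job configuration to populate
   * @param schema the Avro schema describing the records to import
   * @param uri the Kite dataset URI of the import target
   * @param writeMode behavior when the target dataset already exists
   * @throws IOException if the destination exists and {@code writeMode} is
   *     {@link WriteMode#DEFAULT}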
| */ |
| public static void configureImportJob(JobConf conf, Schema schema, |
| String uri, WriteMode writeMode) throws IOException { |
    Dataset<GenericRecord> dataset;
| |
    // For secure Hive imports, add a Hive metastore delegation token
    // unless one is already present (e.g. obtained by Oozie).
| if (isHiveImport(uri)) { |
| Configuration hiveConf = HiveConfig.getHiveConf(conf); |
| if (isSecureMetastore(hiveConf)) { |
| // Copy hive configs to job config |
| HiveConfig.addHiveConfigs(hiveConf, conf); |
| |
| if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) { |
| addHiveDelegationToken(conf); |
| } |
| } |
| } |
| |
| if (Datasets.exists(uri)) { |
| if (WriteMode.DEFAULT.equals(writeMode)) { |
| throw new IOException("Destination exists! " + uri); |
| } |
| |
| dataset = Datasets.load(uri); |
| Schema writtenWith = dataset.getDescriptor().getSchema(); |
| if (!SchemaValidationUtil.canRead(writtenWith, schema)) { |
| String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri)); |
| throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema); |
| } |
| } else { |
| dataset = createDataset(schema, getCompressionType(conf), uri); |
| } |
| conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString()); |
| |
| DatasetKeyOutputFormat.ConfigBuilder builder = |
| DatasetKeyOutputFormat.configure(conf); |
| if (WriteMode.OVERWRITE.equals(writeMode)) { |
| builder.overwrite(dataset); |
| } else if (WriteMode.APPEND.equals(writeMode)) { |
| builder.appendTo(dataset); |
| } else { |
| builder.writeTo(dataset); |
| } |
| } |
| |
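  /**
   * Returns true if the import targets a Hive-backed Kite dataset, i.e. the
   * URI starts with {@code dataset:hive}.
   */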
| private static boolean isHiveImport(String importUri) { |
| return importUri.startsWith(HIVE_URI_PREFIX); |
| } |
| |
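  /**
   * Creates a new Kite dataset at {@code uri} that stores
   * {@link GenericRecord}s in Parquet format with the given compression.
   *
   * <p>A direct-call sketch (the HDFS dataset URI is illustrative):
   * <pre>{@code
   * Dataset<GenericRecord> dataset = KiteParquetUtils.createDataset(
   *     schema, CompressionType.Snappy, "dataset:hdfs:/tmp/demo/records");
   * }</pre>
   */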
  public static Dataset<GenericRecord> createDataset(Schema schema,
      CompressionType compressionType, String uri) {
| DatasetDescriptor descriptor = new DatasetDescriptor.Builder() |
| .schema(schema) |
| .format(Formats.PARQUET) |
| .compressionType(compressionType) |
| .build(); |
| return Datasets.create(uri, descriptor, GenericRecord.class); |
| } |
| |
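  /**
   * Returns true if the Hive metastore requires SASL (Kerberos)
   * authentication, as indicated by {@code hive.metastore.sasl.enabled}.
   */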
| private static boolean isSecureMetastore(Configuration conf) { |
| return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false); |
| } |
| |
| /** |
| * Add hive delegation token to credentials store. |
| * @param conf |
| */ |
| private static void addHiveDelegationToken(JobConf conf) { |
| // Need to use reflection since there's no compile time dependency on the client libs. |
    Class<?> hiveConfClass;
    Class<?> hiveMetaStoreClientClass;
| |
| try { |
      hiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
| } catch (ClassNotFoundException ex) { |
| LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS |
| + " when adding hive delegation token. " |
| + "Make sure HIVE_CONF_DIR is set correctly.", ex); |
| throw new RuntimeException("Couldn't fetch delegation token.", ex); |
| } |
| |
| try { |
      hiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
| } catch (ClassNotFoundException ex) { |
| LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS |
| + " when adding hive delegation token." |
| + " Make sure HIVE_CONF_DIR is set correctly.", ex); |
| throw new RuntimeException("Couldn't fetch delegation token.", ex); |
| } |
| |
| try { |
      Object client = hiveMetaStoreClientClass.getConstructor(hiveConfClass).newInstance(
          hiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
      );
      // Invoke getDelegationToken(String kerberosPrincipal) on the client.
      Method getDelegationTokenMethod = hiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
| Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName()); |
| |
| // Load token |
| Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>(); |
| metastoreToken.decodeFromUrlString(tokenStringForm.toString()); |
| conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken); |
| |
| LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken); |
| } catch (Exception ex) { |
| LOG.error("Couldn't fetch delegation token.", ex); |
| throw new RuntimeException("Couldn't fetch delegation token.", ex); |
| } |
| } |
| |
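  /**
   * Builds the Avro schema mismatch error message, appending Hive-specific
   * guidance when the import targets Hive.
   */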
| private static String buildAvroSchemaMismatchMessage(boolean hiveImport) { |
| String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG; |
| |
| if (hiveImport) { |
| exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG; |
| } |
| |
| return exceptionMessage; |
| } |
| |
| } |