| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hive.ql.exec; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import java.beans.DefaultPersistenceDelegate; |
| import java.beans.Encoder; |
| import java.beans.Expression; |
| import java.beans.Statement; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInput; |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.net.URLDecoder; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.PreparedStatement; |
| import java.sql.SQLException; |
| import java.sql.SQLFeatureNotSupportedException; |
| import java.sql.SQLTransientException; |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Base64; |
| import java.util.Calendar; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import java.util.zip.Deflater; |
| import java.util.zip.DeflaterOutputStream; |
| import java.util.zip.InflaterInputStream; |
| |
| import org.apache.commons.collections.MapUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.StringEscapeUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.filecache.DistributedCache; |
| import org.apache.hadoop.fs.ContentSummary; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.hive.common.BlobStorageUtils; |
| import org.apache.hadoop.hive.common.FileUtils; |
| import org.apache.hadoop.hive.common.HiveInterruptCallback; |
| import org.apache.hadoop.hive.common.HiveInterruptUtils; |
| import org.apache.hadoop.hive.common.HiveStatsUtils; |
| import org.apache.hadoop.hive.common.JavaUtils; |
| import org.apache.hadoop.hive.common.StatsSetupConst; |
| import org.apache.hadoop.hive.common.StringInternUtils; |
| import org.apache.hadoop.hive.common.TableName; |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.conf.HiveConf.ConfVars; |
| import org.apache.hadoop.hive.metastore.TableType; |
| import org.apache.hadoop.hive.metastore.Warehouse; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.Order; |
| import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; |
| import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; |
| import org.apache.hadoop.hive.ql.Context; |
| import org.apache.hadoop.hive.ql.ErrorMsg; |
| import org.apache.hadoop.hive.ql.DriverState; |
| import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; |
| import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; |
| import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; |
| import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; |
| import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; |
| import org.apache.hadoop.hive.ql.exec.tez.DagUtils; |
| import org.apache.hadoop.hive.ql.exec.tez.TezTask; |
| import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; |
| import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; |
| import org.apache.hadoop.hive.ql.io.AcidUtils; |
| import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; |
| import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; |
| import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; |
| import org.apache.hadoop.hive.ql.io.HiveInputFormat; |
| import org.apache.hadoop.hive.ql.io.HiveOutputFormat; |
| import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; |
| import org.apache.hadoop.hive.ql.io.IOConstants; |
| import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat; |
| import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; |
| import org.apache.hadoop.hive.ql.io.RCFile; |
| import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; |
| import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; |
| import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; |
| import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; |
| import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; |
| import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; |
| import org.apache.hadoop.hive.ql.log.PerfLogger; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; |
| import org.apache.hadoop.hive.ql.metadata.HiveUtils; |
| import org.apache.hadoop.hive.ql.metadata.InputEstimator; |
| import org.apache.hadoop.hive.ql.metadata.Partition; |
| import org.apache.hadoop.hive.ql.metadata.Table; |
| import org.apache.hadoop.hive.ql.parse.SemanticException; |
| import org.apache.hadoop.hive.ql.plan.BaseWork; |
| import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; |
| import org.apache.hadoop.hive.ql.plan.FileSinkDesc; |
| import org.apache.hadoop.hive.ql.plan.IStatsGatherDesc; |
| import org.apache.hadoop.hive.ql.plan.MapWork; |
| import org.apache.hadoop.hive.ql.plan.MapredWork; |
| import org.apache.hadoop.hive.ql.plan.MergeJoinWork; |
| import org.apache.hadoop.hive.ql.plan.OperatorDesc; |
| import org.apache.hadoop.hive.ql.plan.PartitionDesc; |
| import org.apache.hadoop.hive.ql.plan.PlanUtils; |
| import org.apache.hadoop.hive.ql.plan.ReduceWork; |
| import org.apache.hadoop.hive.ql.plan.TableDesc; |
| import org.apache.hadoop.hive.ql.plan.TableScanDesc; |
| import org.apache.hadoop.hive.ql.secrets.URISecretSource; |
| import org.apache.hadoop.hive.ql.session.SessionState; |
| import org.apache.hadoop.hive.ql.stats.StatsFactory; |
| import org.apache.hadoop.hive.ql.stats.StatsPublisher; |
| import org.apache.hadoop.hive.serde.serdeConstants; |
| import org.apache.hadoop.hive.serde2.AbstractSerDe; |
| import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; |
| import org.apache.hadoop.hive.serde2.SerDeException; |
| import org.apache.hadoop.hive.serde2.SerDeUtils; |
| import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; |
| import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructField; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; |
| import org.apache.hadoop.hive.shims.ShimLoader; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.DefaultCodec; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.FileOutputFormat; |
| import org.apache.hadoop.mapred.FileSplit; |
| import org.apache.hadoop.mapred.InputFormat; |
| import org.apache.hadoop.mapred.InputSplit; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RecordReader; |
| import org.apache.hadoop.mapred.Reporter; |
| import org.apache.hadoop.mapred.SequenceFileInputFormat; |
| import org.apache.hadoop.mapred.SequenceFileOutputFormat; |
| import org.apache.hadoop.mapred.TextInputFormat; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.alias.CredentialProviderFactory; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hive.common.util.ACLConfigurationParser; |
| import org.apache.hive.common.util.ReflectionUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.esotericsoftware.kryo.Kryo; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Maps; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| |
| /** |
| * Utilities. |
| * |
| */ |
| @SuppressWarnings({ "nls", "deprecation" }) |
| public final class Utilities { |
| |
| /** |
| * Mapper used to serialize/deserialize JSON objects. |
| */ |
| public static final ObjectMapper JSON_MAPPER = new ObjectMapper(); |
| |
| /** |
| * A logger mostly used to trace-log the details of Hive table file operations. Filtering the |
| * logs for FileOperations (with trace logging enabled) allows one to debug what Hive has done with |
| * various files and directories while committing writes, as well as during reads. |
| */ |
| public static final Logger FILE_OP_LOGGER = LoggerFactory.getLogger("FileOperations"); |
| |
| public static final Logger LOGGER = LoggerFactory.getLogger(Utilities.class); |
| |
| /** |
| * The objects in the reducer are composed of these top-level fields. |
| */ |
| |
| public static final String HADOOP_LOCAL_FS = "file:///"; |
| public static final String HADOOP_LOCAL_FS_SCHEME = "file"; |
| public static final String MAP_PLAN_NAME = "map.xml"; |
| public static final String REDUCE_PLAN_NAME = "reduce.xml"; |
| public static final String MERGE_PLAN_NAME = "merge.xml"; |
| public static final String INPUT_NAME = "iocontext.input.name"; |
| public static final String HAS_MAP_WORK = "has.map.work"; |
| public static final String HAS_REDUCE_WORK = "has.reduce.work"; |
| public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class"; |
| public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; |
| public static final String HIVE_ADDED_JARS = "hive.added.jars"; |
| public static final String VECTOR_MODE = "VECTOR_MODE"; |
| public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT"; |
| public static final String MAPNAME = "Map "; |
| public static final String REDUCENAME = "Reducer "; |
| public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED"; |
| public static final String BRANCH_NAME = "branch_name"; |
| |
| @Deprecated |
| protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max"; |
| |
| // all common whitespaces as defined in Character.isWhitespace(char) |
| // Used primarily as a workaround until TEXT-175 is released |
| public static final char[] COMMON_WHITESPACE_CHARS = |
| { '\t', '\n', '\u000B', '\f', '\r', '\u001C', '\u001D', '\u001E', '\u001F', ' ' }; |
| |
| private static final Object INPUT_SUMMARY_LOCK = new Object(); |
| private static final Object ROOT_HDFS_DIR_LOCK = new Object(); |
| public static final String BLOB_MANIFEST_FILE = "_blob_manifest_file"; |
| |
| @FunctionalInterface |
| public interface SupplierWithCheckedException<T, X extends Exception> { |
| T get() throws X; |
| } |
| |
| /** |
| * ReduceField: |
| * KEY: record key |
| * VALUE: record value |
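| * Reducer columns are referenced with these prefixes, e.g. {@code VALUE._col0} (see removeValueTag). |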
| */ |
| public static enum ReduceField { |
| KEY(0), VALUE(1); |
| |
| int position; |
| ReduceField(int position) { |
| this.position = position; |
| } |
| } |
| |
| public static List<String> reduceFieldNameList; |
| static { |
| reduceFieldNameList = new ArrayList<String>(); |
| for (ReduceField r : ReduceField.values()) { |
| reduceFieldNameList.add(r.toString()); |
| } |
| } |
| |
| public static String removeValueTag(String column) { |
| if (column.startsWith(ReduceField.VALUE + ".")) { |
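| // strip the leading "VALUE." prefix (6 characters) |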
| return column.substring(6); |
| } |
| return column; |
| } |
| |
| private Utilities() { |
| // prevent instantiation |
| } |
| |
| private static GlobalWorkMapFactory gWorkMap = new GlobalWorkMapFactory(); |
| |
| private static final String CLASS_NAME = Utilities.class.getName(); |
| private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); |
| |
| public static void clearWork(Configuration conf) { |
| Path mapPath = getPlanPath(conf, MAP_PLAN_NAME); |
| Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME); |
| |
| // if the plan path hasn't been initialized just return, nothing to clean. |
| if (mapPath == null && reducePath == null) { |
| return; |
| } |
| |
| |
| try { |
| if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { |
| FileSystem fs = mapPath.getFileSystem(conf); |
| try { |
| fs.delete(mapPath, true); |
| } catch (FileNotFoundException e) { |
| // delete if exists, don't panic if it doesn't |
| } |
| try { |
| fs.delete(reducePath, true); |
| } catch (FileNotFoundException e) { |
| // delete if exists, don't panic if it doesn't |
| } |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to clean-up tmp directories.", e); |
| } finally { |
| // Where a single process works with multiple plans, we must clear |
| // the cache before working with the next plan. |
| clearWorkMapForConf(conf); |
| } |
| } |
| |
| public static MapredWork getMapRedWork(Configuration conf) { |
| MapredWork w = new MapredWork(); |
| w.setMapWork(getMapWork(conf)); |
| w.setReduceWork(getReduceWork(conf)); |
| return w; |
| } |
| |
| public static void cacheMapWork(Configuration conf, MapWork work, Path hiveScratchDir) { |
| cacheBaseWork(conf, MAP_PLAN_NAME, work, hiveScratchDir); |
| } |
| |
| public static void setMapWork(Configuration conf, MapWork work) { |
| setBaseWork(conf, MAP_PLAN_NAME, work); |
| } |
| |
| public static MapWork getMapWork(Configuration conf) { |
| if (!conf.getBoolean(HAS_MAP_WORK, false)) { |
| return null; |
| } |
| return (MapWork) getBaseWork(conf, MAP_PLAN_NAME); |
| } |
| |
| public static void setReduceWork(Configuration conf, ReduceWork work) { |
| setBaseWork(conf, REDUCE_PLAN_NAME, work); |
| } |
| |
| public static ReduceWork getReduceWork(Configuration conf) { |
| if (!conf.getBoolean(HAS_REDUCE_WORK, false)) { |
| return null; |
| } |
| return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME); |
| } |
| |
| public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir, |
| boolean useCache) { |
| for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) { |
| setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache); |
| String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES); |
| if (prefixes == null) { |
| prefixes = baseWork.getName(); |
| } else { |
| prefixes = prefixes + "," + baseWork.getName(); |
| } |
| conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes); |
| } |
| |
| // nothing to return |
| return null; |
| } |
| |
| public static BaseWork getMergeWork(Configuration jconf) { |
| String currentMergePrefix = jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX); |
| if (StringUtils.isEmpty(currentMergePrefix)) { |
| return null; |
| } |
| return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX)); |
| } |
| |
| public static BaseWork getMergeWork(Configuration jconf, String prefix) { |
| if (StringUtils.isEmpty(prefix)) { |
| return null; |
| } |
| |
| return getBaseWork(jconf, prefix + MERGE_PLAN_NAME); |
| } |
| |
| public static void cacheBaseWork(Configuration conf, String name, BaseWork work, |
| Path hiveScratchDir) { |
| try { |
| setPlanPath(conf, hiveScratchDir); |
| setBaseWork(conf, name, work); |
| } catch (IOException e) { |
| LOG.error("Failed to cache plan", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Pushes work into the global work map |
| */ |
| private static void setBaseWork(Configuration conf, String name, BaseWork work) { |
| Path path = getPlanPath(conf, name); |
| setHasWork(conf, name); |
| gWorkMap.get(conf).put(path, work); |
| } |
| |
| /** |
| * Returns the Map or Reduce plan |
| * Side effect: the BaseWork returned is also placed in the gWorkMap |
| * @param conf |
| * @param name |
| * @return the BaseWork for the supplied name, or null if no plan path has been set |
| * @throws RuntimeException if the configuration is not proper or if the plan cannot be loaded |
| */ |
| private static BaseWork getBaseWork(Configuration conf, String name) { |
| |
| Path path = getPlanPath(conf, name); |
| LOG.debug("PLAN PATH = {}", path); |
| if (path == null) { // Map/reduce plan may not be generated |
| return null; |
| } |
| |
| BaseWork gWork = gWorkMap.get(conf).get(path); |
| if (gWork != null) { |
| LOG.debug("Found plan in cache for name: {}", name); |
| return gWork; |
| } |
| |
| InputStream in = null; |
| Kryo kryo = SerializationUtilities.borrowKryo(); |
| try { |
| String engine = HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE); |
| Path localPath = path; |
| LOG.debug("local path = {}", localPath); |
| final long serializedSize; |
| final String planMode; |
| if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { |
| String planStringPath = path.toUri().getPath(); |
| LOG.debug("Loading plan from string: {}", planStringPath); |
| String planString = conf.getRaw(planStringPath); |
| if (planString == null) { |
| LOG.info("Could not find plan string in conf"); |
| return null; |
| } |
| serializedSize = planString.length(); |
| planMode = "RPC"; |
| byte[] planBytes = Base64.getDecoder().decode(planString); |
| in = new ByteArrayInputStream(planBytes); |
| in = new InflaterInputStream(in); |
| } else { |
| LOG.debug("Open file to read in plan: {}", localPath); |
| FileSystem fs = localPath.getFileSystem(conf); |
| in = fs.open(localPath); |
| serializedSize = fs.getFileStatus(localPath).getLen(); |
| planMode = "FILE"; |
| } |
| |
| if(MAP_PLAN_NAME.equals(name)){ |
| if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){ |
| gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class); |
| } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { |
| gWork = SerializationUtilities.deserializePlan(kryo, in, MergeFileWork.class); |
| } else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) { |
| gWork = SerializationUtilities.deserializePlan(kryo, in, ColumnTruncateWork.class); |
| } else { |
| throw new RuntimeException("unable to determine work from configuration ." |
| + MAPRED_MAPPER_CLASS + " was "+ conf.get(MAPRED_MAPPER_CLASS)); |
| } |
| } else if (REDUCE_PLAN_NAME.equals(name)) { |
| if(ExecReducer.class.getName().equals(conf.get(MAPRED_REDUCER_CLASS))) { |
| gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); |
| } else { |
| throw new RuntimeException("unable to determine work from configuration ." |
| + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)); |
| } |
| } else if (name.contains(MERGE_PLAN_NAME)) { |
| if (name.startsWith(MAPNAME)) { |
| gWork = SerializationUtilities.deserializePlan(kryo, in, MapWork.class); |
| } else if (name.startsWith(REDUCENAME)) { |
| gWork = SerializationUtilities.deserializePlan(kryo, in, ReduceWork.class); |
| } else { |
| throw new RuntimeException("Unknown work type: " + name); |
| } |
| } |
| LOG.info("Deserialized plan (via {}) - name: {} size: {}", planMode, |
| gWork.getName(), humanReadableByteCount(serializedSize)); |
| gWorkMap.get(conf).put(path, gWork); |
| return gWork; |
| } catch (FileNotFoundException fnf) { |
| // happens. e.g.: no reduce work. |
| LOG.debug("No plan file found: {}", path, fnf); |
| return null; |
| } catch (Exception e) { |
| String msg = "Failed to load plan: " + path; |
| LOG.error(msg, e); |
| throw new RuntimeException(msg, e); |
| } finally { |
| SerializationUtilities.releaseKryo(kryo); |
| IOUtils.closeStream(in); |
| } |
| } |
| |
| private static void setHasWork(Configuration conf, String name) { |
| if (MAP_PLAN_NAME.equals(name)) { |
| conf.setBoolean(HAS_MAP_WORK, true); |
| } else if (REDUCE_PLAN_NAME.equals(name)) { |
| conf.setBoolean(HAS_REDUCE_WORK, true); |
| } |
| } |
| |
| public static List<String> getFieldSchemaString(List<FieldSchema> fl) { |
| if (fl == null) { |
| return null; |
| } |
| |
| ArrayList<String> ret = new ArrayList<String>(); |
| for (FieldSchema f : fl) { |
| ret.add(f.getName() + " " + f.getType() |
| + (f.getComment() != null ? (" " + f.getComment()) : "")); |
| } |
| return ret; |
| } |
| |
| public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { |
| String useName = conf.get(INPUT_NAME); |
| if (useName == null) { |
| useName = "mapreduce:" + hiveScratchDir; |
| } |
| conf.set(INPUT_NAME, useName); |
| setMapWork(conf, w.getMapWork(), hiveScratchDir, true); |
| if (w.getReduceWork() != null) { |
| conf.set(INPUT_NAME, useName); |
| setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true); |
| } |
| } |
| |
| public static Path setMapWork(Configuration conf, MapWork w, Path hiveScratchDir, boolean useCache) { |
| return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache); |
| } |
| |
| public static Path setReduceWork(Configuration conf, ReduceWork w, Path hiveScratchDir, boolean useCache) { |
| return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache); |
| } |
| |
| private static Path setBaseWork(Configuration conf, BaseWork w, Path hiveScratchDir, String name, boolean useCache) { |
| Kryo kryo = SerializationUtilities.borrowKryo(conf); |
| try { |
| setPlanPath(conf, hiveScratchDir); |
| |
| Path planPath = getPlanPath(conf, name); |
| setHasWork(conf, name); |
| |
| OutputStream out = null; |
| |
| final long serializedSize; |
| final String planMode; |
| if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { |
| // add it to the conf |
| ByteArrayOutputStream byteOut = new ByteArrayOutputStream(); |
| try { |
| out = new DeflaterOutputStream(byteOut, new Deflater(Deflater.BEST_SPEED)); |
| SerializationUtilities.serializePlan(kryo, w, out); |
| out.close(); |
| out = null; |
| } finally { |
| IOUtils.closeStream(out); |
| } |
| final String serializedPlan = Base64.getEncoder().encodeToString(byteOut.toByteArray()); |
| serializedSize = serializedPlan.length(); |
| planMode = "RPC"; |
| conf.set(planPath.toUri().getPath(), serializedPlan); |
| } else { |
| // use the default file system of the conf |
| FileSystem fs = planPath.getFileSystem(conf); |
| try { |
| out = fs.create(planPath); |
| SerializationUtilities.serializePlan(kryo, w, out); |
| out.close(); |
| out = null; |
| long fileLen = fs.getFileStatus(planPath).getLen(); |
| serializedSize = fileLen; |
| planMode = "FILE"; |
| } finally { |
| IOUtils.closeStream(out); |
| } |
| |
| // Serialize the plan to the default hdfs instance |
| // Except for hadoop local mode execution where we should be |
| // able to get the plan directly from the cache |
| if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) { |
| // Set up distributed cache |
| if (!DistributedCache.getSymlink(conf)) { |
| DistributedCache.createSymlink(conf); |
| } |
| String uriWithLink = planPath.toUri().toString() + "#" + name; |
| DistributedCache.addCacheFile(new URI(uriWithLink), conf); |
| |
| // set replication of the plan file to a high number. we use the same |
| // replication factor as used by the hadoop jobclient for job.xml etc. |
| short replication = (short) conf.getInt("mapred.submit.replication", 10); |
| fs.setReplication(planPath, replication); |
| } |
| } |
| |
| LOG.info("Serialized plan (via {}) - name: {} size: {}", planMode, w.getName(), |
| humanReadableByteCount(serializedSize)); |
| // Cache the plan in this process |
| gWorkMap.get(conf).put(planPath, w); |
| return planPath; |
| } catch (Exception e) { |
| String msg = "Error caching " + name; |
| LOG.error(msg, e); |
| throw new RuntimeException(msg, e); |
| } finally { |
| SerializationUtilities.releaseKryo(kryo); |
| } |
| } |
| |
| private static Path getPlanPath(Configuration conf, String name) { |
| Path planPath = getPlanPath(conf); |
| if (planPath == null) { |
| return null; |
| } |
| return new Path(planPath, name); |
| } |
| |
| private static void setPlanPath(Configuration conf, Path hiveScratchDir) throws IOException { |
| if (getPlanPath(conf) == null) { |
| // this is the unique conf ID, which is kept in JobConf as part of the plan file name |
| String jobID = UUID.randomUUID().toString(); |
| Path planPath = new Path(hiveScratchDir, jobID); |
| if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { |
| FileSystem fs = planPath.getFileSystem(conf); |
| // a plan directory is only needed when the plan is written to a file (i.e. not shipped via RPC) |
| fs.mkdirs(planPath); |
| } |
| HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, planPath.toUri().toString()); |
| } |
| } |
| |
| public static Path getPlanPath(Configuration conf) { |
| String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN); |
| if (plan != null && !plan.isEmpty()) { |
| return new Path(plan); |
| } |
| return null; |
| } |
| |
| public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate { |
| @Override |
| protected Expression instantiate(Object oldInstance, Encoder out) { |
| return new Expression(oldInstance, oldInstance.getClass(), "new", null); |
| } |
| |
| @Override |
| protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) { |
| Iterator<?> ite = ((Collection<?>) oldInstance).iterator(); |
| while (ite.hasNext()) { |
| out.writeStatement(new Statement(oldInstance, "add", new Object[] {ite.next()})); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| public static TableDesc defaultTd; |
| static { |
| // by default we expect ^A separated strings |
| // This tableDesc does not provide column names. We should always use |
| // PlanUtils.getDefaultTableDesc(String separatorCode, String columns) |
| // or getBinarySortableTableDesc(List<FieldSchema> fieldSchemas) when |
| // we know the column names. |
| /** |
| * Generate the table descriptor of MetadataTypedColumnsetSerDe with the |
| * separatorCode. MetadataTypedColumnsetSerDe is used because LazySimpleSerDe |
| * does not support a table with a single column "col" with type |
| * "array<string>". |
| */ |
| defaultTd = new TableDesc(TextInputFormat.class, IgnoreKeyTextOutputFormat.class, |
| Utilities.makeProperties(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT, |
| "" + Utilities.ctrlaCode, serdeConstants.SERIALIZATION_LIB, |
| MetadataTypedColumnsetSerDe.class.getName())); |
| } |
| |
| public static final int carriageReturnCode = 13; |
| public static final int newLineCode = 10; |
| public static final int tabCode = 9; |
| public static final int ctrlaCode = 1; |
| |
| public static final String INDENT = " "; |
| |
| // Note: When DDL supports specifying what string to represent null, |
| // we should specify "NULL" to represent null in the temp table, and then |
| // we can make the following translation deprecated. |
| public static final String nullStringStorage = "\\N"; |
| public static final String nullStringOutput = "NULL"; |
| |
| /** |
| * Gets the task id if we are running as a Hadoop job. Gets a random number otherwise. |
| */ |
| public static String getTaskId(Configuration hconf) { |
| String taskid = (hconf == null) ? null : hconf.get("mapred.task.id"); |
| if (StringUtils.isEmpty(taskid)) { |
| return (Integer |
| .toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))); |
| } else { |
| /* |
| * Extract the task and attempt id from the hadoop taskid. In version 17 the leading component |
| * was 'task_'; thereafter the leading component is 'attempt_'. In 17, hadoop also seems to |
| * have used _map_ and _reduce_ to denote map/reduce task types. |
| */ |
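| // e.g. "attempt_200707121733_0003_m_000005_0" becomes "000005_0" |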
| String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", ""); |
| return (ret); |
| } |
| } |
| |
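| /** |
| * Builds a Properties object from an alternating list of keys and values, |
| * e.g. {@code makeProperties("serialization.format", "1")}; the argument list |
| * is assumed to contain an even number of elements. |
| */ |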
| public static Properties makeProperties(String... olist) { |
| Properties ret = new Properties(); |
| for (int i = 0; i < olist.length; i += 2) { |
| ret.setProperty(olist[i], olist[i + 1]); |
| } |
| return (ret); |
| } |
| |
| public static ArrayList makeList(Object... olist) { |
| ArrayList ret = new ArrayList(); |
| for (Object element : olist) { |
| ret.add(element); |
| } |
| return (ret); |
| } |
| |
| public static TableDesc getTableDesc(Table tbl) { |
| Properties props = tbl.getMetadata(); |
| props.put(serdeConstants.SERIALIZATION_LIB, tbl.getDeserializer().getClass().getName()); |
| if (tbl.getMetaTable() != null) { |
| props.put("metaTable", tbl.getMetaTable()); |
| } |
| if (tbl.getBranchName() != null) { |
| props.put(BRANCH_NAME, tbl.getBranchName()); |
| } |
| return (new TableDesc(tbl.getInputFormatClass(), tbl |
| .getOutputFormatClass(), props)); |
| } |
| |
| // column names and column types are all delimited by comma |
| public static TableDesc getTableDesc(String cols, String colTypes) { |
| Properties properties = new Properties(); |
| properties.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode); |
| properties.put(serdeConstants.LIST_COLUMNS, cols); |
| properties.put(serdeConstants.LIST_COLUMN_TYPES, colTypes); |
| properties.put(serdeConstants.SERIALIZATION_LIB, LazySimpleSerDe.class.getName()); |
| properties.put(hive_metastoreConstants.TABLE_BUCKETING_VERSION, "-1"); |
| return (new TableDesc(SequenceFileInputFormat.class, |
| HiveSequenceFileOutputFormat.class, properties)); |
| } |
| |
| public static PartitionDesc getPartitionDesc(Partition part, TableDesc tableDesc) throws |
| HiveException { |
| return new PartitionDesc(part, tableDesc); |
| } |
| |
| public static PartitionDesc getPartitionDescFromTableDesc(TableDesc tblDesc, Partition part, |
| boolean usePartSchemaProperties) throws HiveException { |
| return new PartitionDesc(part, tblDesc, usePartSchemaProperties); |
| } |
| |
| private static boolean isWhitespace(int c) { |
| if (c == -1) { |
| return false; |
| } |
| return Character.isWhitespace((char) c); |
| } |
| |
| public static boolean contentsEqual(InputStream is1, InputStream is2, boolean ignoreWhitespace) |
| throws IOException { |
| try { |
| if ((is1 == is2) || (is1 == null && is2 == null)) { |
| return true; |
| } |
| |
| if (is1 == null || is2 == null) { |
| return false; |
| } |
| |
| while (true) { |
| int c1 = is1.read(); |
| while (ignoreWhitespace && isWhitespace(c1)) { |
| c1 = is1.read(); |
| } |
| int c2 = is2.read(); |
| while (ignoreWhitespace && isWhitespace(c2)) { |
| c2 = is2.read(); |
| } |
| if (c1 == -1 && c2 == -1) { |
| return true; |
| } |
| if (c1 != c2) { |
| break; |
| } |
| } |
| } catch (FileNotFoundException e) { |
| LOG.warn("Could not compare files. One or both cannot be found", e); |
| } |
| return false; |
| } |
| |
| /** |
| * convert "From src insert blah blah" to "From src insert ... blah" |
| */ |
| public static String abbreviate(String str, int max) { |
| str = str.trim(); |
| |
| int len = str.length(); |
| int suffixlength = 20; |
| |
| if (len <= max) { |
| return str; |
| } |
| |
| suffixlength = Math.min(suffixlength, (max - 3) / 2); |
| String rev = StringUtils.reverse(str); |
| |
| // get the last few words |
| String suffix = StringUtils.abbreviate(rev, suffixlength); |
| suffix = StringUtils.reverse(suffix); |
| |
| // first few .. |
| String prefix = StringUtils.abbreviate(str, max - suffix.length()); |
| |
| return prefix + suffix; |
| } |
| |
| public static final String NSTR = ""; |
| |
| /** |
| * StreamStatus. |
| * |
| */ |
| public static enum StreamStatus { |
| EOF, TERMINATED |
| } |
| |
| public static StreamStatus readColumn(DataInput in, OutputStream out) throws IOException { |
| |
| while (true) { |
| int b; |
| try { |
| b = in.readByte(); |
| } catch (EOFException e) { |
| return StreamStatus.EOF; |
| } |
| |
| if (b == Utilities.newLineCode) { |
| return StreamStatus.TERMINATED; |
| } |
| |
| out.write(b); |
| } |
| // Unreachable |
| } |
| |
| /** |
| * Convert an output stream to a compressed output stream based on the codecs in the Job |
| * Configuration. The caller specifies directly whether the file is compressed or not. |
| * |
| * @param jc |
| * Job Configuration |
| * @param out |
| * Output Stream to be converted into compressed output stream |
| * @param isCompressed |
| * whether the output stream needs to be compressed or not |
| * @return compressed output stream |
| */ |
| public static OutputStream createCompressedStream(JobConf jc, OutputStream out, |
| boolean isCompressed) throws IOException { |
| if (isCompressed) { |
| Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc, |
| DefaultCodec.class); |
| CompressionCodec codec = ReflectionUtil.newInstance(codecClass, jc); |
| return codec.createOutputStream(out); |
| } else { |
| return (out); |
| } |
| } |
| |
| /** |
| * Based on compression option, output format, and configured output codec - |
| * get extension for output file. Text files require an extension, whereas |
| * others, like sequence files, do not. |
| * <p> |
| * The property <code>hive.output.file.extension</code> is used to determine |
| * the extension - if set, it will override other logic for choosing an |
| * extension. |
| * |
| * @param jc |
| * Job Configuration |
| * @param isCompressed |
| * Whether the output file is compressed or not |
| * @param hiveOutputFormat |
| * The output format, used to detect if the format is text |
| * @return the required file extension (example: .gz) |
| */ |
| public static String getFileExtension(JobConf jc, boolean isCompressed, |
| HiveOutputFormat<?, ?> hiveOutputFormat) { |
| String extension = HiveConf.getVar(jc, HiveConf.ConfVars.OUTPUT_FILE_EXTENSION); |
| if (!StringUtils.isEmpty(extension)) { |
| return extension; |
| } |
| if ((hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) && isCompressed) { |
| Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(jc, |
| DefaultCodec.class); |
| CompressionCodec codec = ReflectionUtil.newInstance(codecClass, jc); |
| return codec.getDefaultExtension(); |
| } |
| return StringUtils.EMPTY; |
| } |
| |
| /** |
| * Create a sequencefile output stream based on job configuration. Uses the user-supplied compression |
| * flag (rather than obtaining it from the Job Configuration). |
| * |
| * @param jc |
| * Job configuration |
| * @param fs |
| * File System to create file in |
| * @param file |
| * Path to be created |
| * @param keyClass |
| * Java Class for key |
| * @param valClass |
| * Java Class for value |
| * @return output stream over the created sequencefile |
| */ |
| public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs, Path file, |
| Class<?> keyClass, Class<?> valClass, boolean isCompressed, Progressable progressable) |
| throws IOException { |
| CompressionCodec codec = null; |
| CompressionType compressionType = CompressionType.NONE; |
| Class<? extends CompressionCodec> codecClass = null; |
| if (isCompressed) { |
| compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc); |
| codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); |
| codec = ReflectionUtil.newInstance(codecClass, jc); |
| } |
| return SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec, |
| progressable); |
| |
| } |
| |
| /** |
| * Create an RCFile output stream based on job configuration. Uses the user-supplied compression flag |
| * (rather than obtaining it from the Job Configuration). |
| * |
| * @param jc |
| * Job configuration |
| * @param fs |
| * File System to create file in |
| * @param file |
| * Path to be created |
| * @return output stream over the created rcfile |
| */ |
| public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs, Path file, |
| boolean isCompressed, Progressable progressable) throws IOException { |
| CompressionCodec codec = null; |
| if (isCompressed) { |
| Class<?> codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); |
| codec = (CompressionCodec) ReflectionUtil.newInstance(codecClass, jc); |
| } |
| return new RCFile.Writer(fs, jc, file, progressable, codec); |
| } |
| |
| /** |
| * Shamelessly cloned from GenericOptionsParser. |
| */ |
| public static String realFile(String newFile, Configuration conf) throws IOException { |
| Path path = new Path(newFile); |
| URI pathURI = path.toUri(); |
| FileSystem fs; |
| |
| if (pathURI.getScheme() == null) { |
| fs = FileSystem.getLocal(conf); |
| } else { |
| fs = path.getFileSystem(conf); |
| } |
| |
| if (!fs.exists(path)) { |
| return null; |
| } |
| |
| String file = path.makeQualified(fs).toString(); |
| return file; |
| } |
| |
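| /** |
| * Appends to src any elements of dest that src does not already contain and returns src; |
| * if either list is null, the other list is returned as-is. |
| */ |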
| public static List<String> mergeUniqElems(List<String> src, List<String> dest) { |
| if (dest == null) { |
| return src; |
| } |
| if (src == null) { |
| return dest; |
| } |
| int pos = 0; |
| |
| while (pos < dest.size()) { |
| if (!src.contains(dest.get(pos))) { |
| src.add(dest.get(pos)); |
| } |
| pos++; |
| } |
| |
| return src; |
| } |
| |
| private static final String tmpPrefix = "_tmp."; |
| private static final String taskTmpPrefix = "_task_tmp."; |
| |
| public static Path toTaskTempPath(Path orig) { |
| if (orig.getName().indexOf(taskTmpPrefix) == 0) { |
| return orig; |
| } |
| return new Path(orig.getParent(), taskTmpPrefix + orig.getName()); |
| } |
| |
| public static Path toTempPath(Path orig) { |
| if (orig.getName().indexOf(tmpPrefix) == 0) { |
| return orig; |
| } |
| return new Path(orig.getParent(), tmpPrefix + orig.getName()); |
| } |
| |
| /** |
| * Given a path, convert to a temporary path. |
| */ |
| public static Path toTempPath(String orig) { |
| return toTempPath(new Path(orig)); |
| } |
| |
| /** |
| * Detect if the supplied file is a temporary path. |
| */ |
| private static boolean isTempPath(FileStatus file) { |
| String name = file.getPath().getName(); |
| // in addition to detecting hive temporary files, we also check hadoop |
| // temporary folders that used to show up in older releases |
| return (name.startsWith("_task") || name.startsWith(tmpPrefix)); |
| } |
| |
| /** |
| * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an |
| * existing file with the same name, the new file's name will be appended with "_1", "_2", etc. |
| * |
| * @param fs |
| * the FileSystem where src and dst are on. |
| * @param src |
| * the src directory |
| * @param dst |
| * the target directory |
| * @throws IOException |
| */ |
| public static void rename(FileSystem fs, Path src, Path dst) throws IOException, HiveException { |
| if (!fs.rename(src, dst)) { |
| throw new HiveException("Unable to move: " + src + " to: " + dst); |
| } |
| } |
| |
| private static void moveFileOrDir(FileSystem fs, FileStatus file, Path dst) throws IOException, |
| HiveException { |
| Path srcFilePath = file.getPath(); |
| String fileName = srcFilePath.getName(); |
| Path dstFilePath = new Path(dst, fileName); |
| if (file.isDir()) { |
| renameOrMoveFiles(fs, srcFilePath, dstFilePath); |
| } else { |
| moveFile(fs, srcFilePath, dst, fileName); |
| } |
| } |
| |
| /** |
| * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an |
| * existing file with the same name, the new file's name will be generated based on the file name. |
| * If the file name conforms to the hive managed file pattern NNNNNN_Y(_copy_YY) then it will create NNNNNN_Y_copy_XX, |
| * else it will append _1, _2, .... |
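| * (For example, an existing "000000_0" would typically become "000000_0_copy_1".) |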
| * @param fs |
| * the FileSystem where src and dst are on. |
| * @param srcFile |
| * the src file |
| * @param destDir |
| * the target directory |
| * @param destFileName |
| * the target filename |
| * @return The final path the file was moved to. |
| * @throws IOException |
| * @throws HiveException |
| */ |
| public static Path moveFile(FileSystem fs, Path srcFile, Path destDir, String destFileName) |
| throws IOException, HiveException { |
| Path dstFilePath = new Path(destDir, destFileName); |
| if (fs.exists(dstFilePath)) { |
| ParsedOutputFileName parsedFileName = ParsedOutputFileName.parse(destFileName); |
| int suffix = 0; |
| do { |
| suffix++; |
| if (parsedFileName.matches()) { |
| dstFilePath = new Path(destDir, parsedFileName.makeFilenameWithCopyIndex(suffix)); |
| } else { |
| dstFilePath = new Path(destDir, destFileName + "_" + suffix); |
| } |
| } while (fs.exists(dstFilePath)); |
| } |
| if (!fs.rename(srcFile, dstFilePath)) { |
| throw new HiveException("Unable to move: " + srcFile + " to: " + dstFilePath); |
| } |
| return dstFilePath; |
| } |
| |
| /** |
| * Rename src to dst, or in the case dst already exists, move files in src to dst. If there is an |
| * existing file with the same name, the new file's name will be generated based on the file name. |
| * If the file name conforms to the hive managed file pattern NNNNNN_Y(_copy_YY) then it will create NNNNNN_Y_copy_XX, |
| * else it will append _1, _2, .... |
| * |
| * @param fs |
| * the FileSystem where src and dst are on. |
| * @param src |
| * the src directory |
| * @param dst |
| * the target directory |
| * @throws IOException |
| */ |
| public static void renameOrMoveFiles(FileSystem fs, Path src, Path dst) throws IOException, |
| HiveException { |
| if (!fs.exists(dst)) { |
| if (!fs.rename(src, dst)) { |
| throw new HiveException("Unable to move: " + src + " to: " + dst); |
| } |
| } else { |
| // move file by file |
| FileStatus[] files = fs.listStatus(src); |
| for (FileStatus file : files) { |
| Utilities.moveFileOrDir(fs, file, dst); |
| } |
| } |
| } |
| |
| /** |
| * Rename src to dst, or in the case dst already exists, move files in src |
| * to dst. If there is an existing file with the same name, the new file's |
| * name will be appended with "_1", "_2", etc. Happens in parallel mode. |
| * |
| * @param conf |
| * |
| * @param fs |
| * the FileSystem where src and dst are on. |
| * @param src |
| * the src directory |
| * @param dst |
| * the target directory |
| * @throws IOException |
| */ |
| public static void renameOrMoveFilesInParallel(Configuration conf, |
| FileSystem fs, Path src, Path dst) throws IOException, HiveException { |
| if (!fs.exists(dst)) { |
| if (!fs.rename(src, dst)) { |
| throw new HiveException("Unable to move: " + src + " to: " + dst); |
| } |
| } else { |
| // move files in parallel |
| LOG.info("Moving files from {} to {}", src, dst); |
| final ExecutorService pool = createMoveThreadPool(conf); |
| List<Future<Void>> futures = new LinkedList<>(); |
| |
| final FileStatus[] files = fs.listStatus(src); |
| for (FileStatus file : files) { |
| futures.add(pool.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws HiveException { |
| try { |
| Utilities.moveFileOrDir(fs, file, dst); |
| } catch (Exception e) { |
| throw new HiveException(e); |
| } |
| return null; |
| } |
| })); |
| } |
| |
| shutdownAndCleanup(pool, futures); |
| LOG.info("Rename files from {} to {} is complete", src, dst); |
| } |
| } |
| |
| public static final String COPY_KEYWORD = "_copy_"; // copy keyword |
| |
| /** |
| * This breaks a prefixed bucket number into the prefix and the taskID |
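| * (e.g. "(ds%3D1)000001" splits into prefix "(ds%3D1)" and task id "000001") |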
| */ |
| private static final Pattern PREFIXED_TASK_ID_REGEX = |
| Pattern.compile("^(.*?\\(.*\\))?([0-9]+)$"); |
| |
| /** |
| * This breaks a prefixed bucket number out into a single integer |
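| * (e.g. for "000001_0" the unpadded bucket number group is "1") |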
| */ |
| private static final Pattern PREFIXED_BUCKET_ID_REGEX = |
| Pattern.compile("^(0*([0-9]+))_([0-9]+).*"); |
| /** |
| * Get the task id from the filename. It is assumed that the filename is derived from the output |
| * of getTaskId |
| * |
| * @param filename |
| * filename to extract taskid from |
| */ |
| public static String getTaskIdFromFilename(String filename) { |
| return getIdFromFilename(filename, false, false); |
| } |
| |
| /** |
| * Get the part-spec + task id from the filename. It is assumed that the filename is derived |
| * from the output of getTaskId |
| * |
| * @param filename |
| * filename to extract taskid from |
| */ |
| private static String getPrefixedTaskIdFromFilename(String filename) { |
| return getIdFromFilename(filename, true, false); |
| } |
| |
| private static int getAttemptIdFromFilename(String filename) { |
| return Integer.parseInt(getIdFromFilename(filename, true, true)); |
| } |
| |
| private static String getIdFromFilename(String filepath, boolean isPrefixed, boolean isTaskAttempt) { |
| String filename = filepath; |
| int dirEnd = filepath.lastIndexOf(Path.SEPARATOR); |
| if (dirEnd != -1) { |
| filename = filepath.substring(dirEnd + 1); |
| } |
| |
| ParsedOutputFileName parsedOutputFileName = ParsedOutputFileName.parse(filename); |
| String taskId; |
| if (parsedOutputFileName.matches()) { |
| if (isTaskAttempt) { |
| taskId = parsedOutputFileName.getAttemptId(); |
| } else { |
| taskId = isPrefixed ? parsedOutputFileName.getPrefixedTaskId() : parsedOutputFileName.getTaskId(); |
| } |
| } else { |
| taskId = filename; |
| LOG.warn("Unable to get task id from file name: {}. Using last component {}" |
| + " as task id.", filepath, taskId); |
| } |
| if (isTaskAttempt) { |
| LOG.debug("TaskAttemptId for {} = {}", filepath, taskId); |
| } else { |
| LOG.debug("TaskId for {} = {}", filepath, taskId); |
| } |
| return taskId; |
| } |
| |
| /** |
| * Replace the task id from the filename. It is assumed that the filename is derived from the |
| * output of getTaskId |
| * |
| * @param filename |
| * filename to replace taskid "0_0" or "0_0.gz" by 33 to "33_0" or "33_0.gz" |
| */ |
| public static String replaceTaskIdFromFilename(String filename, int bucketNum) { |
| return replaceTaskIdFromFilename(filename, String.valueOf(bucketNum)); |
| } |
| |
| public static String replaceTaskIdFromFilename(String filename, String fileId) { |
| String taskId = getTaskIdFromFilename(filename); |
| String newTaskId = replaceTaskId(taskId, fileId); |
| String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId); |
| return (ret); |
| } |
| |
| /** |
| * Replace taskId with input bucketNum. For example, if taskId is 000000 and bucketNum is 1, |
| * return should be 000001; if taskId is (ds%3D1)000000 and bucketNum is 1, return should be |
| * (ds%3D1)000001. This method is different from the replaceTaskId(String, String) method. |
| * In this method, the pattern is in taskId. |
| * @param taskId |
| * @param bucketNum |
| * @return |
| */ |
| public static String replaceTaskId(String taskId, int bucketNum) { |
| String bucketNumStr = String.valueOf(bucketNum); |
| Matcher m = PREFIXED_TASK_ID_REGEX.matcher(taskId); |
| if (!m.matches()) { |
| LOG.warn("Unable to determine bucket number from task id: {}. Using " + |
| "task ID as bucket number.", taskId); |
| return adjustBucketNumLen(bucketNumStr, taskId); |
| } else { |
| String adjustedBucketNum = adjustBucketNumLen(bucketNumStr, m.group(2)); |
| return (m.group(1) == null ? StringUtils.EMPTY : m.group(1)) + adjustedBucketNum; |
| } |
| } |
| |
| /** |
| * Returns strBucketNum with enough 0's prefixing the task ID portion of the String to make it |
| * equal in length to taskId |
| * |
| * @param taskId - the taskId used as a template for length |
| * @param strBucketNum - the bucket number of the output, may or may not be prefixed |
| * @return |
| */ |
| private static String replaceTaskId(String taskId, String strBucketNum) { |
| Matcher m = PREFIXED_TASK_ID_REGEX.matcher(strBucketNum); |
| if (!m.matches()) { |
| LOG.warn("Unable to determine bucket number from file ID: {}. Using " + |
| "file ID as bucket number.", strBucketNum); |
| return adjustBucketNumLen(strBucketNum, taskId); |
| } else { |
| String adjustedBucketNum = adjustBucketNumLen(m.group(2), taskId); |
| return (m.group(1) == null ? StringUtils.EMPTY : m.group(1)) + adjustedBucketNum; |
| } |
| } |
| |
| /** |
| * Adds 0's to the beginning of bucketNum until bucketNum and taskId are the same length. |
| * |
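| * For example, bucketNum "1" with taskId "000000" yields "000001". |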
| * @param bucketNum - the bucket number, should not be prefixed |
| * @param taskId - the taskId used as a template for length |
| * @return |
| */ |
| private static String adjustBucketNumLen(String bucketNum, String taskId) { |
| int bucketNumLen = bucketNum.length(); |
| int taskIdLen = taskId.length(); |
| StringBuilder s = new StringBuilder(); |
| for (int i = 0; i < taskIdLen - bucketNumLen; i++) { |
| s.append('0'); |
| } |
| s.append(bucketNum); |
| return s.toString(); |
| } |
| |
| /** |
| * Replace the oldTaskId appearing in the filename by the newTaskId. The string oldTaskId could |
| * appear multiple times, we should only replace the last one. |
| * |
| * @param filename |
| * @param oldTaskId |
| * @param newTaskId |
| * @return |
| */ |
| private static String replaceTaskIdFromFilename(String filename, String oldTaskId, |
| String newTaskId) { |
| |
| String[] spl = filename.split(oldTaskId); |
| |
| if ((spl.length == 0) || (spl.length == 1)) { |
| return filename.replaceAll(oldTaskId, newTaskId); |
| } |
| |
| StringBuilder snew = new StringBuilder(); |
| for (int idx = 0; idx < spl.length - 1; idx++) { |
| if (idx > 0) { |
| snew.append(oldTaskId); |
| } |
| snew.append(spl[idx]); |
| } |
| snew.append(newTaskId); |
| snew.append(spl[spl.length - 1]); |
| return snew.toString(); |
| } |
| |
| |
| private static boolean shouldAvoidRename(FileSinkDesc conf, Configuration hConf) { |
| // rename/move is avoided only if all of the following conditions are met: |
| // * the execution engine is tez |
| // * it is a select query and the list of files to fetch has been set |
| if (conf != null && conf.getIsQuery() && conf.getFilesToFetch() != null |
| && HiveConf.getVar(hConf, ConfVars.HIVE_EXECUTION_ENGINE).equalsIgnoreCase("tez")){ |
| return true; |
| } |
| return false; |
| } |
| /** |
| * Returns null if the path does not exist. |
| */ |
| public static FileStatus[] listStatusIfExists(Path path, FileSystem fs) throws IOException { |
| try { |
| return fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); |
| } catch (FileNotFoundException e) { |
| // FS in hadoop 2.0 throws FNF instead of returning null |
| return null; |
| } |
| } |
| |
| public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configuration hconf, |
| boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf, |
| Reporter reporter) throws IOException, |
| HiveException { |
| |
| // There are two paths this could take based on the value of shouldAvoidRename, |
| // which indicates whether tmpPath should be renamed/moved or not. |
| // if true: |
| // Skip renaming/moving the tmpPath |
| // Deduplicate and keep a list of files |
| // Pass on the list of files to conf (to be used later by fetch operator) |
| // if false: |
| // 1) Rename tmpPath to a new directory name to prevent additional files |
| // from being added by runaway processes. |
| // 2) Remove duplicates from the temp directory |
| // 3) Rename/move the temp directory to specPath |
| |
| FileSystem fs = specPath.getFileSystem(hconf); |
| Path tmpPath = Utilities.toTempPath(specPath); |
| Path taskTmpPath = Utilities.toTaskTempPath(specPath); |
| PerfLogger perfLogger = SessionState.getPerfLogger(); |
| boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); |
| boolean avoidRename = false; |
| boolean shouldAvoidRename = shouldAvoidRename(conf, hconf); |
| |
| if(isBlobStorage && (shouldAvoidRename|| ((conf != null) && conf.isCTASorCM())) |
| || (!isBlobStorage && shouldAvoidRename)) { |
| avoidRename = true; |
| } |
| if (success) { |
| if (!avoidRename && fs.exists(tmpPath)) { |
| // 1) Rename tmpPath to a new directory name to prevent additional files |
| // from being added by runaway processes. |
| // this is only done for all statements except SELECT, CTAS and Create MV |
| Path tmpPathOriginal = tmpPath; |
| tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved"); |
| LOG.debug("shouldAvoidRename is false therefore moving/renaming " + tmpPathOriginal + " to " + tmpPath); |
| perfLogger.perfLogBegin("FileSinkOperator", "rename"); |
| Utilities.rename(fs, tmpPathOriginal, tmpPath); |
| perfLogger.perfLogEnd("FileSinkOperator", "rename"); |
| } |
| |
| // Remove duplicates from tmpPath |
| List<FileStatus> statusList = HiveStatsUtils.getFileStatusRecurse( |
| tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); |
| FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]); |
| if(statuses != null && statuses.length > 0) { |
| Set<FileStatus> filesKept = new HashSet<>(); |
| perfLogger.perfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); |
| // remove any tmp file or double-committed output files |
| int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), |
| numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0; |
| List<Path> emptyBuckets = removeTempOrDuplicateFiles( |
| fs, statuses, unionSuffix, dpLevels, numBuckets, hconf, null, 0, false, filesKept, false); |
| perfLogger.perfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles"); |
| // create empty buckets if necessary |
| if (!emptyBuckets.isEmpty()) { |
| perfLogger.perfLogBegin("FileSinkOperator", "CreateEmptyBuckets"); |
| createEmptyBuckets( |
| hconf, emptyBuckets, conf.getCompressed(), conf.getTableInfo(), reporter); |
| for(Path p:emptyBuckets) { |
| FileStatus[] items = fs.listStatus(p); |
| filesKept.addAll(Arrays.asList(items)); |
| } |
| perfLogger.perfLogEnd("FileSinkOperator", "CreateEmptyBuckets"); |
| } |
| |
| // move to the file destination |
| Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath); |
| if(shouldAvoidRename(conf, hconf)){ |
| // for SELECT statements |
| LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString()); |
| conf.getFilesToFetch().addAll(filesKept); |
| } else if (conf !=null && conf.isCTASorCM() && isBlobStorage) { |
| // for CTAS or Create MV statements |
| perfLogger.perfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus"); |
| LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString()); |
| if (conf.getTable() != null && conf.getTable().getTableType().equals(TableType.EXTERNAL_TABLE)) { |
| // Do this optimisation only for External tables. |
| createFileList(filesKept, tmpPath, specPath, fs); |
| } else { |
| Set<String> filesKeptPaths = filesKept.stream().map(x -> x.getPath().toString()).collect(Collectors.toSet()); |
| moveSpecifiedFilesInParallel(hconf, fs, tmpPath, specPath, filesKeptPaths); |
| } |
| perfLogger.perfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus"); |
| } else { |
| // for the rest of the statements, e.g. INSERT, LOAD, etc. |
| perfLogger.perfLogBegin("FileSinkOperator", "RenameOrMoveFiles"); |
| LOG.debug("Final renaming/moving. Source: " + tmpPath + ". Destination: " + specPath); |
| renameOrMoveFilesInParallel(hconf, fs, tmpPath, specPath); |
| perfLogger.perfLogEnd("FileSinkOperator", "RenameOrMoveFiles"); |
| } |
| } |
| } else { |
| Utilities.FILE_OP_LOGGER.trace("deleting tmpPath {}", tmpPath); |
| fs.delete(tmpPath, true); |
| } |
| Utilities.FILE_OP_LOGGER.trace("deleting taskTmpPath {}", taskTmpPath); |
| fs.delete(taskTmpPath, true); |
| } |
| |
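| /** |
| * Writes a manifest file (BLOB_MANIFEST_FILE) under targetPath: the first line is the |
| * source directory and every following line is the full path of a file to be copied. |
| */ |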
| private static void createFileList(Set<FileStatus> filesKept, Path srcPath, Path targetPath, FileSystem fs) |
| throws IOException { |
| try (FSDataOutputStream outStream = fs.create(new Path(targetPath, BLOB_MANIFEST_FILE))) { |
| // Adding the first entry in the manifest file as the source path, the entries post that are the files to be |
| // copied. |
| outStream.writeBytes(srcPath.toString() + System.lineSeparator()); |
| for (FileStatus file : filesKept) { |
| outStream.writeBytes(file.getPath().toString() + System.lineSeparator()); |
| } |
| } |
| LOG.debug("Created path list at path: {}", new Path(targetPath, BLOB_MANIFEST_FILE)); |
| } |
| |
| /** |
| * Moves the specified files to the destination in parallel. |
| * Spins up multiple threads, schedules the transfers and shuts down the pool. |
| * |
| * @param conf configuration used to size the thread pool |
| * @param fs the file system |
| * @param srcPath source directory |
| * @param destPath destination directory |
| * @param filesToMove fully qualified paths of the files to move |
| * @throws HiveException |
| * @throws IOException |
| */ |
| public static void moveSpecifiedFilesInParallel(Configuration conf, FileSystem fs, |
| Path srcPath, Path destPath, Set<String> filesToMove) |
| throws HiveException, IOException { |
| |
| LOG.info("rename {} files from {} to dest {}", |
| filesToMove.size(), srcPath, destPath); |
| PerfLogger perfLogger = SessionState.getPerfLogger(); |
| perfLogger.perfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus"); |
| |
| final ExecutorService pool = createMoveThreadPool(conf); |
| |
| List<Future<Void>> futures = new LinkedList<>(); |
| moveSpecifiedFilesInParallel(fs, srcPath, destPath, filesToMove, futures, pool); |
| |
| shutdownAndCleanup(pool, futures); |
| LOG.info("Completed rename from {} to {}", srcPath, destPath); |
| |
| perfLogger.perfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus"); |
| } |
| |
| /** |
| * Moves files from src to dst if they are within the specified set of paths, |
| * recursing into sub-directories so that the directory nesting under dst matches src. |
| * @param fs the file system |
| * @param src source directory |
| * @param dst destination directory |
| * @param filesToMove fully qualified paths of the files to move |
| * @param futures List of futures |
| * @param pool thread pool |
| * @throws IOException |
| */ |
| private static void moveSpecifiedFilesInParallel(FileSystem fs, |
| Path src, Path dst, Set<String> filesToMove, List<Future<Void>> futures, |
| ExecutorService pool) throws IOException { |
| if (!fs.exists(dst)) { |
| LOG.info("Creating {}", dst); |
| fs.mkdirs(dst); |
| } |
| |
| FileStatus[] files = fs.listStatus(src); |
| for (FileStatus fileStatus : files) { |
| if (filesToMove.contains(fileStatus.getPath().toString())) { |
| futures.add(pool.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws HiveException { |
| try { |
| LOG.debug("Moving from {} to {} ", fileStatus.getPath(), dst); |
| Utilities.moveFileOrDir(fs, fileStatus, dst); |
| } catch (Exception e) { |
| throw new HiveException(e); |
| } |
| return null; |
| } |
| })); |
| } else if (fileStatus.isDirectory()) { |
| // Traverse directory contents. |
| // Directory nesting for dst needs to match src. |
| Path nestedDstPath = new Path(dst, fileStatus.getPath().getName()); |
| moveSpecifiedFilesInParallel(fs, fileStatus.getPath(), nestedDstPath, |
| filesToMove, futures, pool); |
| } |
| } |
| } |
| |
| private static ExecutorService createMoveThreadPool(Configuration conf) { |
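| // Always use at least one thread, even if HIVE_MOVE_FILES_THREAD_COUNT is set to 0 or less. |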
| int threads = Math.max(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15), 1); |
| return Executors.newFixedThreadPool(threads, |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()); |
| } |
| |
| private static void shutdownAndCleanup(ExecutorService pool, |
| List<Future<Void>> futures) throws HiveException { |
| if (pool == null) { |
| return; |
| } |
| pool.shutdown(); |
| |
| futures = (futures != null) ? futures : Collections.emptyList(); |
| for (Future<Void> future : futures) { |
| try { |
| future.get(); |
| } catch (InterruptedException | ExecutionException e) { |
| LOG.error("Error in moving files to destination", e); |
| cancelTasks(futures); |
| throw new HiveException(e); |
| } |
| } |
| } |
| |
| /** |
| * Cancels all futures, interrupting tasks that are already running. |
| * |
| * @param futureList futures to cancel |
| */ |
| private static void cancelTasks(List<Future<Void>> futureList) { |
| for (Future future : futureList) { |
| future.cancel(true); |
| } |
| } |
| |
| |
| |
| /** |
| * Check the existence of buckets according to bucket specification. Create empty buckets if |
| * needed. |
| * |
| * @param hconf The job configuration |
| * @param paths A list of empty buckets to create |
| * @param isCompressed Whether the bucket files should be compressed |
| * @param tableInfo The table descriptor used to pick the output format and serde |
| * @param reporter The mapreduce reporter object |
| * @throws HiveException |
| * @throws IOException |
| */ |
| static void createEmptyBuckets(Configuration hconf, List<Path> paths, |
| boolean isCompressed, TableDesc tableInfo, Reporter reporter) |
| throws HiveException, IOException { |
| |
| // Both the regular and the test code path simply wrap hconf in a JobConf. |
| JobConf jc = new JobConf(hconf); |
| HiveOutputFormat<?, ?> hiveOutputFormat = null; |
| Class<? extends Writable> outputClass = null; |
| try { |
| AbstractSerDe serde = tableInfo.getSerDeClass().newInstance(); |
| serde.initialize(hconf, tableInfo.getProperties(), null); |
| outputClass = serde.getSerializedClass(); |
| hiveOutputFormat = HiveFileFormatUtils.getHiveOutputFormat(hconf, tableInfo); |
| } catch (SerDeException | InstantiationException | IllegalAccessException e) { |
| throw new HiveException(e); |
| } |
| |
| for (Path path : paths) { |
| Utilities.FILE_OP_LOGGER.trace("creating empty bucket for {}", path); |
| RecordWriter writer = hiveOutputFormat.getHiveRecordWriter(jc, path, outputClass, isCompressed, |
| tableInfo.getProperties(), reporter); |
| writer.close(false); |
| LOG.info("created empty bucket for enforcing bucketing at {}", path); |
| } |
| } |
| |
| private static void addFilesToPathSet(Collection<FileStatus> files, Set<FileStatus> fileSet) { |
| fileSet.addAll(files); |
| } |
| |
| /** |
| * Remove all temporary files and duplicate (double-committed) files from a given directory. |
| */ |
| public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, Configuration hconf, boolean isBaseDir) |
| throws IOException { |
| removeTempOrDuplicateFiles(fs, path, null, null, hconf, isBaseDir); |
| } |
| |
| public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path, |
| DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { |
| if (path == null) { |
| return null; |
| } |
| List<FileStatus> statusList = HiveStatsUtils.getFileStatusRecurse(path, |
| ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); |
| FileStatus[] stats = statusList.toArray(new FileStatus[statusList.size()]); |
| return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir); |
| } |
| |
| private static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, |
| DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException { |
| return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null, isBaseDir); |
| } |
| |
| /** |
| * Remove all temporary files and duplicate (double-committed) files from a given directory. |
| * |
| * @return a list of path names corresponding to should-be-created empty buckets. |
| */ |
| private static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, |
| DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<FileStatus> filesKept, boolean isBaseDir) |
| throws IOException { |
| int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(), |
| numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0; |
| return removeTempOrDuplicateFiles( |
| fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir); |
| } |
| |
| private static FileStatus[] removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException { |
| // listStatus does not need to be called just to decide whether to delete the directory; |
| // delete() handles a missing or already-deleted path internally. The file list is fetched |
| // here because the caller uses it. |
| FileStatus[] items = fs.listStatus(path); |
| |
| // Remove empty directory since DP insert should not generate empty partitions. |
| // Empty directories could be generated by crashed Task/ScriptOperator. |
| if (items.length == 0) { |
| // delete() returns false in only two conditions |
| // 1. Tried to delete root |
| // 2. The file wasn't actually there (or deleted by some other thread) |
| // So return value is not checked for delete. |
| fs.delete(path, true); |
| } |
| return items; |
| } |
| |
| // Returns the listing of the non-empty sub-directories and deletes the empty ones. |
| private static Map<Path, FileStatus[]> getNonEmptySubDirs(FileSystem fs, Configuration hConf, FileStatus[] parts) |
| throws IOException { |
| int threadCount = hConf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15); |
| final ExecutorService pool = (threadCount <= 0 ? null : |
| Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder().setDaemon(true).setNameFormat( |
| "Remove-Temp-%d").build())); |
| Map<Path, FileStatus[]> partStatusMap = new ConcurrentHashMap<>(); |
| List<Future<Void>> futures = new LinkedList<>(); |
| |
| for (FileStatus part : parts) { |
| Path path = part.getPath(); |
| if (pool != null) { |
| futures.add(pool.submit(() -> { |
| FileStatus[] items = removeEmptyDpDirectory(fs, path); |
| partStatusMap.put(path, items); |
| return null; |
| })); |
| } else { |
| partStatusMap.put(path, removeEmptyDpDirectory(fs, path)); |
| } |
| } |
| |
| if (null != pool) { |
| pool.shutdown(); |
| try { |
| for (Future<Void> future : futures) { |
| future.get(); |
| } |
| } catch (InterruptedException | ExecutionException e) { |
| LOG.error("Exception in getting dir status", e); |
| for (Future<Void> future : futures) { |
| future.cancel(true); |
| } |
| throw new IOException(e); |
| } |
| } |
| |
| // Dump the file system (and its metrics) for debugging. |
| LOG.debug("FS {}", fs); |
| |
| return partStatusMap; |
| } |
| |
| public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, |
| String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long writeId, |
| int stmtId, boolean isMmTable, Set<FileStatus> filesKept, boolean isBaseDir) throws IOException { |
| if (fileStats == null) { |
| return null; |
| } |
| List<Path> result = new ArrayList<Path>(); |
| HashMap<String, FileStatus> taskIDToFile = null; |
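| // Three layouts are handled below: dynamic-partition directories (dpLevels > 0), |
| // MM tables written under a union sub-directory, and a flat directory of task outputs. |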
| if (dpLevels > 0) { |
| Map<Path, FileStatus[]> partStatusMap = getNonEmptySubDirs(fs, hconf, fileStats); |
| for (int i = 0; i < fileStats.length; ++i) { |
| Path path = fileStats[i].getPath(); |
| assert fileStats[i].isDirectory() : "dynamic partition " + path + " is not a directory"; |
| FileStatus[] items = partStatusMap.get(path); |
| if (items.length == 0) { |
| fileStats[i] = null; |
| continue; |
| } |
| |
| if (isMmTable) { |
| if (!path.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { |
| throw new IOException("Unexpected non-MM directory name " + path); |
| } |
| } |
| |
| Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in directory {}", path); |
| if (!StringUtils.isEmpty(unionSuffix)) { |
| try { |
| items = fs.listStatus(new Path(path, unionSuffix)); |
| } catch (FileNotFoundException e) { |
| continue; |
| } |
| } |
| |
| taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf); |
| if (filesKept != null && taskIDToFile != null) { |
| addFilesToPathSet(taskIDToFile.values(), filesKept); |
| } |
| |
| addBucketFileToResults(taskIDToFile, numBuckets, hconf, result); |
| } |
| } else if (isMmTable && !StringUtils.isEmpty(unionSuffix)) { |
| if (fileStats.length == 0) { |
| return result; |
| } |
| Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir); |
| taskIDToFile = removeTempOrDuplicateFilesNonMm( |
| fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf); |
| if (filesKept != null && taskIDToFile != null) { |
| addFilesToPathSet(taskIDToFile.values(), filesKept); |
| } |
| addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result); |
| } else { |
| if (fileStats.length == 0) { |
| return result; |
| } |
| if (!isMmTable) { |
| taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf); |
| if (filesKept != null && taskIDToFile != null) { |
| addFilesToPathSet(taskIDToFile.values(), filesKept); |
| } |
| } else { |
| Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir); |
| taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf); |
| if (filesKept != null && taskIDToFile != null) { |
| addFilesToPathSet(taskIDToFile.values(), filesKept); |
| } |
| } |
| addBucketFileToResults2(taskIDToFile, numBuckets, hconf, result); |
| } |
| |
| return result; |
| } |
| |
| private static Path extractNonDpMmDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { |
| if (items.length > 1) { |
| throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); |
| } |
| Path mmDir = items[0].getPath(); |
| if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { |
| throw new IOException("Unexpected non-MM directory " + mmDir); |
| } |
| Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); |
| return mmDir; |
| } |
| |
| |
| // TODO: not clear why two if conditions are different. Preserve the existing logic for now. |
| private static void addBucketFileToResults2(HashMap<String, FileStatus> taskIDToFile, |
| int numBuckets, Configuration hconf, List<Path> result) { |
| if (MapUtils.isNotEmpty(taskIDToFile) && (numBuckets > taskIDToFile.size()) |
| && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { |
| addBucketsToResultsCommon(taskIDToFile, numBuckets, result); |
| } |
| } |
| |
| // TODO: not clear why two if conditions are different. Preserve the existing logic for now. |
| private static void addBucketFileToResults(HashMap<String, FileStatus> taskIDToFile, |
| int numBuckets, Configuration hconf, List<Path> result) { |
| // if the table is bucketed and enforce bucketing, we should check and generate all buckets |
| if (numBuckets > 0 && taskIDToFile != null |
| && !"tez".equalsIgnoreCase(hconf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname))) { |
| addBucketsToResultsCommon(taskIDToFile, numBuckets, result); |
| } |
| } |
| |
| private static void addBucketsToResultsCommon( |
| HashMap<String, FileStatus> taskIDToFile, int numBuckets, List<Path> result) { |
| String taskID1 = taskIDToFile.keySet().iterator().next(); |
| Path bucketPath = taskIDToFile.values().iterator().next().getPath(); |
| for (int j = 0; j < numBuckets; ++j) { |
| addBucketFileIfMissing(result, taskIDToFile, taskID1, bucketPath, j); |
| } |
| } |
| |
| private static void addBucketFileIfMissing(List<Path> result, |
| HashMap<String, FileStatus> taskIDToFile, String taskID1, Path bucketPath, int j) { |
| String taskID2 = replaceTaskId(taskID1, j); |
| if (!taskIDToFile.containsKey(taskID2)) { |
| // create empty bucket, file name should be derived from taskID2 |
| URI bucketUri = bucketPath.toUri(); |
| String path2 = replaceTaskIdFromFilename(bucketUri.getPath().toString(), j); |
| Utilities.FILE_OP_LOGGER.trace("Creating an empty bucket file {}", path2); |
| result.add(new Path(bucketUri.getScheme(), bucketUri.getAuthority(), path2)); |
| } |
| } |
| |
| private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm( |
| FileStatus[] files, FileSystem fs, Configuration conf) throws IOException { |
| if (files == null || fs == null) { |
| return null; |
| } |
| HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>(); |
| |
| // This method currently does not support speculative execution, because |
| // compareTempOrDuplicateFiles cannot de-duplicate files created by |
| // speculatively executed tasks. |
| if (isSpeculativeExecution(conf)) { |
| String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); |
| throw new IOException("Speculative execution is not supported for engine " + engine); |
| } |
| |
| for (FileStatus one : files) { |
| if (isTempPath(one)) { |
| Path onePath = one.getPath(); |
| Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles deleting {}", onePath); |
| if (!fs.delete(onePath, true)) { |
| // If file is already deleted by some other task, just ignore the failure with a warning. |
| LOG.warn("Unable to delete tmp file: " + onePath); |
| } |
| } else { |
| // This would be a single file. See if we need to remove it. |
| ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf); |
| } |
| } |
| return taskIdToFile; |
| } |
| |
| private static void ponderRemovingTempOrDuplicateFile(FileSystem fs, |
| FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf) |
| throws IOException { |
| Path filePath = file.getPath(); |
| String taskId = getPrefixedTaskIdFromFilename(filePath.getName()); |
| Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}" |
| + ", taskId {}", filePath, taskId); |
| FileStatus otherFile = taskIdToFile.get(taskId); |
| taskIdToFile.put(taskId, (otherFile == null) ? file : |
| compareTempOrDuplicateFiles(fs, file, otherFile, conf)); |
| } |
| |
| private static boolean warnIfSet(Configuration conf, String value) { |
| if (conf.getBoolean(value, false)) { |
| LOG.warn(value + " support is currently deprecated"); |
| return true; |
| } |
| return false; |
| } |
| |
| private static boolean isSpeculativeExecution(Configuration conf) { |
| String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); |
| boolean isSpeculative = false; |
| if ("mr".equalsIgnoreCase(engine)) { |
| isSpeculative = warnIfSet(conf, "mapreduce.map.speculative") || |
| warnIfSet(conf, "mapreduce.reduce.speculative") || |
| warnIfSet(conf, "mapred.map.tasks.speculative.execution") || |
| warnIfSet(conf, "mapred.reduce.tasks.speculative.execution"); |
| } else if ("tez".equalsIgnoreCase(engine)) { |
| isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled"); |
| } // all other engines do not support speculative execution |
| |
| return isSpeculative; |
| } |
| |
| private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, |
| FileStatus file, FileStatus existingFile, Configuration conf) throws IOException { |
| // Pick the one with newest attempt ID. Previously, this function threw an |
| // exception when the file size of the newer attempt was less than the |
| // older attempt. This was an incorrect assumption due to various |
| // techniques like file compression and no guarantee that the new task will |
| // write values in the same order. |
| FileStatus toDelete = null, toRetain = null; |
| |
| // This method currently does not support speculative execution |
| if (isSpeculativeExecution(conf)) { |
| String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); |
| throw new IOException("Speculative execution is not supported for engine " + engine); |
| } |
| |
| // "LOAD .. INTO" and "INSERT INTO" commands will generate files with |
| // "_copy_x" suffix. These files are usually read by map tasks and the |
| // task output gets written to some tmp path. The output file names will |
| // be of format taskId_attemptId. The usual path for all these tasks is |
| // srcPath -> taskTmpPath -> tmpPath -> finalPath. |
| // But, MergeFileTask can move files directly from src path to final path |
| // without copying it to tmp path. In such cases, different files with |
| // "_copy_x" suffix will be identified as duplicates (change in value |
| // of x is wrongly identified as attempt id) and will be deleted. |
| // To avoid that we will ignore files with "_copy_x" suffix from duplicate |
| // elimination. |
| Path filePath = file.getPath(); |
| if (isCopyFile(filePath.getName())) { |
| LOG.info("{} file identified as duplicate. This file is" |
| + " not deleted as it has copySuffix.", filePath); |
| return existingFile; |
| } |
| |
| int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName()); |
| int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName()); |
| // Files may come in any order irrespective of their attempt IDs |
| if (existingFileAttemptId > fileAttemptId) { |
| // keep existing |
| toRetain = existingFile; |
| toDelete = file; |
| } else if (existingFileAttemptId < fileAttemptId) { |
| // keep file |
| toRetain = file; |
| toDelete = existingFile; |
| } else { |
| throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as " |
| + existingFile.getPath()); |
| } |
| |
| if (!fs.delete(toDelete.getPath(), true)) { |
| throw new IOException("Unable to delete duplicate file: " + toDelete.getPath() |
| + ". Existing file: " + toRetain.getPath()); |
| } |
| |
| LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " |
| + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " |
| + toRetain.getLen()); |
| return toRetain; |
| } |
| |
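| /** |
| * Returns true if the file name carries the "_copy_" suffix that LOAD/INSERT INTO generate; |
| * such files are excluded from duplicate elimination (see compareTempOrDuplicateFiles). |
| */ |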
| public static boolean isCopyFile(String filepath) { |
| String filename = filepath; |
| int dirEnd = filepath.lastIndexOf(Path.SEPARATOR); |
| if (dirEnd != -1) { |
| filename = filepath.substring(dirEnd + 1); |
| } |
| ParsedOutputFileName parsedFileName = ParsedOutputFileName.parse(filename); |
| if (!parsedFileName.matches()) { |
| LOG.warn("Unable to verify if file name {} has _copy_ suffix.", filepath); |
| } |
| return parsedFileName.isCopyFile(); |
| } |
| |
| public static String getBucketFileNameFromPathSubString(String bucketName) { |
| try { |
| return bucketName.split(COPY_KEYWORD)[0]; |
| } catch (Exception e) { |
| LOG.warn("Invalid bucket file name", e); |
| return bucketName; |
| } |
| } |
| |
| /* compute bucket id from Split */ |
| public static int parseSplitBucket(InputSplit split) { |
| if (split instanceof FileSplit) { |
| return getBucketIdFromFile(((FileSplit) split).getPath().getName()); |
| } |
| // cannot get this for combined splits |
| return -1; |
| } |
| |
| public static int getBucketIdFromFile(String bucketName) { |
| Matcher m = PREFIXED_BUCKET_ID_REGEX.matcher(bucketName); |
| if (m.matches()) { |
| if (m.group(2).isEmpty()) { |
| // all zeros |
| return m.group(1).isEmpty() ? -1 : 0; |
| } |
| return Integer.parseInt(m.group(2)); |
| } |
| // Check to see if the bucketName matches the pattern "bucket_([0-9]+).*" |
| // This can happen in ACID cases when we have splits on delta files, where the filenames |
| // are of the form delta_x_y/bucket_a. |
| if (bucketName.startsWith(AcidUtils.BUCKET_PREFIX)) { |
| m = AcidUtils.BUCKET_PATTERN.matcher(bucketName); |
| if (m.find()) { |
| return Integer.parseInt(m.group(1)); |
| } |
| // Note that legacy bucket digit patterns are being ignored here. |
| } |
| return -1; |
| } |
| |
| public static String getNameMessage(Throwable e) { |
| return e.getClass().getName() + "(" + e.getMessage() + ")"; |
| } |
| |
| public static String getResourceFiles(Configuration conf, SessionState.ResourceType t) { |
| // fill in local files (includes copy of HDFS files) to be added to the task environment |
| SessionState ss = SessionState.get(); |
| Set<String> files = (ss == null) ? null : ss.list_resource(t, null); |
| return validateFiles(conf, files); |
| } |
| |
| public static String getHdfsResourceFiles(Configuration conf, SessionState.ResourceType type) { |
| // fill in HDFS files to be added to the task environment |
| SessionState ss = SessionState.get(); |
| Set<String> files = (ss == null) ? null : ss.list_hdfs_resource(type); |
| return validateFiles(conf, files); |
| } |
| |
| public static String getLocalResourceFiles(Configuration conf, SessionState.ResourceType type) { |
| // fill in local only files (excludes copy of HDFS files) to be added to the task environment |
| SessionState ss = SessionState.get(); |
| Set<String> files = (ss == null) ? null : ss.list_local_resource(type); |
| return validateFiles(conf, files); |
| } |
| |
| private static String validateFiles(Configuration conf, Set<String> files){ |
| if (files != null) { |
| List<String> realFiles = new ArrayList<String>(files.size()); |
| for (String one : files) { |
| try { |
| String onefile = realFile(one, conf); |
| if (onefile != null) { |
| realFiles.add(onefile); |
| } else { |
| LOG.warn("The file {} does not exist.", one); |
| } |
| } catch (IOException e) { |
| throw new RuntimeException("Cannot validate file " + one + " due to exception: " |
| + e.getMessage(), e); |
| } |
| } |
| return StringUtils.join(realFiles, ","); |
| } else { |
| return StringUtils.EMPTY; |
| } |
| } |
| |
| /** |
| * Get the session-specified class loader; fall back to the thread-based class loader |
| * if no session (or session conf) is available or no class loader has been set. |
| * |
| * @return the class loader to use |
| */ |
| public static ClassLoader getSessionSpecifiedClassLoader() { |
| SessionState state = SessionState.get(); |
| if (state == null || state.getConf() == null) { |
| LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead"); |
| return JavaUtils.getClassLoader(); |
| } |
| ClassLoader sessionCL = state.getConf().getClassLoader(); |
| if (sessionCL != null) { |
| LOG.trace("Use session specified class loader"); //it's normal case |
| return sessionCL; |
| } |
| LOG.debug("Session specified class loader not found, use thread based class loader"); |
| return JavaUtils.getClassLoader(); |
| } |
| |
| public static void restoreSessionSpecifiedClassLoader(ClassLoader prev) { |
| SessionState state = SessionState.get(); |
| if (state != null && state.getConf() != null) { |
| ClassLoader current = state.getConf().getClassLoader(); |
| if (current != prev && JavaUtils.closeClassLoadersTo(current, prev)) { |
| Thread.currentThread().setContextClassLoader(prev); |
| state.getConf().setClassLoader(prev); |
| } |
| } |
| } |
| |
| /** |
| * Create a URL from a string representing a path to a local file. |
| * The path string can be just a path, or can start with file:/, file:/// |
| * @param onestr path string |
| * @return the URL, or null if the string cannot be converted |
| */ |
| static URL urlFromPathString(String onestr) { |
| URL oneurl = null; |
| try { |
| if (StringUtils.indexOf(onestr, "file:/") == 0) { |
| oneurl = new URL(onestr); |
| } else { |
| oneurl = new File(onestr).toURL(); |
| } |
| } catch (Exception err) { |
| LOG.error("Bad URL {}, ignoring path", onestr); |
| } |
| return oneurl; |
| } |
| |
| /** |
| * Remove elements from the classpath, if possible. This will only work if the current thread context class loader is |
| * a UDFClassLoader (i.e. if we have created it). |
| * |
| * @param pathsToRemove |
| * Array of classpath elements |
| */ |
| public static void removeFromClassPath(String[] pathsToRemove) throws IOException { |
| Thread curThread = Thread.currentThread(); |
| ClassLoader currentLoader = curThread.getContextClassLoader(); |
| // If current class loader is NOT UDFClassLoader, then it is a system class loader, we should not mess with it. |
| if (!(currentLoader instanceof UDFClassLoader)) { |
| LOG.warn("Ignoring attempt to manipulate {}; probably means we have closed more UDF loaders than opened.", |
| currentLoader == null ? "null" : currentLoader.getClass().getSimpleName()); |
| return; |
| } |
| // Otherwise -- for UDFClassLoaders -- we close the current one and create a new one, with more limited class path. |
| |
| UDFClassLoader loader = (UDFClassLoader) currentLoader; |
| |
| Set<URL> newPath = new HashSet<URL>(Arrays.asList(loader.getURLs())); |
| |
| for (String onestr : pathsToRemove) { |
| URL oneurl = urlFromPathString(onestr); |
| if (oneurl != null) { |
| newPath.remove(oneurl); |
| } |
| } |
| JavaUtils.closeClassLoader(loader); |
| // This loader is closed, remove it from cached registry loaders to avoid removing it again. |
| Registry reg = SessionState.getRegistry(); |
| if (reg != null) { |
| reg.removeFromUDFLoaders(loader); |
| } |
| |
| loader = new UDFClassLoader(newPath.toArray(new URL[0])); |
| curThread.setContextClassLoader(loader); |
| SessionState.get().getConf().setClassLoader(loader); |
| } |
| |
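| // Renders each byte as 'x' followed by its unsigned value, e.g. bytes {-1, 2} become "x255x2". |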
| public static String formatBinaryString(byte[] array, int start, int length) { |
| StringBuilder sb = new StringBuilder(); |
| for (int i = start; i < start + length; i++) { |
| sb.append('x'); |
| sb.append(array[i] < 0 ? array[i] + 256 : array[i]); |
| } |
| return sb.toString(); |
| } |
| |
| public static List<String> getColumnNamesFromSortCols(List<Order> sortCols) { |
| if(sortCols == null) { |
| return Collections.emptyList(); |
| } |
| List<String> names = new ArrayList<String>(); |
| for (Order o : sortCols) { |
| names.add(o.getCol()); |
| } |
| return names; |
| } |
| |
| public static List<String> getColumnNamesFromFieldSchema(List<FieldSchema> partCols) { |
| List<String> names = new ArrayList<String>(); |
| for (FieldSchema o : partCols) { |
| names.add(o.getName()); |
| } |
| return names; |
| } |
| |
| public static List<String> getInternalColumnNamesFromSignature(List<ColumnInfo> colInfos) { |
| List<String> names = new ArrayList<String>(); |
| for (ColumnInfo ci : colInfos) { |
| names.add(ci.getInternalName()); |
| } |
| return names; |
| } |
| |
| /** |
| * Note: This will not return the correct number of columns in the case of |
| * Avro serde using an external schema URL, unless these properties have been |
| * used to initialize the Avro SerDe (which updates these properties). |
| * @param props TableDesc properties |
| * @return list of column names based on the table properties |
| */ |
| public static List<String> getColumnNames(Properties props) { |
| List<String> names = new ArrayList<String>(); |
| String colNames = props.getProperty(serdeConstants.LIST_COLUMNS); |
| return splitColNames(names, colNames); |
| } |
| |
| public static List<String> getColumnNames(Configuration conf) { |
| List<String> names = new ArrayList<String>(); |
| String colNames = conf.get(serdeConstants.LIST_COLUMNS); |
| return splitColNames(names, colNames); |
| } |
| |
| private static List<String> splitColNames(List<String> names, String colNames) { |
| String[] cols = colNames.trim().split(","); |
| for(String col : cols) { |
| if(StringUtils.isNotBlank(col)) { |
| names.add(col); |
| } |
| } |
| return names; |
| } |
| |
| public static List<String> getColumnTypes(Properties props) { |
| List<String> names = new ArrayList<String>(); |
| String colNames = props.getProperty(serdeConstants.LIST_COLUMN_TYPES); |
| ArrayList<TypeInfo> cols = TypeInfoUtils.getTypeInfosFromTypeString(colNames); |
| for (TypeInfo col : cols) { |
| names.add(col.getTypeName()); |
| } |
| return names; |
| } |
| |
| /** |
| * Extract db and table name from dbtable string, where db and table are separated by "." |
| * If there is no db name part, the current session's default db is used |
| * @param dbtable |
| * @return String array with two elements, first is db name, second is table name |
| * @throws SemanticException |
| * @deprecated use {@link TableName} or {@link org.apache.hadoop.hive.ql.parse.HiveTableName} instead |
| */ |
| @Deprecated |
| public static String[] getDbTableName(String dbtable) throws SemanticException { |
| return getDbTableName(SessionState.get().getCurrentDatabase(), dbtable); |
| } |
| |
| /** |
| * Extract db and table name from dbtable string. |
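| * For example, "db1.tab1" yields {"db1", "tab1"} and "tab1" yields {defaultDb, "tab1"}. |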
| * @param defaultDb |
| * @param dbtable |
| * @return String array with two elements, first is db name, second is table name |
| * @throws SemanticException |
| * @deprecated use {@link TableName} or {@link org.apache.hadoop.hive.ql.parse.HiveTableName} instead |
| */ |
| @Deprecated |
| public static String[] getDbTableName(String defaultDb, String dbtable) throws SemanticException { |
| if (dbtable == null) { |
| return new String[2]; |
| } |
| String[] names = dbtable.split("\\."); |
| switch (names.length) { |
| case 3: |
| case 2: |
| return names; |
| case 1: |
| return new String [] {defaultDb, dbtable}; |
| default: |
| throw new SemanticException(ErrorMsg.INVALID_TABLE_NAME, dbtable); |
| } |
| } |
| |
| public static void validateColumnNames(List<String> colNames, List<String> checkCols) |
| throws SemanticException { |
| Iterator<String> checkColsIter = checkCols.iterator(); |
| while (checkColsIter.hasNext()) { |
| String toCheck = checkColsIter.next(); |
| boolean found = false; |
| Iterator<String> colNamesIter = colNames.iterator(); |
| while (colNamesIter.hasNext()) { |
| String colName = colNamesIter.next(); |
| if (toCheck.equalsIgnoreCase(colName)) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg()); |
| } |
| } |
| } |
| |
| /** |
| * Accepts qualified name which is in the form of table, dbname.tablename or catalog.dbname.tablename and returns a |
| * {@link TableName}. All parts can be null. |
| * |
| * @param dbTableName |
| * @return a {@link TableName} |
| * @throws SemanticException |
| * @deprecated handle null values and use {@link TableName#fromString(String, String, String)} |
| */ |
| @Deprecated |
| public static TableName getNullableTableName(String dbTableName) throws SemanticException { |
| return getNullableTableName(dbTableName, SessionState.get().getCurrentDatabase()); |
| } |
| |
| /** |
| * Accepts qualified name which is in the form of table, dbname.tablename or catalog.dbname.tablename and returns a |
| * {@link TableName}. All parts can be null. |
| * |
| * @param dbTableName |
| * @param defaultDb |
| * @return a {@link TableName} |
| * @throws SemanticException |
| * @deprecated handle null values and use {@link TableName#fromString(String, String, String)} |
| */ |
| @Deprecated |
| public static TableName getNullableTableName(String dbTableName, String defaultDb) throws SemanticException { |
| if (dbTableName == null) { |
| return new TableName(null, null, null); |
| } else { |
| try { |
| return TableName |
| .fromString(dbTableName, SessionState.get().getCurrentCatalog(), defaultDb); |
| } catch (IllegalArgumentException e) { |
| throw new SemanticException(e.getCause()); |
| } |
| } |
| } |
| |
| /** |
| * Gets the default notification interval to send progress updates to the tracker. Useful for |
| * operators that may not output data for a while. |
| * |
| * @param hconf |
| * @return the interval in milliseconds |
| */ |
| public static int getDefaultNotificationInterval(Configuration hconf) { |
| int notificationInterval; |
| String expIntervalStr = hconf.get("mapred.tasktracker.expiry.interval"); |
| |
| if (expIntervalStr != null) { |
| notificationInterval = Integer.decode(expIntervalStr).intValue() / 2; |
| } else { |
| // 5 minutes |
| notificationInterval = 5 * 60 * 1000; |
| } |
| return notificationInterval; |
| } |
| |
| /** |
| * Copies the storage handler properties configured for a table descriptor to a runtime job |
| * configuration. |
| * |
| * @param tbl |
| * table descriptor from which to read |
| * |
| * @param job |
| * configuration which receives configured properties |
| */ |
| public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) throws HiveException { |
| Properties tblProperties = tbl.getProperties(); |
| for(String name: tblProperties.stringPropertyNames()) { |
| if (job.get(name) == null) { |
| String val = (String) tblProperties.get(name); |
| if (val != null) { |
| job.set(name, StringEscapeUtils.escapeJava(val)); |
| } |
| } |
| } |
| Map<String, String> jobProperties = tbl.getJobProperties(); |
| if (jobProperties != null) { |
| for (Map.Entry<String, String> entry : jobProperties.entrySet()) { |
| job.set(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Copies the storage handler properties configured for a table descriptor to a runtime job |
| * configuration. This differs from {@link #copyTableJobPropertiesToConf(org.apache.hadoop.hive.ql.plan.TableDesc, org.apache.hadoop.mapred.JobConf)} |
| * in that it does not allow parameters already set in the job to override the values from the |
| * table. This is important for setting the config up for reading, |
| * as the job may already have values in it from another table. |
| * @param tbl |
| * @param job |
| */ |
| public static void copyTablePropertiesToConf(TableDesc tbl, JobConf job) throws HiveException { |
| Properties tblProperties = tbl.getProperties(); |
| for(String name: tblProperties.stringPropertyNames()) { |
| String val = (String) tblProperties.get(name); |
| if (val != null) { |
| job.set(name, StringEscapeUtils.escapeJava(val)); |
| } |
| } |
| Map<String, String> jobProperties = tbl.getJobProperties(); |
| if (jobProperties != null) { |
| for (Map.Entry<String, String> entry : jobProperties.entrySet()) { |
| job.set(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Copy job credentials to table properties |
| * @param tbl |
| */ |
| public static void copyJobSecretToTableProperties(TableDesc tbl) throws IOException { |
| Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); |
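| // Secret keys are stored as SECRET_PREFIX + SECRET_DELIMIT + tableName + SECRET_DELIMIT + keyName; |
| // only the entries that belong to this table are copied into its properties. |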
| for (Text key : credentials.getAllSecretKeys()) { |
| String keyString = key.toString(); |
| if (keyString.startsWith(TableDesc.SECRET_PREFIX + TableDesc.SECRET_DELIMIT)) { |
| String[] comps = keyString.split(TableDesc.SECRET_DELIMIT); |
| String tblName = comps[1]; |
| String keyName = comps[2]; |
| if (tbl.getTableName().equalsIgnoreCase(tblName)) { |
| tbl.getProperties().put(keyName, new String(credentials.getSecretKey(key))); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the maximum number of executors required to get file information from several input locations. |
| * It checks whether HIVE_EXEC_INPUT_LISTING_MAX_THREADS or DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX are > 1 |
| * |
| * @param conf Configuration object to get the maximum number of threads. |
| * @param inputLocationListSize Number of input locations required to process. |
| * @return The maximum number of executors to use. |
| */ |
| @VisibleForTesting |
| static int getMaxExecutorsForInputListing(final Configuration conf, int inputLocationListSize) { |
| if (inputLocationListSize < 1) { |
| return 0; |
| } |
| |
| int maxExecutors = 1; |
| |
| if (inputLocationListSize > 1) { |
| int listingMaxThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS); |
| |
| // DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX should be removed in the next Hive version (probably 3.0). |
| // If HIVE_EXEC_INPUT_LISTING_MAX_THREADS is not set, then we check the deprecated configuration. |
| if (listingMaxThreads <= 0) { |
| listingMaxThreads = conf.getInt(DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 0); |
| if (listingMaxThreads > 0) { |
| LOG.warn("Deprecated configuration is used: {}. Please use {}", |
| DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, |
| ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname); |
| } |
| } |
| |
| if (listingMaxThreads > 1) { |
| maxExecutors = Math.min(inputLocationListSize, listingMaxThreads); |
| } |
| } |
| |
| return maxExecutors; |
| } |
| |
| /** |
| * Calculate the total size of input files. |
| * |
| * @param ctx |
| * the hadoop job context |
| * @param work |
| * map reduce job plan |
| * @param filter |
| * filter to apply to the input paths before calculating size |
| * @return the summary of all the input paths. |
| * @throws IOException |
| */ |
| public static ContentSummary getInputSummary(final Context ctx, MapWork work, PathFilter filter) |
| throws IOException { |
| PerfLogger perfLogger = SessionState.getPerfLogger(); |
| perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY); |
| |
| final long[] summary = {0L, 0L, 0L}; |
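| // summary[0] = total length, summary[1] = file count, summary[2] = directory count. |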
| final Set<Path> pathNeedProcess = new HashSet<>(); |
| |
| // For each input path, calculate the total size. |
| for (final Path path : work.getPathToAliases().keySet()) { |
| if (path == null) { |
| continue; |
| } |
| if (filter != null && !filter.accept(path)) { |
| continue; |
| } |
| |
| ContentSummary cs = ctx.getCS(path); |
| if (cs != null) { |
| summary[0] += cs.getLength(); |
| summary[1] += cs.getFileCount(); |
| summary[2] += cs.getDirectoryCount(); |
| } else { |
| pathNeedProcess.add(path); |
| } |
| } |
| |
| // Process the case when name node call is needed |
| final ExecutorService executor; |
| |
| int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size()); |
| if (numExecutors > 1) { |
| // Since multiple threads could call this method concurrently, locking |
| // here keeps the number of listing threads from growing out of control. |
| synchronized (INPUT_SUMMARY_LOCK) { |
| LOG.info("Using {} threads for getContentSummary", numExecutors); |
| executor = Executors.newFixedThreadPool(numExecutors, |
| new ThreadFactoryBuilder().setDaemon(true) |
| .setNameFormat("Get-Input-Summary-%d").build()); |
| getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), |
| work, summary, executor); |
| } |
| } else { |
| LOG.info("Not using thread pool for getContentSummary"); |
| executor = MoreExecutors.newDirectExecutorService(); |
| getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess), |
| work, summary, executor); |
| } |
| perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY); |
| |
| return new ContentSummary.Builder().length(summary[0]) |
| .fileCount(summary[1]).directoryCount(summary[2]).build(); |
| } |
| |
| /** |
| * Performs a ContentSummary lookup over a set of paths using 1 or more |
| * threads. The 'summary' argument is directly modified. |
| * |
| * @param ctx |
| * @param pathNeedProcess |
| * @param work |
| * @param summary |
| * @param executor |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| static void getInputSummaryWithPool(final Context ctx, |
| final Set<Path> pathNeedProcess, final MapWork work, final long[] summary, |
| final ExecutorService executor) throws IOException { |
| Preconditions.checkNotNull(ctx); |
| Preconditions.checkNotNull(pathNeedProcess); |
| Preconditions.checkNotNull(executor); |
| |
| List<Future<?>> futures = new ArrayList<Future<?>>(pathNeedProcess.size()); |
| final AtomicLong totalLength = new AtomicLong(0L); |
| final AtomicLong totalFileCount = new AtomicLong(0L); |
| final AtomicLong totalDirectoryCount = new AtomicLong(0L); |
| |
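| // On interruption, close the file systems to abort in-flight listing calls and stop the pool. |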
| HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() { |
| @Override |
| public void interrupt() { |
| for (Path path : pathNeedProcess) { |
| try { |
| path.getFileSystem(ctx.getConf()).close(); |
| } catch (IOException ignore) { |
| LOG.debug("Failed to close filesystem", ignore); |
| } |
| } |
| executor.shutdownNow(); |
| } |
| }); |
| try { |
| Configuration conf = ctx.getConf(); |
| JobConf jobConf = new JobConf(conf); |
| for (final Path path : pathNeedProcess) { |
| // All threads share the same Configuration and JobConf based on the |
| // assumption that they are thread safe if only read operations are |
| // executed. Although this is not stated in Hadoop's javadoc, the source |
| // code clearly shows that effort was made to keep them thread safe, so we |
| // rely on that. Revisit this piece of code if the assumption turns out to |
| // be incorrect. |
| final Configuration myConf = conf; |
| final JobConf myJobConf = jobConf; |
| final Map<String, Operator<?>> aliasToWork = work.getAliasToWork(); |
| final Map<Path, List<String>> pathToAlias = work.getPathToAliases(); |
| final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); |
| Runnable r = new Runnable() { |
| @Override |
| public void run() { |
| try { |
| Class<? extends InputFormat> inputFormatCls = partDesc |
| .getInputFileFormatClass(); |
| InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache( |
| inputFormatCls, myJobConf); |
| if (inputFormatObj instanceof ContentSummaryInputFormat) { |
| ContentSummaryInputFormat csif = (ContentSummaryInputFormat) inputFormatObj; |
| final ContentSummary cs = csif.getContentSummary(path, myJobConf); |
| recordSummary(path, cs); |
| return; |
| } |
| String metaTableStorage = null; |
| if (partDesc.getTableDesc() != null && |
| partDesc.getTableDesc().getProperties() != null) { |
| metaTableStorage = partDesc.getTableDesc().getProperties() |
| .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null); |
| } |
| if (partDesc.getProperties() != null) { |
| metaTableStorage = partDesc.getProperties() |
| .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage); |
| } |
| |
| HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage); |
| if (handler instanceof InputEstimator) { |
| long total = 0; |
| TableDesc tableDesc = partDesc.getTableDesc(); |
| InputEstimator estimator = (InputEstimator) handler; |
| for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, path)) { |
| JobConf jobConf = new JobConf(myJobConf); |
| TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias); |
| Utilities.setColumnNameList(jobConf, scanOp, true); |
| Utilities.setColumnTypeList(jobConf, scanOp, true); |
| PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc); |
| Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf); |
| total += estimator.estimate(jobConf, scanOp, -1).getTotalLength(); |
| } |
| recordSummary(path, new ContentSummary(total, -1, -1)); |
| } else if (handler == null) { |
| // Native tables (no storage handler): record the file system content summary. |
| // Non-native tables whose handler is not an InputEstimator get no summary at all, |
| // so they are not selected as a mapjoin target. |
| FileSystem fs = path.getFileSystem(myConf); |
| recordSummary(path, fs.getContentSummary(path)); |
| } |
| } catch (Exception e) { |
| // We safely ignore this exception for summary data. |
| // We don't update the cache to protect it from polluting other |
| // usages. The worst case is that IOException will always be |
| // retried for another getInputSummary(), which is fine as |
| // IOException is not considered as a common case. |
| LOG.info("Cannot get size of {}. Safely ignored.", path); |
| LOG.debug("Cannot get size of {}. Safely ignored.", path, e); |
| } |
| } |
| |
| private void recordSummary(final Path p, final ContentSummary cs) { |
| final long csLength = cs.getLength(); |
| final long csFileCount = cs.getFileCount(); |
| final long csDirectoryCount = cs.getDirectoryCount(); |
| |
| totalLength.addAndGet(csLength); |
| totalFileCount.addAndGet(csFileCount); |
| totalDirectoryCount.addAndGet(csDirectoryCount); |
| |
| ctx.addCS(p.toString(), cs); |
| |
| LOG.debug( |
| "Cache Content Summary for {} length: {} file count: {} " |
| + "directory count: {}", |
| path, csLength, csFileCount, csDirectoryCount); |
| } |
| }; |
| |
| futures.add(executor.submit(r)); |
| } |
| |
| for (Future<?> future : futures) { |
| try { |
| future.get(); |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted when waiting threads", e); |
| Thread.currentThread().interrupt(); |
| break; |
| } catch (ExecutionException e) { |
| throw new IOException(e); |
| } |
| } |
| executor.shutdown(); |
| |
| HiveInterruptUtils.checkInterrupted(); |
| |
| summary[0] += totalLength.get(); |
| summary[1] += totalFileCount.get(); |
| summary[2] += totalDirectoryCount.get(); |
| } finally { |
| executor.shutdownNow(); |
| HiveInterruptUtils.remove(interrup); |
| } |
| } |
| |
| public static long sumOf(Map<String, Long> aliasToSize, Set<String> aliases) { |
| return sumOfExcept(aliasToSize, aliases, null); |
| } |
| |
| // Returns the sum of sizes for the given aliases, excluding 'excepts'; returns -1 if the size of any non-excluded alias is unknown. |
| public static long sumOfExcept(Map<String, Long> aliasToSize, |
| Set<String> aliases, Set<String> excepts) { |
| long total = 0; |
| for (String alias : aliases) { |
| if (excepts != null && excepts.contains(alias)) { |
| continue; |
| } |
| Long size = aliasToSize.get(alias); |
| if (size == null) { |
| return -1; |
| } |
| total += size; |
| } |
| return total; |
| } |
| |
| public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx) |
| throws Exception { |
| if (ctx != null) { |
| ContentSummary cs = ctx.getCS(dirPath); |
| if (cs != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Content Summary cached for {} length: {} num files: {} " + |
| "num directories: {}", dirPath, cs.getLength(), cs.getFileCount(), |
| cs.getDirectoryCount()); |
| } |
| return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1); |
| } else { |
| LOG.debug("Content Summary not cached for {}", dirPath); |
| } |
| } |
| return isEmptyPath(job, dirPath); |
| } |
| |
| public static boolean isEmptyPath(Configuration job, Path dirPath) throws IOException { |
| FileStatus[] fStats = listNonHiddenFileStatus(job, dirPath); |
| return fStats.length == 0; |
| } |
| |
| public static FileStatus[] listNonHiddenFileStatus(Configuration job, Path dirPath) |
| throws IOException { |
| FileSystem inpFs = dirPath.getFileSystem(job); |
| try { |
| return inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER); |
| } catch (FileNotFoundException e) { |
| return new FileStatus[] {}; |
| } |
| } |
| |
| public static List<TezTask> getTezTasks(List<Task<?>> tasks) { |
| return getTasks(tasks, new TaskFilterFunction<>(TezTask.class)); |
| } |
| |
| public static List<ExecDriver> getMRTasks(List<Task<?>> tasks) { |
| return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class)); |
| } |
| |
| public static int getNumClusterJobs(List<Task<?>> tasks) { |
| return getMRTasks(tasks).size() + getTezTasks(tasks).size(); |
| } |
| |
| static class TaskFilterFunction<T> implements DAGTraversal.Function { |
| private Set<Task<?>> visited = new HashSet<>(); |
| private Class<T> requiredType; |
| private List<T> typeSpecificTasks = new ArrayList<>(); |
| |
| TaskFilterFunction(Class<T> requiredType) { |
| this.requiredType = requiredType; |
| } |
| |
| @Override |
| public void process(Task<?> task) { |
| if (requiredType.isInstance(task) && !typeSpecificTasks.contains(task)) { |
| typeSpecificTasks.add((T) task); |
| } |
| visited.add(task); |
| } |
| |
| List<T> getTasks() { |
| return typeSpecificTasks; |
| } |
| |
| @Override |
| public boolean skipProcessing(Task<?> task) { |
| return visited.contains(task); |
| } |
| } |
| |
| private static <T> List<T> getTasks(List<Task<?>> tasks, |
| TaskFilterFunction<T> function) { |
| DAGTraversal.traverse(tasks, function); |
| return function.getTasks(); |
| } |
| |
| |
| public static final class PartitionDetails { |
| public Map<String, String> fullSpec; |
| public Partition partition; |
| public List<FileStatus> newFiles; |
| public boolean hasOldPartition = false; |
| public AcidUtils.TableSnapshot tableSnapshot; |
| } |
| |
| /** |
| * Construct a list of full partition spec from Dynamic Partition Context and the directory names |
| * corresponding to these dynamic partitions. |
| */ |
| public static Map<Path, PartitionDetails> getFullDPSpecs(Configuration conf, DynamicPartitionCtx dpCtx, |
| Map<String, List<Path>> dynamicPartitionSpecs) throws HiveException { |
| |
| try { |
| Path loadPath = dpCtx.getRootPath(); |
| FileSystem fs = loadPath.getFileSystem(conf); |
| int numDPCols = dpCtx.getNumDPCols(); |
| Map<Path, Optional<List<Path>>> allPartition = new HashMap<>(); |
| if (dynamicPartitionSpecs != null) { |
| for (Map.Entry<String, List<Path>> partSpec : dynamicPartitionSpecs.entrySet()) { |
| allPartition.put(new Path(loadPath, partSpec.getKey()), Optional.of(partSpec.getValue())); |
| } |
| } else { |
| List<FileStatus> status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs); |
| for (FileStatus fileStatus : status) { |
| allPartition.put(fileStatus.getPath(), Optional.empty()); |
| } |
| } |
| |
| if (allPartition.isEmpty()) { |
| LOG.warn("No partition is generated by dynamic partitioning"); |
| return Collections.synchronizedMap(new LinkedHashMap<>()); |
| } |
| validateDynPartitionCount(conf, allPartition.keySet()); |
| |
| // partial partition specification |
| Map<String, String> partSpec = dpCtx.getPartSpec(); |
| |
| // list of full partition specification |
| Map<Path, PartitionDetails> partitionDetailsMap = |
| Collections.synchronizedMap(new LinkedHashMap<>()); |
| |
| // calculate full path spec for each valid partition path |
| for (Map.Entry<Path, Optional<List<Path>>> partEntry : allPartition.entrySet()) { |
| Path partPath = partEntry.getKey(); |
| Map<String, String> fullPartSpec = Maps.newLinkedHashMap(partSpec); |
| String staticParts = Warehouse.makeDynamicPartName(partSpec); |
| Path computedPath = partPath; |
| if (!staticParts.isEmpty()) { |
| computedPath = new Path(new Path(partPath.getParent(), staticParts), partPath.getName()); |
| } |
| if (!Warehouse.makeSpecFromName(fullPartSpec, computedPath, new HashSet<>(partSpec.keySet()))) { |
| Utilities.FILE_OP_LOGGER.warn("Ignoring invalid DP directory " + partPath); |
| } else { |
| PartitionDetails details = new PartitionDetails(); |
| details.fullSpec = fullPartSpec; |
| if (partEntry.getValue().isPresent()) { |
| details.newFiles = new ArrayList<>(); |
| for (Path filePath : partEntry.getValue().get()) { |
| details.newFiles.add(fs.getFileStatus(filePath)); |
| } |
| } |
| partitionDetailsMap.put(partPath, details); |
| } |
| } |
| return partitionDetailsMap; |
| } catch (IOException e) { |
| throw new HiveException(e); |
| } |
| } |
| |
| private static void validateDynPartitionCount(Configuration conf, Collection<Path> partitions) throws HiveException { |
| int partsToLoad = partitions.size(); |
| int maxPartition = HiveConf.getIntVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS); |
| if (partsToLoad > maxPartition) { |
| throw new HiveException("Number of dynamic partitions created is " + partsToLoad |
| + ", which is more than " |
| + maxPartition |
| +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname |
| + " to at least " + partsToLoad + '.'); |
| } |
| } |
| |
| public static StatsPublisher getStatsPublisher(JobConf jc) { |
| StatsFactory factory = StatsFactory.newFactory(jc); |
| return factory == null ? null : factory.getStatsPublisher(); |
| } |
| |
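| // Joins the non-empty elements with Path.SEPARATOR, e.g. join("a", "b/") returns "a/b/". |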
| public static String join(String... elements) { |
| StringBuilder builder = new StringBuilder(); |
| for (String element : elements) { |
| if (element == null || element.isEmpty()) { |
| continue; |
| } |
| builder.append(element); |
| if (!element.endsWith(Path.SEPARATOR)) { |
| builder.append(Path.SEPARATOR); |
| } |
| } |
| return builder.toString(); |
| } |
| |
| public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema) { |
| setColumnNameList(jobConf, rowSchema, false); |
| } |
| |
| public static void setColumnNameList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) { |
| if (rowSchema == null) { |
| return; |
| } |
| StringBuilder columnNames = new StringBuilder(); |
| for (ColumnInfo colInfo : rowSchema.getSignature()) { |
| if (excludeVCs && colInfo.getIsVirtualCol()) { |
| continue; |
| } |
| if (columnNames.length() > 0) { |
| columnNames.append(','); |
| } |
| columnNames.append(colInfo.getInternalName()); |
| } |
| String columnNamesString = columnNames.toString(); |
| jobConf.set(serdeConstants.LIST_COLUMNS, columnNamesString); |
| } |
| |
| public static void setColumnNameList(JobConf jobConf, Operator op) { |
| setColumnNameList(jobConf, op, false); |
| } |
| |
| public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) { |
| RowSchema rowSchema = op.getSchema(); |
| setColumnNameList(jobConf, rowSchema, excludeVCs); |
| } |
| |
| public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema) { |
| setColumnTypeList(jobConf, rowSchema, false); |
| } |
| |
| public static void setColumnTypeList(JobConf jobConf, RowSchema rowSchema, boolean excludeVCs) { |
| if (rowSchema == null) { |
| return; |
| } |
| StringBuilder columnTypes = new StringBuilder(); |
| for (ColumnInfo colInfo : rowSchema.getSignature()) { |
| if (excludeVCs && colInfo.getIsVirtualCol()) { |
| continue; |
| } |
| if (columnTypes.length() > 0) { |
| columnTypes.append(','); |
| } |
| columnTypes.append(colInfo.getTypeName()); |
| } |
| String columnTypesString = columnTypes.toString(); |
| jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString); |
| } |
| |
| public static void setColumnTypeList(JobConf jobConf, Operator op) { |
| setColumnTypeList(jobConf, op, false); |
| } |
| |
| public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) { |
| RowSchema rowSchema = op.getSchema(); |
| setColumnTypeList(jobConf, rowSchema, excludeVCs); |
| } |
| |
| public static final String suffix = ".hashtable"; |
| |
| public static Path generatePath(Path basePath, String dumpFilePrefix, |
| Byte tag, String bigBucketFileName) { |
| return new Path(basePath, "MapJoin-" + dumpFilePrefix + tag + |
| "-" + bigBucketFileName + suffix); |
| } |
| |
| public static String generateFileName(Byte tag, String bigBucketFileName) { |
| return "MapJoin-" + tag + "-" + bigBucketFileName + suffix; |
| } |
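| |
| // Illustrative example: generateFileName((byte) 0, "bucket_00000") returns |
| // "MapJoin-0-bucket_00000.hashtable"; generatePath() additionally inserts the dump file |
| // prefix and resolves the name under the given base path. |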
| |
| public static Path generateTmpPath(Path basePath, String id) { |
| return new Path(basePath, "HashTable-" + id); |
| } |
| |
| public static Path generateTarPath(Path basePath, String filename) { |
| return new Path(basePath, filename + ".tar.gz"); |
| } |
| |
| public static String generateTarFileName(String name) { |
| return name + ".tar.gz"; |
| } |
| |
| public static String generatePath(Path baseURI, String filename) { |
| return baseURI + Path.SEPARATOR + filename; |
| } |
| |
| public static String now() { |
| Calendar cal = Calendar.getInstance(); |
| SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| return sdf.format(cal.getTime()); |
| } |
| |
| public static double showTime(long time) { |
| return time / 1000.0; |
| } |
| |
| /** |
| * The check here is not particularly clean. It first uses a for loop to go through |
| * all input formats and collects the ones that implement ReworkMapredInputFormat |
| * into a set, and finally goes through that set and calls rework for each one. |
| * |
| * Technically all of this could be avoided if all of Hive's input formats shared |
| * the same interface. In today's Hive and Hadoop that is not possible, because many |
| * of Hive's input formats live in Hadoop's code base, and most of Hadoop's |
| * input formats just extend the InputFormat interface. |
| * |
| * @param task |
| * @param reworkMapredWork |
| * @param conf |
| * @throws SemanticException |
| */ |
| public static void reworkMapRedWork(Task<?> task, |
| boolean reworkMapredWork, HiveConf conf) throws SemanticException { |
| if (reworkMapredWork && (task instanceof MapRedTask)) { |
| try { |
| MapredWork mapredWork = ((MapRedTask) task).getWork(); |
| Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>(); |
| for (PartitionDesc part : mapredWork.getMapWork().getPathToPartitionInfo().values()) { |
| Class<? extends InputFormat> inputFormatCls = part |
| .getInputFileFormatClass(); |
| if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) { |
| reworkInputFormats.add(inputFormatCls); |
| } |
| } |
| |
| if (reworkInputFormats.size() > 0) { |
| for (Class<? extends InputFormat> inputFormatCls : reworkInputFormats) { |
| ReworkMapredInputFormat inst = (ReworkMapredInputFormat) ReflectionUtil |
| .newInstance(inputFormatCls, null); |
| inst.rework(conf, mapredWork); |
| } |
| } |
| } catch (IOException e) { |
| throw new SemanticException(e); |
| } |
| } |
| } |
| |
| public static class SQLCommand<T> { |
| public T run(PreparedStatement stmt) throws SQLException { |
| return null; |
| } |
| } |
| |
| /** |
| * Retry SQL execution with random backoff (same as the one implemented in HDFS-767). |
| * This function only retries when the SQL query throws a SQLTransientException (which |
| * might be able to succeed with a simple retry). It doesn't retry when the exception |
| * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException |
| * the caller needs to reconnect to the database and restart the whole transaction. |
| * |
| * @param cmd the SQL command |
| * @param stmt the prepared statement of SQL. |
| * @param baseWindow The base time window (in milliseconds) before the next retry. |
| * see {@link #getRandomWaitTime} for details. |
| * @param maxRetries the maximum # of retries when getting a SQLTransientException. |
| * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the |
| * first time it is caught, or SQLTransientException once maxRetries has been reached. |
| */ |
| public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt, |
| long baseWindow, int maxRetries) throws SQLException { |
| |
| T result = null; |
| |
| // retry with # of maxRetries before throwing exception |
| for (int failures = 0; ; failures++) { |
| try { |
| result = cmd.run(stmt); |
| return result; |
| } catch (SQLTransientException e) { |
| LOG.warn("Failure and retry # {}", failures, e); |
| if (failures >= maxRetries) { |
| throw e; |
| } |
| long waitTime = getRandomWaitTime(baseWindow, failures, |
| ThreadLocalRandom.current()); |
| try { |
| Thread.sleep(waitTime); |
| } catch (InterruptedException iex) { |
| } |
| } catch (SQLException e) { |
| // throw other types of SQLExceptions (SQLNonTransientException / SQLRecoverableException) |
| throw e; |
| } |
| } |
| } |
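| |
| // Usage sketch (hypothetical command): wrap the statement execution in a SQLCommand so that |
| // transient failures are retried with random backoff, e.g. |
| // SQLCommand<Integer> update = new SQLCommand<Integer>() { |
| // @Override |
| // public Integer run(PreparedStatement stmt) throws SQLException { |
| // return stmt.executeUpdate(); |
| // } |
| // }; |
| // int rows = Utilities.executeWithRetry(update, stmt, 100, 3); |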
| |
| /** |
| * Retry connecting to a database with random backoff (same as the one implemented in HDFS-767). |
| * This function only retries when the connection attempt throws a SQLTransientException (which |
| * might be able to succeed with a simple retry). It doesn't retry when the exception |
| * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException |
| * the caller needs to reconnect to the database and restart the whole transaction. |
| * |
| * @param connectionString the JDBC connection string. |
| * @param waitWindow The base time window (in milliseconds) before the next retry. |
| * see {@link #getRandomWaitTime} for details. |
| * @param maxRetries the maximum # of retries when getting a SQLTransientException. |
| * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the |
| * first time it is caught, or SQLTransientException once maxRetries has been reached. |
| */ |
| public static Connection connectWithRetry(String connectionString, |
| long waitWindow, int maxRetries) throws SQLException { |
| |
| // retry with # of maxRetries before throwing exception |
| for (int failures = 0; ; failures++) { |
| try { |
| Connection conn = DriverManager.getConnection(connectionString); |
| return conn; |
| } catch (SQLTransientException e) { |
| if (failures >= maxRetries) { |
| LOG.error("Error during JDBC connection.", e); |
| throw e; |
| } |
| long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, |
| ThreadLocalRandom.current()); |
| try { |
| Thread.sleep(waitTime); |
| } catch (InterruptedException e1) { |
| } |
| } catch (SQLException e) { |
| // just throw other types (SQLNonTransientException / SQLRecoverableException) |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * Retry preparing a SQL statement with random backoff (same as the one implemented in HDFS-767). |
| * This function only retries when preparing the statement throws a SQLTransientException (which |
| * might be able to succeed with a simple retry). It doesn't retry when the exception |
| * is a SQLRecoverableException or SQLNonTransientException. For SQLRecoverableException |
| * the caller needs to reconnect to the database and restart the whole transaction. |
| * |
| * @param conn a JDBC connection. |
| * @param stmt the SQL statement to be prepared. |
| * @param waitWindow The base time window (in milliseconds) before the next retry. |
| * see {@link #getRandomWaitTime} for details. |
| * @param maxRetries the maximum # of retries when getting a SQLTransientException. |
| * @throws SQLException throws SQLRecoverableException or SQLNonTransientException the |
| * first time it is caught, or SQLTransientException once maxRetries has been reached. |
| */ |
| public static PreparedStatement prepareWithRetry(Connection conn, String stmt, |
| long waitWindow, int maxRetries) throws SQLException { |
| |
| // retry with # of maxRetries before throwing exception |
| for (int failures = 0; ; failures++) { |
| try { |
| return conn.prepareStatement(stmt); |
| } catch (SQLTransientException e) { |
| if (failures >= maxRetries) { |
| LOG.error("Error preparing JDBC Statement {}", stmt, e); |
| throw e; |
| } |
| long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, |
| ThreadLocalRandom.current()); |
| try { |
| Thread.sleep(waitTime); |
| } catch (InterruptedException e1) { |
| } |
| } catch (SQLException e) { |
| // just throw other types (SQLNonTransientException / SQLRecoverableException) |
| throw e; |
| } |
| } |
| } |
| |
| public static void setQueryTimeout(java.sql.Statement stmt, int timeout) throws SQLException { |
| if (timeout < 0) { |
| LOG.info("Invalid query timeout {}", timeout); |
| return; |
| } |
| try { |
| stmt.setQueryTimeout(timeout); |
| } catch (SQLException e) { |
| String message = e.getMessage() == null ? null : e.getMessage().toLowerCase(); |
| if (e instanceof SQLFeatureNotSupportedException || |
| (message != null && (message.contains("implemented") || message.contains("supported")))) { |
| LOG.info("setQueryTimeout is not supported"); |
| return; |
| } |
| throw e; |
| } |
| } |
| |
| /** |
| * Introduces a random factor into the wait time before another retry. |
| * The wait time depends on the number of failures so far and a random factor. |
| * The first time an exception is caught, the wait time |
| * is a random number between 0..baseWindow msec. If the first retry |
| * still fails, we wait a baseWindow msec grace period before the 2nd retry. |
| * Also, at the second retry the waiting window is expanded to 2*baseWindow msec, |
| * alleviating the request rate on the server. Similarly, the 3rd retry |
| * waits a 2*baseWindow msec grace period before retrying, and the waiting window is |
| * expanded to 3*baseWindow msec, and so on. |
| * @param baseWindow the base waiting window. |
| * @param failures number of failures so far. |
| * @param r a random generator. |
| * @return number of milliseconds for the next wait time. |
| */ |
| public static long getRandomWaitTime(long baseWindow, int failures, Random r) { |
| return (long) ( |
| baseWindow * failures + // grace period for the last round of attempt |
| baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure |
| } |
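| |
| // Illustrative numbers: with baseWindow = 1000 msec, the wait falls in [0, 1000) on the first |
| // failure (failures = 0), in 1000 + [0, 2000) on the second, in 2000 + [0, 3000) on the third, etc. |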
| |
| public static final char sqlEscapeChar = '\\'; |
| |
| /** |
| * Escapes '_', '%', and the escape character itself inside the given string key. |
| * @param key the string that will be used for the SQL LIKE operator. |
| * @return a string with '_', '%' and the escape character escaped. |
| */ |
| public static String escapeSqlLike(String key) { |
| StringBuilder sb = new StringBuilder(key.length()); |
| for (char c: key.toCharArray()) { |
| switch(c) { |
| case '_': |
| case '%': |
| case sqlEscapeChar: |
| sb.append(sqlEscapeChar); |
| // fall through |
| default: |
| sb.append(c); |
| break; |
| } |
| } |
| return sb.toString(); |
| } |
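| |
| // Illustrative example: escapeSqlLike("50%_off") yields the characters 50\%\_off, which can be |
| // matched literally by a LIKE predicate that declares '\' as its escape character. |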
| |
| /** |
| * Format number of milliseconds to strings |
| * |
| * @param msec milliseconds |
| * @return a formatted string like "x days y hours z minutes a seconds b msec" |
| */ |
| public static String formatMsecToStr(long msec) { |
| long day = -1, hour = -1, minute = -1, second = -1; |
| long ms = msec % 1000; |
| long timeLeft = msec / 1000; |
| if (timeLeft > 0) { |
| second = timeLeft % 60; |
| timeLeft /= 60; |
| if (timeLeft > 0) { |
| minute = timeLeft % 60; |
| timeLeft /= 60; |
| if (timeLeft > 0) { |
| hour = timeLeft % 24; |
| day = timeLeft / 24; |
| } |
| } |
| } |
| StringBuilder sb = new StringBuilder(); |
| if (day != -1) { |
| sb.append(day + " days "); |
| } |
| if (hour != -1) { |
| sb.append(hour + " hours "); |
| } |
| if (minute != -1) { |
| sb.append(minute + " minutes "); |
| } |
| if (second != -1) { |
| sb.append(second + " seconds "); |
| } |
| sb.append(ms + " msec"); |
| |
| return sb.toString(); |
| } |
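| |
| // Illustrative examples: formatMsecToStr(500) returns "500 msec"; |
| // formatMsecToStr(90061001) returns "1 days 1 hours 1 minutes 1 seconds 1 msec". |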
| |
| /** |
| * Estimate the number of reducers needed for this job, based on job input, |
| * and configuration parameters. |
| * |
| * The output of this method should only be used if the output of this |
| * MapRedTask is not being used to populate a bucketed table and the user |
| * has not specified the number of reducers to use. |
| * |
| * @return the number of reducers. |
| */ |
| public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary, |
| MapWork work, boolean finalMapRed) throws IOException { |
| long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); |
| int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); |
| |
| double samplePercentage = getHighestSamplePercentage(work); |
| long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage); |
| |
| // if all inputs are sampled, we should shrink the size of reducers accordingly. |
| if (totalInputFileSize != inputSummary.getLength()) { |
| LOG.info("BytesPerReducer={} maxReducers={} estimated totalInputFileSize={}", bytesPerReducer, |
| maxReducers, totalInputFileSize); |
| } else { |
| LOG.info("BytesPerReducer={} maxReducers={} totalInputFileSize={}", bytesPerReducer, |
| maxReducers, totalInputFileSize); |
| } |
| |
| // If this map reduce job writes final data to a table and bucketing is being inferred, |
| // and the user has configured Hive to do this, make sure the number of reducers is a |
| // power of two |
| boolean powersOfTwo = conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) && |
| finalMapRed && !work.getBucketedColsByDirectory().isEmpty(); |
| |
| return estimateReducers(totalInputFileSize, bytesPerReducer, maxReducers, powersOfTwo); |
| } |
| |
| public static int estimateReducers(long totalInputFileSize, long bytesPerReducer, |
| int maxReducers, boolean powersOfTwo) { |
| double bytes = Math.max(totalInputFileSize, bytesPerReducer); |
| int reducers = (int) Math.ceil(bytes / bytesPerReducer); |
| reducers = Math.max(1, reducers); |
| reducers = Math.min(maxReducers, reducers); |
| |
| int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1; |
| int reducersPowerTwo = (int)Math.pow(2, reducersLog); |
| |
| if (powersOfTwo) { |
| // If the original number of reducers was a power of two, use that |
| if (reducersPowerTwo / 2 == reducers) { |
| // nothing to do |
| } else if (reducersPowerTwo > maxReducers) { |
| // If the next power of two greater than the original number of reducers is greater |
| // than the max number of reducers, use the preceding power of two, which is strictly |
| // less than the original number of reducers and hence the max |
| reducers = reducersPowerTwo / 2; |
| } else { |
| // Otherwise use the smallest power of two greater than the original number of reducers |
| reducers = reducersPowerTwo; |
| } |
| } |
| return reducers; |
| } |
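| |
| // Worked example (illustrative numbers): with totalInputFileSize = 10,000,000,000 bytes and |
| // bytesPerReducer = 256,000,000, ceil(1e10 / 2.56e8) = 40 reducers are estimated. With |
| // powersOfTwo = true this is rounded up to 64 (the next power of two), or down to 32 if 64 |
| // would exceed maxReducers. |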
| |
| /** |
| * Computes the total input file size. If block sampling was used it will scale this |
| * value by the highest sample percentage (as an estimate for input). |
| * |
| * @param inputSummary |
| * @param work |
| * @param highestSamplePercentage |
| * @return estimated total input size for job |
| */ |
| public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work, |
| double highestSamplePercentage) { |
| long totalInputFileSize = inputSummary.getLength(); |
| if (MapUtils.isEmpty(work.getNameToSplitSample())) { |
| // If percentage block sampling wasn't used, we don't need to do any estimation |
| return totalInputFileSize; |
| } |
| |
| if (highestSamplePercentage >= 0) { |
| totalInputFileSize = Math.min((long) (totalInputFileSize * (highestSamplePercentage / 100D)) |
| , totalInputFileSize); |
| } |
| return totalInputFileSize; |
| } |
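| |
| // Illustrative example: if the input summary reports 1,000,000,000 bytes and the highest |
| // sample percentage is 10, the estimated input size becomes 100,000,000 bytes; without |
| // split sampling the summary length is returned unchanged. |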
| |
| /** |
| * Computes the total number of input files. If block sampling was used it will scale this |
| * value by the highest sample percentage (as an estimate for # input files). |
| * |
| * @param inputSummary |
| * @param work |
| * @param highestSamplePercentage |
| * @return estimated total number of input files for the job |
| */ |
| public static long getTotalInputNumFiles (ContentSummary inputSummary, MapWork work, |
| double highestSamplePercentage) { |
| long totalInputNumFiles = inputSummary.getFileCount(); |
| if (MapUtils.isEmpty(work.getNameToSplitSample())) { |
| // If percentage block sampling wasn't used, we don't need to do any estimation |
| return totalInputNumFiles; |
| } |
| |
| if (highestSamplePercentage >= 0) { |
| totalInputNumFiles = Math.min((long) (totalInputNumFiles * (highestSamplePercentage / 100D)) |
| , totalInputNumFiles); |
| } |
| return totalInputNumFiles; |
| } |
| |
| /** |
| * Returns the highest sample percentage of any alias in the given MapWork, |
| * or -1 if any alias has no split sample (i.e. is not sampled). |
| */ |
| public static double getHighestSamplePercentage (MapWork work) { |
| double highestSamplePercentage = 0; |
| for (String alias : work.getAliasToWork().keySet()) { |
| if (work.getNameToSplitSample().containsKey(alias)) { |
| Double rate = work.getNameToSplitSample().get(alias).getPercent(); |
| if (rate != null && rate > highestSamplePercentage) { |
| highestSamplePercentage = rate; |
| } |
| } else { |
| highestSamplePercentage = -1; |
| break; |
| } |
| } |
| |
| return highestSamplePercentage; |
| } |
| |
| /** |
| * On Tez we're not creating dummy files when getting/setting input paths. |
| * We let Tez handle the situation. We're also setting the paths in the AM |
| * so we don't want to depend on scratch dir and context. |
| */ |
| public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception { |
| String scratchDir = job.get(DagUtils.TEZ_TMP_DIR_KEY); |
| |
| List<Path> paths = getInputPaths(job, work, new Path(scratchDir), null, true); |
| |
| return paths; |
| } |
| |
| /** |
| * Appends the vertex name to the specified counter name. |
| * |
| * @param counter counter name to append to |
| * @param vertexName vertex name |
| * @return counter name with the vertex name appended |
| */ |
| public static String getVertexCounterName(String counter, String vertexName) { |
| if (vertexName != null && !vertexName.isEmpty()) { |
| vertexName = "_" + vertexName.replace(" ", "_"); |
| } |
| return counter + vertexName; |
| } |
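| |
| // Illustrative example: getVertexCounterName("RECORDS_OUT", "Map 1") returns |
| // "RECORDS_OUT_Map_1"; with an empty vertex name the counter name is returned unchanged. |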
| |
| /** |
| * Computes a list of all input paths needed to compute the given MapWork. All aliases |
| * are considered and a merged list of input paths is returned. If any input path points |
| * to an empty table or partition a dummy file in the scratch dir is instead created and |
| * added to the list. This is needed to avoid special casing the operator pipeline for |
| * these cases. |
| * |
| * @param job JobConf used to run the job |
| * @param work MapWork encapsulating the info about the task |
| * @param hiveScratchDir The tmp dir used to create dummy files if needed |
| * @param ctx Context object |
| * @param skipDummy if true, do not create dummy files for empty tables or partitions |
| * @return List of paths to process for the given MapWork |
| * @throws Exception |
| */ |
| public static List<Path> getInputPaths(JobConf job, MapWork work, Path hiveScratchDir, |
| Context ctx, boolean skipDummy) throws Exception { |
| |
| PerfLogger perfLogger = SessionState.getPerfLogger(); |
| perfLogger.perfLogBegin(CLASS_NAME, PerfLogger.INPUT_PATHS); |
| |
| Set<Path> pathsProcessed = new HashSet<Path>(); |
| List<Path> pathsToAdd = new LinkedList<Path>(); |
| DriverState driverState = DriverState.getDriverState(); |
| // AliasToWork contains all the aliases |
| Collection<String> aliasToWork = work.getAliasToWork().keySet(); |
| if (!skipDummy) { |
| // Copy to avoid ConcurrentModificationException when dummy entries are added below. |
| aliasToWork = new ArrayList<>(aliasToWork); |
| } |
| for (String alias : aliasToWork) { |
| LOG.info("Processing alias {}", alias); |
| |
| // The alias may not have any path |
| Collection<Map.Entry<Path, List<String>>> pathToAliases = work.getPathToAliases().entrySet(); |
| if (!skipDummy) { |
| // Copy to avoid ConcurrentModificationException when dummy entries are added below. |
| pathToAliases = new ArrayList<>(pathToAliases); |
| } |
| boolean isEmptyTable = true; |
| boolean hasLogged = false; |
| |
| for (Map.Entry<Path, List<String>> e : pathToAliases) { |
| if (driverState != null && driverState.isAborted()) { |
| throw new IOException("Operation is Canceled."); |
| } |
| |
| Path file = e.getKey(); |
| List<String> aliases = e.getValue(); |
| if (aliases.contains(alias)) { |
| if (file != null) { |
| isEmptyTable = false; |
| } else { |
| LOG.warn("Found a null path for alias {}", alias); |
| continue; |
| } |
| |
| // Multiple aliases can point to the same path - it should be |
| // processed only once |
| if (pathsProcessed.contains(file)) { |
| continue; |
| } |
| |
| StringInternUtils.internUriStringsInPath(file); |
| pathsProcessed.add(file); |
| LOG.debug("Adding input file {}", file); |
| if (!hasLogged) { |
| hasLogged = true; |
| LOG.info("Adding {} inputs; the first input is {}", |
| work.getPathToAliases().size(), file); |
| } |
| |
| pathsToAdd.add(file); |
| } |
| } |
| |
| // If the query references non-existent partitions, |
| // we need to add an empty file; it is not acceptable to change the |
| // operator tree. |
| // Consider the query: |
| // select * from (select count(1) from T union all select count(1) from |
| // T2) x; |
| // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2 |
| // rows) |
| if (isEmptyTable && !skipDummy) { |
| pathsToAdd.add(createDummyFileForEmptyTable(job, work, hiveScratchDir, alias)); |
| } |
| } |
| |
| List<Path> finalPathsToAdd = new LinkedList<>(); |
| |
| int numExecutors = getMaxExecutorsForInputListing(job, pathsToAdd.size()); |
| if (numExecutors > 1) { |
| ExecutorService pool = Executors.newFixedThreadPool(numExecutors, |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Get-Input-Paths-%d").build()); |
| |
| finalPathsToAdd.addAll(getInputPathsWithPool(job, work, hiveScratchDir, ctx, skipDummy, pathsToAdd, pool)); |
| } else { |
| for (final Path path : pathsToAdd) { |
| if (driverState != null && driverState.isAborted()) { |
| throw new IOException("Operation is Canceled."); |
| } |
| Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call(); |
| updatePathForMapWork(newPath, work, path); |
| finalPathsToAdd.add(newPath); |
| } |
| } |
| |
| perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.INPUT_PATHS); |
| |
| return finalPathsToAdd; |
| } |
| |
| @VisibleForTesting |
| static List<Path> getInputPathsWithPool(JobConf job, MapWork work, Path hiveScratchDir, |
| Context ctx, boolean skipDummy, List<Path> pathsToAdd, |
| ExecutorService pool) throws IOException, ExecutionException, InterruptedException { |
| DriverState driverState = DriverState.getDriverState(); |
| List<Path> finalPathsToAdd = new ArrayList<>(); |
| try { |
| Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>(); |
| for (final Path path : pathsToAdd) { |
| if (driverState != null && driverState.isAborted()) { |
| throw new IOException("Operation is Canceled."); |
| } |
| GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy); |
| getPathsCallableToFuture.put(callable, pool.submit(callable)); |
| } |
| pool.shutdown(); |
| |
| for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) { |
| if (driverState != null && driverState.isAborted()) { |
| throw new IOException("Operation is Canceled."); |
| } |
| |
| Path newPath = future.getValue().get(); |
| updatePathForMapWork(newPath, work, future.getKey().path); |
| finalPathsToAdd.add(newPath); |
| } |
| } finally { |
| pool.shutdownNow(); |
| } |
| return finalPathsToAdd; |
| } |
| |
| private static class GetInputPathsCallable implements Callable<Path> { |
| |
| private final Path path; |
| private final JobConf job; |
| private final MapWork work; |
| private final Path hiveScratchDir; |
| private final Context ctx; |
| private final boolean skipDummy; |
| |
| private GetInputPathsCallable(Path path, JobConf job, MapWork work, Path hiveScratchDir, |
| Context ctx, boolean skipDummy) { |
| this.path = path; |
| this.job = job; |
| this.work = work; |
| this.hiveScratchDir = hiveScratchDir; |
| this.ctx = ctx; |
| this.skipDummy = skipDummy; |
| } |
| |
| @Override |
| public Path call() throws Exception { |
| if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) { |
| return createDummyFileForEmptyPartition(this.path, this.job, this.work.getPathToPartitionInfo().get(this.path), |
| this.hiveScratchDir); |
| } |
| return this.path; |
| } |
| } |
| |
| @SuppressWarnings({"rawtypes", "unchecked"}) |
| private static Path createEmptyFile(Path hiveScratchDir, |
| HiveOutputFormat outFileFormat, JobConf job, |
| Properties props, boolean dummyRow) |
| throws IOException, InstantiationException, IllegalAccessException { |
| |
| // create a dummy empty file in a new directory |
| String newDir = hiveScratchDir + Path.SEPARATOR + UUID.randomUUID().toString(); |
| Path newPath = new Path(newDir); |
| FileSystem fs = newPath.getFileSystem(job); |
| fs.mkdirs(newPath); |
| // Qualify the path against the file system. The user-configured path might contain a default port which is dropped |
| // in the file status. This makes sure that all paths which go into PathToPartitionInfo match the listed-status |
| // file paths. |
| newPath = fs.makeQualified(newPath); |
| String newFile = newDir + Path.SEPARATOR + "emptyFile"; |
| Path newFilePath = new Path(newFile); |
| |
| RecordWriter recWriter = outFileFormat.getHiveRecordWriter(job, newFilePath, |
| Text.class, false, props, null); |
| if (dummyRow) { |
| // empty files are omitted at CombineHiveInputFormat. |
| // for a metadata-only query, this effectively makes the partition columns disappear. |
| // this could be fixed by other methods, but this seemed to be the easiest (HIVE-2955) |
| recWriter.write(new Text("empty")); // written via HiveIgnoreKeyTextOutputFormat |
| } |
| recWriter.close(false); |
| |
| return StringInternUtils.internUriStringsInPath(newPath); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static Path createDummyFileForEmptyPartition(Path path, JobConf job, PartitionDesc partDesc, |
| Path hiveScratchDir) throws Exception { |
| |
| String strPath = path.toString(); |
| |
| // The input file does not exist; replace it with an empty file |
| if (partDesc.getTableDesc().isNonNative()) { |
| // if this isn't a hive table we can't create an empty file for it. |
| return path; |
| } |
| |
| Properties props = SerDeUtils.createOverlayedProperties( |
| partDesc.getTableDesc().getProperties(), partDesc.getProperties()); |
| HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, partDesc); |
| |
| boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class; |
| |
| Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, oneRow); |
| |
| LOG.info("Changed input file {} to empty file {} ({})", strPath, newPath, oneRow); |
| return newPath; |
| } |
| |
| private static void updatePathForMapWork(Path newPath, MapWork work, Path path) { |
| // update the work |
| if (!newPath.equals(path)) { |
| PartitionDesc partDesc = work.getPathToPartitionInfo().get(path); |
| work.addPathToAlias(newPath, work.getPathToAliases().get(path)); |
| work.removePathToAlias(path); |
| |
| work.removePathToPartitionInfo(path); |
| work.addPathToPartitionInfo(newPath, partDesc); |
| } |
| } |
| |
| @SuppressWarnings("rawtypes") |
| private static Path createDummyFileForEmptyTable(JobConf job, MapWork work, |
| Path hiveScratchDir, String alias) |
| throws Exception { |
| |
| TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc(); |
| if (tableDesc.isNonNative()) { |
| // if this isn't a native Hive table, we can't create an empty file for it. |
| return null; |
| } |
| |
| Properties props = tableDesc.getProperties(); |
| HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, tableDesc); |
| |
| Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, false); |
| |
| LOG.info("Changed input file for alias {} to newPath", alias, newPath); |
| |
| // update the work |
| |
| Map<Path, List<String>> pathToAliases = work.getPathToAliases(); |
| List<String> newList = new ArrayList<String>(1); |
| newList.add(alias); |
| pathToAliases.put(newPath, newList); |
| |
| work.setPathToAliases(pathToAliases); |
| |
| PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone(); |
| work.addPathToPartitionInfo(newPath, pDesc); |
| |
| return newPath; |
| } |
| |
| private static final Path[] EMPTY_PATH = new Path[0]; |
| |
| /** |
| * setInputPaths add all the paths in the provided list to the Job conf object |
| * as input paths for the job. |
| * |
| * @param job |
| * @param pathsToAdd |
| */ |
| public static void setInputPaths(JobConf job, List<Path> pathsToAdd) { |
| |
| Path[] addedPaths = FileInputFormat.getInputPaths(job); |
| if (addedPaths == null) { |
| addedPaths = EMPTY_PATH; |
| } |
| |
| Path[] combined = new Path[addedPaths.length + pathsToAdd.size()]; |
| System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length); |
| |
| int i = 0; |
| for (Path p: pathsToAdd) { |
| combined[addedPaths.length + (i++)] = p; |
| } |
| FileInputFormat.setInputPaths(job, combined); |
| } |
| |
| /** |
| * Set hive input format, and input format file if necessary. |
| */ |
| public static void setInputAttributes(Configuration conf, MapWork mWork) { |
| HiveConf.ConfVars var = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ? |
| HiveConf.ConfVars.HIVETEZINPUTFORMAT : HiveConf.ConfVars.HIVEINPUTFORMAT; |
| if (mWork.getInputformat() != null) { |
| HiveConf.setVar(conf, var, mWork.getInputformat()); |
| } |
| // Intentionally overwrites anything the user may have put here |
| conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted()); |
| } |
| |
| /** |
| * Hive uses tmp directories to capture the output of each FileSinkOperator. |
| * This method creates all necessary tmp directories for FileSinks in the Mapwork. |
| * |
| * @param conf Used to get the right FileSystem |
| * @param mWork Used to find FileSinkOperators |
| * @throws IOException |
| */ |
| public static void createTmpDirs(Configuration conf, MapWork mWork) |
| throws IOException { |
| |
| Map<Path, List<String>> pa = mWork.getPathToAliases(); |
| if (MapUtils.isNotEmpty(pa)) { |
| // common case: 1 table scan per map-work |
| // rare case: smb joins |
| HashSet<String> aliases = new HashSet<String>(1); |
| List<Operator<? extends OperatorDesc>> ops = |
| new ArrayList<Operator<? extends OperatorDesc>>(); |
| for (List<String> ls : pa.values()) { |
| for (String a : ls) { |
| aliases.add(a); |
| } |
| } |
| for (String a : aliases) { |
| ops.add(mWork.getAliasToWork().get(a)); |
| } |
| createTmpDirs(conf, ops); |
| } |
| } |
| |
| /** |
| * Hive uses tmp directories to capture the output of each FileSinkOperator. |
| * This method creates all necessary tmp directories for FileSinks in the ReduceWork. |
| * |
| * @param conf Used to get the right FileSystem |
| * @param rWork Used to find FileSinkOperators |
| * @throws IOException |
| */ |
| public static void createTmpDirs(Configuration conf, ReduceWork rWork) |
| throws IOException { |
| if (rWork == null) { |
| return; |
| } |
| List<Operator<? extends OperatorDesc>> ops |
| = new LinkedList<Operator<? extends OperatorDesc>>(); |
| ops.add(rWork.getReducer()); |
| createTmpDirs(conf, ops); |
| } |
| |
| private static void createTmpDirs(Configuration conf, |
| List<Operator<? extends OperatorDesc>> ops) throws IOException { |
| |
| while (!ops.isEmpty()) { |
| Operator<? extends OperatorDesc> op = ops.remove(0); |
| |
| if (op instanceof FileSinkOperator) { |
| FileSinkDesc fdesc = ((FileSinkOperator) op).getConf(); |
| if (fdesc.isMmTable() || fdesc.isDirectInsert()) { |
| // No need to create for MM tables, or ACID insert |
| continue; |
| } |
| Path tempDir = fdesc.getDirName(); |
| if (tempDir != null) { |
| Path tempPath = Utilities.toTempPath(tempDir); |
| FileSystem fs = tempPath.getFileSystem(conf); |
| fs.mkdirs(tempPath); |
| } |
| } |
| |
| if (op.getChildOperators() != null) { |
| ops.addAll(op.getChildOperators()); |
| } |
| } |
| } |
| |
| public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath, |
| FsPermission fsPermission, boolean recursive) throws IOException { |
| String origUmask = null; |
| LOG.debug("Create dirs {} with permission {} recursive {}", |
| mkdirPath, fsPermission, recursive); |
| if (recursive) { |
| origUmask = conf.get(FsPermission.UMASK_LABEL); |
| // this umask is required because by default the hdfs mask is 022 resulting in |
| // all parents getting the fsPermission & !(022) permission instead of fsPermission |
| conf.set(FsPermission.UMASK_LABEL, "000"); |
| } |
| FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf); |
| boolean retval = false; |
| try { |
| retval = fs.mkdirs(mkdirPath, fsPermission); |
| resetUmaskInConf(conf, recursive, origUmask); |
| } catch (IOException ioe) { |
| resetUmaskInConf(conf, recursive, origUmask); |
| throw ioe; |
| } finally { |
| IOUtils.closeStream(fs); |
| } |
| return retval; |
| } |
| |
| private static void resetUmaskInConf(Configuration conf, boolean unsetUmask, String origUmask) { |
| if (unsetUmask) { |
| if (origUmask != null) { |
| conf.set(FsPermission.UMASK_LABEL, origUmask); |
| } else { |
| conf.unset(FsPermission.UMASK_LABEL); |
| } |
| } |
| } |
| |
| /** |
| * Returns true if a plan is both configured for vectorized execution |
| * and the node is vectorized. |
| * |
| * The plan may be configured for vectorization, |
| * but vectorization may still be disallowed, e.g. for FetchOperator execution. |
| */ |
| public static boolean getIsVectorized(Configuration conf) { |
| if (conf.get(VECTOR_MODE) != null) { |
| // this code path is necessary, because with HS2 and client |
| // side split generation we end up not finding the map work. |
| // This is because of thread local madness (tez split |
| // generation is multi-threaded - HS2 plan cache uses thread |
| // locals). |
| return |
| conf.getBoolean(VECTOR_MODE, false); |
| } else { |
| if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && |
| Utilities.getPlanPath(conf) != null) { |
| MapWork mapWork = Utilities.getMapWork(conf); |
| return mapWork.getVectorMode(); |
| } else { |
| return false; |
| } |
| } |
| } |
| |
| |
| public static boolean getIsVectorized(Configuration conf, MapWork mapWork) { |
| return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && |
| mapWork.getVectorMode(); |
| } |
| |
| /** |
| * @param conf |
| * @return the configured VectorizedRowBatchCtx for a MapWork task. |
| */ |
| public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) { |
| VectorizedRowBatchCtx result = null; |
| if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && |
| Utilities.getPlanPath(conf) != null) { |
| MapWork mapWork = Utilities.getMapWork(conf); |
| if (mapWork != null && mapWork.getVectorMode()) { |
| result = mapWork.getVectorizedRowBatchCtx(); |
| } |
| } |
| return result; |
| } |
| |
| public static void clearWorkMapForConf(Configuration conf) { |
| // Remove cached query plans for the current query only |
| Path mapPath = getPlanPath(conf, MAP_PLAN_NAME); |
| Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME); |
| if (mapPath != null) { |
| gWorkMap.get(conf).remove(mapPath); |
| } |
| if (reducePath != null) { |
| gWorkMap.get(conf).remove(reducePath); |
| } |
| // TODO: should this also clean merge work? |
| } |
| |
| public static void clearWorkMap(Configuration conf) { |
| gWorkMap.get(conf).clear(); |
| } |
| |
| /** |
| * Skip header lines in the table file when reading the record. |
| * |
| * @param currRecReader |
| * Record reader. |
| * |
| * @param headerCount |
| * Number of header lines in the table files. |
| * |
| * @param key |
| * Key of current reading record. |
| * |
| * @param value |
| * Value of current reading record. |
| * |
| * @return Return true if there are 0 or more records left in the file |
| * after skipping all headers, otherwise return false. |
| */ |
| public static <K, V> boolean skipHeader(RecordReader<K, V> currRecReader, int headerCount, K key, V value) |
| throws IOException { |
| while (headerCount > 0) { |
| if (!currRecReader.next(key, value)) { |
| return false; |
| } |
| headerCount--; |
| } |
| return true; |
| } |
| |
| /** |
| * Get header line count for a table. |
| * |
| * @param table |
| * Table description for target table. |
| * |
| */ |
| public static int getHeaderCount(TableDesc table) throws IOException { |
| int headerCount; |
| try { |
| headerCount = |
| getHeaderOrFooterCount(table, serdeConstants.HEADER_COUNT); |
| } catch (NumberFormatException nfe) { |
| throw new IOException(nfe); |
| } |
| return headerCount; |
| } |
| |
| /** |
| * Get footer line count for a table. |
| * |
| * @param table |
| * Table description for target table. |
| * |
| * @param job |
| * Job configuration for current job. |
| */ |
| public static int getFooterCount(TableDesc table, JobConf job) throws IOException { |
| int footerCount; |
| try { |
| footerCount = |
| getHeaderOrFooterCount(table, serdeConstants.FOOTER_COUNT); |
| if (footerCount > HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_FILE_MAX_FOOTER)) { |
| throw new IOException("footer number exceeds the limit defined in hive.file.max.footer"); |
| } |
| } catch (NumberFormatException nfe) { |
| |
| // Footer line number must be set as an integer. |
| throw new IOException(nfe); |
| } |
| return footerCount; |
| } |
| |
| private static int getHeaderOrFooterCount(TableDesc table, |
| String propertyName) { |
| int count = |
| Integer.parseInt(table.getProperties().getProperty(propertyName, "0")); |
| if (count > 0 && table.getInputFileFormatClass() != null |
| && !TextInputFormat.class |
| .isAssignableFrom(table.getInputFileFormatClass())) { |
| LOG.warn(propertyName |
| + " is only valid for TextInputFormat, ignoring the value."); |
| count = 0; |
| } |
| return count; |
| } |
| |
| /** |
| * Convert path to qualified path. |
| * |
| * @param conf |
| * Hive configuration. |
| * @param path |
| * Path to convert. |
| * @return Qualified path |
| */ |
| public static String getQualifiedPath(HiveConf conf, Path path) throws HiveException { |
| FileSystem fs; |
| if (path == null) { |
| return null; |
| } |
| |
| try { |
| fs = path.getFileSystem(conf); |
| return fs.makeQualified(path).toString(); |
| } |
| catch (IOException e) { |
| throw new HiveException(e); |
| } |
| } |
| |
| /** |
| * Checks if the current HiveServer2 logging operation level is >= PERFORMANCE. |
| * @param conf Hive configuration. |
| * @return true if current HiveServer2 logging operation level is >= PERFORMANCE. |
| * Else, false. |
| */ |
| public static boolean isPerfOrAboveLogging(HiveConf conf) { |
| String loggingLevel = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL); |
| return conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED) && |
| (loggingLevel.equalsIgnoreCase("PERFORMANCE") || loggingLevel.equalsIgnoreCase("VERBOSE")); |
| } |
| |
| /** |
| * Returns the full path to the Jar containing the class. It always returns a JAR. |
| * |
| * @param klass |
| * class. |
| * |
| * @return path to the Jar containing the class. |
| */ |
| @SuppressWarnings("rawtypes") |
| public static String jarFinderGetJar(Class klass) { |
| Preconditions.checkNotNull(klass, "klass"); |
| ClassLoader loader = klass.getClassLoader(); |
| if (loader != null) { |
| String class_file = klass.getName().replaceAll("\\.", "/") + ".class"; |
| try { |
| for (Enumeration itr = loader.getResources(class_file); itr.hasMoreElements();) { |
| URL url = (URL) itr.nextElement(); |
| String path = url.getPath(); |
| if (path.startsWith("file:")) { |
| path = path.substring("file:".length()); |
| } |
| path = URLDecoder.decode(path, "UTF-8"); |
| if ("jar".equals(url.getProtocol())) { |
| path = URLDecoder.decode(path, "UTF-8"); |
| return path.replaceAll("!.*$", ""); |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Sets up the job so that all jars containing the given classes are shipped to the cluster (added to tmpjars). |
| * @param conf jobConf instance to setup |
| * @param classes the classes to look in jars for |
| * @throws IOException |
| */ |
| public static void addDependencyJars(Configuration conf, Class<?>... classes) |
| throws IOException { |
| FileSystem localFs = FileSystem.getLocal(conf); |
| Set<String> jars = new HashSet<>(conf.getStringCollection("tmpjars")); |
| for (Class<?> clazz : classes) { |
| if (clazz == null) { |
| continue; |
| } |
| final String path = Utilities.jarFinderGetJar(clazz); |
| if (path == null) { |
| throw new RuntimeException("Could not find jar for class " + clazz + |
| " in order to ship it to the cluster."); |
| } |
| if (!localFs.exists(new Path(path))) { |
| throw new RuntimeException("Could not validate jar file " + path + " for class " + clazz); |
| } |
| jars.add(localFs.makeQualified(new Path(path)).toString()); |
| } |
| if (jars.isEmpty()) { |
| return; |
| } |
| //noinspection ToArrayCallWithZeroLengthArrayArgument |
| conf.set("tmpjars", org.apache.hadoop.util.StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); |
| } |
| |
| public static int getDPColOffset(FileSinkDesc conf) { |
| |
| if (conf.getWriteType() == AcidUtils.Operation.DELETE) { |
| // For deletes, there is only ROW__ID in non-partitioning, non-bucketing columns. |
| //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. |
| return 1; |
| } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { |
| // For updates, ROW__ID is an extra column at index 0. |
| //See : UpdateDeleteSemanticAnalyzer::reparseAndSuperAnalyze() for details. |
| return getColumnNames(conf.getTableInfo().getProperties()).size() + 1; |
| } else { |
| return getColumnNames(conf.getTableInfo().getProperties()).size(); |
| } |
| |
| } |
| |
| public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) { |
| |
| List<String> statsTmpDirs = new ArrayList<>(); |
| if (!StatsSetupConst.StatDB.fs.name().equalsIgnoreCase(HiveConf.getVar(conf, ConfVars.HIVESTATSDBCLASS))) { |
| // no-op for non-fs stats collection |
| return statsTmpDirs; |
| } |
| // if its auto-stats gather for inserts or CTAS, stats dir will be in FileSink |
| Set<Operator<? extends OperatorDesc>> ops = work.getAllLeafOperators(); |
| if (work instanceof MapWork) { |
| // if its an analyze statement, stats dir will be in TableScan |
| ops.addAll(work.getAllRootOperators()); |
| } |
| for (Operator<? extends OperatorDesc> op : ops) { |
| OperatorDesc desc = op.getConf(); |
| String statsTmpDir = null; |
| if (desc instanceof IStatsGatherDesc) { |
| statsTmpDir = ((IStatsGatherDesc) desc).getTmpStatsDir(); |
| } |
| if (statsTmpDir != null && !statsTmpDir.isEmpty()) { |
| statsTmpDirs.add(statsTmpDir); |
| } |
| } |
| return statsTmpDirs; |
| } |
| |
| public static boolean isSchemaEvolutionEnabled(Configuration conf, boolean isAcid) { |
| return isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); |
| } |
| |
| public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) { |
| Class<?> inputFormatClass = pd.getInputFileFormatClass(); |
| return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass); |
| } |
| |
| public static boolean isInputFileFormatVectorized(PartitionDesc pd) { |
| Class<?> inputFormatClass = pd.getInputFileFormatClass(); |
| return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass); |
| } |
| |
| public static Collection<Class<?>> getClassNamesFromConfig(HiveConf hiveConf, ConfVars confVar) { |
| String[] classNames = org.apache.hadoop.util.StringUtils.getStrings(HiveConf.getVar(hiveConf, |
| confVar)); |
| if (classNames == null) { |
| return Collections.emptyList(); |
| } |
| Collection<Class<?>> classList = new ArrayList<Class<?>>(classNames.length); |
| for (String className : classNames) { |
| if (StringUtils.isEmpty(className)) { |
| continue; |
| } |
| try { |
| classList.add(Class.forName(className)); |
| } catch (Exception ex) { |
| LOG.warn("Cannot create class {} for {} checks", className, confVar.varname); |
| } |
| } |
| return classList; |
| } |
| |
| public static void addSchemaEvolutionToTableScanOperator(Table table, |
| TableScanOperator tableScanOp) { |
| String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols()); |
| String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols()); |
| tableScanOp.setSchemaEvolution(colNames, colTypes); |
| } |
| |
| public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI, |
| TableScanOperator tableScanOp) { |
| String colNames = ObjectInspectorUtils.getFieldNames(structOI); |
| String colTypes = ObjectInspectorUtils.getFieldTypes(structOI); |
| tableScanOp.setSchemaEvolution(colNames, colTypes); |
| } |
| |
| public static void unsetSchemaEvolution(Configuration conf) { |
| conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS); |
| conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES); |
| } |
| |
| public static void addTableSchemaToConf(Configuration conf, |
| TableScanOperator tableScanOp) { |
| String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns(); |
| if (schemaEvolutionColumns != null) { |
| conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns()); |
| conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes()); |
| } else { |
| LOG.info("schema.evolution.columns and schema.evolution.columns.types not available"); |
| } |
| } |
| |
| /** |
| * Sets partition column names to the configuration, if there is available info in the operator. |
| */ |
| public static void setPartitionColumnNames(Configuration conf, TableScanOperator tableScanOp) { |
| TableScanDesc scanDesc = tableScanOp.getConf(); |
| Table metadata = scanDesc.getTableMetadata(); |
| if (metadata == null) { |
| return; |
| } |
| List<FieldSchema> partCols = metadata.getPartCols(); |
| if (partCols != null && !partCols.isEmpty()) { |
| conf.set(serdeConstants.LIST_PARTITION_COLUMNS, MetaStoreUtils.getColumnNamesFromFieldSchema(partCols)); |
| } |
| } |
| |
| /** |
| * Returns a list with partition column names present in the configuration, |
| * or empty if there is no such information available. |
| */ |
| public static List<String> getPartitionColumnNames(Configuration conf) { |
| String colNames = conf.get(serdeConstants.LIST_PARTITION_COLUMNS); |
| if (colNames != null) { |
| return splitColNames(new ArrayList<>(), colNames); |
| } else { |
| return Collections.emptyList(); |
| } |
| } |
| |
| /** |
| * Create row key and value object inspectors for reduce vectorization. |
| * The row object inspector used by ReduceWork needs to be a **standard** |
| * struct object inspector, not just any struct object inspector. |
| * @param keyInspector |
| * @param valueInspector |
| * @return OI |
| * @throws HiveException |
| */ |
| public static StandardStructObjectInspector constructVectorizedReduceRowOI( |
| StructObjectInspector keyInspector, StructObjectInspector valueInspector) |
| throws HiveException { |
| |
| ArrayList<String> colNames = new ArrayList<String>(); |
| ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); |
| List<? extends StructField> fields = keyInspector.getAllStructFieldRefs(); |
| for (StructField field: fields) { |
| colNames.add(Utilities.ReduceField.KEY.toString() + '.' + field.getFieldName()); |
| ois.add(field.getFieldObjectInspector()); |
| } |
| fields = valueInspector.getAllStructFieldRefs(); |
| for (StructField field: fields) { |
| colNames.add(Utilities.ReduceField.VALUE.toString() + '.' + field.getFieldName()); |
| ois.add(field.getFieldObjectInspector()); |
| } |
| StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); |
| |
| return rowObjectInspector; |
| } |
| |
| public static String humanReadableByteCount(long bytes) { |
| int unit = 1000; // use binary units instead? |
| if (bytes < unit) { |
| return bytes + "B"; |
| } |
| int exp = (int) (Math.log(bytes) / Math.log(unit)); |
| String suffix = "KMGTPE".charAt(exp-1) + ""; |
| return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix); |
| } |
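| |
| // Illustrative example: humanReadableByteCount(123456789) returns "123.46MB" |
| // (decimal, 1000-based units, per the 'unit' constant above). |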
| |
| private static final String MANIFEST_EXTENSION = ".manifest"; |
| |
| private static void tryDelete(FileSystem fs, Path path) { |
| try { |
| fs.delete(path, true); |
| } catch (IOException ex) { |
| LOG.error("Failed to delete {}", path, ex); |
| } |
| } |
| |
| public static Path[] getDirectInsertDirectoryCandidates(FileSystem fs, Path path, int dpLevels, |
| PathFilter filter, long writeId, int stmtId, Configuration conf, |
| Boolean isBaseDir, AcidUtils.Operation acidOperation) throws IOException { |
| int skipLevels = dpLevels; |
| if (filter == null) { |
| filter = new AcidUtils.IdPathFilter(writeId, stmtId); |
| } |
| if (skipLevels == 0) { |
| return statusToPath(fs.listStatus(path, filter)); |
| } |
| // TODO: for some reason, globStatus doesn't work for masks like "...blah/*/delta_0000007_0000007*" |
| // the last star throws it off. So, for now, if stmtId is missing use recursion. |
| // For the same reason, we cannot use it if we don't know isBaseDir. Currently, we don't |
| // /want/ to know isBaseDir because that is error prone; so, it ends up never being used. |
| if (stmtId < 0 || isBaseDir == null |
| || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { |
| return getDirectInsertDirectoryCandidatesRecursive(fs, path, skipLevels, filter); |
| } |
| return getDirectInsertDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir, |
| acidOperation); |
| } |
| |
| private static boolean isS3(FileSystem fs) { |
| try { |
| return "s3a".equalsIgnoreCase(fs.getScheme()); |
| } catch (UnsupportedOperationException ex) { |
| // Some FS-es do not implement getScheme, e.g. ProxyLocalFileSystem. |
| return false; |
| } |
| } |
| |
| private static Path[] statusToPath(FileStatus[] statuses) { |
| if (statuses == null) { |
| return null; |
| } |
| Path[] paths = new Path[statuses.length]; |
| for (int i = 0; i < statuses.length; ++i) { |
| paths[i] = statuses[i].getPath(); |
| } |
| return paths; |
| } |
| |
| private static Path[] getDirectInsertDirectoryCandidatesRecursive(FileSystem fs, |
| Path path, int skipLevels, PathFilter filter) throws IOException { |
| String lastRelDir = null; |
| HashSet<Path> results = new HashSet<Path>(); |
| String relRoot = Path.getPathWithoutSchemeAndAuthority(path).toString(); |
| if (!relRoot.endsWith(Path.SEPARATOR)) { |
| relRoot += Path.SEPARATOR; |
| } |
| RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(path, true); |
| while (allFiles.hasNext()) { |
| LocatedFileStatus lfs = allFiles.next(); |
| Path lfsPath = lfs.getPath(); |
| Path dirPath = Path.getPathWithoutSchemeAndAuthority(lfsPath); |
| String dir = dirPath.toString(); |
| if (!dir.startsWith(relRoot)) { |
| throw new IOException("Path " + lfsPath + " is not under " + relRoot |
| + " (when shortened to " + dir + ")"); |
| } |
| String subDir = dir.substring(relRoot.length()); |
| Utilities.FILE_OP_LOGGER.trace("Looking at {} from {}", subDir, lfsPath); |
| |
| // If sorted, we'll skip a bunch of files. |
| if (lastRelDir != null && subDir.startsWith(lastRelDir)) { |
| continue; |
| } |
| int startIx = skipLevels > 0 ? -1 : 0; |
| for (int i = 0; i < skipLevels; ++i) { |
| startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1); |
| if (startIx == -1) { |
| Utilities.FILE_OP_LOGGER.info("Expected level of nesting ({}) is not " |
| + " present in {} (from {})", skipLevels, subDir, lfsPath); |
| break; |
| } |
| } |
| if (startIx == -1) { |
| continue; |
| } |
| int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1); |
| if (endIx == -1) { |
| Utilities.FILE_OP_LOGGER.info("Expected level of nesting ({}) is not present in" |
| + " {} (from {})", (skipLevels + 1), subDir, lfsPath); |
| continue; |
| } |
| lastRelDir = subDir = subDir.substring(0, endIx); |
| Path candidate = new Path(relRoot, subDir); |
| if (!filter.accept(candidate)) { |
| continue; |
| } |
| results.add(fs.makeQualified(candidate)); |
| } |
| return results.toArray(new Path[results.size()]); |
| } |
| |
| private static Path[] getDirectInsertDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, |
| PathFilter filter, long writeId, int stmtId, boolean isBaseDir, AcidUtils.Operation acidOperation) throws IOException { |
| StringBuilder sb = new StringBuilder(path.toUri().getPath()); |
| for (int i = 0; i < skipLevels; i++) { |
| sb.append(Path.SEPARATOR).append('*'); |
| } |
| if (stmtId < 0) { |
| // Note: this does not work. |
| // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(writeId, writeId)).append("_*"); |
| throw new AssertionError("GlobStatus should not be called without a statement ID"); |
| } else { |
| String deltaSubDir = AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId); |
| if (AcidUtils.Operation.DELETE.equals(acidOperation)) { |
| deltaSubDir = AcidUtils.deleteDeltaSubdir(writeId, writeId, stmtId); |
| } |
| if (AcidUtils.Operation.UPDATE.equals(acidOperation)) { |
| String deltaPostFix = deltaSubDir.replace("delta", ""); |
| deltaSubDir = "{delete_delta,delta}" + deltaPostFix; |
| } |
| sb.append(Path.SEPARATOR).append(deltaSubDir); |
| } |
| Path pathPattern = new Path(path, sb.toString()); |
| return statusToPath(fs.globStatus(pathPattern, filter)); |
| } |
| |
| private static void tryDeleteAllDirectInsertFiles(FileSystem fs, Path specPath, Path manifestDir, |
| int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, int stmtId, |
| Configuration conf, AcidUtils.Operation acidOperation) throws IOException { |
| Path[] files = getDirectInsertDirectoryCandidates( |
| fs, specPath, dpLevels, filter, writeId, stmtId, conf, null, acidOperation); |
| if (files != null) { |
| for (Path path : files) { |
| Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path); |
| tryDelete(fs, path); |
| } |
| } |
| Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", manifestDir); |
| fs.delete(manifestDir, true); |
| } |
| |
| |
| public static void writeCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs, |
| String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite, |
| boolean hasDynamicPartitions, Set<String> dynamicPartitionSpecs, String staticSpec, boolean isDelete) throws HiveException { |
| |
| // When doing a multi-statement insert overwrite with dynamic partitioning, |
| // the partition information will be written to the manifest file. |
| // This is needed because in this use case each FileSinkOperator should clean-up |
| // only the partition directories written by the same FileSinkOperator and do not |
| // clean-up the partition directories written by the other FileSinkOperators. |
| // Even if a statement from the insert overwrite query doesn't produce any data, |
| // a manifest file is still written; otherwise the missing manifest file |
| // would result in a table-level clean-up which could delete the data written by |
| // the other FileSinkOperators. (For further details please see HIVE-23114.) |
| boolean writeDynamicPartitionsToManifest = hasDynamicPartitions; |
| if (commitPaths.isEmpty() && !writeDynamicPartitionsToManifest) { |
| return; |
| } |
| |
| // We assume one FSOP per task (per specPath), so we create it in specPath. |
| Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, staticSpec, isDelete); |
| manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); |
| Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); |
| try { |
| // Don't overwrite the manifest... should fail if we have collisions. |
| try (FSDataOutputStream out = fs.create(manifestPath, false)) { |
| if (out == null) { |
| throw new HiveException("Failed to create manifest at " + manifestPath); |
| } |
| |
| if (writeDynamicPartitionsToManifest) { |
| out.writeInt(dynamicPartitionSpecs.size()); |
| for (String dynamicPartitionSpec : dynamicPartitionSpecs) { |
            out.writeUTF(dynamicPartitionSpec);
| } |
| } |
| |
| out.writeInt(commitPaths.size()); |
| for (Path path : commitPaths) { |
| out.writeUTF(path.toString()); |
| } |
| } |
| } catch (IOException e) { |
| throw new HiveException(e); |
| } |
| } |
| |
| private static Path getManifestDir(Path specPath, long writeId, int stmtId, String unionSuffix, |
| boolean isInsertOverwrite, String staticSpec, boolean isDelete) { |
| Path manifestRoot = specPath; |
| if (staticSpec != null) { |
| String tableRoot = specPath.toString(); |
| tableRoot = tableRoot.substring(0, tableRoot.length() - staticSpec.length()); |
| manifestRoot = new Path(tableRoot); |
| } |
| |
| String deltaDir = AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId); |
| if (isDelete) { |
| deltaDir = AcidUtils.deleteDeltaSubdir(writeId, writeId, stmtId); |
| } |
| Path manifestPath = new Path(manifestRoot, "_tmp." + deltaDir); |
| |
| if (isInsertOverwrite) { |
      // When doing a multi-statement insert overwrite query with dynamic partitioning, the
      // generated manifest directory would be the same for each FileSinkOperator.
      // To resolve this name collision, the manifest path is extended with the statement ID.
| manifestPath = new Path(manifestPath + "_" + stmtId); |
| } |
| |
| return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix); |
| } |
| |
| public static final class MissingBucketsContext { |
| public final TableDesc tableInfo; |
| public final int numBuckets; |
| public final boolean isCompressed; |
| public MissingBucketsContext(TableDesc tableInfo, int numBuckets, boolean isCompressed) { |
| this.tableInfo = tableInfo; |
| this.numBuckets = numBuckets; |
| this.isCompressed = isCompressed; |
| } |
| } |
| |
| public static void handleDirectInsertTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, |
| boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, |
| Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite, boolean isDirectInsert, |
| String staticSpec, AcidUtils.Operation acidOperation, FileSinkDesc conf) throws IOException, HiveException { |
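    // On failure: delete everything written for this write ID / statement ID plus the manifest
    // directory. On success: read the manifest files, delete the manifest directory, and remove
    // every file in the direct insert directories that no manifest declared as committed.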
| FileSystem fs = specPath.getFileSystem(hconf); |
| boolean isDelete = AcidUtils.Operation.DELETE.equals(acidOperation); |
| Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite, staticSpec, isDelete); |
| if (!success) { |
| AcidUtils.IdPathFilter filter = new AcidUtils.IdPathFilter(writeId, stmtId); |
| tryDeleteAllDirectInsertFiles(fs, specPath, manifestDir, dpLevels, lbLevels, |
| filter, writeId, stmtId, hconf, acidOperation); |
| return; |
| } |
| |
| Utilities.FILE_OP_LOGGER.debug("Looking for manifests in: {} ({})", manifestDir, writeId); |
| List<Path> manifests = new ArrayList<>(); |
| try { |
| FileStatus[] manifestFiles = fs.listStatus(manifestDir); |
| manifests = selectManifestFiles(manifestFiles); |
| } catch (FileNotFoundException ex) { |
| Utilities.FILE_OP_LOGGER.info("No manifests found in directory {} - query produced no output", manifestDir); |
| manifestDir = null; |
| if (!fs.exists(specPath)) { |
| // Empty insert to new partition |
| fs.mkdirs(specPath); |
| } |
| } |
| |
| Map<String, List<Path>> dynamicPartitionSpecs = new HashMap<>(); |
| Set<Path> committed = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| Set<Path> directInsertDirectories = new HashSet<>(); |
| for (Path mfp : manifests) { |
| Utilities.FILE_OP_LOGGER.info("Looking at manifest file: {}", mfp); |
| try (FSDataInputStream mdis = fs.open(mfp)) { |
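        // Read back exactly what writeCommitManifest wrote: an optional dynamic-partition
        // section (count + UTF specs) followed by the committed-file section (count + UTF paths).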
| if (dpLevels > 0) { |
| int partitionCount = mdis.readInt(); |
| for (int i = 0; i < partitionCount; ++i) { |
| String nextPart = mdis.readUTF(); |
| Utilities.FILE_OP_LOGGER.debug("Looking at dynamic partition {}", nextPart); |
| if (!dynamicPartitionSpecs.containsKey(nextPart)) { |
| dynamicPartitionSpecs.put(nextPart, new ArrayList<>()); |
| } |
| } |
| } |
| |
| int fileCount = mdis.readInt(); |
| for (int i = 0; i < fileCount; ++i) { |
| String nextFile = mdis.readUTF(); |
| Utilities.FILE_OP_LOGGER.debug("Looking at committed file {}", nextFile); |
| Path path = fs.makeQualified(new Path(nextFile)); |
| if (!committed.add(path)) { |
| throw new HiveException(nextFile + " was specified in multiple manifests"); |
| } |
| dynamicPartitionSpecs.entrySet() |
| .stream() |
| .filter(dynpath -> path.toString().contains(dynpath.getKey())) |
| .findAny() |
| .ifPresent(dynPath -> dynPath.getValue().add(path)); |
| |
| Path parentDirPath = path.getParent(); |
| while (AcidUtils.isChildOfDelta(parentDirPath, specPath)) { |
            // In some cases there are additional directory layers between the delta and the data files
            // (export-import MM table, insert with union all to MM table, skewed tables).
| parentDirPath = parentDirPath.getParent(); |
| } |
| directInsertDirectories.add(parentDirPath); |
| } |
| } |
| } |
| |
| if (manifestDir != null) { |
| Utilities.FILE_OP_LOGGER.info("Deleting manifest directory {}", manifestDir); |
| tryDelete(fs, manifestDir); |
| if (unionSuffix != null) { |
| // Also delete the parent directory if we are the last union FSOP to execute. |
| manifestDir = manifestDir.getParent(); |
| FileStatus[] remainingFiles = fs.listStatus(manifestDir); |
| if (remainingFiles == null || remainingFiles.length == 0) { |
| Utilities.FILE_OP_LOGGER.info("Deleting manifest directory {}", manifestDir); |
| tryDelete(fs, manifestDir); |
| } |
| } |
| } |
| |
| if (!directInsertDirectories.isEmpty()) { |
| cleanDirectInsertDirectoriesConcurrently(directInsertDirectories, committed, fs, hconf, unionSuffix, lbLevels); |
| } |
| |
| conf.setDynPartitionValues(dynamicPartitionSpecs); |
| |
| if (!committed.isEmpty()) { |
| throw new HiveException("The following files were committed but not found: " + committed); |
| } |
| |
| if (directInsertDirectories.isEmpty()) { |
| return; |
| } |
| |
| // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing, |
| // so maintain parity here by not calling it at all. |
| if (lbLevels != 0) { |
| return; |
| } |
| |
| if (!isDirectInsert) { |
| // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles |
| // doesn't need to check anything except path and directory status for MM directories. |
| FileStatus[] finalResults = directInsertDirectories.stream() |
| .map(PathOnlyFileStatus::new) |
| .toArray(FileStatus[]::new); |
| List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults, |
| unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, writeId, stmtId, |
| isMmTable, null, isInsertOverwrite); |
| // create empty buckets if necessary |
| if (!emptyBuckets.isEmpty()) { |
| assert mbc != null; |
| Utilities.createEmptyBuckets(hconf, emptyBuckets, mbc.isCompressed, mbc.tableInfo, reporter); |
| } |
| } |
| } |
| |
| /** |
| * The name of a manifest file consists of the task ID and a .manifest extension, where |
| * the task ID includes the attempt ID as well. It can happen that a task attempt already |
| * wrote out the manifest file, and then fails, so Tez restarts it. If the next attempt |
| * successfully finishes, the query won't fail but there could be multiple manifest files with |
| * the same task ID, but different attempt IDs. In this case the manifest file which has |
| * the highest attempt ID, and not empty, has to be considered. |
| * The empty manifest files and the ones with the same task ID but lower attempt ID has to be ignored. |
| * @param manifestFiles All the files listed in the manifest directory |
| * @return The list of manifest files which have the highest attempt ID and are not empty |
| */ |
| @VisibleForTesting |
| static List<Path> selectManifestFiles(FileStatus[] manifestFiles) { |
| List<Path> manifests = new ArrayList<>(); |
| if (manifestFiles != null) { |
| Map<String, Integer> fileNameToAttemptId = new HashMap<>(); |
| Map<String, Path> fileNameToPath = new HashMap<>(); |
| |
| for (FileStatus manifestFile : manifestFiles) { |
| Path path = manifestFile.getPath(); |
| if (manifestFile.getLen() == 0L) { |
| Utilities.FILE_OP_LOGGER.info("Found manifest file {}, but it is empty.", path); |
| continue; |
| } |
| String fileName = path.getName(); |
| if (fileName.endsWith(MANIFEST_EXTENSION)) { |
          Pattern pattern = Pattern.compile("([0-9]+)_([0-9]+)\\.manifest");
| Matcher matcher = pattern.matcher(fileName); |
| if (matcher.matches()) { |
| String taskId = matcher.group(1); |
| int attemptId = Integer.parseInt(matcher.group(2)); |
| Integer maxAttemptId = fileNameToAttemptId.get(taskId); |
| if (maxAttemptId == null) { |
| fileNameToAttemptId.put(taskId, attemptId); |
| fileNameToPath.put(taskId, path); |
| Utilities.FILE_OP_LOGGER.info("Found manifest file {} with attemptId {}.", path, attemptId); |
| } else if (attemptId > maxAttemptId) { |
| fileNameToAttemptId.put(taskId, attemptId); |
| fileNameToPath.put(taskId, path); |
            Utilities.FILE_OP_LOGGER.info(
                "Found manifest file {} with a higher attemptId than {}. Ignoring manifest files with attemptId below {}.",
                path, maxAttemptId, attemptId);
          } else {
            Utilities.FILE_OP_LOGGER.info(
                "Found manifest file {} with attemptId {}, but a manifest file with attemptId {} already exists. Ignoring this manifest file.",
                path, attemptId, maxAttemptId);
| } |
| } else { |
| Utilities.FILE_OP_LOGGER.info("Found manifest file {}", path); |
| manifests.add(path); |
| } |
| } |
| } |
| |
| if (!fileNameToPath.isEmpty()) { |
| manifests.addAll(fileNameToPath.values()); |
| } |
| } |
| return manifests; |
| } |
| |
| private static void cleanDirectInsertDirectoriesConcurrently( |
| Set<Path> directInsertDirectories, Set<Path> committed, FileSystem fs, Configuration hconf, String unionSuffix, int lbLevels) |
| throws IOException, HiveException { |
| |
| ExecutorService executor = createCleanTaskExecutor(hconf, directInsertDirectories.size()); |
| List<Future<Void>> cleanTaskFutures = submitCleanTasksForExecution(executor, directInsertDirectories, committed, fs, unionSuffix, lbLevels); |
| waitForCleanTasksToComplete(executor, cleanTaskFutures); |
| } |
| |
| private static ExecutorService createCleanTaskExecutor(Configuration hconf, int numOfDirectories) { |
| int threadCount = Math.min(numOfDirectories, HiveConf.getIntVar(hconf, ConfVars.HIVE_MOVE_FILES_THREAD_COUNT)); |
| threadCount = threadCount <= 0 ? 1 : threadCount; |
| return Executors.newFixedThreadPool(threadCount, |
| new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Clean-Direct-Insert-Dirs-Thread-%d").build()); |
| } |
| |
| private static List<Future<Void>> submitCleanTasksForExecution(ExecutorService executor, Set<Path> directInsertDirectories, |
| Set<Path> committed, FileSystem fs, String unionSuffix, int lbLevels) { |
| List<Future<Void>> cleanTaskFutures = new ArrayList<>(directInsertDirectories.size()); |
| for (Path directory : directInsertDirectories) { |
| Future<Void> cleanTaskFuture = executor.submit(() -> { |
| cleanDirectInsertDirectory(directory, fs, unionSuffix, lbLevels, committed); |
| return null; |
| }); |
| cleanTaskFutures.add(cleanTaskFuture); |
| } |
| return cleanTaskFutures; |
| } |
| |
  private static void waitForCleanTasksToComplete(ExecutorService executor, List<Future<Void>> cleanTaskFutures)
      throws IOException, HiveException {
    executor.shutdown();
    for (Future<Void> cleanFuture : cleanTaskFutures) {
      try {
        cleanFuture.get();
      } catch (InterruptedException e) {
        executor.shutdownNow();
        // Restore the interrupt status for callers further up the stack.
        Thread.currentThread().interrupt();
        throw new HiveException("Interrupted while waiting for the clean-up tasks to complete", e);
      } catch (ExecutionException e) {
        executor.shutdownNow();
        if (e.getCause() instanceof IOException) {
          throw (IOException) e.getCause();
        }
        if (e.getCause() instanceof HiveException) {
          throw (HiveException) e.getCause();
        }
        // Don't silently swallow unexpected failures of the clean-up tasks.
        throw new HiveException(e.getCause());
      }
    }
  }
| |
| |
| private static final class PathOnlyFileStatus extends FileStatus { |
| public PathOnlyFileStatus(Path path) { |
| super(0, true, 0, 0, 0, path); |
| } |
| } |
| |
| private static void cleanDirectInsertDirectory(Path dir, FileSystem fs, String unionSuffix, int lbLevels, Set<Path> committed) |
| throws IOException, HiveException { |
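    // Removes every file under the direct insert (delta/base) directory that is not in the
    // committed set; committed paths are removed from the set as they are found, so the caller
    // can detect manifest entries that never materialized on the file system.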
| for (FileStatus child : fs.listStatus(dir)) { |
| Path childPath = child.getPath(); |
| if (lbLevels > 0) { |
        // We need to recurse into some LB directories. We don't check the directories themselves
        // for matches; if they are empty they don't matter, and we will delete bad files.
| // This recursion is not the most efficient way to do this but LB is rarely used. |
| if (child.isDirectory()) { |
          Utilities.FILE_OP_LOGGER.trace(
              "Recursion into LB directory {}; levels remaining {}", childPath, lbLevels - 1);
| cleanDirectInsertDirectory(childPath, fs, unionSuffix, lbLevels - 1, committed); |
| } else { |
| if (committed.contains(childPath)) { |
            throw new HiveException("LB FSOP has committed "
                + childPath + " outside of LB directory levels " + lbLevels);
| } |
| deleteUncommitedFile(childPath, fs); |
| } |
| continue; |
| } |
| // No more LB directories expected. |
| if (unionSuffix == null) { |
| if (committed.remove(childPath)) { |
| continue; // A good file. |
| } |
| if (!childPath.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) { |
| deleteUncommitedFile(childPath, fs); |
| } |
| } else if (!child.isDirectory()) { |
| if (committed.contains(childPath)) { |
        throw new HiveException("Union FSOP has committed "
            + childPath + " outside of union directory " + unionSuffix);
| } |
| deleteUncommitedFile(childPath, fs); |
| } else if (childPath.getName().equals(unionSuffix)) { |
| // Found the right union directory; treat it as "our" directory. |
| cleanDirectInsertDirectory(childPath, fs, null, 0, committed); |
| } else { |
| String childName = childPath.getName(); |
| if (!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX) |
| && !childName.startsWith(".") && !childName.startsWith("_")) { |
| throw new HiveException("Union FSOP has an unknown directory " |
| + childPath + " outside of union directory " + unionSuffix); |
| } |
| Utilities.FILE_OP_LOGGER.trace( |
| "FSOP for {} is ignoring the other side of the union {}", unionSuffix, childPath); |
| } |
| } |
| } |
| |
| private static void deleteUncommitedFile(Path childPath, FileSystem fs) |
| throws IOException, HiveException { |
| Utilities.FILE_OP_LOGGER.info("Deleting {} that was not committed", childPath); |
| // We should actually succeed here - if we fail, don't commit the query. |
| if (!fs.delete(childPath, true)) { |
| throw new HiveException("Failed to delete an uncommitted path " + childPath); |
| } |
| } |
| |
| /** |
| * @return the complete list of valid MM directories under a table/partition path; null |
| * if the entire directory is valid (has no uncommitted/temporary files). |
| */ |
| public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf, |
| ValidWriteIdList validWriteIdList) throws IOException { |
| Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", path); |
| // NULL means this directory is entirely valid. |
| List<Path> result = null; |
| FileSystem fs = path.getFileSystem(conf); |
| FileStatus[] children = fs.listStatus(path); |
| for (int i = 0; i < children.length; ++i) { |
| FileStatus file = children[i]; |
| Path childPath = file.getPath(); |
| Long writeId = AcidUtils.extractWriteId(childPath); |
| if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) { |
| Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath); |
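        // On the first skipped child, materialize the result list and back-fill
        // all the valid children seen so far.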
| if (result == null) { |
| result = new ArrayList<>(children.length - 1); |
| for (int j = 0; j < i; ++j) { |
| result.add(children[j].getPath()); |
| } |
| } |
| } else if (result != null) { |
| result.add(childPath); |
| } |
| } |
| return result; |
| } |
| |
| public static String getAclStringWithHiveModification(Configuration tezConf, |
| String propertyName, |
| boolean addHs2User, |
| String user, |
| String hs2User) throws |
| IOException { |
| |
| // Start with initial ACLs |
| ACLConfigurationParser aclConf = |
| new ACLConfigurationParser(tezConf, propertyName); |
| |
| // Always give access to the user |
| aclConf.addAllowedUser(user); |
| |
| // Give access to the process user if the config is set. |
| if (addHs2User && hs2User != null) { |
| aclConf.addAllowedUser(hs2User); |
| } |
| return aclConf.toAclString(); |
| } |
| |
| public static boolean isHiveManagedFile(Path path) { |
| return AcidUtils.ORIGINAL_PATTERN.matcher(path.getName()).matches() || |
| AcidUtils.ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches(); |
| } |
| |
| /** |
| * Checks if path passed in exists and has writable permissions. |
| * The path will be created if it does not exist. |
| * @param rootHDFSDirPath |
| * @param conf |
| */ |
| public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException { |
| FsPermission writableHDFSDirPermission = new FsPermission((short)00733); |
| FileSystem fs = rootHDFSDirPath.getFileSystem(conf); |
| |
| if (!fs.exists(rootHDFSDirPath)) { |
| synchronized (ROOT_HDFS_DIR_LOCK) { |
| if (!fs.exists(rootHDFSDirPath)) { |
| Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true); |
| } |
| } |
| } |
| FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission(); |
    if (rootHDFSDirPath.toUri() != null) {
      String scheme = rootHDFSDirPath.toUri().getScheme();
      LOG.debug("HDFS dir: " + rootHDFSDirPath + " with scheme " + scheme + ", permission: " +
          currentHDFSDirPermission);
| } else { |
| LOG.debug( |
| "HDFS dir: " + rootHDFSDirPath + ", permission: " + currentHDFSDirPermission); |
| } |
| // If the root HDFS scratch dir already exists, make sure it is writeable. |
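    // The mask check requires at least rwx for the owner and wx for group/others:
    // e.g. 777 & 733 == 733 (passes), while 755 & 733 == 711 != 733 (fails).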
| if (!((currentHDFSDirPermission.toShort() & writableHDFSDirPermission |
| .toShort()) == writableHDFSDirPermission.toShort())) { |
| throw new RuntimeException("The dir: " + rootHDFSDirPath |
| + " on HDFS should be writable. Current permissions are: " + currentHDFSDirPermission); |
| } |
| } |
| |
  // Gets the bucketing version from its string representation; returns the default version 1 if the value is null or not a number.
| public static int getBucketingVersion(final String versionStr) { |
| int bucketingVersion = 1; |
| if (versionStr != null) { |
| try { |
| bucketingVersion = Integer.parseInt(versionStr); |
| } catch (NumberFormatException e) { |
| // Do nothing |
| } |
| } |
| return bucketingVersion; |
| } |
| |
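  // Reads a password from a Hadoop credential provider: the keystore argument is a provider
  // path (for example a jceks:// URI) and the key is the alias stored in that provider.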
| public static String getPasswdFromKeystore(String keystore, String key) throws IOException { |
| String passwd = null; |
| if (keystore != null && key != null) { |
| Configuration conf = new Configuration(); |
| conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, keystore); |
| char[] pwdCharArray = conf.getPassword(key); |
| if (pwdCharArray != null) { |
| passwd = new String(pwdCharArray); |
| } |
| } |
| return passwd; |
| } |
| |
| /** |
   * Loads the password from the given URI.
   * @param uriString The URI which is used to load the password.
   * @return null if the URI is empty or null, otherwise the password represented by the URI.
| * @throws IOException |
| * @throws URISyntaxException |
| * @throws HiveException |
| */ |
| public static String getPasswdFromUri(String uriString) throws IOException, URISyntaxException, HiveException { |
| if (uriString == null || uriString.isEmpty()) { |
| return null; |
| } |
| return URISecretSource.getInstance().getPasswordFromUri(new URI(uriString)); |
| } |
| |
| public static String encodeColumnNames(List<String> colNames) throws SemanticException { |
| try { |
| return JSON_MAPPER.writeValueAsString(colNames); |
| } catch (IOException e) { |
| throw new SemanticException(e); |
| } |
| } |
| |
| public static List<String> decodeColumnNames(String colNamesStr) throws SemanticException { |
| try { |
| return JSON_MAPPER.readValue(colNamesStr, List.class); |
| } catch (IOException e) { |
| throw new SemanticException(e); |
| } |
| } |
| |
| /** |
| * Logs the class paths of the job class loader and the thread context class loader to the passed logger. |
| * Checks both loaders if getURLs method is available; if not, prints a message about this (instead of the class path) |
| * |
| * Note: all messages will always be logged with DEBUG log level. |
| */ |
| public static void tryLoggingClassPaths(JobConf job, Logger logger) { |
| if (logger != null && logger.isDebugEnabled()) { |
| tryToLogClassPath("conf", job.getClassLoader(), logger); |
| tryToLogClassPath("thread", Thread.currentThread().getContextClassLoader(), logger); |
| } |
| } |
| |
| private static void tryToLogClassPath(String prefix, ClassLoader loader, Logger logger) { |
| if(loader instanceof URLClassLoader) { |
| logger.debug("{} class path = {}", prefix, Arrays.asList(((URLClassLoader) loader).getURLs()).toString()); |
| } else { |
| logger.debug("{} class path = unavailable for {}", prefix, |
| loader == null ? "null" : loader.getClass().getSimpleName()); |
| } |
| } |
| |
  public static boolean arePathsEqualOrWithin(Path p1, Path p2) {
    String s1 = p1.toString().toLowerCase();
    String s2 = p2.toString().toLowerCase();
    return s1.contains(s2) || s2.contains(s1);
  }
| |
| public static String getTableOrMVSuffix(Context context, boolean createTableOrMVUseSuffix) { |
| String suffix = ""; |
| if (createTableOrMVUseSuffix) { |
| long txnId = Optional.ofNullable(context) |
| .map(ctx -> ctx.getHiveTxnManager().getCurrentTxnId()).orElse(0L); |
| if (txnId != 0) { |
| suffix = AcidUtils.getPathSuffix(txnId); |
| } |
| } |
| return suffix; |
| } |
| } |