| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexer; |
| |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Strings; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.FileUtils; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.IOE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.RetryUtils; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.segment.SegmentUtils; |
| import org.apache.druid.segment.loading.DataSegmentPusher; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.utils.CompressionUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.io.retry.RetryPolicy; |
| import org.apache.hadoop.io.retry.RetryProxy; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Progressable; |
| |
| import javax.annotation.Nullable; |
| import java.io.BufferedOutputStream; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| import java.util.zip.ZipOutputStream; |
| |
| /** |
| */ |
| public class JobHelper |
| { |
| private static final Logger log = new Logger(JobHelper.class); |
| private static final int NUM_RETRIES = 8; |
| private static final int SECONDS_BETWEEN_RETRIES = 2; |
| private static final int DEFAULT_FS_BUFFER_SIZE = 1 << 18; // 256KiB |
| private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*-SNAPSHOT(-selfcontained)?\\.jar$"); |
| |
| public static Path distributedClassPath(String path) |
| { |
| return distributedClassPath(new Path(path)); |
| } |
| |
| public static Path distributedClassPath(Path base) |
| { |
| return new Path(base, "classpath"); |
| } |
| |
| public static final String INDEX_ZIP = "index.zip"; |
| |
| /** |
   * Authenticates against a secured Hadoop cluster.
   * In case of any bug fix, make sure to fix the code at HdfsStorageAuthentication#authenticate as well.
| * |
| */ |
| public static void authenticate() |
| { |
| String principal = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getPrincipal(); |
| String keytab = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getKeytab(); |
| if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) { |
| Configuration conf = new Configuration(); |
| UserGroupInformation.setConfiguration(conf); |
| if (UserGroupInformation.isSecurityEnabled()) { |
| try { |
          if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials()
              || !UserGroupInformation.getCurrentUser().getUserName().equals(principal)) {
            log.info("Trying to authenticate user [%s] with keytab [%s]", principal, keytab);
| UserGroupInformation.loginUserFromKeytab(principal, keytab); |
| } |
| } |
| catch (IOException e) { |
| throw new ISE(e, "Failed to authenticate user principal [%s] with keytab [%s]", principal, keytab); |
| } |
| } |
| } |
| } |
| |
| /** |
   * Uploads jar files to HDFS and configures the job classpath.
   * SNAPSHOT jar files are uploaded to the intermediate classpath and are not shared across jobs.
   * Non-SNAPSHOT jar files are uploaded to the distributed classpath and are shared across jobs.
| * |
| * @param distributedClassPath classpath shared across multiple jobs |
   * @param intermediateClassPath classpath exclusive to this job; used to upload SNAPSHOT jar files
| * @param job job to run |
| * |
   * @throws IOException if uploading a jar or updating the job classpath fails
| */ |
| public static void setupClasspath( |
| final Path distributedClassPath, |
| final Path intermediateClassPath, |
| final Job job |
| ) |
| throws IOException |
| { |
| String classpathProperty = System.getProperty("druid.hadoop.internal.classpath"); |
| if (classpathProperty == null) { |
| classpathProperty = System.getProperty("java.class.path"); |
| } |
| |
| String[] jarFiles = classpathProperty.split(File.pathSeparator); |
| |
| final Configuration conf = job.getConfiguration(); |
| final FileSystem fs = distributedClassPath.getFileSystem(conf); |
| |
| if (fs instanceof LocalFileSystem) { |
| return; |
| } |
| |
| for (String jarFilePath : jarFiles) { |
| |
| final File jarFile = new File(jarFilePath); |
| if (jarFile.getName().endsWith(".jar")) { |
| try { |
| RetryUtils.retry( |
| () -> { |
| if (isSnapshot(jarFile)) { |
| addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job); |
| } else { |
| addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job); |
| } |
| return true; |
| }, |
| shouldRetryPredicate(), |
| NUM_RETRIES |
| ); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |
| |
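  /**
   * Returns a predicate that retries when the given Throwable, or any Throwable in its cause
   * chain, is an {@link IOException}.
   */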
| public static Predicate<Throwable> shouldRetryPredicate() |
| { |
| return new Predicate<Throwable>() |
| { |
| @Override |
| public boolean apply(Throwable input) |
| { |
| if (input == null) { |
| return false; |
| } |
| if (input instanceof IOException) { |
| return true; |
| } |
| return apply(input.getCause()); |
| } |
| }; |
| } |
| |
| static void addJarToClassPath( |
| File jarFile, |
| Path distributedClassPath, |
| Path intermediateClassPath, |
| FileSystem fs, |
| Job job |
| ) |
| throws IOException |
| { |
    // Create the distributed classpath directory if it does not exist;
    // rename will fail if the destination directory does not exist.
| fs.mkdirs(distributedClassPath); |
| |
| // Non-snapshot jar files are uploaded to the shared classpath. |
| final Path hdfsPath = new Path(distributedClassPath, jarFile.getName()); |
| if (shouldUploadOrReplace(jarFile, hdfsPath, fs)) { |
      // Multiple jobs may try to upload the same jar here. To keep them from overwriting
      // each other's files, first upload to intermediateClassPath, then rename into distributedClassPath.
| final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName()); |
| uploadJar(jarFile, intermediateHdfsPath, fs); |
| IOException exception = null; |
| try { |
| log.info("Renaming jar to path[%s]", hdfsPath); |
| fs.rename(intermediateHdfsPath, hdfsPath); |
| if (!fs.exists(hdfsPath)) { |
| throw new IOE("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath); |
| } |
| } |
| catch (IOException e) { |
        // Rename failed, possibly due to a race; check whether another job already uploaded the jar.
        try {
          if (!fs.exists(hdfsPath)) {
            log.error(e, "IOException while renaming jar file");
| exception = e; |
| } |
| } |
| catch (IOException e1) { |
| e.addSuppressed(e1); |
| exception = e; |
| } |
| } |
| finally { |
| try { |
| if (fs.exists(intermediateHdfsPath)) { |
| fs.delete(intermediateHdfsPath, false); |
| } |
| } |
| catch (IOException e) { |
| if (exception == null) { |
| exception = e; |
| } else { |
| exception.addSuppressed(e); |
| } |
| } |
| if (exception != null) { |
| throw exception; |
| } |
| } |
| } |
| job.addFileToClassPath(hdfsPath); |
| } |
| |
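  /**
   * Returns true if the local jar should be uploaded: either nothing exists at {@code hdfsPath}
   * yet, or the existing file's length differs from the local jar's length.
   */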
| static boolean shouldUploadOrReplace( |
| File jarFile, |
| Path hdfsPath, |
| FileSystem fs |
| ) |
| throws IOException |
| { |
| try { |
| FileStatus status = fs.getFileStatus(hdfsPath); |
| return status == null || status.getLen() != jarFile.length(); |
| } |
| catch (FileNotFoundException e) { |
| return true; |
| } |
| } |
| |
| static void addSnapshotJarToClassPath( |
| File jarFile, |
| Path intermediateClassPath, |
| FileSystem fs, |
| Job job |
| ) throws IOException |
| { |
    // Snapshot jars are uploaded to the non-shared intermediate directory.
    final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());
    // Prevent uploading the same file multiple times in the same run.
| if (!fs.exists(hdfsPath)) { |
| uploadJar(jarFile, hdfsPath, fs); |
| } |
| job.addFileToClassPath(hdfsPath); |
| } |
| |
| static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException |
| { |
| log.info("Uploading jar to path[%s]", path); |
| try (OutputStream os = fs.create(path)) { |
| Files.copy(jarFile.toPath(), os); |
| } |
| } |
| |
| static boolean isSnapshot(File jarFile) |
| { |
| return SNAPSHOT_JAR.matcher(jarFile.getName()).matches(); |
| } |
| |
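  /**
   * Appends each allowed Druid property to the map and reduce task JVM options as a {@code -D}
   * flag, writing the result back only when non-empty. For example, an allowed property
   * {@code druid.processing.numThreads=2} (illustrative) is appended to both opts strings as
   * {@code -Ddruid.processing.numThreads=2}.
   */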
| public static void injectDruidProperties(Configuration configuration, HadoopDruidIndexerConfig hadoopDruidIndexerConfig) |
| { |
| String mapJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.MAP_JAVA_OPTS)); |
| String reduceJavaOpts = StringUtils.nullToEmptyNonDruidDataString(configuration.get(MRJobConfig.REDUCE_JAVA_OPTS)); |
| |
| for (Map.Entry<String, String> allowedProperties : hadoopDruidIndexerConfig.getAllowedProperties().entrySet()) { |
| mapJavaOpts = StringUtils.format( |
| "%s -D%s=%s", |
| mapJavaOpts, |
| allowedProperties.getKey(), |
| allowedProperties.getValue() |
| ); |
| reduceJavaOpts = StringUtils.format( |
| "%s -D%s=%s", |
| reduceJavaOpts, |
| allowedProperties.getKey(), |
| allowedProperties.getValue() |
| ); |
| } |
| |
| if (!Strings.isNullOrEmpty(mapJavaOpts)) { |
| configuration.set(MRJobConfig.MAP_JAVA_OPTS, mapJavaOpts); |
| } |
| if (!Strings.isNullOrEmpty(reduceJavaOpts)) { |
| configuration.set(MRJobConfig.REDUCE_JAVA_OPTS, reduceJavaOpts); |
| } |
| } |
| |
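  /**
   * Copies every property from {@code HadoopDruidIndexerConfig.PROPERTIES} whose name starts
   * with {@code hadoop.} into the Configuration with the prefix stripped (e.g.
   * {@code hadoop.fs.defaultFS} becomes {@code fs.defaultFS}), then sets all allowed properties
   * from the indexer config. Returns the same Configuration instance for chaining.
   */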
| public static Configuration injectSystemProperties(Configuration conf, HadoopDruidIndexerConfig hadoopDruidIndexerConfig) |
| { |
| for (String propName : HadoopDruidIndexerConfig.PROPERTIES.stringPropertyNames()) { |
| if (propName.startsWith("hadoop.")) { |
| conf.set(propName.substring("hadoop.".length()), HadoopDruidIndexerConfig.PROPERTIES.getProperty(propName)); |
| } |
| } |
| |
| for (Map.Entry<String, String> allowedProperties : hadoopDruidIndexerConfig.getAllowedProperties().entrySet()) { |
| conf.set(allowedProperties.getKey(), allowedProperties.getValue()); |
| } |
| |
| return conf; |
| } |
| |
| public static void ensurePaths(HadoopDruidIndexerConfig config) |
| { |
| authenticate(); |
| // config.addInputPaths() can have side-effects ( boo! :( ), so this stuff needs to be done before anything else |
| try { |
| Job job = Job.getInstance( |
| new Configuration(), |
| StringUtils.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals()) |
| ); |
| |
| job.getConfiguration().set("io.sort.record.percent", "0.19"); |
| injectSystemProperties(job.getConfiguration(), config); |
| config.addJobProperties(job); |
| |
| config.addInputPaths(job); |
| } |
| catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
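  /**
   * Writes the Hadoop job id to the given file as JSON, so the file contains the quoted string,
   * e.g. {@code "job_1234567890123_0001"} (the id shown is illustrative). A null job id or file
   * name is logged and skipped, and write failures are logged rather than rethrown.
   */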
| public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJobId) |
| { |
| if (hadoopJobId != null && hadoopJobIdFileName != null) { |
| try (final OutputStream out = Files.newOutputStream(Paths.get(hadoopJobIdFileName))) { |
| try (final OutputStreamWriter osw = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { |
| HadoopDruidIndexerConfig.JSON_MAPPER.writeValue( |
| osw, |
| hadoopJobId |
| ); |
| } |
| log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName); |
| } |
| catch (IOException e) { |
| log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName); |
| } |
| } else { |
| log.info("Either job id or file name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName); |
| } |
| } |
| |
  public static boolean runSingleJob(Jobby job)
  {
    return job.run();
  }
| |
| public static boolean runJobs(List<Jobby> jobs) |
| { |
| boolean succeeded = true; |
| for (Jobby job : jobs) { |
| if (!job.run()) { |
| succeeded = false; |
| break; |
| } |
| } |
| |
| return succeeded; |
| } |
| |
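  /**
   * Deletes the job's intermediate working path unless the tuning config sets
   * {@code leaveIntermediate}, or the job failed and {@code cleanupOnFailure} is disabled.
   * Deletion failures are logged rather than rethrown.
   */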
| public static void maybeDeleteIntermediatePath( |
| boolean jobSucceeded, |
| HadoopIngestionSpec indexerSchema) |
| { |
| HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); |
| final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); |
| config.addJobProperties(configuration); |
| JobHelper.injectDruidProperties(configuration, config); |
| if (!config.getSchema().getTuningConfig().isLeaveIntermediate()) { |
| if (jobSucceeded || config.getSchema().getTuningConfig().isCleanupOnFailure()) { |
| Path workingPath = config.makeIntermediatePath(); |
| log.info("Deleting path[%s]", workingPath); |
| try { |
| config.addJobProperties(configuration); |
| workingPath.getFileSystem(configuration).delete(workingPath, true); |
| } |
| catch (IOException e) { |
| log.error(e, "Failed to cleanup path[%s]", workingPath); |
| } |
| } |
| } |
| } |
| |
| public static DataSegmentAndIndexZipFilePath serializeOutIndex( |
| final DataSegment segmentTemplate, |
| final Configuration configuration, |
| final Progressable progressable, |
| final File mergedBase, |
| final Path finalIndexZipFilePath, |
| final Path tmpPath, |
| DataSegmentPusher dataSegmentPusher |
| ) |
| throws IOException |
| { |
| final FileSystem outputFS = FileSystem.get(finalIndexZipFilePath.toUri(), configuration); |
| final AtomicLong size = new AtomicLong(0L); |
| final DataPusher zipPusher = (DataPusher) RetryProxy.create( |
| DataPusher.class, |
| new DataPusher() |
| { |
| @Override |
| public long push() throws IOException |
| { |
| try (OutputStream outputStream = outputFS.create( |
| tmpPath, |
| true, |
| DEFAULT_FS_BUFFER_SIZE, |
| progressable |
| )) { |
| size.set(zipAndCopyDir(mergedBase, outputStream, progressable)); |
| } |
| catch (IOException | RuntimeException exception) { |
| log.error(exception, "Exception in retry loop"); |
| throw exception; |
| } |
| return -1; |
| } |
| }, |
| RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS) |
| ); |
| zipPusher.push(); |
| log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri()); |
| |
| final URI indexOutURI = finalIndexZipFilePath.toUri(); |
| final DataSegment finalSegment = segmentTemplate |
| .withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI)) |
| .withSize(size.get()) |
| .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase)); |
| |
| return new DataSegmentAndIndexZipFilePath( |
| finalSegment, |
| tmpPath.toUri().toString(), |
| finalIndexZipFilePath.toUri().toString() |
| ); |
| } |
| |
| public static void writeSegmentDescriptor( |
| final FileSystem outputFS, |
| final DataSegmentAndIndexZipFilePath segmentAndPath, |
| final Path descriptorPath, |
| final Progressable progressable |
| ) |
| throws IOException |
| { |
| final DataPusher descriptorPusher = (DataPusher) RetryProxy.create( |
| DataPusher.class, |
| new DataPusher() |
| { |
| @Override |
| public long push() throws IOException |
| { |
| try { |
| progressable.progress(); |
| if (outputFS.exists(descriptorPath)) { |
                // If the descriptor path already exists, don't overwrite it and risk clobbering it.
                // Its existence means the segment data has already been written to the tmp path, and
                // the existing descriptor should give us the information we need to rename the
                // segment index to the final path and publish it in the top-level task.
| log.info("descriptor path [%s] already exists, not overwriting", descriptorPath); |
| return -1; |
| } |
| try (final OutputStream descriptorOut = outputFS.create( |
| descriptorPath, |
| true, |
| DEFAULT_FS_BUFFER_SIZE, |
| progressable |
| )) { |
| HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(descriptorOut, segmentAndPath); |
| } |
| } |
| catch (RuntimeException | IOException ex) { |
| log.info(ex, "Exception in descriptor pusher retry loop"); |
| throw ex; |
| } |
| return -1; |
| } |
| }, |
| RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS) |
| ); |
| descriptorPusher.push(); |
| } |
| |
| /** |
| * Simple interface for retry operations |
| */ |
| public interface DataPusher |
| { |
| long push() throws IOException; |
| } |
| |
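  /**
   * Zips the regular files directly under {@code baseDir} (subdirectories are skipped with a
   * warning) into {@code baseOutputStream}, reporting progress along the way. Returns the total
   * number of uncompressed bytes copied.
   */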
| public static long zipAndCopyDir( |
| File baseDir, |
| OutputStream baseOutputStream, |
| Progressable progressable |
| ) throws IOException |
| { |
| long size = 0L; |
| try (ZipOutputStream outputStream = new ZipOutputStream(baseOutputStream)) { |
| String[] filesToCopy = baseDir.list(); |
| if (filesToCopy != null) { |
| for (String fileName : filesToCopy) { |
| final File fileToCopy = new File(baseDir, fileName); |
| if (Files.isRegularFile(fileToCopy.toPath())) { |
| size += copyFileToZipStream(fileToCopy, outputStream, progressable); |
| } else { |
| log.warn("File at [%s] is not a regular file! skipping as part of zip", fileToCopy.getPath()); |
| } |
| } |
| } |
| outputStream.flush(); |
| } |
| return size; |
| } |
| |
| public static long copyFileToZipStream( |
| File file, |
| ZipOutputStream zipOutputStream, |
| Progressable progressable |
| ) throws IOException |
| { |
| createNewZipEntry(zipOutputStream, file); |
| long numRead = 0; |
| try (FileInputStream inputStream = new FileInputStream(file)) { |
| byte[] buf = new byte[0x10000]; |
| for (int bytesRead = inputStream.read(buf); bytesRead >= 0; bytesRead = inputStream.read(buf)) { |
| progressable.progress(); |
| if (bytesRead == 0) { |
| continue; |
| } |
| zipOutputStream.write(buf, 0, bytesRead); |
| progressable.progress(); |
| numRead += bytesRead; |
| } |
| } |
| zipOutputStream.closeEntry(); |
| progressable.progress(); |
| return numRead; |
| } |
| |
| private static void createNewZipEntry(ZipOutputStream out, File file) throws IOException |
| { |
| log.info("Creating new ZipEntry[%s]", file.getName()); |
| out.putNextEntry(new ZipEntry(file.getName())); |
| } |
| |
| public static Path makeFileNamePath( |
| final Path basePath, |
| final FileSystem fs, |
| final DataSegment segmentTemplate, |
| final String baseFileName, |
| DataSegmentPusher dataSegmentPusher |
| ) |
| { |
| return new Path( |
| prependFSIfNullScheme(fs, basePath), |
| dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName) |
| ); |
| } |
| |
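  /**
   * Builds the per-attempt tmp path for a segment zip by suffixing the index path with the task
   * attempt id. For example (the exact layout depends on the {@link DataSegmentPusher}), attempt
   * id 3 might yield a path ending in {@code index.zip.3}.
   */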
| public static Path makeTmpPath( |
| final Path basePath, |
| final FileSystem fs, |
| final DataSegment segmentTemplate, |
| final TaskAttemptID taskAttemptID, |
| DataSegmentPusher dataSegmentPusher |
| ) |
| { |
| return new Path( |
| prependFSIfNullScheme(fs, basePath), |
| StringUtils.format( |
| "./%s.%d", |
| dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP), |
| taskAttemptID.getId() |
| ) |
| ); |
| } |
| |
| /** |
| * Renames the index files for the segments. This works around some limitations of both FileContext (no s3n support) and NativeS3FileSystem.rename |
| * which will not overwrite. Note: segments should be renamed in the index task, not in a hadoop job, as race |
| * conditions between job retries can cause the final segment index file path to get clobbered. |
| * |
| * @param indexerSchema the hadoop ingestion spec |
| * @param segmentAndIndexZipFilePaths the list of segments with their currently stored tmp path and the final path |
| * that they should be renamed to. |
| */ |
| public static void renameIndexFilesForSegments( |
| HadoopIngestionSpec indexerSchema, |
| List<DataSegmentAndIndexZipFilePath> segmentAndIndexZipFilePaths |
| ) throws IOException |
| { |
| HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(indexerSchema); |
| final Configuration configuration = JobHelper.injectSystemProperties(new Configuration(), config); |
| config.addJobProperties(configuration); |
| JobHelper.injectDruidProperties(configuration, config); |
| for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : segmentAndIndexZipFilePaths) { |
| Path tmpPath = new Path(segmentAndIndexZipFilePath.getTmpIndexZipFilePath()); |
| Path finalIndexZipFilePath = new Path(segmentAndIndexZipFilePath.getFinalIndexZipFilePath()); |
| final FileSystem outputFS = FileSystem.get(finalIndexZipFilePath.toUri(), configuration); |
| if (!renameIndexFile(outputFS, tmpPath, finalIndexZipFilePath)) { |
| throw new IOE( |
| "Unable to rename [%s] to [%s]", |
| tmpPath.toUri().toString(), |
| finalIndexZipFilePath.toUri().toString() |
| ); |
| } |
| } |
| } |
| |
| /** |
   * Renames the file. This works around some limitations of both FileContext (no s3n support) and
   * NativeS3FileSystem.rename, which will not overwrite.
| * |
| * @param outputFS The output fs |
| * @param indexZipFilePath The original file path |
   * @param finalIndexZipFilePath The path to rename the original file to
| * |
| * @return False if a rename failed, true otherwise (rename success or no rename needed) |
| */ |
| private static boolean renameIndexFile( |
| final FileSystem outputFS, |
| final Path indexZipFilePath, |
| final Path finalIndexZipFilePath |
| ) |
| { |
| try { |
| return RetryUtils.retry( |
| () -> { |
| final boolean needRename; |
| |
| if (outputFS.exists(finalIndexZipFilePath)) { |
| // NativeS3FileSystem.rename won't overwrite, so we might need to delete the old index first |
| final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath); |
| final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath); |
| |
| if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() |
| || zipFile.getLen() != finalIndexZipFile.getLen()) { |
| log.info( |
| "File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", |
| finalIndexZipFile.getPath(), |
| DateTimes.utc(finalIndexZipFile.getModificationTime()), |
| finalIndexZipFile.getLen(), |
| zipFile.getPath(), |
| DateTimes.utc(zipFile.getModificationTime()), |
| zipFile.getLen() |
| ); |
| outputFS.delete(finalIndexZipFilePath, false); |
| needRename = true; |
| } else { |
| log.info( |
| "File[%s / %s / %sB] existed and will be kept", |
| finalIndexZipFile.getPath(), |
| DateTimes.utc(finalIndexZipFile.getModificationTime()), |
| finalIndexZipFile.getLen() |
| ); |
| needRename = false; |
| } |
| } else { |
| needRename = true; |
| } |
| |
| if (needRename) { |
| log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath); |
| return outputFS.rename(indexZipFilePath, finalIndexZipFilePath); |
| } else { |
| return true; |
| } |
| }, |
| FileUtils.IS_EXCEPTION, |
| NUM_RETRIES |
| ); |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| |
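  /**
   * Qualifies the path with the filesystem's scheme and authority if it has none. For example
   * (the namenode address is illustrative), {@code /tmp/working} becomes
   * {@code hdfs://namenode:8020/tmp/working} on an HDFS filesystem.
   */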
| public static Path prependFSIfNullScheme(FileSystem fs, Path path) |
| { |
| if (path.toUri().getScheme() == null) { |
| path = fs.makeQualified(path); |
| } |
| return path; |
| } |
| |
| // TODO: Replace this whenever hadoop gets their act together and stops breaking with more recent versions of Guava |
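  /**
   * Unzips {@code zip} into {@code outDir}, validating each entry's output path, and returns the
   * total number of uncompressed bytes written. The whole operation runs through Hadoop's
   * RetryProxy, using exponential backoff when no {@code retryPolicy} is supplied.
   */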
| public static long unzipNoGuava( |
| final Path zip, |
| final Configuration configuration, |
| final File outDir, |
| final Progressable progressable, |
| @Nullable final RetryPolicy retryPolicy |
| ) throws IOException |
| { |
| final RetryPolicy effectiveRetryPolicy; |
| if (retryPolicy == null) { |
| effectiveRetryPolicy = RetryPolicies.exponentialBackoffRetry( |
| NUM_RETRIES, |
| SECONDS_BETWEEN_RETRIES, |
| TimeUnit.SECONDS |
| ); |
| } else { |
| effectiveRetryPolicy = retryPolicy; |
| } |
| |
| final DataPusher zipPusher = (DataPusher) RetryProxy.create( |
| DataPusher.class, |
| new DataPusher() |
| { |
| @Override |
| public long push() throws IOException |
| { |
| try { |
| final FileSystem fileSystem = zip.getFileSystem(configuration); |
| long size = 0L; |
| final byte[] buffer = new byte[1 << 13]; |
| progressable.progress(); |
| try (ZipInputStream in = new ZipInputStream(fileSystem.open(zip, 1 << 13))) { |
| for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) { |
| final String fileName = entry.getName(); |
| final String outputPath = new File(outDir, fileName).getAbsolutePath(); |
| |
| CompressionUtils.validateZipOutputFile(zip.getName(), new File(outputPath), outDir); |
| |
| try (final OutputStream out = new BufferedOutputStream(new FileOutputStream(outputPath))) { |
| for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) { |
| progressable.progress(); |
| if (len == 0) { |
| continue; |
| } |
| size += len; |
| out.write(buffer, 0, len); |
| } |
| out.flush(); |
| } |
| } |
| } |
| progressable.progress(); |
| return size; |
| } |
| catch (IOException | RuntimeException exception) { |
| log.error(exception, "Exception in unzip retry loop"); |
| throw exception; |
| } |
| } |
| }, |
| effectiveRetryPolicy |
| ); |
| return zipPusher.push(); |
| } |
| |
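  /**
   * Derives the fetch URI from a segment's loadSpec. A sketch of the mapping implemented below
   * (bucket, key, and path values are illustrative):
   * <pre>{@code
   * {"type": "s3_zip", "bucket": "b", "key": "k"}   -> s3n://b/k (or s3a://b/k if S3Schema is "s3a")
   * {"type": "hdfs", "path": "hdfs://nn/p"}         -> hdfs://nn/p
   * {"type": "google", "bucket": "b", "path": "p"}  -> gs://b/p (with ':' encoded as %3A)
   * {"type": "local", "path": "/p"}                 -> file:/p
   * }</pre>
   */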
| public static URI getURIFromSegment(DataSegment dataSegment) |
| { |
| // There is no good way around this... |
| // TODO: add getURI() to URIDataPuller |
| final Map<String, Object> loadSpec = dataSegment.getLoadSpec(); |
| final String type = loadSpec.get("type").toString(); |
| final URI segmentLocURI; |
| if ("s3_zip".equals(type)) { |
| if ("s3a".equals(loadSpec.get("S3Schema"))) { |
| segmentLocURI = URI.create(StringUtils.format("s3a://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); |
| |
| } else { |
| segmentLocURI = URI.create(StringUtils.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); |
| } |
| } else if ("hdfs".equals(type)) { |
| segmentLocURI = URI.create(loadSpec.get("path").toString()); |
| } else if ("google".equals(type)) { |
| // Segment names contain : in their path. |
| // Google Cloud Storage supports : but Hadoop does not. |
| // This becomes an issue when re-indexing using the current segments. |
| // The Hadoop getSplits code doesn't understand the : and returns "Relative path in absolute URI" |
| // This could be fixed using the same code that generates path names for hdfs segments using |
| // getHdfsStorageDir. But that wouldn't fix this issue for people who already have segments with ":". |
| // Because of this we just URL encode the : making everything work as it should. |
| segmentLocURI = URI.create( |
| StringUtils.format( |
| "gs://%s/%s", |
| loadSpec.get("bucket"), |
| StringUtils.replaceChar(loadSpec.get("path").toString(), ':', "%3A") |
| ) |
| ); |
| } else if ("local".equals(type)) { |
| try { |
| segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); |
| } |
| catch (URISyntaxException e) { |
| throw new ISE(e, "Unable to form simple file uri"); |
| } |
| } else { |
| try { |
| throw new IAE( |
| "Cannot figure out loadSpec %s", |
| HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(loadSpec) |
| ); |
| } |
| catch (JsonProcessingException e) { |
| throw new ISE("Cannot write Map with json mapper"); |
| } |
| } |
| return segmentLocURI; |
| } |
| |
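  /**
   * Reads the job tracker address from the configuration, checking the legacy
   * {@code mapred.job.tracker} key first and falling back to {@code mapreduce.jobtracker.address}.
   */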
| public static String getJobTrackerAddress(Configuration config) |
| { |
| String jobTrackerAddress = config.get("mapred.job.tracker"); |
| if (jobTrackerAddress == null) { |
      // Newer property name used by more recent Hadoop versions
| jobTrackerAddress = config.get("mapreduce.jobtracker.address"); |
| } |
| return jobTrackerAddress; |
| } |
| } |