/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@ExtensionPoint
public interface DataSegmentPusher
{
Joiner JOINER = Joiner.on("/").skipNulls();
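
/**
* @deprecated use the no-argument {@link #getPathForHadoop()} instead.
*/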
@Deprecated
String getPathForHadoop(String dataSource);

String getPathForHadoop();

/**
* Pushes index files and segment descriptor to deep storage.
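*
* <p>A minimal usage sketch (the {@code pusher} and {@code segment} variables and the file path are illustrative,
* not part of this interface):
* <pre>{@code
* // "pusher" is some DataSegmentPusher implementation and "segment" the descriptor to publish.
* File indexFilesDir = new File("/tmp/segment-index");
* DataSegment pushed = pusher.push(indexFilesDir, segment, true);
* Map<String, Object> loadSpec = pushed.getLoadSpec();   // returned descriptor carries the deep-storage location
* }</pre>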
* @param file directory containing index files
* @param segment segment descriptor
* @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica
* tasks can either overwrite or fail to overwrite existing segments leading to the possibility
* of different versions of the same segment ID containing different data. As an example, a Kafka
* indexing task starting at offset A and ending at offset B may push a segment to deep storage
* and then fail before writing the loadSpec to the metadata table, resulting in a replacement
* task being spawned. This replacement will also start at offset A but will read to offset C and
* will then push a segment to deep storage and write the loadSpec metadata. Without unique file
* paths, this can only work correctly if new segments overwrite existing segments. Suppose that
* at this point the task then fails so that the supervisor retries again from offset A. This 3rd
* attempt will overwrite the segments in deep storage before failing to write the loadSpec
* metadata, resulting in inconsistencies in the segment data now in deep storage and copies of
* the segment already loaded by historicals.
*
* If unique paths are used, caller is responsible for cleaning up segments that were pushed but
* were not written to the metadata table (for example when using replica tasks).
* @return segment descriptor
* @throws IOException if the segment files could not be pushed to deep storage
*/
DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;
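
/**
* Pushes index files for a segment to deep storage under the given, pre-computed storage directory suffix rather
* than a path derived from {@link #getStorageDir(DataSegment, boolean)}. The default implementation throws
* {@link UnsupportedOperationException}; pushers that support explicit paths must override it.
*/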
default DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
{
throw new UnsupportedOperationException("not supported");
}

// Use a Map instead of the LoadSpec class to avoid dependency pollution.
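// For illustration only (hypothetical values): a local-storage pusher might return a map such as
// {"type": "local", "path": "/deep-storage/wikipedia/index.zip"}, which historicals later use to locate and load the segment.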
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);

/**
* @deprecated backward-compatibility shim that should be removed in the next major release;
* use {@link #getStorageDir(DataSegment, boolean)} instead.
*/
@Deprecated
default String getStorageDir(DataSegment dataSegment)
{
return getStorageDir(dataSegment, false);
}

default String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
return getDefaultStorageDir(dataSegment, useUniquePath);
}

default String makeIndexPathName(DataSegment dataSegment, String indexName)
{
// This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false
return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName);
}

/**
* Property prefixes that should be added to the "allowedHadoopPrefix" config for passing down to Hadoop jobs. These
* should be property prefixes like "druid.xxx", which means to include "druid.xxx" and "druid.xxx.*".
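*
* <p>For example (illustrative only, not a statement about any particular extension), a pusher whose runtime
* configuration lives under "druid.s3" could override this as:
* <pre>{@code
* return Collections.singletonList("druid.s3");
* }</pre>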
*/
default List<String> getAllowedPropertyPrefixesForHadoop()
{
return Collections.emptyList();
}

// Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/
// If the above format is ever changed, make sure to change it appropriately in other places,
// e.g. HDFSDataSegmentKiller uses this information to clean up the version, interval and dataSource directories
// on segment deletion if the segment being deleted was the only segment.
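// For illustration (hypothetical values): a segment of datasource "wikipedia" for interval 2023-01-01/2023-01-02,
// version "2023-01-05T00:00:00.000Z", partition 0, pushed with useUniquePath=true, resolves to something like:
//   wikipedia/2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z/2023-01-05T00:00:00.000Z/0/<random-uuid>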
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
// Sanity check for shardSpec type.
// BucketNumberedShardSpec should never be used in segment push.
Preconditions.checkArgument(
!(segment.getShardSpec() instanceof BucketNumberedShardSpec),
"Illegal shardSpec type[%s]",
segment.getShardSpec()
);
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
useUniquePath ? generateUniquePath() : null
);
}
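
/**
* Same directory layout as {@link #getDefaultStorageDir(DataSegment, boolean)}, but appends a caller-supplied
* unique path component instead of generating a new one.
*/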
static String getDefaultStorageDirWithExistingUniquePath(DataSegment segment, String uniquePath)
{
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum(),
uniquePath
);
}

static String generateUniquePath()
{
return UUID.randomUUID().toString();
}
}