blob: a253ac600cbedf4c99d54c7f5c7177a2d894020f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
/**
* InputEntity abstracts an input entity and knows how to read bytes from the given entity.
* Since the implementations of this interface assume that the given entity is not empty, the InputSources
* should not create InputEntities for empty entities.
*/
@UnstableApi
public interface InputEntity
{
Logger LOG = new Logger(InputEntity.class);
int DEFAULT_FETCH_BUFFER_SIZE = 4 * 1024; // 4 KiB
int DEFAULT_MAX_NUM_FETCH_TRIES = 3; // 3 tries including the initial try
/**
* CleanableFile is the result type of {@link #fetch}.
* It should clean up any temporary resource on {@link #close()}.
*/
interface CleanableFile extends Closeable
{
File file();
}
/**
* Returns an URI to identify the input entity. Implementations can return null if they don't have
* an unique URI.
*/
@Nullable
URI getUri();
/**
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
* The behavior of this method is only defined fort the first call to open().
* The behavior of subsequent calls is undefined and may vary between implementations.
*
* @see #fetch
*/
InputStream open() throws IOException;
/**
* Fetches the input entity into the local storage.
* This method might be preferred instead of {@link #open()}, for example
*
* - {@link InputFormat} requires expensive random access on remote storage.
* - Holding a connection until you consume the entire InputStream is expensive.
*
* @param temporaryDirectory to store temp data. This directory will be removed automatically once
* the task finishes.
* @param fetchBuffer is used to fetch remote entity into local storage.
*
* @see FileUtils#copyLarge
*/
default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
{
final File tempFile = File.createTempFile("druid-input-entity", ".tmp", temporaryDirectory);
LOG.debug("Fetching entity into file[%s]", tempFile.getAbsolutePath());
try (InputStream is = open()) {
FileUtils.copyLarge(
is,
tempFile,
fetchBuffer,
getRetryCondition(),
DEFAULT_MAX_NUM_FETCH_TRIES,
StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
);
}
return new CleanableFile()
{
@Override
public File file()
{
return tempFile;
}
@Override
public void close()
{
if (!tempFile.delete()) {
LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
}
}
};
}
/**
* Returns a retry condition that the caller should retry on.
* The returned condition should be used when reading data from this InputEntity such as in {@link #fetch}
* or {@link RetryingInputEntity}.
*/
default Predicate<Throwable> getRetryCondition()
{
return Predicates.alwaysFalse();
}
}