| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.gcp.util; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.api.client.googleapis.batch.BatchRequest; |
| import com.google.api.client.googleapis.batch.json.JsonBatchCallback; |
| import com.google.api.client.googleapis.json.GoogleJsonError; |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.http.HttpHeaders; |
| import com.google.api.client.http.HttpRequestInitializer; |
| import com.google.api.client.util.BackOff; |
| import com.google.api.client.util.Sleeper; |
| import com.google.api.services.storage.Storage; |
| import com.google.api.services.storage.model.Bucket; |
| import com.google.api.services.storage.model.Objects; |
| import com.google.api.services.storage.model.RewriteResponse; |
| import com.google.api.services.storage.model.StorageObject; |
| import com.google.auto.value.AutoValue; |
| import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; |
| import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; |
| import com.google.cloud.hadoop.gcsio.ObjectWriteConditions; |
| import com.google.cloud.hadoop.util.ApiErrorExtractor; |
| import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; |
| import com.google.cloud.hadoop.util.ClientRequestHelper; |
| import com.google.cloud.hadoop.util.ResilientOperation; |
| import com.google.cloud.hadoop.util.RetryDeterminer; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.nio.channels.SeekableByteChannel; |
| import java.nio.channels.WritableByteChannel; |
| import java.nio.file.AccessDeniedException; |
| import java.nio.file.FileAlreadyExistsException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; |
| import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; |
| import org.apache.beam.sdk.options.DefaultValueFactory; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.sdk.util.MoreFutures; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Provides operations on GCS. */ |
| public class GcsUtil { |
| /** |
| * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport |
| * flags specified on the {@link PipelineOptions}. |
| */ |
| public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> { |
| /** |
| * Returns an instance of {@link GcsUtil} based on the {@link PipelineOptions}. |
| * |
| * <p>If no instance has previously been created, one is created and the value stored in {@code |
| * options}. |
| */ |
| @Override |
| public GcsUtil create(PipelineOptions options) { |
| LOG.debug("Creating new GcsUtil"); |
| GcsOptions gcsOptions = options.as(GcsOptions.class); |
| Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); |
| return new GcsUtil( |
| storageBuilder.build(), |
| storageBuilder.getHttpRequestInitializer(), |
| gcsOptions.getExecutorService(), |
| gcsOptions.getGcsUploadBufferSizeBytes()); |
| } |
| |
| /** Returns an instance of {@link GcsUtil} based on the given parameters. */ |
| public static GcsUtil create( |
| Storage storageClient, |
| HttpRequestInitializer httpRequestInitializer, |
| ExecutorService executorService, |
| @Nullable Integer uploadBufferSizeBytes) { |
| return new GcsUtil( |
| storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); |
| } |
| } |
| |
  private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class);

  /** Maximum number of items to retrieve per Objects.List request. */
  private static final long MAX_LIST_ITEMS_PER_CALL = 1024;

  /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */
  private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*");

  /** Maximum number of requests permitted in a GCS batch request. */
  private static final int MAX_REQUESTS_PER_BATCH = 100;
  /** Maximum number of concurrent batches of requests executing on GCS. */
  private static final int MAX_CONCURRENT_BATCHES = 256;

  /** Retry policy shared by all GCS calls: up to 10 retries, 1-second initial backoff. */
  private static final FluentBackoff BACKOFF_FACTORY =
      FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1));
| |
  /////////////////////////////////////////////////////////////////////////////

  /** Client for the GCS API. Mutable only via the test-only {@link #setStorageClient}. */
  private Storage storageClient;

  /** Initializer attached to batch requests created by this util. */
  private final HttpRequestInitializer httpRequestInitializer;
  /** Buffer size for GCS uploads (in bytes). {@code null} means the channel's default. */
  @Nullable private final Integer uploadBufferSizeBytes;

  // Helper delegate for turning IOExceptions from API calls into higher-level semantics.
  private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

  // Unbounded thread pool for codependent pipeline operations that will deadlock the pipeline if
  // starved for threads.
  // Exposed for testing.
  final ExecutorService executorService;

  /** Rewrite operation setting. For testing purposes only. */
  @VisibleForTesting @Nullable Long maxBytesRewrittenPerCall;

  /** Counts rewrite-token continuation rounds during copies. For testing purposes only. */
  @VisibleForTesting @Nullable AtomicInteger numRewriteTokensUsed;
| |
| /** Returns the prefix portion of the glob that doesn't contain wildcards. */ |
| public static String getNonWildcardPrefix(String globExp) { |
| Matcher m = GLOB_PREFIX.matcher(globExp); |
| checkArgument(m.matches(), String.format("Glob expression: [%s] is not expandable.", globExp)); |
| return m.group("PREFIX"); |
| } |
| |
| /** |
| * Expands glob expressions to regular expressions. |
| * |
| * @param globExp the glob expression to expand |
| * @return a string with the regular expression this glob expands to |
| */ |
| public static String wildcardToRegexp(String globExp) { |
| StringBuilder dst = new StringBuilder(); |
| char[] src = globExp.replace("**/*", "**").toCharArray(); |
| int i = 0; |
| while (i < src.length) { |
| char c = src[i++]; |
| switch (c) { |
| case '*': |
| // One char lookahead for ** |
| if (i < src.length && src[i] == '*') { |
| dst.append(".*"); |
| ++i; |
| } else { |
| dst.append("[^/]*"); |
| } |
| break; |
| case '?': |
| dst.append("[^/]"); |
| break; |
| case '.': |
| case '+': |
| case '{': |
| case '}': |
| case '(': |
| case ')': |
| case '|': |
| case '^': |
| case '$': |
| // These need to be escaped in regular expressions |
| dst.append('\\').append(c); |
| break; |
| case '\\': |
| i = doubleSlashes(dst, src, i); |
| break; |
| default: |
| dst.append(c); |
| break; |
| } |
| } |
| return dst.toString(); |
| } |
| |
| /** Returns true if the given {@code spec} contains wildcard. */ |
| public static boolean isWildcard(GcsPath spec) { |
| return GLOB_PREFIX.matcher(spec.getObject()).matches(); |
| } |
| |
| private GcsUtil( |
| Storage storageClient, |
| HttpRequestInitializer httpRequestInitializer, |
| ExecutorService executorService, |
| @Nullable Integer uploadBufferSizeBytes) { |
| this.storageClient = storageClient; |
| this.httpRequestInitializer = httpRequestInitializer; |
| this.uploadBufferSizeBytes = uploadBufferSizeBytes; |
| this.executorService = executorService; |
| this.maxBytesRewrittenPerCall = null; |
| this.numRewriteTokensUsed = null; |
| } |
| |
  // Use this only for testing purposes.
  /** Replaces the GCS client; test-only seam for injecting a mock {@link Storage}. */
  protected void setStorageClient(Storage storageClient) {
    this.storageClient = storageClient;
  }
| |
| /** |
| * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in |
| * the result. For patterns that only match a single object, we ensure that the object exists. |
| */ |
| public List<GcsPath> expand(GcsPath gcsPattern) throws IOException { |
| Pattern p = null; |
| String prefix = null; |
| if (isWildcard(gcsPattern)) { |
| // Part before the first wildcard character. |
| prefix = getNonWildcardPrefix(gcsPattern.getObject()); |
| p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); |
| } else { |
| // Not a wildcard. |
| try { |
| // Use a get request to fetch the metadata of the object, and ignore the return value. |
| // The request has strong global consistency. |
| getObject(gcsPattern); |
| return ImmutableList.of(gcsPattern); |
| } catch (FileNotFoundException e) { |
| // If the path was not found, return an empty list. |
| return ImmutableList.of(); |
| } |
| } |
| |
| LOG.debug( |
| "matching files in bucket {}, prefix {} against pattern {}", |
| gcsPattern.getBucket(), |
| prefix, |
| p.toString()); |
| |
| String pageToken = null; |
| List<GcsPath> results = new ArrayList<>(); |
| do { |
| Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); |
| if (objects.getItems() == null) { |
| break; |
| } |
| |
| // Filter objects based on the regex. |
| for (StorageObject o : objects.getItems()) { |
| String name = o.getName(); |
| // Skip directories, which end with a slash. |
| if (p.matcher(name).matches() && !name.endsWith("/")) { |
| LOG.debug("Matched object: {}", name); |
| results.add(GcsPath.fromObject(o)); |
| } |
| } |
| pageToken = objects.getNextPageToken(); |
| } while (pageToken != null); |
| |
| return results; |
| } |
| |
  /** Returns the configured upload buffer size in bytes, or {@code null} if unset. */
  @VisibleForTesting
  @Nullable
  Integer getUploadBufferSizeBytes() {
    return uploadBufferSizeBytes;
  }
| |
  /** Creates a fresh {@link BackOff} from the shared {@link #BACKOFF_FACTORY} policy. */
  private static BackOff createBackOff() {
    return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
  }
| |
  /**
   * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not
   * exist.
   *
   * @throws IOException if the metadata fetch fails for any other reason
   */
  public long fileSize(GcsPath path) throws IOException {
    return getObject(path).getSize().longValue();
  }
| |
  /**
   * Returns the {@link StorageObject} for the given {@link GcsPath}.
   *
   * <p>Uses the default backoff policy and sleeper; see {@link #getObject(GcsPath, BackOff,
   * Sleeper)}.
   */
  public StorageObject getObject(GcsPath gcsPath) throws IOException {
    return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
  }
| |
| @VisibleForTesting |
| StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { |
| Storage.Objects.Get getObject = |
| storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); |
| try { |
| return ResilientOperation.retry( |
| ResilientOperation.getGoogleRequestCallable(getObject), |
| backoff, |
| RetryDeterminer.SOCKET_ERRORS, |
| IOException.class, |
| sleeper); |
| } catch (IOException | InterruptedException e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { |
| throw new FileNotFoundException(gcsPath.toString()); |
| } |
| throw new IOException( |
| String.format("Unable to get the file object for path %s.", gcsPath), e); |
| } |
| } |
| |
| /** |
| * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} for the given {@link |
| * GcsPath GcsPaths}. |
| */ |
| public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) throws IOException { |
| List<StorageObjectOrIOException[]> results = new ArrayList<>(); |
| executeBatches(makeGetBatches(gcsPaths, results)); |
| ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder(); |
| for (StorageObjectOrIOException[] result : results) { |
| ret.add(result[0]); |
| } |
| return ret.build(); |
| } |
| |
| /** Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. */ |
| public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) |
| throws IOException { |
| // List all objects that start with the prefix (including objects in sub-directories). |
| Storage.Objects.List listObject = storageClient.objects().list(bucket); |
| listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); |
| listObject.setPrefix(prefix); |
| |
| if (pageToken != null) { |
| listObject.setPageToken(pageToken); |
| } |
| |
| try { |
| return ResilientOperation.retry( |
| ResilientOperation.getGoogleRequestCallable(listObject), |
| createBackOff(), |
| RetryDeterminer.SOCKET_ERRORS, |
| IOException.class); |
| } catch (Exception e) { |
| throw new IOException( |
| String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), e); |
| } |
| } |
| |
| /** |
| * Returns the file size from GCS or throws {@link FileNotFoundException} if the resource does not |
| * exist. |
| */ |
| @VisibleForTesting |
| List<Long> fileSizes(List<GcsPath> paths) throws IOException { |
| List<StorageObjectOrIOException> results = getObjects(paths); |
| |
| ImmutableList.Builder<Long> ret = ImmutableList.builder(); |
| for (StorageObjectOrIOException result : results) { |
| ret.add(toFileSize(result)); |
| } |
| return ret.build(); |
| } |
| |
| private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) |
| throws IOException { |
| if (storageObjectOrIOException.ioException() != null) { |
| throw storageObjectOrIOException.ioException(); |
| } else { |
| return storageObjectOrIOException.storageObject().getSize().longValue(); |
| } |
| } |
| |
  /**
   * Opens an object in GCS.
   *
   * <p>Returns a SeekableByteChannel that provides access to data in the bucket.
   *
   * @param path the GCS filename to read from
   * @return a SeekableByteChannel that can read the object data
   * @throws IOException if the channel cannot be opened
   */
  public SeekableByteChannel open(GcsPath path) throws IOException {
    return new GoogleCloudStorageReadChannel(
        storageClient,
        path.getBucket(),
        path.getObject(),
        errorExtractor,
        // Default request helper; no custom request configuration is applied for reads.
        new ClientRequestHelper<>());
  }
| |
  /**
   * Creates an object in GCS.
   *
   * <p>Returns a WritableByteChannel that can be used to write data to the object.
   *
   * @param path the GCS file to write to
   * @param type the type of object, eg "text/plain".
   * @return a WritableByteChannel for writing the object's contents
   * @throws IOException if the upload channel cannot be initialized
   */
  public WritableByteChannel create(GcsPath path, String type) throws IOException {
    return create(path, type, uploadBufferSizeBytes);
  }
| |
  /**
   * Same as {@link GcsUtil#create(GcsPath, String)} but allows overriding {@code
   * uploadBufferSizeBytes}.
   */
  public WritableByteChannel create(GcsPath path, String type, Integer uploadBufferSizeBytes)
      throws IOException {
    GoogleCloudStorageWriteChannel channel =
        new GoogleCloudStorageWriteChannel(
            executorService,
            storageClient,
            new ClientRequestHelper<>(),
            path.getBucket(),
            path.getObject(),
            type,
            /* kmsKeyName= */ null,
            AsyncWriteChannelOptions.newBuilder().build(),
            new ObjectWriteConditions(),
            Collections.emptyMap());
    if (uploadBufferSizeBytes != null) {
      // Override the channel's default upload chunk size (trades memory for throughput).
      channel.setUploadBufferSize(uploadBufferSizeBytes);
    }
    channel.initialize();
    return channel;
  }
| |
  /**
   * Returns whether the GCS bucket exists and is accessible, using the default backoff policy and
   * sleeper.
   */
  public boolean bucketAccessible(GcsPath path) throws IOException {
    return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
  }
| |
  /**
   * Returns the project number of the project which owns this bucket. If the bucket exists, it must
   * be accessible otherwise the permissions exception will be propagated. If the bucket does not
   * exist, an exception will be thrown.
   */
  public long bucketOwner(GcsPath path) throws IOException {
    return getBucket(path, createBackOff(), Sleeper.DEFAULT).getProjectNumber().longValue();
  }
| |
  /**
   * Creates a {@link Bucket} under the specified project in Cloud Storage or propagates an
   * exception. Uses the default backoff policy and sleeper.
   */
  public void createBucket(String projectId, Bucket bucket) throws IOException {
    createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
  }
| |
| /** |
| * Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due |
| * to permissions. |
| */ |
| @VisibleForTesting |
| boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { |
| try { |
| return getBucket(path, backoff, sleeper) != null; |
| } catch (AccessDeniedException | FileNotFoundException e) { |
| return false; |
| } |
| } |
| |
  /**
   * Returns the {@link Bucket} for {@code path}, retrying transient socket errors.
   *
   * @throws AccessDeniedException if the caller lacks permission on the bucket
   * @throws FileNotFoundException if the bucket does not exist
   * @throws IOException if interrupted, or on other failures
   */
  @VisibleForTesting
  @Nullable
  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
    Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket());

    try {
      return ResilientOperation.retry(
          ResilientOperation.getGoogleRequestCallable(getBucket),
          backoff,
          new RetryDeterminer<IOException>() {
            @Override
            public boolean shouldRetry(IOException e) {
              // Not-found and permission errors are definitive; retrying cannot help.
              if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
                return false;
              }
              return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
            }
          },
          IOException.class,
          sleeper);
    } catch (GoogleJsonResponseException e) {
      // Translate raw HTTP errors into the java.nio exception types callers expect.
      if (errorExtractor.accessDenied(e)) {
        throw new AccessDeniedException(path.toString(), null, e.getMessage());
      }
      if (errorExtractor.itemNotFound(e)) {
        throw new FileNotFoundException(e.getMessage());
      }
      throw e;
    } catch (InterruptedException e) {
      // Restore the interrupt flag before wrapping so callers can observe the interruption.
      Thread.currentThread().interrupt();
      throw new IOException(
          String.format(
              "Error while attempting to verify existence of bucket gs://%s", path.getBucket()),
          e);
    }
  }
| |
| @VisibleForTesting |
| void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper) |
| throws IOException { |
| Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(projectId, bucket); |
| insertBucket.setPredefinedAcl("projectPrivate"); |
| insertBucket.setPredefinedDefaultObjectAcl("projectPrivate"); |
| |
| try { |
| ResilientOperation.retry( |
| ResilientOperation.getGoogleRequestCallable(insertBucket), |
| backoff, |
| new RetryDeterminer<IOException>() { |
| @Override |
| public boolean shouldRetry(IOException e) { |
| if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) { |
| return false; |
| } |
| return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); |
| } |
| }, |
| IOException.class, |
| sleeper); |
| return; |
| } catch (GoogleJsonResponseException e) { |
| if (errorExtractor.accessDenied(e)) { |
| throw new AccessDeniedException(bucket.getName(), null, e.getMessage()); |
| } |
| if (errorExtractor.itemAlreadyExists(e)) { |
| throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage()); |
| } |
| throw e; |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException( |
| String.format( |
| "Error while attempting to create bucket gs://%s for rproject %s", |
| bucket.getName(), projectId), |
| e); |
| } |
| } |
| |
| private static void executeBatches(List<BatchRequest> batches) throws IOException { |
| ExecutorService executor = |
| MoreExecutors.listeningDecorator( |
| new ThreadPoolExecutor( |
| MAX_CONCURRENT_BATCHES, |
| MAX_CONCURRENT_BATCHES, |
| 0L, |
| TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<>())); |
| |
| List<CompletionStage<Void>> futures = new ArrayList<>(); |
| for (final BatchRequest batch : batches) { |
| futures.add(MoreFutures.runAsync(() -> batch.execute(), executor)); |
| } |
| |
| try { |
| MoreFutures.get(MoreFutures.allAsList(futures)); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while executing batch GCS request", e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof FileNotFoundException) { |
| throw (FileNotFoundException) e.getCause(); |
| } |
| throw new IOException("Error executing batch GCS request", e); |
| } finally { |
| executor.shutdown(); |
| } |
| } |
| |
| /** |
| * Makes get {@link BatchRequest BatchRequests}. |
| * |
| * @param paths {@link GcsPath GcsPaths}. |
| * @param results mutable {@link List} for return values. |
| * @return {@link BatchRequest BatchRequests} to execute. |
| * @throws IOException |
| */ |
| @VisibleForTesting |
| List<BatchRequest> makeGetBatches( |
| Collection<GcsPath> paths, List<StorageObjectOrIOException[]> results) throws IOException { |
| List<BatchRequest> batches = new ArrayList<>(); |
| for (List<GcsPath> filesToGet : |
| Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { |
| BatchRequest batch = createBatchRequest(); |
| for (GcsPath path : filesToGet) { |
| results.add(enqueueGetFileSize(path, batch)); |
| } |
| batches.add(batch); |
| } |
| return batches; |
| } |
| |
  /**
   * Wrapper for RewriteRequest that supports multiple calls.
   *
   * <p>Usage: create, enqueue(), and execute batch. Then, check getReadyToEnqueue() if another
   * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false.
   */
  class RewriteOp extends JsonBatchCallback<RewriteResponse> {
    // Source and destination paths of the rewrite; retained for logging and error messages.
    private GcsPath from;
    private GcsPath to;
    // True while another enqueue()/execute round is required to finish the rewrite.
    private boolean readyToEnqueue;
    @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;

    /** Returns whether this op still needs to be enqueued in another batch round. */
    public boolean getReadyToEnqueue() {
      return readyToEnqueue;
    }

    /**
     * Queues the rewrite on {@code batch} with this object as the response callback.
     *
     * @throws IOException if the op is not currently ready to enqueue
     */
    public void enqueue(BatchRequest batch) throws IOException {
      if (!readyToEnqueue) {
        throw new IOException(
            String.format(
                "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
                from, to, readyToEnqueue));
      }
      rewriteRequest.queue(batch, this);
      // Cleared until the server response (onSuccess) says more rounds are needed.
      readyToEnqueue = false;
    }

    /** Builds the underlying rewrite request; the op starts out ready to enqueue. */
    public RewriteOp(GcsPath from, GcsPath to) throws IOException {
      this.from = from;
      this.to = to;
      rewriteRequest =
          storageClient
              .objects()
              .rewrite(from.getBucket(), from.getObject(), to.getBucket(), to.getObject(), null);
      if (maxBytesRewrittenPerCall != null) {
        // Test hook: caps bytes per call to force multi-round rewrites.
        rewriteRequest.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall);
      }
      readyToEnqueue = true;
    }

    @Override
    public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeaders)
        throws IOException {
      if (rewriteResponse.getDone()) {
        LOG.debug("Rewrite done: {} to {}", from, to);
        readyToEnqueue = false;
      } else {
        LOG.debug(
            "Rewrite progress: {} of {} bytes, {} to {}",
            rewriteResponse.getTotalBytesRewritten(),
            rewriteResponse.getObjectSize(),
            from,
            to);
        // Carry the server-issued token into the next round so the rewrite resumes where it left
        // off.
        rewriteRequest.setRewriteToken(rewriteResponse.getRewriteToken());
        readyToEnqueue = true;
        if (numRewriteTokensUsed != null) {
          numRewriteTokensUsed.incrementAndGet();
        }
      }
    }

    @Override
    public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
      readyToEnqueue = false;
      throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e));
    }
  }
| |
| public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) |
| throws IOException { |
| LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames); |
| while (rewrites.size() > 0) { |
| executeBatches(makeCopyBatches(rewrites)); |
| } |
| } |
| |
| LinkedList<RewriteOp> makeRewriteOps( |
| Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException { |
| List<String> srcList = Lists.newArrayList(srcFilenames); |
| List<String> destList = Lists.newArrayList(destFilenames); |
| checkArgument( |
| srcList.size() == destList.size(), |
| "Number of source files %s must equal number of destination files %s", |
| srcList.size(), |
| destList.size()); |
| LinkedList<RewriteOp> rewrites = Lists.newLinkedList(); |
| for (int i = 0; i < srcList.size(); i++) { |
| final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); |
| final GcsPath destPath = GcsPath.fromUri(destList.get(i)); |
| rewrites.addLast(new RewriteOp(sourcePath, destPath)); |
| } |
| return rewrites; |
| } |
| |
| List<BatchRequest> makeCopyBatches(LinkedList<RewriteOp> rewrites) throws IOException { |
| List<BatchRequest> batches = new ArrayList<>(); |
| BatchRequest batch = createBatchRequest(); |
| Iterator<RewriteOp> it = rewrites.iterator(); |
| while (it.hasNext()) { |
| RewriteOp rewrite = it.next(); |
| if (!rewrite.getReadyToEnqueue()) { |
| it.remove(); |
| continue; |
| } |
| rewrite.enqueue(batch); |
| |
| if (batch.size() >= MAX_REQUESTS_PER_BATCH) { |
| batches.add(batch); |
| batch = createBatchRequest(); |
| } |
| } |
| if (batch.size() > 0) { |
| batches.add(batch); |
| } |
| return batches; |
| } |
| |
| List<BatchRequest> makeRemoveBatches(Collection<String> filenames) throws IOException { |
| List<BatchRequest> batches = new ArrayList<>(); |
| for (List<String> filesToDelete : |
| Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { |
| BatchRequest batch = createBatchRequest(); |
| for (String file : filesToDelete) { |
| enqueueDelete(GcsPath.fromUri(file), batch); |
| } |
| batches.add(batch); |
| } |
| return batches; |
| } |
| |
  /**
   * Deletes the given files in batched requests. Files that already do not exist are logged and
   * ignored by the delete callback.
   */
  public void remove(Collection<String> filenames) throws IOException {
    executeBatches(makeRemoveBatches(filenames));
  }
| |
  /**
   * Queues a metadata GET for {@code path} on {@code batch} and returns a one-element array that
   * the batch callback later fills with either the {@link StorageObject} or an {@link
   * IOException}.
   */
  private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch)
      throws IOException {
    // Single-element array lets the anonymous callback deliver its result after this method
    // returns.
    final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1];

    Storage.Objects.Get getRequest =
        storageClient.objects().get(path.getBucket(), path.getObject());
    getRequest.queue(
        batch,
        new JsonBatchCallback<StorageObject>() {
          @Override
          public void onSuccess(StorageObject response, HttpHeaders httpHeaders)
              throws IOException {
            ret[0] = StorageObjectOrIOException.create(response);
          }

          @Override
          public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException {
            // Record the failure in the slot instead of throwing, so the rest of the batch
            // still completes; callers unwrap it later.
            IOException ioException;
            if (errorExtractor.itemNotFound(e)) {
              ioException = new FileNotFoundException(path.toString());
            } else {
              ioException = new IOException(String.format("Error trying to get %s: %s", path, e));
            }
            ret[0] = StorageObjectOrIOException.create(ioException);
          }
        });
    return ret;
  }
| |
  /** A class that holds either a {@link StorageObject} or an {@link IOException}. */
  @AutoValue
  public abstract static class StorageObjectOrIOException {

    /** Returns the {@link StorageObject}, or {@code null} if this holds an exception. */
    @Nullable
    public abstract StorageObject storageObject();

    /** Returns the {@link IOException}, or {@code null} if this holds a storage object. */
    @Nullable
    public abstract IOException ioException();

    /** Wraps a successful result; exactly one of the two fields is ever non-null. */
    @VisibleForTesting
    public static StorageObjectOrIOException create(StorageObject storageObject) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          checkNotNull(storageObject, "storageObject"), null /* ioException */);
    }

    /** Wraps a failure; exactly one of the two fields is ever non-null. */
    @VisibleForTesting
    public static StorageObjectOrIOException create(IOException ioException) {
      return new AutoValue_GcsUtil_StorageObjectOrIOException(
          null /* storageObject */, checkNotNull(ioException, "ioException"));
    }
  }
| |
  /**
   * Queues a delete for {@code file} on {@code batch}. A 404 from GCS is logged and ignored (the
   * file is already gone); any other failure is rethrown as an {@link IOException}.
   */
  private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException {
    Storage.Objects.Delete deleteRequest =
        storageClient.objects().delete(file.getBucket(), file.getObject());
    deleteRequest.queue(
        batch,
        new JsonBatchCallback<Void>() {
          @Override
          public void onSuccess(Void obj, HttpHeaders responseHeaders) {
            LOG.debug("Successfully deleted {}", file);
          }

          @Override
          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
            if (e.getCode() == 404) {
              // Deleting an already-missing file is treated as success.
              LOG.info(
                  "Ignoring failed deletion of file {} which already does not exist: {}", file, e);
            } else {
              throw new IOException(String.format("Error trying to delete %s: %s", file, e));
            }
          }
        });
  }
| |
  /** Creates a new batch request bound to this util's HTTP request initializer. */
  private BatchRequest createBatchRequest() {
    return storageClient.batch(httpRequestInitializer);
  }
| |
| private static int doubleSlashes(StringBuilder dst, char[] src, int i) { |
| // Emit the next character without special interpretation |
| dst.append('\\'); |
| if ((i - 1) != src.length) { |
| dst.append(src[i]); |
| i++; |
| } else { |
| // A backslash at the very end is treated like an escaped backslash |
| dst.append('\\'); |
| } |
| return i; |
| } |
| } |