blob: ec5cfdfcf9d5ae8427a33667f2f1b94118996248 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# S3 access utilities
#
# This file uses the boto3 client and provides simple functions to the Impala test suite
# to access Amazon S3.
import boto3
from tests.util.filesystem_base import BaseFilesystem
class S3Client(BaseFilesystem):
  """Filesystem-like access to a single S3 bucket for the test suite, backed by boto3.

  S3 is a flat key-value store: there are no real directories, only keys that may
  contain '/'. The methods below treat key prefixes ending in '/' as directories.
  """

  # S3 rejects DeleteObjects requests that name more than this many keys.
  _MAX_KEYS_PER_DELETE = 1000

  def __init__(self, bucket):
    """Creates a client that operates on the bucket named 'bucket'."""
    # Deliberately a plain instance method: as a @classmethod these assignments would
    # land on the class itself, so every S3Client would share whichever bucket was
    # configured last.
    self.bucketname = bucket
    self.s3 = boto3.resource('s3')
    self.bucket = self.s3.Bucket(self.bucketname)
    self.s3client = boto3.client('s3')

  def create_file(self, path, file_data, overwrite=True):
    """Writes 'file_data' to the key 'path'.

    Returns False without writing if 'overwrite' is False and 'path' already exists
    (see exists() for the matching rule), True otherwise.
    """
    if not overwrite and self.exists(path):
      return False
    self.s3client.put_object(Bucket=self.bucketname, Key=path, Body=file_data)
    return True

  def make_dir(self, path, permission=None):
    """No-op, kept so that S3 can be used like the other filesystems.

    S3 is a key-value store and does not have a directory structure. We can use a
    nonexistent path as though it already exists.
    """
    pass

  def copy(self, src, dst):
    """Copies the key 'src' to the key 'dst' within this bucket.

    Raises an AssertionError if 'dst' does not exist after the copy.
    """
    self.s3client.copy_object(
        Bucket=self.bucketname,
        CopySource={'Bucket': self.bucketname, 'Key': src},
        Key=dst)
    assert self.exists(dst), \
        'S3 copy failed: Destination file {dst} does not exist'.format(dst=dst)

  def _list_pages(self, **list_kwargs):
    """Returns an iterator over all response pages of a list_objects() call.

    A single list_objects() response holds at most 1000 keys, so reading only the
    first response silently truncates larger listings. 'list_kwargs' are passed
    through to list_objects() (e.g. Prefix, Delimiter); Bucket is filled in here.
    """
    paginator = self.s3client.get_paginator('list_objects')
    return paginator.paginate(Bucket=self.bucketname, **list_kwargs)

  def ls(self, path):
    """Returns the names directly under 'path', like 'ls' on a directory.

    Since S3 is a key-value store, it has no 'ls' command; listing a prefix returns
    everything below it recursively. We use '/' as a delimiter and post-process the
    response to get 'ls'-like output: the last path component of each
    "subdirectory", followed by the last path component of each "file".
    """
    if not path.endswith('/'):
      path += '/'
    dirs = []
    files = []
    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
    for page in self._list_pages(Prefix=path, Delimiter='/'):
      # Non-keys or "directories" will be listed as 'Prefix' under 'CommonPrefixes'.
      dirs.extend(p['Prefix'] for p in page.get('CommonPrefixes', []))
      # Keys or "files" will be listed as 'Key' under 'Contents'.
      files.extend(o['Key'] for o in page.get('Contents', []))
    # A common prefix looks like 'a/b/c/', so its name is the second to last piece.
    files_and_dirs = [d.split('/')[-2] for d in dirs]
    for f in files:
      name = f.split('/')[-1]
      # A key that ends in '/' (e.g. a marker object for 'path' itself) has an empty
      # last component and is not an entry of the listing.
      if name != '':
        files_and_dirs.append(name)
    return files_and_dirs

  def get_all_file_sizes(self, path):
    """Returns the sizes in bytes of the keys directly under 'path' (not recursive).

    Returns an empty list if there are no such keys.
    """
    if not path.endswith('/'):
      path += '/'
    sizes = []
    # Use '/' as a delimiter so that we don't get all keys under a path recursively.
    for page in self._list_pages(Prefix=path, Delimiter='/'):
      sizes.extend(o['Size'] for o in page.get('Contents', []))
    return sizes

  def exists(self, path):
    """Returns True if at least one key in the bucket starts with 'path'.

    This is a prefix match, which is what makes "directories" appear to exist. It
    also means that 'path' matches when it is merely a prefix of a longer key name.
    """
    # One key is enough to answer the question, so don't fetch a full page.
    response = self.s3client.list_objects(
        Bucket=self.bucketname, Prefix=path, MaxKeys=1)
    return response.get('Contents') is not None

  def _list_keys(self, path):
    """Helper function which lists keys in a path. Should not be used by the tests
    directly.

    Returns the list of all keys that start with 'path', or False if there are none.
    """
    keys = []
    for page in self._list_pages(Prefix=path):
      keys.extend(o['Key'] for o in page.get('Contents', []))
    # Callers have always received False, not an empty list, when nothing matches.
    return keys if keys else False

  def delete_file_dir(self, path, recursive=False):
    """Deletes the key 'path' or, if 'recursive' is True, every key starting with
    'path'.

    Deleting a path that does not exist is not an error. Always returns True.
    """
    if recursive:
      keys = self._list_keys(path) or []
    elif self.exists(path):
      keys = [path]
    else:
      keys = []
    # delete_objects() takes a list of {'Key': ...} dicts, limited in length per
    # request, so issue the deletions in batches.
    for start in range(0, len(keys), self._MAX_KEYS_PER_DELETE):
      batch = keys[start:start + self._MAX_KEYS_PER_DELETE]
      self.s3client.delete_objects(
          Bucket=self.bucketname, Delete={'Objects': [{'Key': k} for k in batch]})
    return True