// blob: 8970cf5d8da024eae1a3c884fff888999ce2d34a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.dfs;
import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxFile;
import com.box.sdk.BoxFolder;
import com.box.sdk.BoxFolder.Info;
import com.box.sdk.BoxItem;
import com.box.sdk.BoxSearch;
import com.box.sdk.BoxSearchParameters;
import com.box.sdk.PartialCollection;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.security.CredentialsProvider;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials;
import org.apache.drill.exec.store.security.oauth.OAuthTokenCredentials.Builder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * A Hadoop-style file system that reads files stored in Box through the Box Java SDK.
 * <p>
 * The file system is effectively read only: {@link #create}, {@link #append} and {@link #rename}
 * throw {@link UnsupportedOperationException}, while {@link #delete} and {@link #mkdirs} always
 * return {@code false}.
 * <p>
 * Box items are addressed by ID rather than by path, so paths are resolved with the Box search
 * API (matching on item name) and the resolved items are cached per {@link Path} for the lifetime
 * of this instance.
 * <p>
 * NOTE(review): the item cache and ancestor list are plain, unsynchronized collections; this
 * presumably relies on an instance not being shared between threads -- confirm against callers.
 */
public class BoxFileSystem extends OAuthEnabledFileSystem {
  private static final Logger logger = LoggerFactory.getLogger(BoxFileSystem.class);
  private static final String TIMEOUT_DEFAULT = "5000";
  private static final List<String> SEARCH_CONTENT_TYPES = new ArrayList<>(Collections.singletonList("name"));
  // ID used for the root folder throughout this class.
  private static final String ROOT_FOLDER_ID = "0";
  // Item type names as reported by BoxItem.Info#getType() and accepted by the search API.
  private static final String TYPE_FILE = "file";
  private static final String TYPE_FOLDER = "folder";

  private Path workingDirectory;
  private BoxAPIConnection client;
  private String workingDirectoryID;
  private BoxFolder rootFolder;
  private boolean usesDeveloperToken;
  // Folder IDs used to restrict searches; rebuilt by setWorkingDirectory().
  private final List<String> ancestorFolderIDs = new ArrayList<>();
  // Path -> resolved Box item, so that repeated lookups do not hit the search API again.
  private final Map<Path, BoxItem> itemCache = new HashMap<>();

  /**
   * Returns a URI which identifies this FileSystem.
   *
   * @return the URI of this filesystem, always {@code box:///}.
   */
  @Override
  public URI getUri() {
    try {
      return new URI("box:///");
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Opens an FSDataInputStream at the indicated Path. The whole file is downloaded into memory
   * and the returned stream reads from that in-memory copy.
   *
   * @param inputPath the file name to open
   * @param bufferSize the size of the buffer to be used. Ignored by this implementation.
   * @return a stream over the downloaded file content
   * @throws FileNotFoundException if no Box item could be found for the path
   * @throws IOException if the path resolves to something other than a file
   */
  @Override
  public FSDataInputStream open(Path inputPath, int bufferSize) throws IOException {
    client = getClient();
    BoxItem item = getItem(inputPath);
    if (item == null) {
      throw new FileNotFoundException("Box item not found: " + inputPath);
    }
    if (!(item instanceof BoxFile)) {
      throw new IOException("Attempted to read " + inputPath + " which is not a file. Only files can be read by Box.");
    }

    BoxFile file = (BoxFile) item;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      updateTokens();
      file.download(out);
      updateTokens();
      return new FSDataInputStream(new SeekableByteArrayInputStream(out.toByteArray()));
    }
  }

  /**
   * Not supported: this file system is read only.
   *
   * @param f the file name to open
   * @param permission file permission
   * @param overwrite if a file with this name already exists, then if true,
   * the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize block size
   * @param progress the progress reporter
   * @throws UnsupportedOperationException always
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Box is read only.");
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Box does not support append.");
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    throw new UnsupportedOperationException("Box does not support rename.");
  }

  /**
   * Deleting is not implemented; nothing is removed.
   *
   * @return always {@code false}
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return false;
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. If the path is a file, a single-element array describing that file
   * is returned.
   * <p>
   * Does not guarantee to return the List of files/directories status in a
   * sorted order. The listing is not recursive.
   * <p>
   * Will not return null. Items that are neither files nor folders (e.g. web links)
   * are skipped.
   *
   * @param f given path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
    BoxItem incoming = getItem(f);
    if (incoming == null) {
      throw new FileNotFoundException("Box item not found: " + f);
    }

    if (incoming instanceof BoxFolder) {
      BoxFolder folder = (BoxFolder) incoming;
      List<FileStatus> statuses = new ArrayList<>();
      for (BoxItem.Info childInfo : folder.getChildren()) {
        boolean isFolder = childInfo instanceof BoxFolder.Info;
        if (!isFolder && !(childInfo instanceof BoxFile.Info)) {
          // Neither a file nor a folder: ignore.
          continue;
        }
        // Resolve the child against the parent so a separator is always inserted. Building the
        // child as a path-only Path keeps a name containing ':' from being read as a URI scheme.
        Path childPath = new Path(f, new Path(null, null, childInfo.getName()));
        statuses.add(new FileStatus(childInfo.getSize(), isFolder, 1, 0,
            getModifiedMillis(childInfo), childPath));
      }
      updateTokens();
      return statuses.toArray(new FileStatus[0]);
    }

    if (incoming instanceof BoxFile) {
      // Fetch the metadata once and reuse it for both size and modification time.
      BoxFile.Info fileInfo = ((BoxFile) incoming).getInfo();
      updateTokens();
      return new FileStatus[] {
        new FileStatus(fileInfo.getSize(), false, 1, 0, getModifiedMillis(fileInfo), f)
      };
    }

    // Ignore web links
    return new FileStatus[0];
  }

  /**
   * Set the current working directory for the given FileSystem.
   * <p>
   * If no working directory ID has been recorded yet, or the new directory is {@code /}, the
   * root folder is used. Otherwise each component of the path is looked up with the Box search
   * API and the ID of the first matching folder is appended to the list of ancestor folder IDs
   * that later searches are restricted to.
   *
   * @param new_dir Path of new working directory
   */
  @Override
  public void setWorkingDirectory(Path new_dir) {
    logger.debug("Setting working directory to: {}", new_dir.getName());
    workingDirectory = new_dir;
    ancestorFolderIDs.clear();

    if (StringUtils.isEmpty(workingDirectoryID) || new_dir.toString().contentEquals("/")) {
      workingDirectoryID = ROOT_FOLDER_ID;
      ancestorFolderIDs.add(ROOT_FOLDER_ID);
      return;
    }

    // The searches below need a live connection; the field may not have been initialized yet.
    client = getClient();

    // Split only the path portion so that a scheme/authority never becomes a search term.
    List<String> pathParts = new ArrayList<>(Arrays.asList(new_dir.toUri().getPath().split("/")));
    for (String pathPart : pathParts) {
      if (pathPart.isEmpty()) {
        // The leading slash yields an empty first component; there is nothing to search for.
        continue;
      }
      BoxSearch search = new BoxSearch(client);
      updateTokens();
      BoxSearchParameters searchParams = new BoxSearchParameters();
      searchParams.setQuery(pathPart);
      searchParams.setContentTypes(SEARCH_CONTENT_TYPES);
      searchParams.setType(TYPE_FOLDER);
      if (!ancestorFolderIDs.isEmpty()) {
        searchParams.setAncestorFolderIds(ancestorFolderIDs);
      }
      // Start at offset 0 (as getItem does) so the first hit is not skipped.
      PartialCollection<BoxItem.Info> searchResults = search.searchRange(0, 3, searchParams);
      updateTokens();

      // Only the first hit is used.
      for (BoxItem.Info result : searchResults) {
        ancestorFolderIDs.add(result.getID());
        break;
      }
    }
  }

  /**
   * Returns the current working directory. If no working directory ID has been recorded yet,
   * the root directory is assumed and recorded.
   *
   * @return the working directory
   */
  @Override
  public Path getWorkingDirectory() {
    if (StringUtils.isEmpty(workingDirectoryID)) {
      workingDirectory = new Path("/");
      workingDirectoryID = ROOT_FOLDER_ID;
    }
    return workingDirectory;
  }

  /**
   * Creating directories is not implemented; nothing is created.
   *
   * @return always {@code false}
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return false;
  }

  /**
   * Return a file status object that represents the path.
   *
   * @param f The path we want information from
   * @return a FileStatus object. For items that are neither a file nor a folder (e.g. a web
   *   link) an empty {@link FileStatus} is returned.
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException see specific implementation
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    client = getClient();
    BoxItem pathItem = getItem(f);
    if (pathItem == null) {
      throw new FileNotFoundException("Box item not found: " + f);
    }

    if (pathItem instanceof BoxFolder) {
      // Folders may have a modified date of zero.
      Info folderInfo = ((BoxFolder) pathItem).getInfo();
      updateTokens();
      return new FileStatus(folderInfo.getSize(), true, 1, 0, getModifiedMillis(folderInfo), f);
    }

    if (pathItem instanceof BoxFile) {
      BoxFile.Info fileInfo = ((BoxFile) pathItem).getInfo();
      updateTokens();
      return new FileStatus(fileInfo.getSize(), false, 1, 0, getModifiedMillis(fileInfo), f);
    }

    // The only other option here would be a BoxWebLink and Drill can't do anything with that.
    return new FileStatus();
  }

  /**
   * Resolves a path to a Box item, consulting the cache first.
   * <p>
   * In Box, items can only be accessed via their ID, so rather than walking the directory tree
   * the Box search API is queried with the last path component. A name with a file extension is
   * searched as a file with that extension; a name without one is assumed to be a folder. Among
   * the results, an item whose name equals the requested name is preferred; if there is none,
   * the first file or folder returned is used.
   *
   * @param path the path to resolve
   * @return the resolved {@link BoxFile} or {@link BoxFolder}, or {@code null} if the search
   *   returned no file or folder
   */
  private BoxItem getItem(Path path) {
    BoxItem cached = itemCache.get(path);
    if (cached != null) {
      return cached;
    }

    // Make sure the client is initialized
    client = getClient();

    if (path.isRoot()) {
      if (rootFolder == null) {
        rootFolder = BoxFolder.getRootFolder(client);
        updateTokens();
        itemCache.put(path, rootFolder);
      }
      return rootFolder;
    }

    String itemName = path.getName();
    long offsetValue = 0;
    long limitValue = 100;

    BoxSearch search = new BoxSearch(client);
    updateTokens();
    BoxSearchParameters searchParams = new BoxSearchParameters();
    searchParams.setQuery(itemName);
    searchParams.setContentTypes(SEARCH_CONTENT_TYPES);

    // If there is no extension, assume the path is a folder
    String fileExtension = FilenameUtils.getExtension(itemName);
    if (StringUtils.isEmpty(fileExtension)) {
      searchParams.setType(TYPE_FOLDER);
    } else {
      searchParams.setType(TYPE_FILE);
      searchParams.setFileExtensions(Collections.singletonList(fileExtension));
    }
    if (!ancestorFolderIDs.isEmpty()) {
      searchParams.setAncestorFolderIds(ancestorFolderIDs);
    }

    PartialCollection<BoxItem.Info> searchResults =
        search.searchRange(offsetValue, limitValue, searchParams);
    updateTokens();

    // Prefer an exact name match; otherwise fall back to the first file/folder result.
    BoxItem.Info match = null;
    for (BoxItem.Info candidate : searchResults) {
      String type = candidate.getType();
      if (!TYPE_FILE.equals(type) && !TYPE_FOLDER.equals(type)) {
        continue;
      }
      if (itemName.equals(candidate.getName())) {
        match = candidate;
        break;
      }
      if (match == null) {
        match = candidate;
      }
    }
    if (match == null) {
      return null;
    }

    BoxItem resolved;
    if (TYPE_FILE.equals(match.getType())) {
      resolved = new BoxFile(client, match.getID());
    } else {
      resolved = new BoxFolder(client, match.getID());
    }
    updateTokens();
    itemCache.put(path, resolved);
    return resolved;
  }

  /**
   * Updates the OAuth tokens. Box API tokens seem to be very short-lived and change after every
   * API call. Be sure to call this method after any call that actually calls the Box API.
   * <p>
   * The tokens are only handed to the parent class when the client reports that it both can
   * and needs to refresh. If there is no client yet, or the client uses a developer token, the
   * function simply returns.
   */
  private void updateTokens() {
    if (client == null || usesDeveloperToken) {
      return;
    }
    if (client.canRefresh() && client.needsRefresh()) {
      super.updateTokens(client.getAccessToken(), client.getRefreshToken(), String.valueOf(client.getExpires()));
    }
  }

  /**
   * Returns the Box client, creating and caching it on first use.
   * <p>
   * The Box client can use either OAuth 2.0 or a static developer token. The OAuth tokens are very
   * short lived and hence are difficult to use for testing. If the {@code boxAccessToken}
   * configuration value is populated it is used as a developer token instead of the OAuth
   * credentials; this should only be used for testing.
   *
   * @return An authenticated {@link BoxAPIConnection} Box client
   * @throws UserException if a configured timeout value is not an integer
   */
  private BoxAPIConnection getClient() {
    if (client != null) {
      return client;
    }

    int connectionTimeout = getTimeoutSetting("boxConnectionTimeout");
    int readTimeout = getTimeoutSetting("boxReadTimeout");

    BoxAPIConnection newClient;
    String developerToken = getConf().get("boxAccessToken", "");
    if (StringUtils.isNotEmpty(developerToken)) {
      newClient = new BoxAPIConnection(developerToken);
      usesDeveloperToken = true;
    } else {
      CredentialsProvider credentialsProvider = getCredentialsProvider();
      PersistentTokenTable tokenTable = getTokenTable();
      OAuthTokenCredentials credentials = new Builder()
          .setCredentialsProvider(credentialsProvider)
          .setTokenTable(tokenTable)
          .build()
          .get();
      newClient = new BoxAPIConnection(credentials.getClientID(), credentials.getClientSecret(),
          tokenTable.getAccessToken(), tokenTable.getRefreshToken());
    }
    newClient.setConnectTimeout(connectionTimeout);
    newClient.setReadTimeout(readTimeout);

    client = newClient;
    return newClient;
  }

  /**
   * Reads an integer timeout from the configuration, falling back to the default when unset.
   *
   * @param key the configuration key to read
   * @return the configured timeout
   * @throws UserException if the configured value is not an integer
   */
  private int getTimeoutSetting(String key) {
    String rawValue = getConf().get(key, TIMEOUT_DEFAULT);
    try {
      return Integer.parseInt(rawValue.trim());
    } catch (NumberFormatException e) {
      throw UserException.validationError(e)
          .message("Invalid value '%s' for Box file system option %s. An integer is required.", rawValue, key)
          .build(logger);
    }
  }

  /**
   * Returns the milliseconds of the modified time. In some cases, the modified time can be null,
   * so this function avoids having multiple null checks all over the place.
   *
   * @param info {@link BoxItem.Info} of the input
   * @return The milliseconds of the modified time, or 0 if the item has no modified time.
   */
  private long getModifiedMillis(BoxItem.Info info) {
    Date modifiedDate = info.getModifiedAt();
    return modifiedDate == null ? 0 : modifiedDate.getTime();
  }
}