blob: 19b22a0bcf99d12358cb4379354a080c28220e9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.pge.staging;
//OODT static imports
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.QUERY_FILE_MANAGER_URL;
//JDK imports
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
import org.apache.commons.lang.Validate;
import org.apache.oodt.cas.filemgr.structs.Product;
import org.apache.oodt.cas.filemgr.structs.Reference;
import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException;
import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient;
import org.apache.oodt.cas.pge.config.FileStagingInfo;
import org.apache.oodt.cas.pge.metadata.PgeMetadata;
//Google imports
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
/**
* Responsible for transferring Product files to a directory accessible by
* by CAS-PGE.
*
* @author bfoster (Brian Foster)
*/
public abstract class FileStager {
public void stageFiles(FileStagingInfo fileStagingInfo,
PgeMetadata pgeMetadata, Logger logger) throws Exception {
logger.log(Level.INFO, "Creating staging directory ["
+ fileStagingInfo.getStagingDir() + "]");
new File(fileStagingInfo.getStagingDir()).mkdirs();
for (String file : fileStagingInfo.getFilePaths()) {
File fileHandle = new File(file);
if (fileStagingInfo.isForceStaging() || !fileHandle.exists()) {
logger.log(Level.INFO, "Staging file [" + file
+ "] to directory ["
+ fileStagingInfo.getStagingDir() + "]");
stageFile(asURI(file), new File(fileStagingInfo.getStagingDir()),
pgeMetadata, logger);
}
}
if (!fileStagingInfo.getProductIds().isEmpty()) {
XmlRpcFileManagerClient fmClient = createFileManagerClient(pgeMetadata);
for (String productId : fileStagingInfo.getProductIds()) {
logger.log(Level.INFO, "Staging product [" + productId
+ "] to directory ["
+ fileStagingInfo.getStagingDir() + "]");
for (URI uri : getProductReferences(productId, fmClient)) {
logger.log(Level.INFO, "Staging product [" + productId
+ "] reference [" + uri
+ "] to directory ["
+ fileStagingInfo.getStagingDir() + "]");
stageFile(uri, new File(fileStagingInfo.getStagingDir()),
pgeMetadata, logger);
}
}
}
}
@VisibleForTesting
static XmlRpcFileManagerClient createFileManagerClient(PgeMetadata pgeMetadata)
throws Exception {
String filemgrUrl = pgeMetadata.getMetadata(QUERY_FILE_MANAGER_URL);
if (filemgrUrl == null) {
throw new Exception("Must specify [" + QUERY_FILE_MANAGER_URL
+ "] if you want to stage product IDs");
}
return new XmlRpcFileManagerClient(new URL(filemgrUrl));
}
@VisibleForTesting
static List<URI> getProductReferences(
String productId, XmlRpcFileManagerClient fmClient)
throws URISyntaxException, CatalogException {
List<URI> files = Lists.newArrayList();
Product product = new Product();
product.setProductId(productId);
List<Reference> refs = fmClient.getProductReferences(product);
for (Reference ref : refs) {
files.add(new URI(ref.getDataStoreReference()));
}
return files;
}
@VisibleForTesting
static URI asURI(String path) throws URISyntaxException {
Validate.notNull(path, "path must not be null");
URI uri = URI.create(path);
if (uri.getScheme() == null) {
uri = URI.create("file://" + new File(path).getAbsolutePath());
}
return uri;
}
protected abstract void stageFile(URI stageFile, File destDir,
PgeMetadata pgeMetadata, Logger logger) throws Exception;
}