/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.datastorage;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
/**
 * {@link DataStorage} implementation backed by a Hadoop {@link FileSystem}.
 *
 * <p>The file system handle is obtained in {@link #init()} from the supplied
 * {@link Properties} (converted through {@code ConfigurationUtil}) and,
 * when one was given, from an explicit {@link URI}. Elements and containers
 * are exposed as {@code HFile} and {@code HDirectory} descriptors.
 */
public class HDataStorage implements DataStorage {

    /** Shared empty result; also used as the type token for {@code toArray}. */
    private static final HPath[] EMPTY_HPATH = new HPath[0];

    private FileSystem fs;
    private Configuration configuration;
    private final Properties properties;
    private final URI uri;

    /**
     * Creates a storage bound to the file system identified by {@code uri}.
     *
     * @param uri        file system URI; when {@code null} the file system is
     *                   resolved from the configuration alone
     * @param properties properties converted into the Hadoop configuration
     * @throws RuntimeException if the file system cannot be obtained
     */
    public HDataStorage(URI uri, Properties properties) {
        this.properties = properties;
        this.uri = uri;
        init();
    }

    /**
     * Creates a storage bound to the file system resolved from the
     * configuration alone.
     *
     * @param properties properties converted into the Hadoop configuration
     * @throws RuntimeException if the file system cannot be obtained
     */
    public HDataStorage(Properties properties) {
        this(null, properties);
    }

    /**
     * Builds the Hadoop configuration from the stored properties and obtains
     * the file system handle, using the URI when one was supplied.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while
     *                          obtaining the file system
     */
    @Override
    public void init() {
        this.configuration = ConfigurationUtil.toConfiguration(this.properties);
        try {
            this.fs = (this.uri != null)
                    ? FileSystem.get(this.uri, this.configuration)
                    : FileSystem.get(this.configuration);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create DataStorage", e);
        }
    }

    /**
     * Closes the underlying file system handle.
     *
     * @throws IOException if the file system reports an error while closing
     */
    @Override
    public void close() throws IOException {
        fs.close();
    }

    /**
     * Returns the properties this storage was constructed with (the same
     * instance, not a copy).
     */
    @Override
    public Properties getConfiguration() {
        return this.properties;
    }

    /**
     * Copies the given properties into the configuration of the underlying
     * file system.
     *
     * <p>Entries whose key is not a {@link String}, or for which
     * {@link Properties#getProperty(String)} yields {@code null} (i.e. the
     * stored value is not a string), are skipped instead of failing the whole
     * update with a {@link ClassCastException} or a rejected {@code null}
     * value.
     *
     * @param newConfiguration properties to apply; {@code null} is a no-op
     * @throws DataStorageException declared by the interface; not raised by
     *                              this implementation
     */
    @Override
    public void updateConfiguration(Properties newConfiguration)
            throws DataStorageException {
        // TODO this method is never called and not sure if hadoop would support
        // that.
        if (newConfiguration == null) {
            return;
        }

        Configuration target = fs.getConf();
        Enumeration<Object> keys = newConfiguration.keys();
        while (keys.hasMoreElements()) {
            Object rawKey = keys.nextElement();
            if (!(rawKey instanceof String)) {
                // Properties is a Hashtable<Object,Object>; ignore misuse.
                continue;
            }
            String key = (String) rawKey;
            String value = newConfiguration.getProperty(key);
            if (value != null) {
                target.set(key, value);
            }
        }
    }

    /**
     * Collects usage statistics for the underlying file system.
     *
     * <p>The map always contains {@code USED_BYTES_KEY}. When the file system
     * is a {@link DistributedFileSystem} it additionally contains
     * {@code RAW_CAPACITY_KEY} and {@code RAW_USED_KEY}. All values are
     * decimal strings.
     *
     * @return a new, mutable map of statistics
     * @throws IOException if the file system cannot report its usage
     */
    @Override
    public Map<String, Object> getStatistics() throws IOException {
        Map<String, Object> stats = new HashMap<String, Object>();
        stats.put(USED_BYTES_KEY, Long.toString(fs.getUsed()));

        if (fs instanceof DistributedFileSystem) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // NOTE(review): getStatus() is queried once per value, so the two
            // figures are not guaranteed to come from the same snapshot.
            stats.put(RAW_CAPACITY_KEY,
                    Long.toString(dfs.getStatus().getCapacity()));
            stats.put(RAW_USED_KEY, Long.toString(dfs.getStatus().getUsed()));
        }
        return stats;
    }

    /**
     * Returns a descriptor for {@code name}: an {@code HDirectory} when
     * {@link #isContainer(String)} reports a container, otherwise an
     * {@code HFile}.
     *
     * @throws DataStorageException if the container check fails
     */
    @Override
    public ElementDescriptor asElement(String name) throws DataStorageException {
        if (this.isContainer(name)) {
            return new HDirectory(this, name);
        }
        return new HFile(this, name);
    }

    /** Resolves the descriptor again from its string form. */
    @Override
    public ElementDescriptor asElement(ElementDescriptor element)
            throws DataStorageException {
        return asElement(element.toString());
    }

    /** Resolves {@code child} relative to {@code parent} using {@link Path}. */
    @Override
    public ElementDescriptor asElement(String parent, String child)
            throws DataStorageException {
        return asElement((new Path(parent, child)).toString());
    }

    /** Resolves {@code child} relative to the string form of {@code parent}. */
    @Override
    public ElementDescriptor asElement(ContainerDescriptor parent, String child)
            throws DataStorageException {
        return asElement(parent.toString(), child);
    }

    /** Resolves the string form of {@code child} relative to {@code parent}. */
    @Override
    public ElementDescriptor asElement(ContainerDescriptor parent,
            ElementDescriptor child) throws DataStorageException {
        return asElement(parent.toString(), child.toString());
    }

    /**
     * Wraps {@code name} in an {@code HDirectory} without checking that it
     * exists or is a directory.
     */
    @Override
    public ContainerDescriptor asContainer(String name)
            throws DataStorageException {
        return new HDirectory(this, name);
    }

    /** Re-wraps the string form of {@code container} in an {@code HDirectory}. */
    @Override
    public ContainerDescriptor asContainer(ContainerDescriptor container)
            throws DataStorageException {
        return new HDirectory(this, container.toString());
    }

    /** Creates an {@code HDirectory} for {@code child} under {@code parent}. */
    @Override
    public ContainerDescriptor asContainer(String parent, String child)
            throws DataStorageException {
        return new HDirectory(this, parent, child);
    }

    /** Creates an {@code HDirectory} for {@code child} under {@code parent}. */
    @Override
    public ContainerDescriptor asContainer(ContainerDescriptor parent,
            String child) throws DataStorageException {
        return new HDirectory(this, parent.toString(), child);
    }

    /** Creates an {@code HDirectory} for {@code child} under {@code parent}. */
    @Override
    public ContainerDescriptor asContainer(ContainerDescriptor parent,
            ContainerDescriptor child) throws DataStorageException {
        return new HDirectory(this, parent.toString(), child.toString());
    }

    /** Sets the file system's working directory to {@code container}. */
    @Override
    public void setActiveContainer(ContainerDescriptor container) {
        fs.setWorkingDirectory(new Path(container.toString()));
    }

    /** Returns the file system's current working directory as a container. */
    @Override
    public ContainerDescriptor getActiveContainer() {
        return new HDirectory(this, fs.getWorkingDirectory());
    }

    /**
     * Tells whether {@code name} exists and is not a regular file.
     *
     * @param name path to check
     * @return {@code true} if the path exists and is not a file
     * @throws DataStorageException (error code 6007) if the file system check
     *                              fails with an {@link IOException}
     */
    @Override
    public boolean isContainer(String name) throws DataStorageException {
        Path path = new Path(name);
        try {
            return this.fs.exists(path) && !this.fs.isFile(path);
        } catch (IOException e) {
            int errCode = 6007;
            String msg = "Unable to check name " + name;
            throw new DataStorageException(msg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    /**
     * Expands a glob pattern into the matching paths, leaving out those that
     * report themselves as system elements.
     *
     * @param pattern glob pattern understood by the file system
     * @return the matching non-system paths; an empty array when the glob
     *         yields {@code null} or every match is a system element
     * @throws DataStorageException (error code 6008) if globbing or resolving
     *                              a match fails with an {@link IOException}
     */
    @Override
    public HPath[] asCollection(String pattern) throws DataStorageException {
        try {
            FileStatus[] matches = this.fs.globStatus(new Path(pattern));
            if (matches == null) {
                return EMPTY_HPATH;
            }

            List<HPath> visible = new ArrayList<HPath>(matches.length);
            for (FileStatus match : matches) {
                HPath candidate =
                        (HPath) this.asElement(match.getPath().toString());
                if (!candidate.systemElement()) {
                    visible.add(candidate);
                }
            }
            return visible.toArray(EMPTY_HPATH);
        } catch (IOException e) {
            int errCode = 6008;
            String msg = "Failed to obtain glob for " + pattern;
            throw new DataStorageException(msg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    /** Returns the underlying Hadoop file system handle. */
    public FileSystem getHFS() {
        return fs;
    }
}