/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.datastorage;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;

public class HDataStorage implements DataStorage { | |
private static final HPath[] EMPTY_HPATH = new HPath[0]; | |
private FileSystem fs; | |
private Configuration configuration; | |
private Properties properties; | |
private URI uri; | |
public HDataStorage(URI uri, Properties properties) { | |
this.properties = properties; | |
this.uri = uri; | |
init(); | |
} | |
public HDataStorage(Properties properties) { | |
this.properties = properties; | |
init(); | |
} | |
@Override | |
public void init() { | |
this.configuration = ConfigurationUtil.toConfiguration(this.properties); | |
try { | |
if (this.uri != null) { | |
this.fs = FileSystem.get(this.uri, this.configuration); | |
} else { | |
this.fs = FileSystem.get(this.configuration); | |
} | |
} catch (IOException e) { | |
throw new RuntimeException("Failed to create DataStorage", e); | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
fs.close(); | |
} | |
@Override | |
public Properties getConfiguration() { | |
return this.properties; | |
} | |
@Override | |
public void updateConfiguration(Properties newConfiguration) | |
throws DataStorageException { | |
// TODO this method is never called and not sure if hadoop would support | |
// that. | |
if (newConfiguration == null) { | |
return; | |
} | |
Enumeration<Object> newKeys = newConfiguration.keys(); | |
while (newKeys.hasMoreElements()) { | |
String key = (String) newKeys.nextElement(); | |
String value = null; | |
value = newConfiguration.getProperty(key); | |
fs.getConf().set(key, value); | |
} | |
} | |
@Override | |
public Map<String, Object> getStatistics() throws IOException { | |
Map<String, Object> stats = new HashMap<String, Object>(); | |
long usedBytes = fs.getUsed(); | |
stats.put(USED_BYTES_KEY, Long.valueOf(usedBytes).toString()); | |
if (fs instanceof DistributedFileSystem) { | |
DistributedFileSystem dfs = (DistributedFileSystem) fs; | |
long rawCapacityBytes = dfs.getStatus().getCapacity(); | |
stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes) | |
.toString()); | |
long rawUsedBytes = dfs.getStatus().getUsed(); | |
stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString()); | |
} | |
return stats; | |
} | |
@Override | |
public ElementDescriptor asElement(String name) throws DataStorageException { | |
return this.isContainer(name) ? new HDirectory(this, name) : new HFile( | |
this, name); | |
} | |
@Override | |
public ElementDescriptor asElement(ElementDescriptor element) | |
throws DataStorageException { | |
return asElement(element.toString()); | |
} | |
@Override | |
public ElementDescriptor asElement(String parent, String child) | |
throws DataStorageException { | |
return asElement((new Path(parent, child)).toString()); | |
} | |
@Override | |
public ElementDescriptor asElement(ContainerDescriptor parent, String child) | |
throws DataStorageException { | |
return asElement(parent.toString(), child); | |
} | |
@Override | |
public ElementDescriptor asElement(ContainerDescriptor parent, | |
ElementDescriptor child) throws DataStorageException { | |
return asElement(parent.toString(), child.toString()); | |
} | |
@Override | |
public ContainerDescriptor asContainer(String name) | |
throws DataStorageException { | |
return new HDirectory(this, name); | |
} | |
@Override | |
public ContainerDescriptor asContainer(ContainerDescriptor container) | |
throws DataStorageException { | |
return new HDirectory(this, container.toString()); | |
} | |
@Override | |
public ContainerDescriptor asContainer(String parent, String child) | |
throws DataStorageException { | |
return new HDirectory(this, parent, child); | |
} | |
@Override | |
public ContainerDescriptor asContainer(ContainerDescriptor parent, | |
String child) throws DataStorageException { | |
return new HDirectory(this, parent.toString(), child); | |
} | |
@Override | |
public ContainerDescriptor asContainer(ContainerDescriptor parent, | |
ContainerDescriptor child) throws DataStorageException { | |
return new HDirectory(this, parent.toString(), child.toString()); | |
} | |
@Override | |
public void setActiveContainer(ContainerDescriptor container) { | |
fs.setWorkingDirectory(new Path(container.toString())); | |
} | |
@Override | |
public ContainerDescriptor getActiveContainer() { | |
return new HDirectory(this, fs.getWorkingDirectory()); | |
} | |
@Override | |
public boolean isContainer(String name) throws DataStorageException { | |
boolean isContainer = false; | |
Path path = new Path(name); | |
try { | |
if (this.fs.exists(path) && (!this.fs.isFile(path))) { | |
isContainer = true; | |
} | |
} catch (IOException e) { | |
int errCode = 6007; | |
String msg = "Unable to check name " + name; | |
throw new DataStorageException(msg, errCode, | |
PigException.REMOTE_ENVIRONMENT, e); | |
} | |
return isContainer; | |
} | |
@Override | |
public HPath[] asCollection(String pattern) throws DataStorageException { | |
try { | |
FileStatus[] paths = this.fs.globStatus(new Path(pattern)); | |
if (paths == null) { | |
return EMPTY_HPATH; | |
} | |
List<HPath> hpaths = new ArrayList<HPath>(); | |
for (int i = 0; i < paths.length; ++i) { | |
HPath hpath = (HPath) this.asElement(paths[i].getPath().toString()); | |
if (!hpath.systemElement()) { | |
hpaths.add(hpath); | |
} | |
} | |
return hpaths.toArray(new HPath[0]); | |
} catch (IOException e) { | |
int errCode = 6008; | |
String msg = "Failed to obtain glob for " + pattern; | |
throw new DataStorageException(msg, errCode, | |
PigException.REMOTE_ENVIRONMENT, e); | |
} | |
} | |
public FileSystem getHFS() { | |
return fs; | |
} | |
} |