blob: bcb8ef8112b02d49b688b87c6dfdf02da493cf37 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.base;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.baidu.hugegraph.exception.ToolsException;
import com.baidu.hugegraph.rest.ClientException;
import com.baidu.hugegraph.util.E;
public class HdfsDirectory extends Directory {
public static final String HDFS_FS_DEFAULT_NAME = "fs.default.name";
private final Map<String, String> conf;
public HdfsDirectory(String directory, Map<String, String> conf) {
super(directory);
this.conf = conf;
}
private FileSystem fileSystem() {
Configuration conf = new Configuration();
for (Map.Entry<String, String> entry : this.conf.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
try {
return FileSystem.get(conf);
} catch (IOException e) {
throw new ClientException("Failed to access HDFS with " +
"configuration %s", this.conf, e);
}
}
@Override
public List<String> files() {
FileSystem fs = this.fileSystem();
FileStatus[] statuses;
try {
statuses = fs.listStatus(new Path(this.directory()));
} catch (IOException e) {
throw new ToolsException("Failed to get file list in directory " +
"'%s'", e, this.directory());
}
List<String> files = new ArrayList<>();
for (FileStatus status : statuses) {
if (status.isFile()) {
files.add(status.getPath().getName());
}
}
return files;
}
@Override
public void ensureDirectoryExist(boolean create) {
FileSystem fs = this.fileSystem();
Path path = new Path(this.directory());
try {
if (fs.exists(path)) {
E.checkState(fs.getFileStatus(path).isDirectory(),
"Can't use directory '%s' because " +
"a file with same name exists.", this.directory());
} else {
if (create) {
E.checkState(fs.mkdirs(path),
"Directory '%s' not exists and created " +
"failed", path.toString());
} else {
throw new ToolsException("Directory '%s' not exists",
path.toString());
}
}
} catch (IOException e) {
throw new ToolsException("Invalid directory '%s'",
e, this.directory());
}
}
@Override
public String suffix() {
return ".zip";
}
@Override
public InputStream inputStream(String file) {
String path = this.path(file);
FileSystem fs = this.fileSystem();
FSDataInputStream is = null;
ZipInputStream zis;
Path source = new Path(path);
try {
is = fs.open(source);
zis = new ZipInputStream(is);
E.checkState(zis.getNextEntry() != null,
"Invalid zip file '%s'", file);
} catch (IOException e) {
closeAndIgnoreException(is);
throw new ClientException("Failed to read from %s", e, path);
}
return zis;
}
@Override
public OutputStream outputStream(String file, boolean override) {
String path = this.path(file + this.suffix());
FileSystem fs = this.fileSystem();
FSDataOutputStream os = null;
ZipOutputStream zos = null;
Path dest = new Path(path);
try {
if (override) {
os = fs.create(dest, true);
} else {
os = fs.append(dest);
}
zos = new ZipOutputStream(os);
ZipEntry entry = new ZipEntry(file);
zos.putNextEntry(entry);
} catch (IOException e) {
closeAndIgnoreException(zos);
closeAndIgnoreException(os);
throw new ClientException("Failed to write to %s", e, path);
}
return zos;
}
public static HdfsDirectory constructDir(String directory, String graph,
Map<String, String> hdfsConf) {
String hdfsFs = hdfsConf.get(HDFS_FS_DEFAULT_NAME);
E.checkArgument(hdfsFs != null && !hdfsFs.isEmpty(),
"'%s' can not be null or empty " +
"when try to backup to HDFS", HDFS_FS_DEFAULT_NAME);
if (directory == null || directory.isEmpty()) {
if (hdfsFs.endsWith("/")) {
directory = hdfsFs + graph;
} else {
directory = hdfsFs + "/" + graph;
}
}
return new HdfsDirectory(directory, hdfsConf);
}
private String path(String file) {
if (this.directory().endsWith(Path.SEPARATOR)) {
return this.directory() + file;
} else {
return this.directory() + Path.SEPARATOR + file;
}
}
}