Further improved HdfsResource by managing FileSystem.close() more carefully
diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
index 4590bde..c2f38ce 100644
--- a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
+++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
@@ -26,6 +26,7 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metamodel.MetaModelException;
@@ -91,108 +92,168 @@
@Override
public boolean isExists() {
- return doWithFileSystem(new UncheckedFunc<FileSystem, Boolean>() {
- @Override
- protected Boolean evalUnchecked(FileSystem fs) throws Exception {
- return fs.exists(getHadoopPath());
- }
- });
+ final FileSystem fs = getHadoopFileSystem();
+ try {
+ return fs.exists(getHadoopPath());
+ } catch (Exception e) {
+ throw wrapException(e);
+ } finally {
+ FileHelper.safeClose(fs);
+ }
}
@Override
public long getSize() {
- return doWithFileSystem(new UncheckedFunc<FileSystem, Long>() {
- @Override
- protected Long evalUnchecked(FileSystem fs) throws Exception {
- return fs.getFileStatus(getHadoopPath()).getLen();
- }
- });
+ final FileSystem fs = getHadoopFileSystem();
+ try {
+ return fs.getFileStatus(getHadoopPath()).getLen();
+ } catch (Exception e) {
+ throw wrapException(e);
+ } finally {
+ FileHelper.safeClose(fs);
+ }
}
@Override
public long getLastModified() {
- return doWithFileSystem(new UncheckedFunc<FileSystem, Long>() {
- @Override
- protected Long evalUnchecked(FileSystem fs) throws Exception {
- return fs.getFileStatus(getHadoopPath()).getModificationTime();
- }
- });
+ final FileSystem fs = getHadoopFileSystem();
+ try {
+ return fs.getFileStatus(getHadoopPath()).getModificationTime();
+ } catch (Exception e) {
+ throw wrapException(e);
+ } finally {
+ FileHelper.safeClose(fs);
+ }
}
@Override
public void write(final Action<OutputStream> writeCallback) throws ResourceException {
- final OutputStream out = doWithFileSystem(new UncheckedFunc<FileSystem, OutputStream>() {
- @Override
- protected OutputStream evalUnchecked(FileSystem fs) throws Exception {
- return fs.create(getHadoopPath(), true);
- }
- });
+ final FileSystem fs = getHadoopFileSystem();
try {
- writeCallback.run(out);
+ final FSDataOutputStream out = fs.create(getHadoopPath(), true);
+ try {
+ writeCallback.run(out);
+ } finally {
+ FileHelper.safeClose(out);
+ }
} catch (Exception e) {
throw wrapException(e);
} finally {
- FileHelper.safeClose(out);
+ FileHelper.safeClose(fs);
}
}
@Override
public void append(Action<OutputStream> appendCallback) throws ResourceException {
- final OutputStream out = doWithFileSystem(new UncheckedFunc<FileSystem, OutputStream>() {
- @Override
- protected OutputStream evalUnchecked(FileSystem fs) throws Exception {
- return fs.append(getHadoopPath());
- }
- });
+ final FileSystem fs = getHadoopFileSystem();
try {
- appendCallback.run(out);
+ final FSDataOutputStream out = fs.append(getHadoopPath());
+ try {
+ appendCallback.run(out);
+ } finally {
+ FileHelper.safeClose(out);
+ }
} catch (Exception e) {
throw wrapException(e);
} finally {
- FileHelper.safeClose(out);
+ FileHelper.safeClose(fs);
}
}
@Override
public InputStream read() throws ResourceException {
- return doWithFileSystem(new UncheckedFunc<FileSystem, InputStream>() {
+ final FileSystem fs = getHadoopFileSystem();
+ final InputStream in;
+ try {
+ in = fs.open(getHadoopPath());
+ } catch (Exception e) {
+ // we can close 'fs' in case of an exception
+ FileHelper.safeClose(fs);
+ throw wrapException(e);
+ }
+
+ // return a wrapper InputStream which manages the 'fs' closeable
+ return new InputStream() {
@Override
- protected InputStream evalUnchecked(FileSystem fs) throws Exception {
- return fs.open(getHadoopPath());
+ public int read() throws IOException {
+ return in.read();
}
- });
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return in.read(b);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return in.markSupported();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ in.mark(readlimit);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ in.reset();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return in.skip(n);
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close(); // close the wrapped 'in' first — super.close() is a no-op and would leak the stream
+ // need to close 'fs' when input stream is closed
+ FileHelper.safeClose(fs);
+ }
+ };
}
@Override
public void read(Action<InputStream> readCallback) throws ResourceException {
- final InputStream in = read();
+ final FileSystem fs = getHadoopFileSystem();
try {
- readCallback.run(in);
+ final InputStream in = fs.open(getHadoopPath());
+ try {
+ readCallback.run(in);
+ } finally {
+ FileHelper.safeClose(in);
+ }
} catch (Exception e) {
throw wrapException(e);
} finally {
- FileHelper.safeClose(in);
+ FileHelper.safeClose(fs);
}
}
@Override
public <E> E read(Func<InputStream, E> readCallback) throws ResourceException {
- final InputStream in = read();
+ final FileSystem fs = getHadoopFileSystem();
try {
- return readCallback.eval(in);
+ final InputStream in = fs.open(getHadoopPath());
+ try {
+ return readCallback.eval(in);
+ } finally {
+ FileHelper.safeClose(in);
+ }
} catch (Exception e) {
throw wrapException(e);
} finally {
- FileHelper.safeClose(in);
- }
- }
-
- private <E> E doWithFileSystem(Func<FileSystem, E> action) {
- final FileSystem hadoopFileSystem = getHadoopFileSystem();
- try {
- return action.eval(hadoopFileSystem);
- } catch (Exception e) {
- throw wrapException(e);
+ FileHelper.safeClose(fs);
}
}
diff --git a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
index 5ef5e53..3f37379 100644
--- a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
+++ b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
@@ -80,9 +80,6 @@
final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);
logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");
- assertFalse(res1.isExists());
- logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - exists");
-
res1.write(new Action<OutputStream>() {
@Override
public void run(OutputStream out) throws Exception {
diff --git a/hadoop/src/test/resources/log4j.xml b/hadoop/src/test/resources/log4j.xml
new file mode 100644
index 0000000..29f497f
--- /dev/null
+++ b/hadoop/src/test/resources/log4j.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %d{HH:mm:ss} %c{1} - %m%n" />
+ </layout>
+ </appender>
+
+ <logger name="org.apache.metamodel">
+ <level value="info" />
+ </logger>
+
+ <root>
+ <priority value="fatal" />
+ <appender-ref ref="consoleAppender" />
+ </root>
+
+</log4j:configuration>
\ No newline at end of file