Further improved the HdfsResource by managing filesystem.close() better
diff --git a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
index 4590bde..c2f38ce 100644
--- a/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
+++ b/hadoop/src/main/java/org/apache/metamodel/util/HdfsResource.java
@@ -26,6 +26,7 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.metamodel.MetaModelException;
@@ -91,108 +92,168 @@
 
     @Override
     public boolean isExists() {
-        return doWithFileSystem(new UncheckedFunc<FileSystem, Boolean>() {
-            @Override
-            protected Boolean evalUnchecked(FileSystem fs) throws Exception {
-                return fs.exists(getHadoopPath());
-            }
-        });
+        final FileSystem fs = getHadoopFileSystem();
+        try {
+            return fs.exists(getHadoopPath());
+        } catch (Exception e) {
+            throw wrapException(e);
+        } finally {
+            FileHelper.safeClose(fs);
+        }
     }
 
     @Override
     public long getSize() {
-        return doWithFileSystem(new UncheckedFunc<FileSystem, Long>() {
-            @Override
-            protected Long evalUnchecked(FileSystem fs) throws Exception {
-                return fs.getFileStatus(getHadoopPath()).getLen();
-            }
-        });
+        final FileSystem fs = getHadoopFileSystem();
+        try {
+            return fs.getFileStatus(getHadoopPath()).getLen();
+        } catch (Exception e) {
+            throw wrapException(e);
+        } finally {
+            FileHelper.safeClose(fs);
+        }
     }
 
     @Override
     public long getLastModified() {
-        return doWithFileSystem(new UncheckedFunc<FileSystem, Long>() {
-            @Override
-            protected Long evalUnchecked(FileSystem fs) throws Exception {
-                return fs.getFileStatus(getHadoopPath()).getModificationTime();
-            }
-        });
+        final FileSystem fs = getHadoopFileSystem();
+        try {
+            return fs.getFileStatus(getHadoopPath()).getModificationTime();
+        } catch (Exception e) {
+            throw wrapException(e);
+        } finally {
+            FileHelper.safeClose(fs);
+        }
     }
 
     @Override
     public void write(final Action<OutputStream> writeCallback) throws ResourceException {
-        final OutputStream out = doWithFileSystem(new UncheckedFunc<FileSystem, OutputStream>() {
-            @Override
-            protected OutputStream evalUnchecked(FileSystem fs) throws Exception {
-                return fs.create(getHadoopPath(), true);
-            }
-        });
+        final FileSystem fs = getHadoopFileSystem();
         try {
-            writeCallback.run(out);
+            final FSDataOutputStream out = fs.create(getHadoopPath(), true);
+            try {
+                writeCallback.run(out);
+            } finally {
+                FileHelper.safeClose(out);
+            }
         } catch (Exception e) {
             throw wrapException(e);
         } finally {
-            FileHelper.safeClose(out);
+            FileHelper.safeClose(fs);
         }
     }
 
     @Override
     public void append(Action<OutputStream> appendCallback) throws ResourceException {
-        final OutputStream out = doWithFileSystem(new UncheckedFunc<FileSystem, OutputStream>() {
-            @Override
-            protected OutputStream evalUnchecked(FileSystem fs) throws Exception {
-                return fs.append(getHadoopPath());
-            }
-        });
+        final FileSystem fs = getHadoopFileSystem();
         try {
-            appendCallback.run(out);
+            final FSDataOutputStream out = fs.append(getHadoopPath());
+            try {
+                appendCallback.run(out);
+            } finally {
+                FileHelper.safeClose(out);
+            }
         } catch (Exception e) {
             throw wrapException(e);
         } finally {
-            FileHelper.safeClose(out);
+            FileHelper.safeClose(fs);
         }
     }
 
     @Override
     public InputStream read() throws ResourceException {
-        return doWithFileSystem(new UncheckedFunc<FileSystem, InputStream>() {
+        final FileSystem fs = getHadoopFileSystem();
+        final InputStream in;
+        try {
+            in = fs.open(getHadoopPath());
+        } catch (Exception e) {
+            // we can close 'fs' in case of an exception
+            FileHelper.safeClose(fs);
+            throw wrapException(e);
+        }
+
+        // return a wrappper InputStream which manages the 'fs' closeable
+        return new InputStream() {
             @Override
-            protected InputStream evalUnchecked(FileSystem fs) throws Exception {
-                return fs.open(getHadoopPath());
+            public int read() throws IOException {
+                return in.read();
             }
-        });
+
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                return in.read(b, off, len);
+            }
+
+            @Override
+            public int read(byte[] b) throws IOException {
+                return in.read(b);
+            }
+
+            @Override
+            public boolean markSupported() {
+                return in.markSupported();
+            }
+
+            @Override
+            public synchronized void mark(int readlimit) {
+                in.mark(readlimit);
+            }
+
+            @Override
+            public int available() throws IOException {
+                return in.available();
+            }
+
+            @Override
+            public synchronized void reset() throws IOException {
+                in.reset();
+            }
+
+            @Override
+            public long skip(long n) throws IOException {
+                return in.skip(n);
+            }
+
+            @Override
+            public void close() throws IOException {
+                super.close();
+                // need to close 'fs' when input stream is closed
+                FileHelper.safeClose(fs);
+            }
+        };
     }
 
     @Override
     public void read(Action<InputStream> readCallback) throws ResourceException {
-        final InputStream in = read();
+        final FileSystem fs = getHadoopFileSystem();
         try {
-            readCallback.run(in);
+            final InputStream in = fs.open(getHadoopPath());
+            try {
+                readCallback.run(in);
+            } finally {
+                FileHelper.safeClose(in);
+            }
         } catch (Exception e) {
             throw wrapException(e);
         } finally {
-            FileHelper.safeClose(in);
+            FileHelper.safeClose(fs);
         }
     }
 
     @Override
     public <E> E read(Func<InputStream, E> readCallback) throws ResourceException {
-        final InputStream in = read();
+        final FileSystem fs = getHadoopFileSystem();
         try {
-            return readCallback.eval(in);
+            final InputStream in = fs.open(getHadoopPath());
+            try {
+                return readCallback.eval(in);
+            } finally {
+                FileHelper.safeClose(in);
+            }
         } catch (Exception e) {
             throw wrapException(e);
         } finally {
-            FileHelper.safeClose(in);
-        }
-    }
-
-    private <E> E doWithFileSystem(Func<FileSystem, E> action) {
-        final FileSystem hadoopFileSystem = getHadoopFileSystem();
-        try {
-            return action.eval(hadoopFileSystem);
-        } catch (Exception e) {
-            throw wrapException(e);
+            FileHelper.safeClose(fs);
         }
     }
 
diff --git a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
index 5ef5e53..3f37379 100644
--- a/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
+++ b/hadoop/src/test/java/org/apache/metamodel/util/HdfsResourceIntegrationTest.java
@@ -80,9 +80,6 @@
         final HdfsResource res1 = new HdfsResource(_hostname, _port, _filePath);

         logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - start");

 

-        assertFalse(res1.isExists());

-        logger.info(stopwatch.elapsed(TimeUnit.MILLISECONDS) + " - exists");

-

         res1.write(new Action<OutputStream>() {

             @Override

             public void run(OutputStream out) throws Exception {

diff --git a/hadoop/src/test/resources/log4j.xml b/hadoop/src/test/resources/log4j.xml
new file mode 100644
index 0000000..29f497f
--- /dev/null
+++ b/hadoop/src/test/resources/log4j.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8" ?>

+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

+

+	<appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender">

+		<param name="Target" value="System.out" />

+		<layout class="org.apache.log4j.PatternLayout">

+			<param name="ConversionPattern" value="%-5p %d{HH:mm:ss} %c{1} - %m%n" />

+		</layout>

+	</appender>

+

+	<logger name="org.apache.metamodel">

+		<level value="info" />

+	</logger>

+

+	<root>

+		<priority value="fatal" />

+		<appender-ref ref="consoleAppender" />

+	</root>

+

+</log4j:configuration>
\ No newline at end of file