DLFS - A FileSystem API wrapper over dlog API

Descriptions of the changes in this PR:

- FileSystem API wrapper built over dlog API

(This is based on initial implementation from gerritsundaram at #43)

Features supported:

- create and append files
- open files for reading
- input stream and output stream for reading and writing data
- list files
- get file status
- rename
- mkdir

Features aren't supported:

- truncate
- currently there is no clear distinguish between file and dir
- only support delete recursive

(This change includes small changes for #224 #225 #226 ).

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>

This closes #227 from sijie/fix_create_log
diff --git a/distributedlog-io/dlfs/pom.xml b/distributedlog-io/dlfs/pom.xml
new file mode 100644
index 0000000..775f07b
--- /dev/null
+++ b/distributedlog-io/dlfs/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>distributedlog</artifactId>
+    <groupId>org.apache.distributedlog</groupId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../..</relativePath>
+  </parent>
+  <groupId>org.apache.distributedlog</groupId>
+  <artifactId>dlfs</artifactId>
+  <name>Apache DistributedLog :: IO :: FileSystem</name>
+  <url>http://maven.apache.org</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.libdir>${basedir}/lib</project.libdir>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+      <version>${lombok.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.7.2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core</artifactId>
+      <version>${project.parent.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${maven-checkstyle-plugin.version}</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>${puppycrawl.checkstyle.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.distributedlog</groupId>
+            <artifactId>distributedlog-build-tools</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeResources>false</includeResources>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
new file mode 100644
index 0000000..0670a4a
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.fs;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A FileSystem Implementation powered by replicated logs.
+ */
+@Slf4j
+public class DLFileSystem extends FileSystem {
+
+    //
+    // Settings
+    //
+
+    public static final String DLFS_CONF_FILE = "dlog.configuration.file";
+
+
+    private URI rootUri;
+    private Namespace namespace;
+    private final DistributedLogConfiguration dlConf;
+    private Path workingDir;
+
+    public DLFileSystem() {
+        this.dlConf = new DistributedLogConfiguration();
+        setWorkingDirectory(new Path(System.getProperty("user.dir", "")));
+    }
+
+    @Override
+    public URI getUri() {
+        return rootUri;
+    }
+
+    //
+    // Initialization
+    //
+
+    @Override
+    public void initialize(URI name, Configuration conf) throws IOException {
+        super.initialize(name, conf);
+        setConf(conf);
+
+        // initialize
+
+        this.rootUri = name;
+        // load the configuration
+        String dlConfLocation = conf.get(DLFS_CONF_FILE);
+        if (null != dlConfLocation) {
+            try {
+                this.dlConf.loadConf(new File(dlConfLocation).toURI().toURL());
+                log.info("Loaded the distributedlog configuration from {}", dlConfLocation);
+            } catch (ConfigurationException e) {
+                log.error("Failed to load the distributedlog configuration from " + dlConfLocation, e);
+                throw new IOException("Failed to load distributedlog configuration from " + dlConfLocation);
+            }
+        }
+        log.info("Initializing the filesystem at {}", name);
+        // initialize the namespace
+        this.namespace = NamespaceBuilder.newBuilder()
+                .clientId("dlfs-client-" + InetAddress.getLocalHost().getHostName())
+                .conf(dlConf)
+                .regionId(DistributedLogConstants.LOCAL_REGION_ID)
+                .uri(name)
+                .build();
+        log.info("Initialized the filesystem at {}", name);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // clean up the resource
+        namespace.close();
+        super.close();
+    }
+
+    //
+    // Util Functions
+    //
+
+    private Path makeAbsolute(Path f) {
+        if (f.isAbsolute()) {
+            return f;
+        } else {
+            return new Path(workingDir, f);
+        }
+    }
+
+    private String getStreamName(Path relativePath) {
+        return makeAbsolute(relativePath).toUri().getPath().substring(1);
+    }
+
+    //
+    // Home & Working Directory
+    //
+
+    @Override
+    public Path getHomeDirectory() {
+        return this.makeQualified(new Path(System.getProperty("user.home", "")));
+    }
+
+    protected Path getInitialWorkingDirectory() {
+        return this.makeQualified(new Path(System.getProperty("user.dir", "")));
+    }
+
+    @Override
+    public void setWorkingDirectory(Path path) {
+        workingDir = makeAbsolute(path);
+        checkPath(workingDir);
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return workingDir;
+    }
+
+
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize)
+            throws IOException {
+        try {
+            DistributedLogManager dlm = namespace.openLog(getStreamName(path));
+            LogReader reader;
+            try {
+                reader = dlm.openLogReader(DLSN.InitialDLSN);
+            } catch (LogNotFoundException lnfe) {
+                throw new FileNotFoundException(path.toString());
+            } catch (LogEmptyException lee) {
+                throw new FileNotFoundException(path.toString());
+            }
+            return new FSDataInputStream(
+                new BufferedFSInputStream(
+                    new DLInputStream(dlm, reader, 0L),
+                    bufferSize));
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(path.toString());
+        }
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path,
+                                     FsPermission fsPermission,
+                                     boolean overwrite,
+                                     int bufferSize,
+                                     short replication,
+                                     long blockSize,
+                                     Progressable progressable) throws IOException {
+        // for overwrite, delete the existing file first.
+        if (overwrite) {
+            delete(path, false);
+        }
+
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(dlConf);
+        confLocal.setEnsembleSize(replication);
+        confLocal.setWriteQuorumSize(replication);
+        confLocal.setAckQuorumSize(replication);
+        confLocal.setMaxLogSegmentBytes(blockSize);
+        return append(path, bufferSize, Optional.of(confLocal));
+    }
+
+    @Override
+    public FSDataOutputStream append(Path path,
+                                     int bufferSize,
+                                     Progressable progressable) throws IOException {
+        return append(path, bufferSize, Optional.empty());
+    }
+
+    private FSDataOutputStream append(Path path,
+                                      int bufferSize,
+                                      Optional<DistributedLogConfiguration> confLocal)
+            throws IOException {
+        try {
+            DistributedLogManager dlm = namespace.openLog(
+                getStreamName(path),
+                confLocal,
+                Optional.empty(),
+                Optional.empty());
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+            return new FSDataOutputStream(
+                new BufferedOutputStream(
+                    new DLOutputStream(dlm, writer), bufferSize
+                ),
+                statistics,
+                writer.getLastTxId() < 0L ? 0L : writer.getLastTxId());
+        } catch (LogNotFoundException le) {
+            throw new FileNotFoundException(path.toString());
+        }
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        try {
+            String logName = getStreamName(path);
+            if (recursive) {
+                Iterator<String> logs = namespace.getLogs(logName);
+                while (logs.hasNext()) {
+                    String child = logs.next();
+                    Path childPath = new Path(path, child);
+                    delete(childPath, recursive);
+                }
+            }
+            namespace.deleteLog(logName);
+            return true;
+        } catch (LogNotFoundException e) {
+            return true;
+        }
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+        String logName = getStreamName(path);
+        try {
+            Iterator<String> logs = namespace.getLogs(logName);
+            List<FileStatus> statusList = Lists.newArrayList();
+            while (logs.hasNext()) {
+                String child = logs.next();
+                Path childPath = new Path(path, child);
+                statusList.add(getFileStatus(childPath));
+            }
+            Collections.sort(statusList, Comparator.comparing(FileStatus::getPath));
+            return statusList.toArray(new FileStatus[statusList.size()]);
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(path.toString());
+        }
+    }
+
+
+    @Override
+    public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+        String streamName = getStreamName(path);
+
+        // Create a dummy stream to make the path exists.
+        namespace.createLog(streamName);
+        return true;
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        String logName = getStreamName(path);
+        boolean exists = namespace.logExists(logName);
+        if (!exists) {
+            throw new FileNotFoundException(path.toString());
+        }
+
+        long endPos;
+        try {
+            DistributedLogManager dlm = namespace.openLog(logName);
+            endPos = dlm.getLastTxId();
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(path.toString());
+        } catch (LogEmptyException e) {
+            endPos = 0L;
+        }
+
+        // we need to store more metadata information on logs for supporting filesystem-like use cases
+        return new FileStatus(
+            endPos,
+            false,
+            3,
+            dlConf.getMaxLogSegmentBytes(),
+            0L,
+            makeAbsolute(path));
+    }
+
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        String srcLog = getStreamName(src);
+        String dstLog = getStreamName(dst);
+        namespace.renameLog(srcLog, dstLog);
+        return true;
+    }
+
+    //
+    // Not Supported
+    //
+
+    @Override
+    public boolean truncate(Path f, long newLength) throws IOException {
+        throw new UnsupportedOperationException("Truncate is not supported yet");
+    }
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java
new file mode 100644
index 0000000..c5a810f
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLInputStream.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.hadoop.fs.FSInputStream;
+
+/**
+ * The input stream for a distributedlog stream.
+ */
+@Slf4j
+class DLInputStream extends FSInputStream {
+
+    private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+    private static class RecordStream {
+
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN record;
+
+        RecordStream(LogRecordWithDLSN record) {
+            checkNotNull(record);
+
+            this.record = record;
+            this.payloadStream = record.getPayLoadInputStream();
+        }
+
+    }
+
+    private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null != record) {
+            return new RecordStream(record);
+        }
+        return null;
+    }
+
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    private long pos;
+    private long lastPos;
+    private RecordStream currentRecord = null;
+
+    DLInputStream(DistributedLogManager dlm,
+                  LogReader reader,
+                  long startPos)
+            throws IOException {
+        this.dlm = dlm;
+        this.reader = reader;
+        this.pos = startPos;
+        this.lastPos = readEndPos();
+        seek(startPos);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+        dlm.close();
+    }
+
+    private long readEndPos() throws IOException {
+        return dlm.getLastTxId();
+    }
+
+    //
+    // FSInputStream
+    //
+
+    @Override
+    public void seek(long pos) throws IOException {
+        if (this.pos == pos) {
+            return;
+        }
+
+        if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+            // close the previous reader
+            this.reader.close();
+            this.reader = dlm.openLogReader(pos);
+            this.currentRecord = null;
+        }
+
+        skipTo(pos);
+    }
+
+    private boolean skipTo(final long position) throws IOException {
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // the stream is empty now
+                return false;
+            }
+
+            long endPos = currentRecord.record.getTransactionId();
+            if (endPos < position) {
+                currentRecord = nextRecordStream(reader);
+                this.pos = endPos;
+                continue;
+            } else if (endPos == position){
+                // find the record, but we defer read next record when actual read happens
+                this.pos = position;
+                this.currentRecord = null;
+                return true;
+            } else {
+                this.currentRecord.payloadStream.skip(
+                    this.currentRecord.payloadStream.available() - (endPos - position));
+                this.pos = position;
+                return true;
+            }
+        }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return this.pos;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+    }
+
+    //
+    // Input Stream
+    //
+
+    @Override
+    public int read(byte[] b, final int off, final int len) throws IOException {
+        int remaining = len;
+        int numBytesRead = 0;
+        while (remaining > 0) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) {
+                if (numBytesRead == 0) {
+                    return -1;
+                }
+                break;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            if (bytesLeft <= 0) {
+                currentRecord.payloadStream.close();
+                currentRecord = null;
+                continue;
+            }
+
+            int numBytesToRead = Math.min(bytesLeft, remaining);
+            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+            if (numBytes < 0) {
+                continue;
+            }
+            numBytesRead += numBytes;
+            remaining -= numBytes;
+        }
+        return numBytesRead;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+
+        long remaining = n;
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // end of stream
+                return n - remaining;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            long endPos = currentRecord.record.getTransactionId();
+            if (remaining > bytesLeft) {
+                // skip the whole record
+                remaining -= bytesLeft;
+                this.pos = endPos;
+                this.currentRecord = nextRecordStream(reader);
+                continue;
+            } else if (remaining == bytesLeft) {
+                this.pos = endPos;
+                this.currentRecord = null;
+                return n;
+            } else {
+                currentRecord.payloadStream.skip(remaining);
+                this.pos = endPos - currentRecord.payloadStream.available();
+                return n;
+            }
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (lastPos - pos == 0L) {
+            lastPos = readEndPos();
+        }
+        return (int) (lastPos - pos);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] data = new byte[1];
+        int numBytes = read(data);
+        if (numBytes <= 0) {
+            return -1;
+        }
+        return data[0];
+    }
+
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
new file mode 100644
index 0000000..3670bc5
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/DLOutputStream.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+    private final DistributedLogManager dlm;
+    private final AsyncLogWriter writer;
+
+    // positions
+    private final long[] syncPos = new long[1];
+    private long writePos = 0L;
+
+    // state
+    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+
+    DLOutputStream(DistributedLogManager dlm,
+                   AsyncLogWriter writer) {
+        this.dlm = dlm;
+        this.writer = writer;
+        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+        this.syncPos[0] = writePos;
+    }
+
+    public synchronized long position() {
+        return syncPos[0];
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        byte[] data = new byte[] { (byte) b };
+        write(data);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(Unpooled.wrappedBuffer(b));
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(Unpooled.wrappedBuffer(b, off, len));
+    }
+
+    private synchronized void write(ByteBuf buf) throws IOException {
+        Throwable cause = exception.get();
+        if (null != cause) {
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            } else {
+                throw new UnexpectedException("Encountered unknown issue", cause);
+            }
+        }
+
+        writePos += buf.readableBytes();
+        LogRecord record = new LogRecord(writePos, buf);
+        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
+            @Override
+            public void onSuccess(DLSN value) {
+                synchronized (syncPos) {
+                    syncPos[0] = record.getTransactionId();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                exception.compareAndSet(null, cause);
+            }
+        });
+    }
+
+    private CompletableFuture<DLSN> writeControlRecord() {
+        LogRecord record;
+        synchronized (this) {
+            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+            record.setControl();
+        }
+        return writer.write(record);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            FutureUtils.result(writeControlRecord());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception e) {
+            log.error("Unexpected exception in DLOutputStream", e);
+            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        Utils.ioResult(
+            writeControlRecord()
+                .thenCompose(ignored -> writer.asyncClose())
+                .thenCompose(ignored -> dlm.asyncClose()));
+    }
+}
diff --git a/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java
new file mode 100644
index 0000000..2af39b7
--- /dev/null
+++ b/distributedlog-io/dlfs/src/main/java/org/apache/distributedlog/fs/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A filesystem API built over distributedlog.
+ */
+package org.apache.distributedlog.fs;
\ No newline at end of file
diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java
new file mode 100644
index 0000000..1c67d36
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFSBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import java.net.URI;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+/**
+ * Integration test for {@link DLFileSystem}.
+ */
+public abstract class TestDLFSBase extends TestDistributedLogBase {
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    protected static URI dlfsUri;
+    protected static DLFileSystem fs;
+
+    @BeforeClass
+    public static void setupDLFS() throws Exception {
+        setupCluster();
+        dlfsUri = DLMTestUtil.createDLMURI(zkPort, "");
+        fs = new DLFileSystem();
+        Configuration conf = new Configuration();
+        conf.set(DLFileSystem.DLFS_CONF_FILE, TestDLFSBase.class.getResource("/dlfs.conf").toURI().getPath());
+        fs.initialize(dlfsUri, conf);
+        fs.setWorkingDirectory(new Path("/"));
+    }
+
+    @AfterClass
+    public static void teardownDLFS() throws Exception {
+        fs.close();
+        teardownCluster();
+    }
+
+}
diff --git a/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java
new file mode 100644
index 0000000..70d8a21
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/java/org/apache/distributedlog/fs/TestDLFileSystem.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.fs;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.InputStreamReader;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link DLFileSystem}.
+ */
+@Slf4j
+public class TestDLFileSystem extends TestDLFSBase {
+
+    @Test(expected = FileNotFoundException.class)
+    public void testOpenFileNotFound() throws Exception {
+        Path path = new Path("not-found-file");
+        fs.open(path, 1024);
+    }
+
+    @Test
+    public void testBasicIO() throws Exception {
+        Path path = new Path("/path/to/" + runtime.getMethodName());
+
+        assertFalse(fs.exists(path));
+
+        try (FSDataOutputStream out = fs.create(path)) {
+            for (int i = 0; i < 100; i++) {
+                out.writeBytes("line-" + i + "\n");
+            }
+            out.flush();
+        }
+        assertTrue(fs.exists(path));
+
+        File tempFile = new File("/tmp/" + runtime.getMethodName());
+        tempFile.delete();
+        Path localDst = new Path(tempFile.getPath());
+        // copy the file
+        fs.copyToLocalFile(path, localDst);
+        // copy the file to dest
+        fs.copyFromLocalFile(localDst, new Path(runtime.getMethodName() + "-copied"));
+
+        // rename
+        Path dstPath = new Path(runtime.getMethodName() + "-renamed");
+        fs.rename(path, dstPath);
+        assertFalse(fs.exists(path));
+        assertTrue(fs.exists(dstPath));
+
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(dstPath, 1134)))) {
+            int lineno = 0;
+            String line;
+            while ((line = reader.readLine()) != null) {
+                assertEquals("line-" + lineno, line);
+                ++lineno;
+            }
+            assertEquals(100, lineno);
+        }
+
+
+        // delete the file
+        fs.delete(dstPath, false);
+        assertFalse(fs.exists(dstPath));
+    }
+
+    @Test
+    public void testListStatuses() throws Exception {
+        Path parentPath = new Path("/path/to/" + runtime.getMethodName());
+        assertFalse(fs.exists(parentPath));
+        try (FSDataOutputStream parentOut = fs.create(parentPath)) {
+            parentOut.writeBytes("parent");
+            parentOut.flush();
+        }
+        assertTrue(fs.exists(parentPath));
+
+        int numLogs = 3;
+        for (int i = 0; i < numLogs; i++) {
+            Path path = new Path("/path/to/" + runtime.getMethodName()
+                + "/" + runtime.getMethodName() + "-" + i);
+            assertFalse(fs.exists(path));
+            try (FSDataOutputStream out = fs.create(path)) {
+                out.writeBytes("line");
+                out.flush();
+            }
+            assertTrue(fs.exists(path));
+        }
+        FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName()));
+
+        assertEquals(3, files.length);
+        for (int i = 0; i < numLogs; i++) {
+            FileStatus file = files[i];
+            assertEquals(4, file.getLen());
+            assertFalse(file.isDirectory());
+            assertEquals(3, file.getReplication());
+            assertEquals(0L, file.getModificationTime());
+            assertEquals(
+                new Path("/path/to/" + runtime.getMethodName() + "/" + runtime.getMethodName() + "-" + i),
+                file.getPath());
+        }
+    }
+
+    @Test
+    public void testMkDirs() throws Exception {
+        Path path = new Path("/path/to/" + runtime.getMethodName());
+        assertFalse(fs.exists(path));
+        assertTrue(fs.mkdirs(path));
+        assertTrue(fs.exists(path));
+        assertTrue(fs.mkdirs(path));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testTruncation() throws Exception {
+        Path path = new Path("/path/to/" + runtime.getMethodName());
+        fs.truncate(path, 10);
+    }
+
+    @Test
+    public void testDeleteRecursive() throws Exception {
+        int numLogs = 3;
+        for (int i = 0; i < numLogs; i++) {
+            Path path = new Path("/path/to/" + runtime.getMethodName()
+                + "/" + runtime.getMethodName() + "-" + i);
+            assertFalse(fs.exists(path));
+            try (FSDataOutputStream out = fs.create(path)) {
+                out.writeBytes("line");
+                out.flush();
+            }
+            assertTrue(fs.exists(path));
+        }
+
+        fs.delete(new Path("/path/to/" + runtime.getMethodName()), true);
+        FileStatus[] files = fs.listStatus(new Path("/path/to/" + runtime.getMethodName()));
+        assertEquals(0, files.length);
+    }
+
+    @Test
+    public void testCreateOverwrite() throws Exception {
+        Path path = new Path("/path/to/" + runtime.getMethodName());
+        assertFalse(fs.exists(path));
+        byte[] originData = "original".getBytes(UTF_8);
+        try (FSDataOutputStream out = fs.create(path)) {
+            out.write(originData);
+            out.flush();
+        }
+
+        try (FSDataInputStream in = fs.open(path, 1024)) {
+            assertEquals(originData.length, in.available());
+            byte[] readData = new byte[originData.length];
+            assertEquals(originData.length, in.read(readData));
+            assertArrayEquals(originData, readData);
+        }
+
+        byte[] overwrittenData = "overwritten".getBytes(UTF_8);
+        try (FSDataOutputStream out = fs.create(path, true)) {
+            out.write(overwrittenData);
+            out.flush();
+        }
+
+        try (FSDataInputStream in = fs.open(path, 1024)) {
+            assertEquals(overwrittenData.length, in.available());
+            byte[] readData = new byte[overwrittenData.length];
+            assertEquals(overwrittenData.length, in.read(readData));
+            assertArrayEquals(overwrittenData, readData);
+        }
+    }
+
+    @Test
+    public void testAppend() throws Exception {
+        Path path = new Path("/path/to/" + runtime.getMethodName());
+        assertFalse(fs.exists(path));
+        byte[] originData = "original".getBytes(UTF_8);
+        try (FSDataOutputStream out = fs.create(path)) {
+            out.write(originData);
+            out.flush();
+        }
+
+        try (FSDataInputStream in = fs.open(path, 1024)) {
+            assertEquals(originData.length, in.available());
+            byte[] readData = new byte[originData.length];
+            assertEquals(originData.length, in.read(readData));
+            assertArrayEquals(originData, readData);
+        }
+
+        byte[] appendData = "append".getBytes(UTF_8);
+        try (FSDataOutputStream out = fs.append(path, 1024)) {
+            out.write(appendData);
+            out.flush();
+        }
+
+        try (FSDataInputStream in = fs.open(path, 1024)) {
+            assertEquals(originData.length + appendData.length, in.available());
+            byte[] readData = new byte[originData.length];
+            assertEquals(originData.length, in.read(readData));
+            assertArrayEquals(originData, readData);
+            readData = new byte[appendData.length];
+            assertEquals(appendData.length, in.read(readData));
+            assertArrayEquals(appendData, readData);
+        }
+    }
+
+}
diff --git a/distributedlog-io/dlfs/src/test/resources/dlfs.conf b/distributedlog-io/dlfs/src/test/resources/dlfs.conf
new file mode 100644
index 0000000..26d2bd9
--- /dev/null
+++ b/distributedlog-io/dlfs/src/test/resources/dlfs.conf
@@ -0,0 +1,27 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+## DLFS settings
+
+writeLockEnabled=false
+
+enableImmediateFlush=false
+
+writerOutputBufferSize=131072
+
+numWorkerThreads=1
diff --git a/distributedlog-io/pom.xml b/distributedlog-io/pom.xml
new file mode 100644
index 0000000..be82c40
--- /dev/null
+++ b/distributedlog-io/pom.xml
@@ -0,0 +1,38 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.distributedlog</groupId>
+    <artifactId>distributedlog</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>distributedlog-io</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache DistributedLog :: IO</name>
+  <modules>
+    <module>dlfs</module>
+  </modules>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+</project>
diff --git a/pom.xml b/pom.xml
index bd93cfc..616d755 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,7 @@
     <module>distributedlog-common</module>
     <module>distributedlog-protocol</module>
     <module>distributedlog-core</module>
+    <module>distributedlog-io</module>
     <module>distributedlog-proxy-protocol</module>
     <module>distributedlog-proxy-client</module>
     <module>distributedlog-proxy-server</module>
@@ -242,7 +243,7 @@
             <exclude>**/dependency-reduced-pom.xml</exclude>
             <exclude>**/org/apache/distributedlog/thrift/*</exclude>
             <exclude>**/logs/*.log</exclude>
-            <exclude>**/target/*</exclude>
+            <exclude>**/target/**/*</exclude>
             <!-- Git -->
             <exclude>.git/**/*</exclude>
             <exclude>.github/**/*</exclude>