Merge pull request #3075 from krichter722/checkstyle-hdfs

STORM-3457: hdfs: fix all checkstyle warnings
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 60c74a9..d0db240 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -275,7 +275,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>189</maxAllowedViolations>
+                    <maxAllowedViolations>0</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
index adb842a..da7ab74 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
@@ -57,7 +57,7 @@
     }
 
     @Override
-    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
+    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> someClass) {
         Schema theSchema = this.getSchema(input.readString());
         GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema);
         Decoder decoder = DecoderFactory
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 17f3eb7..128d4ff 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@
  */
 public class ConfluentAvroSerializer extends AbstractAvroSerializer {
 
-    final private String url;
+    private final String url;
     private SchemaRegistryClient theClient;
 
     /**
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
index 128e802..94607b3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
@@ -30,7 +30,7 @@
  */
 public class FixedAvroSerializer extends AbstractAvroSerializer {
 
-    private final static String FP_ALGO = "CRC-64-AVRO";
+    private static final String FP_ALGO = "CRC-64-AVRO";
     final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
     final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
 
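
Note: the two hunks above reorder modifiers ("final private" -> "private final",
"private final static" -> "private static final"). This is checkstyle's
ModifierOrder rule, which enforces the order recommended by the JLS. A minimal
sketch (class and field names are illustrative only):

    // JLS-recommended order: public protected private abstract default
    // static final transient volatile synchronized native strictfp
    public abstract class ExampleWriter {
        private static final String NAME = "example"; // not "final private static"

        protected abstract void run();                // not "abstract protected"
    }
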
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 156e58a..dfcf30f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -87,15 +87,16 @@
 
     /**
      * Marked as final to prevent override. Subclasses should implement the doPrepare() method.
-     * @param conf
-     * @param topologyContext
-     * @param collector
      */
     @Override
     public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
         this.writeLock = new Object();
-        if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
-        if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+        if (this.syncPolicy == null) {
+            throw new IllegalStateException("SyncPolicy must be specified.");
+        }
+        if (this.rotationPolicy == null) {
+            throw new IllegalStateException("RotationPolicy must be specified.");
+        }
         if (this.fsUrl == null) {
             throw new IllegalStateException("File system URL must be specified.");
         }
@@ -208,13 +209,10 @@
     }
 
     /**
-     * A tuple must be mapped to a writer based on two factors:
+     * A tuple must be mapped to a writer based on two factors.
      *  - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
      *    for an example of this)
      *  - the directory the tuple will be partioned into
-     *
-     * @param tuple
-     * @return
      */
     private String getHashKeyForTuple(Tuple tuple) {
         final String boltKey = getWriterKey(tuple);
@@ -300,12 +298,12 @@
                         this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
     }
 
-    abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws
+    protected abstract void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws
         IOException;
 
-    abstract protected String getWriterKey(Tuple tuple);
+    protected abstract String getWriterKey(Tuple tuple);
 
-    abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
+    protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
 
     static class WritersMap extends LinkedHashMap<String, Writer> {
         final long maxWriters;
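
Note: the single-line "if (...) throw ..." statements above gained braces, per
checkstyle's NeedBraces rule. A minimal before/after sketch:

    class NeedBracesExample {
        void validate(Object syncPolicy) {
            // Before (flagged): if (syncPolicy == null) throw new IllegalStateException(...);
            // After:
            if (syncPolicy == null) {
                throw new IllegalStateException("SyncPolicy must be specified.");
            }
        }
    }
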
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index c73c6a2..991a23c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -114,7 +114,9 @@
     @Override
     public void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing Sequence File Bolt...");
-        if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
+        if (this.format == null) {
+            throw new IllegalStateException("SequenceFormat must be specified.");
+        }
 
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
         this.codecFactory = new CompressionCodecFactory(hdfsConfig);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
index 42c1f5b..157929c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
@@ -15,7 +15,6 @@
 import java.util.Map;
 import org.apache.storm.task.TopologyContext;
 
-
 /**
  * Creates file names with the following format:
  * <pre>
@@ -26,7 +25,7 @@
  *     MyBolt-5-7-1390579837830.txt
  * </pre>
  *
- * By default, prefix is empty and extenstion is ".txt".
+ * <p>By default, prefix is empty and extension is ".txt".
  *
  */
 public class DefaultFileNameFormat implements FileNameFormat {
@@ -38,9 +37,6 @@
 
     /**
      * Overrides the default prefix.
-     *
-     * @param prefix
-     * @return
      */
     public DefaultFileNameFormat withPrefix(String prefix) {
         this.prefix = prefix;
@@ -49,9 +45,6 @@
 
     /**
      * Overrides the default file extension.
-     *
-     * @param extension
-     * @return
      */
     public DefaultFileNameFormat withExtension(String extension) {
         this.extension = extension;
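
Note: the "<p>" tags added above satisfy checkstyle's JavadocParagraph rule:
every javadoc paragraph after the first must open with "<p>" on the same line
as its first word. A minimal sketch:

    /**
     * Summary sentence, ending with a period.
     *
     * <p>Subsequent paragraphs open with the p tag; a bare blank-line
     * separation without it is flagged.
     */
    class JavadocParagraphExample { }
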
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
index e9a81cc..f8cdad9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
@@ -20,7 +20,7 @@
  * By default uses a comma (",") as the field delimiter and a
  * newline ("\n") as the record delimiter.
  *
- * Also by default, this implementation will output all the
+ * <p>Also by default, this implementation will output all the
  * field values in the tuple in the order they were declared. To
  * override this behavior, call <code>withFields()</code> to
  * specify which tuple fields to output.
@@ -35,9 +35,6 @@
 
     /**
      * Only output the specified fields.
-     *
-     * @param fields
-     * @return
      */
     public DelimitedRecordFormat withFields(Fields fields) {
         this.fields = fields;
@@ -46,9 +43,6 @@
 
     /**
      * Overrides the default field delimiter.
-     *
-     * @param delimiter
-     * @return
      */
     public DelimitedRecordFormat withFieldDelimiter(String delimiter) {
         this.fieldDelimiter = delimiter;
@@ -57,9 +51,6 @@
 
     /**
      * Overrides the default record delimiter.
-     *
-     * @param delimiter
-     * @return
      */
     public DelimitedRecordFormat withRecordDelimiter(String delimiter) {
         this.recordDelimiter = delimiter;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
index 891c600..70210a9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
@@ -28,7 +28,6 @@
      * Returns the filename the HdfsBolt will create.
      * @param rotation the current file rotation number (incremented on every rotation)
      * @param timeStamp current time in milliseconds when the rotation occurs
-     * @return
      */
     String getName(long rotation, long timeStamp);
 
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
index 8d55c03..5102f38 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
@@ -12,14 +12,11 @@
 
 package org.apache.storm.hdfs.bolt.format;
 
-
 import java.io.Serializable;
 import org.apache.storm.tuple.Tuple;
 
 /**
- * Formats a Tuple object into a byte array
- * that will be written to HDFS.
- *
+ * Formats a Tuple object into a byte array that will be written to HDFS.
  */
 public interface RecordFormat extends Serializable {
     byte[] format(Tuple tuple);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
index 7c38f66..7ea05a9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
@@ -26,32 +26,22 @@
  */
 public interface SequenceFormat extends Serializable {
     /**
-     * Key class used by implementation (e.g. IntWritable.class, etc.)
-     *
-     * @return
+     * Key class used by implementation (e.g. IntWritable.class, etc.).
      */
     Class keyClass();
 
     /**
-     * Value class used by implementation (e.g. Text.class, etc.)
-     *
-     * @return
+     * Value class used by implementation (e.g. Text.class, etc.).
      */
     Class valueClass();
 
     /**
      * Given a tuple, return the key that should be written to the sequence file.
-     *
-     * @param tuple
-     * @return
      */
     Object key(Tuple tuple);
 
     /**
      * Given a tuple, return the value that should be written to the sequence file.
-     *
-     * @param tuple
-     * @return
      */
     Object value(Tuple tuple);
 }
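
Note: the bare "@param"/"@return" tags deleted above carried no description,
which checkstyle flags (NonEmptyAtclauseDescription in the stock rule set; the
exact rule name used by this project is an assumption). The patch removes the
tags rather than filling them in; a filled-in alternative would look like:

    interface AtClauseExample {
        /**
         * Given a tuple id, return the partition it maps to.
         *
         * @param tupleId the id of the tuple being routed
         * @return the partition index for the tuple
         */
        int partitionFor(long tupleId);
    }
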
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
index 7869d69..d80aaa8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
@@ -73,9 +73,7 @@
      * $COMPONENT - component id<br/>
      * $TASK - task id<br/>
      *
-     * @param name
-     *            file name
-     * @return
+     * @param name file name
      */
     public SimpleFileNameFormat withName(String name) {
         this.name = name;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
index 6354ae7..13229dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
@@ -12,18 +12,17 @@
 
 package org.apache.storm.hdfs.bolt.rotation;
 
-
 import java.io.Serializable;
 import org.apache.storm.tuple.Tuple;
 
 /**
  * Used by the HdfsBolt to decide when to rotate files.
  *
- * The HdfsBolt will call the <code>mark()</code> method for every
+ * <p>The HdfsBolt will call the <code>mark()</code> method for every
  * tuple received. If the <code>mark()</code> method returns
  * <code>true</code> the HdfsBolt will perform a file rotation.
  *
- * After file rotation, the HdfsBolt will call the <code>reset()</code>
+ * <p>After file rotation, the HdfsBolt will call the <code>reset()</code>
  * method.
  */
 public interface FileRotationPolicy extends Serializable {
@@ -39,12 +38,11 @@
 
     /**
      * Called after the HdfsBolt rotates a file.
-     *
      */
     void reset();
 
     /**
-     * Must be able to copy the rotation policy
+     * Must be able to copy the rotation policy.
      */
     FileRotationPolicy copy();
 }
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
index 230e8b4..24a46dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.bolt.rotation;
 
-
 import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,7 +20,7 @@
  * File rotation policy that will rotate files when a certain
  * file size is reached.
  *
- * For example:
+ * <p>For example:
  * <pre>
  *     // rotate when files reach 5MB
  *     FileSizeRotationPolicy policy =
@@ -34,6 +33,7 @@
     private long maxBytes;
     private long lastOffset = 0;
     private long currentBytesWritten = 0;
+
     public FileSizeRotationPolicy(float count, Units units) {
         this.maxBytes = (long) (count * units.getByteCount());
     }
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
index 45abc7d..a048dc3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.bolt.sync;
 
-
 import org.apache.storm.tuple.Tuple;
 
 /**
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
index 6c2b0e0..caf6b49 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -18,9 +18,10 @@
 import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
 import org.apache.storm.tuple.Tuple;
 
-abstract public class AbstractHDFSWriter implements Writer {
-    final protected Path filePath;
-    final protected FileRotationPolicy rotationPolicy;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+public abstract class AbstractHDFSWriter implements Writer {
+    protected final Path filePath;
+    protected final FileRotationPolicy rotationPolicy;
     protected long lastUsedTime;
     protected long offset;
     protected boolean needsRotation;
@@ -32,7 +33,7 @@
     }
 
     @Override
-    final public long write(Tuple tuple) throws IOException {
+    public final long write(Tuple tuple) throws IOException {
         doWrite(tuple);
         this.needsRotation = rotationPolicy.mark(tuple, offset);
 
@@ -40,12 +41,12 @@
     }
 
     @Override
-    final public void sync() throws IOException {
+    public final void sync() throws IOException {
         doSync();
     }
 
     @Override
-    final public void close() throws IOException {
+    public final void close() throws IOException {
         doClose();
     }
 
@@ -59,10 +60,10 @@
         return this.filePath;
     }
 
-    abstract protected void doWrite(Tuple tuple) throws IOException;
+    protected abstract void doWrite(Tuple tuple) throws IOException;
 
-    abstract protected void doSync() throws IOException;
+    protected abstract void doSync() throws IOException;
 
-    abstract protected void doClose() throws IOException;
+    protected abstract void doClose() throws IOException;
 
 }
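
Note: rather than renaming the public AbstractHDFSWriter class (an API-breaking
change), the patch suppresses AbbreviationAsWordInName. "@SuppressWarnings"
with a "checkstyle:" prefix only takes effect if the checkstyle configuration
registers SuppressWarningsHolder and SuppressWarningsFilter; that it does so is
an assumption about this project's checkstyle.xml:

    // Effective only with SuppressWarningsHolder + SuppressWarningsFilter
    // enabled in checkstyle.xml (assumed here).
    @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
    abstract class HDFSExample {
    }
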
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
index d77423c..713aa58 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.common;
 
-
 import java.io.IOException;
 import java.util.EnumSet;
 import org.apache.avro.Schema;
@@ -28,6 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
index 578bc06..8b3dcd1 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class HDFSWriter extends AbstractHDFSWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
index 9f79373..1488655 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
@@ -21,10 +21,9 @@
      * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write
      * to /common/output and a partitioner returned "/foo" then the bolt should open a file in "/common/output/foo"
      *
-     * A best practice is to use Path.SEPARATOR instead of a literal "/"
+     * <p>A best practice is to use Path.SEPARATOR instead of a literal "/"
      *
      * @param tuple The tuple for which the relative path is being calculated.
-     * @return
      */
     public String getPartitionPath(final Tuple tuple);
 }
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
index f5ade03..bad3d06 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.hdfs.common.rotation;
 
-
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index f94b8e5..9859d08 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -24,59 +24,68 @@
 
 public class Configs implements Validated {
     /**
-     * @deprecated please use {@link HdfsSpout.setReaderType(String)}
+     * Required - choose the file type being consumed.
+     * @deprecated please use {@link HdfsSpout#setReaderType(String)}
      */
     @Deprecated
     @isString
     @CustomValidator(validatorClass = ReaderTypeValidator.class)
-    public static final String READER_TYPE = "hdfsspout.reader.type";        // Required - chose the file type being consumed
+    public static final String READER_TYPE = "hdfsspout.reader.type";
     public static final String TEXT = "text";
     public static final String SEQ = "seq";
     /**
+     * Required - HDFS name node.
      * @deprecated please use {@link HdfsSpout#setHdfsUri(String)}
      */
     @Deprecated
     @isString
-    public static final String HDFS_URI = "hdfsspout.hdfs";                   // Required - HDFS name node
+    public static final String HDFS_URI = "hdfsspout.hdfs";
     /**
+     * Required - dir from which to read files.
      * @deprecated please use {@link HdfsSpout#setSourceDir(String)}
      */
     @Deprecated
     @isString
-    public static final String SOURCE_DIR = "hdfsspout.source.dir";           // Required - dir from which to read files
+    public static final String SOURCE_DIR = "hdfsspout.source.dir";
     /**
+     * Required - completed files will be moved here.
      * @deprecated please use {@link HdfsSpout#setArchiveDir(String)}
      */
     @Deprecated
     @isString
-    public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // Required - completed files will be moved here
+    public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";
     /**
+     * Required - unparsable files will be moved here.
      * @deprecated please use {@link HdfsSpout#setBadFilesDir(String)}
      */
     @Deprecated
     @isString
-    public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // Required - unparsable files will be moved here
+    public static final String BAD_DIR = "hdfsspout.badfiles.dir";
     /**
+     * Directory in which lock files will be created.
      * @deprecated please use {@link HdfsSpout#setLockDir(String)}
      */
     @Deprecated
     @isString
-    public static final String LOCK_DIR = "hdfsspout.lock.dir";               // dir in which lock files will be created
+    public static final String LOCK_DIR = "hdfsspout.lock.dir";
     /**
+     * Commit after N records. 0 disables this.
      * @deprecated please use {@link HdfsSpout#setCommitFrequencyCount(int)}
      */
     @Deprecated
     @isInteger
     @isPositiveNumber(includeZero = true)
-    public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // commit after N records. 0 disables this.
+    public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";
     /**
+     * Commit after N seconds; cannot be disabled.
      * @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)}
      */
     @Deprecated
     @isInteger
     @isPositiveNumber
-    public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // commit after N secs. cannot be disabled.
+    public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";
     /**
+     * Maximum number of outstanding (un-acked) tuples.
      * @deprecated please use {@link HdfsSpout#setMaxOutstanding(int)}
      */
     @Deprecated
@@ -84,6 +93,7 @@
     @isPositiveNumber(includeZero = true)
     public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
     /**
+     * Lock timeout, in seconds.
      * @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)}
      */
     @Deprecated
@@ -91,21 +101,26 @@
     @isPositiveNumber
     public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";
     /**
+     * Whether clocks on the machines in the Storm cluster are in sync; this is used when judging the
+     * inactivity duration after which locks are considered candidates for being reassigned to another spout.
+     *
      * @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)}
      */
     @Deprecated
     @isBoolean
-    public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     // if clocks on machines in the Storm cluster are in sync
-        // inactivity duration after which locks are considered candidates for being reassigned to another spout
+    public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";
     /**
+     * Files with this suffix will be ignored by the spout.
      * @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)}
      */
     @Deprecated
     @isString
     public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";
+    /**
+     * Default name of the directory in which lock files are created.
+     */
     @NotConf
     public static final String DEFAULT_LOCK_DIR = ".lock";
-        // filenames with this suffix in archive dir will be ignored by the Spout
     public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
     public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
     public static final int DEFAULT_MAX_OUTSTANDING = 10000;
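
Note: throughout Configs.java the trailing "// ..." comments on constants were
promoted into the javadoc above each constant, likely to clear LineLength
and/or TrailingComment violations (which rule fired is an assumption). The
pattern, sketched:

    class ConfigsExample {
        /**
         * Required - HDFS name node. (formerly a trailing "//" comment)
         */
        public static final String HDFS_URI = "hdfsspout.hdfs";
    }
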
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index eea23e1..488531a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -39,9 +39,8 @@
         this.lockFile = lockFile;
     }
 
-    /** Get a lock on file if not already locked
+    /** Get a lock on file if not already locked.
      *
-     * @param fs
      * @param dir  the dir on which to get a lock
      * @return The lock object if it the lock was acquired. Returns null if the dir is already locked.
      * @throws IOException if there were errors
@@ -74,7 +73,9 @@
                + Thread.currentThread().getName();
     }
 
-    /** if the lock on the directory is stale, take ownership */
+    /**
+     * If the lock on the directory is stale, take ownership.
+     */
     public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) {
         Path dirLockFile = getDirLockFile(dirToLock);
 
@@ -95,8 +96,8 @@
     private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
         if (fs instanceof DistributedFileSystem) {
             if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
-                LOG.warn("Unable to recover lease on dir lock file " + dirLockFile +
-                         " right now. Cannot transfer ownership. Will need to try later.");
+                LOG.warn("Unable to recover lease on dir lock file " + dirLockFile
+                        + " right now. Cannot transfer ownership. Will need to try later.");
                 return null;
             }
         }
@@ -112,7 +113,9 @@
         return null;
     }
 
-    /** Release lock on dir by deleting the lock file */
+    /**
+     * Release lock on dir by deleting the lock file.
+     */
     public void release() throws IOException {
         if (!fs.delete(lockFile, false)) {
             LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index c6529a9..7ff5aaa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.spout;
 
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -38,7 +37,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(FileLock.class);
     private final FileSystem fs;
-    private final String componentID;
+    private final String componentId;
     private final Path lockFile;
     private final FSDataOutputStream lockFileStream;
     private LogEntry lastEntry;
@@ -48,7 +47,7 @@
         this.fs = fs;
         this.lockFile = lockFile;
         this.lockFileStream = lockFileStream;
-        this.componentID = spoutId;
+        this.componentId = spoutId;
         logProgress("0", false);
     }
 
@@ -57,12 +56,14 @@
         this.fs = fs;
         this.lockFile = lockFile;
         this.lockFileStream = fs.append(lockFile);
-        this.componentID = spoutId;
+        this.componentId = spoutId;
         LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId);
         logProgress(entry.fileOffset, true);
     }
 
-    /** returns lock on file or null if file is already locked. throws if unexpected problem */
+    /**
+     * Returns lock on file, or null if the file is already locked. Throws if an unexpected problem occurs.
+     */
     public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
         throws IOException {
         Path lockFile = new Path(lockDirPath, fileToLock.getName());
@@ -86,11 +87,9 @@
      * checks if lockFile is older than 'olderThan' UTC time by examining the modification time
      * on file and (if necessary) the timestamp in last log entry in the file. If its stale, then
      * returns the last log entry, else returns null.
-     * @param fs
-     * @param lockFile
+     *
      * @param olderThan  time (millis) in UTC.
      * @return the last entry in the file if its too old. null if last entry is not too old
-     * @throws IOException
      */
     public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
         throws IOException {
@@ -117,11 +116,7 @@
     }
 
     /**
-     * returns the last log entry
-     * @param fs
-     * @param lockFile
-     * @return
-     * @throws IOException
+     * Returns the last log entry.
      */
     public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
         throws IOException {
@@ -136,14 +131,13 @@
 
     /**
      * Takes ownership of the lock file if possible.
-     * @param lockFile
      * @param lastEntry   last entry in the lock file. this param is an optimization.
      *                    we dont scan the lock file again to find its last entry here since
      *                    its already been done once by the logic used to check if the lock
      *                    file is stale. so this value comes from that earlier scan.
      * @param spoutId     spout id
-     * @throws IOException if unable to acquire
      * @return null if lock File is not recoverable
+     * @throws IOException if unable to acquire
      */
     public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
         throws IOException {
@@ -158,11 +152,12 @@
             }
             return new FileLock(fs, lockFile, spoutId, lastEntry);
         } catch (IOException e) {
-            if (e instanceof RemoteException &&
-                ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+            if (e instanceof RemoteException
+                    && ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
                 LOG.warn(
-                    "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId,
-                    e);
+                        "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= "
+                            + spoutId,
+                        e);
                 return null;
             } else { // unexpected error
                 LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e);
@@ -173,13 +168,9 @@
 
     /**
      * Finds a oldest expired lock file (using modification timestamp), then takes
-     * ownership of the lock file
+     * ownership of the lock file.
      * Impt: Assumes access to lockFilesDir has been externally synchronized such that
      *       only one thread accessing the same thread
-     * @param fs
-     * @param lockFilesDir
-     * @param locktimeoutSec
-     * @return
      */
     public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
         throws IOException {
@@ -209,14 +200,11 @@
 
     /**
      * Finds oldest expired lock file (using modification timestamp), then takes
-     * ownership of the lock file
+     * ownership of the lock file.
      * Impt: Assumes access to lockFilesDir has been externally synchronized such that
      *       only one thread accessing the same thread
-     * @param fs
-     * @param lockFilesDir
-     * @param locktimeoutSec
-     * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found
-     * @throws IOException
+     *
+     * @return a Pair&lt;lock file path, last entry in lock file&gt; if an expired lock file is found
      */
     public static HdfsUtils.Pair<Path, LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
         throws IOException {
@@ -248,7 +236,7 @@
     private void logProgress(String fileOffset, boolean prefixNewLine)
         throws IOException {
         long now = System.currentTimeMillis();
-        LogEntry entry = new LogEntry(now, componentID, fileOffset);
+        LogEntry entry = new LogEntry(now, componentId, fileOffset);
         String line = entry.toString();
         if (prefixNewLine) {
             lockFileStream.writeBytes(System.lineSeparator() + line);
@@ -260,16 +248,17 @@
         lastEntry = entry; // update this only after writing to hdfs
     }
 
-    /** Release lock by deleting file
+    /**
+     * Release lock by deleting file.
      * @throws IOException if lock file could not be deleted
      */
     public void release() throws IOException {
         lockFileStream.close();
         if (!fs.delete(lockFile, false)) {
-            LOG.warn("Unable to delete lock file, Spout = {}", componentID);
+            LOG.warn("Unable to delete lock file, Spout = {}", componentId);
             throw new IOException("Unable to delete lock file");
         }
-        LOG.debug("Released lock file {}. Spout {}", lockFile, componentID);
+        LOG.debug("Released lock file {}. Spout {}", lockFile, componentId);
     }
 
     // For testing only.. invoked via reflection
@@ -288,12 +277,12 @@
     public static class LogEntry {
         private static final int NUM_FIELDS = 3;
         public final long eventTime;
-        public final String componentID;
+        public final String componentId;
         public final String fileOffset;
 
-        public LogEntry(long eventtime, String componentID, String fileOffset) {
+        public LogEntry(long eventtime, String componentId, String fileOffset) {
             this.eventTime = eventtime;
-            this.componentID = componentID;
+            this.componentId = componentId;
             this.fileOffset = fileOffset;
         }
 
@@ -304,7 +293,7 @@
 
         @Override
         public String toString() {
-            return eventTime + "," + componentID + "," + fileOffset;
+            return eventTime + "," + componentId + "," + fileOffset;
         }
 
         @Override
@@ -321,7 +310,7 @@
             if (eventTime != logEntry.eventTime) {
                 return false;
             }
-            if (!componentID.equals(logEntry.componentID)) {
+            if (!componentId.equals(logEntry.componentId)) {
                 return false;
             }
             return fileOffset.equals(logEntry.fileOffset);
@@ -331,7 +320,7 @@
         @Override
         public int hashCode() {
             int result = (int) (eventTime ^ (eventTime >>> 32));
-            result = 31 * result + componentID.hashCode();
+            result = 31 * result + componentId.hashCode();
             result = 31 * result + fileOffset.hashCode();
             return result;
         }
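
Note: the multi-line string concatenations in this file were rewrapped so each
"+" begins the continuation line instead of ending the previous one, matching
checkstyle's OperatorWrap rule (new-line policy). A minimal sketch:

    class OperatorWrapExample {
        String describe(String lockFile, String spoutId) {
            // The operator leads the continuation line.
            return "Lock file " + lockFile
                    + " is currently open. Spout = "
                    + spoutId;
        }
    }
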
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
index bf58815..a8b354a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -24,7 +24,9 @@
  */
 
 interface FileOffset extends Comparable<FileOffset>, Cloneable {
-    /** tests if rhs == currOffset+1 */
+    /**
+     * Tests whether rhs == currOffset+1.
+     */
     boolean isNextOffset(FileOffset rhs);
 
     FileOffset clone();
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 49d998a..b6e08f4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -26,10 +26,9 @@
     FileOffset getFileOffset();
 
     /**
-     * Get the next tuple from the file
+     * Get the next tuple from the file.
      *
      * @return null if no more data
-     * @throws IOException
      */
     List<Object> next() throws IOException, ParseException;
 
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 18c469f..a7ce729 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -62,7 +62,7 @@
     private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
     private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
     private boolean clocksInSync = true;
-    private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts
+    private String inprogressSuffix = ".inprogress"; // not configurable to prevent change between topology restarts
     private String ignoreSuffix = ".ignore";
     private String outputStreamName = null;
     private ProgressTracker tracker = null;
@@ -89,14 +89,14 @@
         return reader.getFilePath() + " " + reader.getFileOffset();
     }
 
-    private static void releaseLockAndLog(FileLock fLock, String spoutId) {
+    private static void releaseLockAndLog(FileLock fileLock, String spoutId) {
         try {
-            if (fLock != null) {
-                fLock.release();
-                LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
+            if (fileLock != null) {
+                fileLock.release();
+                LOG.debug("Spout {} released FileLock. SpoutId = {}", fileLock.getLockFile(), spoutId);
             }
         } catch (IOException e) {
-            LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e);
+            LOG.error("Unable to delete lock file : " + fileLock.getLockFile() + " SpoutId =" + spoutId, e);
         }
     }
 
@@ -215,7 +215,7 @@
     }
 
     /**
-     * Set output stream name
+     * Set output stream name.
      */
     public HdfsSpout withOutputStream(String streamName) {
         this.outputStreamName = streamName;
@@ -348,7 +348,7 @@
 
     private void markFileAsBad(Path file) {
         String fileName = file.toString();
-        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix));
         String originalName = new Path(fileNameMinusSuffix).getName();
         Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName);
 
@@ -590,7 +590,7 @@
             Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
 
             for (Path file : listing) {
-                if (file.getName().endsWith(inprogress_suffix)) {
+                if (file.getName().endsWith(inprogressSuffix)) {
                     continue;
                 }
                 if (file.getName().endsWith(ignoreSuffix)) {
@@ -625,7 +625,6 @@
      * check if the lock is updated. if not updated then acquires the lock
      *
      * @return a lock object
-     * @throws IOException
      */
     private FileLock getOldestExpiredLock() throws IOException {
         // 1 - acquire lock on dir
@@ -681,11 +680,9 @@
     }
 
     /**
-     * Creates a reader that reads from beginning of file
+     * Creates a reader that reads from the beginning of the file.
      *
      * @param file file to read
-     * @return
-     * @throws IOException
      */
     private FileReader createFileReader(Path file)
         throws IOException {
@@ -706,12 +703,10 @@
     }
 
     /**
-     * Creates a reader that starts reading from 'offset'
+     * Creates a reader that starts reading from 'offset'.
      *
      * @param file the file to read
      * @param offset the offset string should be understandable by the reader type being used
-     * @return
-     * @throws IOException
      */
     private FileReader createFileReader(Path file, String offset)
         throws IOException {
@@ -733,14 +728,14 @@
     }
 
     /**
-     * Renames files with .inprogress suffix
+     * Renames files with .inprogress suffix.
      *
      * @return path of renamed file
      * @throws if operation fails
      */
     private Path renameToInProgressFile(Path file)
         throws IOException {
-        Path newFile = new Path(file.toString() + inprogress_suffix);
+        Path newFile = new Path(file.toString() + inprogressSuffix);
         try {
             if (hdfs.rename(file, newFile)) {
                 return newFile;
@@ -757,7 +752,7 @@
     private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
         throws IOException {
         String lockFileName = lockFile.getName();
-        Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
+        Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogressSuffix);
         if (hdfs.exists(dataFile)) {
             return dataFile;
         }
@@ -771,7 +766,7 @@
     // renames files and returns the new file path
     private Path renameCompletedFile(Path file) throws IOException {
         String fileName = file.toString();
-        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+        String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix));
         String newName = new Path(fileNameMinusSuffix).getName();
 
         Path newFile = new Path(archiveDirPath + Path.SEPARATOR + newName);
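
Note: the renames in HdfsSpout.java (inprogress_suffix -> inprogressSuffix,
fLock -> fileLock) follow the camelCase member/parameter naming patterns
(MemberName and ParameterName in the stock rule set; the exact rule names are
an assumption). Sketched:

    class NamingExample {
        private String inprogressSuffix = ".inprogress"; // not inprogress_suffix
        private String componentId;                      // not componentID
    }
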
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 39dccae..d245df7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -25,8 +25,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SequenceFileReader<Key extends Writable, Value extends Writable>
-    extends AbstractFileReader {
+public class SequenceFileReader<KeyT extends Writable, ValueT extends Writable> extends AbstractFileReader {
     public static final String[] defaultFields = { "key", "value" };
     public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
     private static final Logger LOG = LoggerFactory
@@ -37,8 +36,8 @@
     private final SequenceFileReader.Offset offset;
 
 
-    private final Key key;
-    private final Value value;
+    private final KeyT key;
+    private final ValueT value;
 
 
     public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf)
@@ -46,8 +45,8 @@
         super(fs, file);
         int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString());
         this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
-        this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
-        this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+        this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
+        this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
         this.offset = new SequenceFileReader.Offset(0, 0, 0);
     }
 
@@ -57,8 +56,8 @@
         int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString());
         this.offset = new SequenceFileReader.Offset(offset);
         this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
-        this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
-        this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+        this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
+        this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
         skipToOffset(this.reader, this.offset, this.key);
     }
 
@@ -106,8 +105,11 @@
             this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0);
         }
 
-        public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord
-            , long currRecordEndOffset, long prevRecordEndOffset) {
+        public Offset(long lastSyncPoint,
+                long recordsSinceLastSync,
+                long currentRecord,
+                long currRecordEndOffset,
+                long prevRecordEndOffset) {
             this.lastSyncPoint = lastSyncPoint;
             this.recordsSinceLastSync = recordsSinceLastSync;
             this.currentRecord = currentRecord;
@@ -135,19 +137,19 @@
                     this.currRecordEndOffset = 0;
                 }
             } catch (Exception e) {
-                throw new IllegalArgumentException("'" + offset +
-                                                   "' cannot be interpreted. It is not in expected format for SequenceFileReader." +
-                                                   " Format e.g. {sync=123:afterSync=345:record=67}");
+                throw new IllegalArgumentException("'" + offset
+                        + "' cannot be interpreted. It is not in expected format for SequenceFileReader."
+                        + " Format e.g. {sync=123:afterSync=345:record=67}");
             }
         }
 
         @Override
         public String toString() {
-            return '{' +
-                   "sync=" + lastSyncPoint +
-                   ":afterSync=" + recordsSinceLastSync +
-                   ":record=" + currentRecord +
-                   ":}";
+            return '{'
+                    + "sync=" + lastSyncPoint
+                    + ":afterSync=" + recordsSinceLastSync
+                    + ":record=" + currentRecord
+                    + ":}";
         }
 
         @Override
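
Note: the type parameters Key/Value became KeyT/ValueT. Checkstyle's
ClassTypeParameterName rule commonly accepts a single capital letter or a name
ending in "T" (the pattern configured here is an assumption):

    class PairReader<KeyT, ValueT> {
        private KeyT key;
        private ValueT value;
    }
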
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 94b40f3..0b3da9c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -31,7 +31,7 @@
     public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
 
     private static final int DEFAULT_BUFF_SIZE = 4096;
-    private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
     private BufferedReader reader;
     private TextFileReader.Offset offset;
 
@@ -124,26 +124,26 @@
                     this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
                 }
             } catch (Exception e) {
-                throw new IllegalArgumentException("'" + offset +
-                                                   "' cannot be interpreted. It is not in expected format for TextFileReader." +
-                                                   " Format e.g.  {char=123:line=5}");
+                throw new IllegalArgumentException("'" + offset
+                        + "' cannot be interpreted. It is not in expected format for TextFileReader."
+                        + " Format e.g.  {char=123:line=5}");
             }
         }
 
         @Override
         public String toString() {
-            return '{' +
-                   "char=" + charOffset +
-                   ":line=" + lineNumber +
-                   ":}";
+            return '{'
+                    + "char=" + charOffset
+                    + ":line=" + lineNumber
+                    + ":}";
         }
 
         @Override
         public boolean isNextOffset(FileOffset rhs) {
             if (rhs instanceof Offset) {
                 Offset other = ((Offset) rhs);
-                return other.charOffset > charOffset &&
-                       other.lineNumber == lineNumber + 1;
+                return other.charOffset > charOffset
+                        && other.lineNumber == lineNumber + 1;
             }
             return false;
         }
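
Note: TextFileReader's logger became static final. As an instance field, a name
like LOG violates the camelCase MemberName pattern; as a static final constant
it falls under ConstantName, where upper case is expected (the rule interplay
is an assumption). It also avoids building one logger per reader instance:

    class TextReaderExample {
        private static final org.slf4j.Logger LOG =
                org.slf4j.LoggerFactory.getLogger(TextReaderExample.class);
    }
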
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index e6adfbb..118a113 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -101,9 +101,7 @@
      * Reads the last txn record from index file if it exists, if not from .tmp file if exists.
      *
      * @param indexFilePath the index file path
-     * @return the txn record from the index file or a default initial record.
-     *
-     * @throws IOException
+     * @return the txn record from the index file or a default initial record
      */
     private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
         Path tmpPath = tmpFilePath(indexFilePath.toString());
@@ -186,13 +184,13 @@
     }
 
     /**
-     * for unit tests
+     * For unit tests.
      */
     void close() throws IOException {
         this.options.closeOutputFile();
     }
 
-    public static abstract class Options implements Serializable {
+    public abstract static class Options implements Serializable {
 
         protected String fsUrl;
         protected String configKey;
@@ -216,10 +214,11 @@
 
         abstract void doCommit(Long txId) throws IOException;
 
-        abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+        abstract void doRecover(Path srcPath, long numberOfBytes) throws Exception;
 
         protected void rotateOutputFile(boolean doRotateAction) throws IOException {
             LOG.info("Rotating output file...");
+            @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
             long start = System.currentTimeMillis();
             closeOutputFile();
             this.rotation++;
@@ -279,14 +278,14 @@
         /**
          * Recovers nBytes from srcFile to the new file created by calling rotateOutputFile and then deletes the srcFile.
          */
-        private void recover(String srcFile, long nBytes) {
+        private void recover(String srcFile, long numberOfBytes) {
             try {
                 Path srcPath = new Path(srcFile);
                 rotateOutputFile(false);
                 this.rotationPolicy.reset();
-                if (nBytes > 0) {
-                    doRecover(srcPath, nBytes);
-                    LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+                if (numberOfBytes > 0) {
+                    doRecover(srcPath, numberOfBytes);
+                    LOG.info("Recovered {} bytes from {} to {}", numberOfBytes, srcFile, currentFile);
                 } else {
                     LOG.info("Nothing to recover from {}", srcFile);
                 }
@@ -380,11 +379,11 @@
         }
 
         @Override
-        void doRecover(Path srcPath, long nBytes) throws IOException {
+        void doRecover(Path srcPath, long numberOfBytes) throws IOException {
             this.offset = 0;
             FSDataInputStream is = this.fs.open(srcPath);
-            copyBytes(is, out, nBytes);
-            this.offset = nBytes;
+            copyBytes(is, out, numberOfBytes);
+            this.offset = numberOfBytes;
         }
 
         private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
@@ -468,7 +467,9 @@
         @Override
         void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException {
             LOG.info("Preparing Sequence File State...");
-            if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
+            if (this.format == null) {
+                throw new IllegalStateException("SequenceFormat must be specified.");
+            }
 
             this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
             this.codecFactory = new CompressionCodecFactory(hdfsConfig);
@@ -491,9 +492,10 @@
 
 
         @Override
-        void doRecover(Path srcPath, long nBytes) throws Exception {
+        void doRecover(Path srcPath, long numberOfBytes) throws Exception {
             SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig,
-                                                                 SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
+                    SequenceFile.Reader.file(srcPath),
+                    SequenceFile.Reader.length(numberOfBytes));
 
             Writable key = (Writable) this.format.keyClass().newInstance();
             Writable value = (Writable) this.format.valueClass().newInstance();
@@ -531,10 +533,9 @@
     }
 
     /**
-     * TxnRecord [txnid, data_file_path, data_file_offset]
-     * <p>
-     * This is written to the index file during beginCommit() and used for recovery.
-     * </p>
+     * TxnRecord [txnid, data_file_path, data_file_offset].
+     *
+     * <p>This is written to the index file during beginCommit() and used for recovery.
      */
     private static class TxnRecord {
         private long txnid;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
index e97bf1c..e48e198 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
@@ -25,7 +25,7 @@
  *     MyBolt-5-7-1390579837830.txt
  * </pre>
  *
- * By default, prefix is empty and extenstion is ".txt".
+ * <p>By default, prefix is empty and extension is ".txt".
  *
  */
 public class DefaultFileNameFormat implements FileNameFormat {
@@ -36,9 +36,6 @@
 
     /**
      * Overrides the default prefix.
-     *
-     * @param prefix
-     * @return
      */
     public DefaultFileNameFormat withPrefix(String prefix) {
         this.prefix = prefix;
@@ -47,9 +44,6 @@
 
     /**
      * Overrides the default file extension.
-     *
-     * @param extension
-     * @return
      */
     public DefaultFileNameFormat withExtension(String extension) {
         this.extension = extension;
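Both setters return this, so the format is configured fluently. A usage sketch (prefix and extension values are examples only; names follow the pattern shown in the class Javadoc above):

    import org.apache.storm.hdfs.trident.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;

    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPrefix("MyBolt")
            .withExtension(".seq"); // default would be ".txt"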
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
index e21fede..c12b478 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
@@ -19,8 +19,6 @@
  * RecordFormat implementation that uses field and record delimiters.
  * By default uses a comma (",") as the field delimiter and a
  * newline ("\n") as the record delimiter.
- *
- *
  */
 public class DelimitedRecordFormat implements RecordFormat {
     public static final String DEFAULT_FIELD_DELIMITER = ",";
@@ -31,9 +29,6 @@
 
     /**
      * Only output the specified fields.
-     *
-     * @param fields
-     * @return
      */
     public DelimitedRecordFormat withFields(Fields fields) {
         this.fields = fields;
@@ -42,9 +37,6 @@
 
     /**
      * Overrides the default field delimiter.
-     *
-     * @param delimiter
-     * @return
      */
     public DelimitedRecordFormat withFieldDelimiter(String delimiter) {
         this.fieldDelimiter = delimiter;
@@ -53,9 +45,6 @@
 
     /**
      * Overrides the default record delimiter.
-     *
-     * @param delimiter
-     * @return
      */
     public DelimitedRecordFormat withRecordDelimiter(String delimiter) {
         this.recordDelimiter = delimiter;
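A usage sketch mirroring the storm-hdfs documentation: pipe-delimited fields with the default newline record delimiter:

    import org.apache.storm.hdfs.trident.format.DelimitedRecordFormat;
    import org.apache.storm.hdfs.trident.format.RecordFormat;

    // one tuple per line, fields separated by "|"
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");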
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
index fbd8f5a..0b7ac46 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
@@ -17,7 +17,6 @@
 
 /**
  * Formatter interface for determining HDFS file names.
- *
  */
 public interface FileNameFormat extends Serializable {
 
@@ -27,7 +26,6 @@
      * Returns the filename the HdfsBolt will create.
      * @param rotation the current file rotation number (incremented on every rotation)
      * @param timeStamp current time in milliseconds when the rotation occurs
-     * @return
      */
     String getName(long rotation, long timeStamp);
 
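To make the interface concrete, a sketch of a trivial implementation; it assumes the interface's other members (prepare() and getPath()) as declared in this package, and the directory is a placeholder:

    import java.util.Map;
    import org.apache.storm.hdfs.trident.format.FileNameFormat;

    /** Illustrative only: fixed directory, rotation number and timestamp in the name. */
    public class RotationNameFormat implements FileNameFormat {
        @Override
        public void prepare(Map<String, Object> conf, int partitionIndex, int numPartitions) {
            // nothing to prepare in this sketch
        }

        @Override
        public String getName(long rotation, long timeStamp) {
            return "data-" + rotation + "-" + timeStamp + ".txt";
        }

        @Override
        public String getPath() {
            return "/storm/output"; // placeholder path
        }
    }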
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
index 1cc5363..b2f2cc3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
@@ -12,14 +12,11 @@
 
 package org.apache.storm.hdfs.trident.format;
 
-
 import java.io.Serializable;
 import org.apache.storm.trident.tuple.TridentTuple;
 
 /**
- * Formats a Tuple object into a byte array
- * that will be written to HDFS.
- *
+ * Formats a Tuple object into a byte array that will be written to HDFS.
  */
 public interface RecordFormat extends Serializable {
     byte[] format(TridentTuple tuple);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
index 497d045..815bf2f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
@@ -27,32 +27,22 @@
  */
 public interface SequenceFormat extends Serializable {
     /**
-     * Key class used by implementation (e.g. IntWritable.class, etc.)
-     *
-     * @return
+     * Key class used by the implementation (e.g. IntWritable.class).
      */
     Class keyClass();
 
     /**
-     * Value class used by implementation (e.g. Text.class, etc.)
-     *
-     * @return
+     * Value class used by the implementation (e.g. Text.class).
      */
     Class valueClass();
 
     /**
      * Given a tuple, return the key that should be written to the sequence file.
-     *
-     * @param tuple
-     * @return
      */
     Writable key(TridentTuple tuple);
 
     /**
      * Given a tuple, return the value that should be written to the sequence file.
-     *
-     * @param tuple
-     * @return
      */
     Writable value(TridentTuple tuple);
 }
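A minimal implementation sketch of this interface, pairing the first two tuple fields as key and value (the field positions and types are assumptions for illustration):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.storm.hdfs.trident.format.SequenceFormat;
    import org.apache.storm.trident.tuple.TridentTuple;

    public class PairSequenceFormat implements SequenceFormat {
        @Override
        public Class keyClass() {
            return LongWritable.class;
        }

        @Override
        public Class valueClass() {
            return Text.class;
        }

        @Override
        public Writable key(TridentTuple tuple) {
            return new LongWritable(tuple.getLong(0)); // field 0 assumed numeric
        }

        @Override
        public Writable value(TridentTuple tuple) {
            return new Text(tuple.getString(1)); // field 1 assumed textual
        }
    }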
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
index 068390f..889c60b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
@@ -68,9 +68,7 @@
      * $HOST - local host name<br/>
      * $PARTITION - partition index<br/>
      *
-     * @param name
-     *            file name
-     * @return
+     * @param name file name
      */
     public SimpleFileNameFormat withName(String name) {
         this.name = name;
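A usage sketch of the variable substitution described above (the full set of supported variables is listed in the class Javadoc):

    // expands to e.g. "host01-3-1390579837830.txt"
    SimpleFileNameFormat format = new SimpleFileNameFormat()
            .withName("$HOST-$PARTITION-$TIME.txt");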
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index ad9d7aa..a2a5932 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -18,11 +18,11 @@
 /**
  * Used by the HdfsBolt to decide when to rotate files.
  *
- * The HdfsBolt will call the <code>mark()</code> method for every
+ * <p>The HdfsBolt will call the <code>mark()</code> method for every
  * tuple received. If the <code>mark()</code> method returns
  * <code>true</code> the HdfsBolt will perform a file rotation.
  *
- * After file rotation, the HdfsBolt will call the <code>reset()</code>
+ * <p>After file rotation, the HdfsBolt will call the <code>reset()</code>
  * method.
  */
 public interface FileRotationPolicy extends Serializable {
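To make the mark()/reset() contract concrete, a sketch of a count-based policy. It assumes the two members documented above; check the interface for additional hooks (e.g. a start() method for timer-driven policies like the one later in this package) before using:

    import org.apache.storm.trident.tuple.TridentTuple;

    /** Sketch: rotate after a fixed number of tuples. */
    public class CountRotationPolicy implements FileRotationPolicy {
        private final long maxTuples;
        private long seen = 0;

        public CountRotationPolicy(long maxTuples) {
            this.maxTuples = maxTuples;
        }

        @Override
        public boolean mark(TridentTuple tuple, long offset) {
            return ++seen >= maxTuples; // true => the writer rotates the file
        }

        @Override
        public void reset() {
            seen = 0; // called after each rotation
        }

        // no-op here; timer-based policies would use a hook like this
        public void start() {
        }
    }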
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 2a512c4..18790d8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.trident.rotation;
 
-
 import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,19 +20,19 @@
  * File rotation policy that will rotate files when a certain
  * file size is reached.
  *
- * For example:
+ * <p>For example:
  * <pre>
  *     // rotate when files reach 5MB
  *     FileSizeRotationPolicy policy =
  *          new FileSizeRotationPolicy(5.0, Units.MB);
  * </pre>
- *
  */
 public class FileSizeRotationPolicy implements FileRotationPolicy {
     private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);
     private long maxBytes;
     private long lastOffset = 0;
     private long currentBytesWritten = 0;
+
     public FileSizeRotationPolicy(float count, Units units) {
         this.maxBytes = (long) (count * units.getByteCount());
     }
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index 2508a07..4539464 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -23,12 +23,12 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-
 public class TimedRotationPolicy implements FileRotationPolicy {
 
     private long interval;
     private Timer rotationTimer;
     private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
     public TimedRotationPolicy(float count, TimeUnit units) {
         this.interval = (long) (count * units.getMilliSeconds());
     }
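A usage sketch; TimeUnit here is the nested enum taken by the constructor above (note units.getMilliSeconds()), not java.util.concurrent.TimeUnit:

    // rotate files every 30 minutes
    FileRotationPolicy rotationPolicy =
            new TimedRotationPolicy(30.0f, TimedRotationPolicy.TimeUnit.MINUTES);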
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
index 15f0c6b..f98dbdf 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.hdfs.trident.sync;
 
-
 import org.apache.storm.trident.tuple.TridentTuple;
 
 /**
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 1a30bfa..469a808 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -245,7 +245,7 @@
 
             expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
             Assert.assertNotNull(expired);
-            Assert.assertEquals("spout3", expired.getValue().componentID);
+            Assert.assertEquals("spout3", expired.getValue().componentId);
         } finally {
             lock1.release();
             lock2.release();