Merge pull request #3075 from krichter722/checkstyle-hdfs
STORM-3457: hdfs: fix all checkstyle warnings
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 60c74a9..d0db240 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -275,7 +275,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>189</maxAllowedViolations>
+ <maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
index adb842a..da7ab74 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java
@@ -57,7 +57,7 @@
}
@Override
- public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) {
+ public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> someClass) {
Schema theSchema = this.getSchema(input.readString());
GenericDatumReader<GenericContainer> reader = new GenericDatumReader<>(theSchema);
Decoder decoder = DecoderFactory
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
index 17f3eb7..128d4ff 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java
@@ -27,7 +27,7 @@
*/
public class ConfluentAvroSerializer extends AbstractAvroSerializer {
- final private String url;
+ private final String url;
private SchemaRegistryClient theClient;
/**
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
index 128e802..94607b3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java
@@ -30,7 +30,7 @@
*/
public class FixedAvroSerializer extends AbstractAvroSerializer {
- private final static String FP_ALGO = "CRC-64-AVRO";
+ private static final String FP_ALGO = "CRC-64-AVRO";
final Map<String, Schema> fingerprint2schemaMap = new HashMap<>();
final Map<Schema, String> schema2fingerprintMap = new HashMap<>();
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index 156e58a..dfcf30f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -87,15 +87,16 @@
/**
* Marked as final to prevent override. Subclasses should implement the doPrepare() method.
- * @param conf
- * @param topologyContext
- * @param collector
*/
@Override
public final void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
this.writeLock = new Object();
- if (this.syncPolicy == null) throw new IllegalStateException("SyncPolicy must be specified.");
- if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+ if (this.syncPolicy == null) {
+ throw new IllegalStateException("SyncPolicy must be specified.");
+ }
+ if (this.rotationPolicy == null) {
+ throw new IllegalStateException("RotationPolicy must be specified.");
+ }
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be specified.");
}
@@ -208,13 +209,10 @@
}
/**
- * A tuple must be mapped to a writer based on two factors:
+ * A tuple must be mapped to a writer based on two factors.
* - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt
* for an example of this)
* - the directory the tuple will be partioned into
- *
- * @param tuple
- * @return
*/
private String getHashKeyForTuple(Tuple tuple) {
final String boltKey = getWriterKey(tuple);
@@ -300,12 +298,12 @@
this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
}
- abstract protected void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws
+ protected abstract void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws
IOException;
- abstract protected String getWriterKey(Tuple tuple);
+ protected abstract String getWriterKey(Tuple tuple);
- abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
+ protected abstract Writer makeNewWriter(Path path, Tuple tuple) throws IOException;
static class WritersMap extends LinkedHashMap<String, Writer> {
final long maxWriters;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index c73c6a2..991a23c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -114,7 +114,9 @@
@Override
public void doPrepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing Sequence File Bolt...");
- if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
+ if (this.format == null) {
+ throw new IllegalStateException("SequenceFormat must be specified.");
+ }
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
this.codecFactory = new CompressionCodecFactory(hdfsConfig);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
index 42c1f5b..157929c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DefaultFileNameFormat.java
@@ -15,7 +15,6 @@
import java.util.Map;
import org.apache.storm.task.TopologyContext;
-
/**
* Creates file names with the following format:
* <pre>
@@ -26,7 +25,7 @@
* MyBolt-5-7-1390579837830.txt
* </pre>
*
- * By default, prefix is empty and extenstion is ".txt".
+ * <p>By default, prefix is empty and extension is ".txt".
*
*/
public class DefaultFileNameFormat implements FileNameFormat {
@@ -38,9 +37,6 @@
/**
* Overrides the default prefix.
- *
- * @param prefix
- * @return
*/
public DefaultFileNameFormat withPrefix(String prefix) {
this.prefix = prefix;
@@ -49,9 +45,6 @@
/**
* Overrides the default file extension.
- *
- * @param extension
- * @return
*/
public DefaultFileNameFormat withExtension(String extension) {
this.extension = extension;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
index e9a81cc..f8cdad9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
@@ -20,7 +20,7 @@
* By default uses a comma (",") as the field delimiter and a
* newline ("\n") as the record delimiter.
*
- * Also by default, this implementation will output all the
+ * <p>Also by default, this implementation will output all the
* field values in the tuple in the order they were declared. To
* override this behavior, call <code>withFields()</code> to
* specify which tuple fields to output.
@@ -35,9 +35,6 @@
/**
* Only output the specified fields.
- *
- * @param fields
- * @return
*/
public DelimitedRecordFormat withFields(Fields fields) {
this.fields = fields;
@@ -46,9 +43,6 @@
/**
* Overrides the default field delimiter.
- *
- * @param delimiter
- * @return
*/
public DelimitedRecordFormat withFieldDelimiter(String delimiter) {
this.fieldDelimiter = delimiter;
@@ -57,9 +51,6 @@
/**
* Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
*/
public DelimitedRecordFormat withRecordDelimiter(String delimiter) {
this.recordDelimiter = delimiter;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
index 891c600..70210a9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/FileNameFormat.java
@@ -28,7 +28,6 @@
* Returns the filename the HdfsBolt will create.
* @param rotation the current file rotation number (incremented on every rotation)
* @param timeStamp current time in milliseconds when the rotation occurs
- * @return
*/
String getName(long rotation, long timeStamp);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
index 8d55c03..5102f38 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/RecordFormat.java
@@ -12,14 +12,11 @@
package org.apache.storm.hdfs.bolt.format;
-
import java.io.Serializable;
import org.apache.storm.tuple.Tuple;
/**
- * Formats a Tuple object into a byte array
- * that will be written to HDFS.
- *
+ * Formats a Tuple object into a byte array that will be written to HDFS.
*/
public interface RecordFormat extends Serializable {
byte[] format(Tuple tuple);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
index 7c38f66..7ea05a9 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SequenceFormat.java
@@ -26,32 +26,22 @@
*/
public interface SequenceFormat extends Serializable {
/**
- * Key class used by implementation (e.g. IntWritable.class, etc.)
- *
- * @return
+ * Key class used by implementation (e.g. IntWritable.class, etc.).
*/
Class keyClass();
/**
- * Value class used by implementation (e.g. Text.class, etc.)
- *
- * @return
+ * Value class used by implementation (e.g. Text.class, etc.).
*/
Class valueClass();
/**
* Given a tuple, return the key that should be written to the sequence file.
- *
- * @param tuple
- * @return
*/
Object key(Tuple tuple);
/**
* Given a tuple, return the value that should be written to the sequence file.
- *
- * @param tuple
- * @return
*/
Object value(Tuple tuple);
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
index 7869d69..d80aaa8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/format/SimpleFileNameFormat.java
@@ -73,9 +73,7 @@
* $COMPONENT - component id<br/>
* $TASK - task id<br/>
*
- * @param name
- * file name
- * @return
+ * @param name file name
*/
public SimpleFileNameFormat withName(String name) {
this.name = name;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
index 6354ae7..13229dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java
@@ -12,18 +12,17 @@
package org.apache.storm.hdfs.bolt.rotation;
-
import java.io.Serializable;
import org.apache.storm.tuple.Tuple;
/**
* Used by the HdfsBolt to decide when to rotate files.
*
- * The HdfsBolt will call the <code>mark()</code> method for every
+ * <p>The HdfsBolt will call the <code>mark()</code> method for every
* tuple received. If the <code>mark()</code> method returns
* <code>true</code> the HdfsBolt will perform a file rotation.
*
- * After file rotation, the HdfsBolt will call the <code>reset()</code>
+ * <p>After file rotation, the HdfsBolt will call the <code>reset()</code>
* method.
*/
public interface FileRotationPolicy extends Serializable {
@@ -39,12 +38,11 @@
/**
* Called after the HdfsBolt rotates a file.
- *
*/
void reset();
/**
- * Must be able to copy the rotation policy
+ * Must be able to copy the rotation policy.
*/
FileRotationPolicy copy();
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
index 230e8b4..24a46dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.bolt.rotation;
-
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,7 +20,7 @@
* File rotation policy that will rotate files when a certain
* file size is reached.
*
- * For example:
+ * <p>For example:
* <pre>
* // rotate when files reach 5MB
* FileSizeRotationPolicy policy =
@@ -34,6 +33,7 @@
private long maxBytes;
private long lastOffset = 0;
private long currentBytesWritten = 0;
+
public FileSizeRotationPolicy(float count, Units units) {
this.maxBytes = (long) (count * units.getByteCount());
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
index 45abc7d..a048dc3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/sync/CountSyncPolicy.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.bolt.sync;
-
import org.apache.storm.tuple.Tuple;
/**
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
index 6c2b0e0..caf6b49 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java
@@ -18,9 +18,10 @@
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
-abstract public class AbstractHDFSWriter implements Writer {
- final protected Path filePath;
- final protected FileRotationPolicy rotationPolicy;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+public abstract class AbstractHDFSWriter implements Writer {
+ protected final Path filePath;
+ protected final FileRotationPolicy rotationPolicy;
protected long lastUsedTime;
protected long offset;
protected boolean needsRotation;
@@ -32,7 +33,7 @@
}
@Override
- final public long write(Tuple tuple) throws IOException {
+ public final long write(Tuple tuple) throws IOException {
doWrite(tuple);
this.needsRotation = rotationPolicy.mark(tuple, offset);
@@ -40,12 +41,12 @@
}
@Override
- final public void sync() throws IOException {
+ public final void sync() throws IOException {
doSync();
}
@Override
- final public void close() throws IOException {
+ public final void close() throws IOException {
doClose();
}
@@ -59,10 +60,10 @@
return this.filePath;
}
- abstract protected void doWrite(Tuple tuple) throws IOException;
+ protected abstract void doWrite(Tuple tuple) throws IOException;
- abstract protected void doSync() throws IOException;
+ protected abstract void doSync() throws IOException;
- abstract protected void doClose() throws IOException;
+ protected abstract void doClose() throws IOException;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
index d77423c..713aa58 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.common;
-
import java.io.IOException;
import java.util.EnumSet;
import org.apache.avro.Schema;
@@ -28,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter {
private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
index 578bc06..8b3dcd1 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java
@@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class HDFSWriter extends AbstractHDFSWriter {
private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
index 9f79373..1488655 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java
@@ -21,10 +21,9 @@
* Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write
* to /common/output and a partitioner returned "/foo" then the bolt should open a file in "/common/output/foo"
*
- * A best practice is to use Path.SEPARATOR instead of a literal "/"
+ * <p>A best practice is to use Path.SEPARATOR instead of a literal "/".
*
* @param tuple The tuple for which the relative path is being calculated.
- * @return
*/
public String getPartitionPath(final Tuple tuple);
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
index f5ade03..bad3d06 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/rotation/RotationAction.java
@@ -18,7 +18,6 @@
package org.apache.storm.hdfs.common.rotation;
-
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.fs.FileSystem;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index f94b8e5..9859d08 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -24,59 +24,68 @@
public class Configs implements Validated {
/**
- * @deprecated please use {@link HdfsSpout.setReaderType(String)}
+ * Required - choose the file type being consumed.
+ * @deprecated please use {@link HdfsSpout#setReaderType(String)}
*/
@Deprecated
@isString
@CustomValidator(validatorClass = ReaderTypeValidator.class)
- public static final String READER_TYPE = "hdfsspout.reader.type"; // Required - chose the file type being consumed
+ public static final String READER_TYPE = "hdfsspout.reader.type";
public static final String TEXT = "text";
public static final String SEQ = "seq";
/**
+ * Required - HDFS name node.
* @deprecated please use {@link HdfsSpout#setHdfsUri(String)}
*/
@Deprecated
@isString
- public static final String HDFS_URI = "hdfsspout.hdfs"; // Required - HDFS name node
+ public static final String HDFS_URI = "hdfsspout.hdfs";
/**
+ * Required - dir from which to read files.
* @deprecated please use {@link HdfsSpout#setSourceDir(String)}
*/
@Deprecated
@isString
- public static final String SOURCE_DIR = "hdfsspout.source.dir"; // Required - dir from which to read files
+ public static final String SOURCE_DIR = "hdfsspout.source.dir";
/**
+ * Required - completed files will be moved here.
* @deprecated please use {@link HdfsSpout#setArchiveDir(String)}
*/
@Deprecated
@isString
- public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // Required - completed files will be moved here
+ public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";
/**
+ * Required - unparsable files will be moved here.
* @deprecated please use {@link HdfsSpout#setBadFilesDir(String)}
*/
@Deprecated
@isString
- public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // Required - unparsable files will be moved here
+ public static final String BAD_DIR = "hdfsspout.badfiles.dir";
/**
+ * Directory in which lock files will be created.
* @deprecated please use {@link HdfsSpout#setLockDir(String)}
*/
@Deprecated
@isString
- public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created
+ public static final String LOCK_DIR = "hdfsspout.lock.dir";
/**
+ * Commit after N records. 0 disables this.
* @deprecated please use {@link HdfsSpout#setCommitFrequencyCount(int)}
*/
@Deprecated
@isInteger
@isPositiveNumber(includeZero = true)
- public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this.
+ public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";
/**
+ * Commit after N secs. Cannot be disabled.
* @deprecated please use {@link HdfsSpout#setCommitFrequencySec(int)}
*/
@Deprecated
@isInteger
@isPositiveNumber
- public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled.
+ public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";
/**
+ * Max outstanding.
* @deprecated please use {@link HdfsSpout#setMaxOutstanding(int)}
*/
@Deprecated
@@ -84,6 +93,7 @@
@isPositiveNumber(includeZero = true)
public static final String MAX_OUTSTANDING = "hdfsspout.max.outstanding";
/**
+ * Lock timeout.
* @deprecated please use {@link HdfsSpout#setLockTimeoutSec(int)}
*/
@Deprecated
@@ -91,21 +101,26 @@
@isPositiveNumber
public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";
/**
+ * Whether clocks on machines in the Storm cluster are in sync. NOTE(review): the "inactivity duration after which
+ * locks are considered candidates for being reassigned to another spout" text presumably describes the lock timeout.
+ *
* @deprecated please use {@link HdfsSpout#setClocksInSync(boolean)}
*/
@Deprecated
@isBoolean
- public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
- // inactivity duration after which locks are considered candidates for being reassigned to another spout
+ public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";
/**
+ * Ignore suffix.
* @deprecated please use {@link HdfsSpout#setIgnoreSuffix(String)}
*/
@Deprecated
@isString
public static final String IGNORE_SUFFIX = "hdfsspout.ignore.suffix";
+ /**
+ * Filenames with this suffix in archive dir will be ignored by the Spout.
+ */
@NotConf
public static final String DEFAULT_LOCK_DIR = ".lock";
- // filenames with this suffix in archive dir will be ignored by the Spout
public static final int DEFAULT_COMMIT_FREQ_COUNT = 20000;
public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
public static final int DEFAULT_MAX_OUTSTANDING = 10000;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index eea23e1..488531a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -39,9 +39,8 @@
this.lockFile = lockFile;
}
- /** Get a lock on file if not already locked
+ /** Get a lock on file if not already locked.
*
- * @param fs
* @param dir the dir on which to get a lock
* @return The lock object if it the lock was acquired. Returns null if the dir is already locked.
* @throws IOException if there were errors
@@ -74,7 +73,9 @@
+ Thread.currentThread().getName();
}
- /** if the lock on the directory is stale, take ownership */
+ /**
+ * If the lock on the directory is stale, take ownership.
+ */
public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) {
Path dirLockFile = getDirLockFile(dirToLock);
@@ -95,8 +96,8 @@
private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
if (fs instanceof DistributedFileSystem) {
if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
- LOG.warn("Unable to recover lease on dir lock file " + dirLockFile +
- " right now. Cannot transfer ownership. Will need to try later.");
+ LOG.warn("Unable to recover lease on dir lock file " + dirLockFile
+ + " right now. Cannot transfer ownership. Will need to try later.");
return null;
}
}
@@ -112,7 +113,9 @@
return null;
}
- /** Release lock on dir by deleting the lock file */
+ /**
+ * Release lock on dir by deleting the lock file.
+ */
public void release() throws IOException {
if (!fs.delete(lockFile, false)) {
LOG.error("Thread {} could not delete dir lock {} ", threadInfo(), lockFile);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index c6529a9..7ff5aaa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.spout;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -38,7 +37,7 @@
private static final Logger LOG = LoggerFactory.getLogger(FileLock.class);
private final FileSystem fs;
- private final String componentID;
+ private final String componentId;
private final Path lockFile;
private final FSDataOutputStream lockFileStream;
private LogEntry lastEntry;
@@ -48,7 +47,7 @@
this.fs = fs;
this.lockFile = lockFile;
this.lockFileStream = lockFileStream;
- this.componentID = spoutId;
+ this.componentId = spoutId;
logProgress("0", false);
}
@@ -57,12 +56,14 @@
this.fs = fs;
this.lockFile = lockFile;
this.lockFileStream = fs.append(lockFile);
- this.componentID = spoutId;
+ this.componentId = spoutId;
LOG.info("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId);
logProgress(entry.fileOffset, true);
}
- /** returns lock on file or null if file is already locked. throws if unexpected problem */
+ /**
+ * Returns lock on file or null if file is already locked. Throws on an unexpected problem.
+ */
public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
throws IOException {
Path lockFile = new Path(lockDirPath, fileToLock.getName());
@@ -86,11 +87,9 @@
* checks if lockFile is older than 'olderThan' UTC time by examining the modification time
* on file and (if necessary) the timestamp in last log entry in the file. If its stale, then
* returns the last log entry, else returns null.
- * @param fs
- * @param lockFile
+ *
* @param olderThan time (millis) in UTC.
* @return the last entry in the file if its too old. null if last entry is not too old
- * @throws IOException
*/
public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
throws IOException {
@@ -117,11 +116,7 @@
}
/**
- * returns the last log entry
- * @param fs
- * @param lockFile
- * @return
- * @throws IOException
+ * Returns the last log entry.
*/
public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
throws IOException {
@@ -136,14 +131,13 @@
/**
* Takes ownership of the lock file if possible.
- * @param lockFile
* @param lastEntry last entry in the lock file. this param is an optimization.
* we dont scan the lock file again to find its last entry here since
* its already been done once by the logic used to check if the lock
* file is stale. so this value comes from that earlier scan.
* @param spoutId spout id
- * @throws IOException if unable to acquire
* @return null if lock File is not recoverable
+ * @throws IOException if unable to acquire
*/
public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
throws IOException {
@@ -158,11 +152,12 @@
}
return new FileLock(fs, lockFile, spoutId, lastEntry);
} catch (IOException e) {
- if (e instanceof RemoteException &&
- ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+ if (e instanceof RemoteException
+ && ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
LOG.warn(
- "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId,
- e);
+ "Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= "
+ + spoutId,
+ e);
return null;
} else { // unexpected error
LOG.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e);
@@ -173,13 +168,9 @@
/**
* Finds a oldest expired lock file (using modification timestamp), then takes
- * ownership of the lock file
+ * ownership of the lock file.
* Impt: Assumes access to lockFilesDir has been externally synchronized such that
* only one thread accessing the same thread
- * @param fs
- * @param lockFilesDir
- * @param locktimeoutSec
- * @return
*/
public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
throws IOException {
@@ -209,14 +200,11 @@
/**
* Finds oldest expired lock file (using modification timestamp), then takes
- * ownership of the lock file
+ * ownership of the lock file.
* Impt: Assumes access to lockFilesDir has been externally synchronized such that
* only one thread accessing the same thread
- * @param fs
- * @param lockFilesDir
- * @param locktimeoutSec
- * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found
- * @throws IOException
+ *
+ * @return a {@code Pair<lock file path, last entry in lock file>} if an expired lock file is found
*/
public static HdfsUtils.Pair<Path, LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
throws IOException {
@@ -248,7 +236,7 @@
private void logProgress(String fileOffset, boolean prefixNewLine)
throws IOException {
long now = System.currentTimeMillis();
- LogEntry entry = new LogEntry(now, componentID, fileOffset);
+ LogEntry entry = new LogEntry(now, componentId, fileOffset);
String line = entry.toString();
if (prefixNewLine) {
lockFileStream.writeBytes(System.lineSeparator() + line);
@@ -260,16 +248,17 @@
lastEntry = entry; // update this only after writing to hdfs
}
- /** Release lock by deleting file
+ /**
+ * Release lock by deleting file.
* @throws IOException if lock file could not be deleted
*/
public void release() throws IOException {
lockFileStream.close();
if (!fs.delete(lockFile, false)) {
- LOG.warn("Unable to delete lock file, Spout = {}", componentID);
+ LOG.warn("Unable to delete lock file, Spout = {}", componentId);
throw new IOException("Unable to delete lock file");
}
- LOG.debug("Released lock file {}. Spout {}", lockFile, componentID);
+ LOG.debug("Released lock file {}. Spout {}", lockFile, componentId);
}
// For testing only.. invoked via reflection
@@ -288,12 +277,12 @@
public static class LogEntry {
private static final int NUM_FIELDS = 3;
public final long eventTime;
- public final String componentID;
+ public final String componentId;
public final String fileOffset;
- public LogEntry(long eventtime, String componentID, String fileOffset) {
+ public LogEntry(long eventtime, String componentId, String fileOffset) {
this.eventTime = eventtime;
- this.componentID = componentID;
+ this.componentId = componentId;
this.fileOffset = fileOffset;
}
@@ -304,7 +293,7 @@
@Override
public String toString() {
- return eventTime + "," + componentID + "," + fileOffset;
+ return eventTime + "," + componentId + "," + fileOffset;
}
@Override
@@ -321,7 +310,7 @@
if (eventTime != logEntry.eventTime) {
return false;
}
- if (!componentID.equals(logEntry.componentID)) {
+ if (!componentId.equals(logEntry.componentId)) {
return false;
}
return fileOffset.equals(logEntry.fileOffset);
@@ -331,7 +320,7 @@
@Override
public int hashCode() {
int result = (int) (eventTime ^ (eventTime >>> 32));
- result = 31 * result + componentID.hashCode();
+ result = 31 * result + componentId.hashCode();
result = 31 * result + fileOffset.hashCode();
return result;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
index bf58815..a8b354a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -24,7 +24,9 @@
*/
interface FileOffset extends Comparable<FileOffset>, Cloneable {
- /** tests if rhs == currOffset+1 */
+ /**
+ * Tests if rhs == currOffset+1.
+ */
boolean isNextOffset(FileOffset rhs);
FileOffset clone();
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
index 49d998a..b6e08f4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -26,10 +26,9 @@
FileOffset getFileOffset();
/**
- * Get the next tuple from the file
+ * Get the next tuple from the file.
*
* @return null if no more data
- * @throws IOException
*/
List<Object> next() throws IOException, ParseException;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 18c469f..a7ce729 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -62,7 +62,7 @@
private int maxOutstanding = Configs.DEFAULT_MAX_OUTSTANDING;
private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
private boolean clocksInSync = true;
- private String inprogress_suffix = ".inprogress"; // not configurable to prevent change between topology restarts
+ private String inprogressSuffix = ".inprogress"; // not configurable to prevent change between topology restarts
private String ignoreSuffix = ".ignore";
private String outputStreamName = null;
private ProgressTracker tracker = null;
@@ -89,14 +89,14 @@
return reader.getFilePath() + " " + reader.getFileOffset();
}
- private static void releaseLockAndLog(FileLock fLock, String spoutId) {
+ private static void releaseLockAndLog(FileLock fileLock, String spoutId) {
try {
- if (fLock != null) {
- fLock.release();
- LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
+ if (fileLock != null) {
+ fileLock.release();
+ LOG.debug("Spout {} released FileLock. SpoutId = {}", fileLock.getLockFile(), spoutId);
}
} catch (IOException e) {
- LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e);
+ LOG.error("Unable to delete lock file : " + fileLock.getLockFile() + " SpoutId =" + spoutId, e);
}
}
@@ -215,7 +215,7 @@
}
/**
- * Set output stream name
+ * Set output stream name.
*/
public HdfsSpout withOutputStream(String streamName) {
this.outputStreamName = streamName;
@@ -348,7 +348,7 @@
private void markFileAsBad(Path file) {
String fileName = file.toString();
- String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+ String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix));
String originalName = new Path(fileNameMinusSuffix).getName();
Path newFile = new Path(badFilesDirPath + Path.SEPARATOR + originalName);
@@ -590,7 +590,7 @@
Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
for (Path file : listing) {
- if (file.getName().endsWith(inprogress_suffix)) {
+ if (file.getName().endsWith(inprogressSuffix)) {
continue;
}
if (file.getName().endsWith(ignoreSuffix)) {
@@ -625,7 +625,6 @@
* check if the lock is updated. if not updated then acquires the lock
*
* @return a lock object
- * @throws IOException
*/
private FileLock getOldestExpiredLock() throws IOException {
// 1 - acquire lock on dir
@@ -681,11 +680,9 @@
}
/**
- * Creates a reader that reads from beginning of file
+ * Creates a reader that reads from beginning of file.
*
* @param file file to read
- * @return
- * @throws IOException
*/
private FileReader createFileReader(Path file)
throws IOException {
@@ -706,12 +703,10 @@
}
/**
- * Creates a reader that starts reading from 'offset'
+ * Creates a reader that starts reading from 'offset'.
*
* @param file the file to read
* @param offset the offset string should be understandable by the reader type being used
- * @return
- * @throws IOException
*/
private FileReader createFileReader(Path file, String offset)
throws IOException {
@@ -733,14 +728,14 @@
}
/**
- * Renames files with .inprogress suffix
+ * Renames files with .inprogress suffix.
*
* @return path of renamed file
* @throws if operation fails
*/
private Path renameToInProgressFile(Path file)
throws IOException {
- Path newFile = new Path(file.toString() + inprogress_suffix);
+ Path newFile = new Path(file.toString() + inprogressSuffix);
try {
if (hdfs.rename(file, newFile)) {
return newFile;
@@ -757,7 +752,7 @@
private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
throws IOException {
String lockFileName = lockFile.getName();
- Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
+ Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogressSuffix);
if (hdfs.exists(dataFile)) {
return dataFile;
}
@@ -771,7 +766,7 @@
// renames files and returns the new file path
private Path renameCompletedFile(Path file) throws IOException {
String fileName = file.toString();
- String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+ String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogressSuffix));
String newName = new Path(fileNameMinusSuffix).getName();
Path newFile = new Path(archiveDirPath + Path.SEPARATOR + newName);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 39dccae..d245df7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -25,8 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SequenceFileReader<Key extends Writable, Value extends Writable>
- extends AbstractFileReader {
+public class SequenceFileReader<KeyT extends Writable, ValueT extends Writable> extends AbstractFileReader {
public static final String[] defaultFields = { "key", "value" };
public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
private static final Logger LOG = LoggerFactory
@@ -37,8 +36,8 @@
private final SequenceFileReader.Offset offset;
- private final Key key;
- private final Value value;
+ private final KeyT key;
+ private final ValueT value;
public SequenceFileReader(FileSystem fs, Path file, Map<String, Object> conf)
@@ -46,8 +45,8 @@
super(fs, file);
int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString());
this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
- this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
- this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+ this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
+ this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
this.offset = new SequenceFileReader.Offset(0, 0, 0);
}
@@ -57,8 +56,8 @@
int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt(conf.get(BUFFER_SIZE).toString());
this.offset = new SequenceFileReader.Offset(offset);
this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize));
- this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
- this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
+ this.key = (KeyT) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf());
+ this.value = (ValueT) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf());
skipToOffset(this.reader, this.offset, this.key);
}
@@ -106,8 +105,11 @@
this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0);
}
- public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord
- , long currRecordEndOffset, long prevRecordEndOffset) {
+ public Offset(long lastSyncPoint,
+ long recordsSinceLastSync,
+ long currentRecord,
+ long currRecordEndOffset,
+ long prevRecordEndOffset) {
this.lastSyncPoint = lastSyncPoint;
this.recordsSinceLastSync = recordsSinceLastSync;
this.currentRecord = currentRecord;
@@ -135,19 +137,19 @@
this.currRecordEndOffset = 0;
}
} catch (Exception e) {
- throw new IllegalArgumentException("'" + offset +
- "' cannot be interpreted. It is not in expected format for SequenceFileReader." +
- " Format e.g. {sync=123:afterSync=345:record=67}");
+ throw new IllegalArgumentException("'" + offset
+ + "' cannot be interpreted. It is not in expected format for SequenceFileReader."
+ + " Format e.g. {sync=123:afterSync=345:record=67}");
}
}
@Override
public String toString() {
- return '{' +
- "sync=" + lastSyncPoint +
- ":afterSync=" + recordsSinceLastSync +
- ":record=" + currentRecord +
- ":}";
+ return '{'
+ + "sync=" + lastSyncPoint
+ + ":afterSync=" + recordsSinceLastSync
+ + ":record=" + currentRecord
+ + ":}";
}
@Override
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 94b40f3..0b3da9c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -31,7 +31,7 @@
public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
private static final int DEFAULT_BUFF_SIZE = 4096;
- private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
private BufferedReader reader;
private TextFileReader.Offset offset;
@@ -124,26 +124,26 @@
this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
}
} catch (Exception e) {
- throw new IllegalArgumentException("'" + offset +
- "' cannot be interpreted. It is not in expected format for TextFileReader." +
- " Format e.g. {char=123:line=5}");
+ throw new IllegalArgumentException("'" + offset
+ + "' cannot be interpreted. It is not in expected format for TextFileReader."
+ + " Format e.g. {char=123:line=5}");
}
}
@Override
public String toString() {
- return '{' +
- "char=" + charOffset +
- ":line=" + lineNumber +
- ":}";
+ return '{'
+ + "char=" + charOffset
+ + ":line=" + lineNumber
+ + ":}";
}
@Override
public boolean isNextOffset(FileOffset rhs) {
if (rhs instanceof Offset) {
Offset other = ((Offset) rhs);
- return other.charOffset > charOffset &&
- other.lineNumber == lineNumber + 1;
+ return other.charOffset > charOffset
+ && other.lineNumber == lineNumber + 1;
}
return false;
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index e6adfbb..118a113 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -101,9 +101,7 @@
* Reads the last txn record from index file if it exists, if not from .tmp file if exists.
*
* @param indexFilePath the index file path
- * @return the txn record from the index file or a default initial record.
- *
- * @throws IOException
+ * @return the txn record from the index file or a default initial record
*/
private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
Path tmpPath = tmpFilePath(indexFilePath.toString());
@@ -186,13 +184,13 @@
}
/**
- * for unit tests
+ * Closes the output file. Intended for unit tests.
*/
void close() throws IOException {
this.options.closeOutputFile();
}
- public static abstract class Options implements Serializable {
+ public abstract static class Options implements Serializable {
protected String fsUrl;
protected String configKey;
@@ -216,10 +214,11 @@
abstract void doCommit(Long txId) throws IOException;
- abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+ abstract void doRecover(Path srcPath, long numberOfBytes) throws Exception;
protected void rotateOutputFile(boolean doRotateAction) throws IOException {
LOG.info("Rotating output file...");
+ @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
long start = System.currentTimeMillis();
closeOutputFile();
this.rotation++;
@@ -279,14 +278,14 @@
/**
* Recovers nBytes from srcFile to the new file created by calling rotateOutputFile and then deletes the srcFile.
*/
- private void recover(String srcFile, long nBytes) {
+ private void recover(String srcFile, long numberOfBytes) {
try {
Path srcPath = new Path(srcFile);
rotateOutputFile(false);
this.rotationPolicy.reset();
- if (nBytes > 0) {
- doRecover(srcPath, nBytes);
- LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+ if (numberOfBytes > 0) {
+ doRecover(srcPath, numberOfBytes);
+ LOG.info("Recovered {} bytes from {} to {}", numberOfBytes, srcFile, currentFile);
} else {
LOG.info("Nothing to recover from {}", srcFile);
}
@@ -380,11 +379,11 @@
}
@Override
- void doRecover(Path srcPath, long nBytes) throws IOException {
+ void doRecover(Path srcPath, long numberOfBytes) throws IOException {
this.offset = 0;
FSDataInputStream is = this.fs.open(srcPath);
- copyBytes(is, out, nBytes);
- this.offset = nBytes;
+ copyBytes(is, out, numberOfBytes);
+ this.offset = numberOfBytes;
}
private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
@@ -468,7 +467,9 @@
@Override
void doPrepare(Map<String, Object> conf, int partitionIndex, int numPartitions) throws IOException {
LOG.info("Preparing Sequence File State...");
- if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
+ if (this.format == null) {
+ throw new IllegalStateException("SequenceFormat must be specified.");
+ }
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
this.codecFactory = new CompressionCodecFactory(hdfsConfig);
@@ -491,9 +492,10 @@
@Override
- void doRecover(Path srcPath, long nBytes) throws Exception {
+ void doRecover(Path srcPath, long numberOfBytes) throws Exception {
SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig,
- SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
+ SequenceFile.Reader.file(srcPath),
+ SequenceFile.Reader.length(numberOfBytes));
Writable key = (Writable) this.format.keyClass().newInstance();
Writable value = (Writable) this.format.valueClass().newInstance();
@@ -531,10 +533,9 @@
}
/**
- * TxnRecord [txnid, data_file_path, data_file_offset]
- * <p>
- * This is written to the index file during beginCommit() and used for recovery.
- * </p>
+ * TxnRecord [txnid, data_file_path, data_file_offset].
+ *
+ * <p>This is written to the index file during beginCommit() and used for recovery.
*/
private static class TxnRecord {
private long txnid;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
index e97bf1c..e48e198 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DefaultFileNameFormat.java
@@ -25,7 +25,7 @@
* MyBolt-5-7-1390579837830.txt
* </pre>
*
- * By default, prefix is empty and extenstion is ".txt".
+ * <p>By default, prefix is empty and extension is ".txt".
*
*/
public class DefaultFileNameFormat implements FileNameFormat {
@@ -36,9 +36,6 @@
/**
* Overrides the default prefix.
- *
- * @param prefix
- * @return
*/
public DefaultFileNameFormat withPrefix(String prefix) {
this.prefix = prefix;
@@ -47,9 +44,6 @@
/**
* Overrides the default file extension.
- *
- * @param extension
- * @return
*/
public DefaultFileNameFormat withExtension(String extension) {
this.extension = extension;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
index e21fede..c12b478 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/DelimitedRecordFormat.java
@@ -19,8 +19,6 @@
* RecordFormat implementation that uses field and record delimiters.
* By default uses a comma (",") as the field delimiter and a
* newline ("\n") as the record delimiter.
- *
- *
*/
public class DelimitedRecordFormat implements RecordFormat {
public static final String DEFAULT_FIELD_DELIMITER = ",";
@@ -31,9 +29,6 @@
/**
* Only output the specified fields.
- *
- * @param fields
- * @return
*/
public DelimitedRecordFormat withFields(Fields fields) {
this.fields = fields;
@@ -42,9 +37,6 @@
/**
* Overrides the default field delimiter.
- *
- * @param delimiter
- * @return
*/
public DelimitedRecordFormat withFieldDelimiter(String delimiter) {
this.fieldDelimiter = delimiter;
@@ -53,9 +45,6 @@
/**
* Overrides the default record delimiter.
- *
- * @param delimiter
- * @return
*/
public DelimitedRecordFormat withRecordDelimiter(String delimiter) {
this.recordDelimiter = delimiter;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
index fbd8f5a..0b7ac46 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/FileNameFormat.java
@@ -17,7 +17,6 @@
/**
* Formatter interface for determining HDFS file names.
- *
*/
public interface FileNameFormat extends Serializable {
@@ -27,7 +26,6 @@
* Returns the filename the HdfsBolt will create.
* @param rotation the current file rotation number (incremented on every rotation)
* @param timeStamp current time in milliseconds when the rotation occurs
- * @return
*/
String getName(long rotation, long timeStamp);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
index 1cc5363..b2f2cc3 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/RecordFormat.java
@@ -12,14 +12,11 @@
package org.apache.storm.hdfs.trident.format;
-
import java.io.Serializable;
import org.apache.storm.trident.tuple.TridentTuple;
/**
- * Formats a Tuple object into a byte array
- * that will be written to HDFS.
- *
+ * Formats a Tuple object into a byte array that will be written to HDFS.
*/
public interface RecordFormat extends Serializable {
byte[] format(TridentTuple tuple);
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
index 497d045..815bf2f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SequenceFormat.java
@@ -27,32 +27,22 @@
*/
public interface SequenceFormat extends Serializable {
/**
- * Key class used by implementation (e.g. IntWritable.class, etc.)
- *
- * @return
+ * Key class used by implementation (e.g. IntWritable.class, etc.).
*/
Class keyClass();
/**
- * Value class used by implementation (e.g. Text.class, etc.)
- *
- * @return
+ * Value class used by implementation (e.g. Text.class, etc.).
*/
Class valueClass();
/**
* Given a tuple, return the key that should be written to the sequence file.
- *
- * @param tuple
- * @return
*/
Writable key(TridentTuple tuple);
/**
* Given a tuple, return the value that should be written to the sequence file.
- *
- * @param tuple
- * @return
*/
Writable value(TridentTuple tuple);
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
index 068390f..889c60b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/format/SimpleFileNameFormat.java
@@ -68,9 +68,7 @@
* $HOST - local host name<br/>
* $PARTITION - partition index<br/>
*
- * @param name
- * file name
- * @return
+ * @param name file name
*/
public SimpleFileNameFormat withName(String name) {
this.name = name;
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index ad9d7aa..a2a5932 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -18,11 +18,11 @@
/**
* Used by the HdfsBolt to decide when to rotate files.
*
- * The HdfsBolt will call the <code>mark()</code> method for every
+ * <p>The HdfsBolt will call the <code>mark()</code> method for every
* tuple received. If the <code>mark()</code> method returns
* <code>true</code> the HdfsBolt will perform a file rotation.
*
- * After file rotation, the HdfsBolt will call the <code>reset()</code>
+ * <p>After file rotation, the HdfsBolt will call the <code>reset()</code>
* method.
*/
public interface FileRotationPolicy extends Serializable {
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 2a512c4..18790d8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.trident.rotation;
-
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,19 +20,19 @@
* File rotation policy that will rotate files when a certain
* file size is reached.
*
- * For example:
+ * <p>For example:
* <pre>
* // rotate when files reach 5MB
* FileSizeRotationPolicy policy =
* new FileSizeRotationPolicy(5.0, Units.MB);
* </pre>
- *
*/
public class FileSizeRotationPolicy implements FileRotationPolicy {
private static final Logger LOG = LoggerFactory.getLogger(FileSizeRotationPolicy.class);
private long maxBytes;
private long lastOffset = 0;
private long currentBytesWritten = 0;
+
public FileSizeRotationPolicy(float count, Units units) {
this.maxBytes = (long) (count * units.getByteCount());
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index 2508a07..4539464 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -23,12 +23,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.trident.tuple.TridentTuple;
-
public class TimedRotationPolicy implements FileRotationPolicy {
private long interval;
private Timer rotationTimer;
private AtomicBoolean rotationTimerTriggered = new AtomicBoolean();
+
public TimedRotationPolicy(float count, TimeUnit units) {
this.interval = (long) (count * units.getMilliSeconds());
}
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
index 15f0c6b..f98dbdf 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/sync/CountSyncPolicy.java
@@ -12,7 +12,6 @@
package org.apache.storm.hdfs.trident.sync;
-
import org.apache.storm.trident.tuple.TridentTuple;
/**
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 1a30bfa..469a808 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -245,7 +245,7 @@
expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
Assert.assertNotNull(expired);
- Assert.assertEquals("spout3", expired.getValue().componentID);
+ Assert.assertEquals("spout3", expired.getValue().componentId);
} finally {
lock1.release();
lock2.release();