[BEAM-9143] Make RedisIO code follow standard Beam conventions
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 9612487..cc9649b 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -150,7 +150,7 @@
abstract int batchSize();
- abstract Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
@@ -168,35 +168,37 @@
public Read withEndpoint(String host, int port) {
checkArgument(host != null, "host can not be null");
checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
.build();
}
public Read withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
- return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+ return toBuilder()
+ .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+ .build();
}
public Read withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
.build();
}
public Read withKeyPattern(String keyPattern) {
checkArgument(keyPattern != null, "keyPattern can not be null");
- return builder().setKeyPattern(keyPattern).build();
+ return toBuilder().setKeyPattern(keyPattern).build();
}
public Read withConnectionConfiguration(RedisConnectionConfiguration connection) {
checkArgument(connection != null, "connection can not be null");
- return builder().setConnectionConfiguration(connection).build();
+ return toBuilder().setConnectionConfiguration(connection).build();
}
public Read withBatchSize(int batchSize) {
- return builder().setBatchSize(batchSize).build();
+ return toBuilder().setBatchSize(batchSize).build();
}
@Override
@@ -228,14 +230,14 @@
abstract int batchSize();
- abstract ReadAll.Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
@Nullable
- abstract ReadAll.Builder setConnectionConfiguration(RedisConnectionConfiguration connection);
+ abstract Builder setConnectionConfiguration(RedisConnectionConfiguration connection);
- abstract ReadAll.Builder setBatchSize(int batchSize);
+ abstract Builder setBatchSize(int batchSize);
abstract ReadAll build();
}
@@ -243,30 +245,32 @@
public ReadAll withEndpoint(String host, int port) {
checkArgument(host != null, "host can not be null");
checkArgument(port > 0, "port can not be negative or 0");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
.build();
}
public ReadAll withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
- return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+ return toBuilder()
+ .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+ .build();
}
public ReadAll withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
.build();
}
public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
checkArgument(connection != null, "connection can not be null");
- return builder().setConnectionConfiguration(connection).build();
+ return toBuilder().setConnectionConfiguration(connection).build();
}
public ReadAll withBatchSize(int batchSize) {
- return builder().setBatchSize(batchSize).build();
+ return toBuilder().setBatchSize(batchSize).build();
}
@Override
@@ -280,7 +284,7 @@
}
}
- abstract static class BaseReadFn<T> extends DoFn<String, T> {
+ private abstract static class BaseReadFn<T> extends DoFn<String, T> {
protected final RedisConnectionConfiguration connectionConfiguration;
transient Jedis jedis;
@@ -307,9 +311,9 @@
}
@ProcessElement
- public void processElement(ProcessContext processContext) throws Exception {
+ public void processElement(ProcessContext c) {
ScanParams scanParams = new ScanParams();
- scanParams.match(processContext.element());
+ scanParams.match(c.element());
String cursor = ScanParams.SCAN_POINTER_START;
boolean finished = false;
@@ -317,7 +321,7 @@
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();
for (String k : keys) {
- processContext.output(k);
+ c.output(k);
}
cursor = scanResult.getCursor();
if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
@@ -326,42 +330,52 @@
}
}
}
+
/** A {@link DoFn} requesting Redis server to get key/value pairs. */
private static class ReadFn extends BaseReadFn<KV<String, String>> {
@Nullable transient Multimap<BoundedWindow, String> bundles = null;
@Nullable AtomicInteger batchCount = null;
private final int batchSize;
- @StartBundle
- public void startBundle(StartBundleContext context) {
- bundles = ArrayListMultimap.create();
- batchCount = new AtomicInteger();
- }
-
ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
super(connectionConfiguration);
this.batchSize = batchSize;
}
- private int getBatchSize() {
- return batchSize;
+ @StartBundle
+ public void startBundle() {
+ bundles = ArrayListMultimap.create();
+ batchCount = new AtomicInteger();
}
@ProcessElement
- public void processElement(ProcessContext processContext, BoundedWindow window)
- throws Exception {
- String key = processContext.element();
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ String key = c.element();
bundles.put(window, key);
if (batchCount.incrementAndGet() > getBatchSize()) {
Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
for (BoundedWindow w : kvs.keySet()) {
for (KV<String, String> kv : kvs.get(w)) {
- processContext.output(kv);
+ c.output(kv);
}
}
}
}
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) {
+ Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+ for (BoundedWindow w : kvs.keySet()) {
+ for (KV<String, String> kv : kvs.get(w)) {
+ context.output(kv, w.maxTimestamp(), w);
+ }
+ }
+ }
+
+ private int getBatchSize() {
+ return batchSize;
+ }
+
private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
for (BoundedWindow w : bundles.keySet()) {
@@ -378,16 +392,6 @@
batchCount.set(0);
return kvs;
}
-
- @FinishBundle
- public void finishBundle(FinishBundleContext context) throws Exception {
- Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
- for (BoundedWindow w : kvs.keySet()) {
- for (KV<String, String> kv : kvs.get(w)) {
- context.output(kv, w.maxTimestamp(), w);
- }
- }
- }
}
private static class Reparallelize
@@ -395,8 +399,7 @@
@Override
public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
- // reparallelize mimics the same behavior as in JdbcIO
- // breaking fusion
+ // reparallelize mimics the same behavior as in JdbcIO, used to break fusion
PCollectionView<Iterable<KV<String, String>>> empty =
input
.apply("Consume", Filter.by(SerializableFunctions.constant(false)))
@@ -407,8 +410,8 @@
ParDo.of(
new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
- public void processElement(ProcessContext context) {
- context.output(context.element());
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
}
})
.withSideInputs(empty));
@@ -468,7 +471,7 @@
@Nullable
abstract Long expireTime();
- abstract Builder builder();
+ abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
@@ -486,37 +489,39 @@
public Write withEndpoint(String host, int port) {
checkArgument(host != null, "host can not be null");
checkArgument(port > 0, "port can not be negative or 0");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
.build();
}
public Write withAuth(String auth) {
checkArgument(auth != null, "auth can not be null");
- return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+ return toBuilder()
+ .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+ .build();
}
public Write withTimeout(int timeout) {
checkArgument(timeout >= 0, "timeout can not be negative");
- return builder()
+ return toBuilder()
.setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
.build();
}
public Write withConnectionConfiguration(RedisConnectionConfiguration connection) {
checkArgument(connection != null, "connection can not be null");
- return builder().setConnectionConfiguration(connection).build();
+ return toBuilder().setConnectionConfiguration(connection).build();
}
public Write withMethod(Method method) {
checkArgument(method != null, "method can not be null");
- return builder().setMethod(method).build();
+ return toBuilder().setMethod(method).build();
}
public Write withExpireTime(Long expireTimeMillis) {
checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
checkArgument(expireTimeMillis > 0, "expireTimeMillis can not be negative or 0");
- return builder().setExpireTime(expireTimeMillis).build();
+ return toBuilder().setExpireTime(expireTimeMillis).build();
}
@Override
@@ -555,8 +560,8 @@
}
@ProcessElement
- public void processElement(ProcessContext processContext) {
- KV<String, String> record = processContext.element();
+ public void processElement(ProcessContext c) {
+ KV<String, String> record = c.element();
writeRecord(record);
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index bcb3fca..badf039 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -175,7 +175,7 @@
}
@Test
- public void testWriteUsingINCRBY() throws Exception {
+ public void testWriteUsingINCRBY() {
String key = "key_incr";
List<String> values = Arrays.asList("0", "1", "2", "-3", "2", "4", "0", "5");
List<KV<String, String>> data = buildConstantKeyList(key, values);
@@ -190,7 +190,7 @@
}
@Test
- public void testWriteUsingDECRBY() throws Exception {
+ public void testWriteUsingDECRBY() {
String key = "key_decr";
List<String> values = Arrays.asList("-10", "1", "2", "-3", "2", "4", "0", "5");