[BEAM-9143] Make RedisIO code follow standard Beam conventions
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 9612487..cc9649b 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -150,7 +150,7 @@
 
     abstract int batchSize();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
@@ -168,35 +168,37 @@
     public Read withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
       checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
           .build();
     }
 
     public Read withAuth(String auth) {
       checkArgument(auth != null, "auth can not be null");
-      return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
     }
 
     public Read withTimeout(int timeout) {
       checkArgument(timeout >= 0, "timeout can not be negative");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
           .build();
     }
 
     public Read withKeyPattern(String keyPattern) {
       checkArgument(keyPattern != null, "keyPattern can not be null");
-      return builder().setKeyPattern(keyPattern).build();
+      return toBuilder().setKeyPattern(keyPattern).build();
     }
 
     public Read withConnectionConfiguration(RedisConnectionConfiguration connection) {
       checkArgument(connection != null, "connection can not be null");
-      return builder().setConnectionConfiguration(connection).build();
+      return toBuilder().setConnectionConfiguration(connection).build();
     }
 
     public Read withBatchSize(int batchSize) {
-      return builder().setBatchSize(batchSize).build();
+      return toBuilder().setBatchSize(batchSize).build();
     }
 
     @Override
@@ -228,14 +230,14 @@
 
     abstract int batchSize();
 
-    abstract ReadAll.Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
       @Nullable
-      abstract ReadAll.Builder setConnectionConfiguration(RedisConnectionConfiguration connection);
+      abstract Builder setConnectionConfiguration(RedisConnectionConfiguration connection);
 
-      abstract ReadAll.Builder setBatchSize(int batchSize);
+      abstract Builder setBatchSize(int batchSize);
 
       abstract ReadAll build();
     }
@@ -243,30 +245,32 @@
     public ReadAll withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
       checkArgument(port > 0, "port can not be negative or 0");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
           .build();
     }
 
     public ReadAll withAuth(String auth) {
       checkArgument(auth != null, "auth can not be null");
-      return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
     }
 
     public ReadAll withTimeout(int timeout) {
       checkArgument(timeout >= 0, "timeout can not be negative");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
           .build();
     }
 
     public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connection) {
       checkArgument(connection != null, "connection can not be null");
-      return builder().setConnectionConfiguration(connection).build();
+      return toBuilder().setConnectionConfiguration(connection).build();
     }
 
     public ReadAll withBatchSize(int batchSize) {
-      return builder().setBatchSize(batchSize).build();
+      return toBuilder().setBatchSize(batchSize).build();
     }
 
     @Override
@@ -280,7 +284,7 @@
     }
   }
 
-  abstract static class BaseReadFn<T> extends DoFn<String, T> {
+  private abstract static class BaseReadFn<T> extends DoFn<String, T> {
     protected final RedisConnectionConfiguration connectionConfiguration;
 
     transient Jedis jedis;
@@ -307,9 +311,9 @@
     }
 
     @ProcessElement
-    public void processElement(ProcessContext processContext) throws Exception {
+    public void processElement(ProcessContext c) {
       ScanParams scanParams = new ScanParams();
-      scanParams.match(processContext.element());
+      scanParams.match(c.element());
 
       String cursor = ScanParams.SCAN_POINTER_START;
       boolean finished = false;
@@ -317,7 +321,7 @@
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
         for (String k : keys) {
-          processContext.output(k);
+          c.output(k);
         }
         cursor = scanResult.getCursor();
         if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
@@ -326,42 +330,52 @@
       }
     }
   }
+
   /** A {@link DoFn} requesting Redis server to get key/value pairs. */
   private static class ReadFn extends BaseReadFn<KV<String, String>> {
     @Nullable transient Multimap<BoundedWindow, String> bundles = null;
     @Nullable AtomicInteger batchCount = null;
     private final int batchSize;
 
-    @StartBundle
-    public void startBundle(StartBundleContext context) {
-      bundles = ArrayListMultimap.create();
-      batchCount = new AtomicInteger();
-    }
-
     ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
       super(connectionConfiguration);
       this.batchSize = batchSize;
     }
 
-    private int getBatchSize() {
-      return batchSize;
+    @StartBundle
+    public void startBundle() {
+      bundles = ArrayListMultimap.create();
+      batchCount = new AtomicInteger();
     }
 
     @ProcessElement
-    public void processElement(ProcessContext processContext, BoundedWindow window)
-        throws Exception {
-      String key = processContext.element();
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      String key = c.element();
       bundles.put(window, key);
       if (batchCount.incrementAndGet() > getBatchSize()) {
         Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
         for (BoundedWindow w : kvs.keySet()) {
           for (KV<String, String> kv : kvs.get(w)) {
-            processContext.output(kv);
+            c.output(kv);
           }
         }
       }
     }
 
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+      for (BoundedWindow w : kvs.keySet()) {
+        for (KV<String, String> kv : kvs.get(w)) {
+          context.output(kv, w.maxTimestamp(), w);
+        }
+      }
+    }
+
+    private int getBatchSize() {
+      return batchSize;
+    }
+
     private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
       Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
       for (BoundedWindow w : bundles.keySet()) {
@@ -378,16 +392,6 @@
       batchCount.set(0);
       return kvs;
     }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext context) throws Exception {
-      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-      for (BoundedWindow w : kvs.keySet()) {
-        for (KV<String, String> kv : kvs.get(w)) {
-          context.output(kv, w.maxTimestamp(), w);
-        }
-      }
-    }
   }
 
   private static class Reparallelize
@@ -395,8 +399,7 @@
 
     @Override
     public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
-      // reparallelize mimics the same behavior as in JdbcIO
-      // breaking fusion
+      // reparallelize mimics the same behavior as in JdbcIO, used to break fusion
       PCollectionView<Iterable<KV<String, String>>> empty =
           input
               .apply("Consume", Filter.by(SerializableFunctions.constant(false)))
@@ -407,8 +410,8 @@
               ParDo.of(
                       new DoFn<KV<String, String>, KV<String, String>>() {
                         @ProcessElement
-                        public void processElement(ProcessContext context) {
-                          context.output(context.element());
+                        public void processElement(ProcessContext c) {
+                          c.output(c.element());
                         }
                       })
                   .withSideInputs(empty));
@@ -468,7 +471,7 @@
     @Nullable
     abstract Long expireTime();
 
-    abstract Builder builder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder {
@@ -486,37 +489,39 @@
     public Write withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
       checkArgument(port > 0, "port can not be negative or 0");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
           .build();
     }
 
     public Write withAuth(String auth) {
       checkArgument(auth != null, "auth can not be null");
-      return builder().setConnectionConfiguration(connectionConfiguration().withAuth(auth)).build();
+      return toBuilder()
+          .setConnectionConfiguration(connectionConfiguration().withAuth(auth))
+          .build();
     }
 
     public Write withTimeout(int timeout) {
       checkArgument(timeout >= 0, "timeout can not be negative");
-      return builder()
+      return toBuilder()
           .setConnectionConfiguration(connectionConfiguration().withTimeout(timeout))
           .build();
     }
 
     public Write withConnectionConfiguration(RedisConnectionConfiguration connection) {
       checkArgument(connection != null, "connection can not be null");
-      return builder().setConnectionConfiguration(connection).build();
+      return toBuilder().setConnectionConfiguration(connection).build();
     }
 
     public Write withMethod(Method method) {
       checkArgument(method != null, "method can not be null");
-      return builder().setMethod(method).build();
+      return toBuilder().setMethod(method).build();
     }
 
     public Write withExpireTime(Long expireTimeMillis) {
       checkArgument(expireTimeMillis != null, "expireTimeMillis can not be null");
       checkArgument(expireTimeMillis > 0, "expireTimeMillis can not be negative or 0");
-      return builder().setExpireTime(expireTimeMillis).build();
+      return toBuilder().setExpireTime(expireTimeMillis).build();
     }
 
     @Override
@@ -555,8 +560,8 @@
       }
 
       @ProcessElement
-      public void processElement(ProcessContext processContext) {
-        KV<String, String> record = processContext.element();
+      public void processElement(ProcessContext c) {
+        KV<String, String> record = c.element();
 
         writeRecord(record);
 
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index bcb3fca..badf039 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -175,7 +175,7 @@
   }
 
   @Test
-  public void testWriteUsingINCRBY() throws Exception {
+  public void testWriteUsingINCRBY() {
     String key = "key_incr";
     List<String> values = Arrays.asList("0", "1", "2", "-3", "2", "4", "0", "5");
     List<KV<String, String>> data = buildConstantKeyList(key, values);
@@ -190,7 +190,7 @@
   }
 
   @Test
-  public void testWriteUsingDECRBY() throws Exception {
+  public void testWriteUsingDECRBY() {
     String key = "key_decr";
 
     List<String> values = Arrays.asList("-10", "1", "2", "-3", "2", "4", "0", "5");