[BEAM-9143] Add withOutputParallelization to RedisIO.Read/ReadAll
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index cc9649b..57617c0 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -114,6 +114,7 @@
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
         .setKeyPattern("*")
         .setBatchSize(1000)
+        .setOutputParallelization(true)
         .build();
   }
 
@@ -125,6 +126,7 @@
     return new AutoValue_RedisIO_ReadAll.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
         .setBatchSize(1000)
+        .setOutputParallelization(true)
         .build();
   }
 
@@ -150,6 +152,8 @@
 
     abstract int batchSize();
 
+    abstract boolean outputParallelization();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -162,6 +166,8 @@
 
       abstract Builder setBatchSize(int batchSize);
 
+      abstract Builder setOutputParallelization(boolean outputParallelization);
+
       abstract Read build();
     }
 
@@ -201,6 +207,14 @@
       return toBuilder().setBatchSize(batchSize).build();
     }
 
+    /**
+     * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+     * default is to parallelize and should only be changed if this is known to be unnecessary.
+     */
+    public Read withOutputParallelization(boolean outputParallelization) {
+      return toBuilder().setOutputParallelization(outputParallelization).build();
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
@@ -216,7 +230,8 @@
           .apply(
               RedisIO.readAll()
                   .withConnectionConfiguration(connectionConfiguration())
-                  .withBatchSize(batchSize()));
+                  .withBatchSize(batchSize())
+                  .withOutputParallelization(outputParallelization()));
     }
   }
 
@@ -230,6 +245,8 @@
 
     abstract int batchSize();
 
+    abstract boolean outputParallelization();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -239,6 +256,8 @@
 
       abstract Builder setBatchSize(int batchSize);
 
+      abstract Builder setOutputParallelization(boolean outputParallelization);
+
       abstract ReadAll build();
     }
 
@@ -273,14 +292,25 @@
       return toBuilder().setBatchSize(batchSize).build();
     }
 
+    /**
+     * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+     * default is to parallelize and should only be changed if this is known to be unnecessary.
+     */
+    public ReadAll withOutputParallelization(boolean outputParallelization) {
+      return toBuilder().setOutputParallelization(outputParallelization).build();
+    }
+
     @Override
     public PCollection<KV<String, String>> expand(PCollection<String> input) {
       checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
-
-      return input
-          .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
-          .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
-          .apply(new Reparallelize());
+      PCollection<KV<String, String>> output =
+          input
+              .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
+              .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+      if (outputParallelization()) {
+        output = output.apply(new Reparallelize());
+      }
+      return output;
     }
   }