[BEAM-9143] Add withOutputParallelization to RedisIO.Read/ReadAll
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index cc9649b..57617c0 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -114,6 +114,7 @@
.setConnectionConfiguration(RedisConnectionConfiguration.create())
.setKeyPattern("*")
.setBatchSize(1000)
+ .setOutputParallelization(true)
.build();
}
@@ -125,6 +126,7 @@
return new AutoValue_RedisIO_ReadAll.Builder()
.setConnectionConfiguration(RedisConnectionConfiguration.create())
.setBatchSize(1000)
+ .setOutputParallelization(true)
.build();
}
@@ -150,6 +152,8 @@
abstract int batchSize();
+ abstract boolean outputParallelization();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -162,6 +166,8 @@
abstract Builder setBatchSize(int batchSize);
+ abstract Builder setOutputParallelization(boolean outputParallelization);
+
abstract Read build();
}
@@ -201,6 +207,14 @@
return toBuilder().setBatchSize(batchSize).build();
}
+ /**
+ * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+ * default is to parallelize and should only be changed if this is known to be unnecessary.
+ */
+ public Read withOutputParallelization(boolean outputParallelization) {
+ return toBuilder().setOutputParallelization(outputParallelization).build();
+ }
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
@@ -216,7 +230,8 @@
.apply(
RedisIO.readAll()
.withConnectionConfiguration(connectionConfiguration())
- .withBatchSize(batchSize()));
+ .withBatchSize(batchSize())
+ .withOutputParallelization(outputParallelization()));
}
}
@@ -230,6 +245,8 @@
abstract int batchSize();
+ abstract boolean outputParallelization();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -239,6 +256,8 @@
abstract Builder setBatchSize(int batchSize);
+ abstract Builder setOutputParallelization(boolean outputParallelization);
+
abstract ReadAll build();
}
@@ -273,14 +292,25 @@
return toBuilder().setBatchSize(batchSize).build();
}
+ /**
+ * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+ * default is to parallelize and should only be changed if this is known to be unnecessary.
+ */
+ public ReadAll withOutputParallelization(boolean outputParallelization) {
+ return toBuilder().setOutputParallelization(outputParallelization).build();
+ }
+
@Override
public PCollection<KV<String, String>> expand(PCollection<String> input) {
checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
-
- return input
- .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
- .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
- .apply(new Reparallelize());
+ PCollection<KV<String, String>> output =
+ input
+ .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+ if (outputParallelization()) {
+ output = output.apply(new Reparallelize());
+ }
+ return output;
}
}