| /* |
| * Copyright (C) 2015 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package com.google.cloud.dataflow.sdk.runners.dataflow; |
| |
| import static com.google.api.client.util.Base64.decodeBase64; |
| import static com.google.cloud.dataflow.sdk.runners.worker.ReaderTestUtils.splitRequestAtFraction; |
| import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.dictionaryToCloudSource; |
| import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.readerProgressToCloudProgress; |
| import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.readFromSource; |
| import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; |
| import static com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray; |
| import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary; |
| import static com.google.cloud.dataflow.sdk.util.Structs.getObject; |
| import static com.google.cloud.dataflow.sdk.util.Structs.getStrings; |
| import static com.google.cloud.dataflow.sdk.util.WindowedValue.valueInGlobalWindow; |
| import static com.google.common.base.Throwables.getStackTraceAsString; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.allOf; |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.containsString; |
| import static org.hamcrest.Matchers.lessThanOrEqualTo; |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; |
| |
| import com.google.api.services.dataflow.model.DataflowPackage; |
| import com.google.api.services.dataflow.model.DerivedSource; |
| import com.google.api.services.dataflow.model.Job; |
| import com.google.api.services.dataflow.model.SourceOperationRequest; |
| import com.google.api.services.dataflow.model.SourceOperationResponse; |
| import com.google.api.services.dataflow.model.SourceSplitRequest; |
| import com.google.api.services.dataflow.model.SourceSplitResponse; |
| import com.google.api.services.dataflow.model.Step; |
| import com.google.cloud.dataflow.sdk.Pipeline; |
| import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; |
| import com.google.cloud.dataflow.sdk.coders.Coder; |
| import com.google.cloud.dataflow.sdk.coders.KvCoder; |
| import com.google.cloud.dataflow.sdk.coders.VarIntCoder; |
| import com.google.cloud.dataflow.sdk.io.BoundedSource; |
| import com.google.cloud.dataflow.sdk.io.Read; |
| import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; |
| import com.google.cloud.dataflow.sdk.options.PipelineOptions; |
| import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; |
| import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator; |
| import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory; |
| import com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.ReaderCacheEntry; |
| import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext; |
| import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill; |
| import com.google.cloud.dataflow.sdk.testing.DataflowAssert; |
| import com.google.cloud.dataflow.sdk.testing.TestPipeline; |
| import com.google.cloud.dataflow.sdk.transforms.Sample; |
| import com.google.cloud.dataflow.sdk.transforms.Sum; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; |
| import com.google.cloud.dataflow.sdk.transforms.windowing.Window; |
| import com.google.cloud.dataflow.sdk.util.CloudObject; |
| import com.google.cloud.dataflow.sdk.util.CloudSourceUtils; |
| import com.google.cloud.dataflow.sdk.util.PropertyNames; |
| import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; |
| import com.google.cloud.dataflow.sdk.util.WindowedValue; |
| import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader; |
| import com.google.cloud.dataflow.sdk.values.KV; |
| import com.google.cloud.dataflow.sdk.values.PCollection; |
| import com.google.common.base.Preconditions; |
| import com.google.protobuf.ByteString; |
| |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| |
/**
 * Tests for {@code CustomSources} (formerly {@code BasicSerializableSourceFormat}).
 */
| @RunWith(JUnit4.class) |
| public class CustomSourcesTest { |
| @Rule |
| public ExpectedException expectedException = ExpectedException.none(); |
| |
  /**
   * A trivial bounded source of consecutive integers, used to exercise custom-source
   * translation, splitting, progress reporting, and reading in the tests below.
   */
  static class TestIO {
    /** Returns a source producing the integers in {@code [from, to)} without timestamps. */
    public static Read fromRange(int from, int to) {
      return new Read(from, to, false);
    }

    /** A {@link BoundedSource} that emits the integers in {@code [from, to)}. */
    static class Read extends BoundedSource<Integer> {
      final int from; // inclusive
      final int to; // exclusive
      // If true, each element is emitted with its own value interpreted as millis since epoch.
      final boolean produceTimestamps;

      Read(int from, int to, boolean produceTimestamps) {
        this.from = from;
        this.to = to;
        this.produceTimestamps = produceTimestamps;
      }

      /** Returns a copy of this source that also assigns each element its value as a timestamp. */
      public Read withTimestampsMillis() {
        return new Read(from, to, true);
      }

      @Override
      public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
          throws Exception {
        // Ignores desiredBundleSizeBytes: always splits into numWorkers roughly-equal ranges.
        List<Read> res = new ArrayList<>();
        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
        float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
        for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
          res.add(new Read(
              Math.round(from + i * step), Math.round(from + (i + 1) * step),
              produceTimestamps));
        }
        return res;
      }

      @Override
      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
        // Arbitrary but deterministic estimate: pretend each element costs 8 bytes.
        return 8 * (to - from);
      }

      @Override
      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return true;
      }

      @Override
      public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
        return new RangeReader(this);
      }

      @Override
      public void validate() {}

      @Override
      public String toString() {
        return "[" + from + ", " + to + ")";
      }

      @Override
      public Coder<Integer> getDefaultOutputCoder() {
        return BigEndianIntegerCoder.of();
      }

      /**
       * Reader over a {@link Read} range. Tracks an explicit state machine so tests can
       * verify the start/advance/close protocol is followed by the wrapping framework.
       */
      private static class RangeReader extends BoundedReader<Integer> {
        // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
        enum State {
          UNSTARTED,
          STARTED,
          FINISHED
        }
        // Not final: replaced by the narrowed primary range in splitAtFraction().
        private Read source;
        private int current = -1; // -1 means "not started yet".
        private State state = State.UNSTARTED;

        public RangeReader(Read source) {
          this.source = source;
        }

        @Override
        public boolean start() throws IOException {
          Preconditions.checkState(state == State.UNSTARTED);
          state = State.STARTED;
          current = source.from;
          return (current < source.to);
        }

        @Override
        public boolean advance() throws IOException {
          Preconditions.checkState(state == State.STARTED);
          if (current == source.to - 1) {
            state = State.FINISHED;
            return false;
          }
          current++;
          return true;
        }

        @Override
        public Integer getCurrent() {
          Preconditions.checkState(state == State.STARTED);
          return current;
        }

        @Override
        public Instant getCurrentTimestamp() {
          return source.produceTimestamps
              ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        @Override
        public void close() throws IOException {
          Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
          state = State.FINISHED;
        }

        @Override
        public Read getCurrentSource() {
          return source;
        }

        @Override
        public Read splitAtFraction(double fraction) {
          // Reject the split if the proposed split point has already been read past.
          int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
          if (proposedIndex <= current) {
            return null;
          }
          // Accepted: this reader keeps the primary [from, proposedIndex) and hands
          // the residual [proposedIndex, to) back to the caller.
          Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
          Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
          this.source = primary;
          return residual;
        }

        @Override
        public Double getFractionConsumed() {
          return (current == -1)
              ? 0.0
              : (1.0 * (1 + current - source.from) / (source.to - source.from));
        }
      }
    }
  }
| |
  /**
   * Translates a TestIO source to its cloud representation, reads all elements back, splits
   * it (with numWorkers=5), and verifies each resulting bundle covers its expected sub-range.
   */
  @Test
  public void testSplitAndReadBundlesBack() throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setNumWorkers(5);
    com.google.api.services.dataflow.model.Source source =
        translateIOToCloudSource(TestIO.fromRange(10, 20), options);
    List<WindowedValue<Integer>> elems = CloudSourceUtils.readElemsFromSource(options, source);
    assertEquals(10, elems.size());
    for (int i = 0; i < 10; ++i) {
      assertEquals(valueInGlobalWindow(10 + i), elems.get(i));
    }
    SourceSplitResponse response = performSplit(source, options);
    assertEquals("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED", response.getOutcome());
    List<DerivedSource> bundles = response.getBundles();
    assertEquals(5, bundles.size());
    for (int i = 0; i < 5; ++i) {
      DerivedSource bundle = bundles.get(i);
      assertEquals("SOURCE_DERIVATION_MODE_INDEPENDENT", bundle.getDerivationMode());
      com.google.api.services.dataflow.model.Source bundleSource = bundle.getSource();
      assertTrue(bundleSource.getDoesNotNeedSplitting());
      // The codec lives on the parent source's step output, not on each bundle;
      // copy it over so the bundle can be read back.
      bundleSource.setCodec(source.getCodec());
      List<WindowedValue<Integer>> xs = CloudSourceUtils.readElemsFromSource(options, bundleSource);
      // Each of the 5 bundles should cover 2 consecutive elements of [10, 20).
      assertThat(xs, contains(valueInGlobalWindow(10 + 2 * i), valueInGlobalWindow(11 + 2 * i)));
    }
  }
| |
| @Test |
| public void testDirectPipelineWithoutTimestamps() throws Exception { |
| Pipeline p = TestPipeline.create(); |
| PCollection<Integer> sum = p |
| .apply(Read.from(TestIO.fromRange(10, 20))) |
| .apply(Sum.integersGlobally()) |
| .apply(Sample.<Integer>any(1)); |
| |
| DataflowAssert.thatSingleton(sum).isEqualTo(145); |
| p.run(); |
| } |
| |
| @Test |
| public void testDirectPipelineWithTimestamps() throws Exception { |
| Pipeline p = TestPipeline.create(); |
| PCollection<Integer> sums = |
| p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis())) |
| .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3)))) |
| .apply(Sum.integersGlobally().withoutDefaults()); |
| // Should group into [10 11] [12 13 14] [15 16 17] [18 19]. |
| DataflowAssert.that(sums).containsInAnyOrder(21, 37, 39, 48); |
| p.run(); |
| } |
| |
  /**
   * Shows basic usage of getFractionConsumed and splitAtFraction.
   * This test only tests TestIO itself, not BasicSerializableSourceFormat.
   */
  @Test
  public void testRangeProgressAndSplitAtFraction() throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    TestIO.Read source = TestIO.fromRange(10, 20);
    try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
      assertEquals(0, reader.getFractionConsumed().intValue());
      assertTrue(reader.start());
      assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
      assertTrue(reader.advance());
      assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
      // Already past 0.0 and 0.1.
      assertNull(reader.splitAtFraction(0.0));
      assertNull(reader.splitAtFraction(0.1));

      {
        // Split [10, 20) at 0.5: primary becomes [10, 15), residual is [15, 20).
        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
        assertNotNull(residual);
        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
        assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
      }

      // Range is now [10, 15) and we are at 12.
      {
        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
        assertNotNull(residual);
        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
        assertThat(readFromSource(residual, options), contains(14));
      }

      // The reader continues through its (twice-narrowed) primary range [10, 14) to the end.
      assertTrue(reader.advance());
      assertEquals(12, reader.getCurrent().intValue());
      assertTrue(reader.advance());
      assertEquals(13, reader.getCurrent().intValue());
      assertFalse(reader.advance());
    }
  }
| |
  /**
   * Same as the previous test, but now using BasicSerializableSourceFormat wrappers.
   * We know that the underlying reader behaves correctly (because of the previous test),
   * now check that we are wrapping it correctly: progress is reported as cloud progress and
   * dynamic split requests are forwarded to the underlying splitAtFraction.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testProgressAndSourceSplitTranslation() throws Exception {
    DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);
    NativeReader<WindowedValue<Integer>> reader =
        (NativeReader<WindowedValue<Integer>>)
            ReaderFactory.Registry.defaultRegistry()
                .create(
                    translateIOToCloudSource(TestIO.fromRange(10, 20), options),
                    options,
                    null, // executionContext
                    null, // addCounterMutator
                    null); // operationName
    try (NativeReader.NativeReaderIterator<WindowedValue<Integer>> iterator = reader.iterator()) {
      assertTrue(iterator.start());
      assertEquals(
          0.1,
          readerProgressToCloudProgress(iterator.getProgress()).getFractionConsumed().doubleValue(),
          1e-6);
      assertEquals(valueInGlobalWindow(10), iterator.getCurrent());
      assertEquals(
          0.1,
          readerProgressToCloudProgress(iterator.getProgress()).getFractionConsumed().doubleValue(),
          1e-6);
      assertTrue(iterator.advance());
      assertEquals(valueInGlobalWindow(11), iterator.getCurrent());
      assertEquals(
          0.2,
          readerProgressToCloudProgress(iterator.getProgress()).getFractionConsumed().doubleValue(),
          1e-6);
      assertTrue(iterator.advance());
      assertEquals(valueInGlobalWindow(12), iterator.getCurrent());

      // Fractions already passed must be rejected, mirroring the previous test.
      assertNull(iterator.requestDynamicSplit(splitRequestAtFraction(0)));
      assertNull(iterator.requestDynamicSplit(splitRequestAtFraction(0.1f)));
      CustomSources.BoundedSourceSplit<Integer> sourceSplit =
          (CustomSources.BoundedSourceSplit<Integer>)
              iterator.requestDynamicSplit(splitRequestAtFraction(0.5f));
      assertNotNull(sourceSplit);
      assertThat(readFromSource(sourceSplit.primary, options), contains(10, 11, 12, 13, 14));
      assertThat(readFromSource(sourceSplit.residual, options), contains(15, 16, 17, 18, 19));

      sourceSplit =
          (CustomSources.BoundedSourceSplit<Integer>)
              iterator.requestDynamicSplit(splitRequestAtFraction(0.8f));
      assertNotNull(sourceSplit);
      assertThat(readFromSource(sourceSplit.primary, options), contains(10, 11, 12, 13));
      assertThat(readFromSource(sourceSplit.residual, options), contains(14));

      // The wrapped reader finishes its narrowed primary range [10, 14).
      assertTrue(iterator.advance());
      assertEquals(valueInGlobalWindow(13), iterator.getCurrent());
      assertFalse(iterator.advance());
    }
  }
| |
  /**
   * A source that cannot do anything. Intended to be overridden for testing of individual methods.
   */
  private static class MockSource extends BoundedSource<Integer> {
    @Override
    public List<? extends BoundedSource<Integer>> splitIntoBundles(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      // "Splits" into just itself.
      return Arrays.asList(this);
    }

    @Override
    public void validate() { }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) {
      return false;
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) {
      throw new UnsupportedOperationException();
    }

    @Override
    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public String toString() {
      return "<unknown>";
    }

    @Override
    public Coder<Integer> getDefaultOutputCoder() {
      return BigEndianIntegerCoder.of();
    }
  }
| |
| private static class SourceProducingInvalidSplits extends MockSource { |
| private String description; |
| private String errorMessage; |
| |
| private SourceProducingInvalidSplits(String description, String errorMessage) { |
| this.description = description; |
| this.errorMessage = errorMessage; |
| } |
| |
| @Override |
| public List<? extends BoundedSource<Integer>> splitIntoBundles( |
| long desiredBundleSizeBytes, PipelineOptions options) throws Exception { |
| Preconditions.checkState(errorMessage == null, "Unexpected invalid source"); |
| return Arrays.asList( |
| new SourceProducingInvalidSplits("goodBundle", null), |
| new SourceProducingInvalidSplits("badBundle", "intentionally invalid")); |
| } |
| |
| @Override |
| public void validate() { |
| Preconditions.checkState(errorMessage == null, errorMessage); |
| } |
| |
| @Override |
| public String toString() { |
| return description; |
| } |
| } |
| |
  /**
   * Verifies that when splitting a valid source yields an invalid bundle, the error names
   * both the original source and the bad bundle, and chains the bundle's validation failure.
   */
  @Test
  public void testSplittingProducedInvalidSource() throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    com.google.api.services.dataflow.model.Source cloudSource =
        translateIOToCloudSource(new SourceProducingInvalidSplits("original", null), options);

    // Expectations must be registered before the call that is expected to throw.
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage(allOf(
        containsString("Splitting a valid source produced an invalid bundle"),
        containsString("original"),
        containsString("badBundle")));
    expectedException.expectCause(hasMessage(containsString("intentionally invalid")));
    performSplit(cloudSource, options);
  }
| |
| private static class FailingReader extends BoundedSource.BoundedReader<Integer> { |
| private BoundedSource<Integer> source; |
| |
| private FailingReader(BoundedSource<Integer> source) { |
| this.source = source; |
| } |
| |
| @Override |
| public BoundedSource<Integer> getCurrentSource() { |
| return source; |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| throw new IOException("Intentional error"); |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| throw new IllegalStateException("Should have failed in start()"); |
| } |
| |
| @Override |
| public Integer getCurrent() throws NoSuchElementException { |
| throw new IllegalStateException("Should have failed in start()"); |
| } |
| |
| @Override |
| public Instant getCurrentTimestamp() throws NoSuchElementException { |
| throw new IllegalStateException("Should have failed in start()"); |
| } |
| |
| @Override |
| public void close() throws IOException {} |
| |
| @Override |
| public Double getFractionConsumed() { |
| return null; |
| } |
| |
| @Override |
| public BoundedSource<Integer> splitAtFraction(double fraction) { |
| return null; |
| } |
| } |
| |
  /** A source whose reader always fails to start; its description should appear in errors. */
  private static class SourceProducingFailingReader extends MockSource {
    @Override
    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
      return new FailingReader(this);
    }

    @Override
    public String toString() {
      // Matched by testFailureToStartReadingIncludesSourceDetails.
      return "Some description";
    }
  }
| |
  /**
   * Verifies that a failure while starting to read surfaces both the reader's error message
   * and the source's description somewhere in the exception chain.
   */
  @Test
  public void testFailureToStartReadingIncludesSourceDetails() throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    com.google.api.services.dataflow.model.Source source =
        translateIOToCloudSource(new SourceProducingFailingReader(), options);
    // Unfortunately Hamcrest doesn't have a matcher that can match on the exception's
    // printStackTrace(), however we just want to verify that the error and source description
    // would be contained in the exception *somewhere*, not necessarily in the top-level
    // Exception object. So instead we use Throwables.getStackTraceAsString and match on that.
    try {
      CloudSourceUtils.readElemsFromSource(options, source);
      fail("Expected to fail");
    } catch (Exception e) {
      assertThat(
          getStackTraceAsString(e),
          allOf(containsString("Intentional error"), containsString("Some description")));
    }
  }
| |
| private static com.google.api.services.dataflow.model.Source translateIOToCloudSource( |
| BoundedSource<?> io, DataflowPipelineOptions options) throws Exception { |
| DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); |
| Pipeline p = Pipeline.create(options); |
| p.begin().apply(Read.from(io)); |
| |
| Job workflow = translator.translate(p, new ArrayList<DataflowPackage>()).getJob(); |
| Step step = workflow.getSteps().get(0); |
| |
| return stepToCloudSource(step); |
| } |
| |
  /**
   * Extracts the cloud Source from a translated step's SOURCE_STEP_INPUT property and
   * attaches the step's output encoding to it as the source's codec.
   */
  private static com.google.api.services.dataflow.model.Source stepToCloudSource(Step step)
      throws Exception {
    com.google.api.services.dataflow.model.Source res = dictionaryToCloudSource(
        getDictionary(step.getProperties(), PropertyNames.SOURCE_STEP_INPUT));
    // Encoding is specified in the step, not in the source itself. This is
    // normal: incoming Dataflow API Source objects in map tasks will have the
    // encoding filled in from the step's output encoding.
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> outputInfo =
        (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);

    // Use the encoding of the step's first (only) output.
    CloudObject encoding = CloudObject.fromSpec(getObject(outputInfo.get(0),
        PropertyNames.ENCODING));
    res.setCodec(encoding);
    return res;
  }
| |
| private static SourceSplitResponse performSplit( |
| com.google.api.services.dataflow.model.Source source, PipelineOptions options) |
| throws Exception { |
| SourceSplitRequest splitRequest = new SourceSplitRequest(); |
| splitRequest.setSource(source); |
| SourceOperationRequest request = new SourceOperationRequest(); |
| request.setSplit(splitRequest); |
| SourceOperationResponse response = CustomSources.performSourceOperation(request, options); |
| return response.getSplit(); |
| } |
| |
  /**
   * Verifies that serializing an unbounded CountingSource to a cloud Source embeds 20
   * base64-serialized sub-sources in the spec, each deserializing back to a CountingSource.
   */
  @Test
  public void testUnboundedSplits() throws Exception {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    com.google.api.services.dataflow.model.Source source =
        CustomSources.serializeToCloudSource(
            new CountingSource(Integer.MAX_VALUE), options);
    List<String> serializedSplits =
        getStrings(source.getSpec(), CustomSources.SERIALIZED_SOURCE_SPLITS, null);
    assertEquals(20, serializedSplits.size());
    for (String serializedSplit : serializedSplits) {
      assertTrue(
          deserializeFromByteArray(decodeBase64(serializedSplit), "source")
              instanceof CountingSource);
    }
  }
| |
  /**
   * Reads from an unbounded CountingSource through the streaming execution context in
   * repeated bundles, carrying Windmill source state between iterations, and verifies the
   * emitted values, record ids, windows, timestamps, bundle-size/time limits, watermark
   * updates, finalize ids, reader caching, and the reported backlog counter.
   */
  @Test
  public void testReadUnboundedReader() throws Exception {
    StreamingModeExecutionContext context = new StreamingModeExecutionContext("stageName",
        new ConcurrentHashMap<ByteString, ReaderCacheEntry>(), /*stateNameMap=*/null,
        /*stateCache=*/null);

    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setNumWorkers(5);

    // Carries the Windmill source state from one simulated work item to the next.
    ByteString state = ByteString.EMPTY;
    for (int i = 0; i < 10 * CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE;
        /* Incremented in inner loop */) {
      WindowedValue<ValueWithRecordId<KV<Integer, Integer>>> value;

      // Initialize streaming context with state from previous iteration.
      context.start(
          Windmill.WorkItem.newBuilder()
              .setKey(ByteString.copyFromUtf8("0000000000000001")) // key is zero-padded index.
              .setWorkToken(0) // Required proto field, unused.
              .setSourceState(
                  Windmill.SourceState.newBuilder().setState(state).build()) // Source state.
              .build(),
          new Instant(0), // input watermark
          null, // output watermark
          null, // StateReader
          null, // StateFetcher
          Windmill.WorkItemCommitRequest.newBuilder());

      @SuppressWarnings({"unchecked", "rawtypes"})
      NativeReader<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> reader =
          (NativeReader)
              CustomSources.create(
                  (CloudObject)
                      CustomSources.serializeToCloudSource(
                              new CountingSource(Integer.MAX_VALUE), options)
                          .getSpec(),
                  options,
                  context);

      NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
          iterator = reader.iterator();

      // Verify data: elements continue from where the previous bundle left off (i).
      Instant beforeReading = Instant.now();
      int numReadOnThisIteration = 0;
      for (boolean more = iterator.start(); more; more = iterator.advance()) {
        value = iterator.getCurrent();
        assertEquals(KV.of(0, i), value.getValue().getValue());
        // The record id is the encoded element itself.
        assertArrayEquals(
            encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), KV.of(0, i)),
            value.getValue().getId());
        assertThat(value.getWindows(), contains((BoundedWindow) GlobalWindow.INSTANCE));
        assertEquals(i, value.getTimestamp().getMillis());
        i++;
        numReadOnThisIteration++;
      }
      Instant afterReading = Instant.now();
      // Each bundle must respect the configured read-time and size limits (+1s slack).
      assertThat(
          new Duration(beforeReading, afterReading).getStandardSeconds(),
          lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_READ_TIME.getStandardSeconds() + 1));
      assertThat(
          numReadOnThisIteration, lessThanOrEqualTo(CustomSources.MAX_UNBOUNDED_BUNDLE_SIZE));

      // Extract and verify state modifications.
      context.flushState();
      state = context.getOutputBuilder().getSourceStateUpdates().getState();
      // CountingSource's watermark is the last record + 1. i is now one past the last record,
      // so the expected watermark is i millis.
      assertEquals(
          TimeUnit.MILLISECONDS.toMicros(i), context.getOutputBuilder().getSourceWatermark());
      assertEquals(
          1,
          context
              .getOutputBuilder()
              .getSourceStateUpdates()
              .getFinalizeIdsList()
              .size());

      // The reader should be cached for reuse by the next work item on this key.
      assertNotNull(context.getCachedReader());

      Windmill.Counter backlog = getCounter(context, "dataflow_backlog_size-stageName");
      assertEquals(7L, backlog.getIntScalar());
      assertTrue(backlog.getCumulative());
      assertEquals(Windmill.Counter.Kind.SUM, backlog.getKind());
    }
  }
| |
| private Windmill.Counter getCounter(StreamingModeExecutionContext context, String name) { |
| for (Windmill.Counter counter : context.getOutputBuilder().getCounterUpdatesList()) { |
| if (counter.getName().equals(name)) { |
| return counter; |
| } |
| } |
| return null; |
| } |
| } |