blob: 2b41ed2bac58006a1ddfacd6c9363ad155d92979 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.impl;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.WatermarkMessage;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;
public class TestWatermarkStates {
@Test
public void testUpdate() {
SystemStream input = new SystemStream("system", "input");
SystemStream intermediate = new SystemStream("system", "intermediate");
Set<SystemStreamPartition> ssps = new HashSet<>();
SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0));
SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
ssps.add(inputPartition0);
ssps.add(intPartition0);
ssps.add(intPartition1);
Map<SystemStream, Integer> producerCounts = new HashMap<>();
producerCounts.put(intermediate, 2);
// advance watermark on input to 5
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap());
IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
watermarkStates.update((WatermarkMessage) envelope.getMessage(),
envelope.getSystemStreamPartition());
assertEquals(watermarkStates.getWatermark(input), 5L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
// watermark from task 0 on int p0 to 6
WatermarkMessage watermarkMessage = new WatermarkMessage(6L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
// watermark from task 1 on int p0 to 3
watermarkMessage = new WatermarkMessage(3L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 3L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
// watermark from task 0 on int p1 to 10
watermarkMessage = new WatermarkMessage(10L, "task 0");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
// watermark from task 1 on int p1 to 4
watermarkMessage = new WatermarkMessage(4L, "task 1");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 4L);
// verify we got a watermark 3 (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 3L);
// advance watermark from task 1 on int p0 to 8
watermarkMessage = new WatermarkMessage(8L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 6L);
// verify we got a watermark 4 (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 4L);
// advance watermark from task 1 on int p1 to 7
watermarkMessage = new WatermarkMessage(7L, "task 1");
watermarkStates.update(watermarkMessage, intPartition1);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition1), 7L);
// verify we got a watermark 6 (min) for int stream
assertEquals(watermarkStates.getWatermark(intermediate), 6L);
}
}