blob: 6da2c8f403495d529ff33893a110b4a8d028b327 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.coordinator.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.Delete;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCoordinatorStreamSystemConsumer {
@Test
public void testCoordinatorStreamSystemConsumer() {
Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
expectedConfig.put("job.id", "1234");
SystemStream systemStream = new SystemStream("system", "stream");
MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
assertEquals(0, systemConsumer.getRegisterCount());
consumer.register();
assertEquals(1, systemConsumer.getRegisterCount());
assertFalse(systemConsumer.isStarted());
consumer.start();
assertTrue(systemConsumer.isStarted());
try {
consumer.getConfig();
fail("Should have failed when retrieving config before bootstrapping.");
} catch (SamzaException e) {
// Expected.
}
consumer.bootstrap();
assertEquals(expectedConfig, consumer.getConfig());
assertFalse(systemConsumer.isStopped());
consumer.stop();
assertTrue(systemConsumer.isStopped());
}
@Test
public void testCoordinatorStreamSystemConsumerRegisterOnceOnly() throws Exception {
Map<String, String> expectedConfig = new LinkedHashMap<String, String>();
expectedConfig.put("job.id", "1234");
SystemStream systemStream = new SystemStream("system", "stream");
MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
assertEquals(0, systemConsumer.getRegisterCount());
consumer.register();
assertEquals(1, systemConsumer.getRegisterCount());
assertFalse(systemConsumer.isStarted());
consumer.start();
assertTrue(systemConsumer.isStarted());
consumer.register();
assertEquals(1, systemConsumer.getRegisterCount());
}
/**
* Verify that if a particular key-value is written, then another, then the original again,
* that the original occurs last in the set.
*/
@Test
public void testOrderKeyRewrite() throws InterruptedException {
final SystemStream systemStream = new SystemStream("system", "stream");
final SystemStreamPartition ssp = new SystemStreamPartition(systemStream, new Partition(0));
final SystemConsumer systemConsumer = mock(SystemConsumer.class);
final List<IncomingMessageEnvelope> list = new ArrayList<>();
SetConfig setConfig1 = new SetConfig("source", "key1", "value1");
SetConfig setConfig2 = new SetConfig("source", "key1", "value2");
SetConfig setConfig3 = new SetConfig("source", "key1", "value1");
list.add(createIncomingMessageEnvelope(setConfig1, ssp));
list.add(createIncomingMessageEnvelope(setConfig2, ssp));
list.add(createIncomingMessageEnvelope(setConfig3, ssp));
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>() {
{
put(ssp, list);
}
};
when(systemConsumer.poll(anySet(), anyLong())).thenReturn(messages, Collections.<SystemStreamPartition, List<IncomingMessageEnvelope>>emptyMap());
CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
consumer.bootstrap();
Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBootstrappedStream(SetConfig.TYPE);
assertEquals(2, bootstrappedMessages.size()); // First message should have been removed as a duplicate
CoordinatorStreamMessage[] coordinatorStreamMessages = bootstrappedMessages.toArray(new CoordinatorStreamMessage[2]);
assertEquals(setConfig2, coordinatorStreamMessages[0]);
assertEquals(setConfig3, coordinatorStreamMessages[1]); //Config 3 MUST be the last message, not config 2
}
private static class MockSystemConsumer implements SystemConsumer {
private boolean started = false;
private boolean stopped = false;
private int registerCount = 0;
private final SystemStreamPartition expectedSystemStreamPartition;
private int pollCount = 0;
public MockSystemConsumer(SystemStreamPartition expectedSystemStreamPartition) {
this.expectedSystemStreamPartition = expectedSystemStreamPartition;
}
public void start() {
started = true;
}
public void stop() {
stopped = true;
}
public void register(SystemStreamPartition systemStreamPartition, String offset) {
registerCount++;
assertEquals(expectedSystemStreamPartition, systemStreamPartition);
}
public int getRegisterCount() {
return registerCount;
}
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
assertEquals(1, systemStreamPartitions.size());
SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
assertEquals(expectedSystemStreamPartition, systemStreamPartition);
if (pollCount++ == 0) {
List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
map.put(systemStreamPartition, list);
}
return map;
}
private byte[] serialize(Object obj) {
try {
return SamzaObjectMapper.getObjectMapper().writeValueAsString(obj).getBytes("UTF-8");
} catch (Exception e) {
throw new SamzaException(e);
}
}
public boolean isStarted() {
return started;
}
public boolean isStopped() {
return stopped;
}
}
private IncomingMessageEnvelope createIncomingMessageEnvelope(CoordinatorStreamMessage message, SystemStreamPartition ssp) {
try {
byte[] key = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getKeyArray()).getBytes("UTF-8");
byte[] value = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getMessageMap()).getBytes("UTF-8");
return new IncomingMessageEnvelope(ssp, null, key, value);
} catch (Exception e) {
return null;
}
}
}