blob: f5fa12ad7234f8c30523fcb178630595448f6645 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointManagerFactory;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamMetadata;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
public class TestStreamManager {
private static final String SYSTEM1 = "system-1";
private static final String SYSTEM2 = "system-2";
private static final String STREAM1 = "stream-1";
private static final String STREAM2 = "stream-2";
@Test
public void testCreateStreams() {
StreamSpec spec1 = new StreamSpec(STREAM1, STREAM1, SYSTEM1);
StreamSpec spec2 = new StreamSpec(STREAM2, STREAM2, SYSTEM2);
List<StreamSpec> specList = new ArrayList<>();
specList.add(spec1);
specList.add(spec2);
SystemAdmin admin1 = mock(SystemAdmin.class);
SystemAdmin admin2 = mock(SystemAdmin.class);
SystemAdmins systemAdmins = mock(SystemAdmins.class);
when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
when(systemAdmins.getSystemAdmin(SYSTEM2)).thenReturn(admin2);
StreamManager manager = new StreamManager(systemAdmins);
manager.createStreams(specList);
ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
verify(admin1).createStream(captor.capture());
assertEquals(STREAM1, captor.getValue().getPhysicalName());
captor = ArgumentCaptor.forClass(StreamSpec.class);
verify(admin2).createStream(captor.capture());
assertEquals(STREAM2, captor.getValue().getPhysicalName());
}
@Test
public void testGetStreamPartitionCounts() {
SystemAdmin admin1 = mock(SystemAdmin.class);
SystemAdmins systemAdmins = mock(SystemAdmins.class);
when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
Map<String, SystemStreamMetadata> map = new HashMap<>();
SystemStreamMetadata meta1 = mock(SystemStreamMetadata.class);
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitions = new HashMap<>();
partitions.put(new Partition(0), null);
when(meta1.getSystemStreamPartitionMetadata()).thenReturn(partitions);
map.put(STREAM1, meta1);
SystemStreamMetadata meta2 = mock(SystemStreamMetadata.class);
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitions2 = new HashMap<>();
partitions2.put(new Partition(0), null);
partitions2.put(new Partition(1), null);
when(meta2.getSystemStreamPartitionMetadata()).thenReturn(partitions2);
map.put(STREAM2, meta2);
when(admin1.getSystemStreamMetadata(anyObject())).thenReturn(map);
Set<String> streams = new HashSet<>();
streams.add(STREAM1);
streams.add(STREAM2);
StreamManager manager = new StreamManager(systemAdmins);
Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams);
assertTrue(counts.get(STREAM1).equals(1));
assertTrue(counts.get(STREAM2).equals(2));
}
private static CheckpointManager checkpointManager = mock(CheckpointManager.class);
public static final class MockCheckpointManagerFactory implements CheckpointManagerFactory {
@Override
public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
return checkpointManager;
}
}
@Test
public void testClearStreamsFromPreviousRun() {
SystemAdmin admin1 = mock(SystemAdmin.class);
SystemAdmin admin2 = mock(SystemAdmin.class);
SystemAdmins systemAdmins = mock(SystemAdmins.class);
when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
when(systemAdmins.getSystemAdmin(SYSTEM2)).thenReturn(admin2);
String runId = "123";
Map<String, String> config = new HashMap<>();
config.put(ApplicationConfig.APP_RUN_ID, "123");
config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
config.put("streams.stream-1.samza.system", SYSTEM1);
config.put("streams.stream-1.samza.physical.name", STREAM1 + "-" + runId);
config.put("streams.stream-1.samza.intermediate", "true");
config.put("task.checkpoint.factory", MockCheckpointManagerFactory.class.getName());
config.put("stores.test-store.factory", "dummyfactory");
config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2);
StreamManager manager = new StreamManager(systemAdmins);
manager.clearStreamsFromPreviousRun(new MapConfig(config));
ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
verify(admin1).clearStream(captor.capture());
assertEquals(captor.getValue().getPhysicalName(), STREAM1 + "-" + runId);
captor = ArgumentCaptor.forClass(StreamSpec.class);
verify(admin2).clearStream(captor.capture());
assertEquals(captor.getValue().getPhysicalName(), STREAM2 + "-" + runId);
verify(checkpointManager, times(1)).clearCheckpoints();
}
}