blob: 409c2dbcb20bb8a3e5edd23e30ab20840b39f49d [file] [log] [blame]
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.dangdang.ddframe.job.internal.sharding;
import com.dangdang.ddframe.job.api.JobConfiguration;
import com.dangdang.ddframe.job.fixture.TestJob;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.election.LeaderElectionService;
import com.dangdang.ddframe.job.internal.env.LocalHostService;
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.job.internal.storage.TransactionExecutionCallback;
import com.dangdang.ddframe.job.plugin.sharding.strategy.AverageAllocationJobShardingStrategy;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.api.transaction.TransactionCheckBuilder;
import org.apache.curator.framework.api.transaction.TransactionCreateBuilder;
import org.apache.curator.framework.api.transaction.TransactionDeleteBuilder;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.unitils.util.ReflectionUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public final class ShardingServiceTest {
@Mock
private JobNodeStorage jobNodeStorage;
@Mock
private LocalHostService localHostService;
@Mock
private LeaderElectionService leaderElectionService;
@Mock
private ConfigurationService configService;
@Mock
private ExecutionService executionService;
@Mock
private ServerService serverService;
private final JobConfiguration jobConfig = new JobConfiguration("testJob", TestJob.class, 3, "0/1 * * * * ?");
private final ShardingService shardingService = new ShardingService(null, jobConfig);
@Before
public void setUp() throws NoSuchFieldException {
MockitoAnnotations.initMocks(this);
ReflectionUtils.setFieldValue(shardingService, "jobNodeStorage", jobNodeStorage);
ReflectionUtils.setFieldValue(shardingService, "localHostService", localHostService);
ReflectionUtils.setFieldValue(shardingService, "leaderElectionService", leaderElectionService);
ReflectionUtils.setFieldValue(shardingService, "configService", configService);
ReflectionUtils.setFieldValue(shardingService, "executionService", executionService);
ReflectionUtils.setFieldValue(shardingService, "serverService", serverService);
when(localHostService.getIp()).thenReturn("mockedIP");
when(localHostService.getHostName()).thenReturn("mockedHostName");
when(jobNodeStorage.getJobConfiguration()).thenReturn(jobConfig);
}
@Test
public void assertSetReshardingFlag() {
shardingService.setReshardingFlag();
verify(jobNodeStorage).createJobNodeIfNeeded("leader/sharding/necessary");
}
@Test
public void assertIsNeedSharding() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
assertTrue(shardingService.isNeedSharding());
verify(jobNodeStorage).isJobNodeExisted("leader/sharding/necessary");
}
@Test
public void assertShardingWhenUnnecessary() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(false);
shardingService.shardingIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/sharding/necessary");
}
@Test
public void assertShardingWhenIsNotLeaderAndIsShardingProcessing() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true, true, false, false);
when(leaderElectionService.isLeader()).thenReturn(false);
when(jobNodeStorage.isJobNodeExisted("leader/sharding/processing")).thenReturn(true, false);
shardingService.shardingIfNecessary();
verify(jobNodeStorage, times(4)).isJobNodeExisted("leader/sharding/necessary");
verify(jobNodeStorage, times(2)).isJobNodeExisted("leader/sharding/processing");
}
@Test
public void assertShardingNecessaryWhenMonitorExecutionEnabled() {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
when(leaderElectionService.isLeader()).thenReturn(true);
when(configService.isMonitorExecution()).thenReturn(true);
when(serverService.getAllServers()).thenReturn(Arrays.asList("ip1", "ip2"));
when(executionService.hasRunningItems()).thenReturn(true, false);
when(configService.getJobShardingStrategyClass()).thenReturn(AverageAllocationJobShardingStrategy.class.getCanonicalName());
when(configService.getShardingTotalCount()).thenReturn(3);
when(configService.getShardingItemParameters()).thenReturn(Collections.<Integer, String>emptyMap());
shardingService.shardingIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/sharding/necessary");
verify(leaderElectionService).isLeader();
verify(configService).isMonitorExecution();
verify(executionService, times(2)).hasRunningItems();
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip1/sharding");
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip2/sharding");
verify(jobNodeStorage).fillEphemeralJobNode("leader/sharding/processing", "");
verify(configService).getJobShardingStrategyClass();
verify(configService).getShardingTotalCount();
verify(configService).getShardingItemParameters();
verify(jobNodeStorage).executeInTransaction(any(TransactionExecutionCallback.class));
}
@Test
public void assertShardingNecessaryWhenMonitorExecutionDisabled() throws Exception {
when(jobNodeStorage.isJobNodeExisted("leader/sharding/necessary")).thenReturn(true);
when(leaderElectionService.isLeader()).thenReturn(true);
when(configService.isMonitorExecution()).thenReturn(false);
when(serverService.getAllServers()).thenReturn(Arrays.asList("ip1", "ip2"));
when(configService.getJobShardingStrategyClass()).thenReturn(AverageAllocationJobShardingStrategy.class.getCanonicalName());
when(configService.getShardingTotalCount()).thenReturn(3);
when(configService.getShardingItemParameters()).thenReturn(Collections.<Integer, String>emptyMap());
shardingService.shardingIfNecessary();
verify(jobNodeStorage).isJobNodeExisted("leader/sharding/necessary");
verify(leaderElectionService).isLeader();
verify(configService).isMonitorExecution();
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip1/sharding");
verify(jobNodeStorage).removeJobNodeIfExisted("servers/ip2/sharding");
verify(jobNodeStorage).fillEphemeralJobNode("leader/sharding/processing", "");
verify(configService).getJobShardingStrategyClass();
verify(configService).getShardingTotalCount();
verify(configService).getShardingItemParameters();
verify(jobNodeStorage).executeInTransaction(any(TransactionExecutionCallback.class));
}
@Test
public void assertGetLocalHostShardingItemsWhenNodeExisted() {
when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/sharding")).thenReturn(true);
when(jobNodeStorage.getJobNodeDataDirectly("servers/mockedIP/sharding")).thenReturn("0,1,2");
assertThat(shardingService.getLocalHostShardingItems(), is(Arrays.asList(0, 1, 2)));
verify(jobNodeStorage).isJobNodeExisted("servers/mockedIP/sharding");
verify(jobNodeStorage).getJobNodeDataDirectly("servers/mockedIP/sharding");
}
@Test
public void assertGetLocalHostShardingWhenNodeNotExisted() {
when(jobNodeStorage.isJobNodeExisted("servers/mockedIP/sharding")).thenReturn(false);
assertThat(shardingService.getLocalHostShardingItems(), is(Collections.EMPTY_LIST));
verify(jobNodeStorage).isJobNodeExisted("servers/mockedIP/sharding");
}
@Test
public void assertPersistShardingInfoTransactionExecutionCallback() throws Exception {
CuratorTransactionFinal curatorTransactionFinal = mock(CuratorTransactionFinal.class);
TransactionCreateBuilder transactionCreateBuilder = mock(TransactionCreateBuilder.class);
TransactionDeleteBuilder transactionDeleteBuilder = mock(TransactionDeleteBuilder.class);
CuratorTransactionBridge curatorTransactionBridge = mock(CuratorTransactionBridge.class);
when(curatorTransactionFinal.create()).thenReturn(transactionCreateBuilder);
when(transactionCreateBuilder.forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes())).thenReturn(curatorTransactionBridge);
when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder);
when(transactionDeleteBuilder.forPath("/testJob/leader/sharding/necessary")).thenReturn(curatorTransactionBridge);
when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
when(curatorTransactionFinal.delete()).thenReturn(transactionDeleteBuilder);
when(transactionDeleteBuilder.forPath("/testJob/leader/sharding/processing")).thenReturn(curatorTransactionBridge);
when(curatorTransactionBridge.and()).thenReturn(curatorTransactionFinal);
Map<String, List<Integer>> shardingItems = new HashMap<>(1);
shardingItems.put("host0", Arrays.asList(0, 1, 2));
ShardingService.PersistShardingInfoTransactionExecutionCallback actual = shardingService.new PersistShardingInfoTransactionExecutionCallback(shardingItems);
actual.execute(curatorTransactionFinal);
verify(curatorTransactionFinal).create();
verify(transactionCreateBuilder).forPath("/testJob/servers/host0/sharding", "0,1,2".getBytes());
verify(curatorTransactionFinal, times(2)).delete();
verify(transactionDeleteBuilder).forPath("/testJob/leader/sharding/necessary");
verify(transactionDeleteBuilder).forPath("/testJob/leader/sharding/processing");
verify(curatorTransactionBridge, times(3)).and();
}
}