blob: 3facaf53c5362d3f22313a37707985db9d600f84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.state.instance.InstanceState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceChangedEvent;
import org.apache.shardingsphere.mode.event.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class StateChangedSubscriberTest {
private StateChangedSubscriber subscriber;
private ContextManager contextManager;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ShardingSphereDatabase database;
@BeforeEach
void setUp() throws SQLException {
contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
}
private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
InstanceMetaData instanceMetaData = new ProxyInstanceMetaData("foo_instance_id", 3307);
return new ContextManagerBuilderParameter(modeConfig, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(),
new Properties(), Collections.emptyList(), instanceMetaData, false);
}
private Map<String, ShardingSphereDatabase> createDatabases() {
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", new ShardingSphereSchema()));
when(database.getProtocolType()).thenReturn(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
when(database.getRuleMetaData().getRules()).thenReturn(new LinkedList<>());
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
return Collections.singletonMap("db", database);
}
@Test
void assertRenewForDisableStateChanged() {
StaticDataSourceRuleAttribute ruleAttribute = mock(StaticDataSourceRuleAttribute.class);
when(database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)).thenReturn(Collections.singleton(ruleAttribute));
StorageNodeChangedEvent event = new StorageNodeChangedEvent(new QualifiedDataSource("db.readwrite_ds.ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED));
subscriber.renew(event);
verify(ruleAttribute).updateStatus(argThat(
(ArgumentMatcher<StorageNodeDataSourceChangedEvent>) argumentEvent -> Objects.equals(event.getQualifiedDataSource(), argumentEvent.getQualifiedDataSource())
&& Objects.equals(event.getDataSource(), argumentEvent.getDataSource())));
}
@Test
void assertResetClusterStatus() {
ClusterLockDeletedEvent mockLockDeletedEvent = new ClusterLockDeletedEvent(ClusterState.OK);
subscriber.renew(mockLockDeletedEvent);
assertThat(contextManager.getClusterStateContext().getCurrentState(), is(ClusterState.OK));
}
@Test
void assertRenewClusterStatus() {
ClusterStateEvent mockClusterStateEvent = new ClusterStateEvent("READ_ONLY");
subscriber.renew(mockClusterStateEvent);
assertThat(contextManager.getClusterStateContext().getCurrentState(), is(ClusterState.READ_ONLY));
}
@Test
void assertRenewInstanceStatus() {
StateEvent mockStateEvent = new StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), InstanceState.OK.name());
subscriber.renew(mockStateEvent);
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(), is(InstanceState.OK));
}
@Test
void assertRenewInstanceWorkerIdEvent() {
subscriber.renew(new WorkerIdEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), 0));
assertThat(contextManager.getInstanceContext().getInstance().getWorkerId(), is(0));
}
@Test
void assertRenewInstanceLabels() {
Collection<String> labels = Collections.singleton("test");
subscriber.renew(new LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(), labels));
assertThat(contextManager.getInstanceContext().getInstance().getLabels(), is(labels));
}
@Test
void assertRenewInstanceOfflineEvent() {
subscriber.renew(new InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
assertThat(((ProxyInstanceMetaData) contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), is(3307));
}
@Test
void assertRenewInstanceOnlineEvent() {
InstanceMetaData instanceMetaData1 = new ProxyInstanceMetaData("foo_instance_3307", 3307);
InstanceOnlineEvent instanceOnlineEvent1 = new InstanceOnlineEvent(instanceMetaData1);
subscriber.renew(instanceOnlineEvent1);
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(1));
assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(), is(instanceMetaData1));
InstanceMetaData instanceMetaData2 = new ProxyInstanceMetaData("foo_instance_3308", 3308);
InstanceOnlineEvent instanceOnlineEvent2 = new InstanceOnlineEvent(instanceMetaData2);
subscriber.renew(instanceOnlineEvent2);
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData2));
subscriber.renew(instanceOnlineEvent1);
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), is(2));
assertThat(((LinkedList<ComputeNodeInstance>) contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(), is(instanceMetaData1));
}
}