blob: 4e599c6a524491982d951c31ddd224678adbe088 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.runtime.service;
import com.alibaba.fastjson.JSONObject;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.Future;
import io.openmessaging.producer.SendResult;
import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@RunWith(MockitoJUnitRunner.class)
public class PositionManagementServiceImplTest {
private ConnectConfig connectConfig;
@Mock
private DefaultMQProducer producer;
@Mock
private DefaultMQPushConsumer consumer;
@Mock
private Future<SendResult> future;
private PositionManagementServiceImpl positionManagementService;
private Set<ByteBuffer> needSyncPartition;
private KeyValueStore<ByteBuffer, ByteBuffer> positionStore;
private ByteBuffer sourcePartition;
private ByteBuffer sourcePosition;
private Map<ByteBuffer, ByteBuffer> positions;
@Before
public void init() throws Exception {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
connectConfig.setNamesrvAddr("localhost:9876");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
connectConfig.setRmqConsumerGroup("testConsumerGroup");
doAnswer(new Answer() {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
final Message message = invocation.getArgument(0);
byte[] bytes = message.getBody();
final Field dataSynchronizerField = PositionManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
dataSynchronizerField.setAccessible(true);
BrokerBasedLog<String, Map> dataSynchronizer = (BrokerBasedLog<String, Map>) dataSynchronizerField.get(positionManagementService);
final Method decodeKeyValueMethod = BrokerBasedLog.class.getDeclaredMethod("decodeKeyValue", byte[].class);
decodeKeyValueMethod.setAccessible(true);
Map<String, Map> map = (Map<String, Map>) decodeKeyValueMethod.invoke(dataSynchronizer, bytes);
final Field dataSynchronizerCallbackField = BrokerBasedLog.class.getDeclaredField("dataSynchronizerCallback");
dataSynchronizerCallbackField.setAccessible(true);
final DataSynchronizerCallback<String, Map> dataSynchronizerCallback = (DataSynchronizerCallback<String, Map>) dataSynchronizerCallbackField.get(dataSynchronizer);
for (String key : map.keySet()) {
dataSynchronizerCallback.onCompletion(null, key, map.get(key));
}
return null;
}
}).when(producer).send(any(Message.class), any(SendCallback.class));
positionManagementService = new PositionManagementServiceImpl(connectConfig);
final Field dataSynchronizerField = PositionManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
dataSynchronizerField.setAccessible(true);
final Field producerField = BrokerBasedLog.class.getDeclaredField("producer");
producerField.setAccessible(true);
producerField.set((BrokerBasedLog<String, ConnAndTaskConfigs>) dataSynchronizerField.get(positionManagementService), producer);
final Field consumerField = BrokerBasedLog.class.getDeclaredField("consumer");
consumerField.setAccessible(true);
consumerField.set((BrokerBasedLog<String, ConnAndTaskConfigs>) dataSynchronizerField.get(positionManagementService), consumer);
positionManagementService.start();
Field positionStoreField = PositionManagementServiceImpl.class.getDeclaredField("positionStore");
positionStoreField.setAccessible(true);
positionStore = (KeyValueStore<ByteBuffer, ByteBuffer>) positionStoreField.get(positionManagementService);
Field needSyncPartitionField = PositionManagementServiceImpl.class.getDeclaredField("needSyncPartition");
needSyncPartitionField.setAccessible(true);
needSyncPartition = (ConcurrentSet<ByteBuffer>) needSyncPartitionField.get(positionManagementService);
sourcePartition = ByteBuffer.wrap("127.0.0.13306".getBytes("UTF-8"));
JSONObject jsonObject = new JSONObject();
// jsonObject.put(MysqlConstants.BINLOG_FILENAME, "binlogFilename");
// jsonObject.put(MysqlConstants.NEXT_POSITION, "100");
sourcePosition = ByteBuffer.wrap(jsonObject.toJSONString().getBytes());
positions = new HashMap<ByteBuffer, ByteBuffer>() {
{
put(sourcePartition, sourcePosition);
}
};
}
@After
public void destory() {
positionManagementService.stop();
TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));
}
@Test
public void testGetPositionTable() {
Map<ByteBuffer, ByteBuffer> positionTable = positionManagementService.getPositionTable();
ByteBuffer bytes = positionTable.get(sourcePartition);
assertNull(bytes);
positionManagementService.putPosition(positions);
positionTable = positionManagementService.getPositionTable();
bytes = positionTable.get(sourcePartition);
assertNotNull(bytes);
}
@Test
public void testPutPosition() throws Exception {
ByteBuffer bytes = positionStore.get(sourcePartition);
assertNull(bytes);
positionManagementService.putPosition(positions);
bytes = positionStore.get(sourcePartition);
assertNotNull(bytes);
}
@Test
public void testRemovePosition() {
positionManagementService.putPosition(positions);
ByteBuffer bytes = positionStore.get(sourcePartition);
assertNotNull(bytes);
List<ByteBuffer> sourcePartitions = new ArrayList<ByteBuffer>(8) {
{
add(sourcePartition);
}
};
positionManagementService.removePosition(sourcePartitions);
bytes = positionStore.get(sourcePartition);
assertNull(bytes);
}
@Test
public void testNeedSyncPartition() {
positionManagementService.putPosition(positions);
assertTrue(needSyncPartition.contains(sourcePartition));
List<ByteBuffer> sourcePartitions = new ArrayList<ByteBuffer>(8) {
{
add(sourcePartition);
}
};
positionManagementService.removePosition(sourcePartitions);
assertFalse(needSyncPartition.contains(sourcePartition));
positionManagementService.putPosition(sourcePartition, sourcePosition);
assertTrue(needSyncPartition.contains(sourcePartition));
positionManagementService.removePosition(sourcePartitions);
assertFalse(needSyncPartition.contains(sourcePartition));
}
@Test
public void testSendNeedSynchronizePosition() throws Exception {
positionManagementService.putPosition(positions);
ByteBuffer sourcePartitionTmp = ByteBuffer.wrap("127.0.0.2:3306".getBytes("UTF-8"));
JSONObject jsonObject = new JSONObject();
ByteBuffer sourcePositionTmp = ByteBuffer.wrap(jsonObject.toJSONString().getBytes());
positionStore.put(sourcePartitionTmp, sourcePositionTmp);
Set<ByteBuffer> needSyncPartitionTmp = needSyncPartition;
needSyncPartition = new ConcurrentSet<>();
Map<ByteBuffer, ByteBuffer> needSyncPosition = positionStore.getKVMap().entrySet().stream()
.filter(entry -> needSyncPartitionTmp.contains(entry.getKey()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
assertTrue(needSyncPartition.size() == 0);
ByteBuffer bytes = needSyncPosition.get(sourcePartition);
assertNotNull(bytes);
ByteBuffer tmpBytes = needSyncPosition.get(sourcePartitionTmp);
assertNull(tmpBytes);
List<ByteBuffer> sourcePartitions = new ArrayList<ByteBuffer>(8) {
{
add(sourcePartition);
add(sourcePartitionTmp);
}
};
needSyncPartition = needSyncPartitionTmp;
needSyncPartition.addAll(sourcePartitions);
positionManagementService.removePosition(sourcePartitions);
assertTrue(needSyncPartition.size() == 0);
}
}