/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.connect;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.connect.kafka.KafkaControlAgent;
import org.apache.hudi.connect.transaction.ConnectTransactionParticipant;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockKafkaConnect;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider;

import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
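
/**
 * Unit tests for {@link ConnectTransactionParticipant}, exercising the two-phase commit
 * protocol between a mock transaction coordinator and a participant, including scenarios
 * where the coordinator or the participant fails mid-protocol and is restarted.
 */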
public class TestConnectTransactionParticipant {
private static final String TOPIC_NAME = "kafka-connect-test-topic";
private static final int NUM_RECORDS_BATCH = 5;
private static final int PARTITION_NUMBER = 4;
private ConnectTransactionParticipant participant;
private MockCoordinator mockCoordinator;
private TopicPartition partition;
private KafkaConnectConfigs configs;
private KafkaControlAgent kafkaControlAgent;
private TestHudiWriterProvider testHudiWriterProvider;
private MockKafkaConnect mockKafkaConnect;
@BeforeEach
public void setUp() throws Exception {
partition = new TopicPartition(TOPIC_NAME, PARTITION_NUMBER);
kafkaControlAgent = new MockKafkaControlAgent();
mockKafkaConnect = new MockKafkaConnect(partition);
mockCoordinator = new MockCoordinator(kafkaControlAgent);
mockCoordinator.start();
configs = KafkaConnectConfigs.newBuilder()
.build();
initializeParticipant();
}
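/**
 * Simulates a coordinator failure at different points in the commit protocol
 * (see {@link CoordinatorFailureTestScenarios}) and validates that a restarted
 * coordinator and the surviving participant can still complete a fresh two-phase commit.
 */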
@ParameterizedTest
@EnumSource(value = CoordinatorFailureTestScenarios.class)
public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) {
try {
assertTrue(mockKafkaConnect.isPaused());
switch (testScenario) {
case REGULAR_SCENARIO:
break;
case COORDINATOR_FAILED_AFTER_START_COMMIT:
triggerAndProcessStartCommit();
// Coordinator Failed
initializeCoordinator();
break;
case COORDINATOR_FAILED_AFTER_END_COMMIT:
triggerAndProcessStartCommit();
triggerAndProcessEndCommit();
// Coordinator Failed
initializeCoordinator();
break;
default:
throw new HoodieException("Unknown test scenario " + testScenario);
}
// Regardless of any coordinator failure injected above, a fresh 2-phase commit should succeed.
testTwoPhaseCommit(0);
} catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception);
}
participant.stop();
}
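/**
 * Simulates a participant failure at different points in the commit protocol
 * (see {@link ParticipantFailureTestScenarios}) and validates that a restarted
 * participant can still complete a fresh two-phase commit.
 */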
@ParameterizedTest
@EnumSource(value = ParticipantFailureTestScenarios.class)
public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) {
try {
int currentKafkaOffset = 0;
switch (testScenario) {
case FAILURE_BEFORE_START_COMMIT:
// Participant failing before START_COMMIT is simply restarted; no commit cycle is in flight yet.
initializeParticipant();
break;
case FAILURE_AFTER_START_COMMIT:
triggerAndProcessStartCommit();
// Participant failing after START_COMMIT will not write any data in this commit cycle.
initializeParticipant();
triggerAndProcessEndCommit();
triggerAndProcessAckCommit();
break;
case FAILURE_AFTER_END_COMMIT:
// Complete START_COMMIT and END_COMMIT as in the regular case,
// then fail the participant before ACK_COMMIT.
triggerAndProcessStartCommit();
triggerAndProcessEndCommit();
initializeParticipant();
triggerAndProcessAckCommit();
// A participant failing after END_COMMIT should not affect the present commit,
// since the data was already written by the previous participant instance before it failed,
// which moved the kafka offset forward.
currentKafkaOffset = NUM_RECORDS_BATCH;
break;
default:
throw new HoodieException("Unknown test scenario " + testScenario);
}
// Regardless of any participant failure injected above, a fresh 2-phase commit should succeed.
testTwoPhaseCommit(currentKafkaOffset);
} catch (Exception exception) {
throw new HoodieException("Unexpected test failure ", exception);
}
}
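// Simulates a participant (re)start: instantiates a fresh writer provider and participant,
// registers the participant with the mock Kafka Connect context, and starts it.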
private void initializeParticipant() {
testHudiWriterProvider = new TestHudiWriterProvider();
participant = new ConnectTransactionParticipant(
partition,
kafkaControlAgent,
mockKafkaConnect,
testHudiWriterProvider);
mockKafkaConnect.setParticipant(participant);
participant.start();
}
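// Simulates a coordinator failure and recovery by instantiating and starting a new MockCoordinator.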
private void initializeCoordinator() {
mockCoordinator = new MockCoordinator(kafkaControlAgent);
mockCoordinator.start();
}
// Tests and validates the result of a single 2-phase commit, from START_COMMIT to ACK_COMMIT.
// Validates that exactly NUM_RECORDS_BATCH kafka records are written,
// and that the kafka offset increments by exactly NUM_RECORDS_BATCH.
private void testTwoPhaseCommit(long currentKafkaOffset) {
triggerAndProcessStartCommit();
triggerAndProcessEndCommit();
triggerAndProcessAckCommit();
// Validate that the expected number of records was written and that the
// current kafka offset advanced by exactly one batch, despite any failure scenario.
assertEquals(NUM_RECORDS_BATCH, testHudiWriterProvider.getLatestNumberWrites());
assertEquals((currentKafkaOffset + NUM_RECORDS_BATCH), mockKafkaConnect.getCurrentKafkaOffset());
// Ensure Coordinator and participant are in sync in the kafka offsets
assertEquals(participant.getLastKafkaCommittedOffset(), mockCoordinator.getCommittedKafkaOffset());
}
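// Phase 1: the coordinator broadcasts START_COMMIT; the participant should resume
// consumption and write the batch of records published to it.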
private void triggerAndProcessStartCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.START_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(NUM_RECORDS_BATCH);
assertTrue(mockKafkaConnect.isResumed());
}
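// Phase 2: the coordinator broadcasts END_COMMIT; the participant should pause
// consumption and report its write status back to the coordinator
// (recorded by MockCoordinator#processControlEvent).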
private void triggerAndProcessEndCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.END_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(0);
assertTrue(mockKafkaConnect.isPaused());
}
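// Phase 3: the coordinator broadcasts ACK_COMMIT to acknowledge the commit;
// the participant stays paused until the next START_COMMIT.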
private void triggerAndProcessAckCommit() {
mockCoordinator.sendEventFromCoordinator(ControlMessage.EventType.ACK_COMMIT);
mockKafkaConnect.publishBatchRecordsToParticipant(0);
assertTrue(mockKafkaConnect.isPaused());
}
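/**
 * Minimal stand-in for the real transaction coordinator: publishes control events
 * to the participant via the {@link KafkaControlAgent} and tracks the kafka offsets
 * committed through WRITE_STATUS events.
 */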
private static class MockCoordinator implements TransactionCoordinator {
private static int currentCommitTime = 101;
private final KafkaControlAgent kafkaControlAgent;
private final TopicPartition partition;
private Option<ControlMessage> lastReceivedWriteStatusEvent;
private long committedKafkaOffset;
public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
this.kafkaControlAgent = kafkaControlAgent;
partition = new TopicPartition(TOPIC_NAME, 0);
lastReceivedWriteStatusEvent = Option.empty();
committedKafkaOffset = 0L;
}
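// Builds and publishes a control message the way the real coordinator would;
// each START_COMMIT starts a new commit cycle with a fresh commit time.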
public void sendEventFromCoordinator(ControlMessage.EventType type) {
try {
if (type.equals(ControlMessage.EventType.START_COMMIT)) {
++currentCommitTime;
}
kafkaControlAgent.publishMessage(
ControlMessage.newBuilder()
.setType(type)
.setTopicName(partition.topic())
.setSenderType(ControlMessage.EntityType.COORDINATOR)
.setSenderPartition(partition.partition())
.setReceiverType(ControlMessage.EntityType.PARTICIPANT)
.setCommitTime(String.valueOf(currentCommitTime))
.setCoordinatorInfo(
ControlMessage.CoordinatorInfo.newBuilder()
.putAllGlobalKafkaCommitOffsets(Collections.singletonMap(PARTITION_NUMBER, committedKafkaOffset))
.build()
).build());
} catch (Exception exception) {
throw new HoodieException("Fatal error sending control event to Participant");
}
}
public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
return lastReceivedWriteStatusEvent;
}
public long getCommittedKafkaOffset() {
return committedKafkaOffset;
}
@Override
public void start() {
kafkaControlAgent.registerTransactionCoordinator(this);
}
@Override
public void stop() {
kafkaControlAgent.deregisterTransactionCoordinator(this);
}
@Override
public TopicPartition getPartition() {
return partition;
}
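// Records WRITE_STATUS events from the participant and advances the committed
// kafka offset, asserting that the offset never moves backwards.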
@Override
public void processControlEvent(ControlMessage message) {
if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
lastReceivedWriteStatusEvent = Option.of(message);
assertTrue(message.getParticipantInfo().getKafkaOffset() >= committedKafkaOffset);
committedKafkaOffset = message.getParticipantInfo().getKafkaOffset();
}
}
}
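// Points in the two-phase commit protocol at which the coordinator is failed and restarted.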
private enum CoordinatorFailureTestScenarios {
REGULAR_SCENARIO,
COORDINATOR_FAILED_AFTER_START_COMMIT,
COORDINATOR_FAILED_AFTER_END_COMMIT,
}
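// Points in the two-phase commit protocol at which the participant is failed and restarted.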
private enum ParticipantFailureTestScenarios {
FAILURE_BEFORE_START_COMMIT,
FAILURE_AFTER_START_COMMIT,
FAILURE_AFTER_END_COMMIT,
}
}