blob: ba0d65007da6a4891a4e7f451d85830ad8a2eee8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.tests.EnumValuesDataProvider;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-impl")
public class MessageChecksumTest extends BrokerTestBase {
private static final Logger log = LoggerFactory.getLogger(MessageChecksumTest.class);
@BeforeMethod
@Override
public void setup() throws Exception {
baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
internalCleanup();
}
@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
// disable connection pooling
clientBuilder.connectionsPerBroker(0);
}
@Override
protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws PulsarClientException {
return PulsarTestClient.create(clientBuilder);
}
// Enum parameter used to describe the 2 different scenarios in the
// testChecksumCompatibilityInMixedVersionBrokerCluster test case
enum MixedVersionScenario {
CONNECTED_TO_NEW_THEN_OLD_VERSION,
CONNECTED_TO_OLD_THEN_NEW_VERSION
}
/**
* Pulsar message checksums changed in protocol version v6, broker version v1.15.
*
* This test case verifies that a client is able to send messages to an older broker version
* (<= v1.14, protocol version <= v5) in a mixed environment of broker versions (<= v1.14 & >= v1.15)
*
* This test case makes the assumption that the message checksum is ignored
* if a tampered message can be read by the consumer in the test.
*
* Scenario behind this test case:
*
* MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION
* A Pulsar client produces the message while connected to a broker that supports checksums.
* While sending the message to the broker is pending, the connection breaks and the client
* connects to another broker that doesn't support message checksums.
* In this case, the client should remove the message checksum before resending it to the broker.
* original PR https://github.com/apache/pulsar/pull/43
*
* MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION
* A Pulsar client produces the message while connected to a broker that doesn't support checksums.
* While sending the message to the broker is pending, the connection breaks and the client
* connects to another broker that supports message checksums.
* In this case, the client should remove the message checksum before resending it to the broker.
* original PR https://github.com/apache/pulsar/pull/89
*/
@Test(dataProviderClass = EnumValuesDataProvider.class, dataProvider = "values")
public void testChecksumCompatibilityInMixedVersionBrokerCluster(MixedVersionScenario mixedVersionScenario)
throws Exception {
// GIVEN
final String topicName =
"persistent://prop/use/ns-abc/testChecksumBackwardsCompatibilityWithOldBrokerWithoutChecksumHandling";
if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_OLD_THEN_NEW_VERSION) {
// Given, the client thinks it's connected to a broker that doesn't support message checksums
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
}
PulsarTestClient pulsarTestClient = (PulsarTestClient) pulsarClient;
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("my-sub")
.subscribe();
// inject a CountDownLatch to the pending message callback of the PulsarTestClient
CountDownLatch messageSendingProcessedLatch = new CountDownLatch(2);
// WHEN
// a message is sent, it should succeed
producer.send("message-1".getBytes());
// And
// communication OpSend messages are dropped to simulate a broken connection so that
// the next message doesn't get sent out yet and can be tampered before it's sent out
pulsarTestClient.dropOpSendMessages();
// And
// another message is sent
byte[] messageBytes = "message-2".getBytes();
TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage().value(messageBytes);
CompletableFuture<MessageId> tamperedMessageSendFuture = messageBuilder.sendAsync();
// And
// until the message checksum has been calculated and it is pending
pulsarTestClient.setPendingMessageCallback(null);
// And
// the producer disconnects from the broker and the test client is put in a mode where reconnecting is rejected
pulsarTestClient.disconnectProducerAndRejectReconnecting(producer);
// And
// when the the message is tampered by changing the last byte to '3'. This corrupts the already calculated
// checksum.
((TypedMessageBuilderImpl<byte[]>) messageBuilder).getContent().put(messageBytes.length - 1, (byte) '3');
if (mixedVersionScenario == MixedVersionScenario.CONNECTED_TO_NEW_THEN_OLD_VERSION) {
// Given, the client thinks it's connected to a broker that doesn't support message checksums
makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport();
} else {
// Reset the overriding set in the beginning
resetOverridingConnectedBrokerVersion();
}
// And
// when finally the pulsar client is allowed to reconnect to the broker
pulsarTestClient.allowReconnecting();
// THEN
try {
// sending of tampered message should not fail since the client is expected to remove the checksum from the
// message before sending it an older broker version
tamperedMessageSendFuture.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail", e);
}
// and then
// first message is received
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
// and contains the expected payload
assertEquals(new String(msg.getData()), "message-1");
// second message is received
msg = consumer.receive(1, TimeUnit.SECONDS);
// and contains the tampered payload
assertEquals(new String(msg.getData()), "message-3");
}
private void makeClientAssumeThatItsConnectedToBrokerWithoutChecksumSupport() {
// make the client think that the connected broker is of version which doesn't support checksum validation
((PulsarTestClient) pulsarClient).setOverrideRemoteEndpointProtocolVersion(ProtocolVersion.v5.getValue());
}
private void resetOverridingConnectedBrokerVersion() {
// reset the override and use the actual protocol version
((PulsarTestClient) pulsarClient).setOverrideRemoteEndpointProtocolVersion(0);
}
private void waitUntilMessageIsPendingWithCalculatedChecksum(ProducerImpl<?> producer) {
// wait until the message is in the pending queue
Awaitility.await().untilAsserted(() -> {
assertEquals(producer.getPendingQueueSize(), 1);
});
}
@Test
public void testTamperingMessageIsDetected() throws Exception {
// GIVEN
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("persistent://prop/use/ns-abc/testTamperingMessageIsDetected")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
TypedMessageBuilderImpl<byte[]> msgBuilder = (TypedMessageBuilderImpl<byte[]>) producer.newMessage()
.value("a message".getBytes());
MessageMetadata msgMetadata = msgBuilder.getMetadataBuilder()
.setProducerName("test")
.setSequenceId(1)
.setPublishTime(10L);
ByteBuf payload = Unpooled.wrappedBuffer(msgBuilder.getContent());
// WHEN
// protocol message is created with checksum
ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
OpSendMsg op = OpSendMsg.create((MessageImpl<byte[]>) msgBuilder.getMessage(), cmd, 1, null);
// THEN
// the checksum validation passes
assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
// WHEN
// the content of the message is tampered
msgBuilder.getContent().put(0, (byte) 'b');
// the checksum validation fails
assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
}
}