blob: c7f6ce10759ec8043a961f17b0de5f718407f10c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Range;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.predicate.ValueSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
public class TestPulsarSplitManager extends TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarSplitManager.class);
public class ResultCaptor<T> implements Answer {
private T result = null;
public T getResult() {
return result;
}
@Override
public T answer(InvocationOnMock invocationOnMock) throws Throwable {
result = (T) invocationOnMock.callRealMethod();
return result;
}
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<TopicName> topics = new LinkedList<>();
topics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
for (TopicName topicName : topics) {
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
.getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
int totalSize = 0;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
cleanup();
}
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testPartitionedTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : partitionedTopicNames) {
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, TupleDomain.all());
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(this.pulsarSplitManager).getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
this.pulsarSplitManager.getSplits(mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
.getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
int partitions = partitionedTopicsToPartitions.get(topicName.toString());
for (int i = 0; i < partitions; i++) {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
for (PulsarSplit pulsarSplit : splits) {
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), totalSize);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, totalSize));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), totalSize + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, totalSize + pulsarSplit.getSplitSize()));
totalSize += pulsarSplit.getSplitSize();
}
assertEquals(totalSize, topicsToNumEntries.get(topicName.getSchemaName()).intValue());
}
cleanup();
}
}
private List<PulsarSplit> getSplitsForPartition(TopicName target, Collection<PulsarSplit> splits) {
return splits.stream().filter(pulsarSplit -> {
TopicName topicName = TopicName.get(pulsarSplit.getSchemaName() + "/" + pulsarSplit.getTableName());
return target.equals(topicName);
}).collect(Collectors.toList());
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testPublishTimePredicatePushdown(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = TOPIC_1;
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(this.pulsarSplitManager)
.getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
.getSplitsNonPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
int totalSize = 0;
int initalStart = 1;
for (PulsarSplit pulsarSplit : resultCaptor.getResult()) {
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), initalStart);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initalStart));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), initalStart + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initalStart + pulsarSplit
.getSplitSize()));
initalStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
assertEquals(totalSize, 49);
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = PARTITIONED_TOPIC_1;
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP, currentTimeMs + 1L, true,
currentTimeMs + 50L, true)), false);
domainMap.put(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
final ResultCaptor<Collection<PulsarSplit>> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(this.pulsarSplitManager)
.getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
verify(this.pulsarSplitManager, times(1))
.getSplitsPartitionedTopic(anyInt(), any(), any(), any(), any(), any());
int partitions = partitionedTopicsToPartitions.get(topicName.toString());
for (int i = 0; i < partitions; i++) {
List<PulsarSplit> splits = getSplitsForPartition(topicName.getPartition(i), resultCaptor.getResult());
int totalSize = 0;
int initialStart = 1;
for (PulsarSplit pulsarSplit : splits) {
assertEquals(pulsarSplit.getConnectorId(), pulsarConnectorId.toString());
assertEquals(pulsarSplit.getSchemaName(), topicName.getNamespace());
assertEquals(pulsarSplit.getTableName(), topicName.getPartition(i).getLocalName());
assertEquals(pulsarSplit.getSchema(),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()));
assertEquals(pulsarSplit.getSchemaType(), topicsToSchemas.get(topicName.getSchemaName()).getType());
assertEquals(pulsarSplit.getStartPositionEntryId(), initialStart);
assertEquals(pulsarSplit.getStartPositionLedgerId(), 0);
assertEquals(pulsarSplit.getStartPosition(), PositionImpl.get(0, initialStart));
assertEquals(pulsarSplit.getEndPositionLedgerId(), 0);
assertEquals(pulsarSplit.getEndPositionEntryId(), initialStart + pulsarSplit.getSplitSize());
assertEquals(pulsarSplit.getEndPosition(), PositionImpl.get(0, initialStart + pulsarSplit.getSplitSize()));
initialStart += pulsarSplit.getSplitSize();
totalSize += pulsarSplit.getSplitSize();
}
assertEquals(totalSize, 49);
}
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testPartitionFilter(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : partitionedTopicNames) {
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = mock(PulsarTableHandle.class);
when(pulsarTableHandle.getConnectorId()).thenReturn(pulsarConnectorId.toString());
when(pulsarTableHandle.getSchemaName()).thenReturn(topicName.getNamespace());
when(pulsarTableHandle.getTopicName()).thenReturn(topicName.getLocalName());
when(pulsarTableHandle.getTableName()).thenReturn(topicName.getLocalName());
// test single domain with equal low and high of "__partition__"
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
Domain domain = Domain.create(ValueSet.ofRanges(Range.range(INTEGER, 0L, true,
0L, true)), false);
domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
Collection<PulsarSplit> splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null);
if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
Assert.assertEquals(splits.size(), 2);
}
for (PulsarSplit split : splits) {
assertEquals(TopicName.getPartitionIndex(split.getTableName()), 0);
}
// test multiple domain with equal low and high of "__partition__"
domainMap.clear();
domain = Domain.create(ValueSet.ofRanges(
Range.range(INTEGER, 0L, true, 0L, true),
Range.range(INTEGER, 3L, true, 3L, true)),
false);
domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
tupleDomain = TupleDomain.withColumnDomains(domainMap);
splits = this.pulsarSplitManager.getSplitsPartitionedTopic(1, topicName, pulsarTableHandle,
schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null);
if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
Assert.assertEquals(splits.size(), 2);
}
for (PulsarSplit split : splits) {
assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0 || TopicName.getPartitionIndex(split.getTableName()) == 3);
}
// test single domain with unequal low and high of "__partition__"
domainMap.clear();
domain = Domain.create(ValueSet.ofRanges(
Range.range(INTEGER, 0L, true, 2L, true)),
false);
domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
tupleDomain = TupleDomain.withColumnDomains(domainMap);
splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null);
if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
Assert.assertEquals(splits.size(), 3);
}
for (PulsarSplit split : splits) {
assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0
|| TopicName.getPartitionIndex(split.getTableName()) == 1
|| TopicName.getPartitionIndex(split.getTableName()) == 2);
}
// test multiple domain with unequal low and high of "__partition__"
domainMap.clear();
domain = Domain.create(ValueSet.ofRanges(
Range.range(INTEGER, 0L, true, 1L, true),
Range.range(INTEGER, 3L, true, 4L, true)),
false);
domainMap.put(PulsarInternalColumn.PARTITION.getColumnHandle(pulsarConnectorId.toString(), false), domain);
tupleDomain = TupleDomain.withColumnDomains(domainMap);
splits = this.pulsarSplitManager.getSplitsPartitionedTopic(2, topicName, pulsarTableHandle,
schemas.getSchemaInfo(topicName.getSchemaName()), tupleDomain, null);
if (topicsToNumEntries.get(topicName.getSchemaName()) > 1) {
Assert.assertEquals(splits.size(), 4);
}
for (PulsarSplit split : splits) {
assertTrue(TopicName.getPartitionIndex(split.getTableName()) == 0
|| TopicName.getPartitionIndex(split.getTableName()) == 1
|| TopicName.getPartitionIndex(split.getTableName()) == 3
|| TopicName.getPartitionIndex(split.getTableName()) == 4);
}
}
}
@Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
public void testGetSplitNonSchema(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = NON_SCHEMA_TOPIC;
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
assertNotNull(connectorSplitSource);
}
@Test
public void pulsarSplitJsonCodecTest() throws JsonProcessingException, UnsupportedEncodingException {
OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(
"aws-s3",
"test-region",
"test-bucket",
"test-endpoint",
"role-",
"role-session-name",
"test-credential-id",
"test-credential-secret",
5000,
2000,
1000L,
5000L,
OffloadedReadPriority.BOOKKEEPER_FIRST
);
SchemaInfo schemaInfo = JSONSchema.of(Foo.class).getSchemaInfo();
final String schema = new String(schemaInfo.getSchema(), "ISO8859-1");
final String originSchemaName = schemaInfo.getName();
final String schemaName = schemaInfo.getName();
final String schemaInfoProperties = new ObjectMapper().writeValueAsString(schemaInfo.getProperties());
final SchemaType schemaType = schemaInfo.getType();
final long splitId = 1;
final String connectorId = "connectorId";
final String tableName = "tableName";
final long splitSize = 5;
final long startPositionEntryId = 22;
final long endPositionEntryId = 33;
final long startPositionLedgerId = 10;
final long endPositionLedgerId = 21;
final TupleDomain<ColumnHandle> tupleDomain = TupleDomain.all();
byte[] pulsarSplitData;
JsonCodec<PulsarSplit> jsonCodec = JsonCodec.jsonCodec(PulsarSplit.class);
try {
PulsarSplit pulsarSplit = new PulsarSplit(
splitId, connectorId, schemaName, originSchemaName, tableName, splitSize, schema,
schemaType, startPositionEntryId, endPositionEntryId, startPositionLedgerId,
endPositionLedgerId, tupleDomain, schemaInfoProperties, offloadPolicies);
pulsarSplitData = jsonCodec.toJsonBytes(pulsarSplit);
} catch (Exception e) {
e.printStackTrace();
log.error("Failed to serialize the PulsarSplit.", e);
fail("Failed to serialize the PulsarSplit.");
return;
}
try {
PulsarSplit pulsarSplit = jsonCodec.fromJson(pulsarSplitData);
Assert.assertEquals(pulsarSplit.getSchema(), schema);
Assert.assertEquals(pulsarSplit.getOriginSchemaName(), originSchemaName);
Assert.assertEquals(pulsarSplit.getSchemaName(), schemaName);
Assert.assertEquals(pulsarSplit.getSchemaInfoProperties(), schemaInfoProperties);
Assert.assertEquals(pulsarSplit.getSchemaType(), schemaType);
Assert.assertEquals(pulsarSplit.getSplitId(), splitId);
Assert.assertEquals(pulsarSplit.getConnectorId(), connectorId);
Assert.assertEquals(pulsarSplit.getTableName(), tableName);
Assert.assertEquals(pulsarSplit.getSplitSize(), splitSize);
Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), startPositionEntryId);
Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), endPositionEntryId);
Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), startPositionLedgerId);
Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), endPositionLedgerId);
Assert.assertEquals(pulsarSplit.getTupleDomain(), tupleDomain);
Assert.assertEquals(pulsarSplit.getOffloadPolicies(), offloadPolicies);
} catch (Exception e) {
log.error("Failed to deserialize the PulsarSplit.", e);
fail("Failed to deserialize the PulsarSplit.");
}
}
}