| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.kafka.source.reader; |
| |
| import org.apache.flink.api.connector.source.Boundedness; |
| import org.apache.flink.api.connector.source.ReaderOutput; |
| import org.apache.flink.api.connector.source.SourceReader; |
| import org.apache.flink.api.connector.source.SourceReaderContext; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.connector.kafka.source.KafkaSource; |
| import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; |
| import org.apache.flink.connector.kafka.source.KafkaSourceOptions; |
| import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils; |
| import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; |
| import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; |
| import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; |
| import org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv; |
| import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase; |
| import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; |
| import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; |
| import org.apache.flink.core.io.InputStatus; |
| import org.apache.flink.metrics.Counter; |
| import org.apache.flink.metrics.Gauge; |
| import org.apache.flink.metrics.MetricGroup; |
| import org.apache.flink.metrics.testutils.MetricListener; |
| import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; |
| |
| import org.apache.kafka.clients.admin.AdminClient; |
| import org.apache.kafka.clients.admin.NewTopic; |
| import org.apache.kafka.clients.consumer.ConsumerConfig; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.serialization.IntegerDeserializer; |
| import org.apache.kafka.common.serialization.IntegerSerializer; |
| import org.apache.kafka.common.serialization.StringSerializer; |
| import org.hamcrest.MatcherAssert; |
| import org.hamcrest.Matchers; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.INITIAL_OFFSET; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.KAFKA_CONSUMER_METRIC_GROUP; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP; |
| import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP; |
| import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestEnv.NUM_PARTITIONS; |
| import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| /** Unit tests for {@link KafkaSourceReader}. */ |
| public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSplit> { |
| private static final String TOPIC = "KafkaSourceReaderTest"; |
| |
| @BeforeClass |
| public static void setup() throws Throwable { |
| KafkaSourceTestEnv.setup(); |
| try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) { |
| adminClient.createTopics( |
| Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1))); |
| // Use the admin client to trigger the creation of internal __consumer_offsets topic. |
| // This makes sure that we won't see unavailable coordinator in the tests. |
| waitUtil( |
| () -> { |
| try { |
| adminClient |
| .listConsumerGroupOffsets("AnyGroup") |
| .partitionsToOffsetAndMetadata() |
| .get(); |
| } catch (Exception e) { |
| return false; |
| } |
| return true; |
| }, |
| Duration.ofSeconds(60), |
| "Waiting for offsets topic creation failed."); |
| } |
| KafkaSourceTestEnv.produceToKafka( |
| getRecords(), StringSerializer.class, IntegerSerializer.class); |
| } |
| |
| @AfterClass |
| public static void tearDown() throws Exception { |
| KafkaSourceTestEnv.tearDown(); |
| } |
| |
| protected int getNumSplits() { |
| return NUM_PARTITIONS; |
| } |
| |
| // ----------------------------------------- |
| |
| @Test |
| public void testCommitOffsetsWithoutAliveFetchers() throws Exception { |
| final String groupId = "testCommitOffsetsWithoutAliveFetchers"; |
| try (KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader(Boundedness.CONTINUOUS_UNBOUNDED, groupId)) { |
| KafkaPartitionSplit split = |
| new KafkaPartitionSplit(new TopicPartition(TOPIC, 0), 0, NUM_RECORDS_PER_SPLIT); |
| reader.addSplits(Collections.singletonList(split)); |
| reader.notifyNoMoreSplits(); |
| ReaderOutput<Integer> output = new TestingReaderOutput<>(); |
| InputStatus status; |
| do { |
| status = reader.pollNext(output); |
| } while (status != InputStatus.NOTHING_AVAILABLE); |
| pollUntil( |
| reader, |
| output, |
| () -> reader.getNumAliveFetchers() == 0, |
| "The split fetcher did not exit before timeout."); |
| reader.snapshotState(100L); |
| reader.notifyCheckpointComplete(100L); |
| // Due to a bug in KafkaConsumer, when the consumer closes, the offset commit callback |
| // won't be fired, so the offsetsToCommit map won't be cleaned. To make the test |
| // stable, we add a split whose starting offset is the log end offset, so the |
| // split fetcher won't become idle and exit after commitOffsetAsync is invoked from |
| // notifyCheckpointComplete(). |
| reader.addSplits( |
| Collections.singletonList( |
| new KafkaPartitionSplit( |
| new TopicPartition(TOPIC, 0), NUM_RECORDS_PER_SPLIT))); |
| pollUntil( |
| reader, |
| output, |
| () -> reader.getOffsetsToCommit().isEmpty(), |
| "The offset commit did not finish before timeout."); |
| } |
| // Verify the committed offsets. |
| try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) { |
| Map<TopicPartition, OffsetAndMetadata> committedOffsets = |
| adminClient |
| .listConsumerGroupOffsets(groupId) |
| .partitionsToOffsetAndMetadata() |
| .get(); |
| assertEquals(1, committedOffsets.size()); |
| committedOffsets.forEach( |
| (tp, offsetAndMetadata) -> |
| assertEquals(NUM_RECORDS_PER_SPLIT, offsetAndMetadata.offset())); |
| } |
| } |
| |
| @Test |
| public void testCommitEmptyOffsets() throws Exception { |
| final String groupId = "testCommitEmptyOffsets"; |
| try (KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader(Boundedness.CONTINUOUS_UNBOUNDED, groupId)) { |
| reader.snapshotState(100L); |
| reader.notifyCheckpointComplete(100L); |
| } |
| // Verify the committed offsets. |
| try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) { |
| Map<TopicPartition, OffsetAndMetadata> committedOffsets = |
| adminClient |
| .listConsumerGroupOffsets(groupId) |
| .partitionsToOffsetAndMetadata() |
| .get(); |
| assertTrue(committedOffsets.isEmpty()); |
| } |
| } |
| |
| @Test |
| public void testOffsetCommitOnCheckpointComplete() throws Exception { |
| final String groupId = "testOffsetCommitOnCheckpointComplete"; |
| try (KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader(Boundedness.CONTINUOUS_UNBOUNDED, groupId)) { |
| reader.addSplits( |
| getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)); |
| ValidatingSourceOutput output = new ValidatingSourceOutput(); |
| long checkpointId = 0; |
| do { |
| checkpointId++; |
| reader.pollNext(output); |
| // Create a checkpoint for each message consumption, but not complete them. |
| reader.snapshotState(checkpointId); |
| } while (output.count() < totalNumRecords); |
| |
| // The completion of the last checkpoint should subsume all the previous checkpoitns. |
| assertEquals(checkpointId, reader.getOffsetsToCommit().size()); |
| reader.notifyCheckpointComplete(checkpointId); |
| pollUntil( |
| reader, |
| output, |
| () -> reader.getOffsetsToCommit().isEmpty(), |
| "The offset commit did not finish before timeout."); |
| } |
| |
| // Verify the committed offsets. |
| try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) { |
| Map<TopicPartition, OffsetAndMetadata> committedOffsets = |
| adminClient |
| .listConsumerGroupOffsets(groupId) |
| .partitionsToOffsetAndMetadata() |
| .get(); |
| assertEquals(numSplits, committedOffsets.size()); |
| committedOffsets.forEach( |
| (tp, offsetAndMetadata) -> |
| assertEquals(NUM_RECORDS_PER_SPLIT, offsetAndMetadata.offset())); |
| } |
| } |
| |
| @Test |
| public void testNotCommitOffsetsForUninitializedSplits() throws Exception { |
| final long checkpointId = 1234L; |
| try (KafkaSourceReader<Integer> reader = (KafkaSourceReader<Integer>) createReader()) { |
| KafkaPartitionSplit split = |
| new KafkaPartitionSplit( |
| new TopicPartition(TOPIC, 0), KafkaPartitionSplit.EARLIEST_OFFSET); |
| reader.addSplits(Collections.singletonList(split)); |
| reader.snapshotState(checkpointId); |
| assertEquals(1, reader.getOffsetsToCommit().size()); |
| assertTrue(reader.getOffsetsToCommit().get(checkpointId).isEmpty()); |
| } |
| } |
| |
| @Test |
| public void testDisableOffsetCommit() throws Exception { |
| final Properties properties = new Properties(); |
| properties.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false"); |
| try (KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader( |
| Boundedness.CONTINUOUS_UNBOUNDED, |
| new TestingReaderContext(), |
| (ignore) -> {}, |
| properties)) { |
| reader.addSplits( |
| getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)); |
| ValidatingSourceOutput output = new ValidatingSourceOutput(); |
| long checkpointId = 0; |
| do { |
| checkpointId++; |
| reader.pollNext(output); |
| // Create a checkpoint for each message consumption, but not complete them. |
| reader.snapshotState(checkpointId); |
| // Offsets to commit should be always empty because offset commit is disabled |
| assertEquals(0, reader.getOffsetsToCommit().size()); |
| } while (output.count() < totalNumRecords); |
| } |
| } |
| |
| @Test |
| public void testKafkaSourceMetrics() throws Exception { |
| final MetricListener metricListener = new MetricListener(); |
| final String groupId = "testKafkaSourceMetrics"; |
| final TopicPartition tp0 = new TopicPartition(TOPIC, 0); |
| final TopicPartition tp1 = new TopicPartition(TOPIC, 1); |
| |
| try (KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader( |
| Boundedness.CONTINUOUS_UNBOUNDED, |
| groupId, |
| metricListener.getMetricGroup())) { |
| |
| KafkaPartitionSplit split0 = |
| new KafkaPartitionSplit(tp0, KafkaPartitionSplit.EARLIEST_OFFSET); |
| KafkaPartitionSplit split1 = |
| new KafkaPartitionSplit(tp1, KafkaPartitionSplit.EARLIEST_OFFSET); |
| reader.addSplits(Arrays.asList(split0, split1)); |
| |
| TestingReaderOutput<Integer> output = new TestingReaderOutput<>(); |
| pollUntil( |
| reader, |
| output, |
| () -> output.getEmittedRecords().size() == NUM_RECORDS_PER_SPLIT * 2, |
| String.format( |
| "Failed to poll %d records until timeout", NUM_RECORDS_PER_SPLIT * 2)); |
| |
| // Metric "records-consumed-total" of KafkaConsumer should be NUM_RECORDS_PER_SPLIT |
| assertEquals( |
| NUM_RECORDS_PER_SPLIT * 2, |
| getKafkaConsumerMetric("records-consumed-total", metricListener)); |
| |
| // Current consuming offset should be NUM_RECORD_PER_SPLIT - 1 |
| assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp0, metricListener)); |
| assertEquals(NUM_RECORDS_PER_SPLIT - 1, getCurrentOffsetMetric(tp1, metricListener)); |
| |
| // No offset is committed till now |
| assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp0, metricListener)); |
| assertEquals(INITIAL_OFFSET, getCommittedOffsetMetric(tp1, metricListener)); |
| |
| // Trigger offset commit |
| reader.snapshotState(15213L); |
| reader.notifyCheckpointComplete(15213L); |
| waitUtil( |
| () -> reader.getOffsetsToCommit().isEmpty(), |
| Duration.ofSeconds(60), |
| String.format( |
| "Offsets are not committed successfully. Dangling offsets: %s", |
| reader.getOffsetsToCommit())); |
| |
| // Metric "commit-total" of KafkaConsumer should be 1 |
| assertEquals(1, getKafkaConsumerMetric("commit-total", metricListener)); |
| |
| // Committed offset should be NUM_RECORD_PER_SPLIT |
| assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp0, metricListener)); |
| assertEquals(NUM_RECORDS_PER_SPLIT, getCommittedOffsetMetric(tp1, metricListener)); |
| |
| // Number of successful commits should be 1 |
| final Optional<Counter> commitsSucceeded = |
| metricListener.getCounter( |
| KAFKA_SOURCE_READER_METRIC_GROUP, COMMITS_SUCCEEDED_METRIC_COUNTER); |
| assertTrue(commitsSucceeded.isPresent()); |
| assertEquals(1L, commitsSucceeded.get().getCount()); |
| } |
| } |
| |
| @Test |
| public void testAssigningEmptySplits() throws Exception { |
| // Normal split with NUM_RECORDS_PER_SPLIT records |
| final KafkaPartitionSplit normalSplit = |
| new KafkaPartitionSplit( |
| new TopicPartition(TOPIC, 0), 0, KafkaPartitionSplit.LATEST_OFFSET); |
| // Empty split with no record |
| final KafkaPartitionSplit emptySplit = |
| new KafkaPartitionSplit( |
| new TopicPartition(TOPIC, 1), NUM_RECORDS_PER_SPLIT, NUM_RECORDS_PER_SPLIT); |
| // Split finished hook for listening finished splits |
| final Set<String> finishedSplits = new HashSet<>(); |
| final Consumer<Collection<String>> splitFinishedHook = finishedSplits::addAll; |
| |
| try (final KafkaSourceReader<Integer> reader = |
| (KafkaSourceReader<Integer>) |
| createReader( |
| Boundedness.BOUNDED, |
| "KafkaSourceReaderTestGroup", |
| new TestingReaderContext(), |
| splitFinishedHook)) { |
| reader.addSplits(Arrays.asList(normalSplit, emptySplit)); |
| pollUntil( |
| reader, |
| new TestingReaderOutput<>(), |
| () -> reader.getNumAliveFetchers() == 0, |
| "The split fetcher did not exit before timeout."); |
| MatcherAssert.assertThat( |
| finishedSplits, |
| Matchers.containsInAnyOrder( |
| KafkaPartitionSplit.toSplitId(normalSplit.getTopicPartition()), |
| KafkaPartitionSplit.toSplitId(emptySplit.getTopicPartition()))); |
| } |
| } |
| |
| // ------------------------------------------ |
| |
| @Override |
| protected SourceReader<Integer, KafkaPartitionSplit> createReader() throws Exception { |
| return createReader(Boundedness.BOUNDED, "KafkaSourceReaderTestGroup"); |
| } |
| |
| @Override |
| protected List<KafkaPartitionSplit> getSplits( |
| int numSplits, int numRecordsPerSplit, Boundedness boundedness) { |
| List<KafkaPartitionSplit> splits = new ArrayList<>(); |
| for (int i = 0; i < numSplits; i++) { |
| splits.add(getSplit(i, numRecordsPerSplit, boundedness)); |
| } |
| return splits; |
| } |
| |
| @Override |
| protected KafkaPartitionSplit getSplit(int splitId, int numRecords, Boundedness boundedness) { |
| long stoppingOffset = |
| boundedness == Boundedness.BOUNDED |
| ? NUM_RECORDS_PER_SPLIT |
| : KafkaPartitionSplit.NO_STOPPING_OFFSET; |
| return new KafkaPartitionSplit(new TopicPartition(TOPIC, splitId), 0L, stoppingOffset); |
| } |
| |
| @Override |
| protected long getNextRecordIndex(KafkaPartitionSplit split) { |
| return split.getStartingOffset(); |
| } |
| |
| // --------------------- |
| |
| private SourceReader<Integer, KafkaPartitionSplit> createReader( |
| Boundedness boundedness, String groupId) throws Exception { |
| return createReader(boundedness, groupId, new TestingReaderContext(), (ignore) -> {}); |
| } |
| |
| private SourceReader<Integer, KafkaPartitionSplit> createReader( |
| Boundedness boundedness, String groupId, MetricGroup metricGroup) throws Exception { |
| return createReader( |
| boundedness, |
| groupId, |
| new TestingReaderContext( |
| new Configuration(), InternalSourceReaderMetricGroup.mock(metricGroup)), |
| (ignore) -> {}); |
| } |
| |
| private SourceReader<Integer, KafkaPartitionSplit> createReader( |
| Boundedness boundedness, |
| String groupId, |
| SourceReaderContext context, |
| Consumer<Collection<String>> splitFinishedHook) |
| throws Exception { |
| Properties properties = new Properties(); |
| properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| return createReader(boundedness, context, splitFinishedHook, properties); |
| } |
| |
| private SourceReader<Integer, KafkaPartitionSplit> createReader( |
| Boundedness boundedness, |
| SourceReaderContext context, |
| Consumer<Collection<String>> splitFinishedHook, |
| Properties props) |
| throws Exception { |
| KafkaSourceBuilder<Integer> builder = |
| KafkaSource.<Integer>builder() |
| .setClientIdPrefix("KafkaSourceReaderTest") |
| .setDeserializer( |
| KafkaRecordDeserializationSchema.valueOnly( |
| IntegerDeserializer.class)) |
| .setPartitions(Collections.singleton(new TopicPartition("AnyTopic", 0))) |
| .setProperty( |
| ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, |
| KafkaSourceTestEnv.brokerConnectionStrings) |
| .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") |
| .setProperties(props); |
| if (boundedness == Boundedness.BOUNDED) { |
| builder.setBounded(OffsetsInitializer.latest()); |
| } |
| |
| return KafkaSourceTestUtils.createReaderWithFinishedSplitHook( |
| builder.build(), context, splitFinishedHook); |
| } |
| |
| private void pollUntil( |
| KafkaSourceReader<Integer> reader, |
| ReaderOutput<Integer> output, |
| Supplier<Boolean> condition, |
| String errorMessage) |
| throws Exception { |
| waitUtil( |
| () -> { |
| try { |
| reader.pollNext(output); |
| } catch (Exception exception) { |
| throw new RuntimeException( |
| "Caught unexpected exception when polling from the reader", |
| exception); |
| } |
| return condition.get(); |
| }, |
| Duration.ofSeconds(60), |
| errorMessage); |
| } |
| |
| private long getKafkaConsumerMetric(String name, MetricListener listener) { |
| final Optional<Gauge<Object>> kafkaConsumerGauge = |
| listener.getGauge( |
| KAFKA_SOURCE_READER_METRIC_GROUP, KAFKA_CONSUMER_METRIC_GROUP, name); |
| assertTrue(kafkaConsumerGauge.isPresent()); |
| return ((Double) kafkaConsumerGauge.get().getValue()).longValue(); |
| } |
| |
| private long getCurrentOffsetMetric(TopicPartition tp, MetricListener listener) { |
| final Optional<Gauge<Object>> currentOffsetGauge = |
| listener.getGauge( |
| KAFKA_SOURCE_READER_METRIC_GROUP, |
| TOPIC_GROUP, |
| tp.topic(), |
| PARTITION_GROUP, |
| String.valueOf(tp.partition()), |
| CURRENT_OFFSET_METRIC_GAUGE); |
| assertTrue(currentOffsetGauge.isPresent()); |
| return (long) currentOffsetGauge.get().getValue(); |
| } |
| |
| private long getCommittedOffsetMetric(TopicPartition tp, MetricListener listener) { |
| final Optional<Gauge<Object>> committedOffsetGauge = |
| listener.getGauge( |
| KAFKA_SOURCE_READER_METRIC_GROUP, |
| TOPIC_GROUP, |
| tp.topic(), |
| PARTITION_GROUP, |
| String.valueOf(tp.partition()), |
| COMMITTED_OFFSET_METRIC_GAUGE); |
| assertTrue(committedOffsetGauge.isPresent()); |
| return (long) committedOffsetGauge.get().getValue(); |
| } |
| |
| // --------------------- |
| |
| private static List<ProducerRecord<String, Integer>> getRecords() { |
| List<ProducerRecord<String, Integer>> records = new ArrayList<>(); |
| for (int part = 0; part < NUM_PARTITIONS; part++) { |
| for (int i = 0; i < NUM_RECORDS_PER_SPLIT; i++) { |
| records.add( |
| new ProducerRecord<>( |
| TOPIC, part, TOPIC + "-" + part, part * NUM_RECORDS_PER_SPLIT + i)); |
| } |
| } |
| return records; |
| } |
| } |