| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.cassandra.spark.bulkwriter; |
| |
| import java.math.BigInteger; |
| import java.nio.file.Path; |
| import java.util.Arrays; |
import java.util.Comparator;
import java.util.HashSet;
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| import java.util.stream.Stream; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterators; |
| import com.google.common.collect.Range; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.io.TempDir; |
| |
| import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; |
| import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; |
| import org.apache.cassandra.spark.common.model.CassandraInstance; |
| import org.apache.cassandra.spark.data.CqlField; |
| import org.apache.cassandra.spark.utils.DigestAlgorithm; |
| import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm; |
| import org.apache.spark.sql.types.DataType; |
| import org.apache.spark.sql.types.DataTypes; |
| import org.apache.spark.sql.types.StructType; |
| import org.mockito.Mockito; |
| import scala.Tuple2; |
| |
| import static org.apache.cassandra.spark.bulkwriter.MockBulkWriterContext.DEFAULT_CASSANDRA_VERSION; |
| import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE; |
| import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; |
| import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR; |
| import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCqlType; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.containsString; |
| import static org.hamcrest.Matchers.empty; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.startsWith; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.Mockito.when; |
| |
| class RecordWriterTest |
| { |
| private static final int REPLICA_COUNT = 3; |
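    // Each SSTable is uploaded as its component files; with the default "big" format this is
    // typically 8 files (e.g. Data.db, Index.db, Summary.db, Filter.db, Statistics.db,
    // Digest.crc32, CompressionInfo.db and TOC.txt)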
| private static final int FILES_PER_SSTABLE = 8; |
    // Writing 270 rows with an SSTable size cap of 1 MB should produce 2 SSTables
| private static final int UPLOADED_SSTABLES = 2; |
| private static final int ROWS_COUNT = 270; |
| private static final String[] COLUMN_NAMES = { |
| "id", "date", "course", "marks" |
| }; |
| |
| @TempDir |
| private Path folder; |
| |
| private TokenRangeMapping<RingInstance> tokenRangeMapping; |
| private RecordWriter rw; |
| private MockTableWriter tw; |
| private Tokenizer tokenizer; |
| private Range<BigInteger> range; |
| private MockBulkWriterContext writerContext; |
| private TestTaskContext tc; |
| private DigestAlgorithm digestAlgorithm; |
| |
| @BeforeEach |
| public void setUp() |
| { |
| digestAlgorithm = new XXHash32DigestAlgorithm(); |
        tw = new MockTableWriter(folder); // the @TempDir Path is the temp directory itself; Path.getRoot() would return the filesystem root
| tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12); |
| writerContext = new MockBulkWriterContext(tokenRangeMapping); |
        writerContext.setSstableDataSizeInMB(1); // 1 MB is the minimum SSTable data size allowed
| tc = new TestTaskContext(); |
| range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId()); |
| tokenizer = new Tokenizer(writerContext); |
| } |
| |
| @Test |
| void symmetricDifferenceTest() |
| { |
| Set<Integer> s1 = new HashSet<>(Arrays.asList(1, 2, 3)); |
| Set<Integer> s2 = new HashSet<>(Arrays.asList(2, 3, 4)); |
| Set<Integer> s3 = new HashSet<>(Arrays.asList(5, 6, 7)); |
| Set<Integer> s4 = new HashSet<>(Arrays.asList(5, 6, 7)); |
| Set<Integer> s5 = new HashSet<>(); |
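        // symmetricDifference(a, b) contains the elements present in exactly one of the two sets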
| assertThat(RecordWriter.symmetricDifference(s1, s2), is(new HashSet<>(Arrays.asList(1, 4)))); |
| assertThat(RecordWriter.symmetricDifference(s2, s3), is(new HashSet<>(Arrays.asList(2, 3, 4, 5, 6, 7)))); |
| assertThat(RecordWriter.symmetricDifference(s3, s4), empty()); |
| assertThat(RecordWriter.symmetricDifference(s4, s5), is(s4)); |
| } |
| |
| @Test |
| void testWriteFailWhenTopologyChangeWithinTask() |
| { |
| // Generate token range mapping to simulate node movement of the first node by assigning it a different token |
| // within the same partition |
| int moveTargetToken = 50000; |
| TokenRangeMapping<RingInstance> testMapping = |
| TokenRangeMappingUtils.buildTokenRangeMapping(100000, |
| ImmutableMap.of("DC1", 3), |
| 12, |
| true, |
| moveTargetToken); |
| |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| |
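        // The first lookup (at task start) uses the real mapping; the subsequent validation lookup
        // returns the moved topology, which should fail the write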
| when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); |
| assertThat(ex.getMessage(), containsString("Token range mappings have changed since the task started")); |
| } |
| |
| @Test |
| void testWriteWithBlockedInstances() |
| { |
| |
| String blockedInstanceIp = "127.0.0.2"; |
| TokenRangeMapping<RingInstance> testMapping = |
| TokenRangeMappingUtils.buildTokenRangeMappingWithBlockedInstance(100000, |
| ImmutableMap.of("DC1", 3), |
| 3, |
| blockedInstanceIp); |
| |
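        // Mark the blocked instance as UNAVAILABLE_BLOCKED and all other instances as AVAILABLE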
| Set<RingInstance> instances = testMapping.getTokenRanges().keySet(); |
| Map<RingInstance, InstanceAvailability> availability |
| = instances.stream() |
| .collect(Collectors.toMap(Function.identity(), |
| i -> (i.ipAddress().equals(blockedInstanceIp)) ? |
| InstanceAvailability.UNAVAILABLE_BLOCKED : |
| InstanceAvailability.AVAILABLE)); |
| |
| writerContext = new MockBulkWriterContext(tokenRangeMapping, DEFAULT_CASSANDRA_VERSION, ConsistencyLevel.CL.QUORUM); |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| |
| when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); |
| when(m.getInstanceAvailability()).thenReturn(availability); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| rw.write(data); |
| Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); |
| // Should not upload to blocked instances |
| assertThat(uploads.keySet().size(), is(REPLICA_COUNT - 1)); |
| assertFalse(uploads.keySet().stream().map(CassandraInstance::ipAddress).collect(Collectors.toSet()).contains(blockedInstanceIp)); |
| } |
| |
| @Test |
| void testWriteWithExclusions() |
| { |
| TokenRangeMapping<RingInstance> testMapping = |
| TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000, |
| ImmutableMap.of("DC1", 3), |
| 12); |
| |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| |
| when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); |
| when(m.getInstanceAvailability()).thenCallRealMethod(); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| rw.write(data); |
| Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); |
| assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas |
| } |
| |
| @Test |
| void testSuccessfulWrite() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); |
| } |
| |
| @Test |
| void testWriteWithMixedCaseColumnNames() throws InterruptedException |
| { |
| boolean quoteIdentifiers = true; |
| String[] pk = {"ID", "date"}; |
| String[] columnNames = {"ID", "date", "course", "limit"}; |
| |
| Pair<StructType, ImmutableMap<String, CqlField.CqlType>> validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns( |
| columnNames, |
| new DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType}, |
| new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); |
| |
| MockBulkWriterContext writerContext = new MockBulkWriterContext(tokenRangeMapping, |
| DEFAULT_CASSANDRA_VERSION, |
| ConsistencyLevel.CL.LOCAL_QUORUM, |
| validPair, |
| pk, |
| pk, |
| quoteIdentifiers); |
| writerContext.setSstableDataSizeInMB(1); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| validateSuccessfulWrite(writerContext, data, columnNames); |
| } |
| |
| @Test |
| void testSuccessfulWriteCheckUploads() |
| { |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| rw.write(data); |
| Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); |
| assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas |
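        // 3 replicas x 8 SSTable component files x 2 SSTables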
| assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_SSTABLES)); |
| List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); |
| for (UploadRequest ur : requests) |
| { |
| assertNotNull(ur.digest); |
| } |
| } |
| |
| @Test |
| void testWriteWithConstantTTL() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, false); |
| validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); |
| } |
| |
| @Test |
| void testWriteWithTTLColumn() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, false); |
| String[] columnNamesWithTtl = |
| { |
| "id", "date", "course", "marks", "ttl" |
| }; |
| validateSuccessfulWrite(writerContext, data, columnNamesWithTtl); |
| } |
| |
| @Test |
| void testWriteWithConstantTimestamp() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, false); |
| validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); |
| } |
| |
| @Test |
| void testWriteWithTimestampColumn() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, true); |
| String[] columnNamesWithTimestamp = |
| { |
| "id", "date", "course", "marks", "timestamp" |
| }; |
| validateSuccessfulWrite(writerContext, data, columnNamesWithTimestamp); |
| } |
| |
| @Test |
| void testWriteWithTimestampAndTTLColumn() throws InterruptedException |
| { |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, true); |
| String[] columnNames = |
| { |
| "id", "date", "course", "marks", "ttl", "timestamp" |
| }; |
| validateSuccessfulWrite(writerContext, data, columnNames); |
| } |
| |
| @Test |
| void testWriteWithSubRanges() |
| { |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); |
| when(m.job().getTokenPartitioner()).thenReturn(mtp); |
| |
| // Override partition's token range to span across ranges to force a split into sub-ranges |
        Range<BigInteger> overlapRange = Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(200000L));
| when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange); |
| |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateDataWithFakeToken(ROWS_COUNT, range); |
| List<StreamResult> res = rw.write(data).streamResults(); |
| assertEquals(1, res.size()); |
| assertNotEquals(overlapRange, res.get(0).tokenRange); |
| assertEquals(range, res.get(0).tokenRange); |
| Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads(); |
| // Should upload to 3 replicas |
| assertEquals(REPLICA_COUNT, uploads.keySet().size()); |
| assertEquals(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_SSTABLES, uploads.values().stream().mapToInt(List::size).sum()); |
| List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); |
| for (UploadRequest ur : requests) |
| { |
| assertNotNull(ur.digest); |
| } |
| } |
| |
| @Test |
| void testWriteWithDataInMultipleSubRanges() |
| { |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); |
| when(m.job().getTokenPartitioner()).thenReturn(mtp); |
| // Override partition's token range to span across ranges to force a split into sub-ranges |
        Range<BigInteger> overlapRange = Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(200000L));
        Range<BigInteger> firstSubrange = Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(100000L));
| when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange); |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| |
| // Generate rows that belong to the first sub-range |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateDataWithFakeToken(ROWS_COUNT, firstSubrange); |
| List<StreamResult> res = rw.write(data).streamResults(); |
| assertEquals(1, res.size()); |
| assertNotEquals(overlapRange, res.get(0).tokenRange); |
| assertEquals(firstSubrange, res.get(0).tokenRange); |
| Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads(); |
| // Should upload to 3 replicas |
| assertEquals(REPLICA_COUNT, uploads.keySet().size()); |
| assertEquals(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_SSTABLES, |
| uploads.values().stream().mapToInt(List::size).sum()); |
| List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); |
| for (UploadRequest ur : requests) |
| { |
| assertNotNull(ur.digest); |
| } |
| } |
| |
| @Test |
| void testWriteWithTokensAcrossSubRanges() |
| { |
| MockBulkWriterContext m = Mockito.spy(writerContext); |
| m.setSstableDataSizeInMB(1); |
| TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); |
| when(m.job().getTokenPartitioner()).thenReturn(mtp); |
| // Override partition's token range to span across ranges to force a split into sub-ranges |
        Range<BigInteger> overlapRange = Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(200000L));
        Range<BigInteger> firstSubrange = Range.openClosed(BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(100000L));
| Range<BigInteger> secondSubrange = Range.openClosed(BigInteger.valueOf(100000L), BigInteger.valueOf(200000L)); |
| when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange); |
| rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
| int numRows = 1; // generate 1 row in each range |
| Iterator<Tuple2<DecoratedKey, Object[]>> firstRangeData = generateDataWithFakeToken(numRows, firstSubrange); |
| Iterator<Tuple2<DecoratedKey, Object[]>> secondRangeData = generateDataWithFakeToken(numRows, secondSubrange); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = Iterators.concat(firstRangeData, secondRangeData); |
| List<StreamResult> res = rw.write(data).streamResults(); |
| // We expect 2 streams since rows belong to different sub-ranges |
| assertEquals(2, res.size()); |
| assertNotEquals(overlapRange, res.get(0).tokenRange); |
| final Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads(); |
        // Rows span two sub-ranges with overlapping replica sets, so uploads reach 4 distinct instances
        assertEquals((REPLICA_COUNT + 1), uploads.keySet().size());
| |
        // There are a total of 2 SSTables, one for each sub-range.
        // Although the replica sets for the two sub-ranges differ, each sub-range is still replicated to 3 instances
| assertEquals(REPLICA_COUNT * FILES_PER_SSTABLE * 2, uploads.values().stream().mapToInt(List::size).sum()); |
| List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); |
| for (UploadRequest ur : requests) |
| { |
| assertNotNull(ur.digest); |
| } |
| } |
| |
| @Test |
| void testCorruptSSTable() |
| { |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, |
| (wc, path, dp) -> new SortedSSTableWriter(tw.setOutDir(path), path, digestAlgorithm)); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| // TODO: Add better error handling with human-readable exception messages in SSTableReader::new |
| // That way we can assert on the exception thrown here |
        assertThrows(RuntimeException.class, () -> rw.write(data));
| } |
| |
| @Test |
| void testWriteWithOutOfRangeTokenFails() |
| { |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, |
| (wc, path, dp) -> new SortedSSTableWriter(tw, folder, digestAlgorithm)); |
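        // Range.all() with fakeTokens disabled keeps rows whose actual tokens fall outside the task's assigned range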
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, Range.all(), false, false, false); |
| RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); |
| String expectedErr = "java.lang.IllegalStateException: Received Token " + |
| "5765203080415074583 outside the expected ranges [(-9223372036854775808‥100000]]"; |
| assertEquals(expectedErr, ex.getMessage()); |
| } |
| |
| @Test |
| void testAddRowThrowingFails() |
| { |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, |
| (wc, path, dp) -> new SortedSSTableWriter(tw, folder, digestAlgorithm)); |
| tw.setAddRowThrows(true); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); |
| assertEquals("java.lang.RuntimeException: Failed to write because addRow throws", ex.getMessage()); |
| } |
| |
| @Test |
| void testBadTimeSkewFails() |
| { |
| // Mock context returns a 60-minute allowable time skew, so we use something just outside the limits |
| long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61); |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, |
| (wc, path, dp) -> new SortedSSTableWriter(tw, folder, digestAlgorithm)); |
| writerContext.setTimeProvider(() -> System.currentTimeMillis() - sixtyOneMinutesInMillis); |
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); |
| assertThat(ex.getMessage(), startsWith("Time skew between Spark and Cassandra is too large. Allowable skew is 60 minutes. Spark executor time is ")); |
| } |
| |
| @Test |
| void testTimeSkewWithinLimitsSucceeds() |
| { |
| // Mock context returns a 60-minute allowable time skew, so we use something just inside the limits |
| long fiftyNineMinutesInMillis = TimeUnit.MINUTES.toMillis(59); |
| long remoteTime = System.currentTimeMillis() - fiftyNineMinutesInMillis; |
| rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SortedSSTableWriter::new); |
        writerContext.setTimeProvider(() -> remoteTime); // Report a remote time just inside the allowed skew so the write succeeds
| Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); |
| rw.write(data); |
| } |
| |
| private void validateSuccessfulWrite(MockBulkWriterContext writerContext, |
| Iterator<Tuple2<DecoratedKey, Object[]>> data, |
| String[] columnNames) throws InterruptedException |
| { |
| validateSuccessfulWrite(writerContext, |
| data, |
| columnNames, |
| REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_SSTABLES, |
| new CountDownLatch(0)); |
| } |
| |
| private void validateSuccessfulWrite(MockBulkWriterContext writerContext, |
| Iterator<Tuple2<DecoratedKey, Object[]>> data, |
| String[] columnNames, |
| int expectedUploads, |
| CountDownLatch uploadsLatch) throws InterruptedException |
| { |
| RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SortedSSTableWriter::new); |
| rw.write(data); |
| |
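        // Allow up to a second for any asynchronous uploads to complete before verifying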
| uploadsLatch.await(1, TimeUnit.SECONDS); |
| Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); |
| assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas |
| assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(expectedUploads)); |
| List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); |
| for (UploadRequest ur : requests) |
| { |
| assertNotNull(ur.digest); |
| } |
| } |
| |
| private Iterator<Tuple2<DecoratedKey, Object[]>> generateData() |
| { |
| return generateData(false, false); |
| } |
| |
| private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean withTTL, boolean withTimestamp) |
| { |
| return generateData(ROWS_COUNT, range, false, withTTL, withTimestamp); |
| } |
| |
    // Generate data with fake tokens assigned. The fake tokens are drawn from the input range.
    // Although the generated rows carry fake tokens, the actual tokens computed from each tuple
    // are still in ascending order.
| private Iterator<Tuple2<DecoratedKey, Object[]>> generateDataWithFakeToken(int numValues, Range<BigInteger> range) |
| { |
| return generateData(numValues, range, /* fake tokens */ true, /* ttl */ false, /* timestamp */ false); |
| } |
| |
| /** |
| * Generate test data |
| * @param rowsCount number of rows to generate; the rows are sorted |
| * @param tokenRange token range that the generated rows belong to |
     * @param fakeTokens whether to fake the token of each row. The actual tokens of each row are still sorted
     * @param withTTL whether to add the TTL column
| * @param withTimestamp whether to add the timestamp |
| * @return iterator of the test rows |
| */ |
| private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int rowsCount, |
| Range<BigInteger> tokenRange, |
| boolean fakeTokens, |
| boolean withTTL, boolean withTimestamp) |
| { |
| String courseString = IntStream.range(0, 100000).boxed().map(i -> "Long long string").collect(Collectors.joining()); |
| Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, integer -> integer + 1).mapToObj(index -> { |
| Object[] columns; |
| if (withTTL && withTimestamp) |
| { |
| columns = new Object[] |
| { |
| index, index, courseString, index, index * 100, System.currentTimeMillis() * 1000 |
| }; |
| } |
| else if (withTimestamp) |
| { |
| columns = new Object[] |
| { |
| index, index, courseString, index, System.currentTimeMillis() * 1000 |
| }; |
| } |
| else if (withTTL) |
| { |
| columns = new Object[] |
| { |
| index, index, courseString, index, index * 100 |
| }; |
| } |
| else |
| { |
| columns = new Object[] |
| { |
| index, index, courseString, index |
| }; |
| } |
| return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns); |
| }); |
| |
| // filter based on the actual token if the test does not want to fake tokens |
| if (!fakeTokens) |
| { |
| source = source.filter(val -> tokenRange.contains(val._1.getToken())); |
| } |
| |
| Stream<Tuple2<DecoratedKey, Object[]>> limitedStream = source.limit(rowsCount); |
| |
| List<Tuple2<DecoratedKey, Object[]>> sortedData = limitedStream.sorted(Comparator.comparing(tuple -> tuple._1().getToken())) |
| .collect(Collectors.toList()); |
| |
| if (fakeTokens) |
| { |
            // Update each tuple with its assigned fake token;
            // the ByteBuffer keys in the list remain ordered
            BigInteger currentToken = tokenRange.lowerEndpoint().add(BigInteger.ONE); // the lower endpoint is open, so start one above it
| for (int i = 0; i < sortedData.size(); i++, currentToken = currentToken.add(BigInteger.ONE)) |
| { |
| Tuple2<DecoratedKey, Object[]> item = sortedData.get(i); |
| DecoratedKey key = item._1(); |
| item = Tuple2.apply(new DecoratedKey(currentToken, key.getKey()), item._2()); |
| sortedData.set(i, item); |
| } |
| } |
| |
| return sortedData.iterator(); |
| } |
| } |