blob: bb2b65623d4ae236c0c567e7377d9208af357dfa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.utils;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang.mutable.MutableInt;
import org.junit.Test;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.SSTable;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.streaming.SSTableInputStream;
import org.apache.cassandra.spark.utils.streaming.SSTableSource;
import org.apache.cassandra.spark.utils.streaming.StreamBuffer;
import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
import org.jetbrains.annotations.Nullable;
import static org.apache.cassandra.spark.utils.streaming.SSTableInputStream.timeoutLeftNanos;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test the {@link SSTableInputStream} by mocking the {@link SSTableSource}
*/
public class SSTableInputStreamTests
{
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
private static final ExecutorService EXECUTOR =
Executors.newFixedThreadPool(4, new ThreadFactoryBuilder().setNameFormat("sstable-tests-%d")
.setDaemon(true)
.build());
static final int DEFAULT_CHUNK_SIZE = 8192;
static final Stats STATS = Stats.DoNothingStats.INSTANCE;
// Mocked Tests
@Test
public void testMockedClient() throws IOException
{
runMockedTest(1, 1, DEFAULT_CHUNK_SIZE);
runMockedTest(1, 5, DEFAULT_CHUNK_SIZE * 5);
runMockedTest(10, 10, SSTableSource.DEFAULT_MAX_BUFFER_SIZE);
runMockedTest(20, 1024, 33554400L);
runMockedTest(10, 10, DEFAULT_CHUNK_SIZE * 10);
}
private interface SSTableRequest
{
void request(long start, long end, StreamConsumer consumer);
}
private static SSTableSource<SSTable> buildSource(long size,
Long maxBufferSize,
Long requestChunkSize,
SSTableRequest request,
Duration duration)
{
return new SSTableSource<SSTable>()
{
public void request(long start, long end, StreamConsumer consumer)
{
request.request(start, end, consumer);
}
public SSTable sstable()
{
return null;
}
public FileType fileType()
{
return null;
}
public long size()
{
return size;
}
public long maxBufferSize()
{
return maxBufferSize != null ? maxBufferSize : SSTableSource.DEFAULT_MAX_BUFFER_SIZE;
}
public long chunkBufferSize()
{
return requestChunkSize != null ? requestChunkSize : SSTableSource.DEFAULT_CHUNK_BUFFER_SIZE;
}
public Duration timeout()
{
return duration;
}
};
}
// Test SSTableInputStream using mocked SSTableSource
private void runMockedTest(int numRequests, int chunksPerRequest, long maxBufferSize) throws IOException
{
long requestChunkSize = (long) DEFAULT_CHUNK_SIZE * chunksPerRequest;
long fileSize = requestChunkSize * (long) numRequests;
AtomicInteger requestCount = new AtomicInteger(0);
SSTableSource<SSTable> mockedClient = buildSource(fileSize,
maxBufferSize,
requestChunkSize,
(start, end, consumer) -> {
requestCount.incrementAndGet();
writeBuffers(consumer, randomBuffers(chunksPerRequest));
}, null);
SSTableInputStream<SSTable> is = new SSTableInputStream<>(mockedClient, STATS);
readStreamFully(is);
assertEquals(numRequests, requestCount.get());
assertEquals(0L, is.bytesBuffered());
assertEquals(fileSize, is.bytesWritten());
assertEquals(fileSize, is.bytesRead());
}
@Test(expected = IOException.class)
public void testFailure() throws IOException
{
int chunksPerRequest = 10;
int numRequests = 10;
long length = SSTableSource.DEFAULT_CHUNK_BUFFER_SIZE * chunksPerRequest * numRequests;
AtomicInteger count = new AtomicInteger(0);
SSTableSource<SSTable> source = buildSource(length,
SSTableSource.DEFAULT_MAX_BUFFER_SIZE,
SSTableSource.DEFAULT_CHUNK_BUFFER_SIZE,
(start, end, consumer) -> {
if (count.incrementAndGet() > (numRequests / 2))
{
// Halfway through throw random exception
EXECUTOR.submit(() -> consumer.onError(new RuntimeException("Something bad happened...")));
}
else
{
writeBuffers(consumer, randomBuffers(chunksPerRequest));
}
}, null);
readStreamFully(new SSTableInputStream<>(source, STATS));
fail("Should have failed with IOException");
}
@Test
public void testTimeout()
{
long now = System.nanoTime();
assertEquals(Duration.ofMillis(100).toNanos(),
timeoutLeftNanos(Duration.ofMillis(1000), now, now - Duration.ofMillis(900).toNanos()));
assertEquals(Duration.ofMillis(-500).toNanos(),
timeoutLeftNanos(Duration.ofMillis(1000), now, now - Duration.ofMillis(1500).toNanos()));
assertEquals(Duration.ofMillis(995).toNanos(),
timeoutLeftNanos(Duration.ofMillis(1000), now, now - Duration.ofMillis(5).toNanos()));
assertEquals(Duration.ofMillis(1000).toNanos(),
timeoutLeftNanos(Duration.ofMillis(1000), now, now - Duration.ofMillis(0).toNanos()));
assertEquals(Duration.ofMillis(1000).toNanos(),
timeoutLeftNanos(Duration.ofMillis(1000), now, now + Duration.ofMillis(500).toNanos()));
assertEquals(Duration.ofMillis(35000).toNanos(),
timeoutLeftNanos(Duration.ofMillis(60000), now, now - Duration.ofMillis(25000).toNanos()));
assertEquals(Duration.ofMillis(-5000).toNanos(),
timeoutLeftNanos(Duration.ofMillis(60000), now, now - Duration.ofMillis(65000).toNanos()));
assertEquals(Duration.ofMillis(0).toNanos(),
timeoutLeftNanos(Duration.ofMillis(60000), now, now - Duration.ofMillis(60000).toNanos()));
}
@Test
@SuppressWarnings("UnstableApiUsage")
public void testTimeoutShouldAccountForActivityTime()
{
int chunksPerRequest = 10;
int numRequests = 10;
long length = SSTableSource.DEFAULT_CHUNK_BUFFER_SIZE * chunksPerRequest * numRequests;
AtomicInteger count = new AtomicInteger(0);
Duration timeout = Duration.ofMillis(1000);
long startTime = System.nanoTime();
long sleepTimeInMillis = 100L;
SSTableSource<SSTable> source = buildSource(length,
SSTableSource.DEFAULT_MAX_BUFFER_SIZE,
SSTableSource.DEFAULT_CHUNK_BUFFER_SIZE,
(start, end, consumer) -> {
// Only respond once so future requests will time out
if (count.incrementAndGet() == 1)
{
EXECUTOR.submit(() -> {
Uninterruptibles.sleepUninterruptibly(sleepTimeInMillis, TimeUnit.MILLISECONDS);
writeBuffers(consumer, randomBuffers(chunksPerRequest));
});
}
}, timeout);
try
{
readStreamFully(new SSTableInputStream<>(source, STATS));
fail("Should not reach here, should throw TimeoutException");
}
catch (IOException exception)
{
assertTrue(exception.getCause() instanceof TimeoutException);
}
Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
Duration maxAcceptable = timeout.plus(Duration.ofMillis(sleepTimeInMillis));
assertTrue("Timeout didn't account for activity time. "
+ "Took " + duration.toMillis() + "ms should have taken at most " + maxAcceptable.toMillis() + "ms",
duration.minus(maxAcceptable).toMillis() < 100);
}
@Test
public void testSkipOnInit() throws IOException
{
int size = 20971520;
int chunkSize = 1024;
int numChunks = 16;
MutableInt bytesRead = new MutableInt(0);
MutableInt count = new MutableInt(0);
SSTableSource<SSTable> source = new SSTableSource<SSTable>()
{
@Override
public void request(long start, long end, StreamConsumer consumer)
{
assertNotEquals(0, start);
int length = (int) (end - start + 1);
consumer.onRead(randomBuffer(length));
bytesRead.add(length);
count.increment();
consumer.onEnd();
}
@Override
public long chunkBufferSize()
{
return chunkSize;
}
@Override
public SSTable sstable()
{
return null;
}
@Override
public FileType fileType()
{
return FileType.INDEX;
}
@Override
public long size()
{
return size;
}
@Override
@Nullable
public Duration timeout()
{
return Duration.ofSeconds(5);
}
};
int bytesToRead = chunkSize * numChunks;
long skipAhead = size - bytesToRead + 1;
try (SSTableInputStream<SSTable> stream = new SSTableInputStream<>(source, STATS))
{
// Skip ahead so we only read the final chunks
ByteBufferUtils.skipFully(stream, skipAhead);
readStreamFully(stream);
}
// Verify we only read final chunks and not the start of the file
assertEquals(bytesToRead, bytesRead.intValue());
assertEquals(numChunks, count.intValue());
}
@Test
public void testSkipToEnd() throws IOException
{
SSTableSource<SSTable> source = new SSTableSource<SSTable>()
{
@Override
public void request(long start, long end, StreamConsumer consumer)
{
consumer.onRead(randomBuffer((int) (end - start + 1)));
consumer.onEnd();
}
@Override
public SSTable sstable()
{
return null;
}
@Override
public FileType fileType()
{
return FileType.INDEX;
}
@Override
public long size()
{
return 20971520;
}
@Override
@Nullable
public Duration timeout()
{
return Duration.ofSeconds(5);
}
};
try (SSTableInputStream<SSTable> stream = new SSTableInputStream<>(source, STATS))
{
ByteBufferUtils.skipFully(stream, 20971520);
readStreamFully(stream);
}
}
// Utils
private static ImmutableList<StreamBuffer> randomBuffers(int count)
{
return ImmutableList.copyOf(IntStream.range(0, count)
.mapToObj(buffer -> randomBuffer())
.collect(Collectors.toList()));
}
private static StreamBuffer randomBuffer()
{
return randomBuffer(DEFAULT_CHUNK_SIZE);
}
private static StreamBuffer randomBuffer(int size)
{
return StreamBuffer.wrap(RandomUtils.randomBytes(size));
}
@SuppressWarnings("StatementWithEmptyBody")
private static void readStreamFully(SSTableInputStream<SSTable> inputStream) throws IOException
{
try (SSTableInputStream<SSTable> in = inputStream)
{
while (in.read() >= 0)
{
// CHECKSTYLE IGNORE: Do nothing
}
}
}
private static void writeBuffers(StreamConsumer consumer, ImmutableList<StreamBuffer> buffers)
{
if (buffers.isEmpty())
{
// No more buffers so finished
consumer.onEnd();
return;
}
SCHEDULER.schedule(() -> {
EXECUTOR.submit(() -> {
// Write next buffer to StreamConsumer
consumer.onRead(buffers.get(0));
writeBuffers(consumer, buffers.subList(1, buffers.size()));
});
}, RandomUtils.RANDOM.nextInt(50), TimeUnit.MICROSECONDS); // Inject random latency
}
}