blob: 922d89ea76fdb3214796e6c4447aafaefd089076 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.util.Arrays;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class TestAppendOnlyStreamReader extends TestDistributedLogBase {
static final Logger LOG = LoggerFactory.getLogger(TestAppendOnlyStreamReader.class);
@Rule
public TestName testNames = new TestName();
// Simple test subroutine writes some records, reads some back, skips ahead, skips back.
public void skipForwardThenSkipBack(String name, DistributedLogConfiguration conf) throws Exception {
DistributedLogManager dlmwrite = createNewDLM(conf, name);
DistributedLogManager dlmreader = createNewDLM(conf, name);
long txid = 1;
AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer.write(DLMTestUtil.repeatString("def", 5).getBytes());
writer.write(DLMTestUtil.repeatString("def", 5).getBytes());
writer.write(DLMTestUtil.repeatString("ghi", 5).getBytes());
writer.write(DLMTestUtil.repeatString("ghi", 5).getBytes());
writer.force(false);
writer.close();
AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
byte[] bytesIn = new byte[30];
byte[] bytes1 = DLMTestUtil.repeatString("abc", 10).getBytes();
byte[] bytes2 = DLMTestUtil.repeatString("def", 10).getBytes();
byte[] bytes3 = DLMTestUtil.repeatString("ghi", 10).getBytes();
int read = reader.read(bytesIn, 0, 30);
assertEquals(30, read);
assertTrue(Arrays.equals(bytes1, bytesIn));
reader.skipTo(60);
read = reader.read(bytesIn, 0, 30);
assertEquals(30, read);
assertTrue(Arrays.equals(bytes3, bytesIn));
reader.skipTo(30);
read = reader.read(bytesIn, 0, 30);
assertEquals(30, read);
assertTrue(Arrays.equals(bytes2, bytesIn));
}
@Test(timeout = 60000)
public void testSkipToSkipsBytesWithImmediateFlush() throws Exception {
String name = testNames.getMethodName();
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.loadConf(conf);
confLocal.setImmediateFlushEnabled(true);
confLocal.setOutputBufferSize(0);
skipForwardThenSkipBack(name, confLocal);
}
@Test(timeout = 60000)
public void testSkipToSkipsBytesWithLargerLogRecords() throws Exception {
String name = testNames.getMethodName();
DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
confLocal.loadConf(conf);
confLocal.setImmediateFlushEnabled(false);
confLocal.setOutputBufferSize(1024*100);
confLocal.setPeriodicFlushFrequencyMilliSeconds(1000*60);
skipForwardThenSkipBack(name, confLocal);
}
@Test(timeout = 60000)
public void testSkipToSkipsBytesUntilEndOfStream() throws Exception {
String name = testNames.getMethodName();
DistributedLogManager dlmwrite = createNewDLM(conf, name);
DistributedLogManager dlmreader = createNewDLM(conf, name);
long txid = 1;
AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer.markEndOfStream();
writer.force(false);
writer.close();
AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
byte[] bytesIn = new byte[9];
int read = reader.read(bytesIn, 0, 9);
assertEquals(9, read);
assertTrue(Arrays.equals(DLMTestUtil.repeatString("abc", 3).getBytes(), bytesIn));
assertTrue(reader.skipTo(15));
try {
read = reader.read(bytesIn, 0, 1);
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
assertTrue(reader.skipTo(0));
try {
reader.skipTo(16);
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
}
@Test(timeout = 60000)
public void testSkipToreturnsFalseIfPositionDoesNotExistYetForUnSealedStream() throws Exception {
String name = testNames.getMethodName();
DistributedLogManager dlmwrite = createNewDLM(conf, name);
DistributedLogManager dlmreader = createNewDLM(conf, name);
long txid = 1;
AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer.close();
final AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
byte[] bytesIn = new byte[9];
int read = reader.read(bytesIn, 0, 9);
assertEquals(9, read);
assertTrue(Arrays.equals(DLMTestUtil.repeatString("abc", 3).getBytes(), bytesIn));
assertFalse(reader.skipTo(16));
assertFalse(reader.skipTo(16));
AppendOnlyStreamWriter writer2 = dlmwrite.getAppendOnlyStreamWriter();
writer2.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer2.close();
assertTrue(reader.skipTo(16));
byte[] bytesIn2 = new byte[5];
read = reader.read(bytesIn2, 0, 5);
assertEquals(5, read);
assertTrue(Arrays.equals("bcabc".getBytes(), bytesIn2));
}
@Test(timeout = 60000)
public void testSkipToForNoPositionChange() throws Exception {
String name = testNames.getMethodName();
DistributedLogManager dlmwrite = createNewDLM(conf, name);
DistributedLogManager dlmreader = createNewDLM(conf, name);
long txid = 1;
AppendOnlyStreamWriter writer = dlmwrite.getAppendOnlyStreamWriter();
writer.write(DLMTestUtil.repeatString("abc", 5).getBytes());
writer.close();
final AppendOnlyStreamReader reader = dlmreader.getAppendOnlyStreamReader();
assertTrue(reader.skipTo(0));
byte[] bytesIn = new byte[4];
int read = reader.read(bytesIn, 0, 4);
assertEquals(4, read);
assertEquals(new String("abca"), new String(bytesIn));
assertTrue(reader.skipTo(reader.position()));
assertTrue(reader.skipTo(1));
read = reader.read(bytesIn, 0, 4);
assertEquals(4, read);
assertEquals(new String("bcab"), new String(bytesIn));
}
}