/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.logservice.impl;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.ratis.logservice.api.ArchiveLogReader;
import org.apache.ratis.logservice.api.ArchiveLogWriter;
import org.apache.ratis.logservice.api.LogName;
import org.apache.ratis.logservice.util.LogServiceUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
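
/**
 * Tests for {@link ArchiveHdfsLogWriter} and {@link ArchiveHdfsLogReader} running against an
 * in-process {@link MiniDFSCluster}: rolling the writer across files, seeking to a record id,
 * and surfacing a truncated archive file as an error.
 */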
public class TestArchiveHdfsLogReaderAndWriter {
  static MiniDFSCluster cluster;
  static Configuration conf;
  private static String location;
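
  /** Starts a three-DataNode MiniDFSCluster shared by all tests in this class. */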
  @BeforeClass
  public static void setup() throws IOException {
    conf = new Configuration();
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    location = "target/tmp/archive/TestArchiveHdfsLogReaderAndWriter";
  }
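
  /**
   * Writes k records, rolls the writer, and checks that the archive directory contains the
   * live log plus a rolled file suffixed with the last record id it holds; then writes k more
   * records, closes the writer, and verifies that a fresh reader sees all 2k records across
   * both rolled files.
   */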
  @Test
  public void testRollingWriter() throws IOException {
    String archiveLocation = location + "/testRollingWriter";
    LogName logName = LogName.of("testRollingWriterLogName");
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.delete(new Path(archiveLocation), true);
    ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
    writer.init(archiveLocation, logName);
    int k = 2;
    write(writer, 1, k);
    Assert.assertEquals(k, writer.getLastWrittenRecordId());
    writer.rollWriter();
    // After the roll there should be two files: the live log and the rolled file
    // suffixed with the id of the last record it contains.
    String[] files = Arrays.stream(
        fs.listStatus(new Path(LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName))))
        .map(fileStatus -> fileStatus.getPath().getName()).toArray(String[]::new);
    String[] expectedFiles = { logName.getName(), logName.getName() + "_recordId_" + k };
    Assert.assertArrayEquals(expectedFiles, files);
    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
    verifyRecords(reader, k);
    Assert.assertEquals(writer.getLastWrittenRecordId(), reader.getPosition());
    reader.close();
    write(writer, k + 1, 2 * k);
    Assert.assertEquals(2 * k, writer.getLastWrittenRecordId());
    writer.close();
    // Re-open the reader: it should now see both rolled files and all 2k records.
    reader = new ArchiveHdfsLogReader(conf,
        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
    verifyRecords(reader, 2 * k);
    files = ((ArchiveHdfsLogReader) reader).getFiles().stream()
        .map(fileStatus -> fileStatus.getPath().getName()).toArray(String[]::new);
    String[] expectedFiles1 =
        { logName.getName() + "_recordId_" + k, logName.getName() + "_recordId_" + 2 * k };
    Assert.assertArrayEquals(expectedFiles1, files);
    reader.close();
  }
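
  /** Asserts the reader yields records 1..n and then signals end-of-log in all three ways. */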
  private void verifyRecords(ArchiveLogReader reader, int n) throws IOException {
    for (int i = 1; i <= n; i++) {
      Assert.assertEquals(i, ByteBuffer.wrap(reader.next()).getInt());
    }
    // Once the log is exhausted, hasNext() is false, next() returns null, and
    // readNext() throws NoSuchElementException.
    Assert.assertFalse(reader.hasNext());
    Assert.assertNull(reader.next());
    try {
      reader.readNext();
      Assert.fail("readNext() should throw past the end of the log");
    } catch (NoSuchElementException e) {
      // expected
    }
  }
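
  /** Writes the integers start..end, each as one 4-byte (big-endian) record. */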
  private void write(ArchiveLogWriter writer, int start, int end) throws IOException {
    for (int i = start; i <= end; i++) {
      writer.write(ByteBuffer.allocate(4).putInt(i));
    }
  }
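
  /**
   * Hand-writes a file containing one complete record (a 4-byte prefix declaring 4 bytes,
   * followed by those 4 bytes) and a truncated record whose payload is shorter than its
   * declared length, then expects the reader to fail with an EOFException.
   */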
  @Test
  public void testCorruptedFileEOF() throws IOException {
    FSDataOutputStream fos = FileSystem.get(conf).create(new Path(location, "testEOF"));
    // A complete record: the prefix declares 4 bytes and 4 bytes of payload follow.
    fos.write(ByteBuffer.allocate(4).putInt(4).array());
    fos.write(new byte[4]);
    // A truncated record: the prefix again declares 4 bytes...
    fos.write(ByteBuffer.allocate(4).putInt(4).array());
    // ...but only 2 bytes of payload follow.
    fos.write(new byte[2]);
    fos.close();
    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf, location + "/testEOF");
    try {
      reader.next();
      Assert.fail("Reading a truncated record should throw EOFException");
    } catch (EOFException e) {
      // expected
    }
    reader.close();
  }
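
  /** Writes 100 records, seeks to record id 80, and expects exactly 20 records to remain. */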
  @Test
  public void testSeek() throws IOException {
    String archiveLocation = location + "/testSeek";
    LogName logName = LogName.of("testSeek");
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.delete(new Path(archiveLocation), true);
    ArchiveLogWriter writer = new ArchiveHdfsLogWriter(conf);
    writer.init(archiveLocation, logName);
    int k = 100;
    write(writer, 1, k);
    writer.close();
    ArchiveLogReader reader = new ArchiveHdfsLogReader(conf,
        LogServiceUtils.getArchiveLocationForLog(archiveLocation, logName));
    reader.seek(80);
    Assert.assertEquals(80, reader.getPosition());
    // Records 81..100 remain after the seek.
    int count = 0;
    while (reader.next() != null) {
      count++;
    }
    Assert.assertEquals(20, count);
    reader.close();
  }
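
  /** Shuts down the MiniDFSCluster and removes the local archive directory. */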
  @AfterClass
  public static void teardownAfterClass() {
    if (cluster != null) {
      cluster.shutdown();
    }
    deleteLocalDirectory(new File(location));
  }
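
  /** Recursively deletes a local directory; returns true if the top-level delete succeeds. */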
  static boolean deleteLocalDirectory(File dir) {
    File[] allFiles = dir.listFiles();
    if (allFiles != null) {
      for (File file : allFiles) {
        deleteLocalDirectory(file);
      }
    }
    return dir.delete();
  }
}