blob: 284ff5a747be479aa6d44aae1f488568d6c92013 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.binlog;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Sets;
import org.junit.Test;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ExternalArchiverTest
{
@Test
public void testArchiver() throws IOException, InterruptedException
{
Pair<String, String> s = createScript();
String script = s.left;
String dir = s.right;
Path logdirectory = Files.createTempDirectory("logdirectory");
File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
Files.write(logfileToArchive.toPath(), "content".getBytes());
ExternalArchiver ea = new ExternalArchiver(script+" %path", null, 10);
ea.onReleased(1, logfileToArchive);
while (logfileToArchive.exists())
{
Thread.sleep(100);
}
File movedFile = new File(dir, logfileToArchive.getName());
assertTrue(movedFile.exists());
movedFile.deleteOnExit();
ea.stop();
assertEquals(0, logdirectory.toFile().listFiles().length);
}
@Test
public void testArchiveExisting() throws IOException, InterruptedException
{
Pair<String, String> s = createScript();
String script = s.left;
String moveDir = s.right;
List<File> existingFiles = new ArrayList<>();
Path dir = Files.createTempDirectory("archive");
for (int i = 0; i < 10; i++)
{
File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile();
logfileToArchive.deleteOnExit();
Files.write(logfileToArchive.toPath(), ("content"+i).getBytes());
existingFiles.add(logfileToArchive);
}
ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10);
boolean allGone = false;
while (!allGone)
{
allGone = true;
for (File f : existingFiles)
{
if (f.exists())
{
allGone = false;
Thread.sleep(100);
break;
}
File movedFile = new File(moveDir, f.getName());
assertTrue(movedFile.exists());
movedFile.deleteOnExit();
}
}
ea.stop();
assertEquals(0, dir.toFile().listFiles().length);
}
@Test
public void testArchiveOnShutdown() throws IOException, InterruptedException
{
Pair<String, String> s = createScript();
String script = s.left;
String moveDir = s.right;
Path dir = Files.createTempDirectory("archive");
ExternalArchiver ea = new ExternalArchiver(script + " %path", dir, 10);
List<File> existingFiles = new ArrayList<>();
for (int i = 0; i < 10; i++)
{
File logfileToArchive = Files.createTempFile(dir, "logfile", SingleChronicleQueue.SUFFIX).toFile();
logfileToArchive.deleteOnExit();
Files.write(logfileToArchive.toPath(), ("content"+i).getBytes());
existingFiles.add(logfileToArchive);
}
// ea.stop will archive all .cq4 files in the directory
ea.stop();
for (File f : existingFiles)
{
assertFalse(f.exists());
File movedFile = new File(moveDir, f.getName());
assertTrue(movedFile.exists());
movedFile.deleteOnExit();
}
}
/**
* Make sure retries work
* 1. create a script that will fail two times before executing the command
* 2. create an ExternalArchiver that retries two times (this means we execute the script 3 times, meaning the last one will be successful)
* 3. make sure the file is on disk until the script has been executed 3 times
* 4. make sure the file is gone and that the command was executed successfully
*/
@Test
public void testRetries() throws IOException, InterruptedException
{
Pair<String, String> s = createFailingScript(2);
String script = s.left;
String moveDir = s.right;
Path logdirectory = Files.createTempDirectory("logdirectory");
File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
Files.write(logfileToArchive.toPath(), "content".getBytes());
AtomicInteger tryCounter = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) ->
{
tryCounter.incrementAndGet();
ExternalArchiver.exec(cmd);
success.set(true);
});
ea.onReleased(0, logfileToArchive);
while (tryCounter.get() < 2) // while we have only executed this 0 or 1 times, the file should still be on disk
{
Thread.sleep(100);
assertTrue(logfileToArchive.exists());
}
while (!success.get())
Thread.sleep(100);
// there will be 3 attempts in total, 2 failing ones, then the successful one:
assertEquals(3, tryCounter.get());
assertFalse(logfileToArchive.exists());
File movedFile = new File(moveDir, logfileToArchive.getName());
assertTrue(movedFile.exists());
ea.stop();
}
/**
* Makes sure that max retries is honored
*
* 1. create a script that will fail 3 times before actually executing the command
* 2. create an external archiver that retries 2 times (this means that the script will get executed 3 times)
* 3. make sure the file is still on disk and that we have not successfully executed the script
*
*/
@Test
public void testMaxRetries() throws IOException, InterruptedException
{
Pair<String, String> s = createFailingScript(3);
String script = s.left;
String moveDir = s.right;
Path logdirectory = Files.createTempDirectory("logdirectory");
File logfileToArchive = Files.createTempFile(logdirectory, "logfile", "xyz").toFile();
Files.write(logfileToArchive.toPath(), "content".getBytes());
AtomicInteger tryCounter = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean();
ExternalArchiver ea = new ExternalArchiver(script + " %path", null, 1000, 2, (cmd) ->
{
try
{
ExternalArchiver.exec(cmd);
success.set(true);
}
catch (Throwable t)
{
tryCounter.incrementAndGet();
throw t;
}
});
ea.onReleased(0, logfileToArchive);
while (tryCounter.get() < 3)
Thread.sleep(500);
assertTrue(logfileToArchive.exists());
// and the file should not get moved:
Thread.sleep(5000);
assertTrue(logfileToArchive.exists());
assertFalse(success.get());
File [] fs = new File(moveDir).listFiles(f ->
{
if (f.getName().startsWith("file."))
{
f.deleteOnExit();
return true;
}
throw new AssertionError("There should be no other files in the directory");
});
assertEquals(3, fs.length); // maxRetries + the first try
ea.stop();
}
private Pair<String, String> createScript() throws IOException
{
File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_EXECUTE))).toFile();
f.deleteOnExit();
File dir = Files.createTempDirectory("archive").toFile();
dir.deleteOnExit();
String script = "#!/bin/sh\nmv $1 "+dir.getAbsolutePath();
Files.write(f.toPath(), script.getBytes());
return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath());
}
private Pair<String, String> createFailingScript(int failures) throws IOException
{
File f = Files.createTempFile("script", "", PosixFilePermissions.asFileAttribute(Sets.newHashSet(PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_EXECUTE))).toFile();
f.deleteOnExit();
File dir = Files.createTempDirectory("archive").toFile();
dir.deleteOnExit();
// this script counts files in dir.getAbsolutePath, then if there are more than failures files in there, it moves the actual file
String script = "#!/bin/bash%n" +
"DIR=%s%n" +
"shopt -s nullglob%n" +
"numfiles=($DIR/*)%n" +
"numfiles=${#numfiles[@]}%n" +
"if (( $numfiles < %d )); then%n" +
" mktemp $DIR/file.XXXXX%n" +
" exit 1%n" +
"else%n" +
" mv $1 $DIR%n"+
"fi%n";
Files.write(f.toPath(), String.format(script, dir.getAbsolutePath(), failures).getBytes());
return Pair.create(f.getAbsolutePath(), dir.getAbsolutePath());
}
}