/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.storage.durability;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import com.google.common.collect.Lists;

import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.gen.storage.Op;
import org.apache.aurora.scheduler.storage.durability.Persistence.Edit;
import org.apache.aurora.scheduler.storage.durability.Persistence.PersistenceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class to clone the full contents of one {@link Persistence} into another.
 */
final class Recovery {

  private static final Logger LOG = LoggerFactory.getLogger(Recovery.class);

  private Recovery() {
    // Utility class.
  }

  /**
   * Copies all state from one persistence to another, batching into calls to
   * {@link Persistence#persist(Stream)}.
   *
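   * <p>Illustrative usage sketch; {@code source} and {@code destination} stand in for any two
   * {@link Persistence} implementations and are not names defined in this codebase:
   * <pre>{@code
   * Persistence source = ...;       // e.g. the persistence being migrated away from
   * Persistence destination = ...;  // a freshly created, empty persistence
   *
   * // Copy every recovered Op, persisting at most 1000 ops per call.
   * Recovery.copy(source, destination, 1000);
   * }</pre>
   *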
   * @param from Source.
   * @param to Destination.
   * @param batchSize Maximum number of entries to include in any given persist.
   */
  static void copy(Persistence from, Persistence to, int batchSize) {
    requireEmpty(to);

    long start = System.nanoTime();
    AtomicLong count = new AtomicLong();
    AtomicInteger batchNumber = new AtomicInteger();
    List<Op> batch = Lists.newArrayListWithExpectedSize(batchSize);
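    // Flushes the ops accumulated in the current batch to the destination, then clears the
    // batch so it can be reused for subsequent ops.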
    Runnable saveBatch = () -> {
      LOG.info("Saving batch " + batchNumber.incrementAndGet());
      try {
        to.persist(batch.stream());
      } catch (PersistenceException e) {
        throw new RuntimeException(e);
      }
      batch.clear();
    };

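    // Tracks whether a non-reset edit has been seen, so a reset instruction arriving after
    // data can be detected and rejected.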
    AtomicBoolean dataBegin = new AtomicBoolean(false);
    try {
      from.recover()
          .filter(edit -> {
            if (edit.isDeleteAll()) {
              // Suppress any storage reset instructions.  Persistence implementations may
              // recover with these, but do not support persisting them.  As a result, we
              // require that the recovery source produces a reset instruction at the beginning
              // of the stream, if at all.
              if (dataBegin.get()) {
                throw new IllegalStateException(
                    "A storage reset instruction arrived after the beginning of data");
              }
              return false;
            } else {
              dataBegin.set(true);
            }
            return true;
          })
          .forEach(edit -> {
            count.incrementAndGet();
            batch.add(edit.getOp());
            if (batch.size() == batchSize) {
              saveBatch.run();
              LOG.info("Fetching batch");
            }
          });
    } catch (PersistenceException e) {
      throw new RuntimeException(e);
    }

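    // Persist any remaining ops that did not fill a complete batch.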
    if (!batch.isEmpty()) {
      saveBatch.run();
    }

    long end = System.nanoTime();
    LOG.info("Recovery finished");
    LOG.info("Copied " + count.get() + " ops in "
        + Amount.of(end - start, Time.NANOSECONDS).as(Time.MILLISECONDS) + " ms");
  }

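  /**
   * Verifies that {@code persistence} has no recoverable edits, refusing to copy into a
   * destination that already contains data.
   *
   * @param persistence Copy destination to check.
   */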
  private static void requireEmpty(Persistence persistence) {
    LOG.info("Ensuring recovery destination is empty");
    try (Stream<Edit> edits = persistence.recover()) {
      if (edits.findFirst().isPresent()) {
        throw new IllegalStateException("Refusing to recover into non-empty persistence");
      }
    } catch (PersistenceException e) {
      throw new RuntimeException(e);
    }
  }
}