blob: ebd3ab5f46d54469336da705cbadfad5a5f0f1a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
/**
 * Write-ahead log (WAL) for DAG changes.
 * <p>
 * Operations must be registered with the journal (in the private {@code RecoverableOperation} enum)
 * before they can be written. Each record is written as an {@code int} operation id followed by the
 * payload produced by {@link Recoverable#write(Output)}. On replay, the registered prototype instance
 * for each id reads its payload and applies the change.
 *
 * @since 0.9.2
 */
public final class Journal
{
  private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

  /**
   * Registry of operations that can be journaled and replayed.
   * <p>
   * The ordinal of each constant is the id written in front of every log record and used to look
   * the operation up again on replay. New constants must therefore only be appended. Reordering or
   * removing a constant would make existing logs decode to the wrong operation.
   */
  private enum RecoverableOperation
  {
    OPERATOR_STATE(PTOperator.SET_OPERATOR_STATE),
    CONTAINER_STATE(PTContainer.SET_CONTAINER_STATE),
    OPERATOR_PROPERTY(StreamingContainerManager.SET_OPERATOR_PROPERTY),
    PHYSICAL_OPERATOR_PROPERTY(StreamingContainerManager.SET_PHYSICAL_OPERATOR_PROPERTY);

    /** Cached copy of {@link #values()}; {@code values()} allocates a fresh array on every call. */
    private static final RecoverableOperation[] VALUES = values();

    /** Maps the concrete class of each prototype operation to its id (ordinal). */
    private static final Map<Class<? extends Recoverable>, Integer> classToId;

    static {
      final ImmutableMap.Builder<Class<? extends Recoverable>, Integer> builder = ImmutableMap.builder();
      for (RecoverableOperation recoverableOperation : VALUES) {
        builder.put(recoverableOperation.operation.getClass(), recoverableOperation.ordinal());
      }
      classToId = builder.build();
    }

    /** Prototype instance used both to serialize and to re-apply this kind of change. */
    private final Recoverable operation;

    RecoverableOperation(Recoverable operation)
    {
      this.operation = operation;
    }

    /**
     * Looks up an operation by the id read from the log.
     *
     * @param id operation id (ordinal) as read from the log
     * @return the matching operation, or {@code null} if {@code id} is out of range. Negative ids,
     *         e.g. from a corrupted log, also return {@code null}; they do not throw
     *         {@link ArrayIndexOutOfBoundsException}.
     */
    private static RecoverableOperation get(int id)
    {
      return (id >= 0 && id < VALUES.length) ? VALUES[id] : null;
    }

    /**
     * Returns the id registered for an operation class.
     *
     * @param operationClass concrete class of a {@link Recoverable} instance
     * @return the id, or {@code null} if the class is not registered
     */
    private static Integer getId(Class<? extends Recoverable> operationClass)
    {
      return classToId.get(operationClass);
    }
  }

  /**
   * A change that can be written to the journal and re-applied on replay.
   */
  public interface Recoverable
  {
    /**
     * Reads one record's payload from {@code in} and applies it to {@code object}.
     *
     * @param object the target the change is applied to. {@link Journal} passes either the physical
     *               plan or the {@link StreamingContainerManager}, depending on the operation.
     * @param in     input positioned just after the record's operation id
     */
    void read(Object object, Input in) throws KryoException;

    /**
     * Writes this operation's payload (not including the operation id) to {@code out}.
     *
     * @param out destination of the payload
     */
    void write(Output out) throws KryoException;
  }

  private final StreamingContainerManager scm;
  private final AtomicReference<Output> output;
  private final AtomicBoolean replayMode;

  public Journal(StreamingContainerManager scm)
  {
    this.scm = scm;
    output = new AtomicReference<>();
    replayMode = new AtomicBoolean(false);
  }

  /**
   * Sets the stream that subsequent {@link #write} calls append to, replacing any previous one.
   * <p>
   * The previous {@link Output} is closed unless it wraps the very same stream being set, because
   * closing it would also close {@code out}. It is closed while holding its monitor, so it cannot
   * be closed in the middle of a record being written. Passing {@code null} disables journaling:
   * later writes are skipped with a warning.
   *
   * @param out stream to append records to, or {@code null}
   * @throws IOException not thrown by the current implementation; kept in the signature for callers
   */
  public void setOutputStream(@Nullable final OutputStream out) throws IOException
  {
    final Output output;
    if (out != null) {
      output = new Output(4096, -1)
      {
        @Override
        public void flush() throws KryoException
        {
          super.flush();
          // Kryo does not flush internal output stream during flush. We need to flush it explicitly.
          try {
            getOutputStream().flush();
          } catch (IOException e) {
            throw new KryoException(e);
          }
        }
      };
      output.setOutputStream(out);
    } else {
      output = null;
    }
    final Output oldOut = this.output.getAndSet(output);
    if (oldOut != null && oldOut.getOutputStream() != out) {
      synchronized (oldOut) {
        oldOut.close();
      }
    }
  }

  /**
   * Appends one record for {@code op} to the journal and flushes it.
   * <p>
   * The id, the payload and the flush all happen while holding the current {@link Output}'s
   * monitor, so records from concurrent writers are not interleaved. The output stream may be
   * swapped (and the old one closed) by {@link #setOutputStream} between reading it and writing
   * to it. In that case the resulting {@link KryoException} is ignored and the record is retried
   * against the new stream. If no stream is set, the write is skipped with a warning.
   *
   * @param op operation to journal; its class must be registered in {@code RecoverableOperation}
   * @throws IllegalStateException    if called while the journal is replaying
   * @throws IllegalArgumentException if the class of {@code op} is not registered
   * @throws KryoException            if writing to the current (unchanged) output fails
   */
  final void write(Recoverable op)
  {
    if (replayMode.get()) {
      throw new IllegalStateException("Request to write while journal is replaying operations");
    }
    Integer classId = RecoverableOperation.getId(op.getClass());
    if (classId == null) {
      throw new IllegalArgumentException("Class not registered " + op.getClass());
    }
    while (true) {
      final Output out = output.get();
      if (out == null) {
        LOG.warn("Journal output stream is null. Skipping write to the WAL.");
        return;
      }
      // need to atomically write id, operation and flush the output stream
      synchronized (out) {
        try {
          LOG.debug("WAL write {}", RecoverableOperation.get(classId));
          out.writeInt(classId);
          op.write(out);
          out.flush();
          return;
        } catch (KryoException e) {
          // check that no other threads sneaked between get() and synchronized block and set output stream to a new
          // stream or null leading to the current stream being closed
          if (output.get() == out) {
            throw e;
          }
          // otherwise the stream was replaced: loop and retry against the new one
        }
      }
    }
  }

  /**
   * Re-applies every record in {@code input} until end of stream.
   * <p>
   * State operations are applied to the physical plan, and property operations to the
   * {@link StreamingContainerManager}. While replaying, {@link #write} rejects new records. The
   * replay flag is cleared even if replay fails. This method does not close {@code input}.
   *
   * @param input journal content previously produced by {@link #write}
   * @throws IllegalStateException    if a replay is already in progress
   * @throws IllegalArgumentException if the log contains an unknown (or negative) operation id
   */
  final void replay(final InputStream input)
  {
    if (!replayMode.compareAndSet(false, true)) {
      throw new IllegalStateException("Request to replay while journal is already replaying other operations");
    }
    final Input in = new Input(input);
    try {
      LOG.debug("Start replaying WAL");
      while (!in.eof()) {
        final int opId = in.readInt();
        final RecoverableOperation recoverableOperation = RecoverableOperation.get(opId);
        if (recoverableOperation == null) {
          throw new IllegalArgumentException("No reader registered for id " + opId);
        }
        LOG.debug("Replaying {}", recoverableOperation);
        switch (recoverableOperation) {
          case OPERATOR_STATE:
          case CONTAINER_STATE:
            recoverableOperation.operation.read(scm.getPhysicalPlan(), in);
            break;
          case OPERATOR_PROPERTY:
          case PHYSICAL_OPERATOR_PROPERTY:
            recoverableOperation.operation.read(scm, in);
            break;
          default:
            throw new IllegalArgumentException("Unsupported recoverable operation " + recoverableOperation);
        }
      }
    } finally {
      LOG.debug("Done replaying WAL");
      replayMode.set(false);
    }
  }
}