blob: 6704a328a4023a178ed8f86ae4772cb04eb2fa8e [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.log.mesos;
import java.io.File;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Singleton;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import org.apache.aurora.codec.ThriftBinaryCodec;
import org.apache.aurora.common.args.Arg;
import org.apache.aurora.common.args.CmdLine;
import org.apache.aurora.common.net.InetSocketAddressHelper;
import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.common.zookeeper.Credentials;
import org.apache.aurora.gen.storage.LogEntry;
import org.apache.aurora.scheduler.discovery.ZooKeeperConfig;
import org.apache.aurora.scheduler.log.mesos.LogInterface.ReaderInterface;
import org.apache.aurora.scheduler.log.mesos.LogInterface.WriterInterface;
import org.apache.mesos.Log;
import org.apache.zookeeper.common.PathUtils;
/**
 * Binds a native mesos Log implementation.
 *
 * <p>Exports the following bindings:
 * <ul>
 *   <li>{@link org.apache.aurora.scheduler.log.Log} - a log backed by the mesos native distributed
 *       log, implemented by {@link MesosLog}</li>
 * </ul>
 *
 * <p>The remaining bindings ({@link Log}, {@link Log.Reader}, {@link Log.Writer} and the
 * {@link LogInterface} adapters) are private to this module.
 */
public class MesosLogStreamModule extends PrivateModule {
  @CmdLine(name = "native_log_quorum_size",
      help = "The size of the quorum required for all log mutations.")
  private static final Arg<Integer> QUORUM_SIZE = Arg.create(1);

  @CmdLine(name = "native_log_file_path",
      help = "Path to a file to store the native log data in. If the parent directory does "
          + "not exist it will be created.")
  private static final Arg<File> LOG_PATH = Arg.create(null);

  @CmdLine(name = "native_log_zk_group_path",
      help = "A zookeeper node for use by the native log to track the master coordinator.")
  private static final Arg<String> ZK_LOG_GROUP_PATH = Arg.create(null);

  /*
   * This timeout includes the time to get a quorum to promise leadership to the coordinator and
   * the time to fill any holes in the coordinator's log.
   */
  @CmdLine(name = "native_log_election_timeout",
      help = "The timeout for a single attempt to obtain a new log writer.")
  private static final Arg<Amount<Long, Time>> COORDINATOR_ELECTION_TIMEOUT =
      Arg.create(Amount.of(15L, Time.SECONDS));

  /*
   * Normally retries would not be expected to help much - however in the small replica set where
   * a few down replicas doom a coordinator election attempt, retrying effectively gives us a wider
   * window in which to await a live quorum before giving up and thrashing the global election
   * process. Observed log replica recovery times as of 4/6/2012 can be ~45 seconds so giving a
   * window >= 2x this should support 1-round election events (that possibly use several retries in
   * the single round).
   */
  @CmdLine(name = "native_log_election_retries",
      help = "The maximum number of attempts to obtain a new log writer.")
  private static final Arg<Integer> COORDINATOR_ELECTION_RETRIES = Arg.create(20);

  @CmdLine(name = "native_log_read_timeout",
      help = "The timeout for doing log reads.")
  private static final Arg<Amount<Long, Time>> READ_TIMEOUT =
      Arg.create(Amount.of(5L, Time.SECONDS));

  @CmdLine(name = "native_log_write_timeout",
      help = "The timeout for doing log appends and truncations.")
  private static final Arg<Amount<Long, Time>> WRITE_TIMEOUT =
      Arg.create(Amount.of(3L, Time.SECONDS));

  /**
   * Returns the value of a command-line argument that has no usable default.
   *
   * @param arg the argument to read.
   * @param name the flag name, used only in the error message.
   * @return the applied value of {@code arg}.
   * @throws IllegalStateException if no value was supplied for the flag.
   */
  private static <T> T getRequiredArg(Arg<T> arg, String name) {
    if (!arg.hasAppliedValue()) {
      throw new IllegalStateException(
          String.format("A value for the -%s flag must be supplied", name));
    }
    return arg.get();
  }

  private final ZooKeeperConfig zkClientConfig;
  private final File logPath;
  private final String zkLogGroupPath;

  /**
   * Creates the module using the {@code -native_log_file_path} and
   * {@code -native_log_zk_group_path} command-line flags.
   *
   * @param zkClientConfig ZooKeeper client settings used by the native log.
   * @throws IllegalStateException if either required flag was not supplied.
   */
  public MesosLogStreamModule(ZooKeeperConfig zkClientConfig) {
    this(zkClientConfig,
        getRequiredArg(LOG_PATH, "native_log_file_path"),
        getRequiredArg(ZK_LOG_GROUP_PATH, "native_log_zk_group_path"));
  }

  /**
   * Creates the module with explicit settings.
   *
   * @param zkClientConfig ZooKeeper client settings used by the native log.
   * @param logPath file in which to store the native log data.
   * @param zkLogGroupPath ZooKeeper node used by the native log; validated with
   *     {@link PathUtils#validatePath(String)}, which also rejects {@code null}.
   */
  public MesosLogStreamModule(
      ZooKeeperConfig zkClientConfig,
      File logPath,
      String zkLogGroupPath) {

    this.zkClientConfig = Objects.requireNonNull(zkClientConfig, "zkClientConfig");
    this.logPath = Objects.requireNonNull(logPath, "logPath");
    PathUtils.validatePath(zkLogGroupPath); // This checks for null.
    this.zkLogGroupPath = zkLogGroupPath;
  }

  @Override
  protected void configure() {
    bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(MesosLog.ReadTimeout.class)
        .toInstance(READ_TIMEOUT.get());
    bind(new TypeLiteral<Amount<Long, Time>>() { }).annotatedWith(MesosLog.WriteTimeout.class)
        .toInstance(WRITE_TIMEOUT.get());

    bind(org.apache.aurora.scheduler.log.Log.class).to(MesosLog.class);
    bind(MesosLog.class).in(Singleton.class);
    expose(org.apache.aurora.scheduler.log.Log.class);
  }

  /**
   * Creates the native log, first ensuring the directory that will hold {@code logPath} exists.
   *
   * <p>This runs at provision time, after {@link #configure()} has returned, so module error
   * reporting ({@code addError}) is unavailable here. Failures are therefore raised directly.
   *
   * @throws IllegalStateException if the parent directory of the log file is missing and cannot
   *     be created.
   */
  @Provides
  @Singleton
  Log provideLog() {
    // Resolve against the absolute path so a bare relative file name (which has no parent in its
    // relative form) still yields the working directory instead of null.
    File parentDir = logPath.getAbsoluteFile().getParentFile();
    // A null parent means the log file sits at the filesystem root, which always exists.
    if (parentDir != null && !parentDir.isDirectory() && !parentDir.mkdirs()) {
      throw new IllegalStateException(String.format(
          "Failed to create parent directory to store native log at: %s", parentDir));
    }

    String zkConnectString = Joiner.on(',').join(
        Iterables.transform(zkClientConfig.getServers(), InetSocketAddressHelper::toString));
    String logFile = logPath.getAbsolutePath();
    Amount<Integer, Time> sessionTimeout = zkClientConfig.getSessionTimeout();
    long sessionTimeoutValue = sessionTimeout.getValue();
    TimeUnit sessionTimeoutUnit = sessionTimeout.getUnit().getTimeUnit();

    if (zkClientConfig.getCredentials().isPresent()) {
      Credentials zkCredentials = zkClientConfig.getCredentials().get();
      return new Log(
          QUORUM_SIZE.get(),
          logFile,
          zkConnectString,
          sessionTimeoutValue,
          sessionTimeoutUnit,
          zkLogGroupPath,
          zkCredentials.scheme(),
          zkCredentials.authToken());
    } else {
      return new Log(
          QUORUM_SIZE.get(),
          logFile,
          zkConnectString,
          sessionTimeoutValue,
          sessionTimeoutUnit,
          zkLogGroupPath);
    }
  }

  /** Creates a reader over the native log. */
  @Provides
  Log.Reader provideReader(Log log) {
    return new Log.Reader(log);
  }

  /**
   * Creates a writer over the native log, using {@code -native_log_election_timeout} as the
   * per-attempt timeout and {@code -native_log_election_retries} as the attempt limit.
   */
  @Provides
  Log.Writer provideWriter(Log log) {
    Amount<Long, Time> electionTimeout = COORDINATOR_ELECTION_TIMEOUT.get();
    return new Log.Writer(log, electionTimeout.getValue(), electionTimeout.getUnit().getTimeUnit(),
        COORDINATOR_ELECTION_RETRIES.get());
  }

  /** Adapts the native {@link Log} to {@link LogInterface} by delegating to its position lookup. */
  @Provides
  LogInterface provideLogInterface(final Log log) {
    return log::position;
  }

  /** Adapts a native {@link Log.Reader} to {@link ReaderInterface} by plain delegation. */
  @Provides
  ReaderInterface provideReaderInterface(final Log.Reader reader) {
    return new ReaderInterface() {
      @Override
      public List<Log.Entry> read(Log.Position from, Log.Position to, long timeout, TimeUnit unit)
          throws TimeoutException, Log.OperationFailedException {
        return reader.read(from, to, timeout, unit);
      }

      @Override
      public Log.Position beginning() {
        return reader.beginning();
      }

      @Override
      public Log.Position ending() {
        return reader.ending();
      }
    };
  }

  /** Adapts a native {@link Log.Writer} to {@link WriterInterface} by plain delegation. */
  @Provides
  WriterInterface provideWriterInterface(final Log.Writer writer) {
    return new WriterInterface() {
      @Override
      public Log.Position append(byte[] data, long timeout, TimeUnit unit)
          throws TimeoutException, Log.WriterFailedException {
        return writer.append(data, timeout, unit);
      }

      @Override
      public Log.Position truncate(Log.Position to, long timeout, TimeUnit unit)
          throws TimeoutException, Log.WriterFailedException {
        return writer.truncate(to, timeout, unit);
      }
    };
  }

  /**
   * Provides the thrift-binary encoding of {@code LogEntry.noop(true)}, computed once per
   * injector.
   */
  @Provides
  @Singleton
  @MesosLog.NoopEntry
  byte[] provideNoopEntry() throws ThriftBinaryCodec.CodingException {
    return ThriftBinaryCodec.encodeNonNull(LogEntry.noop(true));
  }
}