blob: 673788d3841f197db856c3336d4e2871101d5917 [file] [log] [blame]
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aurora.scheduler.mesos;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Collections2;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.aurora.common.base.Command;
import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
import org.apache.aurora.scheduler.storage.Storage;
import org.apache.mesos.v1.Protos.Credential;
import org.apache.mesos.v1.Protos.Filters;
import org.apache.mesos.v1.Protos.FrameworkID;
import org.apache.mesos.v1.Protos.FrameworkInfo;
import org.apache.mesos.v1.Protos.Offer.Operation;
import org.apache.mesos.v1.Protos.OfferID;
import org.apache.mesos.v1.Protos.TaskID;
import org.apache.mesos.v1.Protos.TaskStatus;
import org.apache.mesos.v1.scheduler.Mesos;
import org.apache.mesos.v1.scheduler.Protos.Call;
import org.apache.mesos.v1.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkState;
/**
 * A driver implementation that uses the V1 API drivers from libmesos.
 *
 * <p>Every call sent to the master requires the service to be running and blocks until a
 * {@link DriverRegistered} event has been received, because each {@link Call} built here carries
 * the framework ID read back from storage, which is only assumed to be available once the
 * framework has registered.
 */
class VersionedSchedulerDriverService extends AbstractIdleService
    implements Driver, EventSubscriber {
  private static final Logger LOG = LoggerFactory.getLogger(VersionedSchedulerDriverService.class);

  private final Storage storage;
  private final DriverSettings driverSettings;
  private final Scheduler scheduler;
  private final VersionedDriverFactory factory;
  private final FrameworkInfoFactory infoFactory;

  // Set by startUp() once the libmesos driver has been created.
  private final SettableFuture<Mesos> mesosFuture = SettableFuture.create();
  // Released by shutDown() or abort(); blockUntilStopped() waits on it.
  private final CountDownLatch terminationLatch = new CountDownLatch(1);
  // Released by registered(DriverRegistered); whenRegistered() waits on it.
  private final CountDownLatch registrationLatch = new CountDownLatch(1);

  @Inject
  VersionedSchedulerDriverService(
      Storage storage,
      DriverSettings settings,
      Scheduler scheduler,
      VersionedDriverFactory factory,
      FrameworkInfoFactory infoFactory) {

    this.storage = requireNonNull(storage);
    this.driverSettings = requireNonNull(settings);
    this.scheduler = requireNonNull(scheduler);
    this.factory = requireNonNull(factory);
    this.infoFactory = requireNonNull(infoFactory);
  }

  /**
   * Reads the persisted framework ID from storage and wraps it in a {@link FrameworkID}.
   *
   * <p>Only used from inside {@link #whenRegistered} callbacks, where the ID is assumed to be
   * present. If it is absent, {@code Optional.get()} throws.
   */
  private FrameworkID getFrameworkId() {
    String id = storage.read(storeProvider ->
        storeProvider.getSchedulerStore().fetchFrameworkId().get());
    return FrameworkID.newBuilder().setValue(id).build();
  }

  /**
   * Creates the V1 Mesos driver and publishes it to {@link #mesosFuture}.
   *
   * <p>If a framework ID was persisted earlier, it is set on the {@link FrameworkInfo}.
   * Otherwise the driver connects as a new framework.
   */
  @Override
  protected void startUp() throws Exception {
    Optional<String> frameworkId = storage.read(
        storeProvider -> storeProvider.getSchedulerStore().fetchFrameworkId());

    LOG.info("Connecting to mesos master: {}", driverSettings.getMasterUri());
    if (!driverSettings.getCredentials().isPresent()) {
      LOG.warn("Connecting to master without authentication!");
    }

    FrameworkInfo.Builder frameworkBuilder = infoFactory.getFrameworkInfo().toBuilder();

    if (frameworkId.isPresent()) {
      // Log the raw ID, not the Optional wrapper (which would print as "Optional[...]").
      LOG.info("Found persisted framework ID: {}", frameworkId.get());
      frameworkBuilder.setId(FrameworkID.newBuilder().setValue(frameworkId.get()));
    } else {
      LOG.warn("Did not find a persisted framework ID, connecting as a new framework.");
    }

    Credential credential = driverSettings.getCredentials().orElse(null);
    Mesos mesos = factory.create(
        scheduler,
        frameworkBuilder.build(),
        driverSettings.getMasterUri(),
        Optional.ofNullable(credential));

    mesosFuture.set(mesos);
  }

  /** Releases any thread blocked in {@link #blockUntilStopped()}. */
  @Override
  protected void shutDown() throws Exception {
    terminationLatch.countDown();
  }

  /** Sends an ACCEPT call for {@code offerId} with the given operations and filters. */
  @Override
  public void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter) {
    whenRegistered(() -> {
      Collection<Operation.Type> opTypes = Collections2.transform(operations, Operation::getType);
      LOG.info("Accepting offer {} with ops {}", offerId.getValue(), opTypes);

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder()
              .setFrameworkId(getFrameworkId())
              .setType(Call.Type.ACCEPT)
              .setAccept(
                  Call.Accept.newBuilder()
                      .addOfferIds(offerId)
                      .addAllOperations(operations)
                      .setFilters(filter))
              .build());
    });
  }

  /** Sends an ACCEPT_INVERSE_OFFERS call for {@code offerID} with the given filters. */
  @Override
  public void acceptInverseOffer(OfferID offerID, Filters filter) {
    whenRegistered(() -> {
      LOG.info("Accepting Inverse Offer {}", offerID.getValue());

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder()
              .setFrameworkId(getFrameworkId())
              .setType(Call.Type.ACCEPT_INVERSE_OFFERS)
              .setAcceptInverseOffers(
                  Call.AcceptInverseOffers.newBuilder()
                      .addInverseOfferIds(offerID)
                      .setFilters(filter))
              .build());
    });
  }

  /** Sends a DECLINE call for {@code offerId} with the given filters. */
  @Override
  public void declineOffer(OfferID offerId, Filters filter) {
    whenRegistered(() -> {
      LOG.info("Declining offer {}", offerId.getValue());

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder().setType(Call.Type.DECLINE)
              .setFrameworkId(getFrameworkId())
              .setDecline(
                  Call.Decline.newBuilder()
                      .setFilters(filter)
                      .addOfferIds(offerId))
              .build());
    });
  }

  /** Sends a KILL call for the task with the given ID. */
  @Override
  public void killTask(String taskId) {
    whenRegistered(() -> {
      LOG.info("Killing task {}", taskId);

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder().setType(Call.Type.KILL)
              .setFrameworkId(getFrameworkId())
              .setKill(
                  Call.Kill.newBuilder()
                      .setTaskId(TaskID.newBuilder().setValue(taskId)))
              .build());
    });
  }

  /**
   * Sends an ACKNOWLEDGE call for {@code status}, or does nothing if the status has no UUID.
   */
  @Override
  public void acknowledgeStatusUpdate(TaskStatus status) {
    // The Mesos API says frameworks are only supposed to acknowledge status updates
    // with a UUID. The V0Driver accepts them just fine but the V1Driver logs every time
    // a status update is given without a uuid. To silence logs, we drop them here.
    whenRegistered(() -> {
      if (!status.hasUuid()) {
        return;
      }

      LOG.info("Acking status update for {} with uuid: {}",
          status.getTaskId().getValue(),
          status.getUuid());

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder().setType(Call.Type.ACKNOWLEDGE)
              .setFrameworkId(getFrameworkId())
              .setAcknowledge(
                  Call.Acknowledge.newBuilder()
                      .setAgentId(status.getAgentId())
                      .setTaskId(status.getTaskId())
                      .setUuid(status.getUuid()))
              .build());
    });
  }

  /** Sends a RECONCILE call covering the task IDs of all given statuses. */
  @Override
  public void reconcileTasks(Collection<TaskStatus> statuses) {
    whenRegistered(() -> {
      Collection<Call.Reconcile.Task> tasks = Collections2.transform(statuses, taskStatus ->
          Call.Reconcile.Task.newBuilder()
              .setTaskId(taskStatus.getTaskId())
              .build());

      Futures.getUnchecked(mesosFuture).send(
          Call.newBuilder()
              .setType(Call.Type.RECONCILE)
              .setFrameworkId(getFrameworkId())
              .setReconcile(
                  Call.Reconcile.newBuilder()
                      .addAllTasks(tasks))
              .build());
    });
  }

  /**
   * Blocks until {@link #shutDown()} or {@link #abort()} releases the termination latch.
   *
   * @throws IllegalStateException if the service is not running
   * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while waiting;
   *     the thread's interrupt status is restored first
   */
  @Override
  public void blockUntilStopped() {
    ensureRunning();
    awaitLatch(terminationLatch);
  }

  /** Releases any thread blocked in {@link #blockUntilStopped()} and stops the service. */
  @Override
  public void abort() {
    terminationLatch.countDown();
    stopAsync();
  }

  /** Marks the framework as registered, unblocking pending and future driver calls. */
  @Subscribe
  public void registered(DriverRegistered event) {
    registrationLatch.countDown();
  }

  /**
   * Runs {@code c} once the framework has registered.
   *
   * @throws IllegalStateException if the service is not running
   * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while waiting;
   *     the thread's interrupt status is restored first
   */
  private void whenRegistered(Command c) {
    ensureRunning();
    // We need to block until registered because that's when we are guaranteed to have our
    // framework id. Without it, we cannot construct any Call objects.
    awaitLatch(registrationLatch);
    c.execute();
  }

  /**
   * Waits for {@code latch} to reach zero.
   *
   * <p>On interruption, this re-asserts the thread's interrupt flag before rethrowing, so callers
   * further up the stack can still observe that the thread was interrupted.
   */
  private static void awaitLatch(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /** Throws {@link IllegalStateException} unless the service is in the RUNNING state. */
  private void ensureRunning() {
    checkState(isRunning(), "Driver is not running.");
  }
}