blob: 75ca5092d07789aa97e76c2224c86acf83e0d7e3 [file] [log] [blame]
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import mesos.interface
from mesos.interface import mesos_pb2
import mesos.native
TOTAL_TASKS = 5
TASK_CPUS = 1
TASK_MEM = 32
class TestScheduler(mesos.interface.Scheduler):
def __init__(self, executor):
self.executor = executor
self.taskData = {}
self.tasksLaunched = 0
self.tasksFinished = 0
self.messagesSent = 0
self.messagesReceived = 0
def registered(self, driver, frameworkId, masterInfo):
print "Registered with framework ID %s" % frameworkId.value
def resourceOffers(self, driver, offers):
print "Got %d resource offers" % len(offers)
for offer in offers:
tasks = []
print "Got resource offer %s" % offer.id.value
if self.tasksLaunched < TOTAL_TASKS:
tid = self.tasksLaunched
self.tasksLaunched += 1
print "Accepting offer on %s to start task %d" \
% (offer.hostname, tid)
task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
task.slave_id.value = offer.slave_id.value
task.name = "task %d" % tid
task.executor.MergeFrom(self.executor)
cpus = task.resources.add()
cpus.name = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = TASK_CPUS
mem = task.resources.add()
mem.name = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = TASK_MEM
tasks.append(task)
self.taskData[task.task_id.value] = (
offer.slave_id, task.executor.executor_id)
driver.launchTasks(offer.id, tasks)
def statusUpdate(self, driver, update):
print "Task %s is in state %d" % (update.task_id.value, update.state)
# Ensure the binary data came through.
if update.data != "data with a \0 byte":
print "The update data did not match!"
print " Expected: 'data with a \\x00 byte'"
print " Actual: ", repr(str(update.data))
sys.exit(1)
if update.state == mesos_pb2.TASK_FINISHED:
self.tasksFinished += 1
if self.tasksFinished == TOTAL_TASKS:
print "All tasks done, waiting for final framework message"
slave_id, executor_id = self.taskData[update.task_id.value]
self.messagesSent += 1
driver.sendFrameworkMessage(
executor_id,
slave_id,
'data with a \0 byte')
def frameworkMessage(self, driver, executorId, slaveId, message):
self.messagesReceived += 1
# The message bounced back as expected.
if message != "data with a \0 byte":
print "The returned message data did not match!"
print " Expected: 'data with a \\x00 byte'"
print " Actual: ", repr(str(message))
sys.exit(1)
print "Received message:", repr(str(message))
if self.messagesReceived == TOTAL_TASKS:
if self.messagesReceived != self.messagesSent:
print "Sent", self.messagesSent,
print "but received", self.messagesReceived
sys.exit(1)
print "All tasks done, and all messages received, exiting"
driver.stop()
if __name__ == "__main__":
if len(sys.argv) != 2:
print "Usage: %s master" % sys.argv[0]
sys.exit(1)
executor = mesos_pb2.ExecutorInfo()
executor.executor_id.value = "default"
executor.command.value = os.path.abspath("./test-executor")
executor.name = "Test Executor (Python)"
executor.source = "python_test"
framework = mesos_pb2.FrameworkInfo()
framework.user = "" # Have Mesos fill in the current user.
framework.name = "Test Framework (Python)"
# TODO(vinod): Make checkpointing the default when it is default
# on the slave.
if os.getenv("MESOS_CHECKPOINT"):
print "Enabling checkpoint for the framework"
framework.checkpoint = True
if os.getenv("MESOS_AUTHENTICATE"):
print "Enabling authentication for the framework"
if not os.getenv("DEFAULT_PRINCIPAL"):
print "Expecting authentication principal in the environment"
sys.exit(1);
if not os.getenv("DEFAULT_SECRET"):
print "Expecting authentication secret in the environment"
sys.exit(1);
credential = mesos_pb2.Credential()
credential.principal = os.getenv("DEFAULT_PRINCIPAL")
credential.secret = os.getenv("DEFAULT_SECRET")
framework.principal = os.getenv("DEFAULT_PRINCIPAL")
driver = mesos.native.MesosSchedulerDriver(
TestScheduler(executor),
framework,
sys.argv[1],
credential)
else:
framework.principal = "test-framework-python"
driver = mesos.native.MesosSchedulerDriver(
TestScheduler(executor),
framework,
sys.argv[1])
status = 0 if driver.run() == mesos_pb2.DRIVER_STOPPED else 1
# Ensure that the driver process terminates.
driver.stop();
sys.exit(status)