#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import errno
import fcntl
import json
import time
import random
# This is a simple sequencer to be run as a location assignment script
# by a Kudu master. The script can be used in location-aware test scenarios
# and other cases when location assignment rules are specified simply as the
# distribution of tablet servers among locations: i.e. how many tablet
# servers should be in every specified location (see below for an example).
#
# The script takes as input location mapping rules and an identifier.
# On success, the script prints the location assigned to the specified
# identifier to stdout. The identifier might be any string uniquely identifying
# a tablet server.
#
# Locations are assigned based on:
# a) Location mapping rules specified in the command line and sequencer's
# offset persistently stored in a state file.
# b) Previously established and persisted { id, location } mappings in the
# state file.
#
# Once assigned, the location for the specified identifier is recorded and
# output again upon next call of the script for the same identifier.
#
# It's safe to run multiple instances of the script concurrently with the
# same set of parameters. Access to the sequencer's state file is
# serialized, so the script produces consistent results for all concurrent
# callers.
#
# A location mapping rule is specified as a pair 'loc:num', where the 'num'
# stands for the number of servers to assign to the location 'loc'. Location
# mapping rules are provided to the script by --map 'loc:num' command line
# arguments.
#
# Below is an example of invocation of the script for location mapping rules
# specifying that location 'l0' should have one tablet server, location 'l1'
# should have two, and location 'l2' should have three. The script is run
# to assign a location for a tablet server running at IP address 127.1.2.3.
#
# assign-location.py --map l0:1 --map l1:2 --map l2:3 127.1.2.3
#
class LocationAssignmentRule(object):
    """
    Expand 'loc:num' location mapping rules into a flat list of locations.

    Attributes:
      location_mapping_rules: the rules exactly as passed in (a list of
          'loc:num' strings), or an empty list if None was passed.
      locations: every location repeated 'num' times, in the order the
          rules were specified.
      total_count: the number of entries in 'locations'.
    """

    def __init__(self, location_mapping_rules):
        """
        Convert the input location information into an auxiliary array of
        location strings.

        Raises ValueError if a rule does not split into exactly two parts on
        ':' or if the part after ':' is not an integer. Raises AssertionError
        if the same location is specified in more than one rule.
        """
        if location_mapping_rules is None:
            location_mapping_rules = []
        self.location_mapping_rules = location_mapping_rules
        self.locations = []

        seen_locations = []
        for info in self.location_mapping_rules:
            location, server_num_str = info.split(':')
            seen_locations.append(location)
            # A zero or negative count contributes no entries.
            self.locations.extend([location] * int(server_num_str))
        self.total_count = len(self.locations)

        # Raise explicitly instead of using the 'assert' statement: asserts
        # are removed when Python runs with -O, which would let duplicate
        # locations slip through unnoticed. The exception type and message
        # stay the same as before.
        if len(set(seen_locations)) != len(seen_locations):
            raise AssertionError(
                'duplicate locations specified: {0}'.format(seen_locations))

    def get_location(self, idx):
        """
        Get location for the specified index.

        The index wraps around the expanded list of locations. An empty
        string is returned when no mapping rules were specified.
        """
        if not self.locations:
            return ""
        return self.locations[idx % len(self.locations)]
def acquire_advisory_lock(fpath, timeout_seconds=30):
    """
    Acquire an exclusive advisory lock on the special '<fpath>.lock' file.

    The lock is requested in non-blocking mode and retried after a short
    random pause until 'timeout_seconds' (30 by default) have elapsed, so
    the call never blocks indefinitely.

    Returns the open lock file object holding the lock; the caller is
    responsible for closing it, which releases the lock.

    Raises Exception if the lock could not be obtained within the timeout,
    and re-raises any flock() error other than "lock is held by someone
    else". In both cases the lock file is closed before raising.
    """
    deadline = time.time() + timeout_seconds
    fpath_lock_file = fpath + ".lock"

    # NOTE: the random generator is intentionally not re-seeded here. Seeding
    # it with the current time in whole seconds would make all processes
    # started within the same second pick identical pauses, defeating the
    # purpose of the randomized back-off.

    # Open the lock file; create the file if it doesn't exist.
    lock_file = open(fpath_lock_file, 'w+')
    got_lock = False
    try:
        while time.time() < deadline:
            try:
                fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
                got_lock = True
                break
            except IOError as e:
                # Only "somebody else holds the lock" is worth retrying.
                if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                    raise
            time.sleep(random.uniform(0.001, 0.100))
    finally:
        # Don't leak the file handle if the lock was not obtained.
        if not got_lock:
            lock_file.close()

    if not got_lock:
        raise Exception(
            'could not obtain exclusive lock for {0} in {1} seconds'.format(
                fpath_lock_file, timeout_seconds))
    return lock_file
def get_location(fpath, rule, uid, relaxed):
    """
    Return location for the specified identifier 'uid'. To do that, use the
    specified location mapping rules and the information stored
    in the sequencer's state file.

    * Obtain advisory lock for the state file (using additional .lock file).
    * If the sequencer's state file exists:
        1. Open the state file in read-only mode.
        2. Read the information from the state file and search for location
           assigned to the server with the specified identifier.
           a. If already assigned location found:
                -- Return the location.
           b. If location assigned to the identifier is not found:
                -- Increment the stored sequence number 'seq'.
                -- Use the new 'seq' to assign next location
                   by calling LocationAssignmentRule.get_location(seq).
                -- Add the newly generated location assignment into the
                   sequencer's state.
                -- Rewrite the file with the new state of the sequencer.
                -- Return the newly assigned location.
    * If the sequencer's state file does not exist:
        1. Set sequence number 'seq' to 0.
        2. Use current sequence number 'seq' to assign next location
           by calling LocationAssignmentRule.get_location(seq).
        3. Update the sequencer's state accordingly.
        4. Write the file with the new state of the sequencer.
        5. Return the newly assigned location.

    If the mapping rules have no room for one more identifier, an Exception
    is raised unless 'relaxed' is set; in relaxed mode an empty string is
    returned instead and nothing is written to the state file.

    Raises Exception if the mapping rules stored in the state file differ
    from the ones in 'rule', and re-raises any error from opening the state
    file other than "no such file".

    The lock file and the state file are closed on every exit path,
    including early returns and errors.
    """
    lock_file = acquire_advisory_lock(fpath)
    try:
        state_file = None
        try:
            state_file = open(fpath)
        except IOError as e:
            # A missing state file just means nothing has been assigned yet.
            if e.errno != errno.ENOENT:
                raise

        if state_file is None:
            seq = 0
            mappings = {}
            state = {}
            state['seq'] = seq
            state['mapping_rules'] = rule.location_mapping_rules
            state['mappings'] = mappings
        else:
            # If the file exists, it must have proper content.
            try:
                state = json.load(state_file)
            finally:
                state_file.close()
            seq = state.get('seq')
            mapping_rules = state.get('mapping_rules')
            # Make sure the stored mapping rules correspond to the ones
            # specified in args.
            rule_stored = json.dumps(mapping_rules)
            rule_specified = json.dumps(rule.location_mapping_rules)
            if rule_stored != rule_specified:
                raise Exception('stored and specified mapping rules mismatch: '
                                '{0} vs {1}'.format(rule_stored, rule_specified))
            mappings = state['mappings']
            location = mappings.get(uid, None)
            if location is not None:
                # The identifier already has a location: nothing to update.
                return location
            seq += 1
            state['seq'] = seq

        if not relaxed and rule.total_count != 0 and rule.total_count <= seq:
            raise Exception('too many unique identifiers ({0}) to assign next location '
                            'to {1} using mapping rules {2}. State: {3}'.format(
                                seq + 1, uid, rule.location_mapping_rules, json.dumps(state)))
        if relaxed and rule.total_count <= seq:
            return ""

        # Get next location and add the { uid, location } binding into the
        # mappings.
        location = rule.get_location(seq)
        mappings[uid] = location

        # Rewrite the file with the updated state information.
        with open(fpath, 'w+') as state_file:
            json.dump(state, state_file)
        return location
    finally:
        # Closing the lock file releases the advisory lock.
        lock_file.close()
def main():
    """
    Parse the command line, look up or assign the location for the given
    identifier, and print the location to stdout.
    """
    default_state_store = "/tmp/location-sequencer-state"
    parser = argparse.ArgumentParser()
    # With nargs="?" a bare '--state_store' (no value) produces 'const',
    # which defaults to None and would later fail when the lock file path is
    # built. Fall back to the default path in that case as well.
    parser.add_argument("--state_store",
                        nargs="?",
                        const=default_state_store,
                        default=default_state_store,
                        help="path to a file to store the sequencer's state")
    parser.add_argument("--map", "-m",
                        action="append",
                        dest="location_mapping_rules",
                        metavar="RULE",
                        help="location mapping rule: number of tablet servers per specified "
                        "location in form <location>:<number>; this option may be specified "
                        "multiple times")
    parser.add_argument("--relaxed",
                        action="store_true",
                        help="whether to allow more location assignments than specified "
                        "by the specified mapping rules")
    parser.add_argument("uid",
                        help="hostname, IP address, or any other unique identifier")
    args = parser.parse_args()

    rule = LocationAssignmentRule(args.location_mapping_rules)
    location = get_location(args.state_store, rule, args.uid, args.relaxed)
    print(location)


if __name__ == "__main__":
    main()