# blob: b34397a3e5df10c9c3328701faf03486c35a6681 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fcntl
import os
import time
import select
import subprocess
from lib.path import accumuloConf
from lib.options import log
def tserverNames():
    """Return the tablet server host names listed in the 'tservers' file.

    The file is located through accumuloConf('tservers') and is read one
    entry per line.  Surrounding whitespace is stripped from every entry
    and blank lines are skipped, so a trailing empty line does not turn
    into an empty host name.

    Returns:
        A list of host name strings, in file order.
    """
    # The context manager closes the file even if reading fails.
    with open(accumuloConf('tservers')) as conf:
        return [line.strip() for line in conf if line.strip()]
def runEach(commandMap):
    """Run one command per host over ssh, all hosts in parallel.

    Every entry of commandMap is started as ``ssh <host> <command>`` and
    the stdout/stderr pipes of all processes are drained together with
    select(), so reading from one host does not hold up the others.

    Args:
        commandMap: dict mapping a host name to the command string to
            execute on that host.

    Returns:
        A dict mapping each host name to a tuple
        ``(returncode, stdout, stderr)``.  The output values are the
        byte strings read from the pipes, unmodified.
    """
    # Every pipe that has not reached EOF yet -> (owning process, chunk list).
    open_pipes = {}
    # Owning process -> (stdout chunks, stderr chunks).
    output = {}

    for tserver, command in commandMap.items():
        log.debug("ssh: %s: %s", tserver, command)
        handle = subprocess.Popen(['ssh', tserver, command],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        # Bookkeeping used for logging and for building the result.
        handle.tserver = tserver
        handle.command = command
        handle.start = time.time()
        output[handle] = ([], [])
        for pipe, chunks in zip((handle.stdout, handle.stderr),
                                output[handle]):
            # Non-blocking, so a read never stalls the whole loop.
            fcntl.fcntl(pipe, fcntl.F_SETFL, os.O_NDELAY)
            open_pipes[pipe] = (handle, chunks)

    while open_pipes:
        # Wake up at least every 10 seconds to report progress.
        readable, _, _ = select.select(list(open_pipes), [], [], 10)
        if not readable:
            waiting = sorted(set(h.tserver for h, _ in open_pipes.values()))
            log.debug("Waiting on %d tservers (%s...)",
                      len(waiting),
                      ', '.join(waiting)[:50])
            continue

        for pipe in readable:
            handle, chunks = open_pipes[pipe]
            data = pipe.read(1024)
            if data:
                chunks.append(data)
                continue

            # An empty read means EOF: release the pipe instead of
            # merely forgetting about it.
            del open_pipes[pipe]
            pipe.close()
            if pipe is handle.stdout:
                handle.stdout = None
            else:
                handle.stderr = None

            if handle.stdout is None and handle.stderr is None:
                log.debug("Tserver %s finished in %.2f",
                          handle.tserver,
                          time.time() - handle.start)
                # Both streams are done; reap the child so that
                # returncode is populated.
                handle.wait()

    # Join once at the end rather than concatenating on every read.
    return dict((handle.tserver,
                 (handle.returncode, b''.join(out), b''.join(err)))
                for handle, (out, err) in output.items())
def runAll(command):
    """Run the same command on every configured tablet server.

    The host list comes from tserverNames(); the work is delegated to
    runEach(), whose result (host name -> (returncode, stdout, stderr))
    is returned unchanged.
    """
    hosts = tserverNames()
    # Only the first 50 characters of the host list go into the log line.
    preview = ', '.join(hosts)[:50]
    log.debug("Running %s on %s..", command, preview)
    return runEach(dict.fromkeys(hosts, command))