blob: 766840168ff836a0155027d56a985e4aef73bc89 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''
The pysync_remote module is a companion to the pysync module containing the
components required for the remote end of a synchronization pair.
'''
from __future__ import with_statement
import sys,os
if sys.hexversion<0x2040400:
sys.stderr.write("pysync_remote.py needs python version at least 2.4.4 on the remote computer.\n")
sys.stderr.write("You are using %s\n"%sys.version)
sys.stderr.write("Here is a guess at where the python executable is--\n")
os.system("/bin/sh -c 'type python>&2'");
sys.exit(1)
import copy
import cPickle
from datetime import datetime
import errno
import hashlib
import socket
import stat
import threading
import time
import zlib
# "Simulate" importing this module as 'pysync_remote' to mirror the namespace
# used by the local pysync component so pickled object deserialization works.
if __name__ == '__main__':
sys.modules['pysync_remote'] = sys.modules['__main__']
# The number of file bytes processed at a time
CHUNK_SIZE=4000000
class Progress:
def __init__(self, byteMax, fileMax):
self.status = '';
self.fname = '';
self.byteMax = byteMax;
self.byteNow = 0;
self.fileMax = fileMax;
self.fileNow = 0;
self.tstamp = 0;
def _printStatus(self):
pct = 0
if self.byteMax > 0: pct = int(self.byteNow * 100 / self.byteMax);
sys.stderr.write('[%s %d%%] %s %s byte:%d/%d file:%d/%d\n' % (
datetime.now().isoformat(' '),
pct, self.status, self.fname, self.byteNow,
self.byteMax, self.fileNow, self.fileMax))
def update(self, status, fname, byteNow, fileNow):
changed = (status != self.status
or fname != self.fname
or fileNow != self.fileNow
or (time.time() - self.tstamp > 5))
self.status = status
self.fname = fname
self.byteNow = byteNow
self.fileNow = fileNow
if changed:
self.tstamp = time.time()
self._printStatus()
class Options:
'''
A container for the options collected by LocalPysync and use by RemotePysync.
'''
def __init__(self):
self.compress = False
self.verbose = False
self.insecure = False
self.delete = False
self.minusn = False
self.progressTime = 0
self.progressBytes = 0
self.progressTimestamp = False
self.sendProgress = True
self.sendRawProgress = False
self.addrinfo = None
def statToTuple(s,name):
'''
Converts the os.stat() structure to a tuple used internally. The tuple
content varies by type but is generally of the form:
(entry_type, protection_bits [, variable_content])
entry_type:
- regular files
l symbolic link
d directory
c character special file
b block special file
p FIFO
s socket link
These entry types are coordinated with code in RemotePysync.createFiles().
'''
if stat.S_ISREG(s.st_mode):
return ['-',s.st_mode,s.st_size,None]
if stat.S_ISLNK(s.st_mode):
return ('l',0,os.readlink(name))
if stat.S_ISDIR(s.st_mode):
return ('d',s.st_mode)
if stat.S_ISCHR(s.st_mode):
return ('c',s.st_mode,s.st_rdev)
if stat.S_ISBLK(s.st_mode):
return ('b',s.st_mode,s.st_rdev)
if stat.S_ISFIFO(s.st_mode):
return ('p',s.st_mode)
if stat.S_ISSOCK(s.st_mode):
return ('s',s.st_mode)
raise Exception('Pysync can not handle %s.'%name)
def formatSiBinary(number):
'''
Format a number using SI/IEC 60027-2 prefixes as described in
http://physics.nist.gov/cuu/Units/binary.html.
The result is of the form "D.dd P" where D.dd is the input
number in P units with no more than 2 decimal places. D will
be longer than 4 digits only when number exceeds the scale of
the largest SI/IEC binary prefix supported.
'''
number = float(number)
factor = 1024
prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
for prefix in prefixes[:-1]:
if number < factor:
return '%.2f %s' % (number, prefix)
number /= factor
else:
return '%.2f %s' % (number, prefixes[-1])
class ProgressUpdate():
'''
A container used to report raw synchronization progress information collected
by SyncProgress and RemotePysync to LocalPysync.
'''
def __init__(self, isFinal=False, totalExpectedBytes=0, totalExpectedFiles=0, progressCounters=None):
# An indication of whether or not this ProgressUpdate is the final
self.isFinal = isFinal
# The total number of file bytes expected to be processed
self.totalExpectedBytes = totalExpectedBytes
# The total number of files expected to be processed
self.totalExpectedFiles = totalExpectedFiles
# ProgressCounters for this update
self.progressCounters = copy.copy(progressCounters)
def __str__(self):
return ("ProgressUpdate(isFinal=%s, totalExpectedBytes=%d, totalExpectedFiles=%d, progressCounters=%s)"
% (self.isFinal,
self.totalExpectedBytes,
self.totalExpectedFiles,
self.progressCounters))
class ProgressCounters():
'''
A structure used to track the progress reported to SyncProgress.
'''
def __init__(self, updateTime=None, bytesProcessed=0, filesProcessed=0, processingFile=None):
# The time (from time.time()) this instance was last updated
self.updateTime = updateTime
# The cumulative number of bytes processed as of the last update
self.bytesProcessed = bytesProcessed
# The cumulative number of files *completed* as of the last update
self.filesProcessed = filesProcessed
# The file being processed during the last update
self.processingFile = processingFile
def __str__(self):
return ("ProgressCounters(updateTime=%f, bytesProcessed=%d, filesProcessed=%d, processingFile=\"%s\"))"
% (self.updateTime,
self.bytesProcessed,
self.filesProcessed,
self.processingFile))
class SyncProgress(threading.Thread):
'''
The SyncProgress class is used to record synchronization progress by RemotePysync and
issue a periodic progress report. SyncProgress runs as a daemon Thread and must be
started following allocation. The update() method must be called to feed progress
information to this instance; the stop() method should be called when the sync
operation is complete so a final report can be issued.
'''
# The minimum allowed reporting interval based on time (in seconds)
MINIMUM_TIME_INTERVAL = 2 * 60.0
DEFAULT_TIME_INTERVAL = 10 * 60.0
# The minimum allowed reporting interval based on byte volume
MINIMUM_VOLUME_INTERVAL = 512 * 1024 * 1024
DEFAULT_VOLUME_INTERVAL = "5.0%"
def __init__(self, list, reportInterval=0, reportBytes=0,
recordProgressCallback=None, recordRawProgressCallback=None,
progressTimestamp=False):
'''
Initialize a new SyncProgress instance.
list - the dict containing the name:attribute pairings for the
file system objects to synchronize; this list is expected
to be obtained from a LocalPysync "getList" command.
reportInterval - the minimum number of seconds between progress
reports; volume reports are emitted regardless of the
reportInterval value. If zero, the default report interval
is SyncProgress.DEFAULT_TIME_INTERVAL
reportBytes - the number of bytes processed for a volume report
to be issued; if zero, the default volume report interval
is SyncProgress.DEFAULT_VOLUME_INTERVAL. If a string of the
form "n.n%", the reportBytes is calculated to be n.n percent
of the total expected bytes to process.
recordProgressCallback - method to call to emit the progress *message*
instead of writting the message to stderr. This method is
presented a string containing the progress message as the only
argument. If None, the progress message is written to stderr
prepended with the timestamp of the observation.
recordRawProgressCallback - method to call to emit the raw progress data
collected by an update() call. This method is presented a
ProgressUpdate instance as the only argument.
progressTimestamp - indicates whether or not the observation timestamp is
included in progress messages emitted by this SyncProgress instance.
'''
self._final = False
self._totalBytes = 0
self._totalFiles = 0
self.processingChunk = 0
self.progressTimestamp = progressTimestamp
# Calculate the total number of files and bytes that could be
# processed. Only regular files contribute to the total byte
# count.
for desc in list.itervalues():
if desc[0] == '-':
self._totalBytes += desc[2]
self._totalFiles += 1
# Indicates the time, in seconds, of the progress reporting interval.
if reportInterval == 0:
reportInterval = SyncProgress.DEFAULT_TIME_INTERVAL
elif reportInterval < SyncProgress.MINIMUM_TIME_INTERVAL:
raise ValueError("reportInterval must be at least %d" % SyncProgress.MINIMUM_TIME_INTERVAL, reportInterval)
self.progressInterval = reportInterval
# Indicates the amount, in bytes, of the data processed before a progress report is forced.
if reportBytes == 0:
reportBytes = SyncProgress.DEFAULT_VOLUME_INTERVAL
elif type(reportBytes) == str:
pass
elif reportBytes < SyncProgress.MINIMUM_VOLUME_INTERVAL:
raise ValueError("reportBytes must be at least %d" % SyncProgress.MINIMUM_VOLUME_INTERVAL, reportBytes)
if type(reportBytes) == str:
if reportBytes[-1] == '%':
# Caller has requested a percent of the total
reportBytes = int(round(self._totalBytes * float(reportBytes[:-1]) / 100))
else:
raise ValueError("reportBytes must be numeric or a string in the form \"n.n%\"", reportBytes)
self.progressBytes = reportBytes
# Method to call for recording progress details
self.recordProgressCallback = recordProgressCallback
# Method to call for recording raw progress data
self.recordRawProgressCallback = recordRawProgressCallback
# Active ProgressCounters instance
self.progressCounters = ProgressCounters()
# Access synchronization lock for self.progressCounters
self.progressCountersLock = threading.Lock()
# Last time progress was reported
self.lastReportTime = None
# Copy of ProgressCounters at self.lastReportTime
self.lastReportProgressCounters = None
# Access synchronization lock for self.lastReportTime and self.lastReportProgressCounters
self.progressReportLock = threading.Lock()
# threading.Event instance used to run the interval timer;
# setting this Event terminates the reporting loop.
self.timerEvent = None
threading.Thread.__init__(self, name="ProgressTimer")
self.daemon = True
def _emitProgress(self, progressCounters=None, timedReport=True):
'''
Prepare and emit a progress message. If progressCounters
is not provided, a copy of self.progressCounters is made
while holding the self.progressCountersLock; the lock is
not held while actually formatting and emitting the progress
message.
Returns the amount of time, in seconds, for the next timed
report.
'''
if not progressCounters:
with self.progressCountersLock:
progressCounters = copy.copy(self.progressCounters)
currentReportTime = time.time()
with self.progressReportLock:
lastReportTime = self.lastReportTime
if timedReport:
# If the last report was more recent than the desired reporting interval,
# suppress this report and return the amount of time remaining until the
# next reporting time
lastReportInterval = currentReportTime - lastReportTime
if lastReportInterval < self.progressInterval:
return self.progressInterval - lastReportInterval
lastReportProgressCounters = self.lastReportProgressCounters
self.lastReportTime = currentReportTime
self.lastReportProgressCounters = progressCounters
if self._final:
bytesMoved = progressCounters.bytesProcessed
processingInterval = currentReportTime - self.collectionStartTime
byteRate = bytesMoved / processingInterval if processingInterval else 0.0
else:
bytesMoved = progressCounters.bytesProcessed - lastReportProgressCounters.bytesProcessed if lastReportProgressCounters else 0
processingInterval = progressCounters.updateTime - lastReportProgressCounters.updateTime if lastReportProgressCounters else 0.0
byteRate = bytesMoved / processingInterval if processingInterval else 0.0
message = ("%sProcessed %sB of %sB (%d%%); %sB processed at %sBps; %d of %d files processed"
% ('*' if timedReport else ' ',
formatSiBinary(progressCounters.bytesProcessed),
formatSiBinary(self._totalBytes),
100 * progressCounters.bytesProcessed / self._totalBytes if self._totalBytes !=0 else 1,
formatSiBinary(bytesMoved),
formatSiBinary(byteRate),
progressCounters.filesProcessed,
self._totalFiles))
if self.progressTimestamp:
message = "[%s]%s" % (datetime.fromtimestamp(currentReportTime).isoformat(' '), message)
if self.recordProgressCallback:
self.recordProgressCallback(message)
else:
sys.stderr.write(message)
sys.stderr.write("\n")
return self.progressInterval
def update(self, fileBytesProcessed=0, processingFile=None):
'''
Update the progress information and emit a progress message if
deemed necessary.
fileBytesProcessed - the number of bytes processed (moved/examined)
since progress was last reported.
processingFile - the file currently being processed. If different
from the previous value, the file completion counter is
incremented.
The final (closing) call to this method is made when the SyncProgress
thread is terminated by the stop() method call.
'''
progressUpdate = None
progressCounters = None
with self.progressCountersLock:
self.progressCounters.updateTime = time.time()
self.progressCounters.bytesProcessed += fileBytesProcessed
if self.progressCounters.processingFile != processingFile:
if self.progressCounters.processingFile:
self.progressCounters.filesProcessed += 1
self.progressCounters.processingFile = processingFile
processingChunk = self.progressCounters.bytesProcessed // self.progressBytes if self.progressBytes != 0 else 1
if self._final or processingChunk > self.processingChunk:
self.processingChunk = processingChunk
progressCounters = copy.copy(self.progressCounters)
if self.recordRawProgressCallback:
progressUpdate = ProgressUpdate(self._final, self._totalBytes, self._totalFiles, self.progressCounters)
if progressUpdate:
self.recordRawProgressCallback(progressUpdate)
if progressCounters:
self._emitProgress(progressCounters, False)
def run(self):
'''
Emit a progress message periodically based on self.progressInterval.
Exit once self.timerEvent is set by self.stop().
Updates to the progress counters should be made by calling
self.update().
'''
self.collectionStartTime = self.lastReportTime = self.progressCounters.updateTime = time.time()
self.lastReportProgressCounters = copy.copy(self.progressCounters)
self.timerEvent = threading.Event()
waitInterval = self.progressInterval
while not self.timerEvent.isSet():
self.timerEvent.wait(waitInterval)
if self.timerEvent.isSet():
break
waitInterval = self._emitProgress()
# Close out statistics and emit the final report
self._final = True
self.update()
self.timerEvent = None
def stop(self):
'''
Stop emitting progress messages and wait for this thread to terminate.
'''
if self.isAlive():
if self.timerEvent:
self.timerEvent.set()
self.join(0.5)
class RemotePysync:
'''
The RemotePysync class is the receiving end of the directory synchronization
initiaited by LocalPysync. Once started by LocalPysync, RemotePysync manages
copying the content of a source directory.
A RemotePysync instance and a LocalPysync instance communicate with each other
over a pipe; command orders and arguments are transferred to LocalPysync over
stdout; responses are received from LocalPysync via stdin.
This class is *not* thread-safe; a single RemotePysync instance may be used
in a process and must be free to change the current working directory for the
duration of the synchronization operation.
'''
def __init__(self):
# The SyncProgress instance used to track progress of the directory
# synchronization managed by this RemotePysync instance.
self.syncProgress = None
# Synchronization lock to prevent interleaved communications using
# self.sendCommand() and self.getAnswer().
self.commLock = threading.RLock()
def sendCommand(self,*args):
'''
Serialize the command & arguments using cPickle and send write to stdout.
The self.commLock must be held before calling this method. The lock should
not be released until the getAnswer() method is called to obtain the results.
'''
a = cPickle.dumps(args)
sys.stdout.write('%d\n%s'%(len(a),a))
sys.stdout.flush()
def getAnswer(self):
'''
Read a command response from stdin for a command sent using the sendCommand()
method. The response is a data stream serialized using cPickle and preceded by
the integer size of the serialized data. The data stream is deserialized using
cPickle and returned.
The self.commLock must be held before calling this method. The lock should
be obtained before calling the self.sendCommand() method to send the command
for which the call to this method is being made. Once the answer is obtained,
the self.commLock should be released.
'''
size = int(sys.stdin.readline())
data = sys.stdin.read(size)
assert size==len(data)
return cPickle.loads(data)
def doCommand(self,*args):
'''
Sends the command represented by args to the local peer of this
RemotePysync and returns with the answer.
This method calls the sendCommand() method followed immediately
by the getAnswer() method while holding self.commLock.
'''
with self.commLock:
self.sendCommand(*args)
return self.getAnswer()
def _recordProgress(self, message):
'''
Send progress information to associated LocalPysync instance.
'''
if message:
self.doCommand('recordProgress', message)
def _recordRawProgress(self, progressUpdate):
'''
Send raw progress data to associated LocalPysync instance.
'''
if progressUpdate:
self.doCommand('recordRawProgress', progressUpdate)
def removeJunk(self,list):
'''
Trim the current directory of files and directories not in list
(if options.delete is True) and files and directories in list if
the current type is not the expected type.
Existing files of the expected type may be altered or removed in
self.createFiles().
'''
for root,dirs,files in os.walk('.',False):
os.chmod(root,0700)
for i in dirs+files:
name = os.path.join(root,i)
t = statToTuple(os.lstat(name),name)
if name not in list and self.options.delete or name in list and list[name][0]!=t[0]:
if t[0]=='d':
os.rmdir(name)
else:
os.remove(name)
def createDirs(self,list):
'''
Create all directories identified in list.
'''
for name in sorted(list.keys()):
value = list[name]
if value[0]=='d':
try:
os.mkdir(name,0700)
except OSError, err:
if err.errno != errno.EEXIST:
raise
# Ignoring creation error for existing directory
def createFiles(self,list):
'''
Create all files identified in the list.
Directories must be created by calling self.createDirs() before calling
this method. Hard links (entry type = 'L') are not processed by this
method.
'''
for name,value in list.iteritems():
# If a directory or hard link, skip it.
if value[0]=='d' or value[0]=='L':
continue
# If the file already exists, process according to its existing
# type:
# 1) If the existing type and the expected/desired type
# are the same:
# a) set the permissions to 0700 for all except symbolic
# links
# b) if a regular file and the existing size is greater
# than the expected size, truncate the file to the
# expected size and skip further processing for this
# file
# c) if the variable content of the internal stat tuple
# for the existing file and the expected file are the
# same, skip further processing for this file.
# 2) If the existing type and the expected type are not the
# same or file processing was not skipped above, remove the
# existing file and continue processing.
if os.path.lexists(name):
t = statToTuple(os.lstat(name),name)
if value[0]==t[0]:
if value[0]!='l':
os.chmod(name,0700)
if value[0]=='-':
value[3] = t[2]
if t[2]>value[2]:
f = open(name,'r+b')
f.truncate(value[2])
f.close()
continue
if value[2:]==t[2:]:
continue
os.remove(name)
# Create the file entry based on the desired type.
# For regular files, an empty (zero-length) file is created.
if value[0]=='-':
open(name,'w+b').close()
elif value[0]=='l':
os.symlink(value[2],name)
elif value[0]=='c':
os.mknod(name,0700|stat.S_IFCHR,value[2])
elif value[0]=='b':
os.mknod(name,0700|stat.S_IFBLK,value[2])
elif value[0]=='p':
os.mkfifo(name,0700)
elif value[0]=='s':
f = socket.socket(socket.AF_UNIX)
f.bind(name)
f.close()
else:
assert 0
def linkLinks(self,list):
'''
Create hard links identified in the list. An existing file
having the same name as the link is removed before the link
is created.
Directories must be created by calling self.createDirs() and
files must be created by calling self.createFiles() before calling
this method.
'''
for name,value in list.iteritems():
if value[0]=='L':
if os.path.lexists(name):
os.remove(name)
os.link(value[1],name)
def getData(self,name,offset,size):
if not self.socket:
ss = socket.socket(self.options.addrinfo[0])
ss.bind(self.options.addrinfo[4])
ss.listen(1)
self.doCommand('connect',ss.getsockname())
self.socket = ss.accept()[0]
ss.close()
s = self.doCommand('getData',name,offset,size)
o = 0
sb = []
while o<s:
a = self.socket.recv(min(s-o,65536))
if len(a)==0:
raise IOError('EOF')
sb.append(a)
o += len(a)
data = "".join(sb)
if self.options.compress:
data = zlib.decompress(data)
assert len(data)==size
return data
def copyData(self,list):
fileMax = 0
byteMax = 0
for name,value in list.iteritems():
if value[0]=='-':
fileMax += 1
byteMax += value[2]
progress = Progress(byteMax,fileMax)
fileNow = 0
byteNow = 0
bytesTotal = 0
unchanged = 0
for name,value in list.iteritems():
if value[0]=='-':
f = None
if self.options.minusn:
if os.path.lexists(name) and stat.S_ISREG(os.lstat(name).st_mode):
f = open(name,'rb')
else:
f = open(name,'r+b')
localSize = 0
if f:
f.seek(0,2)
localSize = f.tell()
offset = 0
bytesFile = 0
changed = value[2]!=value[3]
if not value[2]:
# Zero-length files require little work
if not self.options.minusn:
self.syncProgress.update(processingFile=name)
else:
while offset<value[2]:
if self.options.verbose:
progress.update('',name,byteNow,fileNow)
size = min(value[2]-offset,CHUNK_SIZE)
digest = None
if offset+size<=localSize:
with self.commLock:
self.sendCommand('getDigest',name,offset,size)
f.seek(offset)
b = f.read(size)
assert len(b)==size
m = hashlib.md5()
m.update(b)
a = m.digest()
digest = self.getAnswer()
if a==digest:
if not self.options.minusn:
self.syncProgress.update(fileBytesProcessed=size, processingFile=name)
offset += size
byteNow += size
continue
changed = True
if not self.options.minusn:
data = self.getData(name,offset,size)
if not self.options.insecure:
if digest==None:
digest = self.doCommand('getDigest',name,offset,size)
m = hashlib.md5()
m.update(data)
if m.digest()!=digest:
raise Exception('Digest did not match.')
f.seek(offset)
f.write(data)
self.syncProgress.update(fileBytesProcessed=size, processingFile=name)
bytesFile += size
bytesTotal += size
offset += size
byteNow += size
if f:
f.close()
if self.options.minusn and self.options.verbose:
sys.stderr.write('%s %d\n'%(name,bytesFile))
if not changed:
if self.options.verbose:
sys.stderr.write('%s is the same\n'%name)
unchanged += 1
fileNow += 1
if self.options.verbose:
progress.update('','',byteNow,fileNow)
updated = fileMax - unchanged
if self.options.minusn or self.options.verbose:
sys.stderr.write('%d out of %d file(s) updated.\n'%(updated, fileMax))
if self.options.minusn:
sys.stderr.write('Total--%d byte%s\n'%(bytesTotal,bytesTotal!=1 and 's' or ''))
def fixPermissions(self,list):
'''
Set the permission bits of the non-link files and directories to
the expected value.
'''
for name,value in list.iteritems():
if value[0]!='L' and value[0]!='l':
os.chmod(name,value[1]&01777)
def run(self):
self.socket = None
self.options = self.doCommand('getOptions')
# TODO: Add a safety check for destDir
os.chdir(self.doCommand('getDestDir'))
list = self.doCommand('getList')
if self.options.minusn:
self.copyData(list)
else:
self.syncProgress = SyncProgress(list,
reportInterval=self.options.progressTime,
reportBytes=self.options.progressBytes,
recordProgressCallback=(self._recordProgress if self.options.sendProgress else None),
recordRawProgressCallback=(self._recordRawProgress if self.options.sendRawProgress else None),
progressTimestamp=self.options.progressTimestamp)
self.syncProgress.start()
self.removeJunk(list)
self.createDirs(list)
self.createFiles(list)
self.linkLinks(list)
self.copyData(list)
self.fixPermissions(list)
self.syncProgress.stop()
with self.commLock:
self.sendCommand('quit', 0)
if __name__ == '__main__':
RemotePysync().run()