Source code for waflib.Tools.wafcache

#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zone=wafcache

General parameters
^^^^^^^^^^^^^^^^^^

Use the following environment variables

* WAFCACHE, which can be

  - An absolute path to the waf cache folder, by default::

      ~/.cache/wafcache_user # `user` represents the currently logged-in user

  - A URL to a cache server, for example::

      export WAFCACHE=http://localhost:8080/files/
      #in that case, `GET/POST` requests are made to urls of the form
      #`http://localhost:8080/files/000000000/0` (cache management is delegated to the server)

  - A GCS, S3 or MINIO bucket::

      gs://my-bucket/    # (uses gsutil command line tool or WAFCACHE_CMD)
      s3://my-bucket/    # (uses aws command line tool or WAFCACHE_CMD)
      minio://my-bucket/ # (uses mc command line tool or WAFCACHE_CMD)

* WAFCACHE_CMD: bucket upload/download command, for example::

    WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"

* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

Remote buckets
^^^^^^^^^^^^^^

The WAFCACHE bucket value is used for the source or destination
depending on the operation (upload or download). For example, with::

    WAFCACHE="gs://mybucket/"

the following commands may be run::

    gsutil cp build/myprogram  gs://mybucket/aa/aaaaa/1
    gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile

File cache
^^^^^^^^^^

Files are copied using hard links by default; if the cache is located
onto another partition, the system switches to file copies instead.

Additional environments can be set:

* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try and trim the cache (3 minutes)

Asynchronous transfers
^^^^^^^^^^^^^^^^^^^^^^

* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
                          this may improve build performance with many/long file uploads
                          the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
                         this requires asynchonous uploads to have an effect
"""

import atexit, base64, errno, getpass, os, re, shutil, sys, time, threading, traceback, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
OK = "ok"

re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle

if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

[docs]def can_retrieve_cache(self): """ New method for waf Task classes """ if not self.outputs: return False self.cached = False sig = self.signature() ssig = Utils.to_hex(self.uid() + sig) if WAFCACHE_STATS: self.generator.bld.cache_reqs += 1 files_to = [node.abspath() for node in self.outputs] proc = get_process() err = cache_command(proc, ssig, [], files_to) process_pool.append(proc) if err.startswith(OK): if WAFCACHE_VERBOSITY: Logs.pprint('CYAN', ' Fetched %r from cache' % files_to) else: Logs.debug('wafcache: fetched %r from cache', files_to) if WAFCACHE_STATS: self.generator.bld.cache_hits += 1 else: if WAFCACHE_VERBOSITY: Logs.pprint('YELLOW', ' No cache entry %s' % files_to) else: Logs.debug('wafcache: No cache entry %s: %s', files_to, err) return False self.cached = True return True
[docs]def put_files_cache(self): """ New method for waf Task classes """ if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs: return files_from = [] for node in self.outputs: path = node.abspath() if not os.path.isfile(path): return files_from.append(path) bld = self.generator.bld old_sig = self.signature() for node in self.inputs: try: del node.ctx.cache_sig[node] except KeyError: pass delattr(self, 'cache_sig') sig = self.signature() def _async_put_files_cache(bld, ssig, files_from): proc = get_process() if WAFCACHE_ASYNC_WORKERS: with bld.wafcache_lock: if bld.wafcache_stop: process_pool.append(proc) return bld.wafcache_procs.add(proc) err = cache_command(proc, ssig, files_from, []) process_pool.append(proc) if err.startswith(OK): if WAFCACHE_VERBOSITY: Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from) else: Logs.debug('wafcache: Successfully uploaded %r to cache', files_from) if WAFCACHE_STATS: bld.cache_puts += 1 else: if WAFCACHE_VERBOSITY: Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err)) else: Logs.debug('wafcache: Error caching results %s: %s', files_from, err) if old_sig == sig: ssig = Utils.to_hex(self.uid() + sig) if WAFCACHE_ASYNC_WORKERS: fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from) bld.wafcache_uploads.append(fut) else: _async_put_files_cache(bld, ssig, files_from) else: Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs) bld.task_sigs[self.uid()] = self.cache_sig
[docs]def hash_env_vars(self, env, vars_lst): """ Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths """ if not env.table: env = env.parent if not env: return Utils.SIG_NIL idx = str(id(env)) + str(vars_lst) try: cache = self.cache_env except AttributeError: cache = self.cache_env = {} else: try: return self.cache_env[idx] except KeyError: pass v = str([env[a] for a in vars_lst]) v = v.replace(self.srcnode.abspath().__repr__()[:-1], '') m = Utils.md5() m.update(v.encode()) ret = m.digest() Logs.debug('envhash: %r %r', ret, v) cache[idx] = ret return ret
[docs]def uid(self): """ Reimplement Task.uid() so that the signature does not depend on local paths """ try: return self.uid_ except AttributeError: m = Utils.md5() src = self.generator.bld.srcnode up = m.update up(self.__class__.__name__.encode()) for x in self.inputs + self.outputs: up(x.path_from(src).encode()) self.uid_ = m.digest() return self.uid_
[docs]def make_cached(cls): """ Enable the waf cache for a given task class """ if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False): return full_name = "%s.%s" % (cls.__module__, cls.__name__) if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'): return m1 = getattr(cls, 'run', None) def run(self): if getattr(self, 'nocache', False): return m1(self) if self.can_retrieve_cache(): return 0 return m1(self) cls.run = run m2 = getattr(cls, 'post_run', None) def post_run(self): if getattr(self, 'nocache', False): return m2(self) ret = m2(self) self.put_files_cache() return ret cls.post_run = post_run cls.has_cache = True
process_pool = []
[docs]def get_process(): """ Returns a worker process that can process waf cache commands The worker process is assumed to be returned to the process pool when unused """ try: return process_pool.pop() except IndexError: filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py' cmd = [sys.executable, '-c', Utils.readf(filepath)] return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
def atexit_pool(): for proc in process_pool: proc.kill() atexit.register(atexit_pool)
[docs]def build(bld): """ Called during the build process to enable file caching """ if WAFCACHE_ASYNC_WORKERS: try: num_workers = int(WAFCACHE_ASYNC_WORKERS) except ValueError: Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS) else: from concurrent.futures import ThreadPoolExecutor bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers) bld.wafcache_uploads = [] bld.wafcache_procs = set([]) bld.wafcache_stop = False bld.wafcache_lock = threading.Lock() def finalize_upload_async(bld): if WAFCACHE_ASYNC_NOWAIT: with bld.wafcache_lock: bld.wafcache_stop = True for fut in reversed(bld.wafcache_uploads): fut.cancel() for proc in bld.wafcache_procs: proc.kill() bld.wafcache_procs.clear() else: Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads)) bld.wafcache_executor.shutdown(wait=True) bld.add_post_fun(finalize_upload_async) if WAFCACHE_STATS: # Init counter for statistics and hook to print results at the end bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0 def printstats(bld): hit_ratio = 0 if bld.cache_reqs > 0: hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100 Logs.pprint('CYAN', ' wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' % (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) ) bld.add_post_fun(printstats) if process_pool: # already called once return # pre-allocation processes = [get_process() for x in range(bld.jobs)] process_pool.extend(processes) Task.Task.can_retrieve_cache = can_retrieve_cache Task.Task.put_files_cache = put_files_cache Task.Task.uid = uid Build.BuildContext.hash_env_vars = hash_env_vars for x in reversed(list(Task.classes.values())): make_cached(x)
[docs]def cache_command(proc, sig, files_from, files_to): """ Create a command for cache worker processes, returns a pickled base64-encoded tuple containing the task signature, a list of files to cache and a list of files files to get from cache (one of the lists is assumed to be empty) """ obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to])) proc.stdin.write(obj) proc.stdin.write('\n'.encode()) proc.stdin.flush() obj = proc.stdout.readline() if not obj: raise OSError('Preforked sub-process %r died' % proc.pid) return cPickle.loads(base64.b64decode(obj))
try: copyfun = os.link except NameError: copyfun = shutil.copy2
[docs]def atomic_copy(orig, dest): """ Copy files to the cache, the operation is atomic for a given file """ global copyfun tmp = dest + '.tmp' up = os.path.dirname(dest) try: os.makedirs(up) except OSError: pass try: copyfun(orig, tmp) except OSError as e: if e.errno == errno.EXDEV: copyfun = shutil.copy2 copyfun(orig, tmp) else: raise os.rename(tmp, dest)
[docs]def lru_trim(): """ the cache folders take the form: `CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9` they are listed in order of last access, and then removed until the amount of folders is within TRIM_MAX_FOLDERS and the total space taken by files is less than EVICT_MAX_BYTES """ lst = [] for up in os.listdir(CACHE_DIR): if len(up) == 2: sub = os.path.join(CACHE_DIR, up) for hval in os.listdir(sub): path = os.path.join(sub, hval) size = 0 for fname in os.listdir(path): try: size += os.lstat(os.path.join(path, fname)).st_size except OSError: pass lst.append((os.stat(path).st_mtime, size, path)) lst.sort(key=lambda x: x[0]) lst.reverse() tot = sum(x[1] for x in lst) while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS: _, tmp_size, path = lst.pop() tot -= tmp_size tmp = path + '.remove' try: shutil.rmtree(tmp) except OSError: pass try: os.rename(path, tmp) except OSError: sys.stderr.write('Could not rename %r to %r\n' % (path, tmp)) else: try: shutil.rmtree(tmp) except OSError: sys.stderr.write('Could not remove %r\n' % tmp) sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))
[docs]def lru_evict(): """ Reduce the cache size """ lockfile = os.path.join(CACHE_DIR, 'all.lock') try: st = os.stat(lockfile) except EnvironmentError as e: if e.errno == errno.ENOENT: with open(lockfile, 'w') as f: f.write('') else: # any other errors such as permissions raise if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60: # check every EVICT_INTERVAL_MINUTES minutes if the cache is too big # OCLOEXEC is unnecessary because no cleaning processes are spawned fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755) try: try: import fcntl except ImportError: import msvcrt, ctypes, ctypes.wintypes handle = msvcrt.get_osfhandle(fd) kernel32 = ctypes.windll('kernel32', use_last_error=True) DWORD = ctypes.wintypes.DWORD HANDLE = ctypes.wintypes.HANDLE class DUMMYSTRUCTNAME(ctypes.Structure): _fields = [('Offset', ctypes.wintypes.DWORD), ('OffsetHigh', DWORD)] class DUMMYUNIONNAME(ctypes.Union): _fields_ = [('_dummystructname', DUMMYSTRUCTNAME), ('Pointer', ctypes.c_void_p)] class OVERLAPPED(ctypes.Structure): _fields_ = [('Internal', ctypes.c_void_p), ('InternalHigh', ctypes.c_void_p), ('_dummyunionname', DUMMYUNIONNAME), ('hEvent', HANDLE)] LockFileEx = kernel32.LockFileEx LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, POINTER(OVERLAPPED)] LockFileEx.restype = BOOL UnlockFileEx = kernel32.UnlockFileEx UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, POINTER(OVERLAPPED)] UnlockFileEx.restype = BOOL if LockFileEx(handle, 3, 0, 1, 0, ctypes.pointer(OVERLAPPED())): try: lru_trim() os.utime(lockfile, None) finally: win32file.UnlockFileEx(handle, 0, 1, 0, ctypes.pointer(OVERLAPPED())) else: last_error = kernel32.GetLastError() if last_error == 33: if WAFCACHE_VERBOSITY: sys.stderr.write('wafcache: another cleaning process is running\n') else: raise OSError(last_error) else: try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except EnvironmentError: if WAFCACHE_VERBOSITY: sys.stderr.write('wafcache: another cleaning process is running\n') else: # now dow the actual cleanup lru_trim() os.utime(lockfile, None) finally: os.close(fd)
class netcache(object): def __init__(self): import urllib3 self.http = urllib3.PoolManager() def url_of(self, sig, i): return "%s/%s/%s" % (CACHE_DIR, sig, i) def upload(self, file_path, sig, i): url = self.url_of(sig, i) with open(file_path, 'rb') as f: file_data = f.read() r = self.http.request('POST', url, timeout=60, fields={ 'file': ('%s/%s' % (sig, i), file_data), }) if r.status >= 400: raise OSError("Invalid status %r %r" % (url, r.status)) def download(self, file_path, sig, i): url = self.url_of(sig, i) with self.http.request('GET', url, preload_content=False, timeout=60) as inf: if inf.status >= 400: raise OSError("Invalid status %r %r" % (url, inf.status)) with open(file_path, 'wb') as out: shutil.copyfileobj(inf, out) def copy_to_cache(self, sig, files_from, files_to): try: for i, x in enumerate(files_from): if not os.path.islink(x): self.upload(x, sig, i) except Exception: return traceback.format_exc() return OK def copy_from_cache(self, sig, files_from, files_to): try: for i, x in enumerate(files_to): self.download(x, sig, i) except Exception: return traceback.format_exc() return OK class fcache(object): def __init__(self): if not os.path.exists(CACHE_DIR): try: os.makedirs(CACHE_DIR) except OSError: pass if not os.path.exists(CACHE_DIR): raise ValueError('Could not initialize the cache directory') def copy_to_cache(self, sig, files_from, files_to): """ Copy files to the cache, existing files are overwritten, and the copy is atomic only for a given file, not for all files that belong to a given task object """ try: for i, x in enumerate(files_from): dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) atomic_copy(x, dest) except Exception: return traceback.format_exc() else: # attempt trimming if caching was successful: # we may have things to trim! try: lru_evict() except Exception: return traceback.format_exc() return OK def copy_from_cache(self, sig, files_from, files_to): """ Copy files from the cache """ try: for i, x in enumerate(files_to): orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) atomic_copy(orig, x) # success! update the cache time os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None) except Exception: return traceback.format_exc() return OK class bucket_cache(object): def bucket_copy(self, source, target): if WAFCACHE_CMD: def replacer(match): if match.group('src'): return source elif match.group('tgt'): return target cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)] elif CACHE_DIR.startswith('s3://'): cmd = ['aws', 's3', 'cp', source, target] elif CACHE_DIR.startswith('gs://'): cmd = ['gsutil', 'cp', source, target] else: cmd = ['mc', 'cp', source, target] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = proc.communicate() if proc.returncode: raise OSError('Error copy %r to %r using: %r (exit %r):\n out:%s\n err:%s' % ( source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace'))) def copy_to_cache(self, sig, files_from, files_to): try: for i, x in enumerate(files_from): dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) self.bucket_copy(x, dest) except Exception: return traceback.format_exc() return OK def copy_from_cache(self, sig, files_from, files_to): try: for i, x in enumerate(files_to): orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) self.bucket_copy(orig, x) except EnvironmentError: return traceback.format_exc() return OK
[docs]def loop(service): """ This function is run when this file is run as a standalone python script, it assumes a parent process that will communicate the commands to it as pickled-encoded tuples (one line per command) The commands are to copy files to the cache or copy files from the cache to a target destination """ # one operation is performed at a single time by a single process # therefore stdin never has more than one line txt = sys.stdin.readline().strip() if not txt: # parent process probably ended sys.exit(1) ret = OK [sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt)) if files_from: # TODO return early when pushing files upstream ret = service.copy_to_cache(sig, files_from, files_to) elif files_to: # the build process waits for workers to (possibly) obtain files from the cache ret = service.copy_from_cache(sig, files_from, files_to) else: ret = "Invalid command" obj = base64.b64encode(cPickle.dumps(ret)) sys.stdout.write(obj.decode()) sys.stdout.write('\n') sys.stdout.flush()
if __name__ == '__main__': if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'): if CACHE_DIR.startswith('minio://'): CACHE_DIR = CACHE_DIR[8:] # minio doesn't need the protocol part, uses config aliases service = bucket_cache() elif CACHE_DIR.startswith('http'): service = netcache() else: service = fcache() while 1: try: loop(service) except KeyboardInterrupt: break