""" Chooses a set of implementations based on a policy. """ # Copyright (C) 2007, Thomas Leonard # See the README file for details, or visit http://0install.net. import time import sys, os, sets from logging import info, debug, warn import arch from model import * import basedir from namespaces import * import ConfigParser import reader from zeroinstall import NeedDownload from zeroinstall.injector.iface_cache import iface_cache, PendingFeed from zeroinstall.injector.trust import trust_db # If we started a check within this period, don't start another one: FAILED_CHECK_DELAY = 60 * 60 # 1 Hour class _Cook: """A Cook follows a Recipe.""" # Maybe we're taking this metaphor too far? def __init__(self, policy, required_digest, recipe, force = False): """Start downloading all the ingredients.""" self.recipe = recipe self.required_digest = required_digest self.downloads = {} # Downloads that are not yet successful self.streams = {} # Streams collected from successful downloads # Start a download for each ingredient for step in recipe.steps: dl = policy.begin_archive_download(step, success_callback = lambda stream, step=step: self.ingredient_ready(step, stream), force = force) self.downloads[step] = dl self.test_done() # Needed for empty recipes # Note: the only references to us are held by the on_success callback # in each Download. On error this is removed, which will cause us # to be destoryed, which will release all the temporary files we hold. def ingredient_ready(self, step, stream): # Called when one archive has been fetched. Store it until the other # archives arrive. assert step not in self.streams self.streams[step] = stream del self.downloads[step] self.test_done() def test_done(self): # On success, a download is removed from here. If empty, it means that # all archives have successfully been downloaded. if self.downloads: return from zeroinstall.zerostore import unpack # Create an empty directory for the new implementation store = iface_cache.stores.stores[0] tmpdir = store.get_tmp_dir_for(self.required_digest) try: # Unpack each of the downloaded archives into it in turn for step in self.recipe.steps: unpack.unpack_archive_over(step.url, self.streams[step], tmpdir, step.extract) # Check that the result is correct and store it in the cache store.check_manifest_and_rename(self.required_digest, tmpdir) tmpdir = None finally: # If unpacking fails, remove the temporary directory if tmpdir is not None: from zeroinstall import support support.ro_rmtree(tmpdir) class Policy(object): """Chooses a set of implementations based on a policy. Typical use: 1. Create a Policy object, giving it the URI of the program to be run and a handler. 2. Call L{recalculate}. If more information is needed, the handler will be used to download it. 3. When all downloads are complete, the L{implementation} map contains the chosen versions. 4. Use L{get_uncached_implementations} to find where to get these versions and download them using L{begin_impl_download}. @ivar root: URI of the root interface @ivar implementation: chosen implementations @type implementation: {model.Interface: model.Implementation or None} @ivar watchers: callbacks to invoke after recalculating @ivar help_with_testing: default stability policy @type help_with_testing: bool @ivar network_use: one of the model.network_* values @ivar freshness: seconds allowed since last update @type freshness: int @ivar ready: whether L{implementation} is complete enough to run the program @type ready: bool @ivar handler: handler for main-loop integration @type handler: L{handler.Handler} @ivar restrictions: Currently known restrictions for each interface. @type restrictions: {model.Interface -> [model.Restriction]} @ivar src: whether we are looking for source code @type src: bool @ivar stale_feeds: set of feeds which are present but haven't been checked for a long time @type stale_feeds: set """ __slots__ = ['root', 'implementation', 'watchers', 'help_with_testing', 'network_use', 'freshness', 'ready', 'handler', '_warned_offline', 'restrictions', 'src', 'root_restrictions', 'stale_feeds'] def __init__(self, root, handler = None, src = False): """ @param root: The URI of the root interface (the program we want to run). @param handler: A handler for main-loop integration. @type handler: L{zeroinstall.injector.handler.Handler} @param src: Whether we are looking for source code. @type src: bool """ self.watchers = [] self.help_with_testing = False self.network_use = network_full self.freshness = 60 * 60 * 24 * 30 self.ready = False self.src = src # Root impl must be a "src" machine type self.restrictions = {} self.stale_feeds = sets.Set() # This is used in is_unusable() to check whether the impl is # for the root interface when looking for source. It is also # used to add restrictions to the root (e.g. --before and --not-before) self.root_restrictions = [] # If we need to download something but can't because we are offline, # warn the user. But only the first time. self._warned_offline = False # (allow self for backwards compat) self.handler = handler or self debug("Supported systems: '%s'", arch.os_ranks) debug("Supported processors: '%s'", arch.machine_ranks) path = basedir.load_first_config(config_site, config_prog, 'global') if path: try: config = ConfigParser.ConfigParser() config.read(path) self.help_with_testing = config.getboolean('global', 'help_with_testing') self.network_use = config.get('global', 'network_use') self.freshness = int(config.get('global', 'freshness')) assert self.network_use in network_levels except Exception, ex: warn("Error loading config: %s", ex) self.set_root(root) # Probably need weakrefs here... iface_cache.add_watcher(self) trust_db.watchers.append(self.process_pending) def set_root(self, root): """Change the root interface URI.""" assert isinstance(root, (str, unicode)) self.root = root self.implementation = {} # Interface -> [Implementation | None] def save_config(self): """Write global settings.""" config = ConfigParser.ConfigParser() config.add_section('global') config.set('global', 'help_with_testing', self.help_with_testing) config.set('global', 'network_use', self.network_use) config.set('global', 'freshness', self.freshness) path = basedir.save_config_path(config_site, config_prog) path = os.path.join(path, 'global') config.write(file(path + '.new', 'w')) os.rename(path + '.new', path) def process_pending(self): """For each pending feed, either import it fully (if we now trust one of the signatures) or start performing whatever action is needed next (either downloading a key or confirming a fingerprint). @since: 0.25 """ # process_pending must never be called from recalculate for pending in iface_cache.pending.values(): pending.begin_key_downloads(self.handler, lambda pending = pending: self._keys_ready(pending)) def _keys_ready(self, pending): try: iface = iface_cache.get_interface(pending.url) # Note: this may call recalculate, but it shouldn't do any harm # (just a bit slow) updated = iface_cache.update_interface_if_trusted(iface, pending.sigs, pending.new_xml) except SafeException, ex: self.handler.report_error(ex) # Ignore the problematic new version and continue... else: if not updated: self.handler.confirm_trust_keys(iface, pending.sigs, pending.new_xml) def recalculate(self, fetch_stale_interfaces = True): """Try to choose a set of implementations. This may start downloading more interfaces, but will return immediately. @param fetch_stale_interfaces: whether to begin downloading interfaces which are present but haven't been checked within the L{freshness} period @type fetch_stale_interfaces: bool @postcondition: L{ready} indicates whether a possible set of implementations was chosen @note: A policy may be ready before all feeds have been downloaded. As new feeds arrive, the chosen versions may change. """ self.stale_feeds = sets.Set() self.restrictions = {} self.implementation = {} self.ready = True debug("Recalculate! root = %s", self.root) def process(dep): iface = self.get_interface(dep.interface) if iface in self.implementation: debug("Interface requested twice; skipping second %s", iface) if dep.restrictions: warn("Interface requested twice; I've already chosen an implementation " "of '%s' but there are more restrictions! Ignoring the second set.", iface) return self.implementation[iface] = None # Avoid cycles assert iface not in self.restrictions self.restrictions[iface] = dep.restrictions impl = self._get_best_implementation(iface) if impl: debug("Will use implementation %s (version %s)", impl, impl.get_version()) self.implementation[iface] = impl for d in impl.requires: debug("Considering dependency %s", d) process(d) else: debug("No implementation chould be chosen yet"); self.ready = False process(InterfaceDependency(self.root, restrictions = self.root_restrictions)) if fetch_stale_interfaces and self.network_use != network_offline: for stale in self.stale_feeds: info("Checking for updates to stale feed %s", stale) self.begin_iface_download(stale, False) for w in self.watchers: w() # Only to be called from recalculate, as it is quite slow. # Use the results stored in self.implementation instead. def _get_best_implementation(self, iface): impls = iface.implementations.values() for f in self.usable_feeds(iface): debug("Processing feed %s", f) try: feed_iface = self.get_interface(f.uri) if feed_iface.name and iface.uri not in feed_iface.feed_for: warn("Missing for '%s' in '%s'", iface.uri, f.uri) if feed_iface.implementations: impls.extend(feed_iface.implementations.values()) except NeedDownload, ex: raise ex except Exception, ex: warn("Failed to load feed %s for %s: %s", f, iface, str(ex)) debug("get_best_implementation(%s), with feeds: %s", iface, iface.feeds) if not impls: info("Interface %s has no implementations!", iface) return None best = impls[0] for x in impls[1:]: if self.compare(iface, x, best) < 0: best = x unusable = self.get_unusable_reason(best, self.restrictions.get(iface, [])) if unusable: info("Best implementation of %s is %s, but unusable (%s)", iface, best, unusable) return None return best def compare(self, interface, b, a): """Compare a and b to see which would be chosen first. @param interface: The interface we are trying to resolve, which may not be the interface of a or b if they are from feeds. @rtype: int""" restrictions = self.restrictions.get(interface, []) a_stab = a.get_stability() b_stab = b.get_stability() # Usable ones come first r = cmp(self.is_unusable(b, restrictions), self.is_unusable(a, restrictions)) if r: return r # Preferred versions come first r = cmp(a_stab == preferred, b_stab == preferred) if r: return r if self.network_use != network_full: r = cmp(self.get_cached(a), self.get_cached(b)) if r: return r # Stability stab_policy = interface.stability_policy if not stab_policy: if self.help_with_testing: stab_policy = testing else: stab_policy = stable if a_stab >= stab_policy: a_stab = preferred if b_stab >= stab_policy: b_stab = preferred r = cmp(a_stab, b_stab) if r: return r # Newer versions come before older ones r = cmp(a.version, b.version) if r: return r # Get best OS r = cmp(arch.os_ranks.get(a.os, None), arch.os_ranks.get(b.os, None)) if r: return r # Get best machine r = cmp(arch.machine_ranks.get(a.machine, None), arch.machine_ranks.get(b.machine, None)) if r: return r # Slightly prefer cached versions if self.network_use == network_full: r = cmp(self.get_cached(a), self.get_cached(b)) if r: return r return cmp(a.id, b.id) def usable_feeds(self, iface): """Generator for C{iface.feeds} that are valid for our architecture. @rtype: generator @see: L{arch}""" if self.src and iface.uri == self.root: # Note: when feeds are recursive, we'll need a better test for root here machine_ranks = {'src': 1} else: machine_ranks = arch.machine_ranks for f in iface.feeds: if f.os in arch.os_ranks and f.machine in machine_ranks: yield f else: debug("Skipping '%s'; unsupported architecture %s-%s", f, f.os, f.machine) def get_ranked_implementations(self, iface): """Get all implementations from all feeds, in order. @type iface: Interface @return: a sorted list of implementations. @rtype: [model.Implementation]""" impls = iface.implementations.values() for f in self.usable_feeds(iface): feed_iface = iface_cache.get_interface(f.uri) if feed_iface.implementations: impls.extend(feed_iface.implementations.values()) impls.sort(lambda a, b: self.compare(iface, a, b)) return impls def is_unusable(self, impl, restrictions = []): """@return: whether this implementation is unusable. @rtype: bool""" return self.get_unusable_reason(impl, restrictions) != None def get_unusable_reason(self, impl, restrictions = []): """ @param impl: Implementation to test. @type restrictions: [L{model.Restriction}] @return: The reason why this impl is unusable, or None if it's OK. @rtype: str @note: The restrictions are for the interface being requested, not the interface of the implementation; they may be different when feeds are being used.""" for r in restrictions: if not r.meets_restriction(impl): return "Incompatible with another selected implementation" stability = impl.get_stability() if stability <= buggy: return stability.name if self.network_use == network_offline and not self.get_cached(impl): return "Not cached and we are off-line" if impl.os not in arch.os_ranks: return "Unsupported OS" # When looking for source code, we need to known if we're # looking at an implementation of the root interface, even if # it's from a feed, hence the sneaky restrictions identity check. if self.src and restrictions is self.root_restrictions: if impl.machine != 'src': return "Not source code" else: if impl.machine not in arch.machine_ranks: if impl.machine == 'src': return "Source code" return "Unsupported machine type" return None def get_interface(self, uri): """Get an interface from the L{iface_cache}. If it is missing start a new download. If it is present but stale, add it to L{stale_feeds}. This should only be called from L{recalculate}. @see: iface_cache.iface_cache.get_interface @rtype: L{model.Interface}""" iface = iface_cache.get_interface(uri) if uri in iface_cache.pending: # Don't start another download while one is pending # TODO: unless the pending version is very old return iface if iface.last_modified is None: if self.network_use != network_offline: debug("Interface not cached and not off-line. Downloading...") self.begin_iface_download(iface) else: if self._warned_offline: debug("Nothing known about interface, but we are off-line.") else: if iface.feeds: info("Nothing known about interface '%s' and off-line. Trying feeds only.", uri) else: warn("Nothing known about interface '%s', but we are in off-line mode " "(so not fetching).", uri) self._warned_offline = True elif not uri.startswith('/'): now = time.time() staleness = now - (iface.last_checked or 0) debug("Staleness for %s is %.2f hours", iface, staleness / 3600.0) if self.freshness > 0 and staleness > self.freshness: if iface.last_check_attempt and iface.last_check_attempt > now - FAILED_CHECK_DELAY: debug("Stale, but tried to check recently (%s) so not rechecking now.", time.ctime(iface.last_check_attempt)) else: debug("Adding %s to stale set", iface) self.stale_feeds.add(iface) #else: debug("Local interface, so not checking staleness.") return iface def begin_iface_download(self, interface, force = False): """Start downloading the interface, and add a callback to process it when done. If it is already being downloaded, do nothing.""" debug("begin_iface_download %s (force = %d)", interface, force) if interface.uri.startswith('/'): return debug("Need to download") dl = self.handler.get_download(interface.uri, force = force) if dl.on_success: # Possibly we should handle this better, but it's unlikely anyone will need # to use an interface as an icon or implementation as well, and some of the code # assumes it's OK keep asking for the same interface to be downloaded. debug("Already have a handler for %s; not adding another", interface) return def feed_downloaded(stream): pending = PendingFeed(interface.uri, stream) iface_cache.add_pending(pending) # This will trigger any required confirmations self.process_pending() dl.on_success.append(feed_downloaded) def begin_impl_download(self, impl, retrieval_method, force = False): """Start fetching impl, using retrieval_method. Each download started will call monitor_download.""" assert impl assert retrieval_method from zeroinstall.zerostore import manifest alg = impl.id.split('=', 1)[0] if alg not in manifest.algorithms: raise SafeException("Unknown digest algorithm '%s' for '%s' version %s" % (alg, impl.interface.get_name(), impl.get_version())) if isinstance(retrieval_method, DownloadSource): def archive_ready(stream): iface_cache.add_to_cache(retrieval_method, stream) self.begin_archive_download(retrieval_method, success_callback = archive_ready, force = force) elif isinstance(retrieval_method, Recipe): _Cook(self, impl.id, retrieval_method) else: raise Exception("Unknown download type for '%s'" % retrieval_method) def begin_archive_download(self, download_source, success_callback, force = False): """Start fetching an archive. You should normally call L{begin_impl_download} instead, since it handles other kinds of retrieval method too.""" from zeroinstall.zerostore import unpack mime_type = download_source.type if not mime_type: mime_type = unpack.type_from_url(download_source.url) if not mime_type: raise SafeException("No 'type' attribute on archive, and I can't guess from the name (%s)" % download_source.url) unpack.check_type_ok(mime_type) dl = self.handler.get_download(download_source.url, force = force) dl.expected_size = download_source.size + (download_source.start_offset or 0) dl.on_success.append(success_callback) return dl def begin_icon_download(self, interface, force = False): """Start downloading an icon for this interface. On success, add it to the icon cache. If the interface has no icon, do nothing.""" debug("begin_icon_download %s (force = %d)", interface, force) # Find a suitable icon to download for icon in interface.get_metadata(XMLNS_IFACE, 'icon'): type = icon.getAttribute('type') if type != 'image/png': debug('Skipping non-PNG icon') continue source = icon.getAttribute('href') if source: break warn('Missing "href" attribute on in %s', interface) else: info('No PNG icons found in %s', interface) return dl = self.handler.get_download(source, force = force) if dl.on_success: # Possibly we should handle this better, but it's unlikely anyone will need # to use an icon as an interface or implementation as well, and some of the code # may assume it's OK keep asking for the same icon to be downloaded. info("Already have a handler for %s; not adding another", source) return dl.on_success.append(lambda stream: self.store_icon(interface, stream)) def store_icon(self, interface, stream): """Called when an icon has been successfully downloaded. Subclasses may wish to wrap this to repaint the display.""" from zeroinstall.injector import basedir import shutil icons_cache = basedir.save_cache_path(config_site, 'interface_icons') icon_file = file(os.path.join(icons_cache, escape(interface.uri)), 'w') shutil.copyfileobj(stream, icon_file) def get_implementation_path(self, impl): """Return the local path of impl. @rtype: str @raise zeroinstall.zerostore.NotStored: if it needs to be added to the cache first.""" assert isinstance(impl, Implementation) if impl.id.startswith('/'): return impl.id return iface_cache.stores.lookup(impl.id) def get_implementation(self, interface): """Get the chosen implementation. @type interface: Interface @rtype: L{model.Implementation} @raise SafeException: if interface has not been fetched or no implementation could be chosen.""" assert isinstance(interface, Interface) if not interface.name and not interface.feeds: raise SafeException("We don't have enough information to " "run this program yet. " "Need to download:\n%s" % interface.uri) try: return self.implementation[interface] except KeyError, ex: if interface.implementations: offline = "" if self.network_use == network_offline: offline = "\nThis may be because 'Network Use' is set to Off-line." raise SafeException("No usable implementation found for '%s'.%s" % (interface.name, offline)) raise ex def walk_interfaces(self): """@deprecated: use L{implementation} instead""" return iter(self.implementation) def get_cached(self, impl): """Check whether an implementation is available locally. @type impl: model.Implementation @rtype: bool """ if isinstance(impl, DistributionImplementation): return impl.installed if impl.id.startswith('/'): return os.path.exists(impl.id) else: try: path = self.get_implementation_path(impl) assert path return True except: pass # OK return False def add_to_cache(self, source, data): """Wrapper for L{iface_cache.IfaceCache.add_to_cache}.""" iface_cache.add_to_cache(source, data) def get_uncached_implementations(self): """List all chosen implementations which aren't yet available locally. @rtype: [(str, model.Implementation)]""" uncached = [] for iface in self.implementation: impl = self.implementation[iface] assert impl if not self.get_cached(impl): uncached.append((iface, impl)) return uncached def refresh_all(self, force = True): """Start downloading all feeds for all selected interfaces. @param force: Whether to restart existing downloads.""" for x in self.implementation: self.begin_iface_download(x, force) for f in self.usable_feeds(x): feed_iface = iface_cache.get_interface(f.uri) self.begin_iface_download(feed_iface, force) def interface_changed(self, interface): """Callback used by L{iface_cache.IfaceCache.update_interface_from_network}.""" debug("interface_changed(%s): recalculating", interface) self.recalculate() def get_feed_targets(self, feed_iface_uri): """Return a list of Interfaces for which feed_iface can be a feed. This is used by B{0launch --feed}. @rtype: [model.Interface] @raise SafeException: If there are no known feeds.""" # TODO: what if it isn't cached yet? feed_iface = iface_cache.get_interface(feed_iface_uri) if not feed_iface.feed_for: if not feed_iface.name: raise SafeException("Can't get feed targets for '%s'; failed to load interface." % feed_iface_uri) raise SafeException("Missing element in '%s'; " "this interface can't be used as a feed." % feed_iface_uri) feed_targets = feed_iface.feed_for debug("Feed targets: %s", feed_targets) if not feed_iface.name: warn("Warning: unknown interface '%s'" % feed_iface_uri) return [iface_cache.get_interface(uri) for uri in feed_targets] def get_icon_path(self, iface): """Get an icon for this interface. If the icon is in the cache, use that. If not, start a download. If we already started a download (successful or not) do nothing. @return: The cached icon's path, or None if no icon is currently available. @rtype: str""" path = iface_cache.get_icon_path(iface) if path: return path if self.network_use == network_offline: info("No icon present for %s, but off-line so not downloading", iface) return None self.begin_icon_download(iface) return None def get_best_source(self, impl): """Return the best download source for this implementation. @rtype: L{model.RetrievalMethod}""" if impl.download_sources: return impl.download_sources[0] return None