""" Parses an XML interface into a Python representation. """ # Copyright (C) 2006, Thomas Leonard # See the README file for details, or visit http://0install.net. import os import sys import shutil import time from logging import debug, warn, info from os.path import dirname from zeroinstall.injector import basedir, qdom, distro from zeroinstall.injector.namespaces import * from zeroinstall.injector.model import * from zeroinstall.injector import model from zeroinstall import version, SafeException class InvalidInterface(SafeException): """Raised when parsing an invalid interface.""" def __init__(self, message, ex = None): if ex: message += "\n\n(exact error: %s)" % ex SafeException.__init__(self, message) def _process_depends(item): # Note: also called from selections dep_iface = item.getAttribute('interface') if dep_iface is None: raise InvalidInterface("Missing 'interface' on ") dependency = InterfaceDependency(dep_iface, metadata = item.attrs) for e in item.childNodes: if e.uri != XMLNS_IFACE: continue if e.name == 'environment': mode = { None: EnvironmentBinding.PREPEND, 'prepend': EnvironmentBinding.PREPEND, 'append': EnvironmentBinding.APPEND, 'replace': EnvironmentBinding.REPLACE, }[e.getAttribute('mode')] binding = EnvironmentBinding(e.getAttribute('name'), insert = e.getAttribute('insert'), default = e.getAttribute('default'), mode = mode) if not binding.name: raise InvalidInterface("Missing 'name' in binding") if binding.insert is None: raise InvalidInterface("Missing 'insert' in binding") dependency.bindings.append(binding) elif e.name == 'version': dependency.restrictions.append( Restriction(not_before = parse_version(e.getAttribute('not-before')), before = parse_version(e.getAttribute('before')))) return dependency def update_from_cache(interface): """Read a cached interface and any native feeds or user overrides. @param interface: the interface object to update @type interface: L{model.Interface} @return: True if cached version and user overrides loaded OK. False if upstream not cached. Local interfaces (starting with /) are always considered to be cached, although they are not actually stored in the cache. @rtype: bool""" interface.reset() if interface.uri.startswith('/'): debug("Loading local interface file '%s'", interface.uri) update(interface, interface.uri, local = True) interface.last_modified = int(os.stat(interface.uri).st_mtime) cached = True else: cached = basedir.load_first_cache(config_site, 'interfaces', escape(interface.uri)) if cached: debug("Loading cached information for %s from %s", interface, cached) update(interface, cached) # Add the distribution package manager's version, if any path = basedir.load_first_data(config_site, 'native_feeds', model._pretty_escape(interface.uri)) if path: # Resolve any symlinks info("Adding native packager feed '%s'", path) interface.feeds.append(Feed(os.path.realpath(path), None, False)) update_user_overrides(interface) # Special case: add our fall-back local copy of the injector as a feed if interface.uri == injector_gui_uri: local_gui = os.path.join(os.path.abspath(dirname(dirname(__file__))), '0launch-gui', 'ZeroInstall-GUI.xml') interface.feeds.append(Feed(local_gui, None, False)) return bool(cached) def update_user_overrides(interface): """Update an interface with user-supplied information. @param interface: the interface object to update @type interface: L{model.Interface}""" user = basedir.load_first_config(config_site, config_prog, 'user_overrides', escape(interface.uri)) if not user: return root = qdom.parse(file(user)) last_checked = root.getAttribute('last-checked') if last_checked: interface.last_checked = int(last_checked) last_check_attempt = root.getAttribute('last-check-attempt') if last_check_attempt: interface.last_check_attempt = int(last_check_attempt) stability_policy = root.getAttribute('stability-policy') if stability_policy: interface.set_stability_policy(stability_levels[str(stability_policy)]) for item in root.childNodes: if item.uri != XMLNS_IFACE: continue if item.name == 'implementation': id = item.getAttribute('id') assert id is not None if not (id.startswith('/') or id.startswith('.') or id.startswith('package:')): assert '=' in id impl = interface.implementations.get(id, None) if not impl: debug("Ignoring user-override for unknown implementation %s in %s", id, interface) continue user_stability = item.getAttribute('user-stability') if user_stability: impl.user_stability = stability_levels[str(user_stability)] elif item.name == 'feed': feed_src = item.getAttribute('src') if not feed_src: raise InvalidInterface('Missing "src" attribute in ') interface.feeds.append(Feed(feed_src, item.getAttribute('arch'), True)) def check_readable(interface_uri, source): """Test whether an interface file is valid. @param interface_uri: the interface's URI @type interface_uri: str @param source: the name of the file to test @type source: str @return: the modification time in src (usually just the mtime of the file) @rtype: int @raise InvalidInterface: If the source's syntax is incorrect, """ tmp = Interface(interface_uri) try: update(tmp, source) except InvalidInterface, ex: info("Error loading interface:\n" "Interface URI: %s\n" "Local file: %s\n%s" % (interface_uri, source, ex)) raise InvalidInterface("Error loading feed '%s':\n\n%s" % (interface_uri, ex)) return tmp.last_modified def _parse_time(t): try: return long(t) except Exception, ex: raise InvalidInterface("Date '%s' not in correct format (should be integer number " "of seconds since Unix epoch)\n%s" % (t, ex)) def _check_canonical_name(interface, root): "Ensure the uri= attribute in the interface file matches the interface we are trying to load" canonical_name = root.getAttribute('uri') if not canonical_name: raise InvalidInterface(" uri attribute missing") if canonical_name != interface.uri: raise InvalidInterface("Incorrect URL used for feed.\n\n" "%s is given in the feed, but\n" "%s was requested" % (canonical_name, interface.uri)) def _get_long(elem, attr_name): val = elem.getAttribute(attr_name) if val is not None: try: val = long(val) except ValueError, ex: raise SafeException("Invalid value for integer attribute '%s': %s" % (attr_name, val)) return val def _merge_attrs(attrs, item): """Add each attribute of item to a copy of attrs and return the copy. @type attrs: {str: str} @type item: L{qdom.Element} @rtype: {str: str} """ new = attrs.copy() for a in item.attrs: new[str(a)] = item.attrs[a] return new def update(interface, source, local = False): """Read in information about an interface. @param interface: the interface object to update @type interface: L{model.Interface} @param source: the name of the file to read @type source: str @param local: use file's mtime for last-modified, and uri attribute is ignored @raise InvalidInterface: if the source's syntax is incorrect @see: L{update_from_cache}, which calls this""" assert isinstance(interface, Interface) try: root = qdom.parse(file(source)) except Exception, ex: raise InvalidInterface("Invalid XML", ex) if not local: _check_canonical_name(interface, root) time_str = root.getAttribute('last-modified') if time_str: # Old style cached items use an attribute interface.last_modified = _parse_time(time_str) else: # New style items have the mtime in the signature, # but for quick access we use the mtime of the file interface.last_modified = int(os.stat(source).st_mtime) main = root.getAttribute('main') if main: interface.main = main min_injector_version = root.getAttribute('min-injector-version') if min_injector_version: try: min_ints = map(int, min_injector_version.split('.')) except ValueError, ex: raise InvalidInterface("Bad version number '%s'" % min_injector_version) injector_version = map(int, version.split('.')) if min_ints > injector_version: raise InvalidInterface("This interface requires version %s or later of " "the Zero Install injector, but I am only version %s. " "You can get a newer version from http://0install.net" % (min_injector_version, version)) if local: iface_dir = os.path.dirname(source) else: iface_dir = None # Can't have relative paths for x in root.childNodes: if x.uri != XMLNS_IFACE: interface.add_metadata(x) continue if x.name == 'name': interface.name = interface.name or x.content elif x.name == 'description': interface.description = interface.description or x.content elif x.name == 'summary': interface.summary = interface.summary or x.content elif x.name == 'feed-for': feed_iface = x.getAttribute('interface') if not feed_iface: raise InvalidInterface('Missing "interface" attribute in ') interface.feed_for[feed_iface] = True # Bug report from a Debian/stable user that --feed gets the wrong value. # Can't reproduce (even in a Debian/stable chroot), but add some logging here # in case it happens again. debug("Is feed-for %s", feed_iface) elif x.name == 'feed': feed_src = x.getAttribute('src') if not feed_src: raise InvalidInterface('Missing "src" attribute in ') if feed_src.startswith('http:') or local: interface.feeds.append(Feed(feed_src, x.getAttribute('arch'), False)) else: raise InvalidInterface("Invalid feed URL '%s'" % feed_src) else: interface.add_metadata(x) def process_group(group, group_attrs, base_depends): for item in group.childNodes: if item.uri != XMLNS_IFACE: continue depends = base_depends[:] item_attrs = _merge_attrs(group_attrs, item) for child in item.childNodes: if child.uri != XMLNS_IFACE: continue if child.name == 'requires': dep = _process_depends(child) depends.append(dep) if item.name == 'group': process_group(item, item_attrs, depends) elif item.name == 'implementation': process_impl(item, item_attrs, depends) elif item.name == 'package-implementation': process_native_impl(item, item_attrs, depends) def process_impl(item, item_attrs, depends): id = item.getAttribute('id') if id is None: raise InvalidInterface("Missing 'id' attribute on %s" % item) if local and (id.startswith('/') or id.startswith('.')): impl = interface.get_impl(os.path.abspath(os.path.join(iface_dir, id))) else: if '=' not in id: raise InvalidInterface('Invalid "id"; form is "alg=value" (got "%s")' % id) alg, sha1 = id.split('=') try: long(sha1, 16) except Exception, ex: raise InvalidInterface('Bad SHA1 attribute: %s' % ex) impl = interface.get_impl(id) impl.metadata = item_attrs try: version = item_attrs['version'] version_mod = item_attrs.get('version-modifier', None) if version_mod: version += version_mod except KeyError: raise InvalidInterface("Missing version attribute") impl.version = parse_version(version) item_main = item_attrs.get('main', None) if item_main and item_main.startswith('/'): raise InvalidInterface("'main' attribute must be relative, but '%s' starts with '/'!" % item_main) impl.main = item_main impl.released = item_attrs.get('released', None) size = item.getAttribute('size') if size: impl.size = long(size) impl.arch = item_attrs.get('arch', None) try: stability = stability_levels[str(item_attrs['stability'])] except KeyError: stab = str(item_attrs['stability']) if stab != stab.lower(): raise InvalidInterface('Stability "%s" invalid - use lower case!' % item_attrs.stability) raise InvalidInterface('Stability "%s" invalid' % item_attrs['stability']) if stability >= preferred: raise InvalidInterface("Upstream can't set stability to preferred!") impl.upstream_stability = stability impl.requires = depends for elem in item.childNodes: if elem.uri != XMLNS_IFACE: continue if elem.name == 'archive': url = elem.getAttribute('href') if not url: raise InvalidInterface("Missing href attribute on ") size = elem.getAttribute('size') if not size: raise InvalidInterface("Missing size attribute on ") impl.add_download_source(url = url, size = long(size), extract = elem.getAttribute('extract'), start_offset = _get_long(elem, 'start-offset'), type = elem.getAttribute('type')) elif elem.name == 'recipe': recipe = Recipe() for recipe_step in elem.childNodes: if recipe_step.uri == XMLNS_IFACE and recipe_step.name == 'archive': url = recipe_step.getAttribute('href') if not url: raise InvalidInterface("Missing href attribute on ") size = recipe_step.getAttribute('size') if not size: raise InvalidInterface("Missing size attribute on ") recipe.steps.append(DownloadSource(None, url = url, size = long(size), extract = recipe_step.getAttribute('extract'), start_offset = _get_long(recipe_step, 'start-offset'), type = recipe_step.getAttribute('type'))) else: info("Unknown step '%s' in recipe; skipping recipe", recipe_step.name) break else: impl.download_sources.append(recipe) def process_native_impl(item, item_attrs, depends): package = item_attrs.get('package', None) if package is None: raise InvalidInterface("Missing 'package' attribute on %s" % item) def factory(id): assert id.startswith('package:') impl = interface.get_impl(id) impl.metadata = item_attrs item_main = item_attrs.get('main', None) if item_main and not item_main.startswith('/'): raise InvalidInterface("'main' attribute must be absolute, but '%s' doesn't start with '/'!" % item_main) impl.main = item_main impl.upstream_stability = packaged impl.requires = depends return impl distro.host_distribution.get_package_info(package, factory) process_group(root, {'stability': 'testing', 'main' : root.getAttribute('main') or None, }, [])