# Copyright (C) 2008 One Laptop Per Child Association, Inc.
# Licensed under the terms of the GNU GPL v2 or later; see COPYING for details.
# written by Douglas Bagnall <douglas@paradise.net.nz>
"""Functions for processing XO activities, either for indexing and
presentaton to the laptops, or for diagnostics.
import os, sys, shutil
import zipfile
import re
from cStringIO import StringIO
#import traceback
import syslog
from ConfigParser import SafeConfigParser
# we no longer really have a default in that it is set in the conf file
# we assume that we have a lang_template for the default language
TEMPLATE_DIR = '/library/xs-activity-server/lang_templates'
# how many versions before the latest are worth having around.
#print to stderr, rathe than syslog?
REQUIRED_TAGS = ('bundle_id', 'activity_version', 'host_version', 'name', 'license')
OPTIONAL_TAGS = ('show_launcher', 'exec', 'mime_types', 'icon')
#XXX need either icon or show_launcher=no
def log(msg, level=syslog.LOG_NOTICE):
print >> sys.stderr, msg
syslog.openlog( 'xs-activity-server', 0, syslog.LOG_USER )
syslog.syslog(level, msg)
class BundleError(Exception):
class Bundle(object):
def __init__(self, bundle):
self.linfo = {}
self.zf = zipfile.ZipFile(bundle)
# The activity path will be 'Something.activity/activity/activity.info'
for p in self.zf.namelist():
if p.endswith(self.INFO_PATH):
self.raw_data = read_info_file(self.zf, p, self.INFO_SECTION)
# the file name itself is needed for the URL
self.url = os.path.basename(bundle)
self.mtime = os.stat(bundle).st_mtime
self.name = self.raw_data.get('name')
self.license = self.raw_data.get('license', None)
# child ctor should now call
# _set_bundle_id
# _set_version
# _set_description
def _set_bundle_id(self, id):
if id is None:
raise BundleError("bad bundle: No bundle ID")
self.bundle_id = id
if self.name is None:
self.name = id
def _set_version(self, version):
self.version = version
def _set_description(self, description):
self.description = description
def __cmp__(self, other):
"""Alphabetical sort (locale dependant of course)"""
if self.bundle_id == other.bundle_id:
return cmp(self.version, other.version)
return cmp(self.name, other.name)
def set_older_versions(self, versions):
"""Versions should be a list of (version number, version tuples)"""
self.older_versions = ', '.join('<a href="%s">%s</a>' % (v.url, v.version) for v in versions)
def to_html(self, locale, template=None):
"""Fill in the template with data approriate for the locale."""
if template is None:
template = read_template('activity', locale)
d = {'older_versions': self.older_versions,
'bundle_id': self.bundle_id,
'activity_version': self.version,
'bundle_url': self.url,
'name': self.name,
'description': self.description,
d.update(self.linfo.get(locale, {}))
if d['older_versions']:
d['show_older_versions'] = 'inline'
d['show_older_versions'] = 'none'
return template % d
def get_name(self, locale=None):
return self.name
class Content(Bundle):
INFO_PATH = "library/library.info"
INFO_SECTION = "Library"
def __init__(self, bundle):
super(Content, self).__init__(bundle)
d = self.raw_data
# bundle_id is often missing; service name is used instead.
self._set_bundle_id(d.get('global_name', None))
self._set_version(d.get('library_version', 1))
self._set_description(d.get('long_name', ''))
def debug(self, force_recheck=False):
# FIXME: implement debug checking for content bundles
return {}
class Activity(Bundle):
INFO_PATH = "activity/activity.info"
INFO_SECTION = "Activity"
#Activities appear to be looser than RFC3066, using e.g. _ in place of -.
linfo_re = re.compile(r'/locale/([A-Za-z]+[\w-]*)/activity.linfo$')
def __init__(self, bundle):
"""Takes a zipped .xo bundle name, returns a dictionary of its
activity info. Can raise a variety of exceptions, all of
which should indicate the bundle is invalid."""
super(Activity, self).__init__(bundle)
# The locale info will be Something.activity/locale/xx_XX/activity.linfo
for p in self.zf.namelist():
linfo = self.linfo_re.search(p)
if linfo:
lang = canonicalise(linfo.group(1))
self.linfo[lang] = read_info_file(self.zf, p, self.INFO_SECTION)
# Unfortunately the dictionary lacks some information, and
# stores other bits in inconsistent ways.
d = self.raw_data
# bundle_id is often missing; service name is used instead.
self._set_bundle_id(d.get('bundle_id', d.get('service_name')))
self._set_version(d.get('activity_version', 1))
self._set_description(d.get('description', ''))
def debug(self, force_recheck=False):
"""Make a copy of the raw data with added bits so we can work
out what is going on. This is useful for diagnosing problems
with odd activities and composing tut-tut-ing emails to their
Not used in production."""
if hasattr(self, '_debug_data') and not force_recheck:
return self._debug_data
d = self.raw_data.copy()
correct_forms = {
'name': str.upper,
'activity_version': str.isdigit,
'host_version': str.isdigit,
'bundle_id': re.compile(r'^[\w.]+$').match,
'service_name': re.compile(r'^[\w.]+$').match,
'icon': re.compile(r'^[\S]+$').match,
'exec': str.upper,
'mime_types': re.compile(r'^([\w.+-]+/[\w.+-]+;?)*$').match,
'update_url': re.compile(r'^http://([\w-]+\.?)+(:\d+)?(/[\w~%.-]+)*$').match,
#'update_url': re.compile(r'^$').match,
'show_launcher': re.compile(r'^(yes)|(no)$').match,
'class': re.compile(r'^(\w+.?)+$').match,
'license': str.upper,
#'license': re.compile(r'^GPLv[23]\+?$').match,
for k, v in d.items():
if k in correct_forms:
f = correct_forms.get(k, len)
if not f(v):
d['BAD ' + k] = v
rcount = 0
if k not in d:
d['LACKS %s' % k] = True
rcount += 1
d['MISSING KEYS'] = rcount
if t not in d:
d['NO ' + t] = True
if not 'icon' in d and d.get('show_launcher') != 'no':
d['NO icon AND show_launcher'] = True
self._debug_data = d
return d
def get_name(self, locale):
"""Return the best guess at a name for the locale."""
for loc in locale_search_path(locale):
if loc in self.linfo and 'name' in self.linfo[loc]:
return self.linfo[loc]['name']
return super(Activity, self).get_name()
def check_all_bundles(directory, show_all_bundles=False):
"""A verbose debug function."""
all_bundles = []
unique_bundles = {}
counts = {}
# watch for these tags and print out the lists
bad_contents = {}
all_linfo = {}
unique_linfo = {}
linfo_keys = {}
log('Checking all activities in %s\n' % directory)
for f in os.listdir(directory):
if not f.endswith('.xo') and not f.endswith('.xol'):
if f.endswith('.xo'):
bundle = Activity(os.path.join(directory, f))
bundle = Content(os.path.join(directory, f))
except Exception, e:
log("IRREDEEMABLE bundle %-25s (Error: %s)" % (f, e), syslog.LOG_WARNING)
#Clump together bundles of the same ID
x = unique_bundles.setdefault(bundle.bundle_id, [])
if not show_all_bundles:
#only show the newest one of each set.
bundles = []
for versions in unique_bundles.values():
bundles = all_bundles
licenses = {}
for bundle in bundles:
bid = bundle.bundle_id
for k, v in bundle.debug().iteritems():
counts[k] = counts.get(k, 0) + 1
if k.startswith('BAD '):
bc = bad_contents.setdefault(k, {})
bc[bid] = v
for k, v in bundle.linfo.iteritems():
linfo_l = all_linfo.setdefault(k, [])
for x in v:
linfo_keys[x] = linfo_keys.get(x, 0) + 1
if v['name'] != bundle.name:
linfo_l = unique_linfo.setdefault(k, [])
if bundle.license:
lic = licenses.setdefault(bundle.license, [])
citems = counts.items()
rare_keys = [k for k, v in citems if v < 10]
lack_counts = dict((k, v) for k, v in citems if k.startswith('LACKS '))
bad_counts = dict((k, v) for k, v in citems if k.startswith('BAD '))
no_counts = dict((k, v) for k, v in citems if k.startswith('NO '))
tag_counts = dict((k, v) for k, v in citems if k not in lack_counts and
k not in bad_counts and k not in no_counts and k != 'MISSING KEYS')
# flag whether the tag is needed, ok, or not
tag_quality = dict((k, '*') for k in REQUIRED_TAGS)
tag_quality.update((k, '+') for k in OPTIONAL_TAGS)
linfo_counts = dict((k, len(v)) for k, v in all_linfo.iteritems())
linfo_uniq_counts = dict((k, len(v)) for k, v in unique_linfo.iteritems())
log('\nFound: %s bundles\n %s unique bundles' % (len(all_bundles), len(unique_bundles)))
for d, name, d2 in [(tag_counts, '\nattribute counts:', tag_quality),
(lack_counts, '\nmissing required keys:', {}),
(no_counts, '\nunused optional keys:', {}),
(bad_counts, '\nill-formed values:', {}),
(linfo_counts, '\nlinfo counts: total localised', linfo_uniq_counts),
(linfo_keys, '\nlinfo keys:', {})]:
counts_reversed = [(v, k) for (k, v) in d.iteritems()]
for (k, v) in counts_reversed:
log("%-25s %4s %4s" % (v, k, d2.get(v, '')))
log("\nRare keys:")
for k in rare_keys:
if k.startswith('BAD '):
for b in bundles:
v = b.debug().get(k)
if v:
log(' %-25s %s' % (b.bundle_id, v))
log("\nInteresting contents:")
for k, v in bad_contents.iteritems():
for x in v.iteritems():
log(' %s: %s' % x)
log("\nInteresting linfo:")
for k in ('pseudo',):
for a in all_linfo[k]:
if a in unique_linfo.get(k, []):
log(' * %s (%s vs. %s)' % (a.bundle_id, a.name, a.linfo[k]['name']))
log(' %s (%s)' % (a.bundle_id, a.name))
for lic, v in licenses.iteritems():
log("%-20s %s" %(repr(lic), len(v)))
log("\nRare licenses:")
for lic, v in licenses.iteritems():
if len(v) < 3:
log(' %s' % lic)
for x in v:
log(" %s" %(x))
log("\nAlmost valid activities:")
for b in bundles:
d = b.debug()
if d['MISSING KEYS'] == 1:
missing = ', '.join(x for x in d if x.startswith('LACKS'))
bad_values = ', '. join(x for x in d if x.startswith('BAD'))
log("%-20s %s %s" %(b.name, missing, bad_values))
log("\nValid activities (maybe):")
for b in bundles:
d = b.debug()
bid = b.bundle_id
if (d['MISSING KEYS'] == 0 and
bid not in bad_contents['BAD mime_types']):
log("%-20s - %s" %(b.name, bid))
def read_info_file(zipfile, path, section):
"""Return a dictionary matching the contents of the config file at
path in zipfile"""
cp = SafeConfigParser()
info = StringIO(zipfile.read(path))
return dict(cp.items(section))
def canonicalise(lang):
"""Make all equivalent language strings the same.
>>> canonicalise('Zh-cN')
>>> canonicalise('zh_CN')
lang = lang.replace('_', '-').upper()
bits = lang.split('-', 1)
bits[0] = bits[0].lower()
return '-'.join(bits)
def locale_search_path(locale):
"""Find a series of sensible locales to try, including
DEFAULT_LANG. For example 'zh-CN' would become ('zh-CN', 'zh',
#XXX might be better to be storing locale as tuple
if '-' in locale:
return (locale, locale.split('-')[0], DEFAULT_LANG)
return (locale, DEFAULT_LANG)
def read_metadata(bundle_dir):
"""Attempt to read data in a metadata file. Raises expected
exceptions if the metadata file isn't readable. The file should
look something like this:
description = Succinct description of this activity.
description = Succinct description of this activity.
web_icon = develop.png
md_files = [os.path.join(bundle_dir, x)
for x in os.listdir(bundle_dir) if x.endswith('.info')]
cp = SafeConfigParser()
metadata = {}
for section in cp.sections():
metadata[section] = dict(x for x in cp.items(section))
return metadata
def htmlise_bundles(bundle_dir, dest_html):
"""Makes a nice html manifest for the bundles in a directory. The
manifest only shows the newest version of each bundle.
#so, we collect up a dictionary of lists, then sort each list on
#the version number to find the newest.
bundles = [os.path.join(bundle_dir, x)
for x in os.listdir(bundle_dir) if x.endswith('.xo') or x.endswith('.xol')]
metadata = read_metadata(bundle_dir)
except Exception, e:
log("had trouble reading metadata: %s" % e)
metadata = {}
all_bundles = {}
for filename in bundles:
if filename.endswith('.xo'):
bundle = Activity(filename)
bundle = Content(filename)
x = all_bundles.setdefault(bundle.bundle_id, [])
x.append((bundle.mtime, bundle))
except Exception, e:
log("Couldn't find good activity/library info in %s (Error: %s)" % (filename, e))
newest = []
# create an index for each language that has a template
# but track any locales in bundles in case we do not have templates for them
locales = [os.path.join(o) for o in os.listdir(TEMPLATE_DIR) if os.path.isdir(os.path.join(TEMPLATE_DIR,o))]
locales_found = set ()
for versions in all_bundles.values():
versions = [x[1] for x in sorted(versions)]
# end of list is the newest; beginning of list might need deleting
latest = versions.pop()
goners = versions[:-KEEP_OLD_VERSIONS]
keepers = versions[-KEEP_OLD_VERSIONS:]
for v in goners:
fn = os.path.join(bundle_dir, v.url)
if latest.bundle_id in metadata:
# we have extra metadata with which to fill out details
# presumably this is mainly human-oriented description.
d = metadata[latest.bundle_id]
for k in ('description', 'name'):
if k in d:
setattr(latest, k, d[k])
log('found locales: %s' % locales)
# assume locales is not empty as we have at least the default language
for locale in locales:
make_html(newest, locale, '%s.%s' % (dest_html, locale))
except Exception, e:
log("Couldn't make page for %s (Error: %s)" % (locale, e), syslog.LOG_WARNING)
# make_varfile(locales, dest_html)- have switched to multiviews, so var not needed
def make_varfile(locales, dest_html):
f = open(dest_html + '.var', 'w')
index = os.path.basename(dest_html)
f.write('URI: %s\n\n' % index)
for locale in locales:
f.write('URI: %s.%s\n' % (index, locale))
f.write('Content-type: text/html; charset=utf-8\n')
f.write('Content-language: %s\n\n' % locale)
# now the default, slightly higher qs
f.write('URI: %s.DEFAULT\n' % index)
f.write('Content-type: text/html; charset=utf-8\n')
f.write('Content-language: en\n\n')
def read_template(name, locale):
"""Try to read the locale's template, falling back to the
#also try containing locales, eg 'zh' for 'zh-CN'
for x in locale_search_path(locale):
f = open(os.path.join(TEMPLATE_DIR, x, name))
except (OSError, IOError), e:
s = f.read()
return s
def make_html(bundles, locale, filename):
"""Write a microformated index for the activities in the appropriate language,
and save it to filename."""
page_tmpl = read_template('page', locale)
act_tmpl = read_template('activity', locale)
#bundles.sort() won't cut it.
schwartzian = [ (x.get_name(locale), x.to_html(locale, act_tmpl)) for x in bundles ]
s = page_tmpl % {'activities': '\n'.join(x[1] for x in schwartzian)}
if os.path.exists(filename):
shutil.move(filename, filename + '~')
f = open(filename, 'w')