"""
Helper functions for parsing authority descriptions.
"""
import re, requests, json, jsonpickle
import lxml.etree as ET
from pprint import pprint
class JSONData(dict):
    """
    A ``dict`` whose nested containers are wrapped for path traversal.

    On construction, every value that is exactly a ``list`` is replaced by a
    :class:`.JSONArray` and every value that is exactly a ``dict`` by a
    :class:`.JSONData`. All other values are stored unchanged.

    Parameters
    ----------
    obj : dict, optional
        Mapping to copy. ``None`` (the default) yields an empty object.
    """

    def __init__(self, obj=None):
        # ``None`` stands in for the former mutable ``{}`` default.
        if obj is None:
            obj = {}
        # ``items()`` is available on both Python 2 and Python 3 mappings.
        for key, value in obj.items():
            if type(value) is list:
                value = JSONArray(value)
            elif type(value) is dict:
                value = JSONData(value)
            self[key] = value

    def get(self, key, *args, **kwargs):
        """
        Return the value stored under ``key``, or ``None`` if it is absent.

        Extra arguments are accepted and deliberately ignored: the path
        finder calls the lookup method as ``get(tag, nsmap)``, so the second
        positional argument is a namespace map and must not be treated as a
        default value.
        """
        return super(JSONData, self).get(key)
class JSONArray(list):
    """
    Adds ``get`` support to a list.

    On construction, every item that is exactly a ``dict`` is wrapped in a
    :class:`.JSONData`; all other items are appended unchanged.

    Parameters
    ----------
    obj : iterable, optional
        Items to copy. ``None`` (the default) yields an empty array.
    """

    def __init__(self, obj=None):
        # ``None`` stands in for the former mutable ``[]`` default.
        if obj is None:
            obj = ()
        for item in obj:
            if type(item) is dict:
                item = JSONData(item)
            self.append(item)

    def get(self, key, *args, **kwargs):
        """
        Return the value of ``key`` in the first object in list.

        Returns ``None`` when the list is empty. Extra arguments are accepted
        and ignored.
        """
        return self[0].get(key) if self else None

    def get_list(self, key=None, *args, **kwargs):
        """
        Return the value of ``key`` in each object in list.

        Objects that do not contain ``key`` are skipped. When ``key`` is
        falsy, a plain ``list`` copy of all items is returned instead. Extra
        arguments are accepted and ignored.
        """
        if key:
            return [obj.get(key) for obj in self if key in obj]
        return list(self)
def is_multiple(tag):
    """
    Detect the multi-value flag (``*``) in a path part (``tag``).

    Parameters
    ----------
    tag : str

    Returns
    -------
    tuple
        ``(name, flag)``. ``name`` is the text before the first ``*`` (or
        ``None`` when ``tag`` is empty or is just ``*``). ``flag`` is ``'*'``
        when the multi-value flag is present and ``None`` otherwise, so it is
        meant to be tested for truthiness rather than compared to a bool.

    Raises
    ------
    ValueError
        If ``tag`` starts with ``*`` but is longer than the bare flag, so no
        tag name can be extracted.
    """
    if not tag:
        return None, None
    if tag == '*':
        return None, '*'
    # A plain raw string is valid on Python 2 and 3 (``ur''`` is not).
    match = re.match(r'([^\*]+)(\*)?', tag)
    if match is None:
        raise ValueError("Malformed path: %r does not start with a tag name."
                         % (tag,))
    return match.groups()
def get_recursive_pathfinder(nsmap=None, method='find', mult_method='findall'):
    """
    Generate a recursive function that follows the path in ``tags``, starting
    at ``elem``.

    Parameters
    ----------
    nsmap : dict, optional
        Passed as the second positional argument to every lookup call.
        Defaults to an empty dict.
    method : str
        Name of the method used to look up a single-value path part.
    mult_method : str
        Name of the method used to look up a path part carrying the
        multi-value flag (``*``).

    Returns
    -------
    function
        ``_get(elem, tags)``; see its docstring.
    """
    if nsmap is None:
        nsmap = {}

    def _lookup(base, tag, meth):
        # A list means an earlier step was multi-valued: apply the lookup to
        # every member instead of to the list itself.
        if type(base) is list:
            return [getattr(child, meth)(tag, nsmap) for child in base]
        return getattr(base, meth)(tag, nsmap)

    def _get(elem, tags):
        """
        Resolve ``tags`` against ``elem``, first path part first.

        ``tags`` is only read, never modified, so the same list can be
        passed again on a later call.

        Parameters
        ----------
        elem : :class:`lxml.etree.Element`
        tags : list
        """
        if not tags:    # Bottomed out; recursion stops.
            return elem
        # Slice rather than pop(): popping emptied the caller's list, which
        # made any function holding that list work only once.
        this_tag, multiple = is_multiple(tags[-1])
        base = _get(elem, tags[:-1])
        return _lookup(base, this_tag, mult_method if multiple else method)

    return _get
def _to_unicode(e):
if isinstance(e, unicode):
return e
return e.decode('utf-8')
_etree_attribute_getter = lambda e, attr: _to_unicode(getattr(e, 'attrib', {}).get(attr, u'').strip())#.encode('utf-8')
_etree_cdata_getter = lambda e: _to_unicode(getattr(getattr(e, 'text', u''), 'strip', lambda: u'')())#.encode('utf-8')
_json_content_getter = lambda e: e
def content_picker_factory(env, content_getter=_etree_cdata_getter, attrib_getter=_etree_attribute_getter):
    """
    Generates a function that retrieves the CDATA content or attribute value
    of an element.

    Parameters
    ----------
    env : dict
        Read for two optional keys. ``'attribute'`` is a bracketed attribute
        reference whose first and last characters are stripped to obtain the
        attribute name. ``'sep'`` is a separator on which the picked value is
        split.
    content_getter : function
        Called as ``content_getter(elem)`` when no attribute is requested.
    attrib_getter : function
        Called as ``attrib_getter(elem, name)`` when an attribute is
        requested.

    Returns
    -------
    function
        Takes an element and returns the picked value, or a list of stripped
        pieces when a separator is configured.
    """
    attribute = env.get('attribute', False)
    sep = env.get('sep', None)

    def _split(value):
        # Without a separator the value passes through untouched.
        if not sep:
            return value
        return [piece.strip() for piece in value.split(sep)]

    if attribute:
        def _pick_attribute(elem):
            return _split(attrib_getter(elem, attribute[1:-1]))
        return _pick_attribute

    def _pick_content(elem):
        return _split(content_getter(elem))
    return _pick_content
def passthrough_picker_factory(env, *args, **kwargs):
    """
    Generates a function that simply returns whatever object it is passed
    (for XML paths, a :class:`lxml.etree.Element`).

    Parameters
    ----------
    env : dict
        Unused, as are any further arguments; they are accepted so this
        factory can be called like the other picker factories.

    Returns
    -------
    function
    """
    def _identity(e):
        return e

    return _identity
def decompose_path(path_string):
    """
    Split a path string into its constituent parts.

    A path has the form ``part/part[attribute]|sep``. The bracketed
    attribute reference and the ``|sep`` suffix are both optional.

    Parameters
    ----------
    path_string : str

    Returns
    -------
    path : list
        The ``/``-separated path parts.
    attribute : str or None
        The attribute reference including its brackets, if present.
    sep : str or None
        The text after ``|``, or ``None`` when the path contains no ``|``.

    Raises
    ------
    ValueError
        If the path contains more than one ``|``, does not start with a path
        part, or has anything following the attribute reference.
    """
    sep = None
    if '|' in path_string:
        pieces = path_string.split('|')
        if len(pieces) != 2:
            raise ValueError("Malformed path: only one separator reference"
                             " (|) allowed.")
        path_string, sep = pieces

    # A plain raw string is valid on Python 2 and 3 (``ur''`` is not).
    match = re.match(r'([^\[]+)(\[.+\])?', path_string)
    if match is None:
        raise ValueError("Malformed path: a path must start with at least"
                         " one path part.")
    # The pattern is not anchored at the end, so leftover text means the
    # attribute reference was unterminated or followed by more path. The
    # former ``'[' in path`` test could never fire because ``path`` is
    # matched by ``[^\[]+``; such paths were silently truncated instead.
    if match.end() != len(path_string):
        raise ValueError("Malformed path: attribute references must come at"
                         " the very end of the path.")
    path, attribute = match.groups()
    return path.split('/'), attribute, sep
def _parse_path(path_string, nsmap=None, picker_factory=None,
                content_getter=_etree_cdata_getter,
                attrib_getter=_etree_attribute_getter,
                get_method='find', mult_method='findall'):
    """
    Generate a function that will retrieve data of interest from an arbitrary
    object. This combines common logic from public parser functions.

    Parameters
    ----------
    path_string : str
        See docs for how this should be written. TODO: write the docs.
    nsmap : dict, optional
        Handed to the path finder. Defaults to an empty dict.
    picker_factory : function, optional
        Called as ``picker_factory(env, content_getter=content_getter)``,
        where ``env`` is a dict holding at least the ``'attribute'`` and
        ``'sep'`` values decomposed from ``path_string``. Defaults to
        :func:`content_picker_factory`.
    content_getter : function
        Forwarded to ``picker_factory``.
    attrib_getter : function
        Accepted, but not forwarded to ``picker_factory`` by this function;
        the picker uses its own default attribute getter.
    get_method : str
        Name of the single-value lookup method.
    mult_method : str
        Name of the multi-value lookup method.

    Returns
    -------
    function
        Takes the object to search. Returns the picked value, or a list of
        picked values when the path resolves to a list. Empty values are
        returned as ``None``.
    """
    if nsmap is None:
        nsmap = {}
    # The former default ``{}`` was not callable, so omitting the argument
    # always failed; fall back to the standard content picker instead.
    if picker_factory is None:
        picker_factory = content_picker_factory

    path, attribute, sep = decompose_path(path_string)
    _get = get_recursive_pathfinder(nsmap=nsmap, method=get_method,
                                    mult_method=mult_method)
    # ``env`` is a snapshot of this function's local names; the pickers in
    # this module read ``attribute`` and ``sep`` from it.
    _picker = picker_factory(locals(), content_getter=content_getter)

    def _apply(obj):    # No empty values.
        value = _picker(obj)
        # A list counts as empty when its first entry is empty (e.g. the
        # result of splitting an empty string on a separator).
        if value and (type(value) is not list or value[0]):
            return value
        return None

    def _call(elem):
        # Hand the path finder its own copy of the path so that this
        # function gives the same answer however many times it is called.
        base = _get(elem, list(path))
        if type(base) is list:
            return [_apply(child) for child in base]
        return _apply(base)

    return _call
def parse_json_path(path_string, nsmap={}, picker_factory=content_picker_factory):
    """
    Generate a function that will retrieve data of interest from a
    :class:`.JSONData` object.

    Path parts are looked up with the ``get`` method, or ``get_list`` for
    parts carrying the multi-value flag, and values are returned as stored.

    Parameters
    ----------
    path_string : str
        See docs for how this should be written. TODO: write the docs.
    nsmap : dict
        Not used for JSON; it is passed along only to keep the signature
        parallel to :func:`parse_xml_path`.
    picker_factory : function

    Returns
    -------
    function
    """
    return _parse_path(
        path_string,
        nsmap=nsmap,
        picker_factory=picker_factory,
        content_getter=_json_content_getter,
        attrib_getter=_json_content_getter,
        get_method='get',
        mult_method='get_list',
    )
def parse_xml_path(path_string, nsmap={}, picker_factory=content_picker_factory):
    """
    Generate a function that will retrieve data of interest from an
    :class:`lxml.etree.Element` object.

    This is :func:`_parse_path` with all of its element-tree defaults left
    in place.

    Parameters
    ----------
    path_string : str
        See docs for how this should be written. TODO: write the docs.
    nsmap : dict
        See the ``lxml.etree`` docs.
    picker_factory : function

    Returns
    -------
    function
    """
    xml_parser = _parse_path(path_string, nsmap=nsmap,
                             picker_factory=picker_factory)
    return xml_parser
def generate_request(config, glob=None):
    """
    Generate a function that performs an HTTP request based on the
    configuration in ``config``.

    Parameters
    ----------
    config : dict
        Must contain ``'path'``, a URL that may hold ``{name}`` placeholders.
        May contain ``'method'`` (``'GET'`` by default; ``'POST'`` is the
        only other supported value) and ``'parameters'``, a list of dicts
        with the keys ``'accept'`` and ``'send'`` and optionally
        ``'required'`` and ``'default'``.
    glob : dict, optional
        Values available for filling placeholders in the path.

    Returns
    -------
    function
        Expects keyword arguments defined in the configuration. If provided,
        ``headers`` will be pulled out and passed as headers in the request.

    Raises
    ------
    ValueError
        If ``config`` has no ``'path'``.
    """
    if glob is None:
        glob = {}
    try:
        path_partial = config['path']
    except KeyError:
        raise ValueError("Malformed configuration: no path specified.")

    method = config.get("method", "GET")    # GET by default.
    declared = config.get("parameters", [])

    # Maps accept -> send parameter names.
    parameters = {param['accept']: param['send'] for param in declared}
    required = {param['accept'] for param in declared
                if param.get('required', False)}
    defaults = {param['accept']: param['default'] for param in declared
                if 'default' in param}

    format_keys = re.findall(r'\{([^\}]+)\}', path_partial)
    fmt = {k: v for k, v in glob.items() if k in format_keys}

    def _get_path(extra=None):
        # Merge into a fresh dict. Updating ``fmt`` in place made the first
        # call's placeholder values stick for every later call.
        values = dict(fmt)
        if extra:
            values.update(extra)
        return path_partial.format(**values)

    def _call(**params):
        """
        Perform the configured request.

        Parameters
        ----------
        params : kwargs
            Request parameters keyed by their ``accept`` names, plus an
            optional ``headers`` dict. Parameters that are not declared in
            the configuration are dropped.

        Returns
        -------
        The ``content`` attribute of the response.

        Raises
        ------
        TypeError
            If a required parameter is missing.
        ValueError
            If the configured method is neither ``'GET'`` nor ``'POST'``.
        """
        headers = params.pop('headers', {})
        for param in required:
            if param not in params:
                raise TypeError('expected parameter %s' % param)

        # Relabel accepts -> send parameter names.
        params = {parameters[k]: v for k, v in params.items()
                  if k in parameters}

        # Placeholders not already covered by ``glob`` are taken out of the
        # request parameters. ``defaults`` is read with get(), not pop(), so
        # a default is still there on the next call.
        extra = {key: params.pop(key, defaults.get(key, ''))
                 for key in format_keys
                 if key not in fmt}    # Don't overwrite.

        if method == 'GET':
            request_method = requests.get
            payload = {'params': params, 'headers': headers}
        elif method == 'POST':
            request_method = requests.post
            payload = {'data': params, 'headers': headers}
        else:
            # Previously this fell through to an unbound local variable.
            raise ValueError("Unsupported request method: %r" % (method,))
        return request_method(_get_path(extra), **payload).content

    return _call
def parse_result(config, data, path_parser=parse_xml_path, glob=None, nsmap=None):
    """
    Extract data from a parsed response using a configuration schema.

    Parameters
    ----------
    config : dict
        May contain ``'path'``, the path of the base element(s) inside
        ``data``, and ``'parameters'``, a list of dicts with the keys
        ``'name'`` and ``'path'`` and optionally ``'template'``.
    data : :class:`lxml.etree.Element`
        Or whatever object type ``path_parser`` understands.
    path_parser : function
        Called as ``path_parser(path, nsmap)`` to build the function that
        extracts a value from an element.
    glob : dict, optional
        Values available for rendering templates.
    nsmap : dict, optional

    Returns
    -------
    dict or list
        One dict per base element, holding the serialized element under
        ``'raw'`` plus one entry per configured parameter. A list of such
        dicts is returned when the base path carries the multi-value flag
        (``*``); otherwise the single dict is returned on its own.
    """
    if glob is None:
        glob = {}
    if nsmap is None:
        nsmap = {}

    base_path = config.get('path', None)
    _, multiple = is_multiple(base_path)
    if base_path:
        _parser = path_parser(base_path, nsmap=nsmap,
                              picker_factory=passthrough_picker_factory)
        base_elems = _parser(data)
    else:
        base_elems = [data]
    if type(base_elems) is not list:
        base_elems = [base_elems]

    # A configuration without parameters yields records holding only 'raw'.
    parameters = config.get('parameters') or []

    results = []
    for base_elem in base_elems:
        # Serialized raw data is preserved.
        parsed_data = {'raw': jsonpickle.dumps(base_elem)}

        # Each parameter is parsed separately. The parser is built anew for
        # every element rather than shared between elements.
        for parameter in parameters:
            name = parameter.get('name')
            value = path_parser(parameter.get('path'), nsmap)(base_elem)

            # Templated parameters use response data and globals to generate
            # values (e.g. URI from ID).
            template = parameter.get('template')
            if template:
                # Isolate only the globals needed to render the template.
                format_keys = re.findall(r'\{([^\}]+)\}', template)
                fmt = {k: v for k, v in glob.items() if k in format_keys}
                if name in format_keys:    # Probably this is always true...
                    fmt[name] = value
                value = template.format(**fmt)
            parsed_data[name] = value
        results.append(parsed_data)

    if not multiple:
        # Invariant: a path without the multi-value flag resolves to exactly
        # one base element.
        assert len(results) == 1
        return results[0]
    return results
# This isn't particularly special at the moment, but makes it easier to swap
# out parsers later, or add additional logic.
def parse_raw_xml(raw):
    """
    Parse raw XML response content.

    Parameters
    ----------
    raw : str or unicode
        Handed to the XML parser exactly as received; no decoding is done
        here.

    Returns
    -------
    :class:`lxml.etree.Element`
        The root element of the parsed document.
    """
    root = ET.fromstring(raw)
    return root
[docs]def parse_raw_json(raw):
"""
Parse raw JSON response content.
Parameters
----------
raw : unicode
Returns
-------
:class:`lxml.etree.Element`
"""
if type(raw) is str:
raw = raw.decode('utf-8')
return JSONData(json.loads(raw))