Source code for goat.authorities

from django.conf import settings

from goat.authorities.util import *

import json


[docs]class ConceptSearchResult(object): def __init__(self, name='', identifier='', **extra): assert isinstance(name, unicode) assert isinstance(identifier, unicode) self.name = name self.identifier = identifier self.extra = extra @property def local_identifier(self): return self.extra.get('local_identifier', None) @property def identities(self): return self.extra.get('identities', None) @property def description(self): return self.extra.get('description', None) @property def concept_type(self): return self.extra.get('concept_type', None) @property def raw(self): return self.extra.get('raw', None)
def _get_method_params(cfg): return [prm.get('accept') for prm in cfg.get('response', {}).get('parameters', {})]
[docs]class AuthorityManager(object): """ Configuration-driven manager for authority services. Parameters ---------- config : str Name of an authority configuration in ``authorities``. Will look for ``{path}/{config}.json``. path : str Location of configurations. """ def __init__(self, configuration): self.configuration = json.loads(configuration) self.methods = {method.pop('name'):method for method in self.configuration.get("methods")} def _get_globs(self): return {'endpoint': self.configuration.get('endpoint', '')} def _get_nsmap(self, config): return { ns['prefix']: ns['namespace'] for ns in config.get('response', {}).get('namespaces', []) } def _get_method_config(self, name): if name not in self.methods: raise NotImplementedError('%s not defined in configuration' % name) return self.methods.get(name) def _generic(self, name): """ Build a method using the configuration identified by ``name``. Parameters ---------- name : str Must be the name of a method defined in the configuration. Returns ------- function """ config = self._get_method_config(name) response_type = config.get('response', {}).get('type', 'xml').lower() if response_type not in ['xml', 'json']: raise NotImplementedError('No parser for %s' % response_type) if response_type == 'xml': parse_raw = parse_raw_xml parse_path = parse_xml_path elif response_type == 'json': parse_raw = parse_raw_json parse_path = parse_json_path request_func = generate_request(config, self._get_globs()) def _call(*args, **kwargs): return parse_result(config.get('response'), parse_raw(request_func(*args, **kwargs)), parse_path, self._get_globs(), self._get_nsmap(config)) _call.parameters = _get_method_params(config) return _call def __getattr__(self, name): if name in self.methods: return self._generic(name) return super(AuthorityManager, self).__getattr__(name)
[docs] def accepts(self, method, *params): config = self._get_method_config(method) accepted = {p.get('accept', '') for p in config.get('parameters', [])} return all([param in accepted for param in params])
[docs] def get(self, identifier=None, local_identifier=None): """ Get a concept record from the configured authority. Although both ``identifier`` and ``local_identifier`` are declared as optional, it is a good idea to pass them both and let the configuration sort things out. Parameters ---------- identifier : str Used to populate the ``id`` parameter in the request. local_identifier : str Used to populate the ``local_id`` parameter in the request. Returns ------- dict """ _call = self._generic('get') if identifier and 'id' in _call.parameters: return _call(id=identifier) elif local_identifier and 'local_id' in _call.parameters: return _call(local_id=local_identifier)
[docs] def search(self, params): """ Search for concept records in the configured authority. Parameters ---------- params : kwargs Query parameters used to populate the search request. Returns ------- list """ return [ConceptSearchResult(**o) for o in self._generic('search')(**params)]