Source code for repository.models

from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils.safestring import SafeText

import json, datetime, requests, copy, xmltodict
from urlparse import urljoin
from string import Formatter
from uuid import uuid4


[docs]class FieldValue(object): def __init__(self, name, value, value_type): self.name = name self.value = value self.value_type = value_type self.render_value = getattr(self, '_render_%s' % value_type) def _render_bool(self): if self.value: return SafeText(u'<span class="glyphicon glyphicon-ok"></span>') return SafeText(u'<span class="glyphicon glyphicon-remove"></span>') def _render_text(self): return self.value def _render_int(self): return self.value def _render_float(self): return self.value def _render_url(self): return SafeText(u'<span class="text-warning">%s</span>' % self.value) def _render_datetime(self): # TODO: this should actually implement datetime formatting. return self.value
[docs] def render(self): return SafeText(u'<dt>%s</dt><dd>%s</dd>' % (self.name, self.render_value()))
def __str__(self): return self.render() def __unicode__(self): return self.render()
[docs]class ContentObject(object): def __init__(self, data): self.id = data.pop('id').value self.data = data
[docs]class ContentContainer(object): def __init__(self, content_data): self.contents = {} for datum in content_data: datum = ContentObject(datum) self.contents[int(datum.id)] = datum
[docs] def get(self, key): return self.contents.get(key, None)
[docs] def items(self): return self.contents.items()
@property def count(self): return len(self.contents)
[docs]class Result(object): def __init__(self, **kwargs): content = kwargs.pop('content', None) self.data = kwargs for field, value in self.data.iteritems(): setattr(self, field, value) if content: if type(content) is not list: content = [content] self.content = ContentContainer(content)
[docs] def iteritems(self): return self.data.iteritems()
[docs] def get(self, key, default=None): return self.data.get(key, default)
[docs]class ResultSet(object): def __init__(self, results, **kwargs): for key, value in kwargs.iteritems(): setattr(self, key, value) self.results = results @property def count(self): """ The number of :class:`.Result`\s in this :class:`.ResultSet`\. """ return len(self.results) def __getitem__(self, key): if type(key) is int: return self.results[key] return getattr(self, key, None)
[docs]class Repository(models.Model): name = models.CharField(max_length=255) description = models.TextField() configuration = models.TextField() def __init__(self, *args, **kwargs): super(Repository, self).__init__(*args, **kwargs) try: for method in self.configured_methods: setattr(self, method, self._method_factory(method)) except ValueError: pass @property def endpoint(self): return self._get_configuration()['endpoint'] @property def configured_methods(self): return self._get_configuration()['methods'].keys() @staticmethod def _list_handler(results): return ResultSet([Result(**result) for result in results]) @staticmethod def _instance_handler(result): return Result(**result) def _get_configuration(self, method=None): config = json.loads(self.configuration) if method: try: return config['methods'][method] except KeyError: raise AttributeError('No such method for this repository') return config def _method_factory(self, method): return lambda **q: self._execute_method(method, **q) def _get_request_fields(self, method): return self._get_configuration(method)['request'].get('fields', {}) def _get_response_fields(self, method): return self._get_configuration(method)['response'].get('fields', {}) def _validate_field_value(self, field, value): if 'type' not in field: return True try: if field['type'] == 'text': assert type(value) in [str, unicode] elif field['type'] == 'bool': assert type(value) is bool elif field['type'] == 'date': assert type(value) is datetime.datetime elif field['type'] == 'int': assert type(value) is int elif field['type'] == 'float': assert type(value) is float except AssertionError: raise ValueError('Invalid value for field %s: not a %s' % \ (field['name'], field['type'])) def _render_path_template(self, method, template, **payload): config = self._get_configuration(method) field_part = template.format(**payload) return urljoin(config['path'], field_part) def _get_path_for_method(self, method, **kwargs): config = self._get_configuration(method) template = config['request'].get('template', None) if template: path = self._render_path_template(method, template, **kwargs) else: path = config['path'] return urljoin(self.endpoint, path) # def _get_pagination(self, method, data): # config = self._get_configuration(method) # pagination_config = config['results'].get('pagination', None) # if pagination_config and pagination_config.get('paginated', False): def _get_data_by_path(self, data, path): path_parts = path.split('.') data = copy.copy(data) # Paths can be arbitrarily deep. if len(path_parts) > 0 and path_parts[0]: for part in path_parts: try: data = data.get(part) except (KeyError, AttributeError): # TODO: make this more informative, or add logging. raise RuntimeError('Response data does not match configuration') # If there is no usable path information, we simply return the data. return data def _get_field_data(self, fields, response_type, data): # The `path` of each field specifies where in the data to find data for # that field. field_map = {f.get('path'): k for k, f in fields.iteritems() if 'path' in f} config = self._get_configuration() def map_data(result): mapped_data = {} for path, field in field_map.iteritems(): mapped_data[fields[field]['name']] = FieldValue( fields[field]['display'], self._get_data_by_path(result, path), fields[field]['type'] ) # Rather than reading a field value directly from data, the value # may be a composite of values from other fields, or the # repository configuration itself. Composite fields are described # using the 'template' parameter in the field description. for field in fields.values(): if 'template' in field: template = field['template'] template_keys = [o[1] for o in Formatter().parse(template)] template_values = {} for key in template_keys: if key in mapped_data: template_values[key] = mapped_data[key].value elif key in data: template_values[key] = data[key] elif key in config: template_values[key] = config[key] mapped_data[field['name']] = FieldValue( field['display'], template.format(**template_values), 'text' ) return mapped_data if response_type == 'list': data = [map_data(result) for result in data] elif response_type == 'instance': data = map_data(data) return data def _get_content_data(self, method, data, response_type='instance'): config = self._get_configuration(method)['response'] if 'content' not in config: return # If metadata about the content of a result is present in the result # data itself, then this is indicated with the 'path' paramter. content_path = config['content'].get('path', None) content_method = config['content'].get('method', None) if method and not content_path: external_data = self._execute_method(content_method, raw=True, **data) return external_data content_data = self._get_data_by_path(data, content_path) content_fields = config['content']['fields'] content_results_type = config['content'].get('results', 'instance') if response_type == 'instance': return self._get_field_data(content_fields, content_results_type, content_data) elif response_type == 'list': return [self._get_field_data(content_fields, content_results_type, datum) for datum in data] return None def _get_results(self, method, data): # TODO: this should be more sophisticated. if type(data) is list: return data config = self._get_configuration(method)['response'] # The response path points to a property in the data that represents the # result set. data = self._get_data_by_path(data, config.get('path', '')) # These are the fields that we expect to find. fields = self._get_response_fields(method) response_type = config.get('results', 'instance') if response_type == 'list' and not type(data) is list: data = [data] content = self._get_content_data(method, data, response_type) data = self._get_field_data(fields, response_type, data) if response_type == 'instance' and content: data['content'] = content elif response_type == 'list' and content: for datum, ct in zip(data, content): datum['content'] = ct return data def _get_response_handler(self, method): response_type = self._get_configuration(method)['response']['results'] if response_type == 'list': return self._list_handler elif response_type == 'instance': return self._instance_handler def _execute_method(self, method, **kwargs): """ Handle a method call for this :class:`.Repository`\. This method is returned whenever a configured method is requested. Parameters ---------- method : str Name of a method in the ``.configuration`` for this repository. kwargs : kwargs Query/request parameters for the method. Returns ------- :class:`.Result` or :class:`.ResultSet` """ # If raw is True, then the raw result data will be returned, rather # than wrapping the results as Result and ResultSet instances. raw = kwargs.pop('raw', False) fields = self._get_request_fields(method) config = self._get_configuration() payload = {} for key, value in kwargs.iteritems(): if key not in fields: continue self._validate_field_value(fields[key], value) payload[key] = value request_path = self._get_path_for_method(method, **kwargs) print request_path if config['format'] == 'json': response = requests.get(request_path, params=payload) response_content = response.json() elif config['format'] == 'xml': headers = {'Accept': 'application/xml'} response = requests.get(request_path, params=payload, headers=headers) response_content = xmltodict.parse(response.text) result_data = self._get_results(method, response_content) if raw: # Just the facts, ma'am. return result_data return self._get_response_handler(method)(result_data)