"""
Source code for ``tethne.readers.base``: base classes for data parsers.
"""

import os
import re
import xml.etree.ElementTree as ET
import rdflib

import codecs
import chardet
import copy
import unicodedata

import logging

from io import BytesIO

# rdflib complains a lot.
logging.getLogger("rdflib").setLevel(logging.ERROR)

import sys
# True when running under Python 3; used for the ``unicode`` shim below.
PYTHON_3 = sys.version_info[0] == 3
if PYTHON_3:
    # Python 3 removed the ``unicode`` builtin; alias it to ``str`` so that
    # the parser code below can refer to ``unicode`` on both major versions.
    unicode = str


def _fast_iter(context, func, tag):
    for event, elem in context:
        func(elem)
        if elem.tag == tag:
            elem.clear()
    del context


class dobject(object):
    """
    Generic data-entry container; fields are set dynamically as attributes.
    """
    pass
def _cast(value): """ Attempt to convert ``value`` to an ``int`` or ``float``. If unable, return the value unchanged. """ try: return int(value) except ValueError: try: return float(value) except ValueError: return value
class BaseParser(object):
    """
    Base class for all data parsers. Do not instantiate directly.

    Subclasses must provide an ``open()`` method (called at the end of
    ``__init__``) and an ``entry_class`` attribute used by ``new_entry``.
    """

    def __init__(self, path, **kwargs):
        """
        Parameters
        ----------
        path : str
            Path to the data file.
        **kwargs
            Set as attributes on the parser instance.
        """
        self.path = path
        self.data = []          # accumulated entry objects
        self.fields = set([])   # names of all fields seen so far
        # ``iteritems()`` was Python 2 only; ``items()`` works on both.
        for k, v in kwargs.items():
            setattr(self, k, v)
        self.open()

    def new_entry(self):
        """
        Prepare a new data entry.
        """
        self.data.append(self.entry_class())

    def _get_handler(self, tag):
        # Look for a per-tag method named ``handle_<tag>``; None if absent.
        handler_name = 'handle_{tag}'.format(tag=tag)
        if hasattr(self, handler_name):
            return getattr(self, handler_name)
        return

    def set_value(self, tag, value):
        """Set ``tag`` to ``value`` on the current (last) entry."""
        setattr(self.data[-1], tag, value)

    def postprocess_entry(self):
        """
        Apply any ``postprocess_<field>`` methods to the current entry.
        """
        for field in self.fields:
            processor_name = 'postprocess_{0}'.format(field)
            if hasattr(self.data[-1], field) and hasattr(self, processor_name):
                getattr(self, processor_name)(self.data[-1])
class IterParser(BaseParser):
    """
    Base class for parsers that consume one (tag, data) pair at a time.

    Subclasses must implement ``next``, ``is_start``, ``is_end`` and
    ``is_eof``.
    """

    entry_class = dobject   # model for data entry
    concat_fields = []      # multi-line fields here are concatenated,
                            # rather than represented as lists
    tags = {}               # maps raw tags onto friendlier field names

    def __init__(self, *args, **kwargs):
        super(IterParser, self).__init__(*args, **kwargs)
        self.current_tag = None
        self.last_tag = None
        if kwargs.get('autostart', True) and getattr(self, 'autostart', True):
            self.start()

    def parse(self, parse_only=None):
        """
        Parse the data file.

        Parameters
        ----------
        parse_only : iterable or None
            The user should be able to limit parsing to specific fields.

        Returns
        -------
        list
            The accumulated data entries.
        """
        if parse_only:
            # ``parse_only`` is given in renamed-field terms; map back to
            # the raw tags found in the data.  (``iteritems()`` was Python 2
            # only; ``items()`` works on both.)
            tag_lookup = {v: k for k, v in self.tags.items()}
            self.parse_only = set([tag_lookup.get(field)
                                   for field in parse_only
                                   if field in tag_lookup])

        while True:    # Main loop.
            tag, data = self.next()
            if self.is_eof(tag):
                self.postprocess_entry()
                break
            self.handle(tag, data)
            self.last_tag = tag
        return self.data

    def start(self):
        """
        Find the first data entry and prepare to parse.
        """
        while not self.is_start(self.current_tag):
            self.next()
        self.new_entry()

    def handle(self, tag, data):
        """
        Process a single line of data, and store the result.

        Parameters
        ----------
        tag : str
        data :
        """
        if self.is_end(tag):
            self.postprocess_entry()
        if self.is_start(tag):
            self.new_entry()

        if not data or not tag:
            return

        if getattr(self, 'parse_only', None) and tag not in self.parse_only:
            return

        # TODO: revisit encoding here.
        if isinstance(data, unicode):
            data = unicodedata.normalize('NFKD', data)

        handler = self._get_handler(tag)
        if handler is not None:
            data = handler(data)

        if tag in self.tags:    # Rename the field.
            tag = self.tags[tag]

        # Multiline fields are represented as lists of values.
        if hasattr(self.data[-1], tag):
            value = getattr(self.data[-1], tag)
            if tag in self.concat_fields:
                value = ' '.join([value, unicode(data)])
            elif type(value) is list:
                value.append(data)
            elif value not in [None, '']:
                value = [value, data]
        else:
            value = data
        setattr(self.data[-1], tag, value)
        self.fields.add(tag)
class FTParser(IterParser):
    """
    Base parser for field-tagged data files.
    """

    start_tag = 'ST'    # signals the start of a data entry
    end_tag = 'ED'      # signals the end of a data entry

    def is_start(self, tag):
        return tag == self.start_tag

    def is_end(self, tag):
        return tag == self.end_tag

    def is_eof(self, tag):
        return self.at_eof

    def open(self):
        """
        Open the data file.

        Detects the file's encoding with :mod:`chardet`, then re-opens the
        file with that encoding for line-by-line reading.
        """
        if not os.path.exists(self.path):
            raise IOError("No such path: {0}".format(self.path))
        with open(self.path, "rb") as f:
            msg = f.read()
        result = chardet.detect(msg)
        self.buffer = codecs.open(self.path, "rb",
                                  encoding=result['encoding'])
        self.at_eof = False

    def next(self):
        """
        Get the next line of data.

        Returns
        -------
        tag : str
        data :
        """
        line = self.buffer.readline()
        while line == '\n':    # Skip forward to the next line with content.
            line = self.buffer.readline()

        if line == '':         # End of file.
            self.at_eof = True
            return None, None

        # Raw string: ``\W`` in a plain literal is an invalid escape
        # (DeprecationWarning on modern Python).
        match = re.match(r'([A-Z]{2}|[C][1])\W(.*)', line)
        if match is not None:
            self.current_tag, data = match.groups()
        else:
            # Continuation line: belongs to the previous tag.
            self.current_tag = self.last_tag
            data = line.strip()
        return self.current_tag, _cast(data)

    def __del__(self):
        if hasattr(self, 'buffer'):
            self.buffer.close()
class XMLParser(IterParser):
    """
    Parser for XML data files; streams elements with ``ET.iterparse``.
    """

    entry_element = 'article'   # element name that delimits one data entry
    entry_class = dobject

    def open(self):
        # Streaming parse (iterparse) rather than loading the whole tree.
        self.f = open(self.path, 'r')
        self.iterator = ET.iterparse(self.f)
        self.at_start = False
        self.at_end = False
        self.children = []

    def new_entry(self):
        """
        Prepare a new data entry.
        """
        self.postprocess_entry()
        super(XMLParser, self).new_entry()

    def is_start(self, tag):
        return tag == self.entry_element

    def is_end(self, tag):
        return tag == self.entry_element

    def is_eof(self, tag):
        # NOTE(review): ``self.elements`` is never set by ``open()``, so
        # calling this would raise AttributeError.  ``parse()`` below does
        # not call it, so behavior is preserved as-is — confirm before use.
        return len(self.elements) == 0 and len(self.children) == 0

    def start(self):
        self.new_entry()

    def next(self, child):
        # Deep-copy so clearing the source tree doesn't mutate our data.
        child = copy.deepcopy(child)
        tag, data = child.tag, child.text
        if data:
            data = data.strip()
        self.handle(tag, data)
        self.last_tag = tag

    def parse(self, parse_only=None):
        """
        Parse the XML file, returning the list of entries.

        Parameters
        ----------
        parse_only : iterable or None
            The user should be able to limit parsing to specific fields.
        """
        if parse_only:
            # ``iteritems()`` was Python 2 only; ``items()`` works on both.
            tag_lookup = {v: k for k, v in self.tags.items()}
            self.parse_only = set([tag_lookup.get(field)
                                   for field in parse_only
                                   if field in tag_lookup]) | set(parse_only)

        _fast_iter(self.iterator, self.next, self.entry_element)

        # new_entry() always appends; drop a trailing empty entry.
        if len(self.data[-1].__dict__) == 0:
            del self.data[-1]
        return self.data

    def __del__(self):
        if hasattr(self, 'f'):
            self.f.close()
class RDFParser(BaseParser):
    """
    Parser for RDF data files, backed by :mod:`rdflib`.

    Subclasses are expected to provide ``meta_elements`` (pairs of
    ``(field_name, rdf_predicate)``) and optionally ``tags``.
    """

    entry_elements = ['Document']   # RDF types treated as data entries
    concat_fields = []

    def open(self):
        self.graph = rdflib.Graph()
        self.graph.parse(self.path)
        self.entries = []
        for element in self.entry_elements:
            query = 'SELECT * WHERE { ?p a ' + element + ' }'
            self.entries += [r[0] for r in self.graph.query(query)]

    def next(self):
        # Pop the next entry node; implicitly returns None when exhausted.
        if len(self.entries) > 0:
            return self.entries.pop(0)

    def parse(self):
        """
        Parse the RDF graph, returning the list of entries.
        """
        # ``meta_elements`` must come from a subclass; default to empty so
        # a bare RDFParser does not raise AttributeError.
        meta_elements = getattr(self, 'meta_elements', [])
        if meta_elements:
            meta_fields, meta_refs = zip(*meta_elements)
        else:
            meta_fields, meta_refs = (), ()

        while True:    # Main loop.
            entry = self.next()
            if entry is None:
                break
            self.new_entry()
            for s, p, o in self.graph.triples((entry, None, None)):
                if p in meta_refs:    # Look for metadata fields.
                    tag = meta_fields[meta_refs.index(p)]
                    self.handle(tag, o)
            self.postprocess_entry()
        return self.data

    def handle(self, tag, data):
        handler = self._get_handler(tag)
        if handler is not None:
            data = handler(data)

        # ``tags`` is not defined on BaseParser; guard so a subclass that
        # does not declare it still works.
        tags = getattr(self, 'tags', {})
        if tag in tags:    # Rename the field.
            tag = tags[tag]

        if data is not None:
            # Multiline fields are represented as lists of values.
            if hasattr(self.data[-1], tag):
                value = getattr(self.data[-1], tag)
                if tag in self.concat_fields:
                    value = ' '.join([value, data])
                elif type(value) is list:
                    value.append(data)
                elif value not in [None, '']:
                    value = [value, data]
            else:
                value = data
            setattr(self.data[-1], tag, value)
            self.fields.add(tag)