
Source code for tethne.readers.wos

"""
Parser for Web of Science field-tagged bibliographic data.

Tethne parses Web of Science field-tagged data into a set of
:class:`.Paper`\s, which are then encapsulated in a :class:`.Corpus`\. The
:class:`.WoSParser` can be instantiated directly, or you can simply use
:func:`.read` to parse a single file or a directory containing several data
files.

.. code-block:: python

   >>> from tethne.readers import wos
   >>> corpus = wos.read("/path/to/some/wos/data")
   >>> corpus
   <tethne.classes.corpus.Corpus object at 0x10057c2d0>

"""

import re, os, warnings
from collections import defaultdict

from tethne.readers.base import FTParser
from tethne import Corpus, Paper, StreamingCorpus
from tethne.utilities import _strip_punctuation, _space_sep, strip_tags, is_number

import sys
PYTHON_3 = sys.version_info[0] == 3
if PYTHON_3:
    unicode = str


class WoSParser(FTParser):
    """
    Parser for Web of Science field-tagged data.

    .. code-block:: python

       >>> from tethne.readers.wos import WoSParser
       >>> parser = WoSParser("/path/to/download.txt")
       >>> papers = parser.read()

    """

    start_tag = 'PT'
    """
    Field-tag used to mark the start of a record.
    """

    end_tag = 'ER'
    """
    Field-tag used to mark the end of a record.
    """

    concat_fields = ['abstract', 'keywords', 'funding', 'title', 'references',
                     'journal']
    """
    Fields that span multiple lines that should be concatenated into a single
    value.
    """

    entry_class = Paper
    """
    The class that should be used to represent a single bibliographic record.
    This can be changed to support more sophisticated data models.
    """

    tags = {
        'PY': 'date',
        'SO': 'journal',
        'AB': 'abstract',
        'TI': 'title',
        'AF': 'authors_full',
        'AU': 'authors_init',
        'ID': 'authorKeywords',
        'DE': 'keywordsPlus',
        'DI': 'doi',
        'BP': 'pageStart',
        'EP': 'pageEnd',
        'VL': 'volume',
        'IS': 'issue',
        'CR': 'citedReferences',
        'DT': 'documentType',
        'CA': 'groupAuthors',
        'ED': 'editors',
        'SE': 'bookSeriesTitle',
        'BS': 'bookSeriesSubtitle',
        'LA': 'language',
        'CT': 'conferenceTitle',
        'CY': 'conferenceDate',
        'HO': 'conferenceHost',
        'CL': 'conferenceLocation',
        'SP': 'conferenceSponsors',
        'C1': 'authorAddress',
        'RP': 'reprintAddress',
        'EM': 'emailAddress',
        'FU': 'funding',
        'NR': 'citationCount',
        'TC': 'timesCited',
        'PU': 'publisher',
        'PI': 'publisherCity',
        'PA': 'publisherAddress',
        'SC': 'subject',
        'SN': 'ISSN',
        'BN': 'ISSN',
        'UT': 'wosid',
        'JI': 'isoSource',
    }
    """
    Maps field-tags onto field names.
    """
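    # Illustrative sketch (not part of the original source): ``tags`` and
    # ``entry_class`` are plain class attributes, so a subclass can map an
    # otherwise-ignored field-tag onto a field name. ``PD`` -> ``pubMonth``
    # here is a hypothetical choice, not tethne API:
    #
    #     class MyWoSParser(WoSParser):
    #         tags = dict(WoSParser.tags, PD='pubMonth')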
    def parse_author(self, value):
        """
        Attempts to split an author name into last and first parts.
        """
        tokens = tuple([t.upper().strip() for t in value.split(',')])
        if len(tokens) == 1:
            tokens = value.split(' ')
        if len(tokens) > 0:
            if len(tokens) > 1:
                aulast, auinit = tokens[0:2]    # Ignore JR, II, III, etc.
            else:
                aulast = tokens[0]
                auinit = ''
        else:
            aulast, auinit = tokens[0], ''
        aulast = _strip_punctuation(aulast).upper()
        auinit = _strip_punctuation(auinit).upper()
        return aulast, auinit
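    # For example (hypothetical session, assuming a parser instance exists):
    # the method splits on the comma when one is present, otherwise on
    # whitespace, then strips punctuation and upper-cases both parts:
    #
    #     >>> parser.parse_author('Perot, R.')
    #     ('PEROT', 'R')
    #     >>> parser.parse_author('PEROT R')
    #     ('PEROT', 'R')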
    def handle_AF(self, value):
        return self.parse_author(value)
    def handle_PY(self, value):
        """
        WoS publication years are cast to integers.
        """
        return int(value)
    def handle_AU(self, value):
        aulast, auinit = self.parse_author(value)
        auinit = _space_sep(auinit)    # Separate author initials with spaces.
        return aulast, auinit
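    # For example (illustrative): where ``parse_author`` would return
    # ('PEROT', 'RJ') for the AU value 'Perot, RJ', ``handle_AU`` spaces the
    # initials out to ('PEROT', 'R J') via ``_space_sep``.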
    def handle_TI(self, value):
        """
        Convert article titles to Title Case.
        """
        return unicode(value).title()
    def handle_VL(self, value):
        """
        Volume should be a unicode string, even if it looks like an integer.
        """
        return unicode(value)
    def handle_CR(self, value):
        """
        Parses cited references.
        """
        citation = self.entry_class()

        value = strip_tags(value)

        # First-author name and publication date.
        ptn = '([\w\s\W]+),\s([0-9]{4}),\s([\w\s]+)'
        ny_match = re.match(ptn, value, flags=re.U)
        nj_match = re.match('([\w\s\W]+),\s([\w\s]+)', value, flags=re.U)
        if ny_match is not None:
            name_raw, date, journal = ny_match.groups()
        elif nj_match is not None:
            name_raw, journal = nj_match.groups()
            date = None
        else:
            return

        datematch = re.match('([0-9]{4})', value)
        if datematch:
            date = datematch.group(1)
            name_raw = None

        if name_raw:
            name_tokens = [t.replace('.', '') for t in name_raw.split(' ')]
            if len(name_tokens) > 4 or value.startswith('*'):
                # Probably not a person.
                proc = lambda x: _strip_punctuation(x)
                aulast = ' '.join([proc(n) for n in name_tokens]).upper()
                auinit = ''
            elif len(name_tokens) > 0:
                aulast = name_tokens[0].upper()
                proc = lambda x: _space_sep(_strip_punctuation(x))
                auinit = ' '.join([proc(n) for n in name_tokens[1:]]).upper()
            else:
                aulast = name_tokens[0].upper()
                auinit = ''
            setattr(citation, 'authors_init', [(aulast, auinit)])

        if date:
            date = int(date)
        setattr(citation, 'date', date)
        setattr(citation, 'journal', journal)

        # Volume.
        v_match = re.search('\,\s+V([0-9A-Za-z]+)', value)
        if v_match is not None:
            volume = v_match.group(1)
        else:
            volume = None
        setattr(citation, 'volume', volume)

        # Start page.
        p_match = re.search('\,\s+[Pp]([0-9A-Za-z]+)', value)
        if p_match is not None:
            page = p_match.group(1)
        else:
            page = None
        setattr(citation, 'pageStart', page)

        # DOI.
        doi_match = re.search('DOI\s(.*)', value)
        if doi_match is not None:
            doi = doi_match.group(1)
        else:
            doi = None
        setattr(citation, 'doi', doi)

        return citation
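    # A typical CR value looks like (illustrative, not from the original
    # source):
    #
    #     KESSLER MM, 1963, AM DOC, V14, P10, DOI 10.1002/asi.5090140103
    #
    # which the patterns above would parse into authors_init
    # [('KESSLER', 'M M')], date 1963, journal 'AM DOC', volume '14',
    # pageStart '10', and the trailing DOI.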
    def postprocess_WC(self, entry):
        """
        Parse WC (subject category) keywords.

        WC keywords are usually semicolon-delimited.
        """
        if type(entry.WC) not in [str, unicode]:
            WC = u' '.join([unicode(k) for k in entry.WC])
        else:
            WC = entry.WC
        entry.WC = [k.strip().upper() for k in WC.split(';')]
    def postprocess_subject(self, entry):
        """
        Parse subject keywords.

        Subject keywords are usually semicolon-delimited.
        """
        if type(entry.subject) not in [str, unicode]:
            subject = u' '.join([unicode(k) for k in entry.subject])
        else:
            subject = entry.subject
        entry.subject = [k.strip().upper() for k in subject.split(';')]
    def postprocess_authorAddress(self, entry):
        """
        Parses the ``authorAddress`` field into ``addresses``.

        :attr:`.Paper.addresses` will be a ``dict`` mapping author name-tuples
        (e.g. ``(u'PEROT', u'R')``) onto a list of (institution, country,
        [address, parts]) tuples. If it is not possible to determine an
        explicit mapping, then there will be only one key, ``__all__``, with a
        list of all (parsed) addresses in the record.

        Examples
        --------

        .. code-block:: python

           >>> corpus[0].addresses
           {
               '__all__': [
                   (
                       u'CTR OCEANOG MURCIA',
                       u'SPAIN',
                       [u'Ctr Oceanog Murcia', u'Inst Espanol Oceanog',
                        u'Murcia 30740', u'Spain.']
                   )
               ]
           }

        .. code-block:: python

           >>> corpus[0].addresses
           {
               (u'KLEINDIENST', u'SARA'): [
                   (
                       u'UNIV GEORGIA',
                       u'USA',
                       [u'Univ Georgia', u'Dept Marine Sci', u'Athens',
                        u'GA 30602 USA.']
                   )
               ],
               (u'PAUL', u'JOHN H'): [
                   (
                       u'UNIV S FLORIDA',
                       u'USA',
                       [u'Univ S Florida', u'Coll Marine Sci',
                        u'St Petersburg', u'FL 33701 USA.']
                   )
               ],
               (u'JOYE', u'SAMANTHA B'): [
                   (
                       u'UNIV GEORGIA',
                       u'USA',
                       [u'Univ Georgia', u'Dept Marine Sci', u'Athens',
                        u'GA 30602 USA.']
                   )
               ]
           }

        """
        if not hasattr(entry, 'authorAddress'):
            return
        if type(entry.authorAddress) is not list:
            entry.authorAddress = [entry.authorAddress.strip()]

        _clean = lambda s: s.strip().upper().replace('.', '')

        def _process_address(address_part):
            address_parts = address_part.split(',')
            # The institution --usually-- comes first.
            institution = _clean(address_parts[0])
            # The country --usually-- comes last.
            country = _clean(address_parts[-1])
            # USA addresses usually include the state, zip code, and country,
            usa_match = re.match('[A-Z]{2}\s+[0-9]{5}\s+(USA)', country)
            # or sometimes just the state and country.
            usa_match_state = re.match('[A-Z]{2}\s+(USA)', country)
            if usa_match:
                country, = usa_match.groups()
            elif usa_match_state:
                country, = usa_match_state.groups()
            return institution, country, [a.strip() for a in address_parts]

        # We won't assume that there is only one address per author.
        addresses_final = defaultdict(list)
        for addr in entry.authorAddress:
            # More recent WoS records have explicit author-address mappings.
            match = re.match('[[](.+)[]](.+)', addr)
            if match:
                name_part, address_part = match.groups()
                name_parts = [name.split(',') for name in name_part.split(';')]
                names = []
                for part in name_parts:
                    # We may encounter non-human names, or names that don't
                    # follow western fore/surname conventions.
                    if len(part) == 2:
                        surname = _clean(part[0])
                        forename = _clean(part[1])
                        names.append((surname, forename))
                institution, country, address_parts = \
                    _process_address(address_part)
                for name in names:
                    addresses_final[name].append(
                        (institution, country, address_parts))
            else:
                addresses_final['__all__'].append(_process_address(addr))

        entry.addresses = dict(addresses_final)    # Keep it native.
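    # For reference, a raw ``C1`` (authorAddress) value with an explicit
    # author-address mapping looks like (illustrative):
    #
    #     [Kleindienst, Sara] Univ Georgia, Dept Marine Sci, Athens, GA 30602 USA.
    #
    # The bracketed name part is split out by the ``[[](.+)[]](.+)`` pattern;
    # records without brackets fall through to the ``__all__`` key.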
    def postprocess_authorKeywords(self, entry):
        """
        Parse author keywords.

        Author keywords are usually semicolon-delimited.
        """
        if type(entry.authorKeywords) not in [str, unicode]:
            aK = u' '.join([unicode(k) for k in entry.authorKeywords])
        else:
            aK = entry.authorKeywords
        entry.authorKeywords = [k.strip().upper() for k in aK.split(';')]
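    # For example (illustrative): an authorKeywords value of
    # 'Oil spill; Marine microbiology' becomes
    # ['OIL SPILL', 'MARINE MICROBIOLOGY'].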
    def postprocess_keywordsPlus(self, entry):
        """
        Parse WoS "Keyword Plus" keywords.

        Keyword Plus keywords are usually semicolon-delimited.
        """
        if type(entry.keywordsPlus) in [str, unicode]:
            entry.keywordsPlus = [k.strip().upper()
                                  for k in entry.keywordsPlus.split(';')]
    def postprocess_funding(self, entry):
        """
        Separates funding agency from grant numbers.
        """
        if type(entry.funding) not in [str, unicode]:
            return

        sources = [fu.strip() for fu in entry.funding.split(';')]
        sources_processed = []
        for source in sources:
            m = re.search('(.*)?\s+\[(.+)\]', source)
            if m:
                agency, grant = m.groups()
            else:
                agency, grant = source, None
            sources_processed.append((agency, grant))
        entry.funding = sources_processed
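    # For example (illustrative): a funding value of
    # 'NSF [1243392]; Gordon and Betty Moore Foundation' becomes
    # [('NSF', '1243392'), ('Gordon and Betty Moore Foundation', None)].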
    def postprocess_authors_full(self, entry):
        """
        If only a single author was found, ensure that ``authors_full`` is
        nonetheless a list.
        """
        if type(entry.authors_full) is not list:
            entry.authors_full = [entry.authors_full]

    def postprocess_authors_init(self, entry):
        """
        If only a single author was found, ensure that ``authors_init`` is
        nonetheless a list.
        """
        if type(entry.authors_init) is not list:
            entry.authors_init = [entry.authors_init]

    def postprocess_citedReferences(self, entry):
        """
        If only a single cited reference was found, ensure that
        ``citedReferences`` is nonetheless a list.
        """
        if type(entry.citedReferences) is not list:
            entry.citedReferences = [entry.citedReferences]
def from_dir(path, corpus=True, **kwargs):
    # Raising DeprecationWarning made the body below unreachable; warn
    # instead, so the deprecated shim still works.
    warnings.warn("from_dir() is deprecated. Use read() instead.",
                  DeprecationWarning)
    papers = []
    for sname in os.listdir(path):
        if sname.endswith('txt') and not sname.startswith('.'):
            papers += read(os.path.join(path, sname), corpus=False)
    if corpus:
        return Corpus(papers, **kwargs)
    return papers
def corpus_from_dir(path, **kwargs):
    warnings.warn("corpus_from_dir is deprecated in v0.8, use"
                  " read directly, instead.", DeprecationWarning)
    return read(path, corpus=True, **kwargs)
def read_corpus(path, **kwargs):
    """
    .. DANGER:: read_corpus is deprecated in v0.8, use :func:`.read` instead.
    """
    warnings.warn("read_corpus is deprecated in v0.8, use"
                  " read directly, instead.", DeprecationWarning)
    return read(path, corpus=True, **kwargs)
def read(path, corpus=True, index_by='wosid', streaming=False,
         parse_only=None, corpus_class=Corpus, **kwargs):
    """
    Parse one or more WoS field-tagged data files.

    Examples
    --------

    .. code-block:: python

       >>> from tethne.readers import wos
       >>> corpus = wos.read("/path/to/some/wos/data")
       >>> corpus
       <tethne.classes.corpus.Corpus object at 0x10057c2d0>

    Parameters
    ----------
    path : str
        Path to WoS field-tagged data. Can be a path directly to a single
        data file, or to a directory containing several data files.
    corpus : bool
        If True (default), returns a :class:`.Corpus`\. If False, will return
        only a list of :class:`.Paper`\s.

    Returns
    -------
    :class:`.Corpus` or list of :class:`.Paper`\s

    """
    if not os.path.exists(path):
        raise ValueError('No such file or directory')

    # We need the primary index field in the parse results.
    if parse_only:
        parse_only.append(index_by)

    if streaming:
        return streaming_read(path, corpus=corpus, index_by=index_by,
                              parse_only=parse_only, **kwargs)

    if os.path.isdir(path):    # Directory containing 1+ WoS data files.
        papers = []
        for sname in os.listdir(path):
            if sname.endswith('txt') and not sname.startswith('.'):
                papers += read(os.path.join(path, sname), corpus=False,
                               parse_only=parse_only)
    else:    # A single data file.
        papers = WoSParser(path).parse(parse_only=parse_only)

    if corpus:
        return corpus_class(papers, index_by=index_by, **kwargs)
    return papers
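# Illustrative usage (hypothetical paths; ``parse_only`` restricts parsing to
# the named fields, plus the index field, which is appended automatically):
#
#     >>> from tethne.readers import wos
#     >>> corpus = wos.read('/path/to/data.txt', parse_only=['title', 'date'])
#     >>> papers = wos.read('/path/to/wos/dir', corpus=False)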
def streaming_read(path, corpus=True, index_by='wosid', parse_only=None,
                   **kwargs):
    return read(path, corpus=corpus, index_by=index_by,
                parse_only=parse_only, corpus_class=StreamingCorpus, **kwargs)