# Source module: tethne.model.corpus.dtmmodel

"""
Classes and methods related to the :class:`.DTMModel`\.
"""

from ..basemodel import BaseModel
import numpy as np
import os
import re
import csv

# Logging.
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')

class DTMModel(BaseModel):
    """
    Represents a Dynamic Topic Model (DTM).

    The DTM is similar to the LDA model (see :class:`.LDAModel`) except that
    each topic is permitted to evolve over time (i.e. probabilities associated
    with terms in the topic can change). For a complete description of the
    model see `Blei & Lafferty 2006
    <http://www.cs.cmu.edu/~lafferty/pub/dtm.pdf>`_.

    To generate a :class:`.DTMModel` from a :class:`.Corpus` use the
    :class:`.DTMModelManager`\, which relies on S. Gerrish's `C++
    implementation of DTM
    <http://code.google.com/p/princeton-statistical-learning/downloads/detail?name=dtm_release-0.8.tgz>`_.
    Alternatively, you can build the model externally (e.g. using the Gerrish
    DTM implementation directly), and then load the results with
    :func:`.from_gerrish`\.

    If you are using a different implementation of DTM, you can initialize a
    :class:`.DTMModel` directly by providing parameters and metadata.

    * ``e_theta`` should describe the distribution of topics (rows) in
      documents (cols).
    * ``phi`` should describe the topic (dimension 0) distributions over words
      (dimension 1) over time (dimension 2).
    * ``metadata`` should map matrix indices for documents onto
      :class:`.Paper` IDs (or whatever you use to identify documents).
    * ``vocabulary`` should map matrix indices for words onto word-strings.

    .. autosummary::
       :nosignatures:

       list_topic
       list_topics
       topic_evolution
       print_topic
       print_topics

    Parameters
    ----------
    e_theta : matrix-like
        Distribution of topics (Z) in documents (M). Shape: (Z, M).
    phi : matrix-like
        Topic (Z) distribution over words (W), over time (T). Shape: (Z, W, T)
    metadata : dict
        Maps matrix indices onto document metadata.
    vocabulary : dict
        Maps W indices onto words.
    """

    def __init__(self, e_theta, phi, metadata, vocabulary):
        """
        Initialize the :class:`.DTMModel`\.
        """
        self.e_theta = e_theta
        self.Z = e_theta.shape[0]   # Number of topics.
        self.M = e_theta.shape[1]   # Number of documents.

        self.phi = phi
        self.W = phi.shape[1]       # Number of words.
        self.T = phi.shape[2]       # Number of time periods.

        self.metadata = metadata
        self.vocabulary = vocabulary

        # Reverse index: document identifier -> matrix (column) index.
        self.lookup = {v['id']: k for k, v in metadata.items()}

        # Use the module-level logger (configured above) rather than the
        # root logger, so the module's DEBUG level is honored.
        logger.debug(
            'DTMModel.__init__(): loaded model with'
            + ' {0} topics, {1} documents,'.format(self.Z, self.M)
            + ' {0} words, {1} time periods.'.format(self.W, self.T))

    def _item_description(self, i, **kwargs):
        """
        Proportion of each topic in document ``i``.

        Returns a list of ( topic index, weight ) tuples.
        """
        return [(k, self.e_theta[k, i])
                for k in range(self.e_theta[:, i].size)]

    def _dimension_description(self, k, t=0, **kwargs):
        """
        Yields probability distribution over terms for topic ``k`` at time
        ``t``, as a list of ( word index, probability ) tuples.
        """
        return [(w, self.phi[k, w, t])
                for w in range(self.phi[k, :, t].size)]

    def _dimension_items(self, k, threshold, **kwargs):
        """
        Returns items that contain ``k`` at or above ``threshold``.

        Parameters
        ----------
        k : int
            Topic index.
        threshold : float
            Minimum representation of ``k`` in document.

        Returns
        -------
        description : list
            A list of ( item, weight ) tuples.
        """
        description = [(self.metadata[i]['id'], self.e_theta[k, i])
                       for i in range(self.e_theta[k, :].size)
                       if self.e_theta[k, i] >= threshold]
        return description

    def topic_evolution(self, k, Nwords=5):
        """
        Generate a plot that shows p(w|z) over time for the top ``Nwords``
        terms.

        Parameters
        ----------
        k : int
            A topic index.
        Nwords : int
            Number of words to return.

        Returns
        -------
        keys : list
            Start-date of each time-period.
        t_series : list
            Array of p(w|t) for Nwords for each time-period.
        """
        t_keys = list(range(self.T))

        # Collect p(w|z,t) for the top words of each time-period.
        t_values = {}
        for t in t_keys:
            dim = self.dimension(k, t=t, top=Nwords)
            for w, p in dim:
                if w not in t_values:
                    t_values[w] = {}
                t_values[w][t] = p

        # Build a complete series for each word, filling missing
        # time-periods with 0.
        t_series = {}
        for w, values in t_values.items():
            word = self.vocabulary[w]
            t_series[word] = [values.get(t, 0.) for t in t_keys]

        return t_keys, t_series

    def list_topic(self, k, t, Nwords=10):
        """
        Yields the top ``Nwords`` for topic ``k``.

        Parameters
        ----------
        k : int
            A topic index.
        t : int
            A time index.
        Nwords : int
            Number of words to return.

        Returns
        -------
        as_list : list
            List of words in topic.
        """
        words = self.dimension(k, t=t, top=Nwords)
        as_list = [self.vocabulary[w] for w, p in words]
        return as_list

    def list_topic_diachronic(self, k, Nwords=10):
        """
        Top ``Nwords`` for topic ``k``, for every time-period.

        Returns
        -------
        as_dict : dict
            Keys are time indices, values are lists of words.
        """
        as_dict = {t: self.list_topic(k, t, Nwords)
                   for t in range(self.T)}
        return as_dict

    def print_topic_diachronic(self, k, Nwords=10):
        """
        Print the top ``Nwords`` for topic ``k`` in each time-period, one
        newline-delimited ``t: w1, w2, ...`` entry per period.
        """
        as_dict = self.list_topic_diachronic(k, Nwords)
        s = []
        for key, value in as_dict.items():
            s.append('{0}: {1}'.format(key, ', '.join(value)))
        as_string = '\n'.join(s)
        return as_string

    def print_topic(self, k, t, Nwords=10):
        """
        Yields the top ``Nwords`` for topic ``k``.

        Parameters
        ----------
        k : int
            A topic index.
        t : int
            A time index.
        Nwords : int
            Number of words to return.

        Returns
        -------
        as_string : str
            Joined list of words in topic.
        """
        as_string = ', '.join(self.list_topic(k, t=t, Nwords=Nwords))
        return as_string

    def list_topics(self, t, Nwords=10):
        """
        Yields the top ``Nwords`` for each topic.

        Parameters
        ----------
        t : int
            A time index.
        Nwords : int
            Number of words to return for each topic.

        Returns
        -------
        as_dict : dict
            Keys are topic indices, values are list of words.
        """
        as_dict = {}
        for k in range(self.Z):
            as_dict[k] = self.list_topic(k, t, Nwords)
        return as_dict

    def print_topics(self, t, Nwords=10):
        """
        Yields the top ``Nwords`` for each topic.

        Parameters
        ----------
        t : int
            A time index.
        Nwords : int
            Number of words to return for each topic.

        Returns
        -------
        as_string : str
            Newline-delimited lists of words for each topic.
        """
        as_dict = self.list_topics(t, Nwords)
        s = []
        for key, value in as_dict.items():
            s.append('{0}: {1}'.format(key, ', '.join(value)))
        as_string = '\n'.join(s)
        return as_string
def from_gerrish(target, metadata, vocabulary, metadata_key='doi'):
    """
    Generate a :class:`.DTMModel` from the output of `S. Gerrish's C++ DTM
    implementation
    <http://code.google.com/p/princeton-statistical-learning/downloads/detail?name=dtm_release-0.8.tgz>`_.

    The Gerrish DTM implementation generates a large number of data files
    contained in a directory called ``lda-seq``. The ``target`` parameter
    should be the path to that directory.

    ``metadata`` should be the path to a tab-delimited metadata file. Those
    records should occur in the same order as in the corpus data files used
    to generate the model. For example::

       id	date	atitle
       10.2307/2437162	1945	SOME ECOTYPIC RELATIONS OF DESCHAMPSIA CAESPITOSA
       10.2307/4353229	1940	ENVIRONMENTAL INFLUENCE AND TRANSPLANT EXPERIMENTS
       10.2307/4353158	1937	SOME FUNDAMENTAL PROBLEMS OF TAXONOMY AND PHYLOGENETICS

    ``vocabulary`` should be the path to a file containing the words used to
    generate the model, one per line.

    Parameters
    ----------
    target : str
        Path to ``lda-seq`` output directory.
    metadata : str
        Path to metadata file.
    vocabulary : str
        Path to vocabulary file.
    metadata_key : str
        Retained for backward compatibility; currently unused.

    Returns
    -------
    :class:`.DTMModel`
    """
    # All parsing is delegated to GerrishLoader, which knows the layout of
    # the lda-seq output directory.
    reader = GerrishLoader(target, metadata, vocabulary)
    return reader.load()
class GerrishLoader(object):
    """
    Helper class for parsing results from `S. Gerrish's C++ implementation
    <http://code.google.com/p/princeton-statistical-learning/downloads/detail?name=dtm_release-0.8.tgz>`_

    Parameters
    ----------
    target : str
        Path to ``lda-seq`` output directory.
    metadata_path : str
        Path to metadata file.
    vocabulary_path : str
        Path to vocabulary file.
    """

    def __init__(self, target, metadata_path, vocabulary_path):
        self.target = target
        self.metadata_path = metadata_path
        self.vocabulary_path = vocabulary_path

        # Dispatch table keyed on the penultimate dot/dash-delimited token
        # of each per-topic filename (e.g. 'prob' for
        # 'topic-002-var-e-log-prob.dat').
        self.handler = {'prob': self._handle_prob,
                        'info': self._handle_info,
                        'obs': self._handle_obs}

        # Accumulates per-topic p(w|z,t) matrices, keyed by topic index.
        self.tdict = {}

    def load(self):
        """
        Parse the ``lda-seq`` output directory and build a
        :class:`.DTMModel`.

        Raises
        ------
        OSError
            If ``target`` does not contain a readable ``lda-seq`` directory.
        """
        try:
            lda_seq_dir = os.listdir('{0}/lda-seq'.format(self.target))
        except OSError:
            raise OSError("Invalid target path.")

        # Metadata.
        self._handle_metadata()
        self._handle_vocabulary()

        # Meta-parameters.
        self._handle_metaparams()

        # Topic proportions.
        self._handle_gammas()

        # p(w|z): one group of files per topic.
        for fname in lda_seq_dir:
            fs = re.split(r'-|\.', fname)
            if fs[0] == 'topic':
                z = int(fs[1])
                self.handler[fs[-2]](fname, z)

        # Assemble phi with topics in index order.
        tkeys = sorted(self.tdict.keys())
        self.phi = np.array([self.tdict[z] for z in tkeys])

        self.model = DTMModel(self.e_theta, self.phi,
                              self.metadata, self.vocabulary)
        return self.model

    def _handle_metaparams(self):
        """
        Read model meta-parameters (topic/term/sequence counts and alpha)
        from ``info.dat``.
        """
        # Text mode: the file holds whitespace-delimited ASCII tokens.
        with open('{0}/lda-seq/info.dat'.format(self.target), 'r') as f:
            for line in f.readlines():
                ls = line.split()
                if not ls:
                    continue
                if ls[0] == 'NUM_TOPICS':
                    self.N_z = int(ls[1])
                elif ls[0] == 'NUM_TERMS':
                    self.N_w = int(ls[1])
                elif ls[0] == 'SEQ_LENGTH':
                    self.N_t = int(ls[1])
                elif ls[0] == 'ALPHA':
                    self.A = np.array(ls[2:])

    def _handle_gammas(self):
        """
        Read gammas from ``gam.dat`` and normalize into ``e_theta``
        (topic proportions per document).
        """
        with open('{0}/lda-seq/gam.dat'.format(self.target), 'r') as f:
            data = np.array(f.read().split())

        # Floor division: the file holds N_d * N_z values.
        self.N_d = data.shape[0] // self.N_z
        b = data.reshape((self.N_d, self.N_z)).astype('float32')

        # Row-normalize so each document's topic proportions sum to 1;
        # result is shaped (N_z, N_d).
        rs = np.sum(b, axis=1)
        self.e_theta = np.array([b[:, z] / rs for z in range(self.N_z)])

    def _handle_prob(self, fname, z):
        """
        ``topic-???-var-e-log-prob.dat``: the e-betas (word distributions)
        for topic ``z`` for all times, in row-major form. Stored
        exponentiated, i.e. as probabilities.
        """
        with open('{0}/lda-seq/{1}'.format(self.target, fname), 'r') as f:
            data = np.array(f.read().split()).reshape((self.N_w, self.N_t))
            self.tdict[z] = np.exp(data.astype('float32'))

    def _handle_info(self, fname, z):
        """
        No need to do anything with these yet.
        """
        pass

    def _handle_obs(self, fname, z):
        """
        TODO: Figure out what, if anything, this is good for.
        """
        pass

    def _handle_metadata(self):
        """
        Read the tab-delimited metadata file, if one was provided.

        Returns
        -------
        metadata : dict
            Keys are document indices, values are dicts mapping column
            headers onto field values.
        """
        if self.metadata_path is None:
            self.metadata = None
            return

        self.metadata = {}

        # 'r' (not the removed 'rU' mode) still gives universal newlines
        # on Python 3.
        with open(self.metadata_path, 'r') as f:
            reader = csv.reader(f, delimiter='\t')
            all_lines = list(reader)

        keys = all_lines[0]         # Header row.
        for i, line in enumerate(all_lines[1:]):
            self.metadata[i] = dict(zip(keys, line))

        return self.metadata

    def _handle_vocabulary(self):
        """
        Read the vocabulary file (one word per line).

        Returns
        -------
        vocabulary : dict
            Keys are word indices, values are word strings.

        Raises
        ------
        RuntimeError
            If no vocabulary path was provided.
        """
        if self.vocabulary_path is None:
            raise RuntimeError("No vocabulary provided.")

        self.vocabulary = {}
        with open(self.vocabulary_path, 'r') as f:
            for i, v in enumerate(f.readlines()):
                self.vocabulary[i] = v.strip('\n')

        return self.vocabulary