import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel('ERROR')
from ...classes import GraphCollection
from util import *
from networkx import Graph
import numpy
from scipy.sparse import coo_matrix
import cPickle as pickle
class HDF5GraphCollection(GraphCollection):
    """
    Provides HDF5 persistence for :class:`.GraphCollection`.

    At this time, the :class:`.HDF5GraphCollection` should only be used for
    storing existing :class:`.GraphCollection`, and NOT for direct
    manipulation.
    """

    def __init__(self, G=None, datapath=None):
        """
        Initialize a :class:`.HDF5GraphCollection` with a
        :class:`.GraphCollection`.

        Parameters
        ----------
        G : :class:`.GraphCollection` or None
            Source of the node index and graphs. Its graphs are only copied
            when the HDF5 file does not already contain graph groups.
        datapath : str or None
            Forwarded to ``get_h5file``.
        """
        logger.debug('HDF5GraphCollection: initialize.')

        self.h5file, self.path, self.uuid = get_h5file('GraphCollection',
                                                       datapath)
        logger.debug('HDF5GraphCollection: got h5file at path {0}'
                     .format(self.path))

        # Load or create arrays group.
        self.agroup = get_or_create_group(self.h5file, 'arrays')
        logger.debug('HDF5GraphCollection: initialized array group.')

        self.group = get_or_create_group(self.h5file, 'graphs')

        # Forward and reverse indices for nodes.
        if G is not None:
            index_values = [G.node_index[k]
                            for k in sorted(G.node_index.keys())]
        else:
            index_values = []
        self.node_index = HDF5ArrayDict(self.h5file, self.agroup,
                                        'node_index', index_values)

        # Not stored.
        self.node_lookup = {v: k for k, v in self.node_index.iteritems()}

        logger.debug('initialized node index and lookup for {0} nodes'
                     .format(len(self.node_index)))

        self.edge_list = []     # Not stored.
        self.graphs = {}

        gchildren = self.group._v_children.keys()
        if len(gchildren) > 0:
            # The file already holds graphs: load those and ignore G.graphs.
            for child in gchildren:
                key = child[6:]     # Cut off 'graph_' at start.
                try:    # Keys may be ints, but we can't store them that way.
                    key = int(key)
                except ValueError:
                    pass    # Not an integer key; keep it as a string.
                self.graphs[key] = HDF5Graph(
                    self.h5file, self.group, child, None)
        elif G is not None:
            for key, graph in G.graphs.iteritems():
                name = 'graph_' + str(key)
                self.graphs[key] = HDF5Graph(
                    self.h5file, self.group, name, graph)

        self.h5file.flush()

    def __getitem__(self, key):
        """
        Return the :class:`.HDF5Graph` stored under ``key``.

        Graphs that exist in the HDF5 file but have not been loaded yet are
        loaded, cached in ``self.graphs`` and returned.

        Raises
        ------
        KeyError
            If no graph is cached or stored for ``key``.
        """
        try:
            return self.graphs[key]
        except KeyError:
            name = 'graph_' + str(key)
            if name not in self.group:
                raise KeyError(key)
            graph = HDF5Graph(self.h5file, self.group, name, None)
            self.graphs[key] = graph
            return graph

    def __setitem__(self, key, value):
        """
        Store ``value`` as an :class:`.HDF5Graph` in the group
        ``'graph_' + str(key)`` and cache it under ``key``.
        """
        name = 'graph_' + str(key)
        self.graphs[key] = HDF5Graph(self.h5file, self.group, name, value)
class HDF5Graph(Graph):
    """
    Read-only, HDF5-backed stand-in for a :class:`networkx.Graph`.

    Node and edge data are held by :class:`.HDF5NodeAttributes` and
    :class:`.HDF5EdgeAttributes`. ``Graph.__init__`` is not called; only the
    ``edge``, ``node`` and ``adj`` attributes are set.
    """

    def __init__(self, h5file, pgroup, name, graph):
        """
        Parameters
        ----------
        h5file : str or HDF5 file handle
            If a string, it is passed to ``get_h5file`` and the handle that
            comes back is used; otherwise it is used as the handle directly.
        pgroup : group
            Parent group under which the group ``name`` is found or created.
        name : str
            Name of this graph's group.
        graph : graph object or None
            Source of ``edge`` and ``node`` data. ``None`` means the data is
            expected to be in the file already.
        """
        if isinstance(h5file, (str, type(u''))):
            self.h5file, self.path, self.uuid = get_h5file('HDF5Graph', h5file)
        else:
            self.h5file = h5file

        # Always use the opened handle from here on, never the path string.
        self.group = get_or_create_group(self.h5file, name, where=pgroup)

        if graph is None:
            edge_values = None
            node_values = None
        else:
            edge_values = graph.edge
            node_values = graph.node

        self.edge = HDF5EdgeAttributes(self.h5file, self.group, edge_values)
        self.node = HDF5NodeAttributes(self.h5file, self.group, node_values)
        self.adj = self.edge

    def edges(self, data=False):
        """Return the stored edges; with ``data=True`` include attributes."""
        return self.edge.get_edges(data=data)

    def nodes(self, data=False):
        """Return the stored nodes; with ``data=True`` include attributes."""
        return self.node.get_nodes(data=data)

    def __str__(self):
        return 'tethne.persistence.hdf5.graphcollection.HDF5Graph object'

    def to_graph(self):
        """
        Convert :class:`.HDF5Graph` to a :class:`.networkx.Graph`.

        Returns
        -------
        graph : :class:`.networkx.Graph`
        """
        graph = Graph()

        # NOTE(review): the attribute dict is passed positionally, which
        # presumably relies on the networkx 1.x ``attr_dict`` parameter --
        # confirm before upgrading networkx.

        # Transfer nodes.
        for i, attrs in self.node.items():
            graph.add_node(i, attrs)

        # Transfer edges.
        for i, neighbors in self.edge.items():
            for j, attrs in neighbors.items():
                graph.add_edge(i, j, attrs)

        return graph

    def add_node(self, *args, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError('HDF5Graph does not support item assignment.')

    def add_edge(self, *args, **kwargs):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError('HDF5Graph does not support item assignment.')
class HDF5EdgeAttributes(object):
    """
    HDF5-backed, read-only mapping of edge attributes.

    Each undirected edge is stored once in a :class:`.SparseArray` named
    ``'neighbors'`` as a triplet ``(i, j, k)``, where ``k`` (starting at 1)
    is the position of that edge in every attribute vector. Position 0 of
    each vector holds a null value.
    """

    def __init__(self, h5file, pgroup, edges):
        """
        Parameters
        ----------
        h5file : HDF5 file handle
        pgroup : group
            Parent group under which the ``'edges'`` group lives.
        edges : dict or None
            ``{i: {j: {attribute: value}}}``. Only read when the group holds
            no ``'neighbors'`` data yet; ``None`` is treated as no edges.
        """
        self.h5file = h5file
        self.group = get_or_create_group(h5file, 'edges', where=pgroup)
        self.fieldgroup = get_or_create_group(
            h5file, 'fieldgroup', where=self.group)
        self.fields = get_or_create_table(
            h5file, self.group, 'fields', FieldIndex)

        I, J, K = [], [], []
        self.field_values = {}

        if 'neighbors' in self.group._v_children.keys():    # Data exists.
            for child in self.fieldgroup._v_children.keys():
                self.field_values[child] = get_or_create_array(
                    self.h5file, self.fieldgroup, child, None)
        else:   # No data in this group.
            if edges is None:
                edges = {}
            I, J, K, fieldkeys, mvalues = self._build_vectors(edges)

            # Get or create arrays that hold attribute vectors.
            for name in fieldkeys:
                self.field_values[name] = get_or_create_array(
                    self.h5file, self.fieldgroup, name, mvalues[name])

            if len([r for r in self.fields]) == 0:     # New Graph.
                # Generate a fields table.
                for name, this_type in fieldkeys.items():
                    query = 'name == b"{0}"'.format(name)
                    matches = [row for row in self.fields.where(query)]
                    if len(matches) == 0:
                        fieldentry = self.fields.row
                        fieldentry['name'] = name
                        fieldentry['type'] = this_type
                        fieldentry.append()
                self.fields.flush()

        self.neighbors = SparseArray(
            self.h5file, self.group, 'neighbors', I, J, K)

    @staticmethod
    def _build_vectors(edges):
        """
        Turn a nested edge dict into index lists and attribute vectors.

        Returns
        -------
        I, J, K : list
            One entry per undirected edge; ``K`` counts up from 1.
        fieldkeys : dict
            ``{attribute name: str(type(first value seen))}``.
        mvalues : dict
            ``{attribute name: vector}``. Entry 0 is a null pad and entry
            ``k`` belongs to edge ``k``. List-typed vectors are pickled
            element by element.
        """
        list_type = str(type([]))

        def null_for(this_type):
            # Lists are pickled as a whole vector further down, so their
            # null must be a raw ``[]`` here. Using the already-pickled
            # null from ``mtypes`` would pickle it a second time.
            if this_type == list_type:
                return []
            return mtypes[this_type]

        I, J, K, V = [], [], [], []
        fieldkeys = {}
        # NOTE(review): seeding with 'index' means an attribute literally
        # named 'index' is never registered -- confirm this is intended.
        mvalues = {'index': []}
        recorded = {}   # recorded[j] holds every i with (i, j) stored.
        k = 1
        for i, neighbors in edges.items():
            recorded.setdefault(i, set())
            for j, attributes in neighbors.items():
                recorded.setdefault(j, set())
                # Avoid adding the same edge twice.
                if j in recorded[i]:
                    continue
                recorded[j].add(i)
                I.append(i)
                J.append(j)
                K.append(int(k))
                k += 1
                V.append(attributes)
                for name, value in attributes.items():
                    if name not in mvalues:
                        this_type = str(type(value))
                        fieldkeys[name] = this_type
                        # Pad 0th entry with a null value.
                        mvalues[name] = [null_for(this_type)]

        # Generate attribute vectors.
        for attributes in V:
            for name, this_type in fieldkeys.items():
                if name in attributes:
                    mvalues[name].append(attributes[name])
                else:
                    mvalues[name].append(null_for(this_type))

        for name, this_type in fieldkeys.items():
            if this_type == list_type:
                mvalues[name] = [pickle.dumps(v) for v in mvalues[name]]

        return I, J, K, fieldkeys, mvalues

    def __getitem__(self, i):
        """Return ``{j: attributes}`` for every neighbor ``j`` of ``i``."""
        attributes = {}
        for j in self.neighbors.get_neighbors(i):
            k = self.neighbors[(i, j)]
            attributes[j] = self._get_attributes(k)
        return attributes

    def _get_attributes(self, k):
        """
        Return the attribute dict stored at position ``k``.

        Values equal to the null for their type (``''``, ``0``, ``0.0``,
        ``u''``, ``[]``) are treated as missing and left out.
        """
        attr_types = {row['name']: row['type'] for row in self.fields}
        attr = {}
        for name, this_type in attr_types.items():
            value = self.field_values[name].read()[k]
            if this_type == str(type('')):
                value = str(value)
                if value == '':
                    continue
            elif this_type == str(type(1)):
                value = int(value)
                if value == 0:
                    continue
            elif this_type == str(type(1.1)):
                value = float(value)
                if value == 0.0:
                    continue
            elif this_type == str(type(u'')):
                value = unicode(value)
                if value == u'':
                    continue
            elif this_type == str(type([])):
                # SECURITY: unpickling executes arbitrary code; only open
                # HDF5 files from a trusted source.
                value = pickle.loads(value)
                if value == []:
                    continue
            attr[name] = value
        return attr

    def __str__(self):
        return str({i: self[i] for i in self.neighbors.nodes()})

    def items(self):
        """Return ``[(i, {j: attributes})]`` for every node with edges."""
        return [(i, self[i]) for i in self.neighbors.nodes()]

    def __len__(self):
        return self.neighbors.num_edges()

    def get_edges(self, data=False):
        """
        Return the stored edges.

        Parameters
        ----------
        data : bool
            If True, return ``(i, j, attributes)`` triples; otherwise return
            whatever ``self.neighbors.get_edges(data=False)`` yields.
        """
        edges = self.neighbors.get_edges(data=data)
        if not data:
            return edges
        return [(edge[0], edge[1], self._get_attributes(edge[2]))
                for edge in edges]
class SparseArray(object):
    """
    Coordinate-format sparse array stored as three HDF5 arrays.

    Entry ``x`` says that the value ``K[x]`` belongs to the index pair
    ``(I[x], J[x])``. Lookups treat the pair as unordered.
    """

    def __init__(self, h5file, pgroup, name, I, J, K):
        """
        Parameters
        ----------
        h5file : HDF5 file handle
        pgroup : group
            Parent group under which the group ``name`` is found or created.
        name : str
        I, J, K : list
            Row indices, column indices and values, passed to
            ``get_or_create_array`` for the arrays ``'I'``, ``'J'``, ``'K'``.
        """
        self.h5file = h5file
        self.pgroup = pgroup
        self.name = name

        self.group = get_or_create_group(h5file, name, where=pgroup)

        self.I = get_or_create_array(h5file, self.group, 'I', I)
        self.J = get_or_create_array(h5file, self.group, 'J', J)
        self.K = get_or_create_array(h5file, self.group, 'K', K)

        self.h5file.flush()

        self.shape = self._as_coo().shape

    def _as_coo(self):
        """Build a ``scipy.sparse.coo_matrix`` from the stored triplets."""
        I = numpy.asarray(self.I.read())
        J = numpy.asarray(self.J.read())
        K = numpy.asarray(self.K.read())
        if I.size == 0 or J.size == 0:
            # A shape cannot be inferred from empty index arrays.
            return coo_matrix((0, 0))
        # The triplet form is ``(data, (row, col))``. A second positional
        # argument would be taken as ``shape``, not as indices.
        return coo_matrix((K, (I, J)))

    def __getitem__(self, indices):
        """
        Return the value for the unordered pair ``indices = (i, j)``.

        Raises
        ------
        KeyError
            If neither ``(i, j)`` nor ``(j, i)`` is stored.
        """
        I = numpy.asarray(self.I.read())
        J = numpy.asarray(self.J.read())
        K = numpy.asarray(self.K.read())
        i, j = indices

        match = ((I == i) & (J == j)) | ((I == j) & (J == i))
        hits = numpy.flatnonzero(match)
        if hits.size == 0:
            raise KeyError(indices)
        return K[hits[0]]   # First match wins, in stored order.

    def get_neighbors(self, i):
        """
        Return every index sharing an entry with ``i``, in stored order.

        An entry ``(i, i)`` contributes ``i`` twice.
        """
        neighbors = []
        for source, target in zip(self.I.read(), self.J.read()):
            if source == i:
                neighbors.append(target)
            if target == i:
                neighbors.append(source)
        return neighbors

    def __len__(self):
        return self.shape[1]

    def nodes(self):
        """
        Return a list of nodes for which there are edge data.

        Returns
        -------
        nodes : list
        """
        return list(set(list(self.I.read()) + list(self.J.read())))

    def num_edges(self):
        """Return the number of stored entries whose value is non-zero."""
        return int(numpy.count_nonzero(numpy.asarray(self.K.read())))

    def get_edges(self, data=False):
        """
        Return the stored entries as a list of tuples.

        Parameters
        ----------
        data : bool
            If True, return ``(i, j, k)`` triples, otherwise ``(i, j)``.
        """
        I = self.I.read()
        J = self.J.read()
        if data:
            return list(zip(I, J, self.K.read()))
        return list(zip(I, J))
class HDF5NodeAttributes(object):
    """
    HDF5-backed, read-only mapping of node attributes.

    Node identifiers are stored in the array ``'I'``. Each attribute has its
    own vector in ``'fieldgroup'``, aligned with ``'I'``, and its type is
    recorded in the ``'fields'`` table.
    """

    def __init__(self, h5file, pgroup, attributes=None):
        """
        Parameters
        ----------
        h5file : HDF5 file handle
        pgroup : group
            Parent group under which the ``'nodes'`` group lives.
        attributes : dict or None
            ``{node: {attribute: value}}``. Only read when the group holds
            no ``'I'`` array yet; ``None`` is treated as no nodes.
        """
        self.h5file = h5file
        self.group = get_or_create_group(h5file, 'nodes', where=pgroup)
        self.fieldgroup = get_or_create_group(
            h5file, 'fieldgroup', where=self.group)
        self.fields = get_or_create_table(
            self.h5file, self.group, 'fields', FieldIndex)

        self.field_values = {}

        if 'I' in self.group._v_children.keys():    # Data already exists.
            for child in self.fieldgroup._v_children.keys():
                self.field_values[child] = get_or_create_array(
                    self.h5file, self.fieldgroup, child, None)
            self.I = get_or_create_array(h5file, self.group, 'I', [])
        else:   # No data in this group.
            if attributes is None:
                attributes = {}
            indices, fieldkeys, mvalues = self._build_vectors(attributes)

            # Holds node identifiers.
            self.I = get_or_create_array(h5file, self.group, 'I', indices)

            # Generate a fields table.
            for name, this_type in fieldkeys.items():
                query = 'name == b"{0}"'.format(name)
                matches = [row for row in self.fields.where(query)]
                if len(matches) == 0:
                    fieldentry = self.fields.row
                    fieldentry['name'] = name
                    fieldentry['type'] = this_type
                    fieldentry.append()
            self.fields.flush()

            # Get or create arrays that hold metadata vectors.
            for name in fieldkeys:
                self.field_values[name] = get_or_create_array(
                    self.h5file, self.fieldgroup, name, mvalues[name])

    @staticmethod
    def _build_vectors(attributes):
        """
        Turn a node attribute dict into sorted ids and attribute vectors.

        Returns
        -------
        indices : list
            Sorted node identifiers.
        fieldkeys : dict
            ``{attribute name: str(type(first value seen))}``.
        mvalues : dict
            ``{attribute name: vector aligned with indices}``. List-typed
            vectors are pickled element by element.
        """
        list_type = str(type([]))

        def null_for(this_type):
            # Lists are pickled as a whole vector further down, so their
            # null must be a raw ``[]`` here. Using the already-pickled
            # null from ``mtypes`` would pickle it a second time.
            if this_type == list_type:
                return []
            return mtypes[this_type]

        # Get the names and types of attribute fields.
        fieldkeys = {}
        # NOTE(review): seeding with 'index' means an attribute literally
        # named 'index' is never registered -- confirm this is intended.
        mvalues = {'index': []}
        for node, attribs in attributes.items():
            for name, value in attribs.items():
                if name not in mvalues:
                    fieldkeys[name] = str(type(value))
                    mvalues[name] = []

        indices = sorted(attributes.keys())     # Ensure consistent order.

        # Generate attribute vectors.
        for i in indices:
            for name, this_type in fieldkeys.items():
                if name in attributes[i]:
                    mvalues[name].append(attributes[i][name])
                else:
                    mvalues[name].append(null_for(this_type))

        for name, this_type in fieldkeys.items():
            if this_type == list_type:
                mvalues[name] = [pickle.dumps(v) for v in mvalues[name]]

        return indices, fieldkeys, mvalues

    def __iter__(self):
        return iter(self.get_nodes())

    def get_nodes(self, data=False):
        """
        Return the stored node identifiers.

        Parameters
        ----------
        data : bool
            If True, return a list of ``(node, attributes)`` pairs; otherwise
            return the contents of the ``'I'`` array as read.
        """
        nodes = self.I.read()
        if not data:
            return nodes
        # The position in 'I' is the position in every attribute vector.
        return [(n, self._get_attributes(k)) for k, n in enumerate(nodes)]

    def items(self):
        """Return a list of ``(node, attributes)`` pairs."""
        return self.get_nodes(data=True)

    def __str__(self):
        return str({n: a for n, a in self.get_nodes(data=True)})

    def __len__(self):
        return len(self.I.read())

    def _get_attributes(self, k):
        """
        Return the attribute dict stored at position ``k``.

        Values equal to the null for their type (``''``, ``0``, ``0.0``,
        ``u''``, ``[]``) are treated as missing and left out.
        """
        attr_types = {row['name']: row['type'] for row in self.fields}
        attr = {}
        for name, this_type in attr_types.items():
            value = self.field_values[name].read()[k]
            if this_type == str(type('')):
                value = str(value)
                if value == '':
                    continue
            elif this_type == str(type(1)):
                value = int(value)
                if value == 0:
                    continue
            elif this_type == str(type(1.1)):
                value = float(value)
                if value == 0.0:
                    continue
            elif this_type == str(type(u'')):
                value = unicode(value)
                if value == u'':
                    continue
            elif this_type == str(type([])):
                # SECURITY: unpickling executes arbitrary code; only open
                # HDF5 files from a trusted source.
                value = pickle.loads(value)
                if value == []:
                    continue
            attr[name] = value
        return attr

    def __setitem__(self, key, value):
        raise AttributeError('Values can only be set on __init__')

    def __getitem__(self, key):
        """
        For each field in `self.fields` table, retrieve the value for `key`
        from the corresponding attribute vector.

        Unlike :meth:`_get_attributes`, null values are not filtered out.

        Raises
        ------
        ValueError
            If ``key`` is not a stored node identifier.
        """
        i = list(self.I.read()).index(key)  # Get the metadata array index.
        fielddata = {row['name']: row['type'] for row in self.fields}
        return {name: self._get_meta_entry(name, i, this_type)
                for name, this_type in fielddata.items()}

    def _get_meta_entry(self, name, index, this_type):
        """
        Retrieve value for `index` from the attribute vector `name`, and
        cast as `this_type` before returning.
        """
        value = self.field_values[name].read()[index]
        if this_type == str(type('')):
            return str(value)
        if this_type == str(type(1)):
            return int(value)
        if this_type == str(type(1.1)):
            return float(value)
        if this_type == str(type(u'')):
            return unicode(value)
        if this_type == str(type([])):
            # SECURITY: see the unpickling note in _get_attributes.
            return pickle.loads(value)
        return value
# Null ("missing") placeholder for each supported attribute type, keyed by
# ``str(type(value))``. The list placeholder is stored as a pickled empty list.
mtypes = dict((str(type(null)), null) for null in ('', 0, 0.0, u''))
mtypes[str(type([]))] = pickle.dumps([])
def to_hdf5(G, datapath=None):
    """
    Wrap ``G`` in a new :class:`.HDF5GraphCollection`.

    Parameters
    ----------
    G : :class:`.GraphCollection`
    datapath : str or None
        Forwarded to :class:`.HDF5GraphCollection` as ``datapath``.

    Returns
    -------
    :class:`.HDF5GraphCollection`
    """
    collection = HDF5GraphCollection(G, datapath=datapath)
    return collection
def from_hdf5(HD_or_path):
    """
    Load a :class:`.GraphCollection` from a :class:`.HDF5GraphCollection`.

    Parameters
    ----------
    HD_or_path : str or :class:`.HDF5GraphCollection`
        If str, must be a path to a :class:`.GraphCollection` HDF5 repo.

    Returns
    -------
    G : :class:`.GraphCollection`

    Raises
    ------
    AttributeError
        If ``HD_or_path`` is neither a string nor an
        :class:`.HDF5GraphCollection`.

    Examples
    --------
    From a path:

    .. code-block:: python

       >>> G = from_hdf5('/path/to/my/HDF5GraphCollection.h5')

    """
    # Accept byte and unicode strings, and subclasses of the collection.
    if isinstance(HD_or_path, (str, type(u''))):
        hmodel = HDF5GraphCollection(datapath=HD_or_path)
    elif isinstance(HD_or_path, HDF5GraphCollection):
        hmodel = HD_or_path
    else:
        raise AttributeError(
            'Must provide datapath or HDF5GraphCollection object.')

    G = GraphCollection()
    for key, graph in hmodel.graphs.items():
        G.graphs[key] = graph.to_graph()

    # Copy the indices into plain in-memory containers.
    G.node_index = {k: v for k, v in hmodel.node_index.iteritems()}
    G.edge_list = list(hmodel.edge_list)
    G.node_lookup = dict(hmodel.node_lookup)

    return G