Source code for champ.louvain_ext

import gzip
import sys
import tempfile
from contextlib import contextmanager
from multiprocessing import Pool,cpu_count
from time import time

from .champ_functions import create_halfspaces_from_array
from .champ_functions import get_intersection

import igraph as ig
import louvain
import numpy as np

try:
    import cPickle as pickle
except ImportError:
    import pickle as pickle

@contextmanager
def terminating(obj):
    '''
    Context manager to handle appropriate shutdown of processes
    :param obj: obj to open
    :return:
    '''
    try:
        yield obj
    finally:
        obj.terminate()

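# Illustrative sketch (not part of the original module): terminating() wraps a
# multiprocessing.Pool so that worker processes are torn down even if the
# mapped call raises; parallel_louvain() below relies on the same pattern.
def _example_terminating_pool():
    with terminating(Pool(processes=2)) as pool:
        return pool.map(abs, [-1, -2, -3])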
'''
Extension of Traag's implementation of Louvain in python to use multiprocessing
and allow for randomization
'''

class PartitionEnsemble():
    '''Group of partitions of a graph stored in membership vector format

    The attribute for each partition is stored in an array and can be indexed

    :cvar graph: The graph associated with this PartitionEnsemble.  Each ensemble \
    can only have a single graph and the nodes on the graph must be ordered the same as \
    each of the membership vectors.
    :type graph: igraph.Graph

    :cvar partitions: List of membership vectors for each partition

    :cvar int_edges: Number of edges internal to the communities
    :type int_edges: list

    :cvar exp_edges: Number of expected edges (based on configuration model)
    :type exp_edges: list

    :cvar resolutions: If partitions were identified with Louvain, what resolution \
    were they identified at (otherwise None)
    :type resolutions: list

    :cvar orig_mods: Modularity of partition at the resolution it was identified at \
    if Louvain was used (otherwise None).
    :type orig_mods: list

    :cvar numparts: number of partitions
    :type numparts: int

    :cvar ind2doms: Maps index of dominant partitions to boundary points of their dominant \
    domains
    :type ind2doms: dict

    '''

    def __init__(self, graph=None, listofparts=None, name='unnamed_graph', maxpt=None):
        self.partitions = []
        self.int_edges = []
        self.exp_edges = []
        self.resolutions = []
        self.orig_mods = []
        self.numparts = 0
        self.graph = graph
        self.adjacency = None  #computed lazily by get_adjacency()
        if listofparts is not None:
            self.add_partitions(listofparts, maxpt=maxpt)
        self.name = name

    def get_adjacency(self):
        '''
        Calculate the adjacency representation if it doesn't already exist

        :return: self.adjacency
        '''
        if self.adjacency is None:
            if 'weight' in self.graph.edge_attributes():
                self.adjacency = self.graph.get_adjacency(type=ig.GET_ADJACENCY_BOTH,
                                                          attribute='weight')
            else:
                self.adjacency = self.graph.get_adjacency(type=ig.GET_ADJACENCY_BOTH)
        return self.adjacency

    def calc_internal_edges(self, memvec):
        '''
        Uses igraph VertexClustering representation to calculate internal edges.  See \
        :meth:`louvain_ext.get_sum_internal_edges`

        :param memvec: membership vector for which to calculate the internal edges.
        :type memvec: list
        :return: sum of internal edge weights
        '''
        partobj = ig.VertexClustering(graph=self.graph, membership=memvec)
        weight = "weight" if "weight" in self.graph.edge_attributes() else None
        return get_sum_internal_edges(partobj, weight)

    def calc_expected_edges(self, memvec):
        '''
        Uses igraph VertexClustering representation to calculate expected edges.  See \
        :meth:`louvain_ext.get_expected_edges`

        :param memvec: membership vector for which to calculate the expected edges
        :type memvec: list
        :return: expected edges under null
        :rtype: float
        '''
        #create temporary VC object
        partobj = ig.VertexClustering(graph=self.graph, membership=memvec)
        weight = "weight" if "weight" in self.graph.edge_attributes() else None
        return get_expected_edges(partobj, weight)

    def __getitem__(self, item):
        '''
        List of partitions in the PartitionEnsemble object can be indexed directly

        :param item: index of partition for direct access
        :type item: int
        :return: self.partitions[item]
        :rtype: membership vector of community for partition
        '''
        return self.partitions[item]

    def _check_lengths(self):
        '''
        Check that all state variables have equal length

        :return: boolean indicating whether the state variable lengths are equal
        '''
        return (len(self.partitions) == len(self.int_edges) and
                len(self.partitions) == len(self.resolutions) and
                len(self.partitions) == len(self.exp_edges))

    def add_partitions(self, partitions, maxpt=None):
        '''
        Add additional partitions to the PartitionEnsemble object

        :param partitions: list of partitions to add to the PartitionEnsemble
        :type partitions: dict,list
        '''
        #wrap a single partition in a list
        if not hasattr(partitions, '__iter__'):
            partitions = [partitions]

        for part in partitions:
            #This key must be present
            self.partitions.append(part['partition'])

            if 'resolution' in part:
                self.resolutions.append(part['resolution'])
            else:
                self.resolutions.append(None)

            if 'int_edges' in part:
                self.int_edges.append(part['int_edges'])
            else:
                cint_edges = self.calc_internal_edges(part['partition'])
                self.int_edges.append(cint_edges)

            if 'exp_edges' in part:
                self.exp_edges.append(part['exp_edges'])
            else:
                cexp_edges = self.calc_expected_edges(part['partition'])
                self.exp_edges.append(cexp_edges)

            if "orig_mod" in part:
                self.orig_mods.append(part['orig_mod'])
            elif self.resolutions[-1] is not None:
                #calculate original modularity from the resolution it was identified at
                self.orig_mods.append(self.int_edges[-1] - self.resolutions[-1] * self.exp_edges[-1])
            else:
                self.orig_mods.append(None)

        assert self._check_lengths()
        self.numparts = len(self.partitions)

        #update the pruned set
        self.apply_CHAMP(maxpt=maxpt)

    def get_partition_dictionary(self, ind=None):
        '''
        Get dictionary representation of partitions with the following keys:

            'partition','resolution','orig_mod','int_edges','exp_edges'

        :param ind: optional indices of partitions to return.  If not supplied, all partitions will be returned.
        :type ind: int, list
        :return: list of dictionaries
        '''
        if ind is not None:
            if not hasattr(ind, "__iter__"):
                ind = [ind]
        else:
            #return all of the partitions
            ind = range(len(self.partitions))

        outdicts = []
        for i in ind:
            cdict = {"partition": self.partitions[i],
                     "int_edges": self.int_edges[i],
                     "exp_edges": self.exp_edges[i],
                     "resolution": self.resolutions[i],
                     "orig_mod": self.orig_mods[i]}
            outdicts.append(cdict)

        return outdicts

    def merge_ensemble(self, otherEnsemble):
        '''
        Combine two PartitionEnsembles.  Checks for concordance in the number of vertices. \
        Assumes that the internal ordering of the graph nodes is the same for each.

        :param otherEnsemble: PartitionEnsemble to merge with this one
        :return: new PartitionEnsemble with merged set of partitions
        '''
        if not self.graph.vcount() == otherEnsemble.graph.vcount():
            raise ValueError("PartitionEnsemble graph vertex counts do not match")

        bothpartitions = self.get_partition_dictionary()
        bothpartitions.extend(otherEnsemble.get_partition_dictionary())
        return PartitionEnsemble(self.graph, listofparts=bothpartitions)

    def get_coefficient_array(self):
        '''
        Create array of coefficients for each partition

        :return: np.array with coefficients for each of the partitions
        '''
        outlist = []
        for i in range(len(self.partitions)):
            outlist.append([self.int_edges[i], self.exp_edges[i]])
        return np.array(outlist)

    def apply_CHAMP(self, maxpt=None):
        '''
        Apply CHAMP to the partition ensemble.

        :param maxpt: maximum domain threshold for included partitions, i.e. \
        partitions with a domain greater than maxpt will not be included in the pruned \
        set
        :type maxpt: int
        '''
        self.ind2doms = get_intersection(self.get_coefficient_array(), max_pt=maxpt)

    def get_CHAMP_indices(self):
        '''
        Get the indices of the partitions that form the pruned set after application of \
        CHAMP

        :return: list of indices of partitions that are included in the pruned set \
        sorted by their domains of dominance
        :rtype: list
        '''
        inds = list(zip(self.ind2doms.keys(), [val[0][0] for val in self.ind2doms.values()]))

        #ascending sort by the first point of each domain
        inds.sort(key=lambda x: x[1])

        #retrieve indices
        return [ind[0] for ind in inds]

    def get_CHAMP_partitions(self):
        '''Return the subset of partitions that form the outer envelope.

        :return: list of the partitions in membership vector form
        :rtype: list
        '''
        inds = self.get_CHAMP_indices()
        return [self.partitions[i] for i in inds]

    def save(self, filename=None):
        '''
        Use pickle to dump representation to compressed file

        :param filename: name of file to write to.  Defaults to \
        "<name>_PartEnsemble_<numparts>.gz"
        '''
        if filename is None:
            filename = "%s_PartEnsemble_%d.gz" % (self.name, self.numparts)

        with gzip.open(filename, 'wb') as fh:
            pickle.dump(self, fh)

    @staticmethod
    def open(filename):
        '''
        Loads pickled PartitionEnsemble from file.

        :param filename: filename of pickled PartitionEnsemble object
        :return: a new PartitionEnsemble constructed from the pickled instance
        '''
        with gzip.open(filename, 'rb') as fh:
            opened = pickle.load(fh)

        openedparts = opened.get_partition_dictionary()

        #construct and return
        return PartitionEnsemble(opened.graph, listofparts=openedparts)
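
# Illustrative usage sketch (not part of the original module): round-tripping a
# PartitionEnsemble through save()/open().  `ens` stands in for any existing
# ensemble and the filename is hypothetical.
def _example_save_and_open(ens):
    ens.save("example_ensemble.gz")
    reloaded = PartitionEnsemble.open("example_ensemble.gz")
    assert reloaded.numparts == ens.numparts
    return reloaded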

##### STATIC METHODS ######

def get_sum_internal_edges(partobj, weight=None):
    '''
    Get the count (or strength, if weighted) of edges that are internal to the communities:

    :math:`\\hat{A}=\\sum_{ij}{A_{ij}\\delta(c_i,c_j)}`

    :param partobj:
    :type partobj: igraph.VertexClustering
    :param weight: if not None, the 'weight' attribute of the edges is used
    :return: float
    '''
    sumA = 0
    for subg in partobj.subgraphs():
        if weight is not None:
            sumA += np.sum(subg.es[weight])
        else:
            sumA += subg.ecount()
    return 2.0 * sumA

def get_expected_edges(partobj, weight=None):
    '''
    Get the expected internal edges under the configuration model:

    :math:`\\hat{P}=\\sum_{ij}{\\frac{k_ik_j}{2m}\\delta(c_i,c_j)}`

    :param partobj:
    :type partobj: igraph.VertexClustering
    :param weight: if not None, the 'weight' attribute of the edges is used
    :return: float
    '''
    if weight is None:
        m = partobj.graph.ecount()
    else:
        m = np.sum(partobj.graph.es['weight'])

    kk = 0
    #Hashing the strengths upfront is a lot faster (factor of 10).
    if weight is None:
        strengths = dict(zip(partobj.graph.vs['id'],
                             partobj.graph.degree(partobj.graph.vs)))
    else:
        strengths = dict(zip(partobj.graph.vs['id'],
                             partobj.graph.strength(partobj.graph.vs, weights="weight")))

    for subg in partobj.subgraphs():
        #node ordering on the subgraph doesn't match the main graph, so look up
        #each node's full strength in the original graph via its 'id' attribute
        svec = np.array([strengths[x] for x in subg.vs['id']])
        kk += np.sum(np.outer(svec, svec))

    return kk / (2.0 * m)

def rev_perm(perm):
    '''
    Calculate the inverse of a permutation vector

    :param perm: permutation vector
    :type perm: list
    :return: inverse of the permutation
    '''
    rperm = list(np.zeros(len(perm)))
    for i, v in enumerate(perm):
        rperm[v] = i
    return rperm

def get_orig_ordered_mem_vec(rev_order, membership):
    '''
    Rearrange community membership vector according to a permutation

    Used to realign the community vector output after node permutation.

    :param rev_order: new index of each node
    :param membership: community membership vector to be rearranged
    :return: rearranged membership vector.
    '''
    new_member = [-1 for i in xrange(len(rev_order))]
    for i, val in enumerate(rev_order):
        new_member[val] = membership[i]
    assert -1 not in new_member #Something didn't get switched
    return new_member

def run_louvain(gfile, gamma, nruns, weight=None, node_subset=None, attribute=None, output_dictionary=False):
    '''
    Call the louvain method for a given graph file.

    This takes as input a graph file (instead of the graph object) to avoid duplicating
    references in the context of parallelization.  To allow for flexibility, it allows for
    subsetting of the nodes each time.

    :param gfile: igraph file.  Must be GraphMLz (todo: other extensions)
    :param node_subset: subset of nodes to keep (either the indices or a list of attribute values)
    :param gamma: resolution parameter to run louvain at
    :param nruns: number of runs to conduct
    :param weight: optional name of the weight attribute for the edges if the network is weighted.
    :param output_dictionary: Boolean - output a dictionary representation without attached graph.
    :return: list of partition objects
    '''
    np.random.seed() #reset seed for each process

    #Load the graph from the file
    g = ig.Graph.Read_GraphMLz(gfile)

    #Each node must have an identifier ('id' attribute) to handle the permutations.
    #It was easier to load the graph from file each time than to pass the graph object
    #among processes.  This means you do have to filter out shared nodes and realign graphs.
    #(Filtering can be avoided by passing node_subset=None.)
    if node_subset is not None:
        #node_subset lists the vertices to keep
        if attribute is None:
            #subset is given as vertex indices; delete everything else
            gdel = [i for i in xrange(g.vcount()) if i not in node_subset]
        else:
            #keep nodes whose given attribute value is in node_subset
            gdel = [i for i, val in enumerate(g.vs[attribute]) if val not in node_subset]

        #delete from graph
        g.delete_vertices(gdel)

    outparts = []
    for i in xrange(nruns):
        rand_perm = list(np.random.permutation(g.vcount()))
        rperm = rev_perm(rand_perm)
        gr = g.permute_vertices(rand_perm) #This is just a labelling switch.  Internal properties are maintained.

        rp = louvain.find_partition(gr, method='RBConfiguration', weight=weight,
                                    resolution_parameter=gamma)

        #store the coefficients in the return object.
        A = get_sum_internal_edges(rp, weight)
        P = get_expected_edges(rp, weight)

        outparts.append({'partition': get_orig_ordered_mem_vec(rperm, rp.membership),
                         'resolution': gamma,
                         'orig_mod': rp.quality,
                         'int_edges': A,
                         'exp_edges': P})

    if not output_dictionary:
        return PartitionEnsemble(graph=g, listofparts=outparts)
    else:
        return outparts

def _run_louvain_parallel(gfile_gamma_nruns_weight_subset_attribute_progress):
    '''
    Parallel wrapper with a single tuple argument for calling :meth:`louvain_ext.run_louvain`

    :param gfile_gamma_nruns_weight_subset_attribute_progress: tuple or list of arguments to supply
    :return: list of partition dictionaries for the graph stored in gfile
    '''
    #unpack
    gfile, gamma, nruns, weight, node_subset, attribute, progress = gfile_gamma_nruns_weight_subset_attribute_progress

    t = time()
    outparts = run_louvain(gfile, gamma, nruns=nruns, weight=weight, node_subset=node_subset,
                           attribute=attribute, output_dictionary=True)

    if progress is not None:
        if progress % 100 == 0:
            print "Run %d at gamma = %.3f. Return time: %.4f" % (progress, gamma, time() - t)

    return outparts
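
# Illustrative sketch (not part of the original module): run_louvain() takes a
# GraphMLz file rather than a graph object, so a caller writes the graph to a
# temporary file first.  The 'id' vertex attribute is required by
# get_expected_edges(); the graph and parameters here are arbitrary examples.
def _example_run_louvain():
    g = ig.Graph.Erdos_Renyi(n=50, p=0.1)
    g.vs['id'] = [str(i) for i in range(g.vcount())]
    tmpf = tempfile.NamedTemporaryFile('wb')
    g.write_graphmlz(tmpf.name)
    return run_louvain(tmpf.name, gamma=1.0, nruns=5)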
def parallel_louvain(graph, start=0, fin=1, numruns=200, maxpt=None,
                     numprocesses=None, attribute=None, weight=None, node_subset=None, progress=False):
    '''
    Generates arguments for a parallel function call of louvain on a graph

    :param graph: igraph object to run Louvain on
    :param start: beginning of the range of the resolution parameter :math:`\\gamma`.  Default is 0.
    :param fin: end of the range of the resolution parameter :math:`\\gamma`.  Default is 1.
    :param numruns: number of intervals into which to divide the resolution parameter range
    :param maxpt: cutoff resolution for domains when applying CHAMP.  Default is None
    :type maxpt: int
    :param numprocesses: the number of processes to spawn.  Default is the number of CPUs.
    :param weight: if True, the 'weight' attribute of the edges is used in running Louvain and calculating modularity.
    :param node_subset: optional list of indices or attribute values of the nodes to keep while partitioning
    :param attribute: which attribute to filter on if node_subset is supplied.  If None, node_subset is assumed \
    to be node indices.
    :param progress: print progress during parallel execution
    :return: PartitionEnsemble of all partitions identified.
    '''
    parallel_args = []
    if numprocesses is None:
        numprocesses = cpu_count()

    tempf = tempfile.NamedTemporaryFile('wb')
    graphfile = tempf.name

    #filter before calling parallel
    if node_subset is not None:
        #node_subset lists the vertices to keep
        if attribute is None:
            #subset is given as vertex indices; delete everything else
            gdel = [i for i in xrange(graph.vcount()) if i not in node_subset]
        else:
            #keep nodes whose given attribute value is in node_subset
            gdel = [i for i, val in enumerate(graph.vs[attribute]) if val not in node_subset]

        #delete from graph
        graph.delete_vertices(gdel)

    graph.write_graphmlz(graphfile)
    for i in xrange(numruns):
        prognum = None if not progress else i
        curg = start + ((fin - start) / (1.0 * numruns)) * i
        parallel_args.append((graphfile, curg, 1, weight, None, None, prognum))

    #use a context manager so the pool shuts down properly
    with terminating(Pool(processes=numprocesses)) as pool:
        parts_list_of_list = pool.map(_run_louvain_parallel, parallel_args)

    all_part_dicts = [pt for partrun in parts_list_of_list for pt in partrun]

    outensemble = PartitionEnsemble(graph, listofparts=all_part_dicts, maxpt=maxpt)
    return outensemble
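
# Illustrative sketch (not part of the original module): sweeping gamma over
# [0, 2] with 20 Louvain runs across 2 processes on a built-in test graph, then
# pulling out the CHAMP-pruned partitions.  All parameter choices are arbitrary.
def _example_parallel_louvain():
    g = ig.Graph.Famous("Zachary")
    g.vs['id'] = [str(i) for i in range(g.vcount())]
    ens = parallel_louvain(g, start=0, fin=2, numruns=20, numprocesses=2)
    return ens.get_CHAMP_partitions()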
def main():
    return

if __name__ == "__main__":
    sys.exit(main())