Source code for vfb_connect.neo.query_wrapper

import json
import os
import re
import shutil
import warnings
from inspect import getfullargspec
from string import Template
from xml.sax import saxutils
import pandas as pd
import pkg_resources
import requests
from functools import wraps


# from jsonpath_rw import parse as parse_jpath
from vfb_connect.neo.neo4j_tools import chunks, Neo4jConnect, dict_cursor, escape_string



[docs]def batch_query(func): # Assumes first arg is to be batches and that return value is list. Only works on class methods. # There has to be a better way to work with the values of args and kwargs than this!!!! @wraps(func) # Boilerplate required for Sphinx autodoc def wrapper_batch(*args, **kwargs): arg_names = getfullargspec(func).args if len(args) > 1: arg1v = args[1] arg1typ = 'a' else: arg1v = kwargs[arg_names[1]] arg1typ = 'k' cs = chunks(arg1v, 2500) out = [] for c in cs: if arg1typ == 'a': arglist = list(args) arglist[1] = c subargs = tuple(arglist) out.extend(func(*subargs, **kwargs)) elif arg1typ == 'k': kwargdict = dict(kwargs) kwargdict[arg_names[1]] = c out.extend(func(*args, **kwargdict)) return out return wrapper_batch
# def pop_from_jpath(jpath, json, join=True): # expr = parse_jpath(jpath) # if join: # return '|'.join([match.value for match in expr.find(json)]) # else: # return [match.value for match in expr.find(json)] def _populate_minimal_summary_tab(TermInfo): d = dict() d['label'] = TermInfo['term']['core']['label'] d['symbol'] = TermInfo['term']['core']['symbol'] d['id'] = TermInfo['term']['core']['short_form'] d['tags'] = '|'.join(TermInfo['term']['core']['types']) return d def _populate_anatomical_entity_summary(TermInfo): d = _populate_minimal_summary_tab(TermInfo) # d['parents_symbol'] = pop_from_jpath("$.parents[*].symbol", TermInfo) # d['parents_label'] = pop_from_jpath("$.parents[*].label", TermInfo) d['parents_label'] = '|'.join([str(p['label']) for p in TermInfo['parents']]) # d['parents_id'] = pop_from_jpath("$.parents[*].short_form", TermInfo) d['parents_id'] = '|'.join([str(p['short_form']) for p in TermInfo['parents']]) return d def _populate_instance_summary_tab(TermInfo): d = _populate_anatomical_entity_summary(TermInfo) # site_expr = "$.xrefs.[*].site.short_form" # acc_expr = "$.xrefs.[*].accession" # is_data_source_expr = "$.xrefs.[*].is_data_source" # sites = pop_from_jpath(site_expr, TermInfo, join=False) sites = [str(p['site']['short_form']) for p in TermInfo['xrefs']] accessions = [str(p['accession']) for p in TermInfo['xrefs'] if 'accession' in p.keys()] is_data_source = [p['is_data_source'] for p in TermInfo['xrefs'] if 'is_data_source' in p.keys()] i = 0 data_sources = [] ds_accessions = [] for ids in is_data_source: if ids: data_sources.append(sites[i]) ds_accessions.append(accessions[i]) i += 1 d['data_source'] = '|'.join(data_sources) d['accession'] = '|'.join(ds_accessions) d['templates'] = '|'.join([str(x['image']['template_anatomy']['label']) for x in TermInfo['channel_image']]) d['dataset'] = '|'.join([str(x['dataset']['core']['short_form']) for x in TermInfo['dataset_license']]) d['license'] = '|'.join([str(x['license']['link']) for x in TermInfo['dataset_license'] if 'link' in x['license'].keys()]) return d def _populate_dataset_summary_tab(TermInfo): d = _populate_minimal_summary_tab(TermInfo) d['description'] = TermInfo['term']['description'] d['miniref'] = '|'.join([str(x['core']['label']) for x in TermInfo['pubs']]) d['FlyBase'] = '|'.join([str(x['FlyBase']) for x in TermInfo['pubs'] if 'FlyBase' in x]) d['PMID'] = '|'.join([str(x['PubMed']) for x in TermInfo['pubs'] if 'PubMed' in x]) d['DOI'] = '|'.join([str(x['DOI']) for x in TermInfo['pubs'] if 'PubMed' in x]) return d def _populate_manifest(filename, instance): d = _populate_instance_summary_tab(instance) d['filename'] = filename return d # def filter_term_content(func): # """Decorator function that wraps queries that return lists of JSON objects and which have # an arg named filters. The filters arg takes a filter object, which specifics JPATH queries which are applied # as a filter to each returned JSON object so that the final result only contains the # specified paths and their values""" # # type_2_summary = { # 'individual': '_populate_instance_summary_tab', # 'class': '_populate_anatomical_entity_summary', # } # def filter_wrapper(*args, **kwargs): # func_ret = func(*args, **kwargs) # # arg_names = getfullargspec(func).args # Potentially useful for capturing defaults # # but some problem getting working with # # nested decorators # out = [] # if ('filters' in kwargs.keys()): # for f in kwargs['filters']: # expr = parse_jpath(f) # for fr in func_ret: # matching_fields = [match for match in expr.find(fr)] # for mf in matching_fields: # out.append(mf.value) # return out # else: # return func_ret # # return filter_wrapper
[docs]def gen_simple_report(terms): nc = Neo4jConnect("https://pdb.virtualflybrain.org", "neo4j", "neo4j") query = """MATCH (n:Class) WHERE n.iri in %s WITH n OPTIONAL MATCH (n)-[r]->(p:pub) WHERE r.typ = 'syn' WITH n, COLLECT({ synonym: r.synonym, PMID: 'PMID:' + p.PMID, miniref: p.label}) AS syns OPTIONAL MATCH (n)-[r]-(p:pub) WHERE r.typ = 'def' with n, syns, collect({ PMID: 'PMID:' + p.PMID, miniref: p.label}) as pubs OPTIONAL MATCH (n)-[:SUBCLASSOF]->(super:Class) RETURN n.short_form as short_form, n.label as label, n.description as description, syns, pubs, super.label, super.short_form """ % str(terms) q = nc.commit_list([query]) # add check return dict_cursor(q)
[docs]class QueryWrapper(Neo4jConnect): def __init__(self, *args, **kwargs): super(QueryWrapper, self).__init__(*args, **kwargs) query_json = pkg_resources.resource_filename( "vfb_connect", "resources/VFB_TermInfo_queries.json") with open(query_json, 'r') as f: self.queries = json.loads(saxutils.unescape(f.read())) # def get_sites(self): # return def _query(self, q): qr = self.commit_list([q]) if not qr: raise Exception('Query failed.') else: r = dict_cursor(qr) if not r: warnings.warn('No results returned') return [] else: return r
[docs] def get_images(self, short_forms: iter, template, image_folder, image_type='swc', stomp=False): """Given an iterable of `short_forms` for instances, find all images of specified `image_type` registered to `template`. Save these to `image_folder` along with a manifest.tsv. Return manifest as pandas DataFrame. :param short_forms: iterable (e.g. list) of VFB IDs of Individuals with images :param template: template name :param image_folder: folder to save image files & manifest to. :param image_type: image type (file extension) :param stomp: Overwrite image_folder if already exists. :return: Manifest as Pandas DataFrame """ # TODO - make image type into array short_forms = list(short_forms) manifest = [] if stomp and os.path.isdir(image_folder): if shutil.rmtree.avoids_symlink_attacks: shutil.rmtree(image_folder) else: warnings.warn("Not deleting %s, stomp option not supported on this system for security reasons," "please delete manually." % image_folder) os.mkdir(image_folder) inds = self.get_anatomical_individual_TermInfo(short_forms=short_forms) for i in inds: if not ('has_image' in i['term']['core']['types']): continue label = i['term']['core']['label'] image_matches = [x['image'] for x in i['channel_image']] if not image_matches: continue for imv in image_matches: if imv['template_anatomy']['label'] == template: r = requests.get(imv['image_folder'] + '/volume.' + image_type) ### Slightly dodgy warning - could mask network errors if not r.ok: warnings.warn("No '%s' file found for '%s'." % (image_type, label)) continue filename = re.sub('\W', '_', label) + '.' + image_type with open(image_folder + '/' + filename, 'w') as image_file: image_file.write(r.text) manifest.append(_populate_manifest(instance=i, filename=filename)) manifest_df = pd.DataFrame.from_records(manifest) manifest_df.to_csv(image_folder + '/manifest.tsv', sep='\t') return manifest_df
[docs] def get_dbs(self): """Get a list of available database IDs :return: list of VFB IDs.""" query = "MATCH (i:Individual) " \ "WHERE 'Site' in labels(i) OR 'API' in labels(i)" \ "return i.short_form" return [d['i.short_form'] for d in self._query(query)]
[docs] def get_datasets(self, summary=False): """Generate JSON report of all datsets. :param summary: Optional. Returns summary reports if `True`. Default `False` :return: Returns a list of terms as nested python data structures following VFB_json or a summary_report_json :return type: list of VFB_json or summary_report_json """ dc = self._query("MATCH (ds:DataSet) " "RETURN ds.short_form AS sf") short_forms = [d['sf'] for d in dc] return self.get_DataSet_TermInfo(short_forms, summary=summary)
[docs] def get_templates(self, summary=False): """Generate JSON report of all available templates. :param summary: Optional. Returns summary reports if `True`. Default `False` :return: Returns a list of terms as nested python data structures following VFB_json or a summary_report_json :return type: list of VFB_json or summary_report_json """ dc = self._query("MATCH (i:Individual:Template:Anatomy) " "RETURN i.short_form as sf") short_forms = [d['sf'] for d in dc] return self.get_anatomical_individual_TermInfo(short_forms, summary=summary)
[docs] def vfb_id_2_xrefs(self, vfb_id: iter, db='', id_type='', reverse_return=False): """Map a list of short_form IDs in VFB to external DB IDs :param vfb_id: An iterable (e.g. a list) of VFB short_form IDs. :param db: optional specify the VFB id (short_form) of an external DB to map to. (use get_dbs to find options) :param id_type: optionally specify an external id_type :param reverse_return: Boolean: Optional (see return) :return: if `reverse_return` is False: dict { VFB_id : [{ db: <db> : acc : <acc> } Return if `reverse_return` is `True`: dict { acc : [{ db: <db> : vfb_id : <VFB_id> } """ vfb_id = list(set(vfb_id)) match = "MATCH (s:Individual)<-[r:database_cross_reference]-(i:Entity) " \ "WHERE i.short_form in %s" % str(vfb_id) clause1 = '' if db: clause1 = "AND s.short_form = '%s'" % db clause2 = '' if id_type: clause2 = "AND r.id_type = '%s'" % id_type ret = "RETURN i.short_form as key, " \ "collect({ db: s.short_form, acc: r.accession[0]}) as mapping" if reverse_return: ret = "RETURN r.accession[0] as key, " \ "collect({ db: s.short_form, vfb_id: i.short_form }) as mapping" q = ' '.join([match, clause1, clause2, ret]) dc = self._query(q) mapping = {d['key']: d['mapping'] for d in dc} unmapped = set(vfb_id)-set(mapping.keys()) if unmapped: warnings.warn("The following IDs do not match DB &/or id_type constraints: %s" % str(unmapped)) return mapping
[docs] def vfb_id_2_neuprint_bodyID(self, vfb_id, db=''): mapping = self.vfb_id_2_xrefs(vfb_id, db=db, reverse_return=True) return [int(k) for k, v in mapping.items()]
[docs] def xref_2_vfb_id(self, acc=None, db='', id_type='', reverse_return=False): """Map a list external DB IDs to VFB IDs :param acc: An iterable (e.g. a list) of external IDs (e.g. neuprint bodyIDs). :param db: optional specify the VFB id (short_form) of an external DB to map to. (use get_dbs to find options) :param id_type: optionally specify an external id_type :param reverse_return: Boolean: Optional (see return) :return: if `reverse_return` is False: dict { acc : [{ db: <db> : vfb_id : <VFB_id> } Return if `reverse_return` is `True`: dict { VFB_id : [{ db: <db> : acc : <acc> } """ match = "MATCH (s:Individual)<-[r:database_cross_reference]-(i:Entity) WHERE" conditions = [] if not (acc is None): acc = [str(a) for a in set(acc)] conditions.append("r.accession[0] in %s" % str(acc)) if db: conditions.append("s.short_form = '%s'" % db) if id_type: conditions.append("r.id_type = '%s'" % id_type) condition_clauses = ' AND '.join(conditions) ret = "RETURN r.accession[0] as key, " \ "collect({ db: s.short_form, vfb_id: i.short_form }) as mapping" if reverse_return: ret = "RETURN i.short_form as key, " \ "collect({ db: s.short_form, acc: r.accession[0]}) as mapping" q = ' '.join([match, condition_clauses, ret]) # print(q) dc = self._query(q) return {d['key']: d['mapping'] for d in dc}
[docs] @batch_query def get_terms_by_xref(self, acc, db='', id_type=''): """ Generate JSON report for terms specified by a list of IDs :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) :return: list of term metadata as VFB_json """ return self.get_TermInfo(list(self.xref_2_vfb_id(acc, db=db, id_type=id_type, reverse_return=True).keys()))
[docs] def get_images_by_filename(self, filenames, dataset=None): """Takes a list of filenames as input and returns a list of image terminfo. Optionally restrict by dataset (improves speed)""" m = "MATCH (ds:DataSet)<-[has_source]-(ai:Individual)<-[:depicts]" \ "-(channel:Individual)-[irw:in_register_with]-(tc:Template)" w = "WHERE irw.filename[0]in %s" % str([escape_string(f) for f in filenames]) if dataset: w += "AND ds.short_form = '%s'" % dataset r = "RETURN ai.short_form" dc = self._query(' '.join([m, w, r])) return self.get_anatomical_individual_TermInfo([d['ai.short_form'] for d in dc])
[docs] @batch_query def get_TermInfo(self, short_forms: iter): """Generate JSON report for terms specified by a list of IDs :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) :param db: optionally specify the VFB id (short_form) of external DB. :param id_type: optionally specify an external id_type :return: list of term metadata as VFB_json """ pre_query = "MATCH (e:Entity) " \ "WHERE e.short_form in %s " \ "RETURN e.short_form as short_form, labels(e) as labs " % str(short_forms) r = self._query(pre_query) out = [] for e in r: if 'Class' in e['labs']: out.extend(self.get_type_TermInfo([e['short_form']])) elif 'Individual' in e['labs'] and 'Anatomy' in e['labs']: out.extend(self.get_anatomical_individual_TermInfo([e['short_form']])) elif 'DataSet' in e['labs']: out.extend(self.get_DataSet_TermInfo([e['short_form']])) return out
@batch_query def _get_TermInfo(self, short_forms: iter, typ, show_query=False, summary=False): short_forms = list(short_forms) sfl = "', '".join(short_forms) qs = Template(self.queries[typ]).substitute(ID=sfl) if show_query: print(qs) if summary: return self._termInfo_2_summary(self._query(qs), typ=typ) else: return self._query(qs) def _get_anatomical_individual_TermInfo_by_type(self, classification, summary=False): typ = 'Get JSON for Individual:Anatomy_by_type' qs = Template(self.queries[typ]).substitute(ID=classification) if summary: return self._termInfo_2_summary(self._query(qs), typ='Get JSON for Individual') else: return self._query(qs) def _termInfo_2_summary(self, TermInfo, typ): # type_2_summary = { # 'Get JSON for Individual': '_populate_instance_summary_tab', # 'Get JSON for Class': '_populate_anatomical_entity_summary', # } dc = [] for r in TermInfo: if typ == 'Get JSON for Individual': dc.append(_populate_instance_summary_tab(r)) elif typ == 'Get JSON for Class': dc.append(_populate_anatomical_entity_summary(r)) elif typ == 'Get JSON for DataSet': dc.append(_populate_dataset_summary_tab(r)) return dc
[docs] def get_anatomical_individual_TermInfo(self, short_forms, summary=False): """ Generate JSON reports for anatomical individuals from a list of VFB IDs (short_forms) :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) of anatomical individuals :param summary: Optional. Returns summary reports if `True`. Default `False` :rtype: list of VFB_json or summary_report_json """ return self._get_TermInfo(short_forms, typ='Get JSON for Individual', summary=summary)
[docs] def get_type_TermInfo(self, short_forms, summary=False): """ Generate JSON reports for types from a list of VFB IDs (short_forms) of classes/types. :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) of types :param summary: Optional. Returns summary reports if `True`. Default `False` :rtype: list of VFB_json or summary_report_json """ return self._get_TermInfo(short_forms, typ='Get JSON for Class', summary=summary)
[docs] def get_DataSet_TermInfo(self, short_forms, summary=False): """ Generate JSON reports for types from a list of VFB IDs (short_forms) of DataSets. :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) of types :param summary: Optional. Returns summary reports if `True`. Default `False` :rtype: list of VFB_json or summary_report_json """ return self._get_TermInfo(short_forms, typ='Get JSON for DataSet', summary=summary)
[docs] def get_template_TermInfo(self, short_forms): """ Generate JSON reports for types from a list of VFB IDs (short_forms) of templates. :param short_forms: An iterable (e.g. a list) of VFB IDs (short_forms) of types :param summary: Optional. Returns summary reports if `True`. Default `False` :rtype: list of VFB_json or summary_report_json """ return self._get_TermInfo(short_forms, typ='Get JSON for Template')