1
2
3
4
5
6 """This module handles seqdatabase.INI file.
7
8 Classes:
9 SeqDBRegistry Holds databases from seqdatabase.INI.
10
11 """
12 import os
13
14 from Bio.config import DBRegistry
15
16
17
18
19
20
21
22
23
24
25
26
27
29 """This object implements a dictionary-like interface to sequence
30 databases. To get a list of the databases available, do:
31 Bio.seqdb.keys()
32
33 Then, you can access the database using:
34 Bio.seqdb[DATABASE_NAME][SEQUENCE_ID]
35
36 """
39
51
57
58
59 seqdb = SeqDBRegistry("seqdb")
60
62 import warnings
63 warnings.warn(message)
64
66
67 import _stanzaformat
68
69 try:
70 stanzas = _stanzaformat.load(_openfu(ini_file))
71 except SyntaxError, x:
72 _warn("Can't load seqdb. Syntax error in %s: %s" % (ini_file, str(x)))
73 return None
74
75
76 if stanzas.version > "1.00":
77 _warn("I can't handle stanza files with version %s" % stanzas.version)
78 return None
79
80 protocol2handler = {
81 'flat' : _make_flat_db,
82 'biofetch' : _make_biofetch_db,
83 'biosql' : _make_biosql_db,
84 }
85
86 inidata = []
87 for stanza in stanzas.stanzas:
88 section_name, tagvalue_dict = stanza.name, stanza.tag_value_dict
89 section_key = section_name.lower()
90 inidata.append((section_name, section_key, tagvalue_dict))
91
92
93
94 seen = {}
95 i = 0
96 while i < len(inidata):
97 section_name, section_key, tagvalue_dict = inidata[i]
98
99 if "protocol" not in tagvalue_dict:
100 _warn("%s stanza missing 'protocol'. Skipping" % section_name)
101 del inidata[i]
102
103 elif "location" not in tagvalue_dict:
104 _warn("%s stanza missing 'location'. Skipping" % section_name)
105 del inidata[i]
106
107 elif tagvalue_dict['protocol'] not in protocol2handler:
108 _warn("%s protocol not handled. Skipping" %
109 tagvalue_dict['protocol'])
110 del inidata[i]
111
112 elif section_key in seen:
113 _warn("%s stanza already exists. Skipping" %
114 section_key)
115 del inidata[i]
116 else:
117 seen[section_key] = 1
118 i += 1
119
120
121
122 registry_objects = []
123 serial_groups = []
124 for section_name, section_key, tagvalue_dict in inidata:
125 handler = protocol2handler.get(tagvalue_dict['protocol'])
126 obj = handler(section_name, tagvalue_dict)
127 registry_objects.append(obj)
128
129 if tagvalue_dict.has_key("fallback_group"):
130 group_name = tagvalue_dict['fallback_group']
131 serial_groups.append((group_name, obj))
132
133
134 groups = {}
135 for group_name, obj in serial_groups:
136 if not groups.has_key(group_name):
137 groups[group_name] = DBRegistry.DBGroup(
138 group_name, behavior="serial")
139 groups[group_name].add(obj)
140 registry_objects.extend(groups.values())
141 return registry_objects
142
def _make_biofetch_db(name, tagvalue_dict):
    """Register a BioFetch CGI database defined in the registry.

    name          -- passed to DBRegistry.CGIDB as the database name.
    tagvalue_dict -- tag/value pairs of the stanza.  'location' is required
                     and is used as the CGI address; 'dbname' is optional
                     and defaults to "embl".

    Returns the DBRegistry.CGIDB object built from these settings.
    Raises KeyError if 'location' is missing.
    """
    # NOTE(review): the def line was missing from the listing; the name comes
    # from the protocol2handler table and the parameters from the
    # handler(section_name, tagvalue_dict) call -- confirm.
    from Martel import Str

    location = tagvalue_dict['location']
    dbname = tagvalue_dict.get("dbname", "embl")

    params = {
        'name': name,
        'cgi': location,
        # Query parameters sent with every request: raw output style and
        # the database to query.
        'params': [('style', 'raw'),
                   ('db', dbname),
                   ],
        # 'id' is the name of the query parameter that carries the
        # sequence identifier -- presumably; verify against CGIDB.
        'key': 'id',
        'doc': "Retrieve sequences from the %s database." % dbname,
        # (Martel pattern, message) pairs handed to CGIDB.  Presumably a
        # response matching a pattern is reported as a failure with the
        # paired message -- confirm against DBRegistry.CGIDB.
        'failure_cases': [
            (Str("ERROR 1"), "Unknown database."),
            (Str("ERROR 2"), "Unknown style."),
            (Str("ERROR 3"), "Format not known for database."),
            (Str("ERROR 4"), "ID not found in database."),
            (Str("ERROR 5"), "Too many IDs."),
        ],
    }

    return DBRegistry.CGIDB(**params)
166
167
168
169
170
171
172
173
def _make_biosql_db(name, tagvalue_dict):
    """Register a BioSQL database defined in the registry.

    name          -- passed to DBRegistry.BioSQLDB as the database name.
    tagvalue_dict -- tag/value pairs of the stanza.  Required tags are
                     'location' (in the form host:port), 'biodbname' and
                     'dbname'.  Optional tags are 'driver' (default
                     'mysql', lower-cased), 'user' (default 'root') and
                     'passwd' (default '').

    Returns the DBRegistry.BioSQLDB object built from these settings.
    Raises KeyError if a required tag is missing, and ValueError if
    'location' cannot be split into exactly one host and one port.
    """
    # NOTE(review): the def line was missing from the listing; the name comes
    # from the protocol2handler table and the parameters from the
    # handler(section_name, tagvalue_dict) call -- confirm.
    import re

    location = tagvalue_dict['location']

    # Host names may contain dots and hyphens (e.g. db.example.org or
    # 127.0.0.1), so they are accepted here alongside letters, digits and
    # underscores.
    if not re.match(r"[a-zA-Z0-9_.\-]+:\d+$", location):
        # Despite the wording of the message, the stanza is not skipped:
        # a location that still splits into two parts is used as given.
        _warn("Invalid location string: %s. I want <host:port>. Skipping" %
              location)

    parts = location.split(":")
    if len(parts) != 2:
        # Previously this surfaced as an opaque tuple-unpacking ValueError;
        # keep the exception type but say what is wrong.
        raise ValueError(
            "Invalid location string: %r. Expected <host:port>." % location)
    host, port = parts

    params = {}
    params['name'] = name
    params['db_host'] = host
    params['db_port'] = port  # kept as a string, exactly as read

    params['sql_db'] = tagvalue_dict['biodbname']
    params['db_type'] = tagvalue_dict.get('driver', 'mysql').lower()
    params['db_user'] = tagvalue_dict.get('user', 'root')
    params['db_passwd'] = tagvalue_dict.get('passwd', '')
    params['namespace_db'] = tagvalue_dict['dbname']

    params["doc"] = "Retrieve %s sequences from BioSQL hosted at %s" % (
        tagvalue_dict['dbname'], host)

    return DBRegistry.BioSQLDB(**params)
198
def _make_flat_db(name, tagvalue_dict):
    """Register a Berkeley or Flat indexed file defined in the registry.

    name          -- passed to DBRegistry.IndexedFileDB as the database name.
    tagvalue_dict -- tag/value pairs of the stanza.  Only 'dbname' is read
                     here; the stanza's 'location' is not used by this
                     function.

    Returns the DBRegistry.IndexedFileDB object built from these settings.
    Raises KeyError if 'dbname' is missing.
    """
    # NOTE(review): the def line was missing from the listing; the name comes
    # from the protocol2handler table and the parameters from the
    # handler(section_name, tagvalue_dict) call -- confirm.
    dbname = tagvalue_dict["dbname"]
    return DBRegistry.IndexedFileDB(
        name=name,
        dbname=dbname,
        doc="Retrieve %s sequences from a local database." % dbname)
207
209 """Guess whether this is a file or url and open it."""
210 if file_or_url[:4].lower() == 'http':
211 import urllib
212 return urllib.urlopen(file_or_url)
213
214 return open(file_or_url)
215
217 """_list_ini_paths() -> list of URL's or paths to search for files.
218
219 The default places to look for registry files are:
220 - ${HOME}/.bioinformatics
221 - /etc/bioinformatics
222 - http://www.open-bio.org/registry
223
224 The OBDA_SEARCH_PATH environment variable, if specified, overrides
225 the default. This should be a "+" separated list of paths or
226 URL's.
227
228 """
229 if os.environ.has_key("OBDA_SEARCH_PATH"):
230 paths = os.environ["OBDA_SEARCH_PATH"].split("+")
231 else:
232 paths = [
233 os.path.join(os.sep, "etc", "bioinformatics"),
234 "http://www.open-bio.org/registry",
235 ]
236
237 if os.environ.has_key("HOME"):
238 p = os.path.join(os.environ["HOME"], ".bioinformatics")
239 paths.insert(0, p)
240 return paths
241
def _list_ini_files(filename, also_search=None):
    """_list_ini_files(filename) -> list of files to search (in order)

    filename    -- name of the file to look for in every search location.
    also_search -- optional extra paths or URLs, searched after the
                   locations returned by _list_ini_paths().

    Returns the full names (path or URL joined with filename) that could
    be opened, in search order.  Locations that raise IOError when opened
    are silently left out.
    """
    # NOTE(review): the def line was missing from the listing; the default
    # of 'also_search' is reconstructed (None is treated as "no extra
    # locations") -- confirm against the original signature.
    searchpath = _list_ini_paths()
    if also_search:
        searchpath = searchpath + list(also_search)

    files = []
    for path in searchpath:
        fullname = os.path.join(path, filename)
        # The only portable way to tell whether a file or URL exists is to
        # try to open it.
        try:
            handle = _openfu(fullname)
        except IOError:
            continue
        # The handle was opened only as an existence check; close it so
        # file descriptors and connections are not leaked.
        handle.close()
        files.append(fullname)
    return files
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271