1
2
3
4
5
6
7
8 import os
9 import Ice
10 import time
11 import numpy
12 import signal
13 import logging
14 import threading
15 import traceback
16 import subprocess
17 import exceptions
18 import portalocker
19
20 from path import path
21
22
23 import omero
24 import omero.clients
25
26
27 from omero.columns import *
28 from omero.rtypes import *
29 from omero.util.decorators import remoted, locked, perf
30 from omero_ext.functional import wraps
31
32
33 sys = __import__("sys")
34 tables = __import__("tables")
def slen(rv):
    """
    Return len(rv), or None when rv is None.

    Used when logging the size of arguments that may be absent.
    """
    if rv is not None:
        return len(rv)
    return None
44
def stamped(func, update=False):
    """
    Decorator which takes the first argument after "self" and compares
    that to the last modification time (self._stamp). If the stamp is
    older, then the method call will throw an
    omero.OptimisticLockException. Otherwise, execution will complete
    normally. If update is True, then self._stamp is refreshed once the
    wrapped call finishes -- note this happens in a ``finally`` clause,
    i.e. even when the wrapped call raises.

    Note: stamped implies locked
    """
    def check_and_update_stamp(*args, **kwargs):
        self = args[0]
        stamp = args[1]
        if stamp < self._stamp:
            raise omero.OptimisticLockException(None, None, "Resource modified by another thread")

        try:
            return func(*args, **kwargs)
        finally:
            if update:
                self._stamp = time.time()
    # Copy func's metadata onto the wrapper. Previously the wrapped result
    # was bound to a misspelled name and thrown away, so the undecorated
    # check_and_update_stamp was what got locked and returned.
    check_and_update_stamp = wraps(func)(check_and_update_stamp)
    return locked(check_and_update_stamp)
70
73 """
74 Since two calls to tables.openFile() return non-equal files
75 with equal fileno's, portalocker cannot be used to prevent
76 the creation of two HdfStorage instances from the same
77 Python process.
78 """
79
81 self._lock = threading.RLock()
82 self.__filenos = {}
83 self.__paths = {}
84
85 @locked
def addOrThrow(self, hdfpath, hdffile, hdfstorage, action):
    """
    Register hdfstorage under the file descriptor of hdffile and under
    hdfpath, then invoke action().

    Raises omero.LockTimeout if that descriptor is already registered; in
    that case nothing is recorded and action is not called. The entries
    stay registered if action() itself raises.
    """
    fd = hdffile.fileno()
    if fd not in self.__filenos:
        self.__filenos[fd] = hdfstorage
        self.__paths[hdfpath] = hdfstorage
        action()
        return
    raise omero.LockTimeout(None, None, "File already opened by process: %s" % hdfpath, 0)
94
95 @locked
def getOrCreate(self, hdfpath):
    """
    Return the HdfStorage already registered for hdfpath, or construct a
    new one (whose constructor registers itself via HDFLIST.addOrThrow).
    """
    registered = self.__paths
    if hdfpath in registered:
        return registered[hdfpath]
    return HdfStorage(hdfpath)
101
102 @locked
def remove(self, hdfpath, hdffile):
    """
    Drop the registrations made by addOrThrow for the descriptor of
    hdffile and for hdfpath. Raises KeyError if either is missing.
    """
    fd = hdffile.fileno()
    del self.__filenos[fd]
    del self.__paths[hdfpath]
106
107
# Module-wide registry of open HdfStorage instances. HdfStorage registers
# and unregisters itself here (addOrThrow / remove), and TablesI.getTable
# obtains storages through it (getOrCreate).
HDFLIST = HdfList()
111 """
112 Provides HDF-storage for measurement results. At most a single
113 instance will be available for any given physical HDF5 file.
114 """
115
116
118
def __init__(self, file_path):
    """
    file_path should be the path to a file in a valid directory where
    this HDF instance can be stored (Not None or Empty). Once this
    method is finished, self.__hdf_file is guaranteed to be a PyTables HDF
    file, but not necessarily initialized.

    Raises omero.ValidationException for an empty path or a file that
    cannot be opened, and omero.LockTimeout if the file is already held
    (either already registered in HDFLIST or locked via portalocker).
    """

    if file_path is None or str(file_path) == "":
        raise omero.ValidationException(None, None, "Invalid file_path")

    self.logger = logging.getLogger("omero.tables.HdfStorage")
    self.__hdf_path = path(file_path)
    self.__hdf_file = self.__openfile("a")
    self.__tables = []

    # Created before registration: the failure path below calls
    # self.cleanup(), which is a @locked method.
    self._lock = threading.RLock()
    self._stamp = time.time()

    self.__mea = None
    self.__ome = None

    # Register in the in-process registry, then take a non-blocking
    # exclusive portalocker lock on the file.
    # NOTE(review): if addOrThrow raises omero.LockTimeout (descriptor
    # already registered), the handle opened above is not closed here --
    # confirm whether closing it is safe given the shared filenos.
    try:
        HDFLIST.addOrThrow(self.__hdf_path, self.__hdf_file, self,
            lambda: portalocker.lock(self.__hdf_file, portalocker.LOCK_NB|portalocker.LOCK_EX))
    except portalocker.LockException:
        self.cleanup()
        raise omero.LockTimeout(None, None, "Cannot acquire exclusive lock on: %s" % self.__hdf_path, 0)

    # A previously initialized file already contains the /OME group with
    # Measurements, ColumnTypes and ColumnDescriptions.
    try:
        self.__ome = self.__hdf_file.root.OME
        self.__mea = self.__ome.Measurements
        self.__types = self.__ome.ColumnTypes[:]
        self.__descriptions = self.__ome.ColumnDescriptions[:]
        self.__initialized = True
    except tables.NoSuchNodeError:
        self.__initialized = False
159
160
161
162
163
def __openfile(self, mode):
    """
    Open self.__hdf_path with PyTables using the given mode.

    Raises omero.ValidationException (after logging it) if PyTables
    raises IOError.
    """
    try:
        # NOTE(review): the "Storege" typo is part of the title written into
        # new files; left unchanged so existing behavior is preserved.
        return tables.openFile(self.__hdf_path, mode=mode, title="OMERO HDF Measurement Storege", rootUEP="/")
    except IOError:
        msg = "HDFStorage initialized with bad path: %s" % self.__hdf_path
        # exc_info keeps the underlying IOError in the log; it used to be
        # bound to an unused name and discarded.
        self.logger.error(msg, exc_info=True)
        raise omero.ValidationException(None, None, msg)
171
def __initcheck(self):
    """Raise omero.ApiUsageException unless this storage is initialized."""
    if self.__initialized:
        return
    raise omero.ApiUsageException(None, None, "Not yet initialized")
175
176
177
178
179
180 @locked
182 """
183
184 """
185
186 if self.__initialized:
187 raise omero.ValidationException(None, None, "Already initialized.")
188
189 self.__definition = columns2definition(cols)
190 self.__ome = self.__hdf_file.createGroup("/", "OME")
191 self.__mea = self.__hdf_file.createTable(self.__ome, "Measurements", self.__definition)
192
193 self.__types = [ x.ice_staticId() for x in cols ]
194 self.__descriptions = [ (x.description != None) and x.description or "" for x in cols ]
195 self.__hdf_file.createArray(self.__ome, "ColumnTypes", self.__types)
196 self.__hdf_file.createArray(self.__ome, "ColumnDescriptions", self.__descriptions)
197
198 self.__mea.attrs.version = "v1"
199 self.__mea.attrs.initialized = time.time()
200 if metadata:
201 for k, v in metadata.items():
202 self.__mea.attrs[k] = v
203
204
205 self.__mea.flush()
206 self.__hdf_file.flush()
207 self.__initialized = True
208
209 @locked
def incr(self, table):
    """
    Register table as a user of this storage.

    Returns the number of registered tables after the addition.
    Raises omero.ApiUsageException if table is already registered.
    """
    sz = len(self.__tables)
    # Lazy %-args, matching decr().
    self.logger.info("Size: %s - Attaching %s to %s", sz, table, self.__hdf_path)
    if table in self.__tables:
        self.logger.warning("Already added")
        # Previously passed the undefined name `Non`, so this branch raised
        # NameError instead of the intended API exception.
        raise omero.ApiUsageException(None, None, "Already added")
    self.__tables.append(table)
    return sz + 1
218
219 @locked
def decr(self, table):
    """
    Unregister table. When it was the last registered table, release the
    underlying file via self.cleanup().

    Returns the number of tables still registered.
    Raises omero.ApiUsageException if table was never registered.
    """
    before = len(self.__tables)
    self.logger.info("Size: %s - Detaching %s from %s", before, table, self.__hdf_path)
    if table not in self.__tables:
        self.logger.warn("Unknown table")
        raise omero.ApiUsageException(None, None, "Unknown table")
    self.__tables.remove(table)
    was_last = before <= 1
    if was_last:
        self.cleanup()
    return before - 1
230
231 @locked
233 return self._stamp <= stamp
234
235 @locked
239
240 @locked
def cols(self, size, current):
    """
    Build one fresh column object per stored column.

    Each column is created through the Ice object factory registered for
    its stored type id, named after the matching Measurements column, and
    sized with size. current supplies the communicator.

    Raises omero.ValidationException (carrying the traceback) if a column
    cannot be created.
    """
    self.__initcheck()
    ic = current.adapter.getCommunicator()
    types = self.__types
    names = self.__mea.colnames
    cols = []
    for i, t in enumerate(types):
        n = names[i]
        try:
            col = ic.findObjectFactory(t).create(t)
            col.name = n
            col.size(size)
            cols.append(col)
        except Exception:
            # Narrowed from a bare except so SystemExit/KeyboardInterrupt are
            # no longer converted into a ValidationException.
            msg = traceback.format_exc()
            raise omero.ValidationException(None, msg, "BAD COLUMN TYPE: %s for %s" % (t, n))
    return cols
259
260 @locked
277
278 @locked
280
281 arrays = []
282 names = []
283 for col in cols:
284 names.append(col.name)
285 arrays.append(col.array())
286 data = numpy.rec.fromarrays(arrays, names=names)
287 self.__mea.append(data)
288 self.__mea.flush()
289
290
291
292
293
294 @stamped
295 - def getWhereList(self, stamp, condition, variables, unused, start, stop, step):
298
def _data(self, cols, rowNumbers):
    """
    Wrap cols and rowNumbers in an omero.grid.Data whose
    lastModification is this storage's stamp in whole milliseconds.
    """
    millis = long(self._stamp * 1000)
    result = omero.grid.Data()
    result.columns = cols
    result.rowNumbers = rowNumbers
    result.lastModification = millis
    return result
305
306 @stamped
314
315 @stamped
def slice(self, stamp, colNumbers, rowNumbers, current):
    """
    Read the requested rows and return them as omero.grid.Data.

    All rows are read when rowNumbers is None or empty; all columns are
    returned when colNumbers is None or empty. stamp is consumed by the
    @stamped decorator and not used in the body.
    """
    self.__initcheck()
    all_rows = rowNumbers is None or len(rowNumbers) == 0
    if all_rows:
        rows = self.__mea.read()
    else:
        rows = self.__mea.readCoordinates(rowNumbers)
    all_cols = colNumbers is None or len(colNumbers) == 0
    picked = []
    for idx, col in enumerate(self.cols(None, current)):
        if all_cols or idx in colNumbers:
            col.values = rows[col.name].tolist()
            picked.append(col)
    return self._data(picked, rowNumbers)
330
331
332
333
334
337
338 @locked
def cleanup(self):
    """
    Flush pending measurements, unregister from HDFLIST and close the
    HDF5 file.

    The handles are cleared and the file is closed even if flushing or
    unregistering raises; the exception still propagates. Calling this
    again afterwards only logs, since every handle is already None.
    """
    self.logger.info("Cleaning storage: %s", self.__hdf_path)
    try:
        # Compare against None: a PyTables Table is falsy when it has zero
        # rows, so the old truthiness test skipped the flush and kept the
        # reference for empty tables.
        if self.__mea is not None:
            self.__mea.flush()
    finally:
        self.__mea = None
        self.__ome = None
        hdffile = self.__hdf_file
        if hdffile is not None:
            self.__hdf_file = None
            # Close even if unregistering fails so the handle is not leaked.
            try:
                HDFLIST.remove(self.__hdf_path, hdffile)
            finally:
                hdffile.close()
351
352
353
354
355 -class TableI(omero.grid.Table, omero.util.SimpleServant):
356 """
357 Spreadsheet implementation based on pytables.
358 """
359
def __init__(self, ctx, file_obj, storage, uuid="unknown"):
    """
    Create a table servant backed by storage and register it with that
    storage through storage.incr().
    """
    self.storage = storage
    self.file_obj = file_obj
    self.uuid = uuid
    # Passed as the stamp argument to the storage's @stamped methods.
    self.stamp = time.time()
    omero.util.SimpleServant.__init__(self, ctx)
    self.storage.incr(self)
367
369 """
370 Called periodically to check the resource is alive. Returns
371 False if this resource can be cleaned up. (Resources API)
372 """
373 self.logger.debug("Checking %s" % self)
374 return False
375
def cleanup(self):
    """
    Decrement the counter on the held storage so it can be cleaned up.

    The storage reference is dropped even if decr() raises, so repeated
    calls are no-ops.
    """
    held = self.storage
    if held:
        try:
            held.decr(self)
        finally:
            self.storage = None
386
388 return "Table-%s" % self.uuid
389
390 @remoted
391 @perf
def close(self, current=None):
    """
    Release this table's hold on its storage via cleanup().

    Best effort: an Exception raised during cleanup is logged together
    with its traceback and not propagated to the caller.
    """
    try:
        self.cleanup()
        self.logger.info("Closed %s", self)
    except Exception:
        # Previously a bare except that discarded the error entirely;
        # keep the traceback so failures can be diagnosed.
        self.logger.warn("Closed %s with errors", self, exc_info=True)
398
399
400
401 @remoted
402 @perf
404 self.logger.info("%s.getOriginalFile() => %s", self, self.file_obj)
405 return self.file_obj
406
407 @remoted
408 @perf
410 rv = self.storage.cols(None, current)
411 self.logger.info("%s.getHeaders() => size=%s", self, slen(rv))
412 return rv
413
414 @remoted
415 @perf
420
421 @remoted
422 @perf
424 rv = self.storage.rows()
425 self.logger.info("%s.getNumberOfRows() => %s", self, rv)
426 return long(rv)
427
428 @remoted
429 @perf
def getWhereList(self, condition, variables, start, stop, step, current=None):
    """
    Return the row numbers matching condition between start and stop
    with step, delegating to the storage using this table's stamp.

    A value of 0 for stop or step is translated to None before
    delegating (presumably meaning "unbounded"/"default" -- confirm).
    """
    def zero_to_none(value):
        if value == 0:
            return None
        return value

    stop = zero_to_none(stop)
    step = zero_to_none(step)
    matches = self.storage.getWhereList(self.stamp, condition, variables, None, start, stop, step)
    self.logger.info("%s.getWhereList(%s, %s, %s, %s, %s) => size=%s", self, condition, variables, start, stop, step, slen(matches))
    return matches
438
439 @remoted
440 @perf
442 self.logger.info("%s.readCoordinates(size=%s)", self, slen(rowNumbers))
443 return self.storage.readCoordinates(self.stamp, rowNumbers, current)
444
445 @remoted
446 @perf
def slice(self, colNumbers, rowNumbers, current=None):
    """
    Log the sizes of the column/row selections and delegate to the
    storage's stamped slice() using this table's stamp.
    """
    self.logger.info("%s.slice(size=%s, size=%s)", self, slen(colNumbers), slen(rowNumbers))
    backing = self.storage
    return backing.slice(self.stamp, colNumbers, rowNumbers, current)
450
451
452
453 @remoted
454 @perf
459
460 @remoted
461 @perf
463 raise omero.ApiUsageException(None, None, "NYI")
464
465 @remoted
466 @perf
467 - def addData(self, cols, current = None):
471
472
473 -class TablesI(omero.grid.Tables, omero.util.Servant):
474 """
475 Implementation of the omero.grid.Tables API. Provides
476 spreadsheet like functionality across the OMERO.grid.
477 This servant serves as a session-less, user-less
478 resource for obtaining omero.grid.Table proxies.
479
480 The first major step in initialization is getting
481 a session. This will block until the Blitz server
482 is reachable.
483 """
484
def __init__(self, ctx,
             table_cast=omero.grid.TablePrx.uncheckedCast,
             internal_repo_cast=omero.grid.InternalRepositoryPrx.checkedCast):
    """
    Create the Tables servant, then run the remaining initialization
    steps in order: locate the repository directory, read the database
    UUID, and resolve the InternalRepository proxy.

    table_cast converts the proxies returned by getTable();
    internal_repo_cast converts the InternalRepository proxy.
    """
    omero.util.Servant.__init__(self, ctx, needs_session=True)

    self._table_cast = table_cast
    self._internal_repo_cast = internal_repo_cast
    self.__stores = []

    for step in (self._get_dir, self._get_uuid, self._get_repo):
        step()
501
def _get_dir(self):
    """
    Second step in initialization is to find the .omero/repository
    directory. If this is not created, then a required server has
    not started, and so this instance will not start.

    Polls every 5 seconds while less than omero.repo.wait seconds
    (default "1") have elapsed, then raises omero.ResourceError if the
    directory still does not exist. The directory comes from
    omero.repo.dir, falling back to the server's omero.data.dir.
    """
    props = self.communicator.getProperties()
    wait = int(props.getPropertyWithDefault("omero.repo.wait", "1"))
    self.repo_dir = props.getProperty("omero.repo.dir")

    if not self.repo_dir:
        self.repo_dir = self.ctx.getSession().getConfigService().getConfigValue("omero.data.dir")

    self.repo_cfg = path(self.repo_dir) / ".omero" / "repository"
    start = time.time()
    # Previously `wait < elapsed`, which is false on entry, so the loop
    # never ran; its body also decremented an undefined `count`.
    while not self.repo_cfg.exists() and (time.time() - start) < wait:
        self.logger.info("%s doesn't exist; waiting 5 seconds..." % self.repo_cfg)
        time.sleep(5)
    if not self.repo_cfg.exists():
        msg = "No repository found: %s" % self.repo_cfg
        self.logger.error(msg)
        raise omero.ResourceError(None, None, msg)
525
def _get_uuid(self):
    """
    Third step in initialization: look up the database UUID for this
    grid instance and derive self.instance = self.repo_cfg / db_uuid,
    since multiple OMERO.grids could be watching the same directory.
    """
    config = self.ctx.getSession().getConfigService()
    uuid = config.getDatabaseUuid()
    self.db_uuid = uuid
    self.instance = self.repo_cfg / uuid
535
def _get_repo(self):
    """
    Fourth step in initialization is to find the repository object
    for the UUID found in .omero/repository/<db_uuid>, and then
    create a proxy for the InternalRepository attached to that.

    Raises omero.ResourceError if the repo_uuid file is empty or its
    first line (stripped) is not 38 characters long.
    """
    uuid_file = self.instance / "repo_uuid"
    lines = uuid_file.lines()
    if not lines:
        # Previously surfaced as a bare IndexError.
        raise omero.ResourceError(None, None, "Empty repo_uuid file: %s" % uuid_file)
    self.repo_uuid = lines[0].strip()
    if len(self.repo_uuid) != 38:
        # Use the (None, None, message) argument convention of every other
        # omero exception raised in this file; the message used to be passed
        # as the first positional argument.
        raise omero.ResourceError(None, None, "Poorly formed UUID: %s" % self.repo_uuid)
    # Drop the first two characters, leaving 36 (presumably a prefix written
    # by the repository server -- confirm).
    self.repo_uuid = self.repo_uuid[2:]

    # The repository is looked up as the OriginalFile whose sha1 is the UUID.
    self.repo_obj = self.ctx.getSession().getQueryService().findByQuery("select f from OriginalFile f where sha1 = :uuid",
        omero.sys.ParametersI().add("uuid", rstring(self.repo_uuid)))
    self.repo_mgr = self.communicator.stringToProxy("InternalRepository-%s" % self.repo_uuid)
    self.repo_mgr = self._internal_repo_cast(self.repo_mgr)
    self.repo_svc = self.repo_mgr.getProxy()
555
556 @remoted
558 """
559 Returns the Repository object for this Tables server.
560 """
561 return self.repo_svc
562
563 @remoted
564 @perf
def getTable(self, file_obj, current=None):
    """
    Create and/or register a table servant.

    Resolves the on-disk path for file_obj through the internal
    repository (creating its parent directory if needed), wraps the
    HdfStorage for that path -- reused if already open -- in a new TableI
    registered under a freshly generated UUID identity, and returns the
    cast proxy.
    """
    file_id = file_obj and file_obj.id and file_obj.id.val
    self.logger.info("getTable: %s", file_id)
    file_path = self.repo_mgr.getFilePath(file_obj)
    parent = path(file_path).dirname()
    if not parent.exists():
        parent.makedirs()

    storage = HDFLIST.getOrCreate(file_path)
    ident = Ice.Identity()
    ident.name = Ice.generateUUID()
    table = TableI(self.ctx, file_obj, storage, uuid=ident.name)
    self.resources.add(table)

    proxy = current.adapter.add(table, ident)
    return self._table_cast(proxy)
585