diff --git a/src/Db/Db.py b/src/Db/Db.py index 22a10516..9f372fc3 100644 --- a/src/Db/Db.py +++ b/src/Db/Db.py @@ -5,7 +5,10 @@ import logging import re import os import atexit +import threading import sys +import weakref +import errno import gevent @@ -71,6 +74,7 @@ class Db(object): self.schema["version"] = self.schema.get("version", 1) self.conn = None self.cur = None + self.cursors = weakref.WeakSet() self.id = next_db_id next_db_id += 1 self.progress_sleeping = False @@ -121,10 +125,16 @@ class Db(object): "Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." % (self.db_path, time.time() - s, len(opened_dbs), sqlite3.version) ) + self.log.debug("Connect by thread: %s" % threading.current_thread().ident) self.log.debug("Connect called by %s" % Debug.formatStack()) finally: self.connect_lock.release() + def getConn(self): + if not self.conn: + self.connect() + return self.conn + def progress(self, *args, **kwargs): self.progress_sleeping = True time.sleep(0.001) @@ -199,6 +209,7 @@ class Db(object): def close(self, reason="Unknown"): if not self.conn: return False + self.connect_lock.acquire() s = time.time() if self.delayed_queue: self.processDelayed() @@ -207,10 +218,19 @@ class Db(object): self.need_commit = False self.commit("Closing: %s" % reason) self.log.debug("Close called by %s" % Debug.formatStack()) + for i in range(10): + if len(self.cursors) == 0: + break + self.log.debug("Pending cursors: %s" % len(self.cursors)) + time.sleep(0.1 * i) + if len(self.cursors): + self.log.debug("Killing cursors: %s" % len(self.cursors)) + self.conn.interrupt() + if self.cur: self.cur.close() if self.conn: - self.conn.close() + ThreadPool.main_loop.call(self.conn.close) self.conn = None self.cur = None self.log.debug("%s closed (reason: %s) in %.3fs, opened: %s" % (self.db_path, reason, time.time() - s, len(opened_dbs)))