2015-07-12 20:36:46 +02:00
|
|
|
import sqlite3
|
|
|
|
import json
|
|
|
|
import time
|
|
|
|
import logging
|
|
|
|
import re
|
|
|
|
import os
|
2015-08-06 00:51:25 +02:00
|
|
|
import gevent
|
2015-07-12 20:36:46 +02:00
|
|
|
|
2015-03-19 21:19:14 +01:00
|
|
|
from DbCursor import DbCursor
|
2016-03-12 23:11:25 +01:00
|
|
|
from Config import config
|
2017-07-14 10:34:57 +02:00
|
|
|
from util import SafeRe
|
2017-08-09 14:19:39 +02:00
|
|
|
from util import helper
|
2015-03-19 21:19:14 +01:00
|
|
|
|
2015-08-06 00:51:25 +02:00
|
|
|
# Module-level registry of Db instances: Db.connect() appends the instance,
# Db.close() removes it, and dbCleanup() walks it to find idle databases.
opened_dbs = []
|
|
|
|
|
|
|
|
|
|
|
|
# Close idle databases to save some memory
|
2015-08-16 11:51:00 +02:00
|
|
|
def dbCleanup():
    """Close databases that have not been queried for more than five minutes.

    Runs forever: sleeps five minutes, then walks a snapshot of ``opened_dbs``
    (a copy, because ``Db.close()`` removes entries from the list) and closes
    every database whose ``last_query_time`` is older than five minutes.

    ``Db.close()`` flushes the delayed query queue first and may therefore
    raise; such a failure is logged and the loop continues, so one broken
    database cannot stop the cleanup of all the others.
    """
    while 1:
        time.sleep(60 * 5)
        for db in opened_dbs[:]:
            idle = time.time() - db.last_query_time
            if idle > 60 * 5:
                try:
                    db.close()
                except Exception as err:
                    logging.getLogger("Db").error("Error closing idle db %s: %s" % (db, err))
|
|
|
|
|
2015-08-16 11:51:00 +02:00
|
|
|
# Start the idle-database cleanup loop in a background greenlet at import time
gevent.spawn(dbCleanup)
|
2015-08-06 00:51:25 +02:00
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
|
Version 0.3.5, Rev830, Full Tor mode support with hidden services, Onion stats in Sidebar, GeoDB download fix using Tor, Gray out disabled sites in Stats page, Tor hidden service status in stat page, Benchmark sha256, Skyts tracker out expodie in, 2 new tracker using ZeroNet protocol, Keep SSL cert option between restarts, SSL Certificate pinning support for connections, Site lock support for connections, Certificate pinned connections using implicit SSL, Flood protection whitelist support, Foreign keys support for DB layer, Not support for SQL query helper, 0 length file get bugfix, Pex onion address support, Faster port testing, Faster uPnP port opening, Need connections more often on owned sites, Delay ZeroHello startup message if port check or Tor manager not ready yet, Use lockfiles to avoid double start, Save original socket on proxy monkey patching to get ability to connect localhost directly, Handle atomic write errors, Broken gevent https workaround helper, Rsa crypt functions, Plugin to Bootstrap using ZeroNet protocol
2016-01-05 00:20:52 +01:00
|
|
|
# Sqlite database wrapper driven by a schema dict.
# Opens the connection lazily on first use, can queue queries for delayed
# batch execution and imports json files into tables based on schema["maps"].
class Db(object):

    # Store the settings; the connection itself is opened lazily by connect()
    # schema: Schema dict, has to contain "db_name" ("version" defaults to 1 and is written back)
    # db_path: Path of the sqlite database file
    def __init__(self, schema, db_path):
        self.db_path = db_path
        self.db_dir = os.path.dirname(db_path) + "/"
        self.schema = schema
        self.schema["version"] = self.schema.get("version", 1)
        self.conn = None
        self.cur = None
        self.log = logging.getLogger("Db:%s" % schema["db_name"])
        self.table_names = None
        self.collect_stats = False
        self.foreign_keys = False
        self.query_stats = {}
        self.db_keyvalues = {}  # Cache of the internal (json_id = 0) keyvalue rows
        self.delayed_queue = []  # Pending (command, (args, kwargs)) items
        self.delayed_queue_thread = None
        self.last_query_time = time.time()  # Read by dbCleanup to find idle databases

    def __repr__(self):
        return "<Db:%s>" % self.db_path

    # Open the sqlite connection and create the db directory if necessary
    def connect(self):
        if self not in opened_dbs:
            opened_dbs.append(self)
        s = time.time()
        if not os.path.isdir(self.db_dir):  # Directory not exist yet
            os.makedirs(self.db_dir)
            self.log.debug("Created Db path: %s" % self.db_dir)
        if not os.path.isfile(self.db_path):
            self.log.debug("Db file not exist yet: %s" % self.db_path)
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.conn.isolation_level = None  # Transactions are controlled by explicit BEGIN / COMMIT
        self.cur = self.getCursor()
        if config.db_mode == "security":
            self.cur.execute("PRAGMA journal_mode = WAL")
            self.cur.execute("PRAGMA synchronous = NORMAL")
        else:
            self.cur.execute("PRAGMA journal_mode = MEMORY")
            self.cur.execute("PRAGMA synchronous = OFF")
        if self.foreign_keys:
            self.execute("PRAGMA foreign_keys = ON")
        # sqlite3.sqlite_version is the version of the SQLite library itself
        # (sqlite3.version is only the version of the Python binding)
        self.log.debug(
            "Connected to %s in %.3fs (opened: %s, sqlite version: %s)..." %
            (self.db_path, time.time() - s, len(opened_dbs), sqlite3.sqlite_version)
        )

    # Execute query using dbcursor
    # Return: Result of the cursor's execute
    def execute(self, query, params=None):
        self.last_query_time = time.time()
        if not self.conn:
            self.connect()
        return self.cur.execute(query, params)

    # Insert or update row using dbcursor (arguments are passed through unchanged)
    def insertOrUpdate(self, *args, **kwargs):
        self.last_query_time = time.time()
        if not self.conn:
            self.connect()
        return self.cur.insertOrUpdate(*args, **kwargs)

    # Queue an execute call; the queue is processed 10 seconds after the first queued item
    def executeDelayed(self, *args, **kwargs):
        if not self.delayed_queue_thread:
            self.delayed_queue_thread = gevent.spawn_later(10, self.processDelayed)
        self.delayed_queue.append(("execute", (args, kwargs)))

    # Queue an insertOrUpdate call; an empty queue gets processed 1 second later
    def insertOrUpdateDelayed(self, *args, **kwargs):
        if not self.delayed_queue:
            gevent.spawn_later(1, self.processDelayed)
        self.delayed_queue.append(("insertOrUpdate", (args, kwargs)))

    # Run every queued command in a single transaction
    # On error the transaction is rolled back, the failed batch is dropped and
    # the exception is re-raised; the db stays usable for later delayed queries
    def processDelayed(self):
        if not self.delayed_queue:
            self.log.debug("processDelayed aborted")
            return
        self.last_query_time = time.time()
        if not self.conn:
            self.connect()

        s = time.time()
        # Detach the queue and the scheduling marker before running anything,
        # so a failing command can not block later executeDelayed calls
        queue = self.delayed_queue
        self.delayed_queue = []
        self.delayed_queue_thread = None

        cur = self.getCursor()
        cur.execute("BEGIN")
        finished = False
        try:
            for command, params in queue:
                if command == "insertOrUpdate":
                    cur.insertOrUpdate(*params[0], **params[1])
                else:
                    cur.execute(*params[0], **params[1])
            cur.execute("END")
            finished = True
        finally:
            if not finished:
                # Do not leave the transaction open on the shared connection
                try:
                    cur.execute("ROLLBACK")
                except sqlite3.Error:
                    pass  # Sqlite already rolled the transaction back by itself

        if len(queue) > 10:
            self.log.debug("Processed %s delayed queue in %.3fs" % (len(queue), time.time() - s))

    # Flush the delayed queue, then close the cursor and the connection
    def close(self):
        s = time.time()
        if self.delayed_queue:
            self.processDelayed()
        if self in opened_dbs:
            opened_dbs.remove(self)
        if self.cur:
            self.cur.close()
        if self.conn:
            self.conn.close()
        self.conn = None
        self.cur = None
        self.log.debug("%s closed in %.3fs, opened: %s" % (self.db_path, time.time() - s, len(opened_dbs)))

    # Gets a cursor object to database
    # Return: Cursor class
    def getCursor(self):
        if not self.conn:
            self.connect()
        return DbCursor(self.conn, self)

    # Get the table version
    # Return: Table version, 0 if no version is stored for the table,
    #         False if the keyvalue table can not be queried
    def getTableVersion(self, table_name):
        if not self.db_keyvalues:  # Get db keyvalues
            try:
                res = self.execute("SELECT * FROM keyvalue WHERE json_id=0")  # json_id = 0 is internal keyvalues
            except sqlite3.OperationalError as err:  # Table not exist
                self.log.debug("Query error: %s" % err)
                return False

            for row in res:
                self.db_keyvalues[row["key"]] = row["value"]

        return self.db_keyvalues.get("table.%s.version" % table_name, 0)

    # Check Db tables
    # Return: <list> Changed table names
    def checkTables(self):
        s = time.time()
        changed_tables = []
        cur = self.getCursor()

        cur.execute("BEGIN")

        # Check internal tables
        # Check keyvalue table
        changed = cur.needTable("keyvalue", [
            ["keyvalue_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
            ["key", "TEXT"],
            ["value", "INTEGER"],
            ["json_id", "INTEGER"],
        ], [
            "CREATE UNIQUE INDEX key_id ON keyvalue(json_id, key)"
        ], version=self.schema["version"])
        if changed:
            changed_tables.append("keyvalue")

        # Check json table
        # Reset the flag: with an unknown schema version no json table is checked
        # and the result of the keyvalue check must not be reported for "json"
        changed = False
        if self.schema["version"] == 1:
            changed = cur.needTable("json", [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["path", "VARCHAR(255)"]
            ], [
                "CREATE UNIQUE INDEX path ON json(path)"
            ], version=self.schema["version"])
        elif self.schema["version"] == 2:
            changed = cur.needTable("json", [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["directory", "VARCHAR(255)"],
                ["file_name", "VARCHAR(255)"]
            ], [
                "CREATE UNIQUE INDEX path ON json(directory, file_name)"
            ], version=self.schema["version"])
        elif self.schema["version"] == 3:
            changed = cur.needTable("json", [
                ["json_id", "INTEGER PRIMARY KEY AUTOINCREMENT"],
                ["site", "VARCHAR(255)"],
                ["directory", "VARCHAR(255)"],
                ["file_name", "VARCHAR(255)"]
            ], [
                "CREATE UNIQUE INDEX path ON json(directory, site, file_name)"
            ], version=self.schema["version"])
        if changed:
            changed_tables.append("json")

        # Check schema tables
        for table_name, table_settings in self.schema["tables"].items():
            changed = cur.needTable(
                table_name, table_settings["cols"],
                table_settings["indexes"], version=table_settings["schema_changed"]
            )
            if changed:
                changed_tables.append(table_name)

        cur.execute("COMMIT")
        self.log.debug("Db check done in %.3fs, changed tables: %s" % (time.time() - s, changed_tables))
        if changed_tables:
            self.db_keyvalues = {}  # Refresh table version cache

        return changed_tables

    # Update json file to db
    # file_path: Path of the json file, has to start with the db directory
    # file: Opened file object, None to open file_path, False if the file was deleted
    # cur: Cursor to use; if not specified an own cursor and transaction is used
    # Return: True if matched, False if the file is outside of the db dir or no map matches
    def updateJson(self, file_path, file=None, cur=None):
        if not file_path.startswith(self.db_dir):
            return False  # Not from the db dir: Skipping
        relative_path = file_path[len(self.db_dir):]  # File path relative to db file

        # Check if filename matches any of mappings in schema
        matched_maps = []
        for match, map_settings in self.schema["maps"].items():
            try:
                if SafeRe.match(match, relative_path):
                    matched_maps.append(map_settings)
            except SafeRe.UnsafePatternError as err:
                self.log.error(err)

        # No match found for the file
        if not matched_maps:
            return False

        # Load the json file
        opened_here = False  # Only close the file if it was opened by this method
        try:
            if file is None:  # Open file is not file object passed
                file = open(file_path, "rb")
                opened_here = True

            if file is False:  # File deleted
                data = {}
            else:
                if file_path.endswith("json.gz"):
                    data = json.load(helper.limitedGzipFile(fileobj=file))
                else:
                    data = json.load(file)
        except Exception as err:
            self.log.debug("Json file %s load error: %s" % (file_path, err))
            data = {}
        finally:
            if opened_here:
                file.close()

        # No cursor specified
        if not cur:
            cur = self.getCursor()
            cur.execute("BEGIN")
            cur.logging = False
            commit_after_done = True
        else:
            commit_after_done = False

        # Row for current json file if required
        if not data or any("to_keyvalue" in dbmap or "to_table" in dbmap for dbmap in matched_maps):
            json_row = cur.getJsonRow(relative_path)

        # Check matched mappings in schema
        for dbmap in matched_maps:
            # Insert non-relational key values
            if dbmap.get("to_keyvalue"):
                # Get current values
                res = cur.execute("SELECT * FROM keyvalue WHERE json_id = ?", (json_row["json_id"],))
                current_keyvalue = {}
                current_keyvalue_id = {}
                for row in res:
                    current_keyvalue[row["key"]] = row["value"]
                    current_keyvalue_id[row["key"]] = row["keyvalue_id"]

                for key in dbmap["to_keyvalue"]:
                    if key not in current_keyvalue:  # Keyvalue not exist yet in the db
                        cur.execute(
                            "INSERT INTO keyvalue ?",
                            {"key": key, "value": data.get(key), "json_id": json_row["json_id"]}
                        )
                    elif data.get(key) != current_keyvalue[key]:  # Keyvalue different value
                        cur.execute(
                            "UPDATE keyvalue SET value = ? WHERE keyvalue_id = ?",
                            (data.get(key), current_keyvalue_id[key])
                        )

            # Insert data to json table for easier joins
            if dbmap.get("to_json_table"):
                directory, file_name = re.match("^(.*?)/*([^/]*)$", relative_path).groups()
                data_json_row = dict(cur.getJsonRow(directory + "/" + dbmap.get("file_name", file_name)))
                changed = any(data.get(key) != data_json_row.get(key) for key in dbmap["to_json_table"])
                if changed:
                    # Add the custom col values
                    data_json_row.update({key: val for key, val in data.items() if key in dbmap["to_json_table"]})
                    cur.execute("INSERT OR REPLACE INTO json ?", data_json_row)

            # Insert data to tables
            for table_settings in dbmap.get("to_table", []):
                if isinstance(table_settings, dict):  # Custom settings
                    table_name = table_settings["table"]  # Table name to insert datas
                    node = table_settings.get("node", table_name)  # Node keyname in data json file
                    key_col = table_settings.get("key_col")  # Map dict key as this col
                    val_col = table_settings.get("val_col")  # Map dict value as this col
                    import_cols = table_settings.get("import_cols")
                    replaces = table_settings.get("replaces")
                else:  # Simple settings
                    table_name = table_settings
                    node = table_settings
                    key_col = None
                    val_col = None
                    import_cols = None
                    replaces = None

                # Fill import cols from table cols
                if not import_cols:
                    import_cols = {item[0] for item in self.schema["tables"][table_name]["cols"]}

                # Remove the rows of the previous version of the file
                cur.execute("DELETE FROM %s WHERE json_id = ?" % table_name, (json_row["json_id"],))

                if node not in data:
                    continue

                if key_col:  # Map as dict
                    for key, val in data[node].items():
                        if val_col:  # Single value
                            cur.execute(
                                "INSERT OR REPLACE INTO %s ?" % table_name,
                                {key_col: key, val_col: val, "json_id": json_row["json_id"]}
                            )
                        else:  # Multi value
                            if isinstance(val, dict):  # Single row
                                row = val
                                if import_cols:
                                    row = {col: row[col] for col in row if col in import_cols}  # Filter row by import_cols
                                row[key_col] = key
                                # Replace in value if necessary
                                if replaces:
                                    for replace_key, replace in replaces.items():
                                        if replace_key in row:
                                            for replace_from, replace_to in replace.items():
                                                row[replace_key] = row[replace_key].replace(replace_from, replace_to)

                                row["json_id"] = json_row["json_id"]
                                cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                            else:  # Multi row
                                for row in val:
                                    row[key_col] = key
                                    row["json_id"] = json_row["json_id"]
                                    cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)
                else:  # Map as list
                    for row in data[node]:
                        row["json_id"] = json_row["json_id"]
                        if import_cols:
                            row = {col: row[col] for col in row if col in import_cols}  # Filter row by import_cols
                        cur.execute("INSERT OR REPLACE INTO %s ?" % table_name, row)

        # Cleanup json row
        if not data:
            self.log.debug("Cleanup json row for %s" % file_path)
            cur.execute("DELETE FROM json WHERE json_id = ?", (json_row["json_id"],))

        if commit_after_done:
            cur.execute("COMMIT")
        return True
|
2015-03-19 21:19:14 +01:00
|
|
|
|
|
|
|
|
2015-07-12 20:36:46 +02:00
|
|
|
if __name__ == "__main__":
    # Manual benchmark: import the ZeroTalk user json files into the db and
    # print the total time and the collected query statistics
    s = time.time()
    console_log = logging.StreamHandler()
    logging.getLogger('').setLevel(logging.DEBUG)
    logging.getLogger('').addHandler(console_log)
    console_log.setLevel(logging.DEBUG)

    # Load the schema with a context manager so the file handle gets closed
    with open("zerotalk.schema.json") as schema_file:
        schema = json.load(schema_file)
    dbjson = Db(schema, "data/users/zerotalk.db")
    dbjson.collect_stats = True
    dbjson.checkTables()

    # Import every file in one transaction, with the cursor's logging flag off
    cur = dbjson.getCursor()
    cur.execute("BEGIN")
    cur.logging = False
    dbjson.updateJson("data/users/content.json", cur=cur)
    for user_dir in os.listdir("data/users"):
        if os.path.isdir("data/users/%s" % user_dir):
            dbjson.updateJson("data/users/%s/data.json" % user_dir, cur=cur)
    cur.logging = True
    cur.execute("COMMIT")

    # Single-argument print calls produce the same output on Python 2 and 3
    print("Done in %.3fs" % (time.time() - s))
    for query, stats in sorted(dbjson.query_stats.items()):
        print("- %s %s" % (query, stats))
|