import os import re import shutil import json import time import errno from collections import defaultdict import sqlite3 import gevent.event import util from util import SafeRe from Db.Db import Db from Debug import Debug from Config import config from util import helper from util import ThreadPool from Plugin import PluginManager from Translate import translate as _ thread_pool_fs_read = ThreadPool.ThreadPool(config.threads_fs_read, name="FS read") thread_pool_fs_write = ThreadPool.ThreadPool(config.threads_fs_write, name="FS write") thread_pool_fs_batch = ThreadPool.ThreadPool(1, name="FS batch") @PluginManager.acceptPlugins class SiteStorage(object): def __init__(self, site, allow_create=True): self.site = site self.directory = "%s/%s" % (config.data_dir, self.site.address) # Site data diretory self.allowed_dir = os.path.abspath(self.directory) # Only serve file within this dir self.log = site.log self.db = None # Db class self.db_checked = False # Checked db tables since startup self.event_db_busy = None # Gevent AsyncResult if db is working on rebuild self.has_db = self.isFile("dbschema.json") # The site has schema if not os.path.isdir(self.directory): if allow_create: os.mkdir(self.directory) # Create directory if not found else: raise Exception("Directory not exists: %s" % self.directory) def getDbFile(self): if self.db: return self.db.schema["db_file"] else: if self.isFile("dbschema.json"): schema = self.loadJson("dbschema.json") return schema["db_file"] else: return False # Create new databaseobject with the site's schema def openDb(self, close_idle=False): schema = self.getDbSchema() db_path = self.getPath(schema["db_file"]) return Db(schema, db_path, close_idle=close_idle) def closeDb(self, reason="Unknown (SiteStorage)"): if self.db: self.db.close(reason) self.event_db_busy = None self.db = None def getDbSchema(self): try: self.site.needFile("dbschema.json") schema = self.loadJson("dbschema.json") except Exception as err: raise Exception("dbschema.json is not a valid JSON: %s" % err) return schema def loadDb(self): self.log.debug("No database, waiting for dbschema.json...") self.site.needFile("dbschema.json", priority=3) self.log.debug("Got dbschema.json") self.has_db = self.isFile("dbschema.json") # Recheck if dbschema exist if self.has_db: schema = self.getDbSchema() db_path = self.getPath(schema["db_file"]) if not os.path.isfile(db_path) or os.path.getsize(db_path) == 0: try: self.rebuildDb(reason="Missing database") except Exception as err: self.log.error(err) pass if self.db: self.db.close("Gettig new db for SiteStorage") self.db = self.openDb(close_idle=True) try: changed_tables = self.db.checkTables() if changed_tables: self.rebuildDb(delete_db=False, reason="Changed tables") # TODO: only update the changed table datas except sqlite3.OperationalError: pass # Return db class @util.Noparallel() def getDb(self): if self.event_db_busy: # Db not ready for queries self.log.debug("Wating for db...") self.event_db_busy.get() # Wait for event if not self.db: self.loadDb() return self.db def updateDbFile(self, inner_path, file=None, cur=None): path = self.getPath(inner_path) if cur: db = cur.db else: db = self.getDb() return db.updateJson(path, file, cur) # Return possible db files for the site @thread_pool_fs_read.wrap def getDbFiles(self): found = 0 for content_inner_path, content in self.site.content_manager.contents.items(): # content.json file itself if self.isFile(content_inner_path): yield content_inner_path, self.getPath(content_inner_path) else: self.log.debug("[MISSING] %s" % content_inner_path) # Data files in content.json content_inner_path_dir = helper.getDirname(content_inner_path) # Content.json dir relative to site for file_relative_path in list(content.get("files", {}).keys()) + list(content.get("files_optional", {}).keys()): if not file_relative_path.endswith(".json") and not file_relative_path.endswith("json.gz"): continue # We only interesed in json files file_inner_path = content_inner_path_dir + file_relative_path # File Relative to site dir file_inner_path = file_inner_path.strip("/") # Strip leading / if self.isFile(file_inner_path): yield file_inner_path, self.getPath(file_inner_path) else: self.log.debug("[MISSING] %s" % file_inner_path) found += 1 if found % 100 == 0: time.sleep(0.001) # Context switch to avoid UI block # Rebuild sql cache @util.Noparallel() @thread_pool_fs_batch.wrap def rebuildDb(self, delete_db=True, reason="Unknown"): self.log.info("Rebuilding db (reason: %s)..." % reason) self.has_db = self.isFile("dbschema.json") if not self.has_db: return False schema = self.loadJson("dbschema.json") db_path = self.getPath(schema["db_file"]) if os.path.isfile(db_path) and delete_db: if self.db: self.closeDb("rebuilding") # Close db if open time.sleep(0.5) self.log.info("Deleting %s" % db_path) try: os.unlink(db_path) except Exception as err: self.log.error("Delete error: %s" % err) if not self.db: self.db = self.openDb() self.event_db_busy = gevent.event.AsyncResult() self.log.info("Rebuild: Creating tables...") # raise DbTableError if not valid self.db.checkTables() cur = self.db.getCursor() cur.logging = False s = time.time() self.log.info("Rebuild: Getting db files...") db_files = list(self.getDbFiles()) num_imported = 0 num_total = len(db_files) num_error = 0 self.log.info("Rebuild: Importing data...") try: if num_total > 100: self.site.messageWebsocket( _["Database rebuilding...
Imported {0} of {1} files (error: {2})..."].format( "0000", num_total, num_error ), "rebuild", 0 ) for file_inner_path, file_path in db_files: try: if self.updateDbFile(file_inner_path, file=open(file_path, "rb"), cur=cur): num_imported += 1 except Exception as err: self.log.error("Error importing %s: %s" % (file_inner_path, Debug.formatException(err))) num_error += 1 if num_imported and num_imported % 100 == 0: self.site.messageWebsocket( _["Database rebuilding...
Imported {0} of {1} files (error: {2})..."].format( num_imported, num_total, num_error ), "rebuild", int(float(num_imported) / num_total * 100) ) time.sleep(0.001) # Context switch to avoid UI block finally: cur.close() if num_total > 100: self.site.messageWebsocket( _["Database rebuilding...
Imported {0} of {1} files (error: {2})..."].format( num_imported, num_total, num_error ), "rebuild", 100 ) self.log.info("Rebuild: Imported %s data file in %.3fs" % (num_imported, time.time() - s)) self.event_db_busy.set(True) # Event done, notify waiters self.event_db_busy = None # Clear event self.db.commit("Rebuilt") return True # Execute sql query or rebuild on dberror def query(self, query, params=None): if not query.strip().upper().startswith("SELECT"): raise Exception("Only SELECT query supported") try: res = self.getDb().execute(query, params) except sqlite3.DatabaseError as err: if err.__class__.__name__ == "DatabaseError": self.log.error("Database error: %s, query: %s, try to rebuilding it..." % (err, query)) try: self.rebuildDb(reason="Query error") except sqlite3.OperationalError: pass res = self.db.cur.execute(query, params) else: raise err return res def ensureDir(self, inner_path): try: os.makedirs(self.getPath(inner_path)) except OSError as err: if err.errno == errno.EEXIST: return False else: raise err return True # Open file object def open(self, inner_path, mode="rb", create_dirs=False, **kwargs): file_path = self.getPath(inner_path) if create_dirs: file_inner_dir = os.path.dirname(inner_path) self.ensureDir(file_inner_dir) return open(file_path, mode, **kwargs) # Open file object @thread_pool_fs_read.wrap def read(self, inner_path, mode="rb"): return self.open(inner_path, mode).read() @thread_pool_fs_write.wrap def writeThread(self, inner_path, content): file_path = self.getPath(inner_path) # Create dir if not exist self.ensureDir(os.path.dirname(inner_path)) # Write file if hasattr(content, 'read'): # File-like object with open(file_path, "wb") as file: shutil.copyfileobj(content, file) # Write buff to disk else: # Simple string if inner_path == "content.json" and os.path.isfile(file_path): helper.atomicWrite(file_path, content) else: with open(file_path, "wb") as file: file.write(content) # Write content to file def write(self, inner_path, content): self.writeThread(inner_path, content) self.onUpdated(inner_path) # Remove file from filesystem def delete(self, inner_path): file_path = self.getPath(inner_path) os.unlink(file_path) self.onUpdated(inner_path, file=False) def deleteDir(self, inner_path): dir_path = self.getPath(inner_path) os.rmdir(dir_path) def rename(self, inner_path_before, inner_path_after): for retry in range(3): rename_err = None # To workaround "The process cannot access the file beacause it is being used by another process." error try: os.rename(self.getPath(inner_path_before), self.getPath(inner_path_after)) break except Exception as err: rename_err = err self.log.error("%s rename error: %s (retry #%s)" % (inner_path_before, err, retry)) time.sleep(0.1 + retry) if rename_err: raise rename_err # List files from a directory @thread_pool_fs_read.wrap def walk(self, dir_inner_path, ignore=None): directory = self.getPath(dir_inner_path) for root, dirs, files in os.walk(directory): root = root.replace("\\", "/") root_relative_path = re.sub("^%s" % re.escape(directory), "", root).lstrip("/") for file_name in files: if root_relative_path: # Not root dir file_relative_path = root_relative_path + "/" + file_name else: file_relative_path = file_name if ignore and SafeRe.match(ignore, file_relative_path): continue yield file_relative_path # Don't scan directory that is in the ignore pattern if ignore: dirs_filtered = [] for dir_name in dirs: if root_relative_path: dir_relative_path = root_relative_path + "/" + dir_name else: dir_relative_path = dir_name if ignore == ".*" or re.match(".*([|(]|^)%s([|)]|$)" % re.escape(dir_relative_path + "/.*"), ignore): continue dirs_filtered.append(dir_name) dirs[:] = dirs_filtered # list directories in a directory @thread_pool_fs_read.wrap def list(self, dir_inner_path): directory = self.getPath(dir_inner_path) return os.listdir(directory) # Site content updated def onUpdated(self, inner_path, file=None): # Update Sql cache should_load_to_db = inner_path.endswith(".json") or inner_path.endswith(".json.gz") if inner_path == "dbschema.json": self.has_db = self.isFile("dbschema.json") # Reopen DB to check changes if self.has_db: self.closeDb("New dbschema") gevent.spawn(self.getDb) elif not config.disable_db and should_load_to_db and self.has_db: # Load json file to db if config.verbose: self.log.debug("Loading json file to db: %s (file: %s)" % (inner_path, file)) try: self.updateDbFile(inner_path, file) except Exception as err: self.log.error("Json %s load error: %s" % (inner_path, Debug.formatException(err))) self.closeDb("Json load error") # Load and parse json file @thread_pool_fs_read.wrap def loadJson(self, inner_path): try: with self.open(inner_path, "r", encoding="utf8") as file: return json.load(file) except Exception as err: self.log.warning("Json load error: %s" % Debug.formatException(err)) return None # Write formatted json file def writeJson(self, inner_path, data): # Write to disk self.write(inner_path, helper.jsonDumps(data).encode("utf8")) # Get file size def getSize(self, inner_path): path = self.getPath(inner_path) try: return os.path.getsize(path) except Exception: return 0 # File exist def isFile(self, inner_path): return os.path.isfile(self.getPath(inner_path)) # File or directory exist def isExists(self, inner_path): return os.path.exists(self.getPath(inner_path)) # Dir exist def isDir(self, inner_path): return os.path.isdir(self.getPath(inner_path)) # Security check and return path of site's file def getPath(self, inner_path): inner_path = inner_path.replace("\\", "/") # Windows separator fix if not inner_path: return self.directory if "../" in inner_path: raise Exception("File not allowed: %s" % inner_path) return "%s/%s" % (self.directory, inner_path) # Get site dir relative path def getInnerPath(self, path): if path == self.directory: inner_path = "" else: if path.startswith(self.directory): inner_path = path[len(self.directory) + 1:] else: raise Exception("File not allowed: %s" % path) return inner_path # Verify all files sha512sum using content.json def verifyFiles(self, quick_check=False, add_optional=False, add_changed=True): bad_files = [] back = defaultdict(int) back["bad_files"] = bad_files i = 0 self.log.debug("Verifing files...") if not self.site.content_manager.contents.get("content.json"): # No content.json, download it first self.log.debug("VerifyFile content.json not exists") self.site.needFile("content.json", update=True) # Force update to fix corrupt file self.site.content_manager.loadContent() # Reload content.json for content_inner_path, content in list(self.site.content_manager.contents.items()): back["num_content"] += 1 i += 1 if i % 50 == 0: time.sleep(0.001) # Context switch to avoid gevent hangs if not os.path.isfile(self.getPath(content_inner_path)): # Missing content.json file back["num_content_missing"] += 1 self.log.debug("[MISSING] %s" % content_inner_path) bad_files.append(content_inner_path) for file_relative_path in list(content.get("files", {}).keys()): back["num_file"] += 1 file_inner_path = helper.getDirname(content_inner_path) + file_relative_path # Relative to site dir file_inner_path = file_inner_path.strip("/") # Strip leading / file_path = self.getPath(file_inner_path) if not os.path.isfile(file_path): back["num_file_missing"] += 1 self.log.debug("[MISSING] %s" % file_inner_path) bad_files.append(file_inner_path) continue if quick_check: ok = os.path.getsize(file_path) == content["files"][file_relative_path]["size"] if not ok: err = "Invalid size" else: try: ok = self.site.content_manager.verifyFile(file_inner_path, open(file_path, "rb")) except Exception as err: ok = False if not ok: back["num_file_invalid"] += 1 self.log.debug("[INVALID] %s: %s" % (file_inner_path, err)) if add_changed or content.get("cert_user_id"): # If updating own site only add changed user files bad_files.append(file_inner_path) # Optional files optional_added = 0 optional_removed = 0 for file_relative_path in list(content.get("files_optional", {}).keys()): back["num_optional"] += 1 file_node = content["files_optional"][file_relative_path] file_inner_path = helper.getDirname(content_inner_path) + file_relative_path # Relative to site dir file_inner_path = file_inner_path.strip("/") # Strip leading / file_path = self.getPath(file_inner_path) hash_id = self.site.content_manager.hashfield.getHashId(file_node["sha512"]) if not os.path.isfile(file_path): if self.site.content_manager.isDownloaded(file_inner_path, hash_id): back["num_optional_removed"] += 1 self.log.debug("[OPTIONAL MISSING] %s" % file_inner_path) self.site.content_manager.optionalRemoved(file_inner_path, hash_id, file_node["size"]) if add_optional and self.site.isDownloadable(file_inner_path): self.log.debug("[OPTIONAL ADDING] %s" % file_inner_path) bad_files.append(file_inner_path) continue if quick_check: ok = os.path.getsize(file_path) == content["files_optional"][file_relative_path]["size"] else: try: ok = self.site.content_manager.verifyFile(file_inner_path, open(file_path, "rb")) except Exception as err: ok = False if ok: if not self.site.content_manager.isDownloaded(file_inner_path, hash_id): back["num_optional_added"] += 1 self.site.content_manager.optionalDownloaded(file_inner_path, hash_id, file_node["size"]) optional_added += 1 self.log.debug("[OPTIONAL FOUND] %s" % file_inner_path) else: if self.site.content_manager.isDownloaded(file_inner_path, hash_id): back["num_optional_removed"] += 1 self.site.content_manager.optionalRemoved(file_inner_path, hash_id, file_node["size"]) optional_removed += 1 bad_files.append(file_inner_path) self.log.debug("[OPTIONAL CHANGED] %s" % file_inner_path) if config.verbose: self.log.debug( "%s verified: %s, quick: %s, optionals: +%s -%s" % (content_inner_path, len(content["files"]), quick_check, optional_added, optional_removed) ) self.site.content_manager.contents.db.processDelayed() time.sleep(0.001) # Context switch to avoid gevent hangs return back # Check and try to fix site files integrity def updateBadFiles(self, quick_check=True): s = time.time() res = self.verifyFiles( quick_check, add_optional=True, add_changed=not self.site.settings.get("own") # Don't overwrite changed files if site owned ) bad_files = res["bad_files"] self.site.bad_files = {} if bad_files: for bad_file in bad_files: self.site.bad_files[bad_file] = 1 self.log.debug("Checked files in %.2fs... Found bad files: %s, Quick:%s" % (time.time() - s, len(bad_files), quick_check)) # Delete site's all file @thread_pool_fs_batch.wrap def deleteFiles(self): site_title = self.site.content_manager.contents.get("content.json", {}).get("title", self.site.address) message_id = "delete-%s" % self.site.address self.log.debug("Deleting files from content.json (title: %s)..." % site_title) files = [] # Get filenames content_inner_paths = list(self.site.content_manager.contents.keys()) for i, content_inner_path in enumerate(content_inner_paths): content = self.site.content_manager.contents.get(content_inner_path, {}) files.append(content_inner_path) # Add normal files for file_relative_path in list(content.get("files", {}).keys()): file_inner_path = helper.getDirname(content_inner_path) + file_relative_path # Relative to site dir files.append(file_inner_path) # Add optional files for file_relative_path in list(content.get("files_optional", {}).keys()): file_inner_path = helper.getDirname(content_inner_path) + file_relative_path # Relative to site dir files.append(file_inner_path) if i % 100 == 0: num_files = len(files) self.site.messageWebsocket( _("Deleting site {site_title}...
Collected {num_files} files"), message_id, (i / len(content_inner_paths)) * 25 ) if self.isFile("dbschema.json"): self.log.debug("Deleting db file...") self.closeDb("Deleting site") self.has_db = False try: schema = self.loadJson("dbschema.json") db_path = self.getPath(schema["db_file"]) if os.path.isfile(db_path): os.unlink(db_path) except Exception as err: self.log.error("Db file delete error: %s" % err) num_files = len(files) for i, inner_path in enumerate(files): path = self.getPath(inner_path) if os.path.isfile(path): for retry in range(5): try: os.unlink(path) break except Exception as err: self.log.error("Error removing %s: %s, try #%s" % (inner_path, err, retry)) time.sleep(float(retry) / 10) if i % 100 == 0: self.site.messageWebsocket( _("Deleting site {site_title}...
Deleting file {i}/{num_files}"), message_id, 25 + (i / num_files) * 50 ) self.onUpdated(inner_path, False) self.log.debug("Deleting empty dirs...") i = 0 for root, dirs, files in os.walk(self.directory, topdown=False): for dir in dirs: path = os.path.join(root, dir) if os.path.isdir(path): try: i += 1 if i % 100 == 0: self.site.messageWebsocket( _("Deleting site {site_title}...
Deleting empty directories {i}"), message_id, 85 ) os.rmdir(path) except OSError: # Not empty pass if os.path.isdir(self.directory) and os.listdir(self.directory) == []: os.rmdir(self.directory) # Remove sites directory if empty if os.path.isdir(self.directory): self.log.debug("Some unknown file remained in site data dir: %s..." % self.directory) self.site.messageWebsocket( _("Deleting site {site_title}...
Site deleted, but some unknown files left in the directory"), message_id, 100 ) return False # Some files not deleted else: self.log.debug("Site %s data directory deleted: %s..." % (site_title, self.directory)) self.site.messageWebsocket( _("Deleting site {site_title}...
All files deleted successfully"), message_id, 100 ) return True # All clean