# no-google/convert.py
#
# 195 lines
# 6.5 KiB
# Python

import json
import sys
from collections import OrderedDict, defaultdict
from datetime import date
from pathlib import Path
from typing import Dict, List
class DomainBlocklistConverter:
    """
    Convert the plain-text domain list into several blocklist formats.

    The input file groups items under `# Category` header lines. After
    `read()` has populated `self.data`, each output method writes one file
    flavour (Pi-hole, Unbound, AdGuard, per-category files).
    """

    INPUT_FILE = "pihole-google.txt"
    PIHOLE_FILE = "google-domains"
    UNBOUND_FILE = "pihole-google-unbound.conf"
    ADGUARD_FILE = "pihole-google-adguard.txt"
    ADGUARD_IMPORTANT_FILE = "pihole-google-adguard-important.txt"
    CATEGORIES_PATH = "categories"
    BLOCKLIST_ABOUT = "This blocklist helps to restrict access to Google and its domains. Contribute at https://github.com/nickspaargaren/no-google"

    def __init__(self):
        # Category name -> list of items, in input-file order.
        self.data: Dict[str, List[str]] = OrderedDict()
        # Date stamp written into the header of every generated file.
        self.timestamp: str = date.today().strftime("%Y-%m-%d")

    def read(self):
        """
        Read input file into `self.data`, a dictionary mapping category names to lists of member items.

        A line starting with `#` opens a category; every other non-blank line
        is stored as an item of the most recently opened category. Blank lines
        only separate sections and are skipped, so they neither become empty
        items nor trigger the "without category" error at the top of the file.

        Raises:
            ValueError: If an item appears before the first category line.
        """
        with open(self.INPUT_FILE, "r", encoding="utf-8") as f:
            category = None
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if line.startswith("#"):
                    # Strip the leading run of "#" and space characters.
                    category = line.lstrip("# ")
                    self.data.setdefault(category, [])
                    continue
                if category is None:
                    raise ValueError("Unable to store item without category")
                self.data[category].append(line)

    def dump(self):
        """
        Output data in JSON format on STDOUT.
        """
        print(json.dumps(self.data, indent=4))

    def _write_blocklist(self, path, comment, category_header, entry_line):
        """
        Write one combined blocklist file covering all categories.

        Args:
            path: Output file path.
            comment: Comment marker used for the two header lines.
            category_header: `str.format` template with a `{category}` field,
                written once per category (including its trailing newline).
            entry_line: `str.format` template with an `{entry}` field,
                written once per non-empty item (including its newline).
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(f"{comment} {self.BLOCKLIST_ABOUT}\n")
            f.write(f"{comment} Last updated: {self.timestamp}\n")
            for category, entries in self.data.items():
                f.write(category_header.format(category=category))
                # Empty items are never emitted, even if `self.data` was
                # populated by something other than `read()`.
                f.writelines(
                    entry_line.format(entry=entry) for entry in entries if entry
                )

    def pihole(self):
        """
        Produce blocklist for the Pi-hole.
        """
        self._write_blocklist(
            self.PIHOLE_FILE,
            comment="#",
            category_header="# {category}\n",
            entry_line="0.0.0.0 {entry}\n",
        )

    def unbound(self):
        """
        Produce blocklist for the Unbound DNS server.
        https://github.com/nickspaargaren/no-google/issues/67
        """
        self._write_blocklist(
            self.UNBOUND_FILE,
            comment="#",
            category_header="\n# Category: {category}\n",
            entry_line='local-zone: "{entry}" always_refuse\n',
        )

    def adguard(self):
        """
        Produce blocklist for AdGuard.
        """
        self._write_blocklist(
            self.ADGUARD_FILE,
            comment="!",
            category_header="! {category}\n",
            entry_line="||{entry}^\n",
        )

    def adguard_important(self):
        """
        Produce blocklist for AdGuard including important syntax.
        """
        self._write_blocklist(
            self.ADGUARD_IMPORTANT_FILE,
            comment="!",
            category_header="! {category}\n",
            entry_line="||{entry}^$important\n",
        )

    def _write_category_file(self, path, category, entries, line_prefix=""):
        """
        Write a single per-category file; `line_prefix` selects the flavour.
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(f"# {self.BLOCKLIST_ABOUT}\n")
            f.write(f"# Last updated: {self.timestamp}\n")
            f.write(f"# {category}\n")
            f.write("\n")
            f.writelines(f"{line_prefix}{entry}\n" for entry in entries if entry)

    def categories(self):
        """
        Produce individual per-category blocklist files.

        For every category two files are written below `CATEGORIES_PATH`:
        `<name>.txt` with each item prefixed by `0.0.0.0 `, and `<name>parsed`
        with the bare items. `<name>` is the category name lower-cased with
        spaces removed. The output directory is created when missing.
        """
        base = Path(self.CATEGORIES_PATH)
        base.mkdir(parents=True, exist_ok=True)
        for category, entries in self.data.items():
            # Compute file names.
            filename = category.replace(" ", "").lower()
            filepath = base.joinpath(filename)
            text_file = filepath.with_suffix(".txt")
            parsed_file = str(filepath) + "parsed"
            # Write two flavours of per-category file.
            self._write_category_file(
                text_file, category, entries, line_prefix="0.0.0.0 "
            )
            self._write_category_file(parsed_file, category, entries)

    def duplicates(self):
        """
        Find duplicates in main source file.

        Prints one message per item that occurs more than once across all
        categories, in order of first occurrence. Empty items are ignored.
        """
        # Count by the item itself rather than by `hash(item)`, so two
        # different items can never be merged by a hash collision.
        counts = defaultdict(int)
        for entries in self.data.values():
            for entry in entries:
                if entry:
                    counts[entry] += 1
        for entry, count in counts.items():
            if count > 1:
                print(
                    f"Domain {entry} found {count} times, please remove duplicate domains."
                )
# Output actions used for "all" when the script entry point has not defined
# `action_candidates`, e.g. when this module is imported instead of executed.
_DEFAULT_ACTIONS = ("pihole", "unbound", "adguard", "adguard_important", "categories")


def run(action: str):
    """
    Invoke different actions on converter engine.

    Args:
        action: "json" dumps the parsed data and exits the process, "all"
            runs every output action, and any other value is looked up as a
            method name on the converter and called.

    Raises:
        AttributeError: If `action` is not the name of a converter attribute.
    """
    # Create converter instance and read input file.
    converter = DomainBlocklistConverter()
    converter.read()

    # Invoke special action "json".
    if action == "json":
        converter.dump()
        sys.exit()

    # Either invoke specific action, or expand to all actions.
    if action == "all":
        # Prefer the list defined by the script entry point; fall back to the
        # built-in default so that `run("all")` also works after an import.
        subcommands = globals().get("action_candidates", _DEFAULT_ACTIONS)
    else:
        subcommands = [action]

    # Invoke all actions subsequently.
    for name in subcommands:
        print(f"Invoking subcommand '{name}'")
        method = getattr(converter, name)
        method()
if __name__ == "__main__":
    # Read subcommand from command line, with error handling.
    action_candidates = ["pihole", "unbound", "adguard", "adguard_important", "categories"]
    special_candidates = ["all", "duplicates", "json"]
    valid_subcommands = action_candidates + special_candidates

    # A missing argument is reported exactly like an invalid one. Checking the
    # length explicitly avoids a bare `except`, which would also swallow
    # unrelated exceptions such as KeyboardInterrupt.
    subcommand = sys.argv[1] if len(sys.argv) > 1 else None

    if subcommand not in valid_subcommands:
        print(
            f"ERROR: Subcommand not given or invalid, please use one of {valid_subcommands}"
        )
        sys.exit(1)

    # Invoke subcommand.
    run(subcommand)