A private messenger for Android.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

106 lines
4.5 KiB

import sys
import re
import argparse
import sqlite3
import gzip
from progressbar import ProgressBar, Counter, Timer
from lxml import etree
parser = argparse.ArgumentParser(prog='apntool', description="""Process Android's apn xml files and drop them into an
easily queryable SQLite db. Tested up to version 9 of
their APN file.""")
parser.add_argument('-v', '--version', action='version', version='%(prog)s v1.1')
parser.add_argument('-i', '--input', help='the xml file to parse', default='apns.xml', required=False)
parser.add_argument('-o', '--output', help='the sqlite db output file', default='apns.db', required=False)
parser.add_argument('--quiet', help='do not show progress or verbose instructions', action='store_true', required=False)
parser.add_argument('--no-gzip', help="do not gzip after creation", action='store_true', required=False)
args = parser.parse_args()
def normalized(target):
o2_typo = re.compile(r"02\.co\.uk")
port_typo = re.compile(r"(\d+\.\d+\.\d+\.\d+)\.(\d+)")
leading_zeros = re.compile(r"(/|\.|^)0+(\d+)")
subbed = o2_typo.sub(r'o2.co.uk', target)
subbed = port_typo.sub(r'\1:\2', subbed)
subbed = leading_zeros.sub(r'\1\2', subbed)
return subbed
try:
connection = sqlite3.connect(args.output)
cursor = connection.cursor()
cursor.execute('SELECT SQLITE_VERSION()')
version = cursor.fetchone()
if not args.quiet:
print("SQLite version: %s" % version)
print("Opening %s" % args.input)
cursor.execute("PRAGMA legacy_file_format=ON")
cursor.execute("PRAGMA journal_mode=DELETE")
cursor.execute("PRAGMA page_size=32768")
cursor.execute("VACUUM")
cursor.execute("DROP TABLE IF EXISTS apns")
cursor.execute("""CREATE TABLE apns(_id INTEGER PRIMARY KEY, mccmnc TEXT, mcc TEXT, mnc TEXT, carrier TEXT,
apn TEXT, mmsc TEXT, port INTEGER, type TEXT, protocol TEXT, bearer TEXT, roaming_protocol TEXT,
carrier_enabled INTEGER, mmsproxy TEXT, mmsport INTEGER, proxy TEXT, mvno_match_data TEXT,
mvno_type TEXT, authtype INTEGER, user TEXT, password TEXT, server TEXT)""")
apns = etree.parse(args.input)
root = apns.getroot()
pbar = None
if not args.quiet:
pbar = ProgressBar(widgets=['Processed: ', Counter(), ' apns (', Timer(), ')'], maxval=len(list(root))).start()
count = 0
for apn in root.iter("apn"):
if apn.get("mmsc") is None:
continue
sqlvars = ["?" for x in apn.attrib.keys()] + ["?"]
mccmnc = "%s%s" % (apn.get("mcc"), apn.get("mnc"))
normalized_mmsc = normalized(apn.get("mmsc"))
if normalized_mmsc != apn.get("mmsc"):
print("normalize MMSC: %s => %s" % (apn.get("mmsc"), normalized_mmsc))
apn.set("mmsc", normalized_mmsc)
if not apn.get("mmsproxy") is None:
normalized_mmsproxy = normalized(apn.get("mmsproxy"))
if normalized_mmsproxy != apn.get("mmsproxy"):
print("normalize proxy: %s => %s" % (apn.get("mmsproxy"), normalized_mmsproxy))
apn.set("mmsproxy", normalized_mmsproxy)
values = [apn.get(attrib) for attrib in apn.attrib.keys()] + [mccmnc]
keys = apn.attrib.keys() + ["mccmnc"]
cursor.execute("SELECT 1 FROM apns WHERE mccmnc = ? AND apn = ?", [mccmnc, apn.get("apn")])
if cursor.fetchone() is None:
statement = "INSERT INTO apns (%s) VALUES (%s)" % (", ".join(keys), ", ".join(sqlvars))
cursor.execute(statement, values)
count += 1
if not args.quiet:
pbar.update(count)
if not args.quiet:
pbar.finish()
connection.commit()
print("Successfully written to %s" % args.output)
if not args.no_gzip:
gzipped_file = "%s.gz" % (args.output,)
with open(args.output, 'rb') as orig:
with gzip.open(gzipped_file, 'wb') as gzipped:
gzipped.writelines(orig)
print("Successfully gzipped to %s" % gzipped_file)
if not args.quiet:
print("\nTo include this in the distribution, copy it to the project's assets/databases/ directory.")
print("If you support API 10 or lower, you must use the gzipped version to avoid corruption.")
except sqlite3.Error, e:
if connection:
connection.rollback()
print("Error: %s" % e.args[0])
sys.exit(1)
finally:
if connection:
connection.close()