Update pybitcointools to 1.1.42

This commit is contained in:
shortcutme 2017-04-06 19:19:47 +02:00
parent 12a0c955bc
commit 9921fc07dd
No known key found for this signature in database
GPG Key ID: 5B63BAE6CB9613AE
13 changed files with 2502 additions and 131 deletions

View File

@ -1 +1 @@
include LICENSE
include bitcoin/english.txt

View File

@ -7,3 +7,4 @@ from .bci import *
from .composite import *
from .stealth import *
from .blocks import *
from .mnemonic import *

View File

@ -23,20 +23,76 @@ def make_request(*args):
raise Exception(p)
def is_testnet(inp):
'''Checks if inp is a testnet address or if UTXO is a known testnet TxID'''
if isinstance(inp, (list, tuple)) and len(inp) >= 1:
return any([is_testnet(x) for x in inp])
elif not isinstance(inp, basestring): # sanity check
raise TypeError("Input must be str/unicode, not type %s" % str(type(inp)))
if not inp or (inp.lower() in ("btc", "testnet")):
pass
## ADDRESSES
if inp[0] in "123mn":
if re.match("^[2mn][a-km-zA-HJ-NP-Z0-9]{26,33}$", inp):
return True
elif re.match("^[13][a-km-zA-HJ-NP-Z0-9]{26,33}$", inp):
return False
else:
#sys.stderr.write("Bad address format %s")
return None
## TXID
elif re.match('^[0-9a-fA-F]{64}$', inp):
base_url = "http://api.blockcypher.com/v1/btc/{network}/txs/{txid}?includesHex=false"
try:
# try testnet fetchtx
make_request(base_url.format(network="test3", txid=inp.lower()))
return True
except:
# try mainnet fetchtx
make_request(base_url.format(network="main", txid=inp.lower()))
return False
sys.stderr.write("TxID %s has no match for testnet or mainnet (Bad TxID)")
return None
else:
raise TypeError("{0} is unknown input".format(inp))
def set_network(*args):
'''Decides if args for unspent/fetchtx/pushtx are mainnet or testnet'''
r = []
for arg in args:
if not arg:
pass
if isinstance(arg, basestring):
r.append(is_testnet(arg))
elif isinstance(arg, (list, tuple)):
return set_network(*arg)
if any(r) and not all(r):
raise Exception("Mixed Testnet/Mainnet queries")
return "testnet" if any(r) else "btc"
def parse_addr_args(*args):
# Valid input formats: blockr_unspent([addr1, addr2,addr3])
# blockr_unspent(addr1, addr2, addr3)
# blockr_unspent([addr1, addr2, addr3], network)
# blockr_unspent(addr1, addr2, addr3, network)
# Where network is 'btc' or 'testnet'
network = 'btc'
# Valid input formats: unspent([addr1, addr2, addr3])
# unspent([addr1, addr2, addr3], network)
# unspent(addr1, addr2, addr3)
# unspent(addr1, addr2, addr3, network)
addr_args = args
network = "btc"
if len(args) == 0:
return [], 'btc'
if len(args) >= 1 and args[-1] in ('testnet', 'btc'):
network = args[-1]
addr_args = args[:-1]
if len(addr_args) == 1 and isinstance(addr_args, list):
network = set_network(*addr_args[0])
addr_args = addr_args[0]
if addr_args and isinstance(addr_args, tuple) and isinstance(addr_args[0], list):
addr_args = addr_args[0]
network = set_network(addr_args)
return network, addr_args
@ -46,14 +102,14 @@ def bci_unspent(*args):
u = []
for a in addrs:
try:
data = make_request('https://blockchain.info/unspent?address='+a)
data = make_request('https://blockchain.info/unspent?active='+a)
except Exception as e:
if str(e) == 'No free outputs to spend':
continue
else:
raise Exception(e)
try:
jsonobj = json.loads(data)
jsonobj = json.loads(data.decode("utf-8"))
for o in jsonobj["unspent_outputs"]:
h = o['tx_hash'].decode('hex')[::-1].encode('hex')
u.append({
@ -74,9 +130,9 @@ def blockr_unspent(*args):
network, addr_args = parse_addr_args(*args)
if network == 'testnet':
blockr_url = 'https://tbtc.blockr.io/api/v1/address/unspent/'
blockr_url = 'http://tbtc.blockr.io/api/v1/address/unspent/'
elif network == 'btc':
blockr_url = 'https://btc.blockr.io/api/v1/address/unspent/'
blockr_url = 'http://btc.blockr.io/api/v1/address/unspent/'
else:
raise Exception(
'Unsupported network {0} for blockr_unspent'.format(network))
@ -88,7 +144,7 @@ def blockr_unspent(*args):
else:
addrs = addr_args
res = make_request(blockr_url+','.join(addrs))
data = json.loads(res)['data']
data = json.loads(res.decode("utf-8"))['data']
o = []
if 'unspent' in data:
data = [data]
@ -102,7 +158,7 @@ def blockr_unspent(*args):
def helloblock_unspent(*args):
network, addrs = parse_addr_args(*args)
addrs, network = parse_addr_args(*args)
if network == 'testnet':
url = 'https://testnet.helloblock.io/v1/addresses/%s/unspents?limit=500&offset=%s'
elif network == 'btc':
@ -111,7 +167,7 @@ def helloblock_unspent(*args):
for addr in addrs:
for offset in xrange(0, 10**9, 500):
res = make_request(url % (addr, offset))
data = json.loads(res)["data"]
data = json.loads(res.decode("utf-8"))["data"]
if not len(data["unspents"]):
break
elif offset:
@ -152,11 +208,21 @@ def history(*args):
for addr in addrs:
offset = 0
while 1:
data = make_request(
'https://blockchain.info/address/%s?format=json&offset=%s' %
(addr, offset))
gathered = False
while not gathered:
try:
data = make_request(
'https://blockchain.info/address/%s?format=json&offset=%s' %
(addr, offset))
gathered = True
except Exception as e:
try:
sys.stderr.write(e.read().strip())
except:
sys.stderr.write(str(e))
gathered = False
try:
jsonobj = json.loads(data)
jsonobj = json.loads(data.decode("utf-8"))
except:
raise Exception("Failed to decode data: "+data)
txs.extend(jsonobj["txs"])
@ -167,7 +233,7 @@ def history(*args):
outs = {}
for tx in txs:
for o in tx["out"]:
if o['addr'] in addrs:
if o.get('addr', None) in addrs:
key = str(tx["tx_index"])+':'+str(o["n"])
outs[key] = {
"address": o["addr"],
@ -177,11 +243,12 @@ def history(*args):
}
for tx in txs:
for i, inp in enumerate(tx["inputs"]):
if inp["prev_out"]["addr"] in addrs:
key = str(inp["prev_out"]["tx_index"]) + \
':'+str(inp["prev_out"]["n"])
if outs.get(key):
outs[key]["spend"] = tx["hash"]+':'+str(i)
if "prev_out" in inp:
if inp["prev_out"].get("addr", None) in addrs:
key = str(inp["prev_out"]["tx_index"]) + \
':'+str(inp["prev_out"]["n"])
if outs.get(key):
outs[key]["spend"] = tx["hash"]+':'+str(i)
return [outs[k] for k in outs]
@ -207,9 +274,9 @@ def eligius_pushtx(tx):
def blockr_pushtx(tx, network='btc'):
if network == 'testnet':
blockr_url = 'https://tbtc.blockr.io/api/v1/tx/push'
blockr_url = 'http://tbtc.blockr.io/api/v1/tx/push'
elif network == 'btc':
blockr_url = 'https://btc.blockr.io/api/v1/tx/push'
blockr_url = 'http://btc.blockr.io/api/v1/tx/push'
else:
raise Exception(
'Unsupported network {0} for blockr_pushtx'.format(network))
@ -237,14 +304,21 @@ def pushtx(*args, **kwargs):
return f(*args)
def last_block_height():
def last_block_height(network='btc'):
if network == 'testnet':
data = make_request('http://tbtc.blockr.io/api/v1/block/info/last')
jsonobj = json.loads(data.decode("utf-8"))
return jsonobj["data"]["nb"]
data = make_request('https://blockchain.info/latestblock')
jsonobj = json.loads(data)
jsonobj = json.loads(data.decode("utf-8"))
return jsonobj["height"]
# Gets a specific transaction
def bci_fetchtx(txhash):
if isinstance(txhash, list):
return [bci_fetchtx(h) for h in txhash]
if not re.match('^[0-9a-fA-F]*$', txhash):
txhash = txhash.encode('hex')
data = make_request('https://blockchain.info/rawtx/'+txhash+'?format=hex')
@ -253,19 +327,27 @@ def bci_fetchtx(txhash):
def blockr_fetchtx(txhash, network='btc'):
if network == 'testnet':
blockr_url = 'https://tbtc.blockr.io/api/v1/tx/raw/'
blockr_url = 'http://tbtc.blockr.io/api/v1/tx/raw/'
elif network == 'btc':
blockr_url = 'https://btc.blockr.io/api/v1/tx/raw/'
blockr_url = 'http://btc.blockr.io/api/v1/tx/raw/'
else:
raise Exception(
'Unsupported network {0} for blockr_fetchtx'.format(network))
if not re.match('^[0-9a-fA-F]*$', txhash):
txhash = txhash.encode('hex')
jsondata = json.loads(make_request(blockr_url+txhash))
return jsondata['data']['tx']['hex']
if isinstance(txhash, list):
txhash = ','.join([x.encode('hex') if not re.match('^[0-9a-fA-F]*$', x)
else x for x in txhash])
jsondata = json.loads(make_request(blockr_url+txhash).decode("utf-8"))
return [d['tx']['hex'] for d in jsondata['data']]
else:
if not re.match('^[0-9a-fA-F]*$', txhash):
txhash = txhash.encode('hex')
jsondata = json.loads(make_request(blockr_url+txhash).decode("utf-8"))
return jsondata['data']['tx']['hex']
def helloblock_fetchtx(txhash, network='btc'):
if isinstance(txhash, list):
return [helloblock_fetchtx(h) for h in txhash]
if not re.match('^[0-9a-fA-F]*$', txhash):
txhash = txhash.encode('hex')
if network == 'testnet':
@ -275,7 +357,7 @@ def helloblock_fetchtx(txhash, network='btc'):
else:
raise Exception(
'Unsupported network {0} for helloblock_fetchtx'.format(network))
data = json.loads(make_request(url + txhash))["data"]["transaction"]
data = json.loads(make_request(url + txhash).decode("utf-8"))["data"]["transaction"]
o = {
"locktime": data["locktime"],
"version": data["version"],
@ -296,8 +378,8 @@ def helloblock_fetchtx(txhash, network='btc'):
"value": outp["value"],
"script": outp["scriptPubKey"]
})
from bitcoin.transaction import serialize
from bitcoin.transaction import txhash as TXHASH
from .transaction import serialize
from .transaction import txhash as TXHASH
tx = serialize(o)
assert TXHASH(tx) == txhash
return tx
@ -325,7 +407,7 @@ def firstbits(address):
def get_block_at_height(height):
j = json.loads(make_request("https://blockchain.info/block-height/" +
str(height)+"?format=json"))
str(height)+"?format=json").decode("utf-8"))
for b in j['blocks']:
if b['main_chain'] is True:
return b
@ -337,10 +419,10 @@ def _get_block(inp):
return get_block_at_height(inp)
else:
return json.loads(make_request(
'https://blockchain.info/rawblock/'+inp))
'https://blockchain.info/rawblock/'+inp).decode("utf-8"))
def get_block_header_data(inp):
def bci_get_block_header_data(inp):
j = _get_block(inp)
return {
'version': j['ver'],
@ -354,14 +436,14 @@ def get_block_header_data(inp):
def blockr_get_block_header_data(height, network='btc'):
if network == 'testnet':
blockr_url = "https://tbtc.blockr.io/api/v1/block/raw/"
blockr_url = "http://tbtc.blockr.io/api/v1/block/raw/"
elif network == 'btc':
blockr_url = "https://btc.blockr.io/api/v1/block/raw/"
blockr_url = "http://btc.blockr.io/api/v1/block/raw/"
else:
raise Exception(
'Unsupported network {0} for blockr_get_block_header_data'.format(network))
k = json.loads(make_request(blockr_url + str(height)))
k = json.loads(make_request(blockr_url + str(height)).decode("utf-8"))
j = k['data']
return {
'version': j['version'],
@ -373,6 +455,40 @@ def blockr_get_block_header_data(height, network='btc'):
'nonce': j['nonce'],
}
def get_block_timestamp(height, network='btc'):
if network == 'testnet':
blockr_url = "http://tbtc.blockr.io/api/v1/block/info/"
elif network == 'btc':
blockr_url = "http://btc.blockr.io/api/v1/block/info/"
else:
raise Exception(
'Unsupported network {0} for get_block_timestamp'.format(network))
import time, calendar
if isinstance(height, list):
k = json.loads(make_request(blockr_url + ','.join([str(x) for x in height])).decode("utf-8"))
o = {x['nb']: calendar.timegm(time.strptime(x['time_utc'],
"%Y-%m-%dT%H:%M:%SZ")) for x in k['data']}
return [o[x] for x in height]
else:
k = json.loads(make_request(blockr_url + str(height)).decode("utf-8"))
j = k['data']['time_utc']
return calendar.timegm(time.strptime(j, "%Y-%m-%dT%H:%M:%SZ"))
block_header_data_getters = {
'bci': bci_get_block_header_data,
'blockr': blockr_get_block_header_data
}
def get_block_header_data(inp, **kwargs):
f = block_header_data_getters.get(kwargs.get('source', ''),
bci_get_block_header_data)
return f(inp, **kwargs)
def get_txs_in_block(inp):
j = _get_block(inp)
hashes = [t['hash'] for t in j['tx']]
@ -380,5 +496,33 @@ def get_txs_in_block(inp):
def get_block_height(txhash):
j = json.loads(make_request('https://blockchain.info/rawtx/'+txhash))
j = json.loads(make_request('https://blockchain.info/rawtx/'+txhash).decode("utf-8"))
return j['block_height']
# fromAddr, toAddr, 12345, changeAddress
def get_tx_composite(inputs, outputs, output_value, change_address=None, network=None):
"""mktx using blockcypher API"""
inputs = [inputs] if not isinstance(inputs, list) else inputs
outputs = [outputs] if not isinstance(outputs, list) else outputs
network = set_network(change_address or inputs) if not network else network.lower()
url = "http://api.blockcypher.com/v1/btc/{network}/txs/new?includeToSignTx=true".format(
network=('test3' if network=='testnet' else 'main'))
is_address = lambda a: bool(re.match("^[123mn][a-km-zA-HJ-NP-Z0-9]{26,33}$", a))
if any([is_address(x) for x in inputs]):
inputs_type = 'addresses' # also accepts UTXOs, only addresses supported presently
if any([is_address(x) for x in outputs]):
outputs_type = 'addresses' # TODO: add UTXO support
data = {
'inputs': [{inputs_type: inputs}],
'confirmations': 0,
'preference': 'high',
'outputs': [{outputs_type: outputs, "value": output_value}]
}
if change_address:
data["change_address"] = change_address #
jdata = json.loads(make_request(url, data))
hash, txh = jdata.get("tosign")[0], jdata.get("tosign_tx")[0]
assert bin_dbl_sha256(txh.decode('hex')).encode('hex') == hash, "checksum mismatch %s" % hash
return txh.encode("utf-8")
blockcypher_mktx = get_tx_composite

View File

@ -6,12 +6,12 @@ from .blocks import *
# Takes privkey, address, value (satoshis), fee (satoshis)
def send(frm, to, value, fee=10000):
return sendmultitx(frm, to + ":" + str(value), fee)
def send(frm, to, value, fee=10000, **kwargs):
return sendmultitx(frm, to + ":" + str(value), fee, **kwargs)
# Takes privkey, "address1:value1,address2:value2" (satoshis), fee (satoshis)
def sendmultitx(frm, tovalues, fee=10000, **kwargs):
def sendmultitx(frm, *args, **kwargs):
tv, fee = args[:-1], int(args[-1])
outs = []
outvalue = 0
@ -21,7 +21,7 @@ def sendmultitx(frm, tovalues, fee=10000, **kwargs):
u = unspent(privtoaddr(frm), **kwargs)
u2 = select(u, int(outvalue)+int(fee))
argz = u2 + outs + [frm, fee]
argz = u2 + outs + [privtoaddr(frm), fee]
tx = mksend(*argz)
tx2 = signall(tx, frm)
return pushtx(tx2, **kwargs)

File diff suppressed because it is too large Load Diff

View File

@ -122,7 +122,7 @@ def jacobian_add(p, q):
U1H2 = (U1 * H2) % P
nx = (R ** 2 - H3 - 2 * U1H2) % P
ny = (R * (U1H2 - nx) - S1 * H3) % P
nz = H * p[2] * q[2]
nz = (H * p[2] * q[2]) % P
return (nx, ny, nz)
@ -179,10 +179,10 @@ def encode_pubkey(pub, formt):
pub = decode_pubkey(pub)
if formt == 'decimal': return pub
elif formt == 'bin': return b'\x04' + encode(pub[0], 256, 32) + encode(pub[1], 256, 32)
elif formt == 'bin_compressed':
elif formt == 'bin_compressed':
return from_int_to_byte(2+(pub[1] % 2)) + encode(pub[0], 256, 32)
elif formt == 'hex': return '04' + encode(pub[0], 16, 64) + encode(pub[1], 16, 64)
elif formt == 'hex_compressed':
elif formt == 'hex_compressed':
return '0'+str(2+(pub[1] % 2)) + encode(pub[0], 16, 64)
elif formt == 'bin_electrum': return encode(pub[0], 256, 32) + encode(pub[1], 256, 32)
elif formt == 'hex_electrum': return encode(pub[0], 16, 64) + encode(pub[1], 16, 64)
@ -253,6 +253,9 @@ def add_privkeys(p1, p2):
f1, f2 = get_privkey_format(p1), get_privkey_format(p2)
return encode_privkey((decode_privkey(p1, f1) + decode_privkey(p2, f2)) % N, f1)
def mul_privkeys(p1, p2):
f1, f2 = get_privkey_format(p1), get_privkey_format(p2)
return encode_privkey((decode_privkey(p1, f1) * decode_privkey(p2, f2)) % N, f1)
def multiply(pubkey, privkey):
f1, f2 = get_pubkey_format(pubkey), get_privkey_format(privkey)
@ -450,12 +453,32 @@ def pubkey_to_address(pubkey, magicbyte=0):
pubtoaddr = pubkey_to_address
def is_privkey(priv):
try:
get_privkey_format(priv)
return True
except:
return False
def is_pubkey(pubkey):
try:
get_pubkey_format(pubkey)
return True
except:
return False
def is_address(addr):
ADDR_RE = re.compile("^[123mn][a-km-zA-HJ-NP-Z0-9]{26,33}$")
return bool(ADDR_RE.match(addr))
# EDCSA
def encode_sig(v, r, s):
vb, rb, sb = from_int_to_byte(v), encode(r, 256), encode(s, 256)
result = base64.b64encode(vb+b'\x00'*(32-len(rb))+rb+b'\x00'*(32-len(sb))+sb)
return result if is_python2 else str(result, 'utf-8')
@ -487,35 +510,59 @@ def ecdsa_raw_sign(msghash, priv):
r, y = fast_multiply(G, k)
s = inv(k, N) * (z + r*decode_privkey(priv)) % N
return 27+(y % 2), r, s
v, r, s = 27+((y % 2) ^ (0 if s * 2 < N else 1)), r, s if s * 2 < N else N - s
if 'compressed' in get_privkey_format(priv):
v += 4
return v, r, s
def ecdsa_sign(msg, priv):
return encode_sig(*ecdsa_raw_sign(electrum_sig_hash(msg), priv))
v, r, s = ecdsa_raw_sign(electrum_sig_hash(msg), priv)
sig = encode_sig(v, r, s)
assert ecdsa_verify(msg, sig,
privtopub(priv)), "Bad Sig!\t %s\nv = %d\n,r = %d\ns = %d" % (sig, v, r, s)
return sig
def ecdsa_raw_verify(msghash, vrs, pub):
v, r, s = vrs
if not (27 <= v <= 34):
return False
w = inv(s, N)
z = hash_to_int(msghash)
u1, u2 = z*w % N, r*w % N
x, y = fast_add(fast_multiply(G, u1), fast_multiply(decode_pubkey(pub), u2))
return bool(r == x and (r % N) and (s % N))
return r == x
# For BitcoinCore, (msg = addr or msg = "") be default
def ecdsa_verify_addr(msg, sig, addr):
assert is_address(addr)
Q = ecdsa_recover(msg, sig)
magic = get_version_byte(addr)
return (addr == pubtoaddr(Q, int(magic))) or (addr == pubtoaddr(compress(Q), int(magic)))
def ecdsa_verify(msg, sig, pub):
if is_address(pub):
return ecdsa_verify_addr(msg, sig, pub)
return ecdsa_raw_verify(electrum_sig_hash(msg), decode_sig(sig), pub)
def ecdsa_raw_recover(msghash, vrs):
v, r, s = vrs
if not (27 <= v <= 34):
raise ValueError("%d must in range 27-31" % v)
x = r
beta = pow(x*x*x+A*x+B, (P+1)//4, P)
xcubedaxb = (x*x*x+A*x+B) % P
beta = pow(xcubedaxb, (P+1)//4, P)
y = beta if v % 2 ^ beta % 2 else (P - beta)
# If xcubedaxb is not a quadratic residue, then r cannot be the x coord
# for a point on the curve, and so the sig is invalid
if (xcubedaxb - y*y) % P != 0 or not (r % N) or not (s % N):
return False
z = hash_to_int(msghash)
Gz = jacobian_multiply((Gx, Gy, 1), (N - z) % N)
XY = jacobian_multiply((x, y, 1), s)
@ -523,10 +570,12 @@ def ecdsa_raw_recover(msghash, vrs):
Q = jacobian_multiply(Qr, inv(r, N))
Q = from_jacobian(Q)
if ecdsa_raw_verify(msghash, vrs, Q):
return Q
return False
# if ecdsa_raw_verify(msghash, vrs, Q):
return Q
# return False
def ecdsa_recover(msg, sig):
return encode_pubkey(ecdsa_raw_recover(electrum_sig_hash(msg), decode_sig(sig)), 'hex')
v,r,s = decode_sig(sig)
Q = ecdsa_raw_recover(electrum_sig_hash(msg), (v,r,s))
return encode_pubkey(Q, 'hex_compressed') if v >= 31 else encode_pubkey(Q, 'hex')

View File

@ -0,0 +1,127 @@
import hashlib
import os.path
import binascii
import random
from bisect import bisect_left
wordlist_english=list(open(os.path.join(os.path.dirname(os.path.realpath(__file__)),'english.txt'),'r'))
def eint_to_bytes(entint,entbits):
a=hex(entint)[2:].rstrip('L').zfill(32)
print(a)
return binascii.unhexlify(a)
def mnemonic_int_to_words(mint,mint_num_words,wordlist=wordlist_english):
backwords=[wordlist[(mint >> (11*x)) & 0x7FF].strip() for x in range(mint_num_words)]
return backwords[::-1]
def entropy_cs(entbytes):
entropy_size=8*len(entbytes)
checksum_size=entropy_size//32
hd=hashlib.sha256(entbytes).hexdigest()
csint=int(hd,16) >> (256-checksum_size)
return csint,checksum_size
def entropy_to_words(entbytes,wordlist=wordlist_english):
if(len(entbytes) < 4 or len(entbytes) % 4 != 0):
raise ValueError("The size of the entropy must be a multiple of 4 bytes (multiple of 32 bits)")
entropy_size=8*len(entbytes)
csint,checksum_size = entropy_cs(entbytes)
entint=int(binascii.hexlify(entbytes),16)
mint=(entint << checksum_size) | csint
mint_num_words=(entropy_size+checksum_size)//11
return mnemonic_int_to_words(mint,mint_num_words,wordlist)
def words_bisect(word,wordlist=wordlist_english):
lo=bisect_left(wordlist,word)
hi=len(wordlist)-bisect_left(wordlist[:lo:-1],word)
return lo,hi
def words_split(wordstr,wordlist=wordlist_english):
def popword(wordstr,wordlist):
for fwl in range(1,9):
w=wordstr[:fwl].strip()
lo,hi=words_bisect(w,wordlist)
if(hi-lo == 1):
return w,wordstr[fwl:].lstrip()
wordlist=wordlist[lo:hi]
raise Exception("Wordstr %s not found in list" %(w))
words=[]
tail=wordstr
while(len(tail)):
head,tail=popword(tail,wordlist)
words.append(head)
return words
def words_to_mnemonic_int(words,wordlist=wordlist_english):
if(isinstance(words,str)):
words=words_split(words,wordlist)
return sum([wordlist.index(w) << (11*x) for x,w in enumerate(words[::-1])])
def words_verify(words,wordlist=wordlist_english):
if(isinstance(words,str)):
words=words_split(words,wordlist)
mint = words_to_mnemonic_int(words,wordlist)
mint_bits=len(words)*11
cs_bits=mint_bits//32
entropy_bits=mint_bits-cs_bits
eint=mint >> cs_bits
csint=mint & ((1 << cs_bits)-1)
ebytes=_eint_to_bytes(eint,entropy_bits)
return csint == entropy_cs(ebytes)
def mnemonic_to_seed(mnemonic_phrase,passphrase=b''):
try:
from hashlib import pbkdf2_hmac
def pbkdf2_hmac_sha256(password,salt,iters=2048):
return pbkdf2_hmac(hash_name='sha512',password=password,salt=salt,iterations=iters)
except:
try:
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import SHA512,HMAC
def pbkdf2_hmac_sha256(password,salt,iters=2048):
return PBKDF2(password=password,salt=salt,dkLen=64,count=iters,prf=lambda p,s: HMAC.new(p,s,SHA512).digest())
except:
try:
from pbkdf2 import PBKDF2
import hmac
def pbkdf2_hmac_sha256(password,salt,iters=2048):
return PBKDF2(password,salt, iterations=iters, macmodule=hmac, digestmodule=hashlib.sha512).read(64)
except:
raise RuntimeError("No implementation of pbkdf2 was found!")
return pbkdf2_hmac_sha256(password=mnemonic_phrase,salt=b'mnemonic'+passphrase)
def words_mine(prefix,entbits,satisfunction,wordlist=wordlist_english,randombits=random.getrandbits):
prefix_bits=len(prefix)*11
mine_bits=entbits-prefix_bits
pint=words_to_mnemonic_int(prefix,wordlist)
pint<<=mine_bits
dint=randombits(mine_bits)
count=0
while(not satisfunction(entropy_to_words(eint_to_bytes(pint+dint,entbits)))):
dint=randombits(mine_bits)
if((count & 0xFFFF) == 0):
print("Searched %f percent of the space" % (float(count)/float(1 << mine_bits)))
return entropy_to_words(eint_to_bytes(pint+dint,entbits))
if __name__=="__main__":
import json
testvectors=json.load(open('vectors.json','r'))
passed=True
for v in testvectors['english']:
ebytes=binascii.unhexlify(v[0])
w=' '.join(entropy_to_words(ebytes))
seed=mnemonic_to_seed(w,passphrase='TREZOR')
passed = passed and w==v[1]
passed = passed and binascii.hexlify(seed)==v[2]
print("Tests %s." % ("Passed" if passed else "Failed"))

View File

@ -40,10 +40,14 @@ if sys.version_info.major == 2:
return encode(decode(string, frm), to, minlen)
def bin_to_b58check(inp, magicbyte=0):
inp_fmtd = chr(int(magicbyte)) + inp
leadingzbytes = len(re.match('^\x00*', inp_fmtd).group(0))
checksum = bin_dbl_sha256(inp_fmtd)[:4]
return '1' * leadingzbytes + changebase(inp_fmtd+checksum, 256, 58)
if magicbyte == 0:
inp = '\x00' + inp
while magicbyte > 0:
inp = chr(int(magicbyte % 256)) + inp
magicbyte //= 256
leadingzbytes = len(re.match('^\x00*', inp).group(0))
checksum = bin_dbl_sha256(inp)[:4]
return '1' * leadingzbytes + changebase(inp+checksum, 256, 58)
def bytes_to_hex_string(b):
return b.encode('hex')

View File

@ -38,16 +38,20 @@ if sys.version_info.major == 3:
return encode(decode(string, frm), to, minlen)
def bin_to_b58check(inp, magicbyte=0):
inp_fmtd = from_int_to_byte(int(magicbyte))+inp
if magicbyte == 0:
inp = from_int_to_byte(0) + inp
while magicbyte > 0:
inp = from_int_to_byte(magicbyte % 256) + inp
magicbyte //= 256
leadingzbytes = 0
for x in inp_fmtd:
for x in inp:
if x != 0:
break
leadingzbytes += 1
checksum = bin_dbl_sha256(inp_fmtd)[:4]
return '1' * leadingzbytes + changebase(inp_fmtd+checksum, 256, 58)
checksum = bin_dbl_sha256(inp)[:4]
return '1' * leadingzbytes + changebase(inp+checksum, 256, 58)
def bytes_to_hex_string(b):
if isinstance(b, str):

View File

@ -9,7 +9,7 @@ from _functools import reduce
def json_is_base(obj, base):
if not is_python2 and isinstance(obj, bytes):
return False
alpha = get_code_string(base)
if isinstance(obj, string_types):
for i in range(len(obj)):
@ -58,7 +58,7 @@ def deserialize(tx):
def read_var_int():
pos[0] += 1
val = from_byte_to_int(tx[pos[0]-1])
if val < 253:
return val
@ -138,9 +138,9 @@ def signature_form(tx, i, script, hashcode=SIGHASH_ALL):
newtx["outs"] = []
elif hashcode == SIGHASH_SINGLE:
newtx["outs"] = newtx["outs"][:len(newtx["ins"])]
for out in range(len(newtx["ins"]) - 1):
out.value = 2**64 - 1
out.script = ""
for out in newtx["outs"][:len(newtx["ins"]) - 1]:
out['value'] = 2**64 - 1
out['script'] = ""
elif hashcode == SIGHASH_ANYONECANPAY:
newtx["ins"] = [newtx["ins"][i]]
else:
@ -152,15 +152,14 @@ def signature_form(tx, i, script, hashcode=SIGHASH_ALL):
def der_encode_sig(v, r, s):
b1, b2 = safe_hexlify(encode(r, 256)), safe_hexlify(encode(s, 256))
if r >= 2**255:
if len(b1) and b1[0] in '89abcdef':
b1 = '00' + b1
if s >= 2**255:
if len(b2) and b2[0] in '89abcdef':
b2 = '00' + b2
left = '02'+encode(len(b1)//2, 16, 2)+b1
right = '02'+encode(len(b2)//2, 16, 2)+b2
return '30'+encode(len(left+right)//2, 16, 2)+left+right
def der_decode_sig(sig):
leftlen = decode(sig[6:8], 16)*2
left = sig[8:8+leftlen]
@ -168,6 +167,32 @@ def der_decode_sig(sig):
right = sig[12+leftlen:12+leftlen+rightlen]
return (None, decode(left, 16), decode(right, 16))
def is_bip66(sig):
"""Checks hex DER sig for BIP66 consistency"""
#https://raw.githubusercontent.com/bitcoin/bips/master/bip-0066.mediawiki
#0x30 [total-len] 0x02 [R-len] [R] 0x02 [S-len] [S] [sighash]
sig = bytearray.fromhex(sig) if re.match('^[0-9a-fA-F]*$', sig) else bytearray(sig)
if (sig[0] == 0x30) and (sig[1] == len(sig)-2): # check if sighash is missing
sig.extend(b"\1") # add SIGHASH_ALL for testing
#assert (sig[-1] & 124 == 0) and (not not sig[-1]), "Bad SIGHASH value"
if len(sig) < 9 or len(sig) > 73: return False
if (sig[0] != 0x30): return False
if (sig[1] != len(sig)-3): return False
rlen = sig[3]
if (5+rlen >= len(sig)): return False
slen = sig[5+rlen]
if (rlen + slen + 7 != len(sig)): return False
if (sig[2] != 0x02): return False
if (rlen == 0): return False
if (sig[4] & 0x80): return False
if (rlen > 1 and (sig[4] == 0x00) and not (sig[5] & 0x80)): return False
if (sig[4+rlen] != 0x02): return False
if (slen == 0): return False
if (sig[rlen+6] & 0x80): return False
if (slen > 1 and (sig[6+rlen] == 0x00) and not (sig[7+rlen] & 0x80)):
return False
return True
def txhash(tx, hashcode=None):
if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx):
@ -230,8 +255,11 @@ def script_to_address(script, vbyte=0):
if vbyte in [111, 196]:
# Testnet
scripthash_byte = 196
else:
elif vbyte == 0:
# Mainnet
scripthash_byte = 5
else:
scripthash_byte = vbyte
# BIP0016 scripthash addresses
return bin_to_b58check(script[2:-1], scripthash_byte)
@ -275,7 +303,7 @@ def serialize_script_unit(unit):
if unit < 16:
return from_int_to_byte(unit + 80)
else:
return bytes([unit])
return from_int_to_byte(unit)
elif unit is None:
return b'\x00'
else:
@ -300,7 +328,7 @@ else:
if json_is_base(script, 16):
return safe_hexlify(serialize_script(json_changebase(script,
lambda x: binascii.unhexlify(x))))
result = bytes()
for b in map(serialize_script_unit, script):
result += b if isinstance(b, bytes) else bytes(b, 'utf-8')
@ -313,7 +341,7 @@ def mk_multisig_script(*args): # [pubs],k or pub1,pub2...pub[n],k
else:
pubs = list(filter(lambda x: len(str(x)) >= 32, args))
k = int(args[len(pubs)])
return serialize_script([k]+pubs+[len(pubs)]) + 'ae'
return serialize_script([k]+pubs+[len(pubs)]+[0xae])
# Signing and verifying
@ -378,8 +406,12 @@ def apply_multisignatures(*args):
if isinstance(tx, str) and re.match('^[0-9a-fA-F]*$', tx):
return safe_hexlify(apply_multisignatures(binascii.unhexlify(tx), i, script, sigs))
# Not pushing empty elements on the top of the stack if passing no
# script (in case of bare multisig inputs there is no script)
script_blob = [] if script.__len__() == 0 else [script]
txobj = deserialize(tx)
txobj["ins"][i]["script"] = serialize_script([None]+sigs+[script])
txobj["ins"][i]["script"] = serialize_script([None]+sigs+script_blob)
return serialize(txobj)

View File

@ -5,7 +5,7 @@ except ImportError:
from distutils.core import setup
setup(name='bitcoin',
version='1.1.28',
version='1.1.42',
description='Python Bitcoin Tools',
author='Vitalik Buterin',
author_email='vbuterin@gmail.com',
@ -13,5 +13,5 @@ setup(name='bitcoin',
packages=['bitcoin'],
scripts=['pybtctool'],
include_package_data=True,
data_files=[("", ["LICENSE"])],
data_files=[("", ["LICENSE"]), ("bitcoin", ["bitcoin/english.txt"])],
)

View File

@ -99,57 +99,19 @@ class TestElectrumWalletInternalConsistency(unittest.TestCase):
)
class TestElectrumSignVerify(unittest.TestCase):
"""Requires Electrum."""
class TestRawSignRecover(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.wallet = "/tmp/tempwallet_" + str(random.randrange(2**40))
print("Starting wallet tests with: " + cls.wallet)
os.popen('echo "\n\n\n\n\n\n" | electrum -w %s create' % cls.wallet).read()
cls.seed = str(json.loads(os.popen("electrum -w %s getseed" % cls.wallet).read())['seed'])
cls.addies = json.loads(os.popen("electrum -w %s listaddresses" % cls.wallet).read())
print("Basic signing and recovery tests")
def test_address(self):
for i in range(5):
def test_all(self):
for i in range(20):
k = sha256(str(i))
s = ecdsa_raw_sign('35' * 32, k)
self.assertEqual(
self.addies[i],
electrum_address(self.seed, i, 0),
"Address does not match! Details:\nseed %s, i: %d" % (self.seed, i)
)
def test_sign_verify(self):
print("Electrum-style signing and verification tests, against actual Electrum")
alphabet = "1234567890qwertyuiopasdfghjklzxcvbnm"
for i in range(8):
msg = ''.join([random.choice(alphabet) for i in range(random.randrange(20, 200))])
addy = random.choice(self.addies)
wif = os.popen('electrum -w %s dumpprivkey %s' % (self.wallet, addy)).readlines()[-2].replace('"', '').strip()
priv = b58check_to_hex(wif)
pub = privtopub(priv)
sig = os.popen('electrum -w %s signmessage %s %s' % (self.wallet, addy, msg)).readlines()[-1].strip()
self.assertTrue(
ecdsa_verify(msg, sig, pub),
"Verification error. Details:\nmsg: %s\nsig: %s\npriv: %s\naddy: %s\npub: %s" % (
msg, sig, priv, addy, pub
)
)
rec = ecdsa_recover(msg, sig)
self.assertEqual(
pub,
rec,
"Recovery error. Details:\nmsg: %s\nsig: %s\npriv: %s\naddy: %s\noriginal pub: %s, %s\nrecovered pub: %s" % (
msg, sig, priv, addy, pub, decode_pubkey(pub, 'hex')[1], rec
)
)
mysig = ecdsa_sign(msg, priv)
self.assertEqual(
os.popen('electrum -w %s verifymessage %s %s %s' % (self.wallet, addy, mysig, msg)).read().strip(),
"true",
"Electrum verify message does not match"
ecdsa_raw_recover('35' * 32, s),
decode_pubkey(privtopub(k))
)