Add faster libsecp256k1 support for sign verification, Remove old style signing support,

This commit is contained in:
shortcutme 2019-03-16 02:36:11 +01:00
parent 6f0531c663
commit bc93796727
No known key found for this signature in database
GPG Key ID: 5B63BAE6CB9613AE
3 changed files with 254 additions and 40 deletions

View File

@ -1,18 +1,40 @@
import logging import logging
import base64
from lib.BitcoinECC import BitcoinECC from util import OpensslFindPatch
from lib.pybitcointools import bitcoin as btctools from lib import pybitcointools as btctools
from Config import config from Config import config
# Try to load openssl lib_verify_best = "btctools"
def loadLib(lib_name):
global bitcoin, libsecp256k1message, lib_verify_best
if lib_name == "libsecp256k1":
from lib import libsecp256k1message
lib_verify_best = "libsecp256k1"
logging.info("Libsecpk256k1 loaded")
elif lib_name == "openssl":
import bitcoin.signmessage
import bitcoin.core.key
import bitcoin.wallet
logging.info("OpenSSL loaded, version: %.9X" % bitcoin.core.key._ssl.SSLeay())
try: try:
if not config.use_openssl: if not config.use_libsecp256k1:
raise Exception("Disabled by config") raise Exception("Disabled by config")
from lib.opensslVerify import opensslVerify loadLib("libsecp256k1")
logging.info("OpenSSL loaded, version: %s" % opensslVerify.openssl_version) lib_verify_best = "libsecp256k1"
except Exception, err: except Exception as err:
logging.info("OpenSSL load failed: %s, falling back to slow bitcoin verify" % err) logging.info("Libsecp256k1 load failed: %s, try to load OpenSSL" % err)
opensslVerify = None try:
if not config.use_openssl:
raise Exception("Disabled by config")
loadLib("openssl")
lib_verify_best = "openssl"
except Exception as err:
logging.info("OpenSSL load failed: %s, falling back to slow bitcoin verify" % err)
def newPrivatekey(uncompressed=True): # Return new private key def newPrivatekey(uncompressed=True): # Return new private key
@ -25,22 +47,17 @@ def newSeed():
def hdPrivatekey(seed, child): def hdPrivatekey(seed, child):
masterkey = btctools.bip32_master_key(seed) masterkey = btctools.bip32_master_key(bytes(seed, "ascii"))
childkey = btctools.bip32_ckd(masterkey, child % 100000000) # Too large child id could cause problems childkey = btctools.bip32_ckd(masterkey, child % 100000000) # Too large child id could cause problems
key = btctools.bip32_extract_key(childkey) key = btctools.bip32_extract_key(childkey)
return btctools.encode_privkey(key, "wif") return btctools.encode_privkey(key, "wif")
def privatekeyToAddress(privatekey): # Return address from private key def privatekeyToAddress(privatekey): # Return address from private key
if privatekey.startswith("23") and len(privatekey) > 52: # Backward compatibility to broken lib try:
bitcoin = BitcoinECC.Bitcoin() return btctools.privkey_to_address(privatekey)
bitcoin.BitcoinAddressFromPrivate(privatekey) except Exception: # Invalid privatekey
return bitcoin.BitcoinAddresFromPublicKey() return False
else:
try:
return btctools.privkey_to_address(privatekey)
except Exception: # Invalid privatekey
return False
def sign(data, privatekey): # Return sign to data using private key def sign(data, privatekey): # Return sign to data using private key
@ -50,29 +67,30 @@ def sign(data, privatekey): # Return sign to data using private key
return sign return sign
def signOld(data, privatekey): # Return sign to data using private key (backward compatible old style) def verify(data, address, sign, lib_verify=None): # Verify data using address and sign
bitcoin = BitcoinECC.Bitcoin() if not lib_verify:
bitcoin.BitcoinAddressFromPrivate(privatekey) lib_verify = lib_verify_best
sign = bitcoin.SignECDSA(data)
return sign
def verify(data, address, sign): # Verify data using address and sign
if not sign: if not sign:
return False return False
if hasattr(sign, "endswith"): if lib_verify == "libsecp256k1":
if opensslVerify: # Use the faster method if avalible sign_address = libsecp256k1message.recover_address(data.encode("utf8"), sign).decode("utf8")
pub = opensslVerify.getMessagePubkey(data, sign) elif lib_verify == "openssl":
sign_address = btctools.pubtoaddr(pub) sig = base64.b64decode(sign)
else: # Use pure-python message = bitcoin.signmessage.BitcoinMessage(data)
pub = btctools.ecdsa_recover(data, sign) hash = message.GetHash()
sign_address = btctools.pubtoaddr(pub)
if type(address) is list: # Any address in the list pubkey = bitcoin.core.key.CPubKey.recover_compact(hash, sig)
return sign_address in address
else: # One possible address sign_address = str(bitcoin.wallet.P2PKHBitcoinAddress.from_pubkey(pubkey))
return sign_address == address elif lib_verify == "btctools": # Use pure-python
else: # Backward compatible old style pub = btctools.ecdsa_recover(data, sign)
bitcoin = BitcoinECC.Bitcoin() sign_address = btctools.pubtoaddr(pub)
return bitcoin.VerifyMessageFromBitcoinAddress(address, data, sign) else:
raise Exception("No library enabled for signature verification")
if type(address) is list: # Any address in the list
return sign_address in address
else: # One possible address
return sign_address == address

View File

@ -0,0 +1 @@
from .libsecp256k1message import *

View File

@ -0,0 +1,195 @@
import hashlib
import struct
import base64
from coincurve import PrivateKey, PublicKey
from base58 import b58encode_check, b58decode_check
from hmac import compare_digest
RECID_MIN = 0
RECID_MAX = 3
RECID_UNCOMPR = 27
LEN_COMPACT_SIG = 65
class SignatureError(ValueError):
pass
def bitcoin_address():
"""Generate a public address and a secret address."""
publickey, secretkey = key_pair()
public_address = compute_public_address(publickey)
secret_address = compute_secret_address(secretkey)
return (public_address, secret_address)
def key_pair():
"""Generate a public key and a secret key."""
secretkey = PrivateKey()
publickey = PublicKey.from_secret(secretkey.secret)
return (publickey, secretkey)
def compute_public_address(publickey):
"""Convert a public key to a public Bitcoin address."""
public_plain = b'\x00' + public_digest(publickey)
return b58encode_check(public_plain)
def compute_secret_address(secretkey):
"""Convert a secret key to a secret Bitcoin address."""
secret_plain = b'\x80' + secretkey.secret
return b58encode_check(secret_plain)
def public_digest(publickey):
"""Convert a public key to ripemd160(sha256()) digest."""
publickey_hex = publickey.format(compressed=False)
return hashlib.new('ripemd160', hashlib.sha256(publickey_hex).digest()).digest()
def address_public_digest(address):
"""Convert a public Bitcoin address to ripemd160(sha256()) digest."""
public_plain = b58decode_check(address)
if not public_plain.startswith(b'\x00') or len(public_plain) != 21:
raise ValueError('Invalid public key digest')
return public_plain[1:]
def _decode_bitcoin_secret(address):
secret_plain = b58decode_check(address)
if not secret_plain.startswith(b'\x80') or len(secret_plain) != 33:
raise ValueError('Invalid secret key. Uncompressed keys only.')
return secret_plain[1:]
def recover_public_key(signature, message):
"""Recover public key from signature and message.
Recovered public key guarantees a correct signature"""
return PublicKey.from_signature_and_message(signature, message)
def decode_secret_key(address):
"""Convert a secret Bitcoin address to a secret key."""
return PrivateKey(_decode_bitcoin_secret(address))
def coincurve_sig(electrum_signature):
# coincurve := r + s + recovery_id
# where (0 <= recovery_id <= 3)
# https://github.com/bitcoin-core/secp256k1/blob/0b7024185045a49a1a6a4c5615bf31c94f63d9c4/src/modules/recovery/main_impl.h#L35
if len(electrum_signature) != LEN_COMPACT_SIG:
raise ValueError('Not a 65-byte compact signature.')
# Compute coincurve recid
recid = electrum_signature[0] - RECID_UNCOMPR
if not (RECID_MIN <= recid <= RECID_MAX):
raise ValueError('Recovery ID %d is not supported.' % recid)
recid_byte = int.to_bytes(recid, length=1, byteorder='big')
return electrum_signature[1:] + recid_byte
def electrum_sig(coincurve_signature):
# electrum := recovery_id + r + s
# where (27 <= recovery_id <= 30)
# https://github.com/scintill/bitcoin-signature-tools/blob/ed3f5be5045af74a54c92d3648de98c329d9b4f7/key.cpp#L285
if len(coincurve_signature) != LEN_COMPACT_SIG:
raise ValueError('Not a 65-byte compact signature.')
# Compute Electrum recid
recid = coincurve_signature[-1] + RECID_UNCOMPR
if not (RECID_UNCOMPR + RECID_MIN <= recid <= RECID_UNCOMPR + RECID_MAX):
raise ValueError('Recovery ID %d is not supported.' % recid)
recid_byte = int.to_bytes(recid, length=1, byteorder='big')
return recid_byte + coincurve_signature[0:-1]
def sign_data(secretkey, byte_string):
"""Sign [byte_string] with [secretkey].
Return serialized signature compatible with Electrum (ZeroNet)."""
# encode the message
encoded = _zero_format(byte_string)
# sign the message and get a coincurve signature
signature = secretkey.sign_recoverable(encoded)
# reserialize signature and return it
return electrum_sig(signature)
def verify_data(key_digest, electrum_signature, byte_string):
"""Verify if [electrum_signature] of [byte_string] is correctly signed and
is signed with the secret counterpart of [key_digest].
Raise SignatureError if the signature is forged or otherwise problematic."""
# reserialize signature
signature = coincurve_sig(electrum_signature)
# encode the message
encoded = _zero_format(byte_string)
# recover full public key from signature
# "which guarantees a correct signature"
publickey = recover_public_key(signature, encoded)
# verify that the message is correctly signed by the public key
# correct_sig = verify_sig(publickey, signature, encoded)
# verify that the public key is what we expect
correct_key = verify_key(publickey, key_digest)
if not correct_key:
raise SignatureError('Signature is forged!')
def verify_sig(publickey, signature, byte_string):
return publickey.verify(signature, byte_string)
def verify_key(publickey, key_digest):
return compare_digest(key_digest, public_digest(publickey))
# Electrum, the heck?!
def bchr(i):
return struct.pack('B', i)
def _zero_encode(val, base, minlen=0):
base, minlen = int(base), int(minlen)
code_string = b''.join([bchr(x) for x in range(256)])
result = b''
while val > 0:
index = val % base
result = code_string[index:index + 1] + result
val //= base
return code_string[0:1] * max(minlen - len(result), 0) + result
def _zero_insane_int(x):
x = int(x)
if x < 253:
return bchr(x)
elif x < 65536:
return bchr(253) + _zero_encode(x, 256, 2)[::-1]
elif x < 4294967296:
return bchr(254) + _zero_encode(x, 256, 4)[::-1]
else:
return bchr(255) + _zero_encode(x, 256, 8)[::-1]
def _zero_magic(message):
return b'\x18Bitcoin Signed Message:\n' + _zero_insane_int(len(message)) + message
def _zero_format(message):
padded = _zero_magic(message)
return hashlib.sha256(padded).digest()
def recover_address(data, sign):
publickey = recover_public_key(coincurve_sig(base64.b64decode(sign)), _zero_format(data))
return compute_public_address(publickey)
__all__ = [
'SignatureError',
'key_pair', 'compute_public_address', 'compute_secret_address',
'public_digest', 'address_public_digest', 'recover_public_key', 'decode_secret_key',
'sign_data', 'verify_data', "recover_address"
]
if __name__ == "__main__":
import base64, time, multiprocessing
s = time.time()
privatekey = decode_secret_key(b"5JsunC55XGVqFQj5kPGK4MWgTL26jKbnPhjnmchSNPo75XXCwtk")
threads = []
for i in range(1000):
data = bytes("hello", "utf8")
address = recover_address(data, "HGbib2kv9gm9IJjDt1FXbXFczZi35u0rZR3iPUIt5GglDDCeIQ7v8eYXVNIaLoJRI4URGZrhwmsYQ9aVtRTnTfQ=")
print("- Verify x10000: %.3fs %s" % (time.time() - s, address))
s = time.time()
for i in range(1000):
privatekey = decode_secret_key(b"5JsunC55XGVqFQj5kPGK4MWgTL26jKbnPhjnmchSNPo75XXCwtk")
sign = sign_data(privatekey, b"hello")
sign_b64 = base64.b64encode(sign)
print("- Sign x1000: %.3fs" % (time.time() - s))