2017-06-08 08:36:48 +02:00
|
|
|
# The COPYRIGHT file at the top level of this repository contains
|
|
|
|
# the full copyright notices and license terms.
|
2017-05-17 12:34:08 +02:00
|
|
|
from logging import getLogger
|
2017-05-18 11:15:22 +02:00
|
|
|
from contextlib import contextmanager
|
|
|
|
from tempfile import NamedTemporaryFile
|
|
|
|
|
2017-05-17 12:34:08 +02:00
|
|
|
from cryptography.fernet import Fernet
|
|
|
|
|
|
|
|
from trytond.config import config
|
|
|
|
from trytond.model import fields
|
|
|
|
from trytond.pool import PoolMeta
|
|
|
|
from trytond.transaction import Transaction
|
|
|
|
|
|
|
|
# Names exported by ``from <this module> import *``: only the Company class.
__all__ = ['Company']
|
|
|
|
# Module-level logger, named after this module's import path.
_logger = getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class Company:
    """Company model carrying a PEM certificate and an encrypted private key.

    The class is registered under the ``company.company`` model name.  The
    private key is only persisted in ``encrypted_private_key``; the
    ``private_key`` function field encrypts on write and decrypts on read
    using the Fernet key returned by :meth:`get_fernet_key`.
    """
    __name__ = 'company.company'
    __metaclass__ = PoolMeta

    pem_certificate = fields.Binary('PEM Certificate')
    encrypted_private_key = fields.Binary('Encrypted Private Key')
    # Clear-text view of the key: computed by get_private_key and stored
    # (encrypted) by set_private_key.
    private_key = fields.Function(fields.Binary('Private Key'),
        'get_private_key', 'set_private_key')

    @classmethod
    def __setup__(cls):
        """Register the user-facing error messages used by this class."""
        super(Company, cls).__setup__()
        cls._error_messages.update({
            'missing_fernet_key': "Missing Fernet key configuration",
            'missing_pem_cert': "Missing PEM certificate",
            'missing_private_key': "Missing private key",
            })

    @classmethod
    def get_private_key(cls, companies, name=None):
        """Getter of the ``private_key`` function field.

        :param companies: the company records to compute the value for
        :param name: the name of the field being read
        :return: a dictionary mapping each company id to its decrypted key,
            or to the key length when the transaction context asks for
            ``'size'``; companies without a key map to ``None`` (``0`` in
            size mode)
        """
        converter = bytes
        default = None
        # The requested format is looked up under '<model name>.<field name>'.
        # pop() also removes the entry from the transaction context; this is
        # kept as is because callers may rely on the key being consumed.
        format_ = Transaction().context.pop(
            '%s.%s' % (cls.__name__, name), '')
        if format_ == 'size':
            converter = len
            default = 0

        # Decrypt every key first, then convert, so that a decryption failure
        # is raised before any result is built.
        pkeys = [company._get_private_key(name) for company in companies]
        result = {}
        for company, pkey in zip(companies, pkeys):
            result[company.id] = converter(pkey) if pkey else default
        return result

    def _get_private_key(self, name=None):
        """Return the decrypted private key of this company.

        :param name: unused; accepted so the field name can be passed through
        :return: the decrypted key, or ``None`` when no encrypted key is
            stored
        """
        if not self.encrypted_private_key:
            return None
        fernet = self.get_fernet_key()
        return fernet.decrypt(bytes(self.encrypted_private_key))

    @classmethod
    def set_private_key(cls, companies, name, value):
        """Setter of the ``private_key`` function field.

        Encrypts ``value`` and writes it to ``encrypted_private_key`` of all
        the given companies.  An empty ``value`` clears the stored key
        (``None`` is written) without requiring a Fernet key.

        :param companies: the company records to update
        :param name: the name of the field being written
        :param value: the clear-text private key, or an empty value
        """
        encrypted_key = None
        if value:
            fernet = cls.get_fernet_key()
            encrypted_key = fernet.encrypt(bytes(value))
        cls.write(companies, {'encrypted_private_key': encrypted_key})

    @classmethod
    def get_fernet_key(cls):
        """Build the Fernet instance used to encrypt and decrypt keys.

        The key is read from the ``fernet_key`` option of the
        ``cryptography`` configuration section.  When it is not set, the
        problem is logged and reported with the ``missing_fernet_key`` user
        error.

        :return: a ``Fernet`` instance initialised with the configured key
        """
        fernet_key = config.get('cryptography', 'fernet_key')
        if not fernet_key:
            _logger.error('Missing Fernet key configuration')
            cls.raise_user_error('missing_fernet_key')
        return Fernet(fernet_key)

    @contextmanager
    def tmp_ssl_credentials(self):
        """Context manager exposing the credentials as temporary files.

        Writes the PEM certificate to a ``.crt`` temporary file and the
        decrypted private key to a ``.pem`` temporary file, and yields the
        tuple ``(certificate path, key path)``.  Both files are closed when
        the ``with`` block of the caller ends.

        Reports the ``missing_pem_cert`` user error when the company has no
        certificate and the ``missing_private_key`` user error when it has
        no private key.
        """
        if not self.pem_certificate:
            self.raise_user_error('missing_pem_cert')
        # Decrypt once, before any file is created: without this guard a
        # company lacking a key would fail with a bare TypeError on
        # key.write(None) instead of a user-readable message.
        private_key = self.private_key
        if not private_key:
            self.raise_user_error('missing_private_key')
        with NamedTemporaryFile(suffix='.crt') as crt, \
                NamedTemporaryFile(suffix='.pem') as key:
            crt.write(self.pem_certificate)
            key.write(private_key)
            # Flush so that consumers opening the files by name see the data.
            crt.flush()
            key.flush()
            yield (crt.name, key.name)
|