trytond-electronic_mail_filter/electronic_mail.py

219 lines
7.8 KiB
Python

# This file is part of electronic_mail_filter module for Tryton.
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
from datetime import datetime
from trytond.model import ModelSQL, ModelView, fields, DeactivableMixin
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Not
from trytond.transaction import Transaction
__all__ = ['ElectronicMailFilter', 'SearchingStart']
class ElectronicMailFilter(DeactivableMixin, ModelSQL, ModelView):
'eMail Filter'
__name__ = 'electronic.mail.filter'
name = fields.Char('Name', required=True,
help='Name of the cron for sending Email.')
template = fields.Many2One('electronic.mail.template', 'eMail Template',
required=True)
model = fields.Function(fields.Many2One('ir.model', 'Model'),
'on_change_with_model')
cron = fields.Many2One('ir.cron', 'Cron', readonly=True)
profile = fields.Many2One('searching.profile', 'Profile',
domain=[
('model', '=', Eval('model')),
],
context={
'model': Eval('model'),
},
depends=['model'])
last_send = fields.DateTime('Last Send', required=True)
manual = fields.Boolean('Manual')
source = fields.Text('Source',
states={
'readonly': Not(Bool(Eval('manual'))),
'required': Bool(Eval('manual')),
}, depends=['manual'],
help='Available global variables:\n'
'\tself: Instance of this model.\n'
'\tpool: Instance of Pool that allows get access to any model '
'of database.\n'
'Returns `records`: List of records you want to select.')
mix_operator = fields.Selection([
('all', 'All'),
('any', 'Any'),
], 'Mix operator',
states={
'readonly': Not(Bool(Eval('manual'))),
},
help='Operator to mix the records obtained through the filters in the '
'first tab and records calculated by the manual code.')
email = fields.Many2One('party.contact_mechanism', 'E-Mail',
domain=[('type', '=', 'email')],
help='Defines E-mail to use instead of defined in email template. '
'Required if check "Send email when no records" enabled.')
@classmethod
def __setup__(cls):
super(ElectronicMailFilter, cls).__setup__()
cls._error_messages.update({
'filter_error': 'Manual filter error:\n%s',
'filter_success': 'Filter obtained these records:\n%s',
'filter_without_errors': 'Filter ended without errors, but do '
'not obtained any records.',
'details': 'Details:\n%s',
})
cls._buttons.update({
'test_profile': {
},
'send_emails': {
},
'create_cron': {
'invisible': Bool(Eval('cron')),
},
'delete_cron': {
'invisible': Not(Bool(Eval('cron'))),
},
})
@staticmethod
def default_mix_operator():
return 'all'
@staticmethod
def default_last_send():
return datetime.now()
@fields.depends('template')
def on_change_with_model(self, name=None):
if self.template:
return self.template.model.id
@classmethod
@ModelView.button
def create_cron(cls, filters):
pool = Pool()
Cron = pool.get('ir.cron')
User = pool.get('res.user')
cron_user, = User.search([
('active', '=', False),
('login', '=', 'user_cron_trigger'),
])
admin_user, = User.search([('login', '=', 'admin')])
args = []
vlist = []
for filter_ in filters:
vlist = [{
'name': filter_.name,
'user': cron_user.id,
'request_user': admin_user.id,
'active': True,
'interval_number': 1,
'interval_type': 'days',
'number_calls': -1,
'next_call': datetime.now(),
'model': 'electronic.mail.filter',
'function': 'send_emails',
'args': '([%s,],)' % filter_.id,
}]
cron, = Cron.create(vlist)
args.extend(([filter_], {'cron': cron}))
cls.write(*args)
@classmethod
@ModelView.button
def delete_cron(cls, filters):
Cron = Pool().get('ir.cron')
crons = [f.cron for f in filters]
Cron.delete(crons)
@classmethod
@ModelView.button_action(
'searching.act_searching')
def test_profile(cls, filters):
pass
def search_records(self):
Model = Pool().get(self.model.model)
domain = self.profile.get_domain()
try:
records = Model.search(domain)
except (SyntaxError, NameError, IndexError, AttributeError) as exc:
self.raise_user_error('filter_error', exc)
return records
@classmethod
@ModelView.button
def send_emails(cls, filters):
pool = Pool()
ElectronicMail = pool.get('electronic.mail')
EmailConfiguration = pool.get('electronic.mail.configuration')
email_configuration = EmailConfiguration(1)
if filters and isinstance(filters[0], int):
filters = cls.search([
('id', 'in', filters),
])
emails = []
for fltr in filters:
ctx = fltr.profile.get_context()
with Transaction().set_context(**ctx):
template = fltr.template
records = fltr.search_records()
if not records:
if not template.no_records_email:
continue
records.append(fltr)
elif template.no_records_email:
continue
group_records = (getattr(template, 'single_email', False) and
getattr(template, 'group_records', False))
if group_records:
records = group_records(records)
if fltr.email:
# group all records, discard template 'to' grouping
records = [[r for g in records for r in g]]
for record in records:
if group_records:
attachments = template.get_attachments(record)
record = record[0]
else:
attachments = template.get_attachments([record])
message = template.render_message(record, attachments)
mailbox = (template.mailbox
if template.mailbox
else email_configuration.sent)
field_expression = getattr(template, 'bcc')
eval_result = template.eval(field_expression, record)
if eval_result:
message['bcc'] = eval_result
if fltr.email:
message.replace_header('to', fltr.email.email)
emails.append(ElectronicMail.create_from_mail(message,
mailbox))
ElectronicMail.send_mail(emails)
class SearchingStart(metaclass=PoolMeta):
__name__ = 'searching.start'
@staticmethod
def default_profile():
context = Transaction().context
active_id = context.get('active_id')
active_model = context.get('active_model')
if active_model == 'electronic.mail.filter':
filter_ = Pool().get('electronic.mail.filter')(active_id)
if filter_.profile:
return filter_.profile.id