trytond-electronic_mail_filter/electronic_mail.py
2016-03-29 12:00:23 +02:00

214 lines
7.1 KiB
Python

# This file is part of electronic_mail_filter module for Tryton.
# The COPYRIGHT file at the top level of this repository contains the full
# copyright notices and license terms.
from datetime import datetime
from trytond.model import ModelSQL, ModelView, fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Bool, Eval, Not
from trytond.transaction import Transaction
# Public names exported by this module: the two classes defined below.
__all__ = ['ElectronicMailFilter', 'SearchingStart']
class ElectronicMailFilter(ModelSQL, ModelView):
    'eMail Filter'
    # NOTE(review): the one-line docstring above is kept verbatim; the
    # framework presumably reads it at runtime as the model's label, so
    # extended documentation lives in comments instead.
    #
    # A filter ties an e-mail template to a search profile: the profile's
    # domain selects records of the template's model, and `send_emails`
    # renders and sends one message per selected record (or per group).
    __name__ = 'electronic.mail.filter'

    name = fields.Char('Name', required=True,
        help='Name of the cron for sending Email.')
    template = fields.Many2One('electronic.mail.template', 'eMail Template',
        required=True)
    # Computed field: the model of the selected template
    # (see on_change_with_model).
    model = fields.Function(fields.Many2One('ir.model', 'Model'),
        'on_change_with_model')
    # Filled in by the `create_cron` button.
    cron = fields.Many2One('ir.cron', 'Cron', readonly=True)
    active = fields.Boolean('Active')
    # Only profiles whose model matches the template's model are offered.
    profile = fields.Many2One('searching.profile', 'Profile',
        domain=[
            ('model', '=', Eval('model')),
            ],
        context={
            'model': Eval('model'),
            },
        depends=['model'])
    last_send = fields.DateTime('Last Send', required=True)
    manual = fields.Boolean('Manual')
    source = fields.Text('Source',
        states={
            'readonly': Not(Bool(Eval('manual'))),
            'required': Bool(Eval('manual')),
            }, depends=['manual'],
        help='Available global variables:\n'
        '\tself: Instance of this model.\n'
        '\tpool: Instance of Pool that allows get access to any model '
        'of database.\n'
        'Returns `records`: List of records you want to select.')
    mix_operator = fields.Selection([
            ('all', 'All'),
            ('any', 'Any'),
            ], 'Mix operator',
        states={
            'readonly': Not(Bool(Eval('manual'))),
            },
        # The states above evaluate `manual`, so it is declared as a
        # dependency, consistently with the `source` field.
        depends=['manual'],
        help='Operator to mix the records obtained through the filters in the '
        'first tab and records calculated by the manual code.')

    @classmethod
    def __setup__(cls):
        """Register the error messages and view buttons of the model."""
        super(ElectronicMailFilter, cls).__setup__()
        cls._error_messages.update({
                'filter_error': 'Manual filter error:\n%s',
                'filter_success': 'Filter obtained these records:\n%s',
                'filter_without_errors': 'Filter ended without errors, but do '
                    'not obtained any records.',
                'details': 'Details:\n%s',
                })
        cls._buttons.update({
                'test_profile': {
                    },
                'send_emails': {
                    },
                # The create/delete buttons are mutually exclusive,
                # depending on whether a cron is already linked.
                'create_cron': {
                    'invisible': Bool(Eval('cron')),
                    },
                'delete_cron': {
                    'invisible': Not(Bool(Eval('cron'))),
                    },
                })

    @staticmethod
    def default_active():
        """Default value of `active`: True."""
        return True

    @staticmethod
    def default_mix_operator():
        """Default value of `mix_operator`: 'all'."""
        return 'all'

    @staticmethod
    def default_last_send():
        """Default value of `last_send`: the current date and time."""
        return datetime.now()

    @fields.depends('template')
    def on_change_with_model(self, name=None):
        """Return the id of the template's model, or None without template."""
        if self.template:
            return self.template.model.id
        return None

    @classmethod
    @ModelView.button
    def create_cron(cls, filters):
        """Create one daily cron per filter and link it to the filter.

        Each cron calls `send_emails` on this model with the filter id as
        its single argument.

        Raises ValueError (from the tuple unpacking) when the
        'user_cron_trigger' or 'admin' user is not found exactly once.
        """
        if not filters:
            # Nothing to do; also avoids calling cls.write() without
            # arguments below.
            return

        pool = Pool()
        Cron = pool.get('ir.cron')
        User = pool.get('res.user')

        cron_user, = User.search([
                ('active', '=', False),
                ('login', '=', 'user_cron_trigger'),
                ])
        admin_user, = User.search([('login', '=', 'admin')])

        # Flat list of (records, values) pairs for a single write call.
        to_write = []
        for filter_ in filters:
            cron, = Cron.create([{
                        'name': filter_.name,
                        'user': cron_user.id,
                        'request_user': admin_user.id,
                        'active': True,
                        'interval_number': 1,
                        'interval_type': 'days',
                        'number_calls': -1,
                        'next_call': datetime.now(),
                        'model': 'electronic.mail.filter',
                        'function': 'send_emails',
                        'args': '(%s,)' % filter_.id,
                        }])
            to_write.extend(([filter_], {'cron': cron.id}))
        cls.write(*to_write)

    @classmethod
    @ModelView.button
    def delete_cron(cls, filters):
        """Delete the crons linked to the given filters.

        Filters without a cron are skipped so that no empty value is
        handed to `Cron.delete`.
        """
        Cron = Pool().get('ir.cron')
        crons = [f.cron for f in filters if f.cron]
        if crons:
            Cron.delete(crons)

    @classmethod
    @ModelView.button_action(
        'searching.act_searching')
    def test_profile(cls, filters):
        """Button hook: the decorator launches 'searching.act_searching'."""
        pass

    def search_records(self):
        """Return the records of the template's model matching the profile.

        The domain comes from `self.profile.get_domain()`. Errors raised by
        the search (SyntaxError, NameError, IndexError, AttributeError) are
        reported to the user through the 'filter_error' message.
        """
        Model = Pool().get(self.model.model)
        # NOTE(review): `profile` is not a required field; a filter without
        # a profile fails on the next line with AttributeError - confirm
        # whether that case should be handled.
        domain = self.profile.get_domain()
        try:
            records = Model.search(domain)
        except (SyntaxError, NameError, IndexError, AttributeError) as exc:
            self.raise_user_error('filter_error', exc)
        return records

    @classmethod
    @ModelView.button
    def send_emails(cls, filters):
        """Render and send the e-mails of the given filters.

        `filters` is either a list of filter records (button call) or a
        single integer id (the form used in the cron `args` built by
        `create_cron`).

        For every filter the matching records are searched; when the
        template has both a truthy `single_email` and a `group_records`
        attribute, records are grouped and one message is rendered per
        group, otherwise one message is rendered per record. All created
        e-mails are sent together at the end.
        """
        pool = Pool()
        ElectronicMail = pool.get('electronic.mail')
        EmailConfiguration = pool.get('electronic.mail.configuration')
        email_configuration = EmailConfiguration(1)

        if isinstance(filters, int):
            filters = cls.search([
                    ('id', '=', filters),
                    ])

        emails = []
        for fltr in filters:
            template = fltr.template
            records = fltr.search_records()
            if not records:
                continue

            # Either a falsy value or the template's grouping callable.
            group_records = (getattr(template, 'single_email', False) and
                getattr(template, 'group_records', False))
            if group_records:
                records = group_records(records)

            # The target mailbox depends only on the template, so it is
            # resolved once per filter; the configuration is the fallback.
            if template.queue:
                mailbox = (template.mailbox_outbox
                    or email_configuration.outbox)
            else:
                mailbox = template.mailbox or email_configuration.sent

            for record in records:
                if group_records:
                    # Here `record` is a group; the message is rendered
                    # for its first element with the group's attachments.
                    attachments = template.get_attachments(record)
                    record = record[0]
                    message = template.render_message(record, attachments)
                else:
                    # NOTE(review): the result is not used by `render`
                    # below; the call is kept in case it has side effects.
                    attachments = template.get_attachments([record])
                    message = template.render(record)

                context = {}
                eval_result = template.eval(template.bcc, record)
                if eval_result:
                    context['bcc'] = eval_result
                emails.append(ElectronicMail.create_from_email(message,
                        mailbox, context))
        ElectronicMail.send_emails(emails)
class SearchingStart:
    # Extension of the 'searching.start' model: pre-selects the profile of
    # the e-mail filter from which the search is launched.
    # NOTE(review): no class docstring is added on purpose; this class is
    # merged into an existing model through PoolMeta and a docstring here
    # could presumably override that model's label - confirm before adding.
    __metaclass__ = PoolMeta
    __name__ = 'searching.start'

    @staticmethod
    def default_profile():
        """Return the profile id of the active e-mail filter, if any.

        Returns None when the context has no `active_id`, when the context
        names an `active_model` other than 'electronic.mail.filter', or when
        the filter has no profile.
        """
        context = Transaction().context
        active_id = context.get('active_id')
        if active_id is None:
            # No active record: there is no filter to read.
            return None
        # When the context says the active record belongs to another model,
        # `active_id` is not a filter id and must not be used as one. A
        # context without `active_model` keeps the previous behaviour.
        active_model = context.get('active_model', 'electronic.mail.filter')
        if active_model != 'electronic.mail.filter':
            return None
        filter_ = Pool().get('electronic.mail.filter')(active_id)
        if filter_.profile:
            return filter_.profile.id
        return None