Files
tradon/modules/attendance/attendance.py
2026-03-14 09:42:12 +00:00

383 lines
13 KiB
Python

# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import datetime as dt
from sql import Column, Window
from sql.aggregate import Min, Sum
from sql.conditionals import Coalesce
from sql.functions import Function, NthValue
from trytond import backend
from trytond.cache import Cache
from trytond.i18n import gettext
from trytond.model import Index, ModelSQL, ModelView, Workflow, fields
from trytond.pool import Pool, PoolMeta
from trytond.pyson import Eval
from trytond.tools import timezone as tz
from trytond.transaction import Transaction
from .exceptions import PeriodClosedError, PeriodTransitionError
class SQLiteStrftime(Function):
__slots__ = ()
_function = 'STRFTIME'
class Line(ModelSQL, ModelView):
__name__ = 'attendance.line'
company = fields.Many2One('company.company', "Company", required=True,
help="The company which the employee attended.")
employee = fields.Many2One('company.employee', "Employee", required=True,
search_context={
'active_test': False,
},
domain=[
('company', '=', Eval('company', -1)),
['OR',
('start_date', '=', None),
('start_date', '<=', Eval('date', None)),
],
['OR',
('end_date', '=', None),
('end_date', '>=', Eval('date', None)),
],
])
at = fields.DateTime("At", required=True)
date = fields.Date("Date", required=True)
type = fields.Selection([
('in', 'In'),
('out', 'Out'),
], "Type", required=True)
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('at', 'DESC'))
# Do not cache default_at
cls.__rpc__['default_get'].cache = None
@classmethod
def default_at(cls):
return dt.datetime.now()
@classmethod
def default_company(cls):
return Transaction().context.get('company')
@classmethod
def default_employee(cls):
return Transaction().context.get('employee')
@fields.depends('at', 'company')
def on_change_with_date(self):
if not self.at:
return
at = self.at
if self.company and self.company.timezone:
timezone = tz.ZoneInfo(self.company.timezone)
at = self.at.replace(tzinfo=tz.UTC) .astimezone(timezone)
return at.date()
def get_rec_name(self, name):
pool = Pool()
Lang = pool.get('ir.lang')
lang = Lang.get()
return '%s@%s' % (self.employee.rec_name, lang.strftime(self.at))
def compute_fields(self, field_names=None):
cls = self.__class__
values = super().compute_fields(field_names=field_names)
if (field_names is None
or (cls.date.on_change_with & field_names)):
date = self.on_change_with_date()
if getattr(self, 'date', None) != date:
values['date'] = date
return values
@classmethod
def check_modification(cls, mode, lines, values=None, external=False):
super().check_modification(
mode, lines, values=values, external=external)
if mode == 'delete':
cls.check_closed_period(lines, msg='delete')
@classmethod
def validate_fields(cls, records, field_names):
super().validate_fields(records, field_names)
cls.check_closed_period(records, field_names=field_names)
@classmethod
def check_closed_period(cls, records, msg='modify', field_names=None):
pool = Pool()
Period = pool.get('attendance.period')
if field_names and not (field_names & {'company', 'at'}):
return
for record in records:
period_date = Period.get_last_period_date(record.company)
if period_date and period_date > record.at:
raise PeriodClosedError(
gettext('attendance.msg_%s_period_close' % msg,
attendance=record.rec_name,
period=period_date))
@fields.depends('employee', 'at', 'id')
def on_change_with_type(self):
records = self.search([
('employee', '=', self.employee),
('at', '<', self.at),
('id', '!=', self.id),
],
order=[('at', 'desc')],
limit=1)
if records:
record, = records
return {'in': 'out', 'out': 'in'}.get(record.type)
else:
return 'in'
class Period(Workflow, ModelSQL, ModelView):
__name__ = 'attendance.period'
_states = {
'readonly': Eval('state') == 'closed',
}
_last_period_cache = Cache('attendance.period', context=False)
ends_at = fields.DateTime("Ends at", required=True, states=_states)
company = fields.Many2One('company.company', "Company", required=True,
states=_states, help="The company the period is associated with.")
state = fields.Selection([
('draft', 'Draft'),
('closed', 'Closed'),
], "State", readonly=True, sort=False,
help="The current state of the attendance period.")
@classmethod
def __setup__(cls):
super().__setup__()
t = cls.__table__()
cls._sql_indexes.add(
Index(
t,
(t.company, Index.Equality()),
(t.state, Index.Equality(cardinality='low')),
(t.ends_at, Index.Range(order='DESC'))))
cls._transitions |= set((
('draft', 'closed'),
('closed', 'draft'),
))
cls._buttons.update({
'draft': {
'invisible': Eval('state') == 'draft',
'depends': ['state'],
},
'close': {
'invisible': Eval('state') == 'closed',
'depends': ['state'],
},
})
@classmethod
def default_company(cls):
return Transaction().context.get('company')
@classmethod
def default_state(cls):
return 'draft'
def get_rec_name(self, name):
pool = Pool()
Lang = pool.get('ir.lang')
lang = Lang.get()
return lang.strftime(self.ends_at)
@classmethod
def get_last_period_date(cls, company):
key = int(company)
result = cls._last_period_cache.get(key, -1)
if result == -1:
records = cls.search([
('company', '=', company),
('state', '=', 'closed'),
],
order=[('ends_at', 'DESC')],
limit=1)
if records:
record, = records
result = record.ends_at
else:
result = None
cls._last_period_cache.set(key, result)
return result
@classmethod
@ModelView.button
@Workflow.transition('draft')
def draft(cls, periods):
for period in periods:
records = cls.search([
('company', '=', period.company),
('state', '=', 'closed'),
('ends_at', '>', period.ends_at),
],
order=[('ends_at', 'ASC')],
limit=1)
if records:
record, = records
raise PeriodTransitionError(
gettext('attendance.msg_draft_period_previous_closed',
period=period.rec_name,
other_period=record.rec_name))
cls._last_period_cache.clear()
@classmethod
@ModelView.button
@Workflow.transition('closed')
def close(cls, periods):
for period in periods:
records = cls.search([
('company', '=', period.company),
('state', '=', 'draft'),
('ends_at', '<', period.ends_at),
],
order=[('ends_at', 'ASC')],
limit=1)
if records:
record, = records
raise PeriodTransitionError(
gettext('attendance.msg_close_period_previous_open',
period=period.rec_name,
other_period=record.rec_name))
cls._last_period_cache.clear()
class SheetLine(ModelSQL, ModelView):
__name__ = 'attendance.sheet.line'
company = fields.Many2One('company.company', "Company")
employee = fields.Many2One('company.employee', "Employee")
from_ = fields.DateTime("From")
to = fields.DateTime("To")
duration = fields.TimeDelta("Duration")
date = fields.Date("Date")
sheet = fields.Many2One('attendance.sheet', "Sheet")
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('from_', 'DESC'))
@classmethod
def table_query(cls):
pool = Pool()
Attendance = pool.get('attendance.line')
attendance = Attendance.__table__()
window = Window(
[attendance.employee],
order_by=[attendance.at.asc],
frame='ROWS', start=0, end=1)
type = NthValue(attendance.type, 1, window=window)
from_ = NthValue(attendance.at, 1, window=window)
to = NthValue(attendance.at, 2, window=window)
date = NthValue(attendance.date, 1, window=window)
query = attendance.select(
attendance.id.as_('id'),
attendance.company.as_('company'),
attendance.employee.as_('employee'),
type.as_('type'),
from_.as_('from_'),
to.as_('to'),
date.as_('date'))
sheet = (
Min(query.id * 2, window=Window([query.employee, query.date])))
from_ = Column(query, 'from_')
if backend.name == 'sqlite':
# As SQLite does not support operation on datetime
# we convert datetime into seconds
duration = (
SQLiteStrftime('%s', query.to) - SQLiteStrftime('%s', from_))
else:
duration = query.to - from_
return query.select(
query.id.as_('id'),
query.company.as_('company'),
query.employee.as_('employee'),
from_.as_('from_'),
query.to.as_('to'),
query.date.as_('date'),
duration.as_('duration'),
sheet.as_('sheet'),
where=query.type == 'in')
class Sheet(ModelSQL, ModelView):
__name__ = 'attendance.sheet'
company = fields.Many2One('company.company', "Company")
employee = fields.Many2One('company.employee', "Employee")
duration = fields.TimeDelta("Duration")
date = fields.Date("Date")
lines = fields.One2Many('attendance.sheet.line', 'sheet', "Lines")
@classmethod
def __setup__(cls):
super().__setup__()
cls._order.insert(0, ('date', 'DESC'))
@classmethod
def table_query(cls):
pool = Pool()
Line = pool.get('attendance.sheet.line')
line = Line.__table__()
return line.select(
(Min(line.id * 2)).as_('id'),
line.company.as_('company'),
line.employee.as_('employee'),
Sum(line.duration).as_('duration'),
line.date.as_('date'),
group_by=[line.company, line.employee, line.date])
class Sheet_Timesheet(metaclass=PoolMeta):
__name__ = 'attendance.sheet'
timesheet_duration = fields.TimeDelta("Timesheet Duration")
@classmethod
def table_query(cls):
pool = Pool()
Timesheet = pool.get('timesheet.line')
line = Timesheet.__table__()
timesheet = line.select(
Min(line.id * 2 + 1).as_('id'),
line.company.as_('company'),
line.employee.as_('employee'),
Sum(line.duration).as_('duration'),
line.date.as_('date'),
group_by=[line.company, line.employee, line.date])
attendance = super().table_query()
return (attendance
.join(
timesheet, 'FULL' if backend.name != 'sqlite' else 'LEFT',
condition=(attendance.company == timesheet.company)
& (attendance.employee == timesheet.employee)
& (attendance.date == timesheet.date))
.select(
Coalesce(attendance.id, timesheet.id).as_('id'),
Coalesce(attendance.company, timesheet.company).as_('company'),
Coalesce(
attendance.employee, timesheet.employee).as_('employee'),
attendance.duration.as_('duration'),
timesheet.duration.as_('timesheet_duration'),
Coalesce(attendance.date, timesheet.date).as_('date'),
))