# ============================================================================
#
# Copyright (C) 2007-2013 Conceptive Engineering bvba. All rights reserved.
# www.conceptive.be / info@conceptive.be
#
# This file is part of the Camelot Library.
#
# This file may be used under the terms of the GNU General Public
# License version 2.0 as published by the Free Software Foundation
# and appearing in the file license.txt included in the packaging of
# this file. Please review this information to ensure GNU
# General Public Licensing requirements will be met.
#
# If you are unsure which license is appropriate for your use, please
# visit www.python-camelot.com or contact info@conceptive.be
#
# This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
# WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# For use of this library in commercial applications, please contact
# info@conceptive.be
#
# ============================================================================
import copy
import datetime
import logging
from .base import Action
from application_action import ( ApplicationActionGuiContext,
ApplicationActionModelContext )
from camelot.core.exception import UserException
from camelot.core.utils import ugettext, ugettext_lazy as _
from camelot.view.art import Icon
from PyQt4 import QtGui
LOGGER = logging.getLogger( 'camelot.admin.action.list_action' )
[docs]class ListActionModelContext( ApplicationActionModelContext ):
"""On top of the attributes of the
:class:`camelot.admin.action.application_action.ApplicationActionModelContext`,
this context contains :
.. attribute:: selection_count
the number of selected rows.
.. attribute:: collection_count
the number of rows in the list.
.. attribute:: selected_rows
an ordered list with tuples of selected row ranges. the range is
inclusive.
.. attribute:: current_row
the current row in the list
.. attribute:: session
The session to which the objects in the list belong.
.. attribute:: field_attributes
The field attributes of the field to which the list relates, for example
the attributes of Person.addresses if the list is the list of addresses
of the Person.
The :attr:`collection_count` and :attr:`selection_count` attributes allow the
:meth:`model_run` to quickly evaluate the size of the collection or the
selection without calling the potentially time consuming methods
:meth:`get_collection` and :meth:`get_selection`.
"""
def __init__( self ):
super( ListActionModelContext, self ).__init__()
self._model = None
self.admin = None
self.current_row = None
self.selection_count = 0
self.collection_count = 0
self.selected_rows = []
self.field_attributes = dict()
[docs] def get_selection( self, yield_per = None ):
"""
:param yield_per: an integer number giving a hint on how many objects
should fetched from the database at the same time.
:return: a generator over the objects selected
"""
# during deletion or duplication, the collection might
# change, while the selection remains the same, so we should
# be careful when using the collection to generate selection data
for (first_row, last_row) in self.selected_rows:
for row in range( first_row, last_row + 1 ):
yield self._model._get_object( row )
[docs] def get_collection( self, yield_per = None ):
"""
:param yield_per: an integer number giving a hint on how many objects
should fetched from the database at the same time.
:return: a generator over the objects in the list
"""
for obj in self._model.get_collection():
yield obj
[docs] def get_object( self ):
"""
:return: the object displayed in the current row or None
"""
if self.current_row != None:
return self._model._get_object( self.current_row )
[docs]class ListActionGuiContext( ApplicationActionGuiContext ):
"""The context for an :class:`Action` on a table view. On top of the attributes of the
:class:`camelot.admin.action.application_action.ApplicationActionGuiContext`,
this context contains :
.. attribute:: item_view
the :class:`QtGui.QAbstractItemView` class that relates to the table
view on which the widget will be placed.
.. attribute:: view
a :class:`camelot.view.controls.view.AbstractView` class that represents
the view in which the action is triggered.
.. attribute:: field_attributes
a dictionary with the field attributes of the list. This dictionary will
be filled in case if the list displayed is related to a field on another
object. For example, the list of addresses of Person will have the field
attributes of the Person.addresses field when displayed on the Person
form.
"""
model_context = ListActionModelContext
def __init__( self ):
super( ListActionGuiContext, self ).__init__()
self.item_view = None
self.view = None
self.field_attributes = dict()
def create_model_context( self ):
context = super( ListActionGuiContext, self ).create_model_context()
context.field_attributes = copy.copy( self.field_attributes )
current_row = None
model = None
collection_count = 0
selection_count = 0
selected_rows = []
if self.item_view != None:
current_row = self.item_view.currentIndex().row()
model = self.item_view.model()
if model != None:
collection_count = model.rowCount()
if self.item_view.selectionModel() != None:
selection = self.item_view.selectionModel().selection()
for i in range( len( selection ) ):
selection_range = selection[i]
rows_range = ( selection_range.top(), selection_range.bottom() )
selected_rows.append( rows_range )
selection_count += ( rows_range[1] - rows_range[0] ) + 1
context.selection_count = selection_count
context.collection_count = collection_count
context.selected_rows = selected_rows
context.current_row = current_row
context._model = model
return context
def copy( self, base_class = None ):
new_context = super( ListActionGuiContext, self ).copy( base_class )
new_context.item_view = self.item_view
new_context.view = self.view
return new_context
[docs]class CallMethod( Action ):
"""
Call a method on all objects in a selection, and flush the
session.
:param verbose_name: the name of the action, as it should appear
to the user
:param method: the method to call on the objects
:param enabled: method to call on objects to verify if the action is
enabled, by default the action is always enabled
This action can be used either within :attr:`list_actions` or within
:attr:`form_actions`.
"""
def __init__( self, verbose_name, method, enabled=None ):
self.verbose_name = verbose_name
self.method = method
self.enabled = enabled
def model_run( self, model_context ):
from camelot.view.action_steps import ( UpdateProgress,
FlushSession,
UpdateObject )
step = max( 1, model_context.selection_count / 100 )
for i, obj in enumerate( model_context.get_selection() ):
if i%step == 0:
yield UpdateProgress( i, model_context.selection_count )
self.method( obj )
# the object might have changed without the need to be flushed
# to the database
yield UpdateObject( obj )
yield FlushSession( model_context.session )
def get_state( self, model_context ):
state = super( CallMethod, self ).get_state( model_context )
if self.enabled != None:
for obj in model_context.get_selection():
if not self.enabled( obj ):
state.enabled = False
break
return state
[docs]class ListContextAction( Action ):
"""An base class for actions that should only be enabled if the
gui_context is a :class:`ListActionModelContext`
"""
def get_state( self, model_context ):
state = super( ListContextAction, self ).get_state( model_context )
if isinstance( model_context, ListActionModelContext ):
state.enabled = True
else:
state.enabled = False
return state
[docs]class EditAction( ListContextAction ):
"""A base class for an action that will modify the model, it will be
disabled when the field_attributes for the relation field are set to
not-editable.
"""
def get_state( self, model_context ):
state = super( EditAction, self ).get_state( model_context )
if isinstance( model_context, ListActionModelContext ):
editable = model_context.field_attributes.get( 'editable', True )
if editable == False:
state.enabled = False
return state
[docs]class OpenNewView( EditAction ):
"""Opens a new view of an Entity related to a table view.
"""
shortcut = QtGui.QKeySequence.New
icon = Icon('tango/16x16/actions/document-new.png')
tooltip = _('New')
verbose_name = _('New')
def gui_run( self, gui_context ):
from camelot.view.workspace import show_top_level
admin = gui_context.admin
model = gui_context.item_view.model()
form = admin.create_new_view( related_collection_proxy=model,
parent = None )
show_top_level( form, gui_context.item_view )
[docs]class DuplicateSelection( EditAction ):
"""Duplicate the selected rows in a table"""
shortcut = QtGui.QKeySequence.Copy
icon = Icon('tango/16x16/actions/edit-copy.png')
tooltip = _('Duplicate')
verbose_name = _('Duplicate')
def model_run( self, model_context ):
from camelot.view import action_steps
for i, obj in enumerate( model_context.get_selection() ):
yield action_steps.UpdateProgress( i,
model_context.selection_count,
self.verbose_name )
new_object = model_context.admin.copy( obj )
model_context._model.append_object( new_object )
yield action_steps.FlushSession( model_context.session )
[docs]class DeleteSelection( EditAction ):
"""Delete the selected rows in a table"""
shortcut = QtGui.QKeySequence.Delete
icon = Icon('tango/16x16/places/user-trash.png')
tooltip = _('Delete')
verbose_name = _('Delete')
def gui_run( self, gui_context ):
#
# if there is an open editor on a row that will be deleted, there
# might be an assertion failure in QT, or the data of the editor
# might be pushed to the row that replaces the deleted one
#
gui_context.item_view.close_editor()
super( DeleteSelection, self ).gui_run( gui_context )
# this refresh call could be avoided if the removal of an object
# in the collection through the DeleteObject action step handled this
gui_context.item_view.model().refresh()
def model_run( self, model_context ):
from camelot.view import action_steps
if model_context.selection_count <= 0:
raise StopIteration
admin = model_context.admin
if model_context.admin.get_delete_mode() == 'on_confirm':
step = action_steps.MessageBox( _('Please confirm'),
admin.get_delete_message(None),
QtGui.QMessageBox.Yes,
QtGui.QMessageBox.No )
response = yield step
if response == QtGui.QMessageBox.No:
raise StopIteration
objects_to_remove = list( model_context.get_selection() )
#
# it might be impossible to determine the depending objects once
# the object has been removed from the collection
#
depending_objects = set()
for o in objects_to_remove:
depending_objects.update( set( admin.get_depending_objects( o ) ) )
for i, obj in enumerate( objects_to_remove ):
yield action_steps.UpdateProgress( i,
model_context.selection_count,
_('Removing') )
#
# We should not update depending objects that have
# been deleted themselves
#
try:
depending_objects.remove( obj )
except KeyError:
pass
for step in self.handle_object( model_context, obj ):
yield step
for depending_obj in depending_objects:
yield action_steps.UpdateObject( depending_obj )
yield action_steps.FlushSession( model_context.session )
def handle_object( self, model_context, obj ):
from camelot.view import action_steps
yield action_steps.DeleteObject( obj )
model_context.admin.delete( obj )
[docs]class ToPreviousRow( ListContextAction ):
"""Move to the previous row in a table"""
shortcut = QtGui.QKeySequence.MoveToPreviousPage
icon = Icon('tango/16x16/actions/go-previous.png')
tooltip = _('Previous')
verbose_name = _('Previous')
def gui_run( self, gui_context ):
item_view = gui_context.item_view
selection = item_view.selectedIndexes()
rows = item_view.model().rowCount()
if rows <= 0:
return
if selection:
current_row = selection[0].row()
previous_row = ( current_row - 1 ) % rows
else:
previous_row = 0
item_view.selectRow( previous_row )
def get_state( self, model_context ):
state = super( ToPreviousRow, self ).get_state( model_context )
#if state.enabled:
# state.enabled = ( model_context.current_row > 0 )
return state
[docs]class ToFirstRow( ToPreviousRow ):
"""Move to the first row in a table"""
shortcut = QtGui.QKeySequence.MoveToStartOfDocument
icon = Icon('tango/16x16/actions/go-first.png')
tooltip = _('First')
verbose_name = _('First')
def gui_run( self, gui_context ):
gui_context.item_view.selectRow( 0 )
[docs]class ToNextRow( ListContextAction ):
"""Move to the next row in a table"""
shortcut = QtGui.QKeySequence.MoveToNextPage
icon = Icon('tango/16x16/actions/go-next.png')
tooltip = _('Next')
verbose_name = _('Next')
def gui_run( self, gui_context ):
item_view = gui_context.item_view
selection = item_view.selectedIndexes()
rows = item_view.model().rowCount()
if rows <= 0:
return
if selection:
current_row = selection[0].row()
next_row = ( current_row + 1 ) % rows
else:
next_row = 0
item_view.selectRow( next_row )
def get_state( self, model_context ):
state = super( ToNextRow, self ).get_state( model_context )
#if state.enabled:
# max_row = model_context.collection_count - 1
# state.enabled = ( model_context.current_row < max_row )
return state
[docs]class ToLastRow( ToNextRow ):
"""Move to the last row in a table"""
shortcut = QtGui.QKeySequence.MoveToEndOfDocument
icon = Icon('tango/16x16/actions/go-last.png')
tooltip = _('Last')
verbose_name = _('Last')
def gui_run( self, gui_context ):
item_view = gui_context.item_view
item_view.selectRow( item_view.model().rowCount() - 1 )
[docs]class ExportSpreadsheet( ListContextAction ):
"""Export all rows in a table to a spreadsheet"""
icon = Icon('tango/16x16/mimetypes/x-office-spreadsheet.png')
tooltip = _('Export to MS Excel')
verbose_name = _('Export to MS Excel')
font_name = 'Arial'
def model_run( self, model_context ):
from decimal import Decimal
from xlwt import Font, Borders, XFStyle, Pattern, Workbook
from camelot.view.import_utils import ( RowData,
ColumnMapping,
ColumnSelectionAdmin )
from camelot.view.utils import ( local_date_format,
local_datetime_format,
local_time_format )
from camelot.view import action_steps
#
# Select the columns that need to be exported
#
admin = model_context.admin
all_fields = admin.get_all_fields_and_attributes()
field_choices = [(f,entity_fa['name']) for f,entity_fa in
all_fields.items() ]
row_data = RowData( 1, [None] * len( all_fields ) )
mapping = ColumnMapping( len( all_fields ),
[row_data],
admin,
[field for field, _fa in admin.get_columns()] )
mapping_admin = ColumnSelectionAdmin( len( all_fields ),
admin, field_choices )
yield action_steps.ChangeObject( mapping, mapping_admin )
columns = []
for i in range( len( all_fields ) ):
field = getattr( mapping, 'column_%i_field'%i )
if field != None:
columns.append( ( field, all_fields[field] ) )
#
# setup worksheet
#
yield action_steps.UpdateProgress( text = _('Create worksheet') )
workbook = Workbook()
worksheet = workbook.add_sheet('Sheet1')
#
# keep a global cache of styles, since the number of styles that
# can be used is limited.
#
styles = dict()
freeze = lambda d:tuple(sorted(d.iteritems()))
def get_style( font_specs=dict(),
border_specs = dict(),
pattern = None,
num_format_str = None, ):
style_key = ( freeze(font_specs),
freeze(border_specs),
pattern,
num_format_str )
try:
return styles[style_key]
except KeyError:
style = XFStyle()
style.font = Font()
for key, value in font_specs.items():
setattr( style.font, key, value )
style.borders = Borders()
for key, value in border_specs.items():
setattr( style.borders, key, value )
if pattern:
style.pattern = pattern
if num_format_str:
style.num_format_str = num_format_str
styles[ style_key ] = style
return style
#
# write style
#
title_style = get_style( dict( font_name = self.font_name,
bold = True,
height = 240 ) )
worksheet.write( 0, 0, admin.get_verbose_name_plural(), title_style )
#
# create some patterns and formats
#
date_format = local_date_format()
datetime_format = local_datetime_format()
time_format = local_time_format()
header_pattern = Pattern()
header_pattern.pattern = Pattern.SOLID_PATTERN
header_pattern.pattern_fore_colour = 0x16
#
# write headers
#
field_names = []
for i, (name, field_attributes) in enumerate( columns ):
verbose_name = unicode( field_attributes.get( 'name', name ) )
field_names.append( name )
font_specs = dict( font_name = self.font_name,
bold = True,
height = 200 )
border_specs = dict( top = 0x01 )
name = unicode( name )
if i == 0:
border_specs[ 'left' ] = 0x01
elif i == len( columns ) - 1:
border_specs[ 'right' ] = 0x01
header_style = get_style( font_specs, border_specs, header_pattern )
worksheet.write( 2, i, verbose_name, header_style)
if len( name ) < 8:
worksheet.col( i ).width = 8 * 375
else:
worksheet.col( i ).width = len( verbose_name ) * 375
#
# write data
#
offset = 3
for j, obj in enumerate( model_context.get_collection( yield_per = 100 ) ):
dynamic_attributes = admin.get_dynamic_field_attributes( obj,
field_names )
row = offset + j
if j % 100 == 0:
yield action_steps.UpdateProgress( j, model_context.collection_count )
for i, ((_name, attributes), delta_attributes) in enumerate( zip( columns, dynamic_attributes ) ):
attributes.update( delta_attributes )
value = attributes['getter']( obj )
format_string = '0'
if value != None:
if isinstance( value, Decimal ):
value = float( str( value ) )
if isinstance( value, (unicode, str) ):
if attributes.get( 'translate_content', False ) == True:
value = ugettext( value )
elif isinstance( value, list ):
separator = attributes.get('separator', u', ')
value = separator.join([unicode(el) for el in value])
elif isinstance( value, float ):
precision = attributes.get( 'precision', 2 )
format_string = '0.' + '0'*precision
elif isinstance( value, int ):
format_string = '0'
elif isinstance( value, datetime.date ):
format_string = date_format
elif isinstance( value, datetime.datetime ):
format_string = datetime_format
elif isinstance( value, datetime.time ):
format_string = time_format
else:
value = unicode( value )
else:
# empty cells should be filled as well, to get the
# borders right
value = ''
font_specs = dict( font_name = self.font_name, height = 200 )
border_specs = dict()
if i == 0:
border_specs[ 'left' ] = 0x01
elif i == len( columns ) - 1:
border_specs[ 'right' ] = 0x01
if (row - offset + 1) == model_context.collection_count:
border_specs[ 'bottom' ] = 0x01
style = get_style( font_specs,
border_specs,
None,
format_string )
worksheet.write( row, i, value, style )
min_width = len( unicode( value ) ) * 300
worksheet.col( i ).width = max( min_width, worksheet.col( i ).width )
yield action_steps.UpdateProgress( text = _('Saving file') )
filename = action_steps.OpenFile.create_temporary_file( '.xls' )
workbook.save( filename )
yield action_steps.UpdateProgress( text = _('Opening file') )
yield action_steps.OpenFile( filename )
[docs]class PrintPreview( ListContextAction ):
"""Print all rows in a table"""
icon = Icon('tango/16x16/actions/document-print-preview.png')
tooltip = _('Print Preview')
verbose_name = _('Print Preview')
def model_run( self, model_context ):
from camelot.view import action_steps
columns = model_context.admin.get_columns()
table = []
getters = [field_attributes['getter'] for _field, field_attributes in columns]
to_strings = [field_attributes['to_string'] for _field, field_attributes in columns]
column_range = range( len( columns ) )
for obj in model_context.get_collection():
table.append( [to_strings[i]( getters[i]( obj ) ) for i in column_range] )
context = {
'title': model_context.admin.get_verbose_name_plural(),
'table': table,
'columns': [field_attributes['name'] for _field, field_attributes in columns],
}
yield action_steps.PrintJinjaTemplate( template = 'list.html',
context = context )
[docs]class SelectAll( ListContextAction ):
"""Select all rows in a table"""
verbose_name = _('Select &All')
shortcut = QtGui.QKeySequence.SelectAll
tooltip = _('Select all rows in the table')
def gui_run( self, gui_context ):
gui_context.item_view.selectAll()
[docs]class ImportFromFile( EditAction ):
"""Import a csv file in the current table"""
verbose_name = _('Import from file')
icon = Icon('tango/16x16/mimetypes/text-x-generic.png')
tooltip = _('Import from file')
def model_run( self, model_context ):
import os.path
import chardet
from camelot.view import action_steps
from camelot.view.import_utils import ( UnicodeReader,
RowData,
RowDataAdmin,
XlsReader,
ColumnMapping,
ColumnMappingAdmin )
file_names = yield action_steps.SelectFile()
if not len( file_names ):
return
file_name = file_names[0]
yield action_steps.UpdateProgress( text = _('Reading data') )
#
# read the data into temporary row_data objects
#
if os.path.splitext( file_name )[-1] in ('.xls', '.xlsx'):
items = XlsReader( file_name )
else:
detected = chardet.detect( open( file_name ).read() )['encoding']
enc = detected or 'utf-8'
items = UnicodeReader( open( file_name ), encoding = enc )
collection = [ RowData(i, row_data) for i, row_data in enumerate( items ) ]
if len( collection ) < 1:
raise UserException( _('No data in file' ) )
#
# select columns to import
#
admin = model_context.admin
columns = max( row_data.columns for row_data in collection )
default_fields = [field for field, fa in admin.get_columns()
if fa.get('editable', True)]
column_mapping = ColumnMapping( columns,
collection,
admin,
default_fields )
field_choices = [(f,entity_fa['name']) for f,entity_fa in
admin.get_all_fields_and_attributes().items()
if entity_fa.get('editable', True)]
column_mapping_admin = ColumnMappingAdmin( columns,
admin,
field_choices )
yield action_steps.ChangeObject( column_mapping, column_mapping_admin )
#
# validate the temporary data
#
row_data_admin = RowDataAdmin( admin, column_mapping )
yield action_steps.ChangeObjects( collection, row_data_admin )
#
# Ask confirmation
#
yield action_steps.MessageBox( icon = QtGui.QMessageBox.Warning,
title = _('Proceed with import'),
text = _('Importing data cannot be undone,\n'
'are you sure you want to continue') )
#
# import the temporary objects into real objects
#
with model_context.session.begin():
for i,row in enumerate( collection ):
new_entity_instance = admin.entity()
for field_name, attributes in row_data_admin.get_columns():
try:
from_string = attributes['from_string']
except KeyError:
LOGGER.warn( 'field %s has no from_string field attribute, dont know how to import it properly'%attributes['original_field'] )
from_string = lambda _a:None
setattr(
new_entity_instance,
attributes['original_field'],
from_string(getattr(row, field_name))
)
admin.add( new_entity_instance )
# in case the model is a collection proxy, the new objects should
# be appended
model_context._model.append( new_entity_instance )
yield action_steps.UpdateProgress( i, len( collection ), _('Importing data') )
yield action_steps.FlushSession( model_context.session )
yield action_steps.Refresh()
[docs]class ReplaceFieldContents( EditAction ):
"""Select a field an change the content for a whole selection"""
verbose_name = _('Replace field contents')
tooltip = _('Replace the content of a field for all rows in a selection')
def model_run( self, model_context ):
from camelot.view import action_steps
field_name, value_getter = yield action_steps.ChangeField( model_context.admin )
yield action_steps.UpdateProgress( text = _('Replacing field') )
if value_getter != None:
value = value_getter()
with model_context.session.begin():
for obj in model_context.get_selection():
setattr( obj, field_name, value )
yield action_steps.FlushSession( model_context.session )
[docs]class AddExistingObject( EditAction ):
"""Add an existing object to a list if it is not yet in the
list"""
tooltip = _('Add')
verbose_name = _('Add')
icon = Icon( 'tango/16x16/actions/list-add.png' )
def model_run( self, model_context ):
from sqlalchemy.orm import object_session
from camelot.view import action_steps
obj_getter = yield action_steps.SelectObject( model_context.admin )
if obj_getter != None:
obj_to_add = obj_getter()
for obj in model_context.get_collection():
if obj_to_add == obj:
raise StopIteration()
model_context._model.append_object( obj_to_add, flush = False )
yield action_steps.FlushSession( object_session( obj_to_add ) )
[docs]class AddNewObject( OpenNewView ):
"""Add a new object to a collection. Depending on the
'create_inline' field attribute, a new form is opened or not"""
def gui_run( self, gui_context ):
create_inline = gui_context.field_attributes.get( 'create_inline',
False )
if create_inline == True:
super( OpenNewView, self ).gui_run( gui_context )
else:
super( AddNewObject, self ).gui_run( gui_context )
def model_run( self, model_context ):
admin = model_context.admin
model_context._model.append_object( admin.entity() )
[docs]class RemoveSelection( DeleteSelection ):
"""Remove the selected objects from a list without deleting them"""
shortcut = None
tooltip = _('Remove')
verbose_name = _('Remove')
icon = Icon( 'tango/16x16/actions/list-remove.png' )
def handle_object( self, model_context, obj ):
model_context._model.remove( obj )
raise StopIteration