# ============================================================================
#
# Copyright (C) 2007-2013 Conceptive Engineering bvba. All rights reserved.
# www.conceptive.be / info@conceptive.be
#
# This file is part of the Camelot Library.
#
# This file may be used under the terms of the GNU General Public
# License version 2.0 as published by the Free Software Foundation
# and appearing in the file license.txt included in the packaging of
# this file. Please review this information to ensure GNU
# General Public Licensing requirements will be met.
#
# If you are unsure which license is appropriate for your use, please
# visit www.python-camelot.com or contact info@conceptive.be
#
# This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
# WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# For use of this library in commercial applications, please contact
# info@conceptive.be
#
# ============================================================================
'''
Created on Sep 9, 2009
@author: tw55413
'''
import logging
import sys
logger = logging.getLogger('camelot.view.model_thread.signal_slot_model_thread')
from PyQt4 import QtCore
from camelot.core.utils import pyqt
from camelot.core.threading import synchronized
from camelot.view.model_thread import ( AbstractModelThread, object_thread,
setup_model )
from camelot.view.controls.exception import register_exception
#
# Wrap and unwrap None passed through signal/slot accross threads to
# prevent segfaults with PySide
#
# https://bugreports.qt-project.org/browse/PYSIDE-17
#
if pyqt:
wrap_none = lambda x:x
unwrap_none = lambda x:x
else:
class Null( object ):
pass
null = Null()
def wrap_none( func ):
def new_func( *args ):
y = func( *args )
if y == None:
return null
return y
return new_func
def unwrap_none( func ):
def new_func( x ):
if x == null:
x = None
return func( x )
return new_func
class Task(QtCore.QObject):
finished = QtCore.pyqtSignal(object)
exception = QtCore.pyqtSignal(object)
def __init__(self, request, name='', args=()):
"""A task to be executed in a different thread
:param request: the function to execture
:param name: a string with the name of the task to be used in the gui
:param args: a tuple with the arguments to be passed to the request
"""
QtCore.QObject.__init__(self)
self._request = request
self._name = name
self._args = args
def clear(self):
"""clear this tasks references to other objects"""
self._request = None
self._name = None
self._args = None
def execute(self):
logger.debug('executing %s' % (self._name))
try:
result = self._request( *self._args )
self.finished.emit( result )
#
# don't handle StopIteration as a normal exception, but return a new
# instance of StopIteration (in order to not keep alive a stack trace),
# and to signal to the caller that an iterator has ended
#
except StopIteration:
self.finished.emit( StopIteration() )
except Exception, e:
exc_info = register_exception(logger, 'exception caught in model thread while executing %s'%self._name, e)
self.exception.emit( exc_info )
# the stack might contain references to QT objects which could be kept alive this way
sys.exc_clear()
except:
logger.error( 'unhandled exception in model thread' )
exc_info = ( 'Unhandled exception',
sys.exc_info()[0],
None,
'Please contact the application developer', '')
# still emit the exception signal, to allow the gui to clean up things (such as closing dialogs)
self.exception.emit( exc_info )
sys.exc_clear()
[docs]class TaskHandler(QtCore.QObject):
"""A task handler is an object that handles tasks that appear in a queue,
when its handle_task method is called, it will sequentially handle all tasks
that are in the queue.
"""
task_handler_busy_signal = QtCore.pyqtSignal(bool)
def __init__(self, queue):
""":param queue: the queue from which to pop a task when handle_task
is called"""
QtCore.QObject.__init__(self)
self._mutex = QtCore.QMutex()
self._queue = queue
self._tasks_done = []
self._busy = False
logger.debug("TaskHandler created.")
[docs] def busy(self):
""":return True/False: indicating if this task handler is busy"""
return self._busy
@QtCore.pyqtSlot()
[docs] def handle_task(self):
"""Handle all tasks that are in the queue"""
self._busy = True
self.task_handler_busy_signal.emit( True )
task = self._queue.pop()
while task:
task.execute()
# we keep track of the tasks done to prevent them being garbage collected
# apparently when they are garbage collected, they are recycled, but their
# signal slot connections seem to survive this recycling.
# @todo: this should be investigated in more detail, since we are causing
# a deliberate memory leak here
#
# not keeping track of the tasks might result in corruption
#
# see : http://www.riverbankcomputing.com/pipermail/pyqt/2011-August/030452.html
#
task.clear()
self._tasks_done.append(task)
task = self._queue.pop()
self.task_handler_busy_signal.emit( False )
self._busy = False
[docs]class SignalSlotModelThread( AbstractModelThread ):
"""A model thread implementation that uses signals and slots
to communicate between the model thread and the gui thread
there is no explicit model thread verification on these methods,
since this model thread might not be THE model thread.
"""
task_available = QtCore.pyqtSignal()
def __init__( self, setup_thread = setup_model ):
"""
@param setup_thread: function to be called at startup of the thread to initialize
everything, by default this will setup the model. set to None if nothing should
be done.
"""
super(SignalSlotModelThread, self).__init__( setup_thread )
self._task_handler = None
self._mutex = QtCore.QMutex()
self._request_queue = []
self._connected = False
self._setup_busy = True
def run( self ):
self.logger.debug( 'model thread started' )
self._task_handler = TaskHandler(self)
self._task_handler.task_handler_busy_signal.connect(self._thread_busy, QtCore.Qt.QueuedConnection)
self._thread_busy(True)
try:
self._setup_thread()
except Exception, e:
exc_info = register_exception(logger, 'Exception when setting up the SignalSlotModelThread', e)
self.setup_exception_signal.emit( exc_info )
self._thread_busy(False)
self.logger.debug('thread setup finished')
# Some tasks might have been posted before the signals were connected to the task handler,
# so once force the handling of tasks
self._task_handler.handle_task()
self._setup_busy = False
self.exec_()
self.logger.debug('model thread stopped')
@QtCore.pyqtSlot( bool )
def _thread_busy(self, busy_state):
self.thread_busy_signal.emit( busy_state )
@synchronized
def post( self, request, response = None, exception = None, args = () ):
if not self._connected and self._task_handler:
# creating this connection in the model thread throws QT exceptions
self.task_available.connect( self._task_handler.handle_task, QtCore.Qt.QueuedConnection )
self._connected = True
# response should be a slot method of a QObject
if response:
name = '%s -> %s.%s'%(request.__name__, response.im_self.__class__.__name__, response.__name__)
else:
name = request.__name__
task = Task( wrap_none( request ), name = name, args = args )
# QObject::connect is a thread safe function
if response:
assert response.im_self != None
assert isinstance(response.im_self, QtCore.QObject)
# verify if the response has been defined as a slot
#assert hasattr(response, '__pyqtSignature__')
task.finished.connect( unwrap_none( response ),
QtCore.Qt.QueuedConnection )
if exception:
task.exception.connect( exception, QtCore.Qt.QueuedConnection )
# task.moveToThread(self)
# only put the task in the queue when it is completely set up
self._request_queue.append(task)
#print 'task created --->', id(task)
self.task_available.emit()
@synchronized
def stop( self ):
self.quit()
return True
@synchronized
[docs] def pop( self ):
"""Pop a task from the queue, return None if the queue is empty"""
if len(self._request_queue):
task = self._request_queue.pop(0)
return task
@synchronized
[docs] def busy( self ):
"""Return True or False indicating wether either the model or the
gui thread is doing something"""
while not self._task_handler:
import time
time.sleep(1)
app = QtCore.QCoreApplication.instance()
return app.hasPendingEvents() or len(self._request_queue) or self._task_handler.busy() or self._setup_busy
[docs] def wait_on_work(self):
"""Wait for all work to be finished, this function should only be used
to do unit testing and such, since it will block the calling thread until
all work is done"""
assert object_thread( self )
app = QtCore.QCoreApplication.instance()
while self.busy():
app.processEvents()