Below is a generic threading worker in PyQT:
{{{ lang=python lines=1
from PyQt4 import QtGui, QtCore
import logging
class worker_thread(QtCore.QThread):
## Generic worker thread
finished = QtCore.pyqtSignal(dict) #< Replace this for data type that will be returned
def __init__(self, ret_function, function, *args, **kwargs):
## Init
# @param Function ret_function Function to run on end
# @param Function function Function to run on start
# @param List *args args to pass to function
# @param Dict **kwargs Keyword args to pass to function
super(worker_thread, self).__init__()
self.function = function
self.finished.connect(ret_function)
self.args = args
self.kwargs = kwargs
def __del__(self):
## No exceptions
self.wait()
def run(self):
## Thread finished, run function with args
logging.debug('End Thread')
ret = self.function(*self.args,**self.kwargs)
self.finished.emit( ret ) #< Emit the return value
}}}
Usage:
{{{ lang=python lines=1
def function():
logging.debug( 'Thread running.' ) #< Thread start
return { 'thread_return' : 'This was a success' } #< Return a dict, as expected by finished() signal
def return_function(dict_in):
logging.debug( dict_in ) #< Print the input
thread = worker_thread(return_function, function) #< Make it
thread.start() #< Run it
}}}
Leave a Reply