pooling.py - Hero-Development/snippets GitHub Wiki

import logging, traceback

from Queue import Queue from suds import WebFault from suds.client import Client from threading import Event, Thread

from voex.output.StderrBuffer import StderrBuffer from voex.native import Log

class Task( object ): Empty = None

def __init__( self, req ):
    self.event = Event()
    self.exception = None
    self.request = req
    self.response  = None


@staticmethod
def is_empty( task ):
    return task.exception is None \
        and task.request is False \
        and task.response is True


def set( self ):
    self.event.set()


def wait( self ):
    self.event.wait()

Task.Empty = Task( False ) Task.Empty.exception = None Task.Empty.response = True

class ThreadBase( Thread ): def init( self, queue ): super( ThreadBase, self ).init()

    self.daemon = False
    self.queue = queue
    self.running = False


def process( self, request ):
    raise NotImplementedError( 'ThreadBase.process' )


def run( self ):
    while self.running:
        task = self.queue.get()
        if Task.is_empty( task ):
            self.queue.task_done()
            self.running = False
            break


        try:
            task.response = self.process( task.request )

        except Exception as ex:
            task.exception = ex

        finally:
            self.queue.task_done()
            task.set()


def start( self ):
    self.running = True
    super( ThreadBase, self ).start()

class ThreadPool( object ): def init( self, thread_qty, thread_type=Thread, target=None, args=(), kwargs={} ): self.queue = Queue() self.tasks = [] self.threads = []

    new_args = [a for a in args]
    new_args.append( self.queue )
    #logging.info( new_args )
    #logging.info( kwargs )

    for i in range( thread_qty ):
        if thread_type != Thread:
            if target:
                t = thread_type( target=target, args=new_args, kwargs=kwargs )
            else:
                t = thread_type( args=new_args, kwargs=kwargs )
        elif target:
            if target:
                t = Thread( target=target, args=new_args, kwargs=kwargs )
            else:
                t = Thread( args=new_args, kwargs=kwargs )
        else:
            raise

        self.threads.append( t )


def join_queue( self ):
    self.queue.join()
    return self


def join_threads( self ):
    for thread in self.threads:
        if thread and thread.isAlive():
            thread.join()

    return self
    

def put( self, req, synchronous = False ):
    task = Task( req )
    self.queue.put( task )

    if synchronous:
        task.wait()
    
    return task


def put_many( self, *reqs ):
    tasks = []
    for req in reqs:
        task = Task( req )
        tasks.append( task )
        self.queue.put( task )

    return tasks


def start( self ):
    for thread in self.threads:
        thread.start()

    return self


def stop( self ):
    for thread in self.threads:
        self.queue.put( Task.Empty )

    return self

class SudsConsumer( ThreadBase ): def init( self, dashURL, user, pw, proxyInfo, queue ): super( SudsConsumer, self ).init( queue )

    if proxyInfo:
        _proxy = {
            'http': proxyInfo,
            'https': proxyInfo
        }

        self.client = Client( dashURL, username = user, password = pw, proxy = _proxy )
    else:
        self.client = Client( dashURL, username = user, password = pw )
    #self.client.options.cache.setduration( days=10 )


def process( self, request ):
    fn = getattr( self.client.service, request.command )

    try:
        return fn( *request.args )

    except WebFault as webEx:
        raise

    except Exception as ex:
        with StderrBuffer( Log.error ):
            traceback.print_exc()
        raise

class SudsRequest(object): def init( self, command, *args ): self.command = command self.args = args