#!/usr/bin/env python import sys import time import threading import signal import empty try: import queue except ImportError: import Queue as queue event_new = threading.Event cond_new = threading.Condition def que_new(tmout=None, dv=None): q = queue.Queue() is_empty = lambda : q.empty() is_full = lambda : q.full() def put(v): q.put(v) def get(tmout_=None, dv_=None): v = dv_ if dv_ != None else dv try: tm = tmout_ if tmout_ != None else tmout v = q.get(timeout=tm) except: pass return v return empty.new( locals() ) def lock_new(): rlock = threading.RLock() on = lock = rlock.acquire off = unlock = rlock.release return empty.new( locals() ) def seq_cnt_new(): e = empty.new( seq=0 ) lock = lock_new() def get(): lock.on() seq = e.seq e.seq += 1 lock.off() return seq return empty.to_attr( e, locals() ) def timer_new(sec, func, args=()): e = empty.new( timer=None ) def f(): e.timer = False # expired if func: func( *args ) def cancel(): if e.timer: e.timer.cancel() e.timer = None def start(): if not e.timer: e.timer = threading.Timer( sec, f ) e.timer.start() def restart(): cancel() start() def is_timeout(): return e.timer == False return empty.to_attr( e, locals() ) def stat_lst_new(): lst = [] lock = lock_new() def add(stat): if stat not in lst: lock.on() lst.append( stat ) lock.off() def rm(stat): if stat in lst: lock.on() lst.remove( stat ) lock.off() def gc(): for stat in lst[ : ]: stat.gc() def quit(tmout=None): for stat in lst[ : ]: stat.quit( tmout ) return empty.new( locals() ) stat_lst = stat_lst_new() def stat_new(th): e = empty.new() e.v = None stop_ev = event_new() def set(s): e.v = s def set_run(): set( 'run' ) stat_lst.add( e ) def set_stop(): set( 'stop' ) stop_ev.set() def set_joined(): stat_lst.rm( e ) set( 'joined' ) is_stat = lambda s: e.v == s is_run = lambda : is_stat( 'run' ) is_stop = lambda : is_stat( 'stop' ) wait_stop = lambda tmout=None: stop_ev.wait( tmout ) def gc(): if is_stop(): th.join() def quit(tmout=None): if is_run(): th.quit_ev.set() if wait_stop( tmout ): gc() return empty.add( e, locals() ) def th_new(target=None, args=(), gc=None): e = empty.new() quit_ev = threading.Event() th = None stat = stat_new( e ) def func(*args): if e.target: stat.set_run() e.target( *e.args ) stat.set_stop() if gc: gc.put( e ) def start(): stat_lst.gc() e.th = threading.Thread( target=func, args=e.args ) e.th.daemon = True e.th.start() if gc: gc.lst.add( e ) def join(tmout=None): if e.th: if tmout != None: if not stat.wait_stop( tmout ): return # ! e.th.join() stat.set_joined() if gc: gc.lst.rm( e ) def stop(tmout=None): quit_ev.set() join( tmout ) stat_lst.gc() return empty.to_attr( e, locals() ) def loop_new(func, args=(), wait_tmout=0, gc=None): th = th_new( gc=gc ) def f(): while not th.quit_ev.wait(wait_tmout): func(*args) th.target = f return th def loop_que_new(func, que_tmout=0, wait_tmout=0, gc=None): que = que_new(que_tmout) def f(): v = que.get() if v != None: func(v) th = loop_new( f, (), wait_tmout, gc ) th.que = que return th def gc_new(): def func(th_): th_.join() th = loop_que_new( func, que_tmout=1.0 ) th.start() def put(th_): th.que.put( th_ ) def lst_new(): lst = [] def add(th_): lst.append( th_ ) def rm(th_): if th_ in lst: lst.remove( th_ ) def quit_ev_set(): for th_ in lst: th_.quit_ev.set() return empty.new( locals() ) lst = lst_new() def stop(delay_sec=None): lst.quit_ev_set() if delay_sec: time.sleep( delay_sec ) while not th.que.is_empty(): time.sleep( 1 ) th.stop() return empty.new( locals() ) def sigint_new(cb=None): e = empty.new( done=False ) cond = cond_new() def wait(): cond.acquire() cond.wait() cond.release() def hdl(sig, frame): cond.acquire() e.done = True cond.notify_all() cond.release() th = None if cb: def func(): wait() cb() th = loop_new( func ) th.start() def fini(): if th: th.stop() signal.signal( signal.SIGINT, hdl ) return empty.to_attr( e, locals() ) def set_sig_func(sig, func): bak = signal.getsignal( sig ) def hdl(sig, frame): func() if callable( bak ): bak( sig, frame ) signal.signal( sig, hdl ) # EOF