#!/usr/bin/env python
"""Small closure-based helpers around threading, queue and signal.

Every ``*_new`` factory returns an object built by the project-local
``empty`` module from the factory's ``locals()``.
NOTE(review): this presumably exposes each local name as an attribute of the
returned object, so local names inside the factories are part of the public
interface -- confirm against the ``empty`` module before renaming any of them.
"""

import sys
import time
import threading
import signal

import empty

try:
    import queue
except ImportError:
    import Queue as queue


event_new = threading.Event
cond_new = threading.Condition


def que_new(tmout=None, dv=None):
    """Create a queue wrapper with a default timeout and default value.

    tmout -- default timeout (seconds) for get(); None blocks until an item
             is available.
    dv    -- default value get() returns when nothing could be fetched.
    """
    q = queue.Queue()

    def is_empty():
        return q.empty()

    def is_full():
        return q.full()

    def put(v):
        q.put(v)

    def get(tmout_=None, dv_=None):
        """Return the next item, or the default value on timeout.

        tmout_ / dv_ override the factory-level tmout / dv when not None.
        """
        fallback = dv_ if dv_ is not None else dv
        tm = tmout_ if tmout_ is not None else tmout
        try:
            return q.get(timeout=tm)
        except queue.Empty:
            # Only a timeout is an expected outcome here; anything else
            # (e.g. KeyboardInterrupt, a bad timeout value) must propagate.
            return fallback

    return empty.new( locals() )


def lock_new():
    """Create a re-entrant lock exposing on/lock and off/unlock aliases."""
    rlock = threading.RLock()
    on = lock = rlock.acquire
    off = unlock = rlock.release
    return empty.new( locals() )


def seq_cnt_new():
    """Create a lock-protected sequence counter starting at 0."""
    e = empty.new( seq=0 )
    lock = lock_new()

    def get():
        """Return the current sequence number and advance it by one."""
        lock.on()
        try:
            seq = e.seq
            e.seq += 1
        finally:
            # Release even if the attribute access raises.
            lock.off()
        return seq

    return empty.to_attr( e, locals() )


def ev_val_new(ev=None, v_init=None, v_timeout=None, v_cancel=False):
    """Create an event that carries a value from set() to wait().

    ev        -- event object to use; a new one is created when falsy.
    v_init    -- value stored by init().
    v_timeout -- value wait() returns when the event is not set in time.
    v_cancel  -- value wait() returns when woken by cancel() instead of set().
    """
    e = empty.new()
    e.ev = ev if ev else event_new()

    def init():
        e.v = v_init
        e.dirty = False
        e.ev.clear()

    def set(v):
        e.v = v
        e.dirty = True
        e.ev.set()

    def cancel():
        # Wakes the waiter without marking the value dirty.
        e.ev.set()

    def wait(tmout):
        """Reset the state, then block up to tmout seconds for set()/cancel()."""
        init()
        if not e.ev.wait( tmout ):
            return v_timeout
        if not e.dirty:
            return v_cancel
        return e.v

    init()
    return empty.add( e, locals() )


def timer_new(sec, func, args=()):
    """Create a restartable one-shot timer built on threading.Timer.

    e.timer holds one of three states: None (idle or cancelled), a
    threading.Timer instance (running), or False (expired).
    """
    e = empty.new( timer=None )

    def f():
        e.timer = False  # expired
        if func:
            func( *args )

    def cancel():
        # Snapshot first: the timer thread may set e.timer to False between
        # the truth test and the .cancel() call.
        running = e.timer
        if running:
            running.cancel()
        e.timer = None

    def start():
        if not e.timer:
            e.timer = threading.Timer( sec, f )
            e.timer.start()

    def restart():
        cancel()
        start()

    def is_timeout():
        return e.timer is False

    return empty.to_attr( e, locals() )


def tmr_new( sec, func, args=() ):
    """Create a timer served by one long-lived worker thread.

    e.stat moves between 'init' (idle), 'start' (requested), 'wait'
    (counting down) and 'quit'.  func(*args) is called when a countdown
    of sec seconds elapses without cancel().
    """
    e = empty.new( stat='init' )
    cond = cond_new()

    def f():
        with cond:
            wsec = None  # None: block until notified
            if e.stat == 'start':
                e.stat = 'wait'
                wsec = sec
            # quit() may already have set 'quit' and notified before this
            # iteration got here; waiting then would never be woken and
            # th.stop() would hang in join().
            if e.stat != 'quit':
                cond.wait( wsec )
            tmout = False
            if e.stat == 'wait':
                # Still 'wait' after waking: nobody cancelled, so it expired.
                e.stat = 'init'
                tmout = True
        # Run the callback outside the condition's lock.
        if tmout and func:
            func( *args )

    th = loop_new( f )

    def stat_if_set( stat_if, stat_set ):
        """Set e.stat to stat_set if stat_if is empty or matches e.stat.

        Returns True when the state was changed.
        """
        with cond:
            ret = not stat_if or e.stat == stat_if
            if ret:
                e.stat = stat_set
                cond.notify()
        return ret

    def start():
        return stat_if_set( 'init', 'start' )

    def cancel():
        return stat_if_set( 'wait', 'init' )

    def quit():
        th.quit_ev.set()
        stat_if_set( '', 'quit' )
        th.stop()  # do not call from th

    th.start()
    return empty.add( e, locals() )


def th_new(target=None, args=(), gc=None):
    """Create a daemon-thread wrapper with a quit event.

    target, args -- callable and arguments run in the thread.
    gc           -- optional collector (see gc_new); the wrapper is added to
                    gc.lst on start and handed to gc.put() when target ends.
    """
    e = empty.new()
    quit_ev = threading.Event()
    th = None

    def func(*args):
        if e.target:
            e.target( *e.args )
        if gc:
            gc.put( e )

    def start():
        e.th = threading.Thread( target=func, args=e.args )
        e.th.daemon = True
        e.th.start()
        if gc:
            gc.lst.add( e )

    def join():
        if e.th:
            e.th.join()
        if gc:
            gc.lst.rm( e )

    def stop():
        quit_ev.set()
        join()

    return empty.to_attr( e, locals() )


def loop_new(func, args=(), wait_tmout=0, gc=None):
    """Create a thread that calls func(*args) repeatedly until quit_ev is set.

    wait_tmout is the time spent waiting on quit_ev between calls.
    """
    th = th_new( gc=gc )

    def f():
        while not th.quit_ev.wait(wait_tmout):
            func(*args)

    th.target = f
    return th


def loop_que_new(func, que_tmout=0, wait_tmout=0, gc=None):
    """Create a loop thread that feeds queued items to func.

    The queue is exposed as th.que.  Items that are None are skipped, since
    None is also what the queue returns on timeout.
    """
    que = que_new(que_tmout)

    def f():
        v = que.get()
        if v is not None:
            func(v)

    th = loop_new( f, (), wait_tmout, gc )
    th.que = que
    return th


def call_after_new():
    """Create a started worker that runs queued calls one after another."""
    def f( v ):
        ( func, args, kwds ) = v
        func( *args, **kwds )

    th = loop_que_new( f, que_tmout=1.0 )
    th.start()

    def put( func, *args, **kwds ):
        th.que.put( ( func, args, kwds ) )

    def wait_flush():
        # Polls until the queue is drained; the last call may still be running.
        while not th.que.is_empty():
            time.sleep( 1 )

    def stop():
        th.stop()

    return empty.new( locals() )


def gc_new():
    """Create a started collector thread that joins finished th_new threads."""
    def func(th_):
        th_.join()

    th = loop_que_new( func, que_tmout=1.0 )
    th.start()

    def put(th_):
        th.que.put( th_ )

    def lst_new():
        lst = []

        def add(th_):
            lst.append( th_ )

        def rm(th_):
            # EAFP: rm() may be called twice for the same thread.
            try:
                lst.remove( th_ )
            except ValueError:
                pass

        def quit_ev_set():
            # Iterate over a snapshot; rm() may run concurrently.
            for th_ in lst[ : ]:
                th_.quit_ev.set()

        return empty.new( locals() )

    lst = lst_new()

    def stop(delay_sec=None):
        """Ask all tracked threads to quit, drain the queue, stop the collector."""
        lst.quit_ev_set()
        if delay_sec:
            time.sleep( delay_sec )
        while not th.que.is_empty():
            time.sleep( 1 )
        th.stop()

    return empty.new( locals() )


def ths_new():
    """Create a registry that starts thread wrappers and reaps finished ones."""
    lst = []
    lock = lock_new()

    def add(th):
        lock.on()
        try:
            # The membership test must be under the lock too, otherwise two
            # callers can both pass it and append twice.
            if th not in lst:
                lst.append( th )
        finally:
            lock.off()

    def rm(th):
        lock.on()
        try:
            # Same as add(): test and remove atomically so a concurrent
            # rm() cannot make remove() raise ValueError.
            if th in lst:
                lst.remove( th )
        finally:
            lock.off()

    def gc():
        """Join and drop every finished thread except the calling one."""
        cth = threading.current_thread()
        for th in lst[ : ]:
            if th.goal and th.th != cth:
                rm( th )
                th.join()

    def func(th):
        try:
            gc()
            if th.target:
                th.target( *th.args )
            gc()
        finally:
            # Mark as finished even if target raised, so gc() can reap it.
            th.goal = True

    def start(th):
        gc()
        th.goal = False
        add( th )
        th.th = threading.Thread( target=func, args=( th, ) )
        th.th.daemon = True
        th.th.start()

    def stop_all(tm_out=5.0):
        """Set quit_ev on all threads and reap them for up to tm_out seconds."""
        for th in lst[ : ]:
            th.quit_ev.set()
        st = time.time()
        while lst and time.time() - st <= tm_out:
            gc()
            time.sleep( 1 )

    return empty.new( locals() )


ths = ths_new()


def sigint_new(cb=None):
    """Install a SIGINT handler that wakes waiters and optionally calls cb.

    When cb is given, a loop thread waits for each SIGINT and then calls cb().
    """
    e = empty.new( done=False )
    cond = cond_new()

    def wait():
        # NOTE(review): no predicate is checked, so a SIGINT delivered before
        # wait() is entered is not seen by this call.
        with cond:
            cond.wait()

    def hdl(sig, frame):
        with cond:
            e.done = True
            cond.notify_all()

    th = None
    if cb:
        def func():
            wait()
            cb()
        th = loop_new( func )
        th.start()

    def fini():
        # NOTE(review): th.stop() joins the loop thread, which sits in wait()
        # until the next SIGINT -- this call blocks until then.
        if th:
            th.stop()

    signal.signal( signal.SIGINT, hdl )
    return empty.to_attr( e, locals() )


def set_sig_func(sig, func):
    """Install func() as handler for sig, chaining to the previous handler.

    The previous handler is only called when it is callable.
    """
    bak = signal.getsignal( sig )

    def hdl(sig, frame):
        func()
        if callable( bak ):
            bak( sig, frame )

    signal.signal( sig, hdl )

# EOF