#!/usr/bin/env python
"""Falling-block puzzle game for an ANSI terminal.

Command-line words (all optional, read straight from ``sys.argv``):

* ``wN`` / ``hN`` -- board width / height in cells (defaults ``w10``, ``h20``)
* ``bw``          -- draw pieces as letters instead of colours
* ``warp``        -- retry a blocked sideways move together with the next move

Rows 0 and 1 of the board are used to preview the next piece; completed
lines are only looked for in rows 2 and below.

The code is written to run unchanged on Python 2.7 and Python 3.
"""

import sys
import os
import time
import termios
import select
import threading
import random


# --------------------------------------------------------------------------
# Command-line parsing
# --------------------------------------------------------------------------

def digit_idx(s):
    """Return the index of the first digit in *s*, or ``len(s)`` if none."""
    return next((i for i, ch in enumerate(s) if ch.isdigit()), len(s))


def arg_k(s):
    """Return the part of *s* before its first digit (the option key)."""
    return s[:digit_idx(s)]


def arg_v(s):
    """Return the part of *s* from its first digit onwards (the value)."""
    return s[digit_idx(s):]


def arg(dv):
    """Return the integer value of the option whose key matches *dv*.

    *dv* is a "key + default value" word such as ``'w10'``.  The first word
    of ``sys.argv`` with the same key and a non-empty value wins; *dv*
    itself is appended last so it only applies when nothing else matches.
    """
    key = arg_k(dv)
    matches = [s for s in sys.argv + [dv] if arg_k(s) == key and arg_v(s)]
    return int(arg_v(matches[0]))


(W, H) = (arg('w10'), arg('h20'))
(BW, WARP) = ('bw' in sys.argv, 'warp' in sys.argv)

# One entry per piece value: [ offsets of the three cells around the pivot,
# number of distinct rotations, colour name ].  Index 0 is the empty cell.
blk_inf = [
    [[], 1, ''],
    [[(-1, 0), (1, 0), (2, 0)], 2, 'red'],       # bar
    [[(1, 0), (0, 1), (1, 1)], 1, 'yellow'],     # square
    [[(-1, 0), (1, 0), (0, 1)], 4, 'cyan'],      # T
    [[(-1, 0), (0, 1), (1, 1)], 2, 'green'],     # z
    [[(1, 0), (0, 1), (-1, 1)], 2, 'magenta'],   # s
    [[(1, 0), (2, 0), (0, 1)], 4, 'white'],      # L
    [[(-1, 0), (-2, 0), (0, 1)], 4, 'blue'],     # J
]
(pos3, rot_n, col) = zip(*blk_inf)


# --------------------------------------------------------------------------
# Terminal input
# --------------------------------------------------------------------------

def term_raw(sigint=True):
    """Turn off echo and line buffering on stdin; return an undo callable.

    When *sigint* is false the ISIG flag is cleared as well.  The returned
    callable takes no arguments and re-applies the attributes that were in
    effect before this call.
    """
    fd = sys.stdin.fileno()
    attrs = termios.tcgetattr(fd)
    saved = attrs[:]
    flags = (termios.ECHO | termios.ECHONL | termios.FLUSHO
             | termios.ICANON | termios.IEXTEN)
    if not sigint:
        flags |= termios.ISIG
    attrs[3] &= ~flags  # index 3 is the local-modes (lflag) word
    termios.tcsetattr(fd, termios.TCSADRAIN, attrs)
    return lambda: termios.tcsetattr(fd, termios.TCSADRAIN, saved)


def th_start_loop(f):
    """Call *f* over and over in a new daemon thread."""
    def loop():
        while True:
            f()
    th = threading.Thread(target=loop)
    th.daemon = True
    th.start()


def readable(fd, tmout=None):
    """Return a non-empty list if *fd* becomes readable within *tmout* s."""
    (r, _, _) = select.select([fd], [], [], tmout)
    return r


def getkey(tmout=None):
    """Read one byte from stdin as text; return '' on timeout or EOF."""
    fd = sys.stdin.fileno()
    if not readable(fd, tmout):
        return ''
    data = os.read(fd, 1)
    # Python 3 hands back bytes; Python 2 already gives a str.
    return data if isinstance(data, str) else data.decode('latin-1')


def esc(s):
    """Return the escape sequence ``ESC [`` followed by *s*."""
    return chr(0x1b) + '[' + s


def esc_ex(d, v):
    """Return the ``ESC [ ? d h`` (truthy *v*) or ``... l`` sequence."""
    return esc('?{}{}'.format(d, 'h' if v else 'l'))


def keydir(tmout=None):
    """Read one key and translate it to a direction code.

    Returns 'u', 'd', 'l' or 'r' for the arrow keys, Ctrl-P/N/B/F and
    k/j/h/l; 'u' for newline; 'U' for tab and space; '' for anything else
    or when nothing arrives within *tmout* seconds.
    """
    def ctl(ch):
        return chr(1 + ord(ch) - ord('a'))

    key_rows = [
        # up, down, left, right
        [esc('A'), esc('B'), esc('D'), esc('C')],   # arrow
        [ctl('p'), ctl('n'), ctl('b'), ctl('f')],   # emacs
        ['k', 'j', 'h', 'l'],                       # vi
    ]
    table = {}
    for row in key_rows:
        table.update(zip(row, ['u', 'd', 'l', 'r']))
    table.update({'\n': 'u', '\t': 'U', ' ': 'U'})

    buf = getkey(tmout)
    # Keep reading while buf is still a prefix of some known sequence.
    while buf and any(k.startswith(buf) for k in table):
        if buf in table:
            return table.get(buf)
        more = getkey(tmout)
        if not more:
            # Timeout or EOF in the middle of a sequence: give up on the
            # partial prefix rather than looping on it forever.
            break
        buf += more
    return ''


# --------------------------------------------------------------------------
# Terminal output
# --------------------------------------------------------------------------

def out(s):
    """Write *s* to stdout (without flushing)."""
    sys.stdout.write(s)


def flush():
    """Flush stdout."""
    sys.stdout.flush()


def cls():
    """Emit the clear-screen sequence."""
    out(esc('2J'))


def loc(x, y):
    """Move the cursor to zero-based column *x*, row *y*."""
    out(esc('{};{}H'.format(y + 1, x + 1)))


def rev(v):
    """Switch reverse video on (truthy *v*) or reset all attributes."""
    out(esc('7m' if v else '0m'))


def cursor(v):
    """Show (truthy *v*) or hide the cursor."""
    out(esc_ex(25, v))


def cursor_save():
    """Emit the save-cursor-position sequence."""
    out(esc('s'))


def cursor_load():
    """Emit the restore-cursor-position sequence."""
    out(esc('u'))


def screen_save():
    """Emit ``ESC [ ? 47 h``."""
    out(esc_ex(47, 1))


def screen_load():
    """Emit ``ESC [ ? 47 l``."""
    out(esc_ex(47, 0))


def color(v):
    """Select foreground colour *v* by name; unknown names are ignored."""
    names = ['black', 'red', 'green', 'yellow',
             'blue', 'magenta', 'cyan', 'white']
    if v in names:
        out(esc('{}m'.format(30 + names.index(v))))


def show(x, y, v):
    """Draw board cell (*x*, *y*) with piece value *v* (<= 0 means blank)."""
    loc(x * 2, y)
    rev(v > 0)
    # A cell is two columns wide (x is doubled above and BW mode writes two
    # characters), so the blank has to be two spaces as well; one space
    # would leave the second column of an erased cell on screen.
    s = '  '
    if v > 0:
        if BW:
            s = col[v][0].upper() + ' '
        else:
            color(col[v])
    out(s)


def show_lst(lst):
    """Draw every (x, y, v) triple in *lst*, then flush."""
    for (x, y, v) in lst:
        show(x, y, v)
    flush()


# --------------------------------------------------------------------------
# Board and piece state
# --------------------------------------------------------------------------

(EMP, CURR) = (0, -1)
garr = [[EMP] * W for _ in range(H)]


def arr(x, y, v='get'):
    """Get (default) or set board cell (*x*, *y*).

    Returns None when the coordinates are off the board and also after a
    successful set.
    """
    if not (x in range(W) and y in range(H)):
        return None
    if v == 'get':
        return garr[y][x]
    garr[y][x] = v
    return None


def arr_p4(p4, v='get'):
    """Apply :func:`arr` to every (x, y) in *p4*; return the results."""
    return [arr(x, y, v) for (x, y) in p4]


def gval(v=None):
    """Return an accessor closure around one mutable value.

    Calling the accessor with no argument returns the value; calling it
    with any argument other than the string ``'get'`` stores that argument
    and returns None.
    """
    keep = [v]

    def access(v='get'):
        if v == 'get':
            return keep[0]
        keep[0] = v
        return None
    return access


def gvals(lst):
    """Return an accessor for a group of :func:`gval` cells.

    ``acc()`` returns the list of current values, ``acc(seq)`` stores the
    elements of a list/tuple position by position, ``acc(x)`` stores the
    scalar *x* in every cell, and ``acc('getf')`` returns the underlying
    per-cell accessors.
    """
    keep = [gval(v) for v in lst]

    def to_lst(v):
        return v if isinstance(v, (list, tuple)) else [v] * len(keep)

    def access(v='get'):
        if v == 'getf':
            return keep
        return [f(x) for (f, x) in zip(keep, to_lst(v))]
    return access


# Current piece: pivot x, pivot y, piece value, rotation.
curr = gvals([0] * 4)
(cx, cy, cv, cr) = curr('getf')


def rand_v():
    """Return a random piece value (an index into blk_inf, never 0)."""
    return random.randrange(1, len(blk_inf))


next_v = gval(rand_v())


def pos4(x, y, v, r):
    """Return the four cells of piece *v* at pivot (*x*, *y*), rotation *r*.

    The pivot itself is the last element of the returned list.
    """
    r %= rot_n[v]
    cells = []
    for (px, py) in pos3[v]:
        for _ in range(r):
            (px, py) = (-py, px)  # one quarter turn
        cells.append((px, py))
    cells.append((0, 0))
    return [(x + px, y + py) for (px, py) in cells]


def curr_after(dx, dy, dr):
    """Return the (x, y, v, r) the current piece would have after a move."""
    (x, y, v, r) = curr()
    rn = rot_n[v]
    return (x + dx, y + dy, v, (r + dr + rn) % rn)


def curr_pos4(dx=0, dy=0, dr=0):
    """Return the cells of the current piece, optionally after a move."""
    return pos4(*curr_after(dx, dy, dr))


def update(p4, n4):
    """Redraw the current piece moving from cells *p4* to cells *n4*.

    Only cells that actually change are drawn; the board array is then
    updated so *p4* is empty and *n4* is marked CURR.
    """
    v = cv()
    lst = [(x, y, v) for (x, y) in n4 if (x, y) not in p4]
    lst += [(x, y, 0) for (x, y) in p4 if (x, y) not in n4]
    lst.sort(key=lambda cell: cell[0])
    show_lst(lst)
    arr_p4(p4, EMP)
    arr_p4(n4, CURR)


def dosun(y):
    """Remove row *y*: shift rows 2..y-1 down by one and blank row 2."""
    for row in range(y, 1, -1):
        for x in range(W):
            v = arr(x, row - 1) if row > 2 else 0
            arr(x, row, v)
            show(x, row, v)


def bikabika(ys):
    """Flash the rows *ys* by alternating reverse video, 0.1 s per step."""
    blank = ' ' * (W * 2)  # full board width: every cell is two columns
    for i in range(3 * 2):
        rev(i % 2)
        for y in ys:
            loc(0, y)
            out(blank)
            flush()
        time.sleep(0.1)


# --------------------------------------------------------------------------
# Game logic
# --------------------------------------------------------------------------

over = threading.Event()        # set once the game is over
move_lock = threading.RLock()   # guards piece/board state; re-entrant

lines = gval(0)
level = gval(1)


def lines_add(a):
    """Add *a* to the line counter and redraw the status text."""
    # level is displayed but never changed anywhere in this file.
    lines(lines() + a)
    loc(W * 2, 0)
    out('lines: {}'.format(lines()))
    loc(W * 2, 1)
    out('level: {}'.format(level()))


def fix():
    """Lock the current piece into the board and start the next one.

    Completed rows are flashed and removed, then :func:`drop` is called.
    If the new piece could not leave row 0 the game-over event is set.
    """
    with move_lock:
        if over.is_set():
            # drop() re-arms the timer when the new piece is blocked, so
            # without this guard fix() would keep firing after game over.
            return
        arr_p4(curr_pos4(), cv())
        ys = [y for y in range(2, H)
              if all(arr(x, y) > 0 for x in range(W))]
        if ys:
            bikabika(ys)
            for y in ys:
                dosun(y)
            lines_add(len(ys))
        drop()
        if cy() == 0:
            over.set()


def timer_new(sec, f):
    """Return a controller for a restartable one-shot timer.

    ``ctl('start')`` arms a timer that calls *f* after *sec* seconds unless
    one is already pending; ``ctl('cancel')`` cancels a pending timer.
    Every call returns the pending ``threading.Timer`` or None.
    """
    tmr = gval()

    def tmr_f():
        tmr(None)
        f()

    def ret_f(cmd=''):
        if not tmr() and cmd == 'start':
            t = threading.Timer(sec, tmr_f)
            # Daemon, like the other worker threads, so that a pending
            # timer cannot keep the process alive after the game ends.
            t.daemon = True
            tmr(t)
            t.start()
        elif tmr() and cmd == 'cancel':
            tmr().cancel()
            tmr(None)
        return tmr()
    return ret_f


timer = timer_new(1.0, fix)
pend_dx = gval(0)   # sideways move that was blocked and may be retried


def try_move(dx, dy, dr):
    """Try to move the current piece by (*dx*, *dy*) and rotate by *dr*.

    On success the piece is redrawn, any pending lock timer is cancelled
    and the pending sideways move is cleared.  A blocked downward move
    starts the lock timer; a blocked sideways move is remembered in
    ``pend_dx``.  With WARP enabled, a move with ``dx == 0`` first retries
    the remembered sideways move combined with this one.
    """
    with move_lock:
        if WARP and dx == 0 and pend_dx():
            try_move(dx + pend_dx(), dy, dr)
        p4 = curr_pos4()
        n4 = curr_pos4(dx, dy, dr)
        # Off-board cells come back as None and therefore block the move.
        if all(v in [0, -1] for v in arr_p4(n4)):
            update(p4, n4)
            curr(curr_after(dx, dy, dr))
            timer('cancel')
            pend_dx(0)
        elif dy > 0:
            timer('start')
        elif dx != 0:
            pend_dx(dx)


def drop():
    """Put the next piece at the top, move it onto the field, show preview."""
    vs = [W // 2, 0, next_v(), 0]
    curr(vs)
    try_move(0, 2, 0)
    if cy() > 0:
        # The piece made it onto the field: choose the one after it.
        next_v(rand_v())
    vs[2] = next_v()
    show_lst([(x, y, next_v()) for (x, y) in pos4(*vs)])


def th_key():
    """Read one key (waiting up to 1 s) and apply the matching move."""
    moves = {
        'l': (-1, 0, 0),
        'r': (1, 0, 0),
        'd': (0, 1, 0),
        'u': (0, 0, 1),
        'U': (0, 0, -1),
    }
    prm = moves.get(keydir(1.0), [])
    if prm:
        try_move(*prm)


def work():
    """Run one game: start the worker threads and wait for game over."""
    cls()
    lines_add(0)
    drop()
    th_start_loop(th_key)

    (sec, interval, rate) = (gval(1.0), 3.0, 0.99)

    def move():
        # Gravity: one step down, then wait the current step time.
        try_move(0, 1, 0)
        time.sleep(sec())
    th_start_loop(move)

    def accel():
        # Shorten the step time by 1% every `interval` seconds.
        time.sleep(interval)
        sec(sec() * rate)
    th_start_loop(accel)

    while not over.wait(1.0):
        time.sleep(1.0)  # for ^C


def main():
    """Handle ``-h``, set up the terminal, play, and always restore it."""
    if '-h' in sys.argv:
        print('Usage: {} [w10] [h20] [bw] [warp]'.format(sys.argv[0]))
        sys.exit(0)
    # Taken before the try block so that a failure here is reported as
    # itself instead of as a NameError from the cleanup code.
    restore = term_raw()
    try:
        screen_save()
        cursor_save()
        cursor(False)
        work()
    finally:
        rev(False)  # do not leave colour / reverse video switched on
        cursor(True)
        cursor_load()
        screen_load()
        flush()
        restore()


if __name__ == "__main__":
    main()

# EOF