pythonのユーティリティ・プログラム 2020冬

2024/MAR/20

pythonのプログラムを組んでて、毎度繰り返し作る同じような関数の処理を、 自分用にまとめてます。

更新履歴
日付 変更内容
2020/JAN/19 新規作成
2020/JAN/20 cmd_ut.py proc_new()引数sudo_pw=''追加
2020/JAN/23 io_ut.py 追加
2020/JAN/24 io_ut.py no_buffering()を追加
io_ut.py reader_new()
typo修正
引数順変更
引数no_buf=False追加
2020/JAN/27 empty.py is_empty()を追加
2020/JAN/28 cmd_ut.py call_comm(cmd, in_b)を追加
nkf.py 追加
2020/JAN/30 cmd_ut.py cmd_py(name)を追加
arg.py 追加
io_ut.py mainの箇所修正(少し)
2020/FEB/01 cmd_ut.py cmd_stdio_conn()
call_stdio_conn()
proc_stdio_conn()を追加
arg.py is_pop(s)
get_av()を追加
dbg.py help_exit()を追加
chat_conn.py 更新
2020/FEB/06 empty.py **kwdsを引数追加
term_ut.py追加
2020/FEB/08 tm_ut.py追加
base.py追加
dbg.py out_lst()を追加
2020/FEB/09 cmd_ut.py rm_rf()を追加
2020/FEB/10 cmd_ut.py proc_new_comm_tm2()を追加
base.py filter_div()とfind_path()を追加
yaml_ut.py追加
2020/FEB/11 cmd_ut.py base.pyにpath_lst()を追加
2020/FEB/15 pty_swpawn.py追加
dbg.py out_exit()を追加
2020/FEB/21 thr.py que_new()にis_empty(), is_full()追加
thr.py gc_new()を追加
2020/FEB/23 term_ut.py get_xy()を追加
term_ut.py buff_new()を追加
2020/FEB/24 term_ut.py バグ修正
2020/MAR/31 base.py add_path()を追加
2020/APR/01 cmd_ut.py call()に引数b2sを追加
cmd_ut.py mk_tmp_dir()を追加
2020/APR/03 yaml_ut.py file open mode修正
2020/APR/05 base.py filter_str()を追加
tm_ut.py sec_to_str() 引数delimを拡張
tm_ut.py is_data_time_str()を追加
snap_ut.py追加
2020/APR/26 arg.py cmd_new(),get_name_args()を追加
2020/MAY/02 cmd_ut.py call_show()を追加
arg.py get_name_args()を更新
2020/MAY/03 arg.py get_name_args()を更新
2020/MAY/06 arg.py cmd_new(),get_name_args()を更新
2020/MAY/10 thr.py gcに機能を追加
2020/MAY/13 empty.pyにcall(),attr_call()を追加
2020/MAY/16 empty.pyにto_dic(),from_dic()を追加
np_ut.pyを追加
2020/MAY/17 snap_ut.py fs_new()を更新
empty.pyにkwds_pop()を追加
2020/MAY/20 snap_ut.py コメント箇所の修正
2020/MAY/24 thr.py que_new()を更新
thr.py lock_new(),seq_cnt_new()を追加
arg.py get_name_args()を更新
2020/MAY/29 wx_ut.py 追加
2020/MAY/31 wx_ut.py 更新
base.pyにcan_hash(),rev_dic()を追加
2020/JUN/02 wx_ut.py 更新
thr.pyにtimer_new()を追加
2020/JUN/07 dbg.pyにquantize(),laptime_new()を追加
2020/JUN/09 dbg.py laptime_new()を更新
2020/JUN/10 empty.py chg_new()を追加
2020/JUN/13 empty.py chg_new()を更新
2020/JUN/19 base.py str_find() str_replace_dic()を追加
2020/JUN/20 thr.py sigint_new()を追加
cmds_out.py cmds_out.yamlを追加
2020/JUL/12 base.py dic_update_r()を追加
2020/JUL/30 empty.py update()を追加
2020/AUG/20 snd_ut.py 追加
2020/SEP/08 empty.py copy()を追加
2020/SEP/09 cmd_ut.py cb_line()を追加
2020/SEP/13 inc_ut.py 追加
2020/OCT/01 wx_ut.py 更新
2020/OCT/03 html_ut.py 追加
2020/OCT/09 nkf.py 更新
2020/OCT/23 tm_ut.py get_mon3() date432() 追加
2020/OCT/25 chide.py 追加
2020/OCT/29 thr.py set_sig_func 追加
2020/OCT/30 snap_ut.py fs_new() 更新
2020/NOV/01 ご予定 追加
2020/NOV/06 tm_ut.py にtm60関連を追加
2020/NOV/16 cmd_ut.py proc_new(),cb_line()引数preexec_fn追加
html_ut.py head_only引数追加など
from_to.py 追加
2020/NOV/27 del_lines.py 追加
np_ut.py 更新
2020/DEC/05 typo修正など arr_new(*args) q4_to_ypr(q4)
2021/JAN/13 thr.py にths_new(),ths追加
2021/JAN/18 thr.py にev_val_new()追加
np_ut.py にcross_y(),cross_z(),point_plane_len()追加
2021/JAN/27 np_ut.py にget_dist_lst(),get_plane_from_points()追加
2021/JAN/28 np_ut.py にto_bytes_xxx(),from_bytes_xxx(),save_bytes_xxx(),load_bytes_xxx()追加
cnt_ut.py 追加
2021/JAN/29 cnt_ut.py にcnt_map(),cnt_loop()追加
dbg.py にmsg_start(),start_save(),start_load()を追加
np_ut.py save,load関連にパス表示機能を追加
2021/MAR/21 csv_ut.py 追加
2021/MAR/23 np_ut.py
write_bytes_head( f, b )
read_bytes_head( f )
get_plane_z( pl_p, pl_v, x, y )
newton( f, a, b, lmt ) 追加
2021/APR/01 thr.pycall_after_new() 追加
2021/APR/06 np_ut.py
line_plane_cross_point( li, pl )
plane2_cross_line( pl_a, pl_b )
plane3_cross_point( pl3 )
point3_circle_point( p3 )
point3_circle_touch_v( p3 ) 追加
2021/APR/13 tcp_ut.py 追加
2021/JUN/30 np_ut.py 更新
2021/JUL/19 fio_ut.py 追加
2021/AUG/28 thr.py に tmr_new() 追加
2022/MAY/02 snd_ut.py data_new()のget_bytes()メソッドに引数cache=True追加
2024/MAR/20 wx_ut.py 更新

目次


一覧

リンク ソースコード 内容
#empty.py empty.py 空のクラスです。
#dbg.py dbg.py デバッグ出力用です。
#thr.py thr.py スレッド関連です。
#cmd_ut.py cmd_ut.py 外部コマンド実行用です。
#io_ut.py io_ut.py 主にファイルIO関連用です。
#tcp_ut.py tcp_ut.py TCPソケットの通信用です。
chat_conn.pyよりレベルが低くて軽いです。
#chat_conn.py chat_conn.py TCPソケットの通信用です。
#nkf.py nkf.py 日本語対応用です。
#arg.py arg.py 引数用です。
#term_ut.py term_ut.py 端末へのエスケープシーケンスの表示やキー入力用です。
#tm_ut.py tm_ut.py 時間関連です。
#cnt_ut.py cnt_ut.py 進捗表示用のカウンタです。
#base.py base.py わりと基本的なものです。
#chide.py chide.py 文字列の特定の文字を使わないようにする変換用です。
#yaml_ut.py yaml_ut.py YAML関連です。
#html_ut.py html_ut.py HTML関連です。
#from_to.py from_to.py 簡易なテキストのフィルタです。html_ut.pyから使ってます。
#del_lines.py del_lines.py 簡易なテキストのフィルタです。
#np_ut.py np_ut.py numpy関連です。
#snap_ut.py snap_ut.py ファイルのサイズや日付などの情報関連です。
#snd_ut.py snd_ut.py 音声ファイル関連です。
wx_ut wx_ut.py wxPythonのユーティリティ・プログラムです。
#pty_spawn.py pty_spawn.py pty.spawn()実行用です。
#cmds_out.py cmds_out.py 複数のコマンドを実行して表示するだけのツールです。
inc inc_ut inc_ut.py テキストのinclude処理です。
csv csv_ut csv_ut.py CSV形式の変換処理などです。
fio_ut fio_ut.py ソケット、sysのstdio、プロセスのstdio関連です。


ご予定

thr.pyのgc処理

2021/JAN/13 結局、 ちょっと茶ぶ台返し で対応しました。


empty.py

empty.py
#!/usr/bin/env python

from copy import deepcopy

class Empty:
	pass

def get_dic(base, over_wrt):
	d = base.copy()
	d.update( over_wrt )
	return d

def to_attr(e, dic={}, **kwds):
	vars( e ).update( get_dic( dic, kwds ) )
	return e

new = lambda dic={}, **kwds: to_attr( Empty(), dic, **kwds )

new_kwds = new

def add(e, dic={}, **kwds):
	d = get_dic( get_dic( dic, kwds ), vars( e ) )
	vars( e ).update( d )
	return e

copy = lambda e: new( deepcopy( vars( e ) ) )

def update(targ, add):  # not recursive
	dic = vars( targ )
	dic.update( vars( add ) )

is_empty = lambda o: isinstance(o, Empty)

def call(f, args=(), dv=None):
	return f( *args ) if f else dv

def attr_call(e, attr, args=(), dv=None):
	if not e:
		return dv
	f = getattr( e, attr, None )
	return call( f, args, dv )

def to_dic(o):
	t = type( o )
	if t == list:
		return list( map( to_dic, o ) )
	if t == dict:
		f = lambda kv: ( kv[0], to_dic( kv[1] ) )
		return dict( map( f, o.items() ) )
	if is_empty( o ):
		return to_dic( vars( o ) )
	return o

def from_dic(o):
	t = type( o )
	if t == list:
		return list( map( from_dic, o ) )
	if t == dict:
		f = lambda kv: ( kv[0], from_dic( kv[1] ) )
		return new( dict( map( f, o.items() ) ) )
	if is_empty( o ):
		return from_dic( vars( o ) )
	return o

def kwds_pop(targ_dic, **kwds):
	for k in kwds.keys():
		if k in targ_dic:
			kwds[ k ] = targ_dic.pop( k )
	return new( kwds )

def chg_new(v, f=None, fv=None):
	e = new( v=v, prev=None, chg=None )

	def set(v):
		e.chg = e.v != v
		if e.chg:
			e.prev = e.v
			e.v = v

			if f:
				f( e )
			if fv:
				fv( e.v )
		return e.chg

	return to_attr( e, locals() )

# EOF

空のクラスです。

もう、これを多用するスタイルが常になってしまいました。

クロージャと空クラス に、以前の説明があります。

import empty

def hoge_new(v):
	add = lambda a: v+a
	return empty.new( locals() )

hoge = hoge_new(2)
v = hoge.add(3)

class Empty

空のクラスです。それだけです。

to_attr(e, dic={}, **kwds)

class Emptyのオブジェクトeと、文字列がkeyの辞書dicの引数を取ります。

eのアトリビュートkeyに対応するvalueをセットしてeを返します。

$ python
>>> import empty

>>> e = empty.Empty()
>>> dic = { 'foo': 1, 'bar': 'hello' }
>>> e2 = empty.to_attr(e, dic)
>>> e2 == e
True
>>> e.foo
1
>>> e.bar
'hello'

末尾に**kwds引数を追加しました。2020/FEB/06

引数の辞書に、ちょっとだけ追加したい時などに。

>>> e = empty.Empty()
>>> dic = { 'foo': 1, 'bar': 'hello' }
>>> e = empty.to_attr( e, dic, hoge='fuga' )
>>> e.foo
1
>>> e.bar
'hello'
>>> e.hoge
'fuga'

引数dicにデフォルト値の指定={}を追加しました。2020/SEP/19

dicとkwdsの上書き関係を逆の仕様に変更しました。2020/SEP/19

変更後はdicのcopy()をkwdsで上書きします。

(dicとkwdsとでキーの重複が無ければ影響ないはずです)

new( dic={}, **kwds )

class Emptyを生成して、to_attr()で引数dicのアトリビュートをセットして、返します。

pythonの組み込み関数locals()で返る辞書をdicに指定すると、 クロージャを使うときに便利なクラスオブジェクになります。

$ python
>>> import empty

>>> def foo(v):
...   def add1():
...     return v+1
...   return empty.new( locals() )
...
>>> bar = foo(2)
>>> bar.add1()
3
>>> bar.v
2

末尾に**kwds引数を追加しました。2020/FEB/06

これでnew_kwds()の存在意義が無くなってしまったかも。

>>> add = lambda a, b: a + b

>>> def bar():
...   add1 = lambda v: add(v, 1)
...   return empty.new( locals(), add=add )
...
>>> b = bar()
>>> b.add1(3)
4
>>> b.add(3, 2)
5

new_kwds(**kwds)

new( { 'foo': 1, 'bar': 'hello' } )

の代わりに

new_kwds( foo=1, bar='hello' )

などとして使えます。

参照だけする分にはCの構造体のようにして使えます。

>>> hoge = empty.new_kwds( foo=1, bar='hello' )
>>> hoge.foo
1
>>> hoge.bar
'hello'

でしたが、new()の末尾に**kwds引数を追加したので... (2020/FEB/06)

>>> hoge = empty.new( foo=1, bar='hello' )
>>> hoge.foo
1
>>> hoge.bar
'hello'

new()でも同じ事ができます。;-p)

実装を変更 2020/SEP/19

new_kwds = new

として、一応互換性のために残しておきます。

add(e, dic={}, **kwds)

基本的にto_attr()と同様の動作ですが、 eの既存のアトリビュートは上書きされません。

オブジェクト生成関数の末尾で

return empty.add( e, locals() )

として、to_attr()の代わりに使っていく所存です。

詳細は emptyモジュール落とし穴

copy(e)

Emptyクラスのオブジェクトeをコピーして返します。

内部でeを辞書にして、deepcopyをとってから、Emptyクラスのオブジェクトを生成して返します。

この処理のためにempty.pyから初めてimport追加しました。

from copy import deepcopy
copy = lambda e: new( deepcopy( vars( e ) ) )

update(targ, add)

Emptyクラスのオブジェクトtargの属性を、Emptyクラスのオブジェクトaddの属性で更新します。

この関数は値は返しません。targの属性を更新します。

辞書のupdate()メソッドと同様で、再帰的には動作しません。

>>> import empty
>>> targ = empty.new( foo=1, bar='hoge' )
>>> add = empty.new( bar='fuga', name='kon' )
>>> empty.update( targ, add )
>>> vars( targ )
{'foo': 1, 'bar': 'fuga', 'name': 'kon'}

pythonのちょっとしたメモ varsとオブジェクトのアトリビュート

vars()の動作の思い違いが発覚! 訂正します。

ちなみに、別のオブジェクトとして値を返す方式なら

>>> def update_ret(targ, add):
... 	dic = vars( targ )
... 	dic.update( vars( add ) )
... 	return empty.new( **dic )

>>> targ = empty.new( foo=1, bar='hoge' )
>>> add = empty.new( bar='fuga', name='kon' )
>>> ret = update_ret( targ, add )
>>> vars( ret )
{'foo': 1, 'bar': 'fuga', 'name': 'kon'}

ですが、こちらは簡単なので入れてません。;-p)

is_empty(o)

オブジェクトoがEmptyクラスならTrueを、そうでなければFalseを返します。

call(f, args=(), dv=None)

2020/MAY/13追加

引数の関数fがNoneでなければ、引数argsを与えてf( *args )を実行し、その結果を返します。

fがNoneならばデフォルト値dvを返します。

attr_call(e, attr, args=(), dv=None)

2020/MAY/13追加

引数eがNoneではなく、そのオブジェクトのアトリビュートattrが存在すれば、 アトリビュートの値を関数fとして、call(f, args, dv)を実行し、その結果を返します。

eがNoneであったり、アトリビュートが存在しなければ、デフォルト値dvを返します。

to_dic(o)

2020/MAY/16追加

オブジェクトoを再帰的に探索し、Emptyクラスのオブジェクトを辞書に変換して返します。

oがリストや辞書やEmptyクラスのオブエクトの場合、再帰的に探索して、 Emptyクラスのオブジェクトを見つけると辞書に変換します。

タプルや他のクラスなど、Emptyクラスでもリストでも辞書でもないものは対象外で、 そのまま変化しません。

>>> import empty
>>> import yaml_ut

>>> lst = [ empty.new( s1='hello', s2='bye' ), empty.new( v=123, w=456 ) ]
>>> dic = { 'a': empty.new( b='c' ), 'b': empty.new( d='e' ) }
>>> o = empty.new( foo=1, bar=2, hoge=lst, fuga=dic )

>>> empty.to_dic( o )
{'foo': 1, 'bar': 2, 'hoge': [{'s1': 'hello', 's2': 'bye'}, \
{'v': 123, 'w': 456}], 'fuga': {'a': {'b': 'c'}, 'b': {'d': 'e'}}}
>>> print( yaml_ut.dump( empty.to_dic( o ) ) )
bar: 2
foo: 1
fuga:
  a: {b: c}
  b: {d: e}
hoge:
- {s1: hello, s2: bye}
- {v: 123, w: 456}
>>> print( yaml_ut.dump_no_flow( empty.to_dic( o ) ).decode() )
bar: 2
foo: 1
fuga:
  a:
    b: c
  b:
    d: e
hoge:
- s1: hello
  s2: bye
- v: 123
  w: 456

from_dic(o)

2020/MAY/16追加

オブジェクトoを再帰的に探索し、辞書をEmptyクラスのオブジェクトに変換して返します。

oがリストや辞書やEmptyクラスのオブエクトの場合、再帰的に探索して、 辞書を見つけるとEmptyクラスのオブジェクトに変換します。

タプルや他のクラスなど、Emptyクラスでもリストでも辞書でもないものは対象外で、 そのまま変化しません。

>>> import empty
>>> import yaml_ut

>>> lst = [ empty.new( s1='hello', s2='bye' ), empty.new( v=123, w=456 ) ]
>>> dic = { 'a': empty.new( b='c' ), 'b': empty.new( d='e' ) }
>>> o = empty.new( foo=1, bar=2, hoge=lst, fuga=dic )

>>> o = empty.from_dic( o )

>>> o.fuga.a.b
'c'
>>> o.fuga.b.d
'e'
>>> d = empty.to_dic( o )
>>> d
{'foo': 1, 'bar': 2, 'hoge': [{'s1': 'hello', 's2': 'bye'}, \
{'v': 123, 'w': 456}], 'fuga': {'a': {'b': 'c'}, 'b': {'d': 'e'}}}

>>> o2 = empty.from_dic( d )

>>> o2.foo
1
>>> o2.hoge[1].v
123
>>> o2.fuga.b.d
'e'

kwds_pop(targ_dic, **kwds)

empty.pyに入れるべきものじゃないかも?

ですが、とりあえず追加しておきます。

targ_dicの辞書から、kwds引数で指定した辞書のキーを探して、 targ_dicからpop()で取り出します。

pop()なのでtarg_dicからは削除され、 取り出した値は、kwdsの方の辞書に上書きします。

更新されなかったkwdsのキーに対応する値は、引数で指定した時の初期値のままです。

最後にkwdsの辞書をnew()でEmptyクラスにして返します。

説明は長いですが、ソースコードは短いです。

def kwds_pop(targ_dic, **kwds):
	for k in kwds.keys():
		if k in targ_dic:
			kwds[ k ] = targ_dic.pop( k )
	return new( kwds )

例えば、キーワード引数の既存の関数について、 ラッパー関数を作って、独自のキーワード引数を追加したい場面で使います。

def foo(a, *args, **kwds):
	pass
	# ...

な関数があって

	:
foo( 'hello' )
	:
ret = foo( 'bye', 1 )
	:
foo( 'bye-bye', v=2 )
	:
foo( 'abc', 'def', v=3, s='ghi' )
	:

などと、呼び出している箇所があった場合。

呼び出し箇所をラッパー関数bar()に置き換えて、 bar()からはfoo()を呼び出します。

そして、bar()で処理する引数を追加したい場合に

import empty

def bar(a, *args, **kwds):
	prm = empty.kwds_pop( kwds, bar_v=0, bar_s='' )
	print( 'v={} s={}'.fromat( prm.bar_v, prm.bar_s ) )

	return foo( a, *args, **kwds )

などとbar()を用意しておいて

	:
bar( 'hello', bar_v=999 )
	:
ret = bar( 'bye', 1, bar_s='wao' )
	:
bar( 'bye-bye', v=2, bar_v=100, bar_s='100' )
	:
bar( 'abc', 'def', v=3, s='ghi', bar_s='xyz' )
	:

などとfoo()箇所をbar()に書き換えて、キーワード引数を追加します。

bar()内部でkwds_pop()してるので、 foo()には、呼び出し箇所で追加したbar_vやbar_sは渡りません。

chg_new(v, f=None, fv=None)

これまたempty.pyに入れるべきものじゃないかも?

値の変化を検出するのに使います。

初期値vを与えて、オブジェクトを返します。

オブジェクトの属性 v に現在の値が保持されます。

オブジェクトのメソッド set(v) で値を更新します。

保持している値が変化したかどうかが、属性 chg にセットされ、 set()自身の返り値になります。

変化した場合、変化する前の値が、オブジェクトの属性 prev にセットされます。

属性 chg, prev の初期値はNoneです。

引数fに関数を指定すると、set()で保持してる値が変化したときに、 オブジェクトを引数として与え、関数fを呼び出します。

引数fvに関数を指定すると、set()で保持してる値が変化したときに、 set()で指定した値vを引数として与え、関数fvを呼び出します。

(引数fv追加 2020/JUN/13)

def chg_new(v, f=None, fv=None):
	e = new( v=v, prev=None, chg=None )

	def set(v):
		e.chg = e.v != v
		if e.chg:
			e.prev = e.v
			e.v = v

			if f:
				f( e )
			if fv:
				fv( e.v )
		return e.chg

	return to_attr( e, locals() )


dbg.py

dbg.py
#!/usr/bin/env python

import sys
import time
from decimal import Decimal

import empty
import base

def f_out(f, s, tail='\n'):
	f.write(s + tail)
	f.flush()

def out(s, tail='\n'):
	f_out(sys.stdout, s, tail)

def err(s, tail='\n'):
	f_out(sys.stderr, s, tail)

def out_exit(s, code=0):
	out(s)
	sys.exit(code)

def err_exit(s, code=1):
	err(s)
	sys.exit(code)

def help_exit(help, code=1):
	s = ' '.join( [ 'Usage:', sys.argv[0], help ] )
	err_exit(s)

def out_lst(lst, pre=''):
	out( base.to_str( lst, pre ) )


def msg_start(msg, show=True):
	if show:
		out( msg + ' ... ', '' )
	def end():
		if show:
			out( 'ok' )
	return end

start_save = lambda path, show=True: msg_start( 'save ' + path, show )
start_load = lambda path, show=True: msg_start( 'load ' + path, show )


quantize= lambda v, exp: Decimal( v ).quantize( Decimal( exp ) )
# Deciaml( '123.456789' ).quantize( Decimal( '0.001' ) )
# --> 123.456


def laptime_new(s=''):
	lst = []

	def get(s=''):
		inf = empty.new( s=s, tm_sec=time.time(), sec=0, dsec=0 )
		if lst:
			inf.sec = inf.tm_sec - lst[ 0 ].tm_sec
			inf.dsec = inf.sec - lst[ -1 ].sec
		return inf

	get_sec = lambda : get().sec

	def rec(s='', do_show=False):
		inf = get( s )
		lst.append( inf )
		if do_show:
			show( inf )

	def show(inf=None):
		if inf:
			i = lst.index( inf ) if inf in lst else -1
			exp = '0.001'
			sec = quantize( inf.sec, exp )
			dsec = quantize( inf.dsec, exp )
			s = '{} {} {} {}'.format( i, inf.s, sec, dsec )
			out( s )
		else:
			out( '----' )
			for inf in lst:
				show( inf )

	rec( s )

	return empty.new( locals() )

# EOF

デバッグ出力用です。

import dbg
  :
dbg.out('hoge')

out_lst(lst, pre='')

lstの文字列を1行ごとに表示します。

lstは文字列が要素のリストの前提です。

lstの各文字列の先頭に pre の文字列を挿入できるようにしてます。

base.pyのto_str()を使用してます。

msg_start(msg, show=True)

引数msgに処理中を表す文字列を指定します。

msg + ' ... '

を改行なしで表示して、終了時に呼び出す関数を返します。

処理が終わった後で、終了時に呼び出す関数を呼び出すと、

'ok'

を改行付きで表示します。

引数showにFalseを指定すると、同じ処理をしても何も表示しません。

例えば

end = dbg.msg_start( 'calc' )
calc()
end()

を実行すると

calc ...

を表示してから calc() を呼び出します。

calc()の呼び出しから戻ると end() をよびだして

calc ... ok

の表示になります。

表示が不要ならば

end = dbg.msg_start( 'calc', show=False )
calc()
end()

に変更すれば、何も表示されなくなります。

start_save(path, show=True)

msg_start()のsave用です。

例えば

path = 'hoge/hoge.dat'
end = start_save( path )
save_hoge( path )
end()

save hoge/hoge.dat ...

save_hoge( path ) の呼び出しから戻ると

save hoge/hoge.dat ... ok

start_load(path, show=True)

msg_start()のload用です。

例えば

path = 'hoge/hoge.dat'
end = start_load( path )
load_hoge( path )
end()

load hoge/hoge.dat ...

load_hoge( path ) の呼び出しから戻ると

load hoge/hoge.dat ... ok

quantize(v, exp)

Decimalのquantize()を実行してDecimalを返します。

その実態は

from decimal import Decimal

quantize= lambda v, exp: Decimal( v ).quantize( Decimal( exp ) )

です。

使用例

>>> import dbg

>>> dbg.out( '{}'.format( dbg.quantize( 123.456789, '0.001' ) ) )
123.457

>>> dbg.out( '{}'.format( dbg.quantize( '987.654321', '0.001' ) ) )
987.654

laptime_new(s='')

ラップタイム計測用のオブジェクトを返します。

2020/JUN/09 get(), get_sec()追加しました。

主なメソッド

メソッド 内容
get(s='') laptime_new()からの経過時間や前回のrec()呼び出しからの時間などの情報を、
引数の文字列sと共にオブジェクトの属性に設定して返します。
get_sec() laptime_new()からの経過時間を返します。
rec(s='', do_show=False) laptime_new()からの経過時間や前回のrec()呼び出しからの時間などの情報を、
引数の文字列sと共に記録します。
do_show=True を指定すると、上記の情報を表示します。
show(inf=None) get()で返す情報やrec()で記録した情報を表示します。
引数を指定しないと、記録した全ての情報を表示します。

get()が返すオブジェクトの属性

属性 内容
tm_sec time.time()が返す秒数
sec laptime_new()からの経過時間の秒数
dsec 最後にrec()で記録した時間からの秒数

使用例

laptime_test.py
#!/usr/bin/env python

import time

import dbg

def foo():
	time.sleep( 1.5 )

def hoge():
	time.sleep( 0.3 )

def run():
	lap = dbg.laptime_new( '>foo' )
	foo()
	lap.rec( '<foo' )

	for i in range(3):
		if i % 2 == 0:
			lap.rec( '>hoge' )
			hoge()
			lap.rec( '<hoge', True )
		lap.rec()
	foo()
	inf = lap.get( 'foo2' )
	lap.show( inf )

	foo()
	sec = lap.get_sec()
	dbg.out( 'sec={}'.format( sec ) )

	lap.rec( 'end' )
	lap.show()

if __name__ == "__main__":
	run()
# EOF

$ ./laptime_test.py
3 <hoge 1.811 0.305
7 <hoge 2.116 0.305
-1 foo2 3.621 1.505
sec=5.124626874923706
----
0 >foo 0.000 0.000
1 <foo 1.505 1.505
2 >hoge 1.505 0.000
3 <hoge 1.811 0.305
4  1.811 0.000
5  1.811 0.000
6 >hoge 1.811 0.000
7 <hoge 2.116 0.305
8  2.116 0.000
9 end 5.125 3.009


thr.py

thr.py
#!/usr/bin/env python

import sys
import time
import threading
import signal

import empty

try:
	import queue
except ImportError:
	import Queue as queue

event_new = threading.Event

cond_new = threading.Condition

def que_new(tmout=None, dv=None):
	q = queue.Queue()

	is_empty = lambda : q.empty()

	is_full = lambda : q.full()

	def put(v):
		q.put(v)

	def get(tmout_=None, dv_=None):
		v = dv_ if dv_ != None else dv
		try:
			tm = tmout_ if tmout_ != None else tmout
			v = q.get(timeout=tm)
		except:
			pass
		return v

	return empty.new( locals() )


def lock_new():
	rlock = threading.RLock()

	on = lock = rlock.acquire

	off = unlock = rlock.release

	return empty.new( locals() )

def seq_cnt_new():
	e = empty.new( seq=0 )

	lock = lock_new()

	def get():
		lock.on()
		seq = e.seq
		e.seq += 1
		lock.off()
		return seq

	return empty.to_attr( e, locals() )


def ev_val_new(ev=None, v_init=None, v_timeout=None, v_cancel=False):
	e = empty.new()
	e.ev = ev if ev else event_new()

	def init():
		e.v = v_init
		e.dirty = False
		e.ev.clear()

	def set(v):
		e.v = v
		e.dirty = True
		e.ev.set()

	def cancel():
		e.ev.set()

	def wait(tmout):
		init()
		if not e.ev.wait( tmout ):
			return v_timeout
		if not e.dirty:
			return v_cancel
		return e.v

	init()
	return empty.add( e, locals() )


def timer_new(sec, func, args=()):
	e = empty.new( timer=None )

	def f():
		e.timer = False # expired
		if func:
			func( *args )

	def cancel():
		if e.timer:
			e.timer.cancel()
			e.timer = None

	def start():
		if not e.timer:
			e.timer = threading.Timer( sec, f )
			e.timer.start()

	def restart():
		cancel()
		start()

	def is_timeout():
		return e.timer == False

	return empty.to_attr( e, locals() )


def tmr_new( sec, func, args=() ):
	e = empty.new( stat='init' )
	cond = cond_new()

	def f():
		cond.acquire()

		wsec = None
		if e.stat == 'start':
			e.stat = 'wait'
			wsec = sec

		cond.wait( wsec )

		tmout = False
		if e.stat == 'wait':
			e.stat = 'init'
			tmout = True

		cond.release()

		if tmout and func:
			func( *args )

	th = loop_new( f )

	def stat_if_set( stat_if, stat_set ):
		cond.acquire()
		ret = not stat_if or e.stat == stat_if
		if ret:
			e.stat = stat_set
			cond.notify()
		cond.release()
		return ret

	def start():
		return stat_if_set( 'init', 'start' )

	def cancel():
		return stat_if_set( 'wait', 'init' )

	def quit():
		th.quit_ev.set()
		stat_if_set( '', 'quit' )
		th.stop()  # do not call from th

	th.start()

	return empty.add( e, locals() )


def th_new(target=None, args=(), gc=None):
	e = empty.new()

	quit_ev = threading.Event()
	th = None

	def func(*args):
		if e.target:
			e.target( *e.args )
		if gc:
			gc.put( e )
	def start():
		e.th = threading.Thread( target=func, args=e.args )
		e.th.daemon = True
		e.th.start()
		if gc:
			gc.lst.add( e )
	def join():
		if e.th:
			e.th.join()
			if gc:
				gc.lst.rm( e )
	def stop():
		quit_ev.set()
		join()

	return empty.to_attr( e, locals() )

def loop_new(func, args=(), wait_tmout=0, gc=None):
	th = th_new( gc=gc )

	def f():
		while not th.quit_ev.wait(wait_tmout):
			func(*args)
	th.target = f
	return th

def loop_que_new(func, que_tmout=0, wait_tmout=0, gc=None):
	que = que_new(que_tmout)

	def f():
		v = que.get()
		if v != None:
			func(v)

	th = loop_new( f, (), wait_tmout, gc )
	th.que = que
	return th

def call_after_new():
	def f( v ):
		( func, args, kwds ) = v
		func( *args, **kwds )

	th = loop_que_new( f, que_tmout=1.0 )
	th.start()

	def put( func, *args, **kwds ):
		th.que.put( ( func, args, kwds ) )

	def wait_flush():
		while not th.que.is_empty():
			time.sleep( 1 )

	def stop():
		th.stop()

	return empty.new( locals() )

def gc_new():
	def func(th_):
		th_.join()

	th = loop_que_new( func, que_tmout=1.0 )
	th.start()

	def put(th_):
		th.que.put( th_ )

	def lst_new():
		lst = []

		def add(th_):
			lst.append( th_ )

		def rm(th_):
			if th_ in lst:
				lst.remove( th_ )

		def quit_ev_set():
			for th_ in lst:
				th_.quit_ev.set()

		return empty.new( locals() )

	lst = lst_new()

	def stop(delay_sec=None):
		lst.quit_ev_set()

		if delay_sec:
			time.sleep( delay_sec )
		while not th.que.is_empty():
			time.sleep( 1 )
		th.stop()

	return empty.new( locals() )


def ths_new():
	lst = []
	lock = lock_new()

	def add(th):
		if th not in lst:
			lock.on()
			lst.append( th )
			lock.off()
	def rm(th):
		if th in lst:
			lock.on()
			lst.remove( th )
			lock.off()
	def gc():
		cth = threading.current_thread()
		bak_n = len( lst )
		for th in lst[ : ]:
			if th.goal and th.th != cth:
				rm( th )
				th.join()
		n = len( lst )
		#dbg.out( 'gc {} --> {}'.format( bak_n, n ) )

	def func(th):
		gc()
		if th.target:
			th.target( *th.args )
			gc()
		th.goal = True

	def start(th):
		gc()
		th.goal = False
		add( th )

		th.th = threading.Thread( target=func, args=( th, ) )
		th.th.daemon = True
		th.th.start()

	def stop_all(tm_out=5.0):
		for th in lst[ : ]:
			th.quit_ev.set()
		st = time.time()
		while lst and time.time() - st <= tm_out:
			gc()
			time.sleep( 1 )

	return empty.new( locals() )

ths = ths_new()


def sigint_new(cb=None):
	e = empty.new( done=False )

	cond = cond_new()

	def wait():
		cond.acquire()
		cond.wait()
		cond.release()

	def hdl(sig, frame):
		cond.acquire()
		e.done = True
		cond.notify_all()
		cond.release()

	th = None
	if cb:
		def func():
			wait()
			cb()
		th = loop_new( func )
		th.start()

	def fini():
		if th:
			th.stop()

	signal.signal( signal.SIGINT, hdl )

	return empty.to_attr( e, locals() )

def set_sig_func(sig, func):
	bak = signal.getsignal( sig )

	def hdl(sig, frame):
		func()
		if callable( bak ):
			bak( sig, frame )

	signal.signal( sig, hdl )

# EOF

スレッド関連です。

ThreadとQueueです。

Eventは単純過ぎかなと思い、入れてません。

importの手間を省くだけのために Event と Condition も追加しました。

threadingのまんまです。;-p)

event_new(...)

実態は

event_new = threding.Event

です。

import threading
ev = threding.Event( ... )

の代わりに

import thr
ev = thr.event_new( ... )

としたかっただけです。

cond_new(...)

実態は

condt_new = threding.Condition

です。

import threading
ev = threding.Condition( ... )

の代わりに

import thr
ev = thr.cond_new( ... )

としたかっただけです。

que_new(tmout=None, dv=None)

put(),get()メソッドだけです。

is_empty(), is_full()メソッド追加しました。2020/FEB/21

put()はそのまま。

get()は生成時のタイムアウト指定が、デフォルトになります。

get()でキューが空のとき、例外をキャッチして指定のデフォルト値(dv)を返します。

2020/MAY/24

que_new()のget()を更新しました。

except queue.Empty:

行を

except:

に変更しました。

python2でこの行で不具合がでる環境があったので、とりあえず変更してみました。

lock_new()

内部でthreading.RLockオブジェウトを生成して、 ラップしたオブジェクトを返します。

メソッド 内容
lock() ロックします .acquire()
on() ロックします .acquire()
unloc() アンロックします .release()
off() アンロックします .release()
def lock_new():
	rlock = threading.RLock()

	on = lock = rlock.acquire

	off = unlock = rlock.release

	return empty.new( locals() )

seq_cnt_new()

初期値0の整数のカウンタのオブジェクトを返します。

get()メソッドを呼び出すと、現在のカウンタ値を返します。

同時に、排他処理でカウンタ値をインクリメントして更新します。

ev_val_new(ev=None, v_init=None, v_timeout=None, v_cancel=False)

イベントに値を付加した版です。

どこかで生成したイベントを引数evで与えても良く、 与えないと内部でイベントを生成して使います。

メソッド

メソッド 内容
init() 値をv_initに初期化し、イベントをクリアします。
set(v) 値vをセットしてイベントをセットします。
cancel() 値をセットせずにイベントをセットします。
wait(tmout) init()で初期化してからtmout秒の指定でイベントを待ちます。
タイムアウトしたら、v_timeout(デフォルトNone)を返します。
set(v)メソッドでイベントがセットされた場合は、値vを返します。
cancel()メソッドなどで単にイベントがセットされただけの場合は、
v_cancel(デフォルトFalse)を返します。

timer_new(sec, func, args=())

threading.Timerとさほど変わりませんが、 ラップしたオブジェクトを返します。

メソッド

メソッド 内容
start() タイマー動作開始
既に開始してると何もしません
cancel() タイマー動作キャンセル
開始してないと何もしません
restart() 内部でcancel(), start()します
is_timeout() タイマーが期限切れになって関数が呼び出されたらTrueを返します
cancel()したり、start()しなおしたらFalseを返す状態に戻ります

tmr_new( sec, func, args=() )

timer_new()とほぼ同じ仕様ですが、 cond_new() を使ってスレッドの排他制御を、まともっぽくしてみました。

タイマーが切れる微妙なタイミングでのキャンセルや終了指示で、 空振りしないようにしてます。

restart()は廃止。必要ならば呼び出し側でcancel(), start()で。

タイマーで呼び出される関数からstart()の呼び出しは可能ですが、 quit()の呼び出しはダメ。

quit()は関数を呼び出しているスレッドの終了処理を行うので、 自身のスレッドをjoin()してエラーになります。

th_new(target=None, args=(), gc=None)

スレッドの基本部分です。

targetで関数を、argsで関数に指定する引数リストを指定します。

start()でスレッドで関数の実行を開始。

stop()で終了用のイベント(quit_ev)をセットして、join()で終了を待ちます。

targetで指定した関数でquit_evを使用するには、 th_new()ではtargetを指定せずに、そのreturnクラスに対してtargetを指定します。
(ややこしい仕様...)

import thr

th = thr.th_new()

d = {}
def func(k, v, v_fin):
	d[k] = v
	th.quit_ev.wait()
	d[k] = v_fin

th.target = func
th.args = ('hoge', 1, 2)

th.start()
a = d.get('hoge')
th.stop()
b = d.get('hoge')

引数 gc=None を追加しました。2020/FEB/21

gc=Noneであれば従来通りです。

gcにはgc_new()で生成したオブジェクトを指定します。

gcを指定しておくと、明示的にstop()の呼び出しをしなくても、 targetの関数が終了すれば、gcがjoin()して資源を回収します。

gcに機能を追加しました。2020/MAY/10

gcに終了時に、th_new()由来の全てのスレッドを停止させようとする機能を追加しました。

gcを指定しておくと、スレッド生成時にgcのリストに登録します。

スレッド終了でjoin()されると、登録したgcのリストから削除します。

gc自体を終了するときに、リストに残っているスレッドに終了用のイベントをセットします。

loop_new(func, args=(), wait_tmout=0)

funcを繰り返し実行するスレッド用です。

毎回wait_tmoutの指定秒だけ終了イベント(quit_ev)を待って終了判定します。

import time
import thr

def hoge():
	print( time.time() )

th = thr.loop_new( hoge, wait_tmout=0.5 )

th.start()
time.sleep(10)
th.stop()

引数 gc=None を追加しました。2020/FEB/21

gc=Noneであれば従来通りです。

gcにはgc_new()で生成したオブジェクトを指定します。

gcを指定しておくと、明示的にstop()の呼び出しをしなくても、 targetの関数が終了すれば、gcがjoin()して資源を回収します。

loop_que_new(func, que_tmout=0, wait_tmout=0, gc=None)

funcを繰り返し実行するスレッド用です。

ただし、funcは引数を1つとります。

funcへの引数は、生成したキューから取り出して与えられます。

import thr

def hoge(v):
	print(v)

th = thr.loop_que_new( hoge, 0.5 )

th.start()

th.que.put(1)
th.que.put(2)
th.que.put(999)

th.stop()

引数 gc=None を追加しました。2020/FEB/21

gc=Noneであれば従来通りです。

gcにはgc_new()で生成したオブジェクトを指定します。

gcを指定しておくと、明示的にstop()の呼び出しをしなくても、 targetの関数が終了すれば、gcがjoin()して資源を回収します。

call_after_new()

関数を実行する専用スレッドを1つ、loop_que_new()で生成し、 制御用のオブジェクトを返します。

put( func, *args, **kwds ) メソッドを呼び出すと、引数の情報が スレッドの入力用のキューに入れられます。

スレッドではキューから情報を取り出し、関数を呼び出します。

メソッド

メソッド 内容
put( func, *args, **kwds ) 引数の情報をスレッド入力用のキューに入れてスレッドに渡します。
wait_flush() スレッド入力用のキューが空になるまで待ちます。
stop() スレッドの実行を終了します。

実行例

>>> import thr
>>> ca = thr.call_after()
>>> ca.put( print, 'foo' )
foo
>>> import time
>>> ca.put( sleep, 30 )
>>> ca.put( print, 'bar' )
>>> ca.wait_flush()
# 30秒経過後
bar
>>>

gc_new()

ゴミ集め用のオブジェクトを生成して返します。

生成時にスレッドを1つ生成しスタートします。

生成したオブジェクトは、 スレッド生成用の関数の引数gcに指定します。

有効な引数gcが指定されて生成されたスレッドは、 明示的にstop()メソッドを呼び出してjoin()しなくても、 quit_evイベントをセットするなどして関数が終了しさえすれば、 gcのスレッドがjoin()を実行して資源を回収します。

gcのstop(delay_sec=None)メソッドで回収用のスレッドを終了します。

delay_secでメソッド先頭で待機する秒数を指定します。

例えば、終了処理で、対象のスレッドのquit_evイベントをセットしても、 指定のタイムアウト秒経過しないと、スレッドが終了しない場合、 すぐにはgcにキューイングされません。

そのような場合に、delay_secを適宜指定します。

gcに機能を追加しました。2020/MAY/10

gcに終了時に、th_new()由来の全てのスレッドを停止させようとする機能を追加しました。

gcを指定しておくと、スレッド生成時にgcのリストに登録します。

スレッド終了でjoin()されると、登録したgcのリストから削除します。

gc自体を終了するときに、リストに残っているスレッドに終了用のイベントをセットします。

gc_new()の使用例

各スレッドのstop()を呼び出さなくてもOK。

#!/usr/bin/env python

import time
import thr
import dbg

def hoge():
	dbg.out( 'hoge' )
	time.sleep( 3 )
	dbg.out( 'hoge' )

def bar():
	dbg.out( 'bar' )

def foo(v):
	dbg.out( 'foo v={}'.format( v ) )

def run():
	gc = thr.gc_new()

	thr.th_new( hoge, gc=gc ).start()

	th_bar = thr.loop_new( bar, wait_tmout=1.0, gc=gc )
	th_bar.start()

	th_foo = thr.loop_que_new( foo, que_tmout=1.0, gc=gc )
	th_foo.start()
	for i in range(8):
		th_foo.que.put( i )
		time.sleep( 0.5 )

	gc.stop( 2.0 )

if __name__ == "__main__":
	run()
# EOF
$ ./thr_test.py
hoge
foo v=0
foo v=1
bar
foo v=2
foo v=3
bar
foo v=4
foo v=5
hoge
bar
foo v=6
foo v=7
bar
$

ths_new()

thr.pyの中で

ths = ths_new()

しているので

import thr

してる側からは、

thr.ths.start(th)

thr.ths.stop_all()

を使用します。

thr.pyのgc処理 を経て、 2021/JAN/13 ちょっと茶ぶ台返し が、その経緯となります。

sigint_new(cb=None)

thr.pyに入れるべき内容じゃないかもですが、 thr.pyの関数をいくつか使うので、入れてしまってます。

^CキーによるSIGINTの簡単なユーティリティです。

オブジェクトを返します。

引数cbに関数を指定すると、^Cキーで指定した関数を引数なしで呼び出します。

メソッドや属性

メソッド、属性 内容
wait() 呼び出すと^Cキーが押されるまでブロックします。
^Cが押されると呼び出し元に戻ります。
fini() 終了処理です。
cbに関数を指定していた場合は、内部で実行するスレッドの終了処理をします。
done 初期値Falseで一度^Cキーが押されるとTrueになります。
>>> import thr

>>> sigint = thr.sigint_new( lambda : print( 'hoge' ) )
>>> sigint.done
False

>>> sigint.wait()
^Cキー入力
hoge

>>> sigint.done
True
>>>

^Cキー入力
hoge

^Cキー入力
hoge

>>> ^D
$

set_sig_func(sig, func)

これまたthr.pyに入れるべき内容じゃないかもですが...

引数sigのシグナルでfuncの関数を引数なしで呼び出します。

元々、呼び出し可能なpythonレベルのsignal handlerが設定されていた場合は、 func()を呼び出した後で、signal hadlerを呼び出します。


cmd_ut.py

cmd_ut.py
#!/usr/bin/env python

import os
import subprocess
import signal
import six
import time

import empty
import io_ut
import tm_ut
import dbg

PIPE = subprocess.PIPE
SIGKILL = signal.SIGKILL
SIGINT = signal.SIGINT

def call(cmd, defval='', b2s=False):
	try:
		s = subprocess.check_output( cmd, shell=True )
		if six.PY3 and b2s:
			s = s.decode()
		return s
	except:
		return defval

def kill_pid(pid, sig=SIGKILL):
	os.killpg(pid, sig)

def proc_new(cmd, stdin=None, stdout=None, stderr=None, preexec_fn=os.setsid, sudo_pw=''):
	proc = None
	try:
		if sudo_pw:
			cmd = 'sudo -S ' + cmd
		proc = subprocess.Popen( cmd, shell=True, stdin=stdin, stdout=stdout, stderr=stderr, preexec_fn=preexec_fn )
		if sudo_pw:
			proc.stdin.write(sudo_pw + '\n')
			proc.stdin.flush()
	except:
		pass

	def is_alive():
		if not proc:
			return False
		proc.poll()
		return proc.returncode == None

	def kill(sig=SIGKILL):
		if proc and is_alive():
			if sudo_pw:
				cmd = 'echo {} | sudo -S kill -{} -{} >/dev/null 2>&1'.format(sudo_pw, sig, proc.pid)
				call(cmd)
			else:
				kill_pid(proc.pid, sig)

	def wait(timeout=None):
		if six.PY2:
			proc.wait()
		else:
			proc.wait(timeout)

	def communicate(input=None, timeout=None):
		if not proc:
			return (None, None)
		if six.PY2:
			return proc.communicate(input)
		return proc.communicate(input, timeout)

	get_stdin = lambda : proc.stdin if proc else None
	get_stdout = lambda : proc.stdout if proc else None
	get_stderr = lambda : proc.stderr if proc else None

	def write_stdin_flush(b):
		stdin = get_stdin()
		if stdin:
			stdin.write(b)
			stdin.flush()

	def read_stdout(defval=None):
		stdout = get_stdout()
		return stdout.read() if stdout else defval

	return empty.new( locals() )

def cb_line(cmd, cb, preexec_fn=os.setsid):
	proc = proc_new( cmd, stdout=PIPE, preexec_fn=preexec_fn )
	f = proc.get_stdout()

	while True:
		s = f.readline().decode()
		if not s:
			break
		cb( s )

	proc.kill()
	proc.wait()

def call_show(cmd):
	# show stdout stderr, and return bool
	proc = proc_new( cmd )
	proc.wait()
	return proc.proc.returncode == 0

def call_comm(cmd, in_b):
	proc = proc_new(cmd, stdin=PIPE, stdout=PIPE)
	out_b = proc.communicate(in_b)[0]
	proc.wait()
	return out_b

def proc_new_comm_tm2(cmd, tmout1, tmout2):
	proc = proc_new(cmd, stdin=PIPE, stdout=PIPE)
	kill = proc.kill
	wait = proc.wait

	stdout = proc.get_stdout()
	rdr = io_ut.reader_new( stdout, tmout2, no_buf=True )

	def write(s):
		proc.write_stdin_flush( s.encode() )

	def read():
		if not io_ut.readable(stdout, tmout1):
			return None # timeout
		return rdr.read() # use tmout2

	def comm(s):
		write(s)
		return read()

	return empty.new( locals() )

def cmd_py(name):
	path = name + '.py'
	if os.path.exists(path):
		return './' + path

	return 'python -m ' + name

def cmd_stdio_conn(cmd1, cmd2, fifo='fifo'):
	cmd = 'rm -f {} ; mkfifo {} ; {} < {} | {} > {} ; rm -f {}'
	return cmd.format( fifo, fifo, cmd1, fifo, cmd2, fifo, fifo )

def call_stdio_conn(cmd1, cmd2, fifo='fifo', defval=''):
	cmd = cmd_stdio_conn( cmd1, cmd2, fifo )
	return call( cmd, defval )

def proc_stdio_conn(cmd1, cmd2, fifo='fifo', sudo_pw=''):
	cmd = cmd_stdio_conn( cmd1, cmd2, fifo )
	return proc_new( cmd, sudo_pw=sudo_pw )

def rm_rf(fn):
	if os.path.exists(fn):
		cmd = 'rm -rf {}'.format(fn)
		call(cmd)
		return cmd
	return ''

def mk_tmp_dir(pre='tmp_'):
	name = pre
	while True:
		sec = tm_ut.now_sec()
		name = tm_ut.sec_to_fn( pre, sec )
		if not os.path.exists( name ):
			break
		time.sleep( 1 )
	call( 'mkdir ' + name )
	return name

# EOF

外部コマンド実行用です。

subprocessを使ってます。

call(cmd, defval='', b2s=False)

cmd文字列のコマンドをshell=True指定で実行します。

実行結果の文字列を返します。

例外が発生すると、指定のデフォルト値を返します。

import cmd_ut

s = cmd_ut.call('date')
print(s)

2020/ARP/01 引数b2sを追加しました。

python3では結果がstrではなくbytesが返りますが、 b2s=True を指定すると、strに変換して返します。

proc_new(cmd, stdin=None, stdout=None, stderr=None, preexec_fn=os.setsid, sudo_pw='')

subproess.Popen()でcmd文字列をサブプロセスで実行します。

import time
import cmd_ut

cmd = "find / -name '*.py'"
proc = proc_new(cmd)

time.sleep(10)

proc.kill()
proc.wait()

cb_line(cmd, cb, preexec_fn=os.setsid)

cmd文字列を proc_new( cmd, stdout=PIPE ) で実行します。

実行コマンドの標準出力から行単位で文字列を取得し、 コールバック関数 cb() の引数に与えて呼び出します。

実行コマンドが終了するまで、cb_line()の呼び出しはブロックします。

def cb_line(cmd, cb):
	proc = proc_new( cmd, stdout=PIPE )
	f = proc.get_stdout()

	while True:
		s = f.readline().decode()
		if not s:
			break
		cb( s )

	proc.kill()
	proc.wait()

$ ping localhost | head
PING localhost (127.0.0.1) 56(84) bytes of data.
64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.035 ms
64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=0.040 ms
64 bytes from localhost (127.0.0.1): icmp_seq=3 ttl=64 time=0.022 ms
	:

$ python
>>> import cmd_ut

>>> def cb(s):
...     print( s.split()[ -2 ] )
...

>>> cmd_ut.cb_line( 'ping localhost', cb )
of
time=0.022
time=0.046
time=0.022
  :

call_show(cmd)

cmd文字列を実行しますが、 call(cmd)とは異なり、標準出力、標準エラーはそのままコンソールに表示します。

コマンドのreturncodeが0(正常終了)ならばTrueを、 そうでなければFalseを返します。

call_comm(cmd, in_b)

cmd文字列を実行し、その標準入力にデータin_bを与え、 標準出力の結果を返します。

内部では、Popen()したprocをcommunicate()を使って処理し、 wait()でprocの終了を待ちます。

proc_new_comm_tm2(cmd, tmout1, tmout2)

proc_new()を使用して、 subproess.Popen()でcmd文字列をサブプロセスで実行し、 オブジェクトを返します。

オブジェクトのメソッドを呼び出して、 実行しているコマンドの標準入力に文字列を与えたり、 標準出力の文字列を取得できます。

Popenのcommunicate()の呼び出しでは、 標準入力へデータを与えた後、標準入力を閉じてしまうようなので、 1回だけしか使えない? (使いか方間違ってるだけかも?)

対話的に入出力をさせたく、本関数を追加してみました。

オブジェクトの主なメソッド

tmout1
コマンドの標準出力から文字列を取得するときに、 最初に読み出し可能になるまでのタイムアウト秒数を指定します。
tmout2
コマンドの標準出力から文字列を取得するときに、 文字列を読み出し中に後続に文字列が無いと判断するときの、 タイムアウト秒数を指定します。
メソッド 内容
write(s) 実行しているコマンドの標準入力に文字列sを与えます。
read() 実行しているコマンドの標準出力から取得した文字列を返します。
タイムアウトした場合はNoneが返ります。
EOFの場合は空文字列''が返ります。
comm(s) write(s)で実行しているコマンドの標準入力に文字列sを与え
read()で実行しているコマンドの標準出力から取得した文字列を返します。
タイムアウトした場合はNoneが返ります。
EOFの場合は空文字列''が返ります。

cmd_py(name)

pythonのモジュールnameを実行するときのコマンド文字列を返します。

カレントディレクトリに name + ".py" のファイルが存在すれば、

"./" + name + ".py"

を返します。

存在しなければ、環境のpythonの検索パスにモジュールnameが、 存在する事にして、

"python -m " + name

を返します。

$ python
>>> import cmd_ut

>>> cmd_ut.cmd_py('io_ut')
'./io_ut.py'

>>> cmd_ut.cmd_py('hoge')
'python -m hoge'

cmd_stdio_conn(cmd1, cmd2, fifo='fifo')

文字列cmd1, cmd2の2つのコマンドの双方の標準入出力を相互に繋いだ状態で、 起動するためのコマンド文字列を返します。

要はsnddivでGUI部分のコマンドを繋ぐときのアレです。

GUIとの通信経路

>>> import cmd_ut
>>> cmd_ut.cmd_stdio_conn('hoge fuga', 'foo bar')
'rm -f fifo ; mkfifo fifo ; hoge fuga < fifo | foo bar > fifo ; rm -f fifo'

call_stdio_conn(cmd1, cmd2, fifo='fifo', defval='')

cmd_stdio_conn()で 文字列cmd1, cmd2の2つのコマンドの双方の標準入出力を相互に繋いだ状態で、 起動するためのコマンド文字列を生成して、 call()で実行します。

proc_stdio_conn(cmd1, cmd2, fifo='fifo', sudo_pw='')

cmd_stdio_conn()で 文字列cmd1, cmd2の2つのコマンドの双方の標準入出力を相互に繋いだ状態で、 起動するためのコマンド文字列を生成して、 proc_new()でサブプロセスで実行します。

rm_rf(fn)

ファイルやディレクトリ以下の階層を削除します。

fnが存在すれば"rm -rf"コマンドで削除し、実行したコマンドの文字列を返します。

fnが存在しなければ、何もせずに空文字列''を返します。

$ mkdir -p hoge/fuga
$ echo foo > hoge/fuga/foo.txt
$ echo bar > hoge/bar.txt

$ find hoge
hoge
hoge/bar.txt
hoge/fuga
hoge/fuga/foo.txt

$ python
>>> import cmd_ut
>>> cmd_ut.rm_rf('foo')
''
>>> cmd_ut.rm_rf('hoge')
'rm -rf hoge'
>>> ^D

$ find hoge
find: hoge: No such file or directory
$

mk_tmp_dir(pre='tmp_')

テンポラリ・ディレクトを作成してその名前を返します。

preの直後に、現在の日付時刻の文字列を追加した名前のディレクトリを作成しようとします。

既にその名前のエントリがあれば、1秒待機して時刻の更新を待ってから、 名前を作りなおしてリトライします。

未使用の名前が見つかるまで繰り返します。;-p)

>>> cmd_ut.mk_tmp_dir( '/tmp/ttt_' )
'/tmp/ttt_20200401003843'

>>> s = cmd_ut.call( 'ls -lt /tmp/ | head', b2s=True )
>>> print( s )
total 2072
drwxr-xr-x  2 kondoh  wheel      68  4  1 00:38 ttt_20200401003843
  :


io_ut.py

io_ut.py
#!/usr/bin/env python

import os
import select
import six
import empty
import dbg

def no_buffering(f):
	if six.PY2:
		return os.fdopen(f.fileno(), 'r', 0)
	return os.fdopen(f.fileno(), 'rb', buffering=0)

readable = lambda f, tmout=0: select.select([f], [], [], tmout)[0] == [f]

def read1(f, tmout=0):
	if readable(f, tmout):
		return f.read(1) # EOF is b'' or ''
	return None # timeout

def reader_new(f, tmout=0, to_str=True, no_buf=False):
	# return '' is EOF
	# return None is timeout

	if no_buf:
		f = no_buffering(f)

	e = empty.new()

	readable_ = lambda : readable(f, tmout)
	e.readable = readable_

	e.type = None

	def read1_():
		b = read1(f, tmout)
		if b == None:
			return None # timeout
		if e.type == None or e.type != type(b):
			e.type = type(b)
		if to_str and  e.type == bytes:
			return b.decode() # EOF is ''
		return b # EOF is '' or b''
	e.read1 = read1_

	def read():
		lst = []
		r1 = None
		while True:
			r1 = read1_()
			if not r1: # None is timeout , '' or b'' is EOF
				break
			lst.append(r1)

		if not lst:
			if r1 == None: # timeout
				return None
			r0 = '' if e.type in (str, None) else b''
			return r0

		r0 = '' if type(lst[0]) == str else b''
		return r0.join(lst)

	def read_to(k): # to_str=True only
		buf = ''
		while True:
			r1 = read1_()
			if not r1: # None is timeout , '' is EOF
				return buf if buf else r1
			buf += r1
			if buf.endswith(k):
				break
		return buf

	e.buf = ''

	def pop_buf():
		(r, e.buf) = (e.buf, '')
		return r

	def readline(): # to_str=True only
		while True:
			c = read1_()
			if c == None: # timeout
				break
			if c == '': # EOF
				return pop_buf()
			e.buf += c
			if c in ('\n', '\r'):
				return pop_buf() # '\n', '\r' contained
		return None # timeout

	e.lines = []

	def pop_lines():
		(r, e.lines) = (e.lines, [])
		return r

	def readlines(): # to_str=True only
		# delim is "\n" line
		# return list of str
		# return [] is EOF
		# return None is timeout

		while True:
			s = readline()
			if s == None: # timeout
				break
			if s == '': # EOF
				return pop_lines()
			e.lines.append(s)
			if s == '\n': # delim
				return pop_lines() # delim contained
		return None # timeout

	return empty.to_attr( e, locals() )


if __name__ == "__main__":
	import cmd_ut
	import yaml

	cmd = 'ls ; echo "" ; ls ; echo ""'
	proc = cmd_ut.proc_new(cmd, stdout=cmd_ut.PIPE)
	f = proc.get_stdout()
	rdr = reader_new(f)

	while True:
		lst = rdr.readlines()
		if lst == []:
			break
		if lst:
			for s in lst:
				dbg.out(s, '')
# EOF

主にファイルIO関連用です。

select()が面倒なので、どこかに入れてしまおうかと思いつつ、 io_ut.py を新規追加してみました。

readable(f, tmout=0)

selectでリード可能か判定です。

read1(f, tmout=0)

1バイトリードです。

selectでタイムアウトしたらNoneを返します。

リード可能でもEOFなら b'' か '' を返します。

reader_new(f, tmout=0, to_str=True, no_buf=False)

リード用のファイルfを指定して、リーダを生成します。

no_buf=Trueでfから入力のバッファリングを無しにします。

('rb'でfdopen()してます。なんとなく動いてますが、このやり方で良いのだろうか?)

基本的に to_str=True で、バイナリのbytesではなく、文字列のstr前提です。

リーダの主なメソッド

メソッド 内容
readable() selectでリード可能か判定です。
read1() 1バイトリードです。
selectでタイムアウトしたらNoneを返します。
EOFならb'' か ''を返します。
read() リード可能が続く限り1バイトリードを使ってリードします。
selectでタイムアウトしたらNoneを返します。
EOFならb'' か ''を返します。

過去に1度もリードに成功せずにEOFになった場合は、
型が不明なので''を返します。
read_to(k) 1バイトリードでタイムアウトするか、EOFになるか、
あるいは文字列kをリードするまでリードを続けて、
リードした文字列を返します。

1バイトもリード出来ずにタイムアウトしたらNoneを、
1バイトもリード出来ずにEOFなら''を返します。
readline() 1行リードで、改行(0x0A)か復帰(0x0D)までリードして返します。
返り値の末尾に改行文字を含みます。
selectでタイムアウトしたらNoneを返します。
EOFなら''を返します。
readlines() 複数行リードで、改行文字だけの行までをリストにして返します。
リストの末尾に改行文字だけの行を含みます。
selectでタイムアウトしたらNoneを返します。
EOFなら[]を返します。


tcp_ut.py

tcp_ut.py
#!/usr/bin/env python

import sys
import socket
import select
import time

import empty
import thr
import dbg

readable = lambda f, tmout=0: select.select( [ f ], [], [], tmout )[ 0 ] == [ f ]

def recv_sock( f, bufmax=4*1024, tmout=3.0 ):
	if not readable( f, tmout ):
		return None  # time out
	dbg.out( 'readable' )
	return f.recv( bufmax ).decode( 'utf-8' )  # '' EOF

def recv_sock_loop( f, is_quit, bufmax=4*1024, tmout=3.0 ):
	s = None
	while s is None:
		if is_quit():
			return False  # quit
		s = recv_sock( f, bufmax, tmout )
		dbg.out( 's={}'.format( s ) )
	return s

def send_sock( f, s ):
	try:
		f.sendall( s.encode( 'utf-8' ) )
	except:
		pass

def pipe_sock( f, f_, is_quit, bufmax=4*1024, tmout=3.0 ):
	while not is_quit():
		s = recv_sock_loop( f, is_quit, bufmax, tmout )
		if not s:
			break
		send_sock( f_, s )

def srv_new( port ):
	ss = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
	ss.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
	ss.bind( ( '', port ) )
	ss.listen( 5 )

	def accept( tmout ):
		if not readable( ss, tmout ):
			return None
		( cs, adr ) = ss.accept()
		return cs

	def close():
		ss.close()

	def accept_loop( cb, is_quit, tmout ):
		while not is_quit():
			cs = accept( tmout )
			if not cs:
				continue

			def func():
				cb( cs )
				cs.close()

			th = thr.th_new( func )
			thr.ths.start( th )
		close()

	return empty.new( locals() )

def cli_conn( port ):
	cs = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
	try:
		host = 'localhost'
		cs.connect( ( host, port ) )
	except:
		return None
	return cs

def uniq_cnt_new():
	e = empty.new()
	e.i = 0

	lock = thr.lock_new()

	def get():
		lock.on()
		v = e.i
		e.i += 1
		lock.off()
		return v

	def get_str():
		return str( get() )

	return empty.add( e, locals() )

def srv_id( port, is_quit_, bufmax=4*1024, tmout=3.0 ):

	e = empty.new()
	e.killed_all = False

	def is_quit():
		return is_quit_() or e.killed_all

	def cb( f ):
		dbg.out( 'cb' )

		ids = {}

		def srv_boot( id ):
			if id in ids:
				send_sock( f, 'already boot id' )
				f.close()
				return

			e = empty.new()
			e.f = f
			e.dic = {}
			e.uniq_cnt = uniq_cnt_new()
			ids[ id ] = e
			send_sock( f, 'ok' )

		def srv_rep( id, k ):
			if id not in ids:
				send_sock( f, 'not found id' )
				f.close()
				return

			e = ids.get( id )
			if k not in e.dic:
				send_sock( f, 'not found k' )
				f.close()
				return

			lst = e.dic.get( k )
			if not lst or len( lst ) != 1:
				send_sock( f, 'invalid state' )
				f.close()
				return

			f_ = lst[ 0 ]
			lst.append( f )

			send_sock( f, 'ok' )

			pipe_sock( f, f_, is_quit, bufmax, tmout )
			if k in e.dic:
				e.dic.pop( k )
			f.close()
			f_.close()

		def conn_id( id ):
			if id not in ids:
				send_sock( f, 'not found id' )
				f.close()
				return

			e = ids.get( id )
			k = e.uniq_cnt.get_str()
			e.dic[ k ] = [ f ]

			send_sock( e.f, k )

			lst = []
			while True:
				if is_quit():
					f.close()
					return
				lst = e.dic.get( k, [] )
				if len( lst ) >= 2:
					break
				time.sleep( 1.0 )

			( f, f_ ) = lst[ : 2 ]

			send_sock( f, 'ok' )

			pipe_sock( f, f_, is_quit, bufmax, tmout )
			if k in e.dic:
				e.dic.pop( k )
			f.close()
			f_.close()

		def kill_id( id ):
			e = ids.pop( id )
			ks = e.dic.keys()
			for k in ks:
				lst = e.dic.pop( k )
				for f_ in lst:
					f_.close()
			e.f.close()

		def srv_kill( id ):
			if id not in ids:
				send_sock( f, 'not found id' )
				f.close()
				return

			kill_id( id )
			send_sock( f, 'ok' )
			f.close()

		def kill_all():
			for id in ids.keys():
				kill_id( id )
			send_sock( f, 'ok' )
			f.close()
			e.killed_all = True


		s = recv_sock_loop( f, is_quit, bufmax, tmout )

		dbg.out( 'cb s={}'.format( s ) )

		if s:
			lst = s.split()
			n = len( lst )
			if n == 2 and lst[ 0 ] == 'srv_boot':
				# srv_boot id
				id = lst[ 1 ]
				srv_boot( id )
			elif n == 2 and lst[ 0 ] == 'srv_kill':
				# srv_kill id
				id = lst[ 1 ]
				srv_kill( id )
			elif n == 3 and lst[ 0 ] == 'srv_rep':
				# srv_rep id k
				( id, k ) = lst[ 1 : ]
				srv_rep( id, k )
			elif n == 1 and lst[ 0 ] == 'kill_all':
				# kill_all
				kill_all()
			elif n == 1:
				# id
				conn_id( id )
			else:
				f.close()
		else:
			f.close()


	srv = srv_new( port )
	dbg.out( 'srv={}'.format( srv ) )

	srv.accept_loop( cb, is_quit, tmout )


def msg_ret_None( s ):
	dbg.out( s )
	return None

def conn_send_recv_ok_confirm( port, s, is_quit ):
	f = cli_conn( port )
	if not f:
		return msg_ret_None( 'failed conn' )

	send_sock( f, s )

	s = recv_sock_loop( f, is_quit )
	if not s:
		return msg_ret_None( 'no recv' )
	if s != 'ok':
		return msg_ret_None( s )
	return f

def srv_id_boot_id( port, id, is_quit ):
	s = 'srv_boot ' + id
	return conn_send_recv_ok_confirm( port, s, is_quit )

def srv_id_accept( port, f, is_quit, bufmax=4*1024, tmout=3.0 ):
	s = recv_sock_loop( f, is_quit, bufmax, tmout )
	if not s:
		return msg_ret_None( 'no recv' )
	id = s
	s = 'srv_rep ' + id
	return conn_send_recv_ok_confirm( port, s, is_quit )

def srv_id_conn_id( port, id, is_quit ):
	s = id
	return conn_send_recv_ok_confirm( port, s, is_quit )

def srv_id_kill_id( port, id, is_quit ):
	s = 'srv_kill ' + id
	return conn_send_recv_ok_confirm( port, s, is_quit )

def srv_id_kill_all( port, is_quit ):
	s = 'kill_all'
	return conn_send_recv_ok_confirm( port, s, is_quit )

def run():

	port = 12345

	is_quit = lambda : False

	def f():
		srv_id( port, is_quit )

	th = thr.th_new( f )
	thr.ths.start( th )


	id = 'foo'
	ss = srv_id_boot_id( port, id, is_quit )
	dbg.out( 'ss={}'.format( ss ) )

	def acc_f( cs ):
		s = recv_sock_loop( cs, is_quit )
		send_sock( cs, s.upper() )

	def acc():
		while not is_quit():
			cs = srv_id_accept( port, ss, is_quit )
			if cs:
				th = thr.th_new( acc_f )
				thr.ths.start( th )

	th_acc = thr.th_new( acc )
	thr.ths.start( th_acc )


	f = srv_id_conn_id( port, id, is_quit )
	if f:
		send_sock( f, 'abc' )
		s = recv_sock_loop( f, is_quit )
		if s:
			dbg.out( s )


	srv_id_kill_id( port, id, is_quit )

	srv_id_kill_all( port, is_quit )

	thr.ths.stop_all()

if __name__ == "__main__":
	run()
# EOF

TCPソケットの通信用です。

chat_conn.py よりレベルが低くて軽いです。

srv_new( port, cb=None, bufmax=4*1024, tmout=3.0, kill_word='kill_srv' )

サーバ用のスレッドを生成、実行し、その制御用のオブジェクトを返します。

引数 内容
port サーバのポート番号。
cb コールバック関数を指定します。
クライアントが送信した文字列の引数1つを受け取り、クライアントに返信する文字列を返す関数を指定します。
bufmax ソケットのrecv()メソッドで指定するバッファサイズを指定します。
tmout 受信タイムアウト秒数を指定します。
kill_word クライアントからこの文字列を受信すると、全てのソケットをクローズして、サーバスレッドを終了します。
オブジェクトの属性 内容
cb コールバック関数。
srv_new()の引数で指定したものが設定されますが、オブジェクト生成後に、この属性に代入して設定する事もできます。
オブジェクトのメソッド 機能
stop() 全てのソケットをクローズして、サーバスレッドを終了します。
is_alive() サーバスレッドが動作してるかどうかをboolで返します。
wait() サーバスレッドが終了するまで待機します。

cli_new( port, host='localhost', bufmax=4*1024, tmout=3.0, kill_word='kill_srv' )

クライアント用のオブジェクトを返します。

引数 内容
port 接続するサーバのポート番号。
bufmax ソケットのrecv()メソッドで指定するバッファサイズを指定します。
tmout 受信タイムアウト秒数を指定します。
kill_word サーバに設定されている終了用の文字列を指定します。
オブジェクトのメソッド 機能
send_rev( s ) 文字列 s をサーバに送信し、サーバから受信した文字列を返します。
close() サーバと接続してるソケットをクローズします。
kill_srv() サーバにkill_wordを送信してサーバを終了させ、サーバと接続してるソケットをクローズします。

コマンドとしての動作

コマンドライン引数 内容
port=xxxx サーバのポート番号を指定します。
指定がないときのデフォルト値は1234です。
srv ポート番号指定を除いた、引数が無いとき、あるいは先頭の引数が srv のときは、文字列を大文字に変換して返すだけのサーバとして動作します。
srv 以外の引数が指定されているときは、クライアントとして動作します。
その他 クライアントとして動作するときは、指定のサーバのポート番号に接続し、引数の文字列を順次サーバに送信し、サーバからの返信を表示します。
$ ./tcp_ut.py &
[1] 11739
$ ./tcp_ut.py foo bar
FOO
BAR
$ ./tcp_ut.py kill_srv
$
[1]+  Done                    ./tcp_ut.py
$


chat_conn.py

chat_conn.py
#!/usr/bin/env python

import sys
import socket
import time
import threading
import thr
import empty
import io_ut
import dbg
import arg

tmout = 3.0

def str_div(s, delim):
	i = s.index(delim) if delim in s else len(s)
	return (s[:i], s[i+len(delim):])

readable = lambda fd: io_ut.readable(fd, tmout)

def rest_new():
	e = empty.new()
	e.s = ''

	def clear():
		e.s = ''
	def get():
		(r, e.s) = str_div(e.s, '\n')
		return r
	def add_get(s):
		e.s = e.s + s
		return get()

	return empty.to_attr( e, locals() )

def recv_base(fd, rest, dis_conn_func, conn_func=None):
	s = rest.get()
	if s:
		return s
	if conn_func:
		fd = conn_func()
		if not fd:
			return False # no connection
	if not readable(fd):
		return None # timeout
	try:
		bufmax = 8 * 1024
		s = fd.recv(bufmax)
	except:
		return dis_conn_func(False)
	if not s:
		return dis_conn_func(False)
	#try:
	#	s = s.decode('utf-8')
	#except:
	#	pass
	return rest.add_get(s)

def send_base(s, fd, dis_conn_func, conn_func=None):
	if conn_func:
		fd = conn_func()
		if not fd:
			return False # no connection
	try:
		s = s.encode('utf-8')
	except:
		pass
	try:
		fd.sendall(s+'\n')
	except:
		return dis_conn_func(False)
	return True

def conn_lst_new(): # for srv_accept_conn_new
	lst = []

	def add(conn):
		if conn not in lst:
			lst.append(conn)
	def remove(conn):
		if conn in lst:
			lst.remove(conn)
	def get_by_name(name):
		for conn in lst:
			if conn.name == name:
				return conn
		return None

	others = lambda conn: list( filter( lambda o: o.name != conn.name, lst ) )
	names = lambda : list( map( lambda conn: conn.name, lst ) )
	other_names = lambda conn: list( map( lambda o: o.name, others(conn) ) )

	def shutdown():
		start = time.time()
		while time.time() - start < 5:
			lst_ = filter( lambda conn: conn.name, lst )
			if not lst_:
				break
			for conn in lst_:
				conn.send_from('srv', 'shutdown')
			time.sleep(1)

	return empty.new( locals() )

def srv_accept_conn_new(fd, conn_lst):
	e = empty.new_kwds(name=None, lock_name=None)
	cond = threading.Condition()
	rest = rest_new()

	def dis_conn(v=None):
		if e.name:
			send_others('srv', 'logout ' + e.name)
			e.name = None
		rest.clear()
		if fd:
			fd.close()
		conn_lst.remove(e)
		return v

	recv = lambda : recv_base(fd, rest, dis_conn)
	send = lambda s: send_base(s, fd, dis_conn)

	def name_modify(org):
		if ' ' in org:
			org = org.replace(' ', '_')
		id = 2
		name = org
		while conn_lst.get_by_name(name):
			name = org + '_' + str(id)
			id += 1
		return name

	def login():
		if not send('who?'):
			return dis_conn(False)
		name = None
		lmt = time.time() + 30
		while time.time() < lmt:
			name = recv()
			if name != None:
				break
		if not name:
			return dis_conn(False)

		e.name = name_modify(name)

		if not send(e.name):
			return dis_conn(False)

		send_others('srv', 'login ' + e.name)

		return True

	def lock_name_set(name):
		cond.acquire()
		if e.lock_name != name:
			if name != None:
				while e.lock_name != None:
					cond.wait()
				e.lock_name = name
			else:
				e.lock_name = None
				cond.notify()
		cond.release()

	def send_from(from_name, s):
		s = '{}> {}'.format(from_name, s)
		f = False
		cond.acquire()
		if e.lock_name == from_name:
			r = send(s)
		else:
			while e.lock_name != None:
				cond.wait()
			e.lock_name = from_name
			r = send(s)
			e.lock_name = None
			cond.notify()
		cond.release()
		return r

	def send_others(from_name, s):
		if not from_name:
			from_name = 'srv'
		for to in conn_lst.others(e):
			to.send_from(from_name, s)

	def recv_to_names():
		s = recv()
		if not s:
			return s
		to_names = []
		while s.startswith('@'):
			(name, s) = str_div(s[1:], ' ')
			to_names.append(name)
		if not to_names:
			to_names = conn_lst.other_names(e)
		return (s, to_names)

	conn_lst.add(e)
	return empty.to_attr( e, locals() )

def srv_th_func(conn, th_quit, srv_quit):
	if not conn.login():
		return

	conn_lst = conn.conn_lst

	def is_lock_unlock(s):
		r = False
		for k in ('lock', 'unlock'):
			if not s.startswith(k+' '):
				continue
			name = s[len(k)+1:]
			o = conn_lst.get_by_name(name)
			if not o:
				break
			conn.lock_name_set(o.name if k == 'lock' else None)
			o.lock_name_set(conn.name if k == 'lock' else None)
			r = True
		return r

	def srv_proc(s):
		r = "? '{}'".format(s) # err
		if s == 'kill':
			srv_quit.set()
			return
		elif s in ('exit', 'quit', 'bye'):
			th_quit.set()
			return
		elif s == 'names':
			r = ' '.join( conn_lst.names() )
		elif s == 'other_names':
			r = ' '.join( conn_lst.other_names(conn) )
		elif is_lock_unlock(s):
			return
		conn.send_from('srv', r)

	while not th_quit.wait(0):
		r = conn.recv_to_names()
		if r == False:
			break
		if not r: # None or ''
			continue
		(s, to_names) = r

		for to_name in to_names:
			if to_name == 'srv':
				srv_proc(s)
				continue
			to = conn_lst.get_by_name(to_name)
			if to:
				to.send_from(conn.name, s)
	conn.dis_conn()

def srv(port):
	srv_quit = threading.Event()

	ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
	ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
	ss.bind(('', port))
	ss.listen(5)

	ths = []
	conn_lst = conn_lst_new()

	while not srv_quit.wait(0):
		if not readable(ss):
			continue
		(fd, adr) = ss.accept()
		conn = srv_accept_conn_new(fd, conn_lst)

		th = thr.th_new(srv_th_func)
		th.args = (conn, th.quit_ev, srv_quit)
		ths.append(th)
		th.start()

	conn_lst.shutdown()

	ss.close()
	for th in ths:
		th.stop()

def cli_new(host, port, name0, watch_func=None):
	e = empty.new_kwds(fd=None, name=None)

	e.watch_func = watch_func # func(name, 'login' or 'logout')
	others = []
	rest = rest_new()

	def dis_conn(v=None):
		if e.name:
			e.name = None
		rest.clear()
		if e.fd:
			e.fd.close()
			e.fd = None
		return v

	def conn():
		if e.fd == None:
			e.fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
			try:
				e.fd.connect((host, port))
			except:
				time.sleep(tmout)
				return dis_conn(False)
		return e.fd # for recv_base, send_base

	recv = lambda : recv_base(e.fd, rest, dis_conn, conn)
	send = lambda s: send_base(s, e.fd, dis_conn, conn)

	def login():
		if not e.name:
			if recv() != 'who?':
				return dis_conn(False)
			if not send(name0):
				return False
			s = recv()
			if not s:
				return dis_conn(False)
			e.name = s

			if e.watch_func:
				others.extend( others_get() )
				for o in others:
					e.watch_func(o, 'login')
		return e.name

	def send_to(name, s):
		if not login():
			return False
		if name:
			s = '@{} {}'.format(name, s)
		return send(s)

	def split_from(s):
		k = '> '
		return str_div(s, k) if k in s else ('', s)

	def recv_split():
		if not login():
			return False
		s = recv()
		if s in (False, None):
			return s
		(f, s) = split_from(s)

		if e.watch_func and f == 'srv':
			for k in ('login', 'logout'):
				if s.startswith(k+' '):
					name = s[len(k)+1:]
					e.watch_func(name, k)
		return (f, s)

	def recv_from(name):
		r = recv_split()
		if not r:
			return r
		(f, s) = r
		return s if f == name else False # not dis_conn

	lock = lambda name: send_to('srv', 'lock ' + name)
	unlock = lambda name: send_to('srv', 'unlock ' + name)

	def send_recv_name(name, s):
		send_to(name, s)
		return recv_from(name)

	def send_recv_lst_name_lock(name, lst):
		if not lst:
			return []
		lock(name)
		rvs = list( map( lambda s: send_recv_name(name, s), lst ) )
		unlock(name)
		return rvs

	def names_get():
		send_to('srv', 'names')
		s = recv_from('srv')
		return s.split(' ') if s else []

	others_get = lambda : list( filter( lambda name: name != e.name, names_get() ) )

	return empty.to_attr( e, locals() )

if __name__ == "__main__":
	s = arg.new().pop()
	if not s:
		dbg.help_exit( '<port>' )
	srv( int(s) )
# EOF

TCPソケットの通信用です。

TCPソケットのチャット2019夏 で作ったやつです。

io_ut.py を使うように手を入れるべきか...?

2020/FEB/01

io_ut.py と arg.py を使うように少しだけ書き換えてみました。


nkf.py

nkf.py
#!/usr/bin/env python

import sys
import six
import subprocess as sp
import cmd_ut

enc = lambda s: s.encode('utf-8') if six.PY2 else s.encode()
dec = lambda b: unicode(b, 'utf-8') if six.PY2 else b.decode()

def get_stdin():
	fi = sys.stdin if six.PY2 else sys.stdin.buffer
	return fi.read() # b

def put_stdout(b):
	fo = sys.stdout if six.PY2 else sys.stdout.buffer
	fo.write(b)

opt_dic = {
	'ISO-2022-JP': '-j',
	'EUC-JP': '-e',
	'Shift_JIS': '-s',
	'UTF-8': '-u',
	'ASCII': '',
}

which_nkf = []

def get_which_nkf():
	if which_nkf == []:
		which_nkf.append( cmd_ut.call( 'which nkf' ) )
	return which_nkf[ 0 ]

def guess(b, style_opt=False):
	def func():
		if get_which_nkf():
			r = cmd_ut.call_comm( enc('nkf -g'), b )
			return dec( r ).strip()

		for b1 in b:
			if b1 >= b'\x80':
				return 'UTF-8'  # !
		return 'ASCII'

	opt = func()
	return opt_dic.get(opt, '-u') if style_opt else opt

def cvt(b, opt, do_guess=False):
	if opt not in opt_dic.values():
		opt = opt_dic.get(opt, '-u')
	if opt == '':
		return b
	if do_guess:
		from_opt = guess(b, True)
		if from_opt == '' or from_opt == opt:
			return b
	return cmd_ut.call_comm( enc('nkf ' + opt), b )

def to_utf8(b):
	opt = guess(b, True)
	u8 = b if opt in ('-u', '') else cvt(b, '-u')
	return (u8, opt)

def utf8_to(u8, opt):
	return u8 if opt in ('-u', '') else cvt(u8, opt)

def to_str(b):
	(u8, opt) = to_utf8(b)
	return ( dec(u8), opt )

def str_to(s, opt):
	return utf8_to( enc(s), opt )

def str_width(s):
	return sum( map( lambda c: 1 if ord(c) < 0x80 else 2, s ) )

if __name__ == "__main__":
	b = get_stdin()
	opt = guess(b)
	u8 = cvt(b, '-u')
	s = dec(u8)
	# ...
	u8 = enc(s)
	b = cvt(u8, opt)
	put_stdout(b)
# EOF

日本語対応用です。

内部でコマンドnkfを使ってます。 nkfのインストールが必須です。

nkfがインストールされていない環境でも、 ASCIIの範囲外のテキストはUTF-8に限り、 動作するようにしてみました。(2020/OCT/09 追記)

差分
--- nkf.py.0    2020-01-28 22:50:54.000000000 +0900
+++ nkf.py      2020-10-09 22:18:30.000000000 +0900
@@ -24,9 +24,25 @@
 'ASCII': '',
 }

+which_nkf = []
+
+def get_which_nkf():
+       if which_nkf == []:
+               which_nkf.append( cmd_ut.call( 'which nkf' ) )
+       return which_nkf[ 0 ]
+
 def guess(b, style_opt=False):
-       b = cmd_ut.call_comm( enc('nkf -g'), b )
-       opt = dec(b).strip()
+       def func():
+               if get_which_nkf():
+                       r = cmd_ut.call_comm( enc('nkf -g'), b )
+                       return dec( r ).strip()
+
+               for b1 in b:
+                       if b1 >= b'\x80':
+                               return 'UTF-8'  # !
+               return 'ASCII'
+
+       opt = func()
 return opt_dic.get(opt, '-u') if style_opt else opt

 def cvt(b, opt, do_guess=False):

当時、python2, python3 での日本語の扱いの違いを吸収しようと思い、作ってみました。

自分で使う狭い範囲にだけ対応してればまぁ良いかと、安易な事しかしてません。;-p)

はじめに 簡易なHTMLパーサ 2018秋 で作ってから、 字下げツールテキスト配置ツール簡易なおれおれマークダウン 2019秋 と、互換性を保ちつつバージョンアップして使い回してきました。

今回nkfコマンドの実行する箇所だけ cmd_ut.py を使うようにして kon_utに追加しておきます。

概要

main部分の例がほぼ全てです。

if __name__ == "__main__":
	b = get_stdin()
	opt = guess(b)
	u8 = cvt(b, '-u')
	s = dec(u8)
	# ...
	u8 = enc(s)
	b = cvt(u8, opt)
	put_stdout(b)

主な関数

関数 内容
enc(s) 文字列をutf-8にエンコードして返します。
dec(b) utf-8エンコードされたバイト列を文字列にして返します。
get_stdin() 標準入力からバイト列を全部読み込み(!)返します。
put_stdout(b) 標準出力にバイト列bを書き込みます。
guess(b, style_opt=False) バイト列bを'nkf -g'コマンドに入力して、
'UTF-8'や’ISO-2022-JP'といった、エンコード形式も文字列を返します。
style_opt=Trueの場合は、エンコード形式のnkfコマンドに指定するオプションの形式
('-u'や'-j'など)で返します。
cvt(b, opt, do_guess=False) バイト列bのエンコード形式をnkfコマンドのoptオプション('-u'や'-j')に変換します。
do_guess=Trueの場合は、バイト列bのエンコード形式'nkf -g'コマンドで調べて、
変換する必要が無いときはそのままbを返します。
to_utf8(b) バイト列bの元のエンコード形式を調べてから、
utf-8エンコードされたバイト列に変換します。
(utf-8バイト列, 元のエンコード形式のオプション)のタプルを返します。
utf8_to(u8, opt) to_utf8()が返したバイト列u8と元のエンコード形式のオプションを引数として、
to_utf8()で変換する前の元のバイト列に戻して返します。
to_str(b) バイト列bの元のエンコード形式を調べてから、
文字列列に変換します。
(文字列, 元のエンコード形式のオプション)のタプルを返します。
str_to(s, opt) to_str()が返した文字列sと元のエンコード形式のオプションを引数として、
to_str()で変換する前の元のバイト列に戻して返します。
str_width(s) 等幅フォントの端末で、日本語文字の幅がASCII文字の2倍のときを想定して、
文字列sを表示したときに、ASCII文字何文字分の幅になるかを返します。


arg.py

arg.py
#!/usr/bin/env python

import sys
import yaml

import empty
import base
import dbg

def new():
	av = sys.argv[1:] # copy

	idx = lambda s: av.index(s) if s in av else -1

	def idx_startswith(s):
		for i in range( len(av) ):
			if av[i].startswith(s):
				return i
		return -1

	def pop_str(k, dv=''):
		i = idx(k)
		if i >= 0 and i+1 < len(av):
			av.pop(i) # skip k
			v = av.pop(i)
			return v
		i = idx_startswith(k)
		if i >= 0:
			v = av.pop(i)
			return v[ len(k): ]
		return dv

	def pop_v(k, dv=None):
		s = pop_str(k)
		if s == '':
			return dv
		return yaml.load(s)

	def pop(dv='', avoid_k='-'):
		for i in range( len(av) ):
			if avoid_k and av[i].startswith(avoid_k):
				continue
			return av.pop(i)
		return dv

	def is_pop(s):
		i = idx(s)
		if i < 0:
			return False
		av.pop(i)
		return True

	get_av = lambda : av

	return empty.new( locals() )


def cmd_new(name, args=[], opts={}, comment='', **kwds):
	return empty.new( name=name, args=args, opts=opts, comment=comment, **kwds )

def get_name_args(cmds, a=None):

#	cmds = [
#		cmd_new( 'create' ),
#		cmd_new( 'commit' ),
#		cmd_new( 'log' ),
#		cmd_new( 'curr', comment='show current id' ),
#		cmd_new( 'ids', comment='show all id' ),
#		cmd_new( 'tails', comment='show tail ids' ),
#		cmd_new( 'checkout', [ 'id' ] ),
#		cmd_new( 'check', comment='show dirty' ),
#		cmd_new( 'fixlink', comment='fix link info' ),
#		cmd_new( 'push', [ 'to_path' ] ),
#		cmd_new( 'pull', [ 'from_path' ] ),
#	]
#
#	cmd = get_name_args( cmds )
#
#	obj has obj.create(), obj.commit() ...
#
#	func = getattr( obj, cmd.name )
#	func( *cmd.args )

	dic = dict( map( lambda cmd: ( cmd.name, cmd ), cmds ) )

	def get_help_str(cmd, max_n=0):
		def cnv(v):
			if v == '':
				return '""'
			if v is False:
				return ''
			return v

		def f(kv):
			(k, v) = kv
			lst = [ '[', '-' + k, str( cnv( v ) ), ']' ]
			return base.to_str( filter( lambda e: e, lst ), delim=' ' )

		opts = list( map( f, cmd.opts.items() ) )
		lst = [ cmd.name ] + opts + cmd.args
		s = base.to_str( lst, delim=' ' )

		if max_n < 0: # auto
			max_n = len( s )

		if max_n > 0 and cmd.comment:
			s += ' ' * ( max_n - len( s ) )
			s += ' # ' + cmd.comment
		return s

	def help():
		msg = 'cmd arg ...\n'
		max_n = max( map( lambda cmd: len( get_help_str( cmd ) ), cmds ) )
		msg += base.to_str( map( lambda cmd: get_help_str( cmd, max_n ), cmds ), '  ' )
		dbg.help_exit( msg )

	if not a:
		a = new()
	name = a.pop()
	if name not in dic:
		help()

	cmd = dic.get( name )

	def pop_key_v(key, dv):
		if dv is False:
			return a.is_pop( key )
		return a.pop_v( key, dv )

	opts = cmd.opts.copy()
	for (k, dv) in opts.items():
		key = '-' + k # signel '-'
		v = pop_key_v( key, dv )
		if v != dv:
			opts[ k ] = v

	n = len( cmd.args )
	av = a.get_av()
	if len( av ) < n:
		help()

	args = list( map( lambda i: av.pop( 0 ), range( n ) ) )
	s = get_help_str( cmd, -1 )

	return cmd_new( cmd.name, args, opts, help=s, a=a )
# EOF

引数用です。

コマンドライン引数のsys.argvの リストの先頭のコマンド自身の文字列を除外した分について、 自分がよく使う引数取得処理をまとめてます。

引数を取得するごとに削除していく方式なので、 状態を持つため、一応クロージャの形式にしてます。

new()

sys.argv[1:]のリストについて処理するオブジェクトを返します。

オブジェクトの主なメソッド

メソッド 内容
idx(s) 内部で保持してる残っている引数のリストについて、
文字列sに合致する引数があれば、そのインデックスを返します。
合致するものがなければ -1 を返します。
idx_startswith(s) 内部で保持してる残っている引数のリストについて、
先頭が文字列sで始まる引数があれば、そのインデックスを返します。
合致するものがなければ -1 を返します。
pop_str(k, dv='') 内部で保持してる残っている引数のリストについて、
文字列kに合致する文字列があれば、その直後に続く引数文字列を返します。
その際、内部の引数のリストからkとその直後に続く文字列を削除します。

文字列kに合致する引数が無い場合、先頭が文字列kで始まる文字列を探します。
見つかれば、その文字列のk以降の部分を返します。
その際、内部の引数のリストから文字列を削除します。

見つからなければ、デフォルト値dvを返します。
pop_v(k, dv=None) pop_str(k)を呼び出して、文字列kに対応する文字列を取得します。
取得した文字列はyaml.load()にかけて、
可能なものは文字列から値に変換して返します。
対応する文字列が見つからなければ、デフォルト値dvを返します。
pop(dv='', avoid_k='-') 内部で保持してる残っている引数のリストについて、
先頭の文字列を返します。
その際、内部の引数のリストから文字列を削除します。

候補の文字列がavoid_kで指定された文字列で始まる場合は、
avoid_kで始まらない文字列をリストの先頭から調べます。
適切な文字列が見つからなければ、デフォルト値dvを返します。
is_pop(s) 内部で保持してる残っている引数のリストについて、
文字列sに合致する文字列があれば、Trueを返します。
その際、内部の引数のリストから文字列を削除します。
合致する文字列が無ければ、Falseを返します。
get_av() 内部で保持してる残っている引数のリストを返します。
$ python - -t hoge -v10 --special=True
>>> import arg
>>> a = arg.new()
>>> a.pop_str('-t')
'hoge'
>>> a.pop_v('-v')
10
>>> a.pop_v('--special=')
True

cmd_new(name, args=[], opts={}, comment='', **kwds)

引数をempty.new()で、のまま属性にセットしたオブジェクトを返します。

その実体は

def cmd_new(name, args=[], opts={}, comment='', **kwds):
	return empty.new( name=name, args=args, opts=opts, comment=comment, **kwds )

だけです。

get_name_args()に与えるコマンド一覧の情報を作成するときや、 get_name_args()が返す値として利用します。

get_name_args(cmds, a=None)

引数にコマンド一覧の情報cmdsと、 コマンドライン引数new()結果aを与えておいて、 処理します。

コマンドライン引数new()結果は、省略すると内部でnew()を実行します。

合致するコマンド(関数名)と、コマンドラインで指定された引数を、 cmd_new()で返されるオブジェクトで返します。

コマンドラインの指定が、コマンド一覧の情報のどれにも合致しない場合は、 コマンド一覧の情報からヘルプメッセージ生成して、 dbg.help_exit() を呼び出して表示して終了します。

使用例

arg_sample.py
#!/usr/bin/env python

import arg
import dbg

def add(a, b):
	return a + b

def inc(a):
	return add( a, 1 )

def dec(a, step, n):
	return a - step * n

cmds = [
	arg.cmd_new( 'add', [ 'v1', 'v2' ], comment='add v1 v2' ),
	arg.cmd_new( 'inc', [ 'v' ], comment='increment v' ),
	arg.cmd_new( 'dec', [ 'v' ], opts={ 'step': 1, 'n': 1 }, comment='v - step * n' ),
]

cmd = arg.get_name_args( cmds )

dbg.out( 'cmd.name={} cmd.args={} cmd.opts={}'.format( cmd.name, cmd.args, cmd.opts ) )

func = eval( cmd.name )

args = list( map( float, cmd.args ) )

dbg.out( 'func={} args={}'.format( func, args ) )

ret = func( *args, **cmd.opts )

dbg.out( 'ret={}'.format( ret ) )

# EOF

opts には、キーが変数名、値がデフォルト値の辞書を指定します。

コマンドライン引数では、ハイフン'-'にキーの変数名を追加した文字列の値に、指定の値を並べます。

取得にpop_v()を利用しているので、例えば'-v 1'でも'-v1'でも可能です。

実行例

$ ./arg_sample.py
Usage: ./arg_sample.py cmd arg ...
  add v1 v2                  # add v1 v2
  inc v                      # increment v
  dec [ -step 1 ] [ -n 1 ] v # v - step * n
$ ./arg_sample.py add 1.2 3.4
cmd.name=add cmd.args=['1.2', '3.4'] cmd.opts={}
func=<function add at 0x10e24ff28> args=[1.2, 3.4]
ret=4.6
$ ./arg_sample.py inc 9
cmd.name=inc cmd.args=['9'] cmd.opts={}
func=<function inc at 0x10c19fb70> args=[9.0]
ret=10.0

コマンドや引数が間違ってると

$ ./arg_sample.py add 1
Usage: ./arg_sample.py cmd arg ...
  add v1 v2                  # add v1 v2
  inc v                      # increment v
  dec [ -step 1 ] [ -n 1 ] v # v - step * n
$ ./arg_sample.py sub 2 1
Usage: ./arg_sample.py cmd arg ...
  add v1 v2                  # add v1 v2
  inc v                      # increment v
  dec [ -step 1 ] [ -n 1 ] v # v - step * n

ただし、

$ ./arg_sample.py add 1 2 3
cmd.name=add cmd.args=['1', '2']
func=<function add at 0x10d7adf28> args=[1.0, 2.0]
ret=3.0

引数が多い分には文句を言いません。

'-'によるオプション指定の例

$ ./arg_sample.py dec 10
cmd.name=dec cmd.args=['10'] cmd.opts={'step': 1, 'n': 1}
func=<function dec at 0x104761bf8> args=[10.0]
ret=9.0

$ ./arg_sample.py dec -step 0.5 10
cmd.name=dec cmd.args=['10'] cmd.opts={'step': 0.5, 'n': 1}
func=<function dec at 0x1096c6bf8> args=[10.0]
ret=9.5

$ ./arg_sample.py dec -step 0.5 -n 3 10
cmd.name=dec cmd.args=['10'] cmd.opts={'step': 0.5, 'n': 3}
func=<function dec at 0x107c9dbf8> args=[10.0]
ret=8.5

オプション指定の有無だけの場合 (2020/MAY/02追加)

オプション指定で、デフォルト値がFalseの場合は、 ハイフン'-'にキーの変数名を追加した文字列があった場合は、 Trueとする仕様にします。

$ python - hoge
>>> import arg
>>> cmds = [ arg.cmd_new( 'hoge', opts={ 'verb': False, 'cnt': 10 } ) ]
>>> cmd = arg.get_name_args( cmds )
>>> cmd.name
'hoge'
>>> cmd.opts
{'verb': False, 'cnt': 10}
$ python - hoge -verb -cnt 10
>>> import arg
>>> cmds = [ arg.cmd_new( 'hoge', opts={ 'verb': False, 'cnt': 10 } ) ]
>>> cmd = arg.get_name_args( cmds )
>>> cmd.name
'hoge'
>>> cmd.opts
{'verb': True, 'cnt': 10}

実行するコマンドのヘルプメッセージ (2020/MAY/03追加)

返すcmdにそのコマンドのヘルプメッセージを追加しました。

cmd_new()は変更せずに、返すcmdオブジェクトにプロパティhelpを追加し、 ヘルプメッセージの文字列をセットしてます。

$ python - hoge -verb -cnt 10
>>> import arg
>>> cmds = [ arg.cmd_new( 'hoge', opts={ 'verb': False, 'cnt': 10 } ) ]
>>> cmd = arg.get_name_args( cmds )
>>> cmd.name
'hoge'
>>> cmd.help
'hoge [ -verb ] [ -cnt 10 ]'

残りのコマンドライン引数 (2020/MAY/06追加)

余っているコマンドライン引数を、返すcmdオブジェクトにプロパティaを追加し、 new()で生成されたオブジェクをセットしてます。

$ python - hoge -verb -cnt 10 abc def
>>> import arg
>>> cmds = [ arg.cmd_new( 'hoge', opts={ 'verb': False, 'cnt': 10 } ) ]
>>> cmd = arg.get_name_args( cmds )
>>> cmd.name
'hoge'
>>> cmd.a
<empty.Empty object at 0x10e509c18>
>>> cmd.a.get_av()
['abc', 'def']

オプション指定の有無だけの場合を修正 (2020/MAY/24追加)

pythonのちょっとしたメモ 「boolと整数と辞書」 を書いておきながら、それにハマってました。

修正しました。

arg_sample2.py
#!/usr/bin/env python

import arg
import dbg

def show(n):
	dbg.out( 'n={}'.format( n ) )

cmds = [
	arg.cmd_new( 'show', opts={ 'n': 0 } )
]

cmd = arg.get_name_args( cmds )
f = eval( cmd.name )
f( **cmd.opts )

# EOF

修正前

$ ./arg_sample2.py
Usage: ./arg_sample2.py cmd arg ...
  show [ -n 0 ]

$ ./arg_sample2.py show
n=0

$ ./arg_sample2.py show -n 1
n=True

$ ./arg_sample2.py show -n 2
n=True

修正後

$ ./arg_sample2.py
Usage: ./arg_sample2.py cmd arg ...
  show [ -n 0 ]

$ ./arg_sample2.py show
n=0

$ ./arg_sample2.py show -n 1
n=1

$ ./arg_sample2.py show -n 2
n=2


term_ut.py

term_ut.py
#!/usr/bin/env python

import os
import empty
import cmd_ut
import io_ut

def esc_seq_new():
	esc = lambda s: chr(0x1b) + '[' + s
	esc_ex = lambda d, v: esc( '?{}{}'.format(d, 'h' if v else 'l' ) )
	cursor_show = lambda v: esc_ex( 25, v )
	mov = lambda x, y: esc( '{};{}H'.format(y+1, x+1) )
	scroll = lambda m: esc( '{}{}'.format( abs(m), 'S' if m > 0 else 'T' ) )
	cls = esc('2J')
	reset = esc('0m')
	rev = esc('7m')
	uline = esc('4m')
	return empty.new( locals() )

def get_wh():
	w = int( cmd_ut.call('tput cols').strip() )
	h = int( cmd_ut.call('tput lines').strip() )
	return (w, h)

dev = '/dev/tty'

def stty(s, v=True):
	if not v:
		s = '-' + s
	cmd = 'stty {} < {}'.format(s, dev)
	cmd_ut.call(cmd)

def new():
	esc_seq = esc_seq_new()

	fr = open(dev, 'r')
	fw = open(dev, 'w')

	def get_xy():
		cmds = [
			#'echo -en "\e[6n"',
			'echo -en "\x1b[6n"',
			'read -sd"[" dmy',
			'read -sd"R" row_col',
			'echo -n $row_col >&2',
		]
		cmd = "bash -c '{}'".format( '\n'.join(cmds) )

		proc = cmd_ut.proc_new(cmd, stderr=cmd_ut.PIPE)
		s = proc.communicate()[1].decode()
		proc.wait()

		(row, col) = map( int, s.split(';') )
		(x, y) = (col-1, row-1)
		return (x, y)

	def disp_new():
		(w, h) = wh = get_wh()

		def out(s):
			fw.write(s)
			fw.flush()
			return s

		cursor_show = lambda v: out( esc_seq.cursor_show(v) )
		mov = lambda x, y: out( esc_seq.mov(x, y) )
		scroll = lambda m: out( esc_seq.scroll(m) )
		cls = lambda : out( esc_seq.cls )
		reset = lambda : out( esc_seq.reset )
		rev = lambda : out( esc_seq.rev )
		uline = lambda : out( esc_seq.uline )

		return empty.new( locals(), get_xy=get_xy )

	disp = disp_new()

	def key_new():
		def init():
			stty( 'echo', False )
			stty( 'icanon', False )
			disp.cursor_show( False )

		def fini():
			disp.cursor_show( True )
			stty( 'icanon', True )
			stty( 'echo', True )

		readable = lambda : io_ut.readable( fr )

		def get_block():
			s = os.read( fr.fileno(), 10 )
			if type(s) != str:
				s = ''.join( map( chr, s ) ) # !!! decode()?
			return s

		get = lambda : get_block() if readable() else ''


		ctl = lambda s: chr( 1 + ord(s) - ord('a') ) # control key
		esc = esc_seq.esc # esc key

		allow_tbls = ((
			( 'u', 'd', 'l', 'r' ),
			( esc('A'), esc('B'), esc('D'), esc('C') ), # allow
			( ctl('p'), ctl('n'), ctl('b'), ctl('f') ), # emacs
			( 'k', 'j', 'h', 'l' ), # vi
		), (
			# PAGE
			( 'pu', 'pd', 'pl', 'pr' ),
			( esc('5~'), esc('6~'), esc('8~'), esc('7~') ), # allow
			( chr(0xf6), ctl('v'), ctl('a'), ctl('e') ), # emacs
			( 'b', 'f', '^', '$' ), # less, vi
		))

		allow_dic = {}
		for tbl in allow_tbls:
			lst0 = tbl[0]
			for lst in tbl[1:]:
				for (k, v) in zip(lst, lst0):
					allow_dic[k] = v

		k_to_allow = lambda k, dv='': allow_dic.get( k, dv )
			# return 'u', 'd', 'l', 'r', 'pu', 'pd', pl', 'pr'

		dxy_dic = { 'l': (-1,0), 'r': (1,0), 'u': (0,-1), 'd': (0,1) }
		d_to_dxy = lambda d, dv=None: dxy_dic.get( d, dv )

		k_to_dxy = lambda k, dv=None: d_to_dxy( k_to_allow(k), dv )

		is_ok = lambda k: k in ('\n', ' ')
		is_cancel = lambda k: k in ( chr(0x1b) )
		is_allow = lambda k: k_to_allow(k) != ''

		return empty.new( locals() )

	key = key_new()


	def buff_new(w, h):

		def c_new(c=' ', rev=False, uline=False):

			def set_attr(prev):
				if rev == prev.rev and uline == prev.uline:
					return
				if rev == False or uline == False:
					disp.reset()
				if rev:
					disp.rev()
				if uline:
					disp.uline()

			def reset(disp):
				if rev or uline:
					disp.reset()

			return empty.new( locals() )

		def arr_xy_new(w, h):
			arr = list( map( lambda y: list( map( lambda x: c_new(), range( w ) ) ), range( h ) ) )

			get = lambda x, y: arr[y][x]

			def set(x, y, c):
				arr[y][x] = c
				return c

			return empty.new( locals() )

		arr_xy = arr_xy_new( w, h )

		def ids_new(w, h):
			xy_to_id = lambda x, y: y * w + x
			id_to_xy = lambda id: ( id % w, id // w )

			lst = []

			def add(x, y):
				id = xy_to_id( x, y )
				if id not in lst:
					lst.append( id )
					lst.sort()

			get = lambda : list( map( id_to_xy, lst ) )

			def clear():
				del lst[:]

			return empty.new( locals() )

		ids = ids_new( w, h )

		def init_by():
			(bx, by) = disp.get_xy()
			sc = max( h - ( disp.h - by ), 0 )
			if sc > 0:
				disp.scroll( sc )
				by -= sc
			return by

		by = init_by()

		def putc(x, y, c):
			prev = arr_xy.get( x, y )
			if vars( prev ) != vars( c ):
				arr_xy.set( x, y, c )
				ids.add( x, y )

		def fill(x, y, w_, h_, c):
			for j in range( h_ ):
				for i in range( w_ ):
					x_ = ( x + i ) % w
					y_ = ( y + j ) % h
					putc( x_, y_, c )

		def puts(x, y, s):
			# \tr1 \tr0 \tu0 \tu1

			lst = s.split('\n')
			(rev, uline) = ( False, False )
			for (j, s) in enumerate( lst ):
				i = 0
				while s:
					(c, s) = ( s[0], s[1:] )
					if c == '\t' and len(s) >= 2:
						(k, v, s) = ( s[0], s[1], s[2:] )
						v = ( v != '0' )
						if k == 'r':
							rev = v
						elif k == 'u':
							uline = v
						continue
					putc( x + i, y + j, c_new( c, rev, uline ) )
					i += 1

		def update():
			disp.reset()
			(cx, cy, cc) = ( w, h, c_new() )
			for (x, y) in ids.get():
				if not( y == cy and x == cx ):
					disp.mov( x, by + y )
					(cx, cy) = ( x, y )
				c = arr_xy.get( x, y )
				c.set_attr( cc )
				disp.out( c.c )
				(cx, cy, cc) = ( cx + 1, y, c )
			ids.clear()
			cc.reset( disp )

		def fini():
			disp.reset()
			disp.mov( w - 1, by + h - 1 )
			disp.out( '\n' )

		return empty.new( locals() )

	return empty.new( locals() )
# EOF

端末へのエスケープシーケンスの表示やキー入力用です。

テキスト配置ツール で作った term.py と key.py を1つにまとめてみました。

buff_new(x, h)を追加しました。2020/FEB/23

esc_seq_new()

いわゆるエスケープシーケンス文字列を返すメソッドや値を属性に持つ、 オブジェクトを返します。

オブジェクトの主なメソッドまたは値

メソッドまたは値 内容
cursor_show(v) カーソル表示のON/OFFの文字列です。
vにTrueかFalseを指定します。
mov(x, y) カーソル移動の文字列です。
x, y は「0起源の値」を指定します。
scroll(m) 画面のスクロール動作の文字列です。
mは正または負の整数を指定します。
cls 画面クリアの文字列です。
reset 表示装飾のリセット用の文字列です。
rev 反転表示用の文字列です。
uline 下線表示用の文字列です。

色関係は無し ;-p)

>>> import term_ut
>>> e = term_ut.esc_seq_new()
>>> s = e.mov( 3, 4 )
>>> print(s)

>>> print( e.cls )

new()

「端末表示用のオブジェクトと、 キー入力用のオブジェクトを属性に持つ」 オブジェクトを返します。

>>> ipmort term_ut
>>> term = term_ut.new()
>>> disp = term.disp
>>> key = term.key
  :

buff_new(x, h)を追加しました。2020/FEB/23

>>> buff = term.buff_new(80, 25)

term.disp

端末表示用のオブジェクトです。

オブジェクトの主なメソッドまたは値

メソッドまたは値 内容
out(s) 文字列sを表示します。
cursor_show(v) カーソル表示のON/OFFです。
vにTrueかFalseを指定します。
mov(x, y) カーソル移動です。
x, y は「0起源の値」を指定します。
scroll(m) 画面のスクロール動作です。
mは正または負の整数を指定します。
cls() 画面クリアです。
reset() 表示装飾のリセット用です。
rev() 反転表示用です。
uline() 下線表示用です。
w 端末画面の幅です。
h 端末画面の高さです。
wh (w, h)のタプルが返ります。
get_xy() 現在のカーソル位置を(x, y)のタプルで返します。
>>> import term_ut
>>> term = term_ut.new()
>>> term.disp.mov(3, 4)
>>> term.disp.out('hoge')

term.key

キー入力用のオブジェクトです。

オブジェクトの主なメソッド

メソッド 内容
init() 「直接キー入力」モードに入ります。
キー入力の画面のエコーバックやカーソル表示を消します。
fini() 「直接キー入力」モードを終了します。
キー入力の画面のエコーバックやカーソル表示の設定を元に戻します。
get_block() キー入力があるまで待機(ブロック)して、入力されたキーを返します。
get() 入力されたキーを返します。入力が無ければ空文字列''を返します。
k_to_allow(k, dv='') kで指定したキーが矢印キーか判定して種類を返します。
kにはget()やget_block()で取得したキーを指定します。
kが矢印キーでなければdvを返します。
(矢印キーの種類は後述)
d_to_dxy(d, dv=None) dで指定した矢印キーの種類に対応する、(dx, dy)のタプルを返します。
dx, dy は -1, 0, 1 の値を取ります。
dが矢印キーの種類でなければdvを返します。
k_to_dxy(k, dv=None) d_to_dxy( k_to_allow(k), dv ) を返します。;-p)
kで指定したキーが矢印キーならば、
その種類に対応する、(dx, dy)のタプルで返します。
dx, dy は -1, 0, 1 の値を取ります。
矢印キーでなければdvを返します。
is_ok(k) kが「リターンキー」か「スペースキー」ならばTrueを、
そうでなければFalseを返します。
is_cancel(k) kが「ESCキー」ならばTrueを、
そうでなければFalseを返します。
is_allow(k) kが矢印キーならばTrueを、
そうでなければFalseを返します。

矢印キーの種類

種類 種類の値 対応してるキー (dx, dy)
'u' 上矢印
コントロール+P
K
(0, -1)
'd' 下矢印
コントロール+N
J
(0, 1)
'l' 左矢印
コントロール+B
H
(-1, 0)
'r' 右矢印
コントロール+F
L
(1, 0)
ページアップ 'pu' ALT+V
B
-
ページダウン 'pd' コントロール+V
F
-
ページ左 'pl' コントロール+A
^
-
ページ右 'pr' コントロール+E
$
-

buff_new(w, h)

サイズ w, h を指定して、描画バッファのオブジェクトを生成して返します。

主なメソッド

メソッド 内容
c_new(c=' ', rev=False, uline=False) 1文字cと反転と下線の有無を指定したオブジェクトを生成して返します。
fill(x, y, w_, h_, c) バッファの(x, y, w_, h_)で指定した矩形の領域をcで埋めます。
cにはc_new()で生成したオブジェクトを指定します。
puts(x, y, s) バッファ上に(x, y)の位置から文字列sを描画します。
文字列中には後述のフォーマットで装飾が指定できます。
改行コードがあると、現在位置を次の行のxの位置に移動します。
update() バッファの状態を画面(term.disp)に反映します。
前回のupdate()呼び出しから、fill()あるいはputs()などで更新された、
必要な領域のみが画面に反映されます。
fini() 終了処理です。
文字の装飾を解除し、画面のカーソルをバッファ領域の下側に移動します。

puts()で指定可能な文字列の装飾指定

文字列中に'\t'が現れると、続く2文字は装飾指定と解釈されます。

よって、'\t'自身は画面に出力する事はあきらめてください。;-p)

続く2文字が無い場合、動作は保証されません。

'\t'に続く2文字の指定

2文字 内容
r1 反転開始
r0 反転終了
u1 下線開始
u0 下線終了

文字列の開始は装飾なしの状態から始まります。

2種類の装飾の指定は独立です。

例えば、

'abc\tr1def\tu1ghi\tr0jkl\tu0mno'

の指定では、次の装飾になります。

文字列 装飾
abc 装飾なし
def 反転
ghi 反転+下線
jkl 下線
mno 装飾なし

term_buff_sample.py
#!/usr/bin/env python

import term_ut

if __name__ == "__main__":
	term = term_ut.new()
	buff = term.buff_new( 32, 16 )
	buff.fill( 0, 0, 24, 12, buff.c_new( '.' ) )
	buff.fill( 4, 4, 24, 12, buff.c_new( '-' ) )

	lst = [
		'\tr1' + 'hello' + '\tr0',
		'',
		'foo',
		'',
		'\tu1' + 'bar' + '\tr1' + 'hoge' + '\tr0' + 'fuga',
	]
	s = '\n'.join( lst )
	buff.puts( 2, 2, s )
	buff.update()
	buff.fini()
# EOF


tm_ut.py

時間関連です。

tm_ut.py
#!/usr/bin/env python

import sys
import os
import time
import base

now_sec = lambda : time.time()

sample = '2020-01-23_12.34.56'

len_digit = lambda s: len( base.filter_str( lambda c: c.isdigit(), s ) )

def is_format_delim(delim):
	if len( delim ) <= 1:
		return False
	n = len_digit( delim )
	smp_n = len_digit( sample )
	return n == smp_n or n == smp_n - 2

def sec_to_str(sec, delim=''):
	if is_format_delim( delim ):
		s = sec_to_str( sec ) # 202001223123456
		if len_digit( delim ) < len( s ):
			s = s[ 2: ]
		lst = list( s ) # [ '2', '0', ... ]
		f = lambda c: lst.pop( 0 ) if c.isdigit() else c
		return base.to_str( map( f, delim ), delim='' )

	t = time.localtime(sec)

	def v_to_s(v):
		s = str(v)
		w = 4 if len(s) > 2 else 2
		return ('0' * w + s)[-w:]

	return delim.join( map( v_to_s, t[:6] ) )

def str_to_sec(s):
	s = ''.join( map( lambda c: c if c.isdigit() else '-', s ) )

	lst = []
	if '-' in s:
		lst = list( map( int, s.split('-') ) )
	else:
		div = lambda s, n: ( s[:n], s[n:] )

		if len(s) > 2 * 6:
			(t, s) = div(s, 4)
			lst.append( int(t) )
		while s:
			(t, s) = div(s, 2)
			lst.append( int(t) )
	while len(lst) < 9:
		lst.append(0)

	y = lst[0]
	if y < 100:
		lst[0] += 2000 if y < 50 else 1900

	return time.mktime( time.struct_time(lst) )

def is_date_time_str(s, delim=''):
	if not is_format_delim( delim ):
		delim = sec_to_str( 0, delim )
		if len( s ) == len( delim ) - 2:
			delim = delim[ 2: ]

	if len( s ) != len( delim ):
		return False

	def f(st):
		(s, t) = st
		return s.isdigit() if t.isdigit() else s == t
	return all( map( f, zip( s, delim ) ) )

def sec_to_fn(pre, sec, ext=''):
	name = pre + sec_to_str(sec)
	return name + ( '.' + ext if ext else '' )

def fn_to_sec(fn):
	(name, ext) = os.path.splitext(fn)
	tail_num = lambda s: tail_num( s[:-1] ) + s[-1] if s and s[-1].isdigit() else ''
	return str_to_sec( tail_num(name) )

mon3_lst = [ 'dmy', 'JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC' ]

def get_mon3(str_or_int):
	v = str_or_int
	if type( v ) != int:
		v = int( v )
	return mon3_lst[ v ]

def date432():
	s = sec_to_str( now_sec(), delim='0000 00 00 000000' )
	lst = s.split()[ : 3 ]
	lst[ 1 ] = get_mon3( lst[ 1 ] )
	return '/'.join( lst )


def tm60_to_int(tm60):
	if tm60 < 0:
		return -tm60_to_int( -tm60 )

	(H, L) = ( tm60 // 100, tm60 % 100 )
	(H, L) = ( H + L // 60, L % 60 )

	if H > 0:
		H = tm60_to_int( H ) * 60
	return H + L

def int_to_tm60(v):
	if v < 0:
		return -int_to_tm60( -v )

	(H, L) = ( v // 60, v % 60 )

	if H >= 60:
		H = int_to_tm60( H )
	return H * 100 + L

def tm60_to_str(tm60):
	if tm60 < 0:
		return "-" + tm60_to_str( -tm60 )

	f = lambda s: ':'.join( ( f( s[ : -2 ] ), s[ -2 : ] ) ) if len( s ) > 2 else s
	return f( str( tm60 ) )

def str_to_tm60(s):
	if s.startswith( '-' ):
		return -str_to_tm60( s[ 1 : ] )
	sum = 0
	for s in s.split( ':' ):
		sum = sum * 100 + int( s )
	return sum

tm60_str_to_int = lambda s: tm60_to_int( str_to_tm60( s ) )

tm60_int_to_str = lambda v: tm60_to_str( int_to_tm60( v ) )

def is_tm60_str(s):
	if s.startswith( '-' ):
		s = s[ 1 : ]
	s = s.replace( ':', '' )
	return s.isdigit()

def tm60_show_sum(tm60_str_lst):
	lst = list( map( tm60_str_to_int, tm60_str_lst ) )

	n = len( lst )
	dif_lst = list( map( lambda i: lst[ i + 1 ] - lst[ i ], range( n - 1 ) ) )

	(a, b) = ( [], [] )
	for i in range( n - 1 ):
		t = a if i % 2 == 0 else b
		t.append( dif_lst[ i ] )

	sum_a = sum( a )
	sum_b = sum( b )
	sum_ab = sum_a + sum_b

	f = lambda lst: list( map( tm60_int_to_str, lst ) )

	lst = f( lst )
	dif_lst = f( dif_lst )
	( sum_a, sum_b, sum_ab ) = f( ( sum_a, sum_b, sum_ab ) )

	for i in range( n ):
		print( lst[ i ] )
		if i < n - 1:
			print( '\t' * ( i % 2 + 1 ) + dif_lst[ i ] )

	print( '-' * 16 )
	print( '\t' + sum_a )
	print( '\t\t' + sum_b )
	print( sum_ab )

if __name__ == "__main__":
	s = sys.stdin.read()
	s = s.replace( '-', ' ' )
	lst = s.split()
	lst = list( filter( is_tm60_str, lst ) )
	tm60_show_sum( lst )

# EOF

now_sec()

今の秒数を返します。

time.time() そのままです。

>>> import tm_ut

>>> tm_ut.now_sec()
1581139702.9195719

sec_to_str(sec, delim='')

秒数を「日付時刻」の数字の文字列にして返します。

>>>tm_ut.sec_to_str(0)
'19700101090000'

>>> tm_ut.sec_to_str(0, '-')
'1970-01-01-09-00-00'

>>> sec = tm_ut.now_sec()
>>> sec
1581139799.499052

>>> tm_ut.sec_to_str(sec)
'20200208142959'

>>> tm_ut.sec_to_str(sec, '.')
'2020.02.08.14.29.59'

引数delimの仕様を拡張しました。2020/APR/05

2文字以上の長さのサンプルの文字列を与えると、そのサンプルの形式の文字列で返します。

2文字以上でも、サンプル中に数字が12文字以上ないと、 サンプルと見なさずに、従来通りの「区切り」用の文字列として扱います。

sample = '2020-01-23_12.34.56'

というサンプルの定数も追加しました。

sec = 0

>>> tm_ut.sec_to_str( sec )
'19700101090000'

>>> tm_ut.sec_to_str( sec, '_' )
'1970_01_01_09_00_00'

>>> tm_ut.sec_to_str( sec, tm_ut.sample )
'1970-01-01_09.00.00'

>>> tm_ut.sec_to_str( sec, '[2000/01/01 01:01:01]' )
'[1970/01/01 09:00:00]'

>>> sec = tm_ut.now_sec()
>>> sec
1586061744.445481

>>> tm_ut.sec_to_str( sec )
'20200405134224'

>>> tm_ut.sec_to_str( sec, '_' )
'2020_04_05_13_42_24'

>>> tm_ut.sec_to_str( sec, tm_ut.sample )
'2020-04-05_13.42.24'

>>> tm_ut.sec_to_str( sec, '[2000/01/01 01:01:01]' )
'[2020/04/05 13:42:24]'

>>> tm_ut.sec_to_str( sec, '[20/01/01 01:01:01]' )
'[20/04/05 13:42:24]'

>>> tm_ut.sec_to_str( sec, '20.01.01 / 01:01:01' )
'20.04.05 / 13:42:24'

str_to_sec(s)

sec_to_str()で生成した文字列から秒数に戻して返します。

>>> sec = tm_ut.now_sec()
>>> sec
1581139799.499052

>>> tm_ut.sec_to_str(sec)
'20200208142959'

>>> tm_ut.str_to_sec('20200208142959')
1581139799.0

>>> tm_ut.sec_to_str(1581139799.0)
'20200208142959'

>>> tm_ut.str_to_sec('2020/02/08/14/29/59')
1581139799.0

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('2020/02/08 14:29:59') )
'20200208142959'

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('20/02/08 14:29:59'), '-' )
'2020-02-08-14-29-59'

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('98/02/08 14:29:59'), '-' )
'1998-02-08-14-29-59'

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('980208142959'), '.' )
'1998.02.08.14.29.59'

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('010208142959'), ' ' )
'2001 02 08 14 29 59'

>>> tm_ut.sec_to_str( tm_ut.str_to_sec('2001 02 08 14 29 59'), ':' )
'2001:02:08:14:29:59'

is_date_time_str(s, delim='')

文字列がsec_to_str()で生成された文字列かどうか判定します。

>>> tm_ut.is_date_time_str( 'foo' )
False

>>> tm_ut.is_date_time_str( '200405012345' )
True

>>> tm_ut.is_date_time_str( '20200405012345' )
True

>>> tm_ut.is_date_time_str( '2020-04-05-01-23-45', '-' )
True

>>> tm_ut.is_date_time_str( '20-04-05-01-23-45', '-' )
True

>>> tm_ut.sample
'2020-01-23_12.34.56'

>>> tm_ut.is_date_time_str( '2000-02-22_01.23.45', tm_ut.sample )
True

sec_to_fn(pre, sec, ext='')

秒数secを日付時刻にした数字の文字列を含むファイル名を返します。

引数 内容
pre 先頭部分の文字列
sec 秒数
ext 拡張子の文字列 ('.'は含めず)
>>> sec = tm_ut.now_sec()
>>> sec
1581140699.4601622
>>> tm_ut.sec_to_fn('hoge_', sec, 'txt')
'hoge_20200208144459.txt'

fn_to_sec(fn)

日付時刻を含むファイル名から、その日付時刻の秒数を返します。

>>> tm_ut.fn_to_sec('hoge_20200208144459.txt')
1581140699.0

>>> tm_ut.sec_to_str(1581140699.0, '-')
'2020-02-08-14-44-59'

>>> tm_ut.sec_to_str( tm_ut.fn_to_sec('foo_bar-200208144459.txt'), '.' )
'2020.02.08.14.44.59'

>>> tm_ut.sec_to_str( tm_ut.fn_to_sec('foo_bar-980208144459.tgz'), '-' )
'1998-02-08-14-44-59'

get_mon3(str_or_int)

引数の月の値をアルファベット3文字の文字列にして返します。

引数には1から12の整数か文字列を指定します。

>>> import tm_ut
>>> tm_ut.get_mon3(8)
'AUG'
>>> tm_ut.get_mon3('09')
'SEP'
>>> tm_ut.get_mon3(10)
'OCT'

date432()

現在の日付を '2020/OCT/23' のような形式で返します。

>>> imort tm_ut
>>> tm_ut.date432()
'2020/OCT/23'

tm60_to_int(tm60)

hh[mm[ss]] 形式の60進数の整数を、最小の位の整数に変換します。

例えば、130 (1:30) は 90 に。

int_to_tm60(v)

整数vを最小の位の整数として、60進数の整数に変換します。

tm60_to_int(tm60) の逆変換です。

例えば、90 は 130 (1:30) に。

tm60_to_str(tm60)

hh[mm[ss]] 形式の60進数の整数を、 hh[:mm[:ss]] 形式の文字列に変換します。

例えば、130 は '1:30' に。

str_to_tm60(s)

hh[:mm[:ss]] 形式の文字列を、 hh[mm[ss]] 形式の60進数の整数に変換します。

tm60_to_str(tm60) の逆変換です。

例えば、'1:30' は 130 に。

tm60_str_to_int(s)

hh[:mm[:ss]] 形式の文字列を、最小の位の整数に変換します。

str_to_tm60(s)tm60_to_int(tm60) の合成です。

例えば、'1:30' は 90 に。

tm60_int_to_str(v)

整数vを最小の位の整数として、hh[:mm[:ss]] 形式の文字列に変換します。

tm60_str_to_int(s) の逆変換です。

int_to_tm60(v)tm60_to_str(tm60) の合成です。

例えば、90 は '1:30' に。

is_tm60_str(s)

文字列 s が時刻を表すような hh[ [:] mm[ [:] ss] ] 的な形式ならば、 Trueを、そうでなければFalseを返します。

(かなりテキトーな甘い判定です)

tm60_show_sum(tm60_str_lst)

引数 tm60_str_lst には、 時刻を表すような hh[ [:] mm[ [:] ss] ] 的な形式のリストを与えます。

各時刻間の経過時間を算出し、 偶数番目の経過時間の合計、 奇数番目の経過時間の合計、 全体の経過時間の合計を求めて表示します。

__main__

次のコードを追加し、 tm_ut.py の実行で tm60_show_sum(tm60_str_lst) を呼び出すようにしました。

if __name__ == "__main__":
	s = sys.stdin.read()
	s = s.replace( '-', ' ' )
	lst = s.split()
	lst = list( filter( is_tm60_str, lst ) )
	tm60_show_sum( lst )

標準入力に与えた文字列から、時刻っぽい文字列を抽出してリストにし、 tm60_show_sum(tm60_str_lst) を呼び出して、経過時間などを表示します。

在宅勤務の作業時間の集計などに便利かと。

$ ./tm_ut.py
1030-
1200-1245
1315-1320
-1800
^D
10:30
        1:30
12:00
             45
12:45
        30
13:15
             5
13:20
        4:40
18:00
----------------
        6:40
             50
7:30
$
$ echo 1030 1200 1245 1315 1320 1800 | ./tm_ut.py
10:30
        1:30
12:00
             45
12:45
        30
13:15
             5
13:20
        4:40
18:00
----------------
        6:40
             50
7:30
$


cnt_ut.py

進捗表示用のカウンタです。

cnt_ut.py
#!/usr/bin/env python

import time

import empty
import thr
import dbg

def cnt_new(n):
	e = empty.new()
	e.i = 0
	e.p = 0
	e.s_n = 0

	e.st_sec = None
	e.st_i = None
	e.ed_sec = None

	def calc_rsec():
		now = time.time()
		i = e.i
		rsec = -1
		if e.st_sec is None:
			e.st_sec = now
			e.st_i = i
		else:
			rsec = ( now - e.st_sec ) * ( n - i ) / ( i - e.st_i )
		return rsec

	def sec_to_str(sec):
		v = int( sec )
		s = ''
		while v > 0:
			a = str( v % 60 )
			v = v // 60
			if v > 0:
				a = ':' + ( '0' + a )[ -2 : ]
			s = a + s
		return s

	def show(s):
		bk = '\b' * e.s_n
		bk = bk + ( ' ' * e.s_n ) + bk
		e.s_n = len( s )
		dbg.out( bk + s, '' )

	def th_f():
		p = int( 100 * e.i / n )
		if p > e.p:
			e.p = p
			show( '{}%  rest {}'.format( p, sec_to_str( calc_rsec() ) ) )

	th = thr.loop_new( th_f, wait_tmout=1.0 )
	thr.ths.start( th )

	def up():
		if e.i < n:
			e.i += 1
			if e.i == n:
				e.ed_sec = time.time()

	def show_all_sec(fmt):
		st_sec = e.st_sec if e.st_sec else time.time()
		ed_sec = e.ed_sec if e.ed_sec else time.time()
		s = sec_to_str( ed_sec - st_sec )
		dbg.out( fmt.format( s ) )

	def stop(show_fmt=''):
		show( '' )
		th.quit_ev.set()
		if show_fmt:
			show_all_sec( show_fmt )

	return empty.add( e, locals() )

def cnt_map(func, lst, show_fmt=''):
	cnt = cnt_new( len( lst ) )

	def f(v):
		cnt.up()
		return func( v )

	rlst = map( f, lst )
	cnt.stop( show_fmt )
	return rlst

def cnt_loop(func_i, n, show_fmt=''):
	cnt = cnt_new( n )
	for i in range( n ):
		cnt.up()
		func_i( i )
	cnt.stop( show_fmt )

# EOF

dbg.pyかthr.py、あるいはtm_ut.pyに追加しようと思いつつ、 一長一短なので、専用のcnt_ut.pyを新規追加してしまいました。

cnt_new(n)

引数 n にはカウントする回数を指定します。

進捗表示用のオブジェクトを生成し、返します。

主なメソッド

メソッド 内容
up() カウントアップするときに呼び出します。
n回呼び出すまでの間、進捗率が1%以上変化すると、進捗状況を表示します。
ただし、前回の表示更新から1秒以上経過してないと、表示を更新しません。
(頻繁すぎる更新を避けて、負荷を抑えてます)
stop() 使用後に呼び出します。
内部でスレッドを使用しているので、資源を回収します。

cnt_map(func, lst)

map()関数の進捗表示版です。

lst = list( map( func, lst ) )

lstの要素数が多いとか、func()の処理が重いとかで、 延々と待たされる。そんなとき、一度

import cnt_ut

lst = list( cnt_ut.cnt_map( func, lst ) )

をお試しください。

cnt_loop(func_i, n)

引数func_iには、カウンタ変数iをとる処理関数を指定します。

例えば

sum = 0
for i in range( big_n ):
	sum += heavy( i )

が全然終わる気配が無いとき。

cnt_new()を使うなら

import cnt_ut

sum = 0
cnt = cnt_cnt.cnt_new( big_n )
for i in range( big_n ):
	cnt.up()
	sum += heavy( i )
cnt.stop()

ですが、、、

cnt_loop()を使うなら

import cnt_ut

sum = 0
def func_i(i):
	sum += heaby( i )
cnt_loop( func_i, big_n )

でいけます。


base.py

わりと基本的なものです。

どのジャンルにも属さない様な、基本的なものを集めてみます。

'\n'.join( ['foo', 'bar', 'hoge'] )

的な事を多用するので、kon_utのどこかに入れておこうかと思いつつ...

適切な場所が無かったので、この base.py を新たに追加してみました。

base.py
#!/usr/bin/env python

import os

import empty


def to_str(lst, pre='', delim='\n', last=False):
	s = delim.join( map( lambda s: pre + s, lst ) )
	if last:
		s += delim
	return s

def filter_str(f, s):
	return ''.join( filter(f, s) )

def filter_div(f, lst):
	not_f = lambda e: not f(e)
	return ( list( filter( f, lst ) ), list( filter( not_f, lst ) ) )

def add_path(base, add, delim='/'):
	lst = []
	if base:
		lst.append( base )
	if add:
		lst.append( add )
	return delim.join( lst )

def path_lst(name, from__file__):
	lst = [ name ]
	dir_ = os.path.dirname( from__file__ )
	if dir_ != '.':
		lst.append( os.path.join( dir_, name ) )

	return list( filter( os.path.exists, lst ) )

def find_path(name, from__file__, dv=None):
	lst = path_lst( name, from__file__ )
	return lst[0] if lst else dv


def can_hash(o):
	try:
		hash( o )
		return True
	except:
		pass
	return False

def rev_dic(dic):
	items = filter( lambda kv: can_hash( kv[ 1 ] ), dic.items() )
	return dict( map( reversed, items ) )


def dic_update_r(targ, add):
	for (k, v) in add.items():
		tv = targ.get( k )
		if type( v ) == dict and type( tv ) == dict:
			dic_update_r( tv, v )
		else:
			targ[ k ] = v


def str_find(s, key_h, key_t=None):
	if key_h not in s:
		return None
	i = s.index( key_h )
	i_in = i + len( key_h )
	s_ = s[ i_in : ]

	if key_t:
		if key_t not in s_:
			return None
		j_in = i_in + s_.index( key_t )
		j = j_in + len( key_t )
	else:
		dlims = ' \t\n'
		f = lambda dlim: s_.index( dlim ) if dlim in s_ else len( s_ )
		j = j_in = i_in + min( map( f, dlims ) )

	s_in = s[ i_in : j_in ]

	return empty.new( locals() )


def str_replace_dic(s, dic):
	keys = [
		( '${', '}' ),
		( '$(', ')' ),
		( '$', None ),
	]

	while True:
		bak = s
		for (key_h, key_t) in keys:
			r = str_find( s, key_h, key_t )
			if r and r.s_in in dic:
				s = s [ : r.i ] + dic.get( r.s_in ) + s [ r.j : ]
				break
		if s == bak:
			break
	return s

# EOF

to_str(lst, pre='', delim='\ n', last=False)

lstの文字列をdelimを挟んで連結します。

lstは文字列が要素のリストの前提です。

lstの各文字列の先頭に pre の文字列を挿入できるようにしてます。

last=Trueを指定すると、結果の末尾にさらにdelimを追加します。

>>> import base
>>> lst = list( map( str, range(8) ) )
>>> lst
['0', '1', '2', '3', '4', '5', '6', '7']

>>> print( base.to_str( lst, pre='hoge_' ) )
hoge_0
hoge_1
hoge_2
hoge_3
hoge_4
hoge_5
hoge_6
hoge_7

>>> base.to_str( lst, delim=' ' )
'0 1 2 3 4 5 6 7'

>>> base.to_str( lst, last=True )
'0\n1\n2\n3\n4\n5\n6\n7\n'

>>> base.to_str( lst, ' [', ']', True )
' [0] [1] [2] [3] [4] [5] [6] [7]'

filter_str(f, s)

文字列のfilter()です。

中身は

''.join( filter(f, s) )

です。

>>> import base

>>> f = lambda c: c.isdigit()

>>> list( filter( f, 'abc123def456' ) )
['1', '2', '3', '4', '5', '6']

>>> ''.join( list( filter( f, 'abc123def456' ) ) )
'123456'

>>> base.filter_str( f, 'abc123def456' )
'123456'

filter_div(f, lst)

filter(f, lst)で除外された方もまとめて、 (残ったもののリスト、除外されたもののリスト) のタプルを返します。

>>> import base
>>> base.filter_div( lambda v: v % 3 == 0, range(12) )
([0, 3, 6, 9], [1, 2, 4, 5, 7, 8, 10, 11])

add_path(base, add, delim='/')

パス文字列baseの末尾に'/'区切ってaddの文字列を追加します。

baseが空文字列('')の時は、addだけが返ります。

addが空文字列('')の時は、baseだけが返ります。

どちらも空文字列('')の時は、空文字列('')が返ります。

例えばos.path.join()では

>>> os.path.join( 'foo', 'bar' )
'foo/bar'
>>> os.path.join( '', 'bar' )
'bar'
>>> os.path.join( 'foo', '' )
'foo/'

本関数では最後の場合で 'foo' が返ります。

path_lst(name, from__file__)

ファイル名nameのファイルを探してリストを返します。

from__file__ には、呼び出し側で __file__ を与えるようにします。

検索パスは2つだけです。

カレントディレクトリと、 引数from__file__で指定されたファイルの存在するディレクトリを探します。

見つかったファイルパスのリストを返します。

$ cd /tmp
$ mkdir hoge
$ cd hoge

$ cat > fuga.py
#!/usr/bin/env python
import base
print( base.path_lst('foo', __file__) )
^D
$ chmod +x fuga.py

$ ./fuga.py
[]

$ touch foo

$ ./fuga.py
['foo']

$ cd ..
$ hoge/fuga.py
['hoge/foo']

$ touch foo
$ hoge/fuga.py
['foo', 'hoge/foo']

find_path(name, from__file__, dv=None)

ファイル名nameのファイルを探してパスを返します。

from__file__ には、呼び出し側で __file__ を与えるようにします。

ファイルを探す場所は2箇所だけです。

最初にカレントディレクトリを探します。

見つからなければ、 引数from__file__で指定されたファイルの存在するディレクトリを探します。

$ cd /tmp
$ mkdir hoge
$ cd hoge

$ cat > fuga.py
#!/usr/bin/env python
import base
print( base.find_path('foo', __file__, 'not found') )
^D
$ chmod +x fuga.py

$ ./fuga.py
not found

$ touch foo

$ ./fuga.py
foo

$ cd ..
$ hoge/fuga.py
hoge/foo

$ touch foo
$ hoge/fuga.py
foo

can_hash(o)

hash( o )を実行して例外が発生しなければTrue、発生すればFalseを返します。

rev_dic()で、o が辞書のキーとして使えるかどうかを判定するために追加してみました。

他にもっと良い正しい判定方法がありそうですが、、、

def can_hash(o):
	try:
		hash( o )
		return True
	except:
		pass
	return False

とりあえずこれで

rev_dic(dic)

dicのキーと値を反対にした辞書を返します。

ただし、キーとして使えないものはcan_hash()で判定して、除外されます。

>>> import base

>>> s = 'ABCDEFGH'

>>> dic = dict( map( lambda i: ( s[i], i ), range( 8 ) ) )

>>> dic
{'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7}

>>> base.rev_dic( dic )
{0: 'A', 1: 'B', 2: 'C', 3: 'D', 4: 'E', 5: 'F', 6: 'G', 7: 'H'}

dic_update_r(targ, add)

辞書targを辞書addで再帰的にupdate()します。

辞書の普通のupdate()メソッドの動作

>>> s = '''
... foo:
...   kind: a
...   num: 2
... bar:
...   kind: b
...   num: 3
... '''
>>> s
'\nfoo:\n  kind: a\n  num: 2\nbar:\n  kind: b\n  num: 3\n'
>>> import yaml
>>> targ = yaml.load( s )
>>> targ
{'foo': {'kind': 'a', 'num': 2}, 'bar': {'kind': 'b', 'num': 3}}

>>> print( yaml.dump( targ ) )
bar: {kind: b, num: 3}
foo: {kind: a, num: 2}
>>> s_add = '''
... foo:
...   num: 5
... '''
>>> add = yaml.load( s_add )
>>> add
{'foo': {'num': 5}}
>>> targ.update( add )
>>> targ
{'foo': {'num': 5}, 'bar': {'kind': 'b', 'num': 3}}
>>> print( yaml.dump( targ ) )
bar: {kind: b, num: 3}
foo: {num: 5}

fooのnumは更新されますが、kindは消えます。

再帰的なupdate

>>> s
'\nfoo:\n  kind: a\n  num: 2\nbar:\n  kind: b\n  num: 3\n'
>>> targ = yaml.load( s )
>>> targ
{'foo': {'kind': 'a', 'num': 2}, 'bar': {'kind': 'b', 'num': 3}}

>>> add
{'foo': {'num': 5}}
>>> import base
>>> base.dic_update_r( targ, add )
>>> targ
{'foo': {'kind': 'a', 'num': 5}, 'bar': {'kind': 'b', 'num': 3}}

>>> print( yaml.dump( targ ) )
bar: {kind: b, num: 3}
foo: {kind: a, num: 5}

fooのnumだけが更新されます。

str_find(s, key_h, key_t=None)

文字列sについて、文字列key_hに一致する箇所から、文字列key_tに一致する箇所までの情報を、 オブジェクトの属性に設定して返します。

文字列sが、文字列key_hを含んでいなければ None を返します。

有効な文字列key_tが指定されて、文字列sがkey_tを含んでいなければ None を返します。

key_tが指定されていない場合は、空白文字(スペース、タブ、改行)や行末を、 区切りとして扱います。

返されるオブジェクトには次の属性が設定されます。

属性 内容
i key_hのインデックス
i_in key_hの直後の文字のインデックス
j_in key_tのインデックス
j key_tの直後の文字のインデックス
s_in key_hの直後からkey_t直前までの文字列 s[ i_in : j_in ]

key_t が指定されていない場合は、 j_in と j の値は同じで、空白文字などの区切り文字のインデックスか、 区切りが行末の場合は文字列の長さとなります。

>>> import base

>>> o = base.str_find( 'foo $HOGE bar', '$' )

>>> vars( o )
{'s_in': 'HOGE', 'f': <function str_find.<locals>.<lambda> at 0x10c9fcf28>, 'dlims': ' \t\n', \
'j': 9, 'j_in': 9, 'i_in': 5, 'i': 4, 'key_t': None, 'key_h': '$', 's': 'foo $HOGE bar', 's_': 'HOGE bar'}

>>> o2 = base.str_find( 'foo$(HOGE)bar', '$(', ')' )

>>> vars( o2 )
{'s_in': 'HOGE', 'j': 10, 'j_in': 9, 'i_in': 5, 'i': 3, 'key_t': ')', 'key_h': '$(', \
's': 'foo$(HOGE)bar', 's_': 'HOGE)bar'}

str_replace_dic(s, dic)

文字列sの中にある '$' で始まる単語 $key, $(key), ${key} について、 辞書dicにkeyがあれば、対応する値の文字列に置き換えた文字列を返します。

>>> import base

>>> s = '${FOO}_$(BAR) : hello $HOGE $FOO'

>>> dic = { 'FOO': '123', 'BAR': '456', 'HOGE': 'kondoh' }

>>> base.str_replace_dic( s, dic )
'123_456 : hello kondoh 123'


chide.py

文字列の特定の文字を使わないようにする変換用です。

chide.py
#!/usr/bin/env python

import empty

rep = lambda s, i, c: s[ : i ] + c + s[ i + 1 : ]

def enc(s, lst):
	lst = list( filter( lambda c2: c2[ 0 ] in s, lst ) )

	e = empty.new()
	e.s = s

	def cvt(c2):
		(c_s, c_d) = c2
		ps = []
		while c_s in e.s:
			i = e.s.index( c_s )
			e.s = rep( e.s, i, c_d )
			ps.append( i )

		ps = '_'.join( map( str, ps ) )
		return '/'.join( [ 's', c_d, str( ord( c_s ) ), ps ] )

	h = '/'.join( map( cvt, reversed( lst ) ) )
	return h + ' ' + e.s

def dec(s):
	if ' ' not in s:
		return s

	def get_hs(s):
		i = s.index( ' ' )
		(h, s) = ( s[ : i ], s[ i + 1 : ] )
		if '/' not in h:
			return None
		h = h.split( '/' )
		if len( h ) % 4 != 0:
			return None
		return empty.new( h=h, s=s )

	e = get_hs( s )
	if not e:
		return s

	def cvt(cmd, c_d, c_s, ps):
		if cmd != 's':
			return False
		c_s = chr( int( c_s ) )
		ps = list( map( int, ps.split( '_' ) ) )
		for i in ps:
			if e.s[ i ] != c_d:
				return False
			e.s = rep( e.s, i, c_s )
		return True

	while e.h:
		(h4, e.h) = ( e.h[ : 4 ], e.h[ 4 : ] )
		if not cvt( * h4 ):
			return s
	return e.s

def sample():
	s = 'abc.FOO.xyz/KONDOH_bar'
	lst = [ 'b.', 'O_' ]
	s_e = enc( s, lst )
	s_d = dec( s_e )
	print( s )
	print( s_e )
	print( s_d )

if __name__ == "__main__":
	sample()
# EOF

enc(s, lst)

sは対象の文字列、 lstには次のような文字列のリストを指定します。

例えば

lst = [ 'b.', 'O_' ]

では、'b'を’.'に、'O'を'_'に変換します。

対象の文字列を、lstの情報で変換します。

元の文字列に戻すためのヘッダ情報を、先頭に付加した文字列を返します。

指定の変換前の文字が対象文字列に含まれず、変換がなされなかった場合は、元の文字列を返します。

変換がなされた場合に返される文字列は、ヘッダ情報の文字列の後に空白1つを挟み変換後の文字列が続きます。

ヘッダ情報(文字列) + 空白' ' + 変換結果(文字列)

ヘッダ情報

lstの長さ(変換文字の数)だけ、ヘッダ(文字列)を '/' で連結した文字列

ヘッダ + '/' ヘッダ + '/' ... '/' + ヘッダ

ヘッダ

次の形式の文字列

s/変換後の文字/変換前の文字のコードの数値/対象文字列中の変換文字の位置情報

対象文字列中の変換文字の位置情報

対象文字列中の先頭からの位置を示す整数の文字列を、'_' で連結した文字列

位置の整数_位置の整数_ ... _位置の整数

制限

文字列から特定の文字を隠しておいて、また元に戻すのが目的ですが...

ヘッダの部分で必ず 's', '/', '_' ,数字の文字,空白を使ってます。

なので、上記の文字をenc(s, lst)のlstで変換前の文字として指定しても、 返される文字列のヘッダ部分に、文字列が残ってしまい、隠しきれない場合があります。

dec(s)

enc(s, lst) の結果として返る文字列を s に指定します。

sの先頭に、元も文字列に戻すためのヘッダ情報が含まれていると、元の文字列を復元して返します。

ヘッダ情報が含まれていなかったり、復元に失敗すると、引数の文字列sを返します。

使用例

ソースコード末尾にsample()を追加してます。

  :
def sample():
	s = 'abc.FOO.xyz/KONDOH_bar'
	lst = [ 'b.', 'O_' ]
	s_e = enc( s, lst )
	s_d = dec( s_e )
	print( s )
	print( s_e )
	print( s_d )

if __name__ == "__main__":
	sample()
# EOF
$ ./chide.py
abc.FOO.xyz/KONDOH_bar
s/_/79/5_6_13_16/s/./98/1_19 a.c.F__.xyz/K_ND_H_.ar
abc.FOO.xyz/KONDOH_bar


yaml_ut.py

YAML関連です。

yaml_ut.py
#!/usr/bin/env python

import yaml
import six

load = yaml.load # str, file

def load_fn(fn, dv=None):
	o = dv
	with open(fn, 'rb') as f:
		try:
			o = load(f)
		except:
			pass
	return o

dump = yaml.dump

def dump_no_flow(o):
	def represent_str(dumper, instance):
		tag = 'tag:yaml.org,2002:str'
		style = '|' if '\n' in instance else None
		return dumper.represent_scalar( tag, instance, style=style )

	for typ in [ str ] + ( [ unicode ] if six.PY2 else [] ):
		yaml.add_representer(typ, represent_str)

	return dump( o, default_flow_style=False, allow_unicode=True, encoding='utf-8' )

def save(o, fn, dv=''): # no_flow
	s = dump_no_flow(o)
	with open(fn, 'wb') as f:
		try:
			f.write(s)
		except:
			return dv
	return s

# EOF

load(s_or_f)

yaml.load そのままです。

引数には文字列や、リードopenしたファイルを指定します。

load_fn(fn, dv=None)

fnにはファイル名を指定します。

fnをリードopenして、load()した結果を返します。

例外が発生して失敗すると、引数のdvを返します。

2020/ARP/02 open mode を 'r' から 'rb' に修正しました。

dump(o)

yaml.dump そのままです。

dump_no_flow(o)

yaml.dump()の default_flow_style=Falseでダンプします。

他、 簡易なHTMLパーサ 2018秋 preタグのyamlダンプ でこしらえた yaml_dump() を 簡易なおれおれマークダウン 2019秋 で流用してましたが、本関数はその時の内容そのままです。

save(o, fn, dv='')

fnにはファイル名を指定します。

oをdump_no_flow()で文字列に変換して、fnに書き込みます。

書き込んだ文字列を返します。

ファイルへの書き込みで例外が発生して失敗すると、 引数のdvを返します。

2020/ARP/02 open mode を 'w' から 'wb' に修正しました。


html_ut.py

HTML関連です。

html_ut.py
#!/usr/bin/env python

import os
import yaml

import cmd_ut
import nkf
import arg
import dbg

search_dirs = [ os.path.dirname( __file__ ) ]

def get_cmd(path):
	cmd = 'cat'
	ps = [ 'http', 'https' ]
	if any( map( lambda p: path.startswith( p + '://' ), ps ) ):
		cmd = 'wget -q -O-'
	elif not path.startswith( '/' ) and not os.path.exists( path ):
		for d in search_dirs:
			t = os.path.join( d, path )
			if os.path.exists( t ):
				path = t
				break
	return cmd + ' ' + path

def get_yaml(path, head_only=False):
	cmd = get_cmd( path ) + ' | '
	if head_only:
		cmd += cmd_ut.cmd_py( 'from_to' ) + ' "<head>" "</head>" | '
	cmd += cmd_ut.cmd_py( 'ezhtml' ) + ' h'
	b = cmd_ut.call( cmd )
	(s, opt) = nkf.to_str( b )
	return s

def get_obj(path, head_only=False):
	s = get_yaml( path, head_only )
	o = yaml.load( s )
	if head_only:
		o = { 'html': [ o ] }
	return o

def find_tag(o, tag):
	# Ex. tag 'body'
	t = type( o )

	if t == dict:  # len 1
		(k, v) = list( o.items() )[ 0 ]
		if k.split()[ 0 ].lower() == tag:
			return v
		return find_tag( v, tag )

	if t == list:
		v = None
		for d in o:
			v = find_tag( d, tag )
			if v != None:
				break
		return v

	return None

def find_tag_path(o, tag_path):
	# Ex. tag_path 'html/head/title'

	tags = tag_path.split( '/' )

	v = None
	for tag in tags:
		v = find_tag( o, tag )
		if v == None:
			break
		o = v
	return v

def get_tag_path(path, tag_path):
	head_only = tag_path.startswith( 'html/head' )
	o = get_obj( path, head_only=head_only )
	s = find_tag_path( o, tag_path )
	return s

def get_title(path, dv=''):
	s = get_tag_path( path, 'html/head/title' )
	return s if s else dv

if __name__ == "__main__":
	a = arg.new()
	path = a.pop( 'http://kondoh.html.xdomain.jp/index.html' )
	s = get_title( path )
	print( nkf.enc( s ) )
# EOF

get_yaml(path, head_only=False)

いきなりHTMLじゃなくてYAMLですが...

pathに記録されているHTMLデータを、YAMLデータに変換して返します。

pathには、URLやファイルパスを指定します。

head_only=True を指定すると、 from_to.py を使って、"<head> ... </head>" だけを切り出して処理します。(2020/NOV/16 追加)

変換には 簡易なHTMLパーサ 2018秋 の ezhtml.py を使用します。

予め kon_pageのpythonモジュールのインストール からインストールしておきます。

pathに相対パスが指定された場合、 カレントディレクトリにファイルが見つからなければ、 変数search_dirsに保持されているディレクトリのリストで、 ファイルを探します。

get_obj(path, head_only=False)

get_yaml( path )結果のYAMLデータを、 yaml.load()でロードして、 生成されるオブジェクトを返します。

head_only=Trueを指定すると、get_yaml()で head_only=True を指定します。

head_only=Trueを指定すると、headオブジェクト生成後に { 'html': [ headオブジェクト ] } の辞書オブジェクトにして返します。(2020/NOV/16 追加)

find_tag(o, tag)

get_obj( path )で返るオブジェクトoから、 文字列tagのHTMLタグを探して返します。

見つからなければ、Noneが返ります。

tagには'body'などの文字列を与えます。

find_tag_path(o, tag_path)

get_obj( path )で返るオブジェクトoから、 tag_pathで指定する複数のtagを順に探し、 結果のHTMLタグを返します。

見つからなければ、Noneが返ります。

tag_pathには'html/head/title'などの、 tagを'/'で区切った文字列を与えます。

get_tag_path(path, tag_path)

pathに記録されているHTMLデータから、 tag_pathを探し、 結果のHTMLタグを返します。

見つからなければ、Noneが返ります。

tag_pathには'html/head/title'などの、 tagを'/'で区切った文字列を与えます。

安易な実装です。
def get_tag_path(path, tag_path):
	head_only = tag_path.startswith( 'html/head' )
	o = get_obj( path )
	s = find_tag_path( o, tag_path )
	return s

(2020/NOV/16 head_only追加)

get_title(path, dv='')

pathに記録されているHTMLデータから、 'html/head/title' のtag_pathを探し、 タイトル文字列を返します。

見つからなければ、引数dvを返します。


from_to.py

from_to.py
#!/usr/bin/env python

import sys

import empty
import dbg

def run():
	av = sys.argv[ 1 : ]
	if len( av ) < 2:
		dbg.help_exit( 'from_word to_word' )

	(from_word, to_word) = av[ : 2 ]

	e = empty.new()
	e.show = False

	def cnv(s):
		if e.show:
			if to_word in s:
				i = s.index( to_word ) + len( to_word )
				e.show = False
				return s[ : i ] + cnv( s[ i : ] )
			return s
		else:
			if from_word in s:
				i = s.index( from_word )
				e.show = True
				return cnv( s[ i : ] )
			return ''

	while True:
		s = sys.stdin.readline()
		if not s:
			break
		s = cnv( s )
		sys.stdout.write( s )

if __name__ == "__main__":
	run()
# EOF

簡易なテキストのフィルタです。

開始の文字列パターンから、終了の文字列パターンまでを表示します。

html_ut.pyから使ってます。


del_lines.py

del_lines.py
#!/usr/bin/env python

import sys

import dbg

def make_lines(s):
	if not s:
		return []
	if '\n' not in s:
		return [ s ]
	i = s.index( '\n' )
	return [ s[ : i + 1 ] ] + make_lines( s[ i + 1 : ] )

def run():
	av = sys.argv[ 1 : ]
	if len( av ) < 1:
		dbg.help_exit( 'pat_file' )

	pat_lines = []
	with open( av[ 0 ], 'r' ) as f:
		pat_lines = make_lines( f.read() )

	#dbg.out( 'pat_lines={}'.format( pat_lines ) )

	buf_lines = []

	def clear():
		del buf_lines[ : ]

	def flush():
		s = ''.join( buf_lines )
		clear()
		return s

	def cnv(s):
		i = len( buf_lines )
		if s == pat_lines[ i ]:
			buf_lines.append( s )
			if len( buf_lines ) == len( pat_lines ):
				clear()
			return ''

		if buf_lines:
			s = flush() + s
		return s

	while True:
		s = sys.stdin.readline()
		if not s:
			break
		s = cnv( s )
		sys.stdout.write( s )

	sys.stdout.write( flush() )

if __name__ == "__main__":
	run()
# EOF

簡易なテキストのフィルタです。

指定の複数行の文字列を見つけると削除します。


np_ut.py

np_ut.py
#!/usr/bin/env python

import numpy as np
import math

import empty
import yaml_ut
import dbg


def pos_trans_arr(arr, pos):
	return arr - np.array( [ pos.x, pos.y, pos.z ] )

def pos_trans(xs, ys, zs, pos):
	arr = np.array( [ xs, ys, zs ] ).T
	arr = pos_trans_arr( arr, pos )
	return arr.T

def pa_trans(xs, ys, zs, pa):
	(xs, ys, zs) = pos_trans( xs, ys, zs, pa.p )

	s = math.sin( -pa.a )
	c = math.cos( -pa.a )
	(xs, ys) = ( c * xs - s * ys, s * xs + c * ys )

	return [ xs, ys, zs ]

def pa_trans_arr(arr, pa):
	(xs, ys, zs) = arr.T
	(xs, ys, zs) = pa_trans( xs, ys, zs, pa )
	return np.array( [ xs, ys, zs ] ).T


def get_d2(arr, p):
	(xs, ys, zs) = ( arr - p ).T
	d2_lst = xs * xs + ys * ys + zs * zs
	return d2_lst


def get_near_bools(xs, ys, zs, dist):
	d2 = xs * xs + ys * ys + zs * zs
	bools = d2 <= dist * dist
	return bools

def get_near_bools_arr(arr, dist):
	(xs, ys, zs) = arr.T
	return get_near_bools( xs, ys, zs, dist )

def get_near_arr(arr, dist):
	bools = get_near_bools_arr( arr, dist )
	return arr[ bools, : ]


def to_bytes_head(b):
	h = np.int32( len( b ) ).tobytes()
	return h + b

def from_bytes_head(b):
	(h, b) = ( b[ : 4 ], b[ 4 : ] )
	n = np.frombuffer( h, dtype=np.int32 )[ 0 ]
	(b, t) = ( b[ : n ], b[ n : ] )
	return ( b, t )

def to_bytes_dtype(lst, dtype):
	b = np.array( lst, dtype=dtype ).tobytes()
	return to_bytes_head( b )

def from_bytes_dtype(b, dtype):
	(b, t) = from_bytes_head( b )
	arr = np.frombuffer( b, dtype=dtype ).copy()
	return (arr, t)

def to_bytes_yaml( o ):
	return to_bytes_head( yaml_ut.dump( o ).encode( 'utf-8' ) )

def from_bytes_yaml( b ):
	( b, t ) = from_bytes_head( b )
	o = yaml_ut.load( b.decode( 'utf-8' ) )
	return ( o, t )

def to_bytes_arr( arr ):
	dic = {}
	dic[ 'dtype' ] = str( arr.dtype )
	dic[ 'shape' ] = list( arr.shape )
	b = to_bytes_yaml( dic )
	b += to_bytes_head( arr.tobytes() )
	return b

def from_bytes_arr( b ):
	( dic, b ) = from_bytes_yaml( b )
	dtype = dic.get( 'dtype', 'float32' )
	shape = dic.get( 'shape' )
	( arr, t ) = from_bytes_dtype( b, dtype )
	if shape:
		shape = [ -1 ] + list( shape[ 1 : ] )
		arr = arr.reshape( shape )
	return ( arr, t )

def save_bytes(path, b, show=True):
	end = dbg.start_save( path, show )
	with open( path, 'wb' ) as f:
		f.write( b )
	end()

def load_bytes(path, show=True):
	end = dbg.start_load( path, show )
	b = b''
	with open( path, 'rb' ) as f:
		b = f.read()
	end()
	return b

def save_bytes_dtype(path, lst, dtype, show=True):
	save_bytes( path, to_bytes_dtype( lst, dtype ), show )

def load_bytes_dtype(path, dtype, show=True):
	(arr, t) = from_bytes_dtype( load_bytes( path, show ), dtype )
	return (arr, t )

def write_bytes_head( f, b ):
	f.write( to_bytes_head( b ) )

def read_bytes_head( f ):
	h = f.read( 4 )
	if not h:
		return h
	n = np.frombuffer( h, dtype=np.int32 )[ 0 ]
	return f.read( n )


def get_inf(dic):
	name = dic.get( 'name' )
	is_list = 'type_list' in dic
	typ = dic.get( 'type_list' if is_list else 'type' )
	n = dic.get( 'n', 1 )
	nest = type( typ ) == list
	dtype = eval( typ ) if not nest else None
	dtype_sz = np.dtype( dtype ).itemsize if dtype else None

	def set_n(n):
		dic[ 'n' ] = n

	return empty.new( locals() )


def tobytes_dic(o, dic):
	inf = get_inf( dic )

	v = getattr( o, inf.name, None )
	arr = v if inf.is_list else None
	if arr:
		inf.set_n( len( arr ) )

	if inf.nest:
		dics = inf.typ
		f = lambda o: tobytes( o, dics )
		if not inf.is_list:
			return f( v )
		return b''.join( map( f, arr ) )

	if type( arr ) == bytes:
		return arr

	if not inf.is_list:
		arr = [ v ]

	if arr and type( arr ) != np.ndarray or type( arr[ 0 ] ) != inf.dtype:
		arr = np.array( arr, dtype=inf.dtype )
		inf.set_n( len( arr ) )

	return arr.tobytes()

def tobytes(o, dics):
	f = lambda dic: tobytes_dic( o, dic )
	return b''.join( map( f, dics ) )


def frombytes_dic(buf, dic):
	inf = get_inf( dic )

	if inf.nest:
		dics = inf.typ
		lst = []
		for i in range( inf.n ):
			(o, buf) = frombytes( buf, dics )
			lst.append( o )

		v = lst if inf.is_list else lst[ 0 ]
		return ( inf.name, v, buf )

	v = np.frombuffer( buf, dtype=inf.dtype, count=inf.n )
	n = inf.dtype_sz * inf.n
	buf = buf[ n: ]

	if not inf.is_list and inf.n == 1:
		v = v[ 0 ]

	if inf.dtype == np.uint8:
		v = v.tobytes()
	return ( inf.name, v, buf )

def frombytes(buf, dics):
	o = empty.new()

	for dic in dics:
		(name, v, buf) = frombytes_dic( buf, dic )
		setattr( o, name, v )

	return ( o, buf )


def save(path, o, s_yaml):
	dics = yaml_ut.load( s_yaml )

	buf = tobytes( o, dics )

	s_yaml = yaml_ut.dump_no_flow( dics ).decode()

	n = s_yaml.count( '\n' )
	s = str( n ) + '\n' + s_yaml

	with open( path, 'wb' ) as f:
		f.write( s.encode() )
		f.write( buf )

def info(path):
	buf = b''
	with open( path, 'rb' ) as f:
		s = f.readline()
		n = int( s )
		for i in range( n ):
			buf += f.readline()
		pos = f.tell()

	s_yaml = buf.decode()
	return ( s_yaml, pos )

def load(path):
	(s_yaml, pos) = info( path )

	buf = b''
	with open( path, 'rb' ) as f:
		f.seek( pos )
		buf = f.read()

	dics = yaml_ut.load( s_yaml )

	(o, buf) = frombytes( buf, dics )
	return o


# vec

arr_new = lambda *args: np.array( args )

p000 = arr_new( 0, 0, 0 )
v100 = arr_new( 1, 0, 0 )
v010 = arr_new( 0, 1, 0 )
v001 = arr_new( 0, 0, 1 )

v_len = lambda v: np.dot( v, v ) ** 0.5

def v_unit(v):
	d = v_len( v )
	return v / d if d != 0 else v001

cross_unit = lambda va, vb: v_unit( np.cross( va, vb ) )

cross_y = lambda vx: cross_unit( v001, vx )

cross_z = lambda vx: cross_unit( vx, cross_y( vx ) )


def get_dist_lst(ps, p):
	return ( ( ps - p ).T ** 2 ).sum( axis=0 ) ** 0.5


# q4

def q4_rev(q4):
	(x, y, z, w) = q4
	return arr_new( -x, -y, -z, w )

def q4_mat(q4):
	(x, y, z, w) = q4
	return np.array( [
		[ w, -z, y, x ],
		[ z, w, -x, y ],
		[ -y, x, w, z ],
		[ -x, -y, -z, w ],
	] )

def q4_rot(q4, q4_targ):
	return q4_mat( q4 ).dot( q4_targ )

def q4_rot_v(q4, v):
	v4 = arr_new( v[ 0 ], v[ 1 ], v[ 2 ], 0 )
	q4_ = q4_rev( q4 )
	r4 = q4_rot( q4_rot( q4, v4 ), q4_ )
	return r4[ : 3 ]  # [ x, y, z ]


def q4_cnv(targ_q4, base_q4):
	return q4_rot( q4_rev( base_q4 ), targ_q4 )

def pos_cnv(targ_pos, base_pos, base_q4):
	return q4_rot_v( q4_rev( base_q4 ), targ_pos - base_pos )

def pose_cnv(targ_pos, targ_q4, base_pos, base_q4):
	pos = pos_cnv( targ_pos, base_pos, base_q4 )
	q4 = q4_cnv( targ_q4, base_q4 )
	return ( pos, q4 )


def q4_icnv(targ_q4, base_q4):
	return q4_rot( base_q4, targ_q4 )

def pos_icnv(targ_pos, base_pos, base_q4):
	return q4_rot_v( base_q4, targ_pos ) + base_pos

def pose_icnv(targ_pos_on_base, targ_q4_on_base, base_pos, base_q4):
	pos = pos_icnv( targ_pos_on_base, base_pos, base_q4 )
	q4 = q4_icnv( targ_q4_on_base, base_q4 )
	return ( pos, q4 )


def q4_new_v_arg(v, arg):
	L = v_len( v )
	if L == 0:
		return arr_new( 0, 0, 0, 1 )
	s = math.sin( arg * 0.5 )
	(x, y, z) = v * ( s / L )
	w = math.cos( arg * 0.5 )
	return arr_new( x, y, z, w )

def q4_to_v_arg( q4 ):
	( x, y, z, w ) = q4
	v = arr_new( x, y, z )
	s = v_len( v )
	if s == 0:
		return ( v001, 0 )  # !
	c = w
	arg_h = math.atan2( s, c )
	return ( v_unit( v ), arg_h * 2 )

def get_arg( va, vb, v_rot=None ):
	if v_rot is not None:
		va = np.cross( v_rot, va )
		vb = np.cross( v_rot, vb )
	va = v_unit( va )
	vb = v_unit( vb )
	crs = np.cross( va, vb )
	arg = math.atan2( v_len( crs ), np.dot( va, vb ) )
	if v_rot is not None and np.dot( crs, v_rot ) < 0:
		arg = -arg
	return arg

def std_arg( arg ):
	return math.atan2( math.sin( arg ), math.cos( arg ) )

def posi_arg( arg ):
	arg = std_arg( arg )
	if arg < 0:
		if abs( arg ) < 0.5 * math.pi / 180:
			return 0  # !
		arg += 2 * math.pi
	return arg

def rot_v( v_rot, arg, v_targ ):
	q4 = q4_new_v_arg( v_rot, arg )
	return q4_rot_v( q4, v_targ )

def rot_li_pos( li_rot, arg, p_targ ):
	( c, v_rot ) = li_rot
	return c + rot_v( v_rot, arg, p_targ - c )


def ypr_to_q4(yaw, pitch, roll):
	# yaw, pitch, roll: radian

	yaw_ = q4_new_v_arg( v001, yaw )
	pitch_ = q4_new_v_arg( v010, pitch )
	roll_ = q4_new_v_arg( v100, roll )

	return q4_rot( roll_, q4_rot( pitch_, yaw_ ) )

def q4_to_ypr(q4):
	v = q4_rot_v( q4, v001 )

	roll = 0
	(x, y, z) = v
	if y != 0 or z != 0:
		roll = math.atan2( -y, z )

	v = q4_rot_v( q4_new_v_arg( v100, -roll ), v )

	pitch = 0
	(x, y, z) = v
	if z != 0 or x != 0:
		pitch = math.atan2( x, z )

	v = v100
	v = q4_rot_v( q4, v )
	v = q4_rot_v( q4_new_v_arg( v100, -roll ), v )
	v = q4_rot_v( q4_new_v_arg( v010, -pitch ), v )

	yaw = 0
	(x, y, z) = v
	if x != 0 or y != 0:
		yaw = math.atan2( y, x )

	return ( yaw, pitch, roll )  # radian



def line_plane_cross(line_p, line_v, plane_p, plane_v):
	# ( ( line_p + L * line_v ) - plane_p ) dot plane_v == 0
	# ( line_p - plane_p ) dot plane_v + L * ( line_v dot plane_v ) == 0
	# L = ( plane_p - line_p ) dot plane_v / ( line_v dot plane_v )

	L = np.dot( plane_p - line_p, plane_v ) / np.dot( line_v, plane_v )
	return line_p + L * line_v

def line_plane_cross_point( li, pl ):
	( li_p, li_v ) = li
	( pl_p, pl_v ) = pl
	if np.dot( li_v, pl_v ) == 0:
		return None
	return line_plane_cross( li_p, li_v, pl_p, pl_v )

def plane2_cross_line( pl_a, pl_b ):
	( ap, av ) = pl_a
	( bp, bv ) = pl_b
	if all( av == bv ):
		return None
	v = cross_unit( av, bv )
	tv = cross_unit( av, v )
	p = line_plane_cross_point( [ ap, tv ], pl_b )
	if p is None:
		return None
	return [ p, v ]

def plane3_cross_point( pl3 ):
	li = plane2_cross_line( pl3[ 0 ], pl3[ 1 ] )
	if li is None:
		return None
	return line_plane_cross_point( li, pl3[ 2 ] )

def point3_circle_point( p3 ):
	( p0, p1, p2 ) = p3
	pl0 = [ ( p0 + p1 ) * 0.5, p1 - p0 ]
	pl1 = [ ( p1 + p2 ) * 0.5, p2 - p1 ]
	pl2 = [ p1, cross_unit( p0 - p1, p2 - p1 ) ]
	return plane3_cross_point( [ pl0, pl1, pl2 ] )

def point3_circle_touch_v( p3 ):
	c = point3_circle_point( p3 )
	if c is None:
		return None
	v3 = p3 - c
	rot_v = np.cross( v3[ 0 ], v3[ 1 ] )
	return list( map( lambda v: cross_unit( rot_v, v ), v3 ) )

def point_plane_len(p, plane_p, plane_v):
	return np.abs( np.dot( plane_p - p, plane_v ) / np.dot( plane_v, plane_v ) )

def get_plane_z( pl_p, pl_v, x, y ):
	return ( np.dot( pl_v, pl_p ) - pl_v[ 0 ] * x - pl_v[ 1 ] * y ) / pl_v[ 2 ]


def get_arg_from_line_p2(line_p, line_v, ps, pe):
	os = line_plane_cross( line_p, line_v, ps, line_v )
	oe = line_plane_cross( line_p, line_v, pe, line_v )

	vs = v_unit( ps - os )
	ve = v_unit( pe - oe )
	c = ( vs + ve ) * 0.5

	dx = v_len( c )
	dy = v_len( vs - c )
	arg = 2 * math.atan2( dy, dx )

	if np.dot( np.cross( vs, c ), line_v ) < 0:
		arg = -arg
	return arg

def get_q4_from_vx_vy(vx, vy):
	vx = v_unit( vx )
	vy = v_unit( vy )
	vz = cross_unit( vx, vy )

	dx = vx - v100
	dy = vy - v010
	dz = vz - v001

	lx = v_len( dx )
	ly = v_len( dy )
	lz = v_len( dz )

	if lx >= lz and ly >= lz:
		rot_v = cross_unit( dx, dy )
		arg_x = get_arg_from_line_p2( p000, rot_v, v100, vx )
		arg_y = get_arg_from_line_p2( p000, rot_v, v010, vy )
		arg = ( arg_x + arg_y ) * 0.5
	elif ly >= lx and lz >= lx:
		rot_v = cross_unit( dy, dz )
		arg_y = get_arg_from_line_p2( p000, rot_v, v010, vy )
		arg_z = get_arg_from_line_p2( p000, rot_v, v001, vz )
		arg = ( arg_y + arg_z ) * 0.5
	else:
		rot_v = cross_unit( dz, dx )
		arg_z = get_arg_from_line_p2( p000, rot_v, v001, vz )
		arg_x = get_arg_from_line_p2( p000, rot_v, v100, vx )
		arg = ( arg_z + arg_x ) * 0.5

	return q4_new_v_arg( rot_v, arg )

def corr_q4_by_ps_pe(ps, pe, targ_q4):
	vx = pe - ps
	inc_vz = pos_icnv( v001, p000, targ_q4 )
	vy = cross_unit( inc_vz, vx )
	base_q4 = get_q4_from_vx_vy( vx, vy )
	return q4_cnv( targ_q4, base_q4 )


def newton( f, a, b, lmt ):
	va = f( a )
	vb = f( b )
	if va * vb > 0:
		return None

	while abs( b - a ) > lmt:
		if va == 0:
			return a
		if vb == 0:
			return b

		c = ( a + b ) * 0.5
		vc = f( c )
		if va * vc >= 0:
			( a, va ) = ( c, vc )
		else:
			( b, vb ) = ( c, vc )
	return c


def curve_new(plst, vs=None, ve=None):
	# plst = [ [ x, y, z ], [ x, y, z], ... ]

	n = len( plst )
	Ls = np.zeros( n )
	Ds = np.zeros_like( plst )
	dlst = np.zeros_like( plst )

	for i in range( n - 1 ):
		ps = plst[ i ]
		pe = plst[ i + 1 ]
		Ls[ i ] = v_len( pe - ps)
		Ds[ i ] = ( pe - ps ) / Ls[ i ]

	for i in range( n - 2 ):
		ds = Ds[ i ]
		de = Ds[ i + 1 ]
		dlst[ i + 1 ] = ( ds + de ) * 0.5

	dlst[ 0 ] = Ds[ 0 ] * 2 - dlst[ 1 ] if vs is None else v_unit( vs )
	dlst[ -1 ] = Ds[ n - 2 ] * 2 - dlst[ -2 ] if ve is None else v_unit( ve )


	def get_ABCDL(ps, pe, ds, de, L):
		# 3A * LL + 2B L + C = de
		# A * LLL + B LL + C L + D = pe

		# 3A * LLL + 2B LL + C L = de L
		# 2A * LLL + 2B LL + 2C L + 2D = 2 pe

		# A * LLL - C L - 2 D = de L - 2 pe
		# A = ( de L - 2 pe + C L + 2 D ) / LLL
		# A = ( ( de + C ) * L + 2 * ( D - pe ) ) / LLL

		# 2B L = de - C - 3A * LL
		# B = ( de - C - 3A * LL ) / 2L

		C = ds
		D = ps
		A = ( ( de + C ) * L + 2 * ( D - pe ) ) / L ** 3
		B = ( de - C - 3 * A * L * L ) / ( 2 * L )
		return [ A, B, C, D, L ]

	def get_ABCDL_i( i ):
		return get_ABCDL( plst[ i ], plst[ i + 1 ], dlst[ i ], dlst[ i + 1 ], Ls[ i ] )

	ABCDLs = []
	for i in range( n - 1 ):
		ABCDL = get_ABCDL_i( i )
		ABCDLs.append( ABCDL )

	def update_ABCDLs( i ):
		if 0 <= i and i < len( ABCDLs ):
			ABCDLs[ i ] = get_ABCDL_i( i )

	def update_dlst( i, v ):
		# i < len( plst )
		dlst[ i ] = v
		update_ABCDLs( i )
		update_ABCDLs( i - 1 )

	def get_pos_lst( dist=0.5, strict_spc=False ):
		get_pos = lambda A, B, C, D, t: A * t ** 3 + B * t ** 2 + C * t + D

		def get_pos_idx_rate( i, rate ):
			( A, B, C, D, L ) = ABCDLs[ i ]
			t = L * rate
			return get_pos( A, B, C, D, t )

		lst = []
		n = len( ABCDLs )

		if strict_spc:
			( i, rate ) = ( 0, 0 )
			p = get_pos_idx_rate( i, rate )
			lst.append( p )
			while i < n:
				f = lambda r: v_len( get_pos_idx_rate( i, r ) - lst[ -1 ] ) - dist
				rate = newton( f, rate, 1.0, 0.001 )
				if rate is None:
					i += 1
					rate = 0
				else:
					p = get_pos_idx_rate( i, rate )
					lst.append( p )
			p = get_pos_idx_rate( n - 1, 1.0 )
			lst.append( p )
		else:
			for i in range( n ):
				(A, B, C, D, L) = ABCDLs[ i ]
				m = max( int( L / dist ), 1 )
				for j in range( m ):
					t = L * j / m
					lst.append( get_pos( A, B, C, D, t ) )
				if i == n - 1:
					lst.append( get_pos( A, B, C, D, L ) )
		return lst

	return empty.new( locals() )


def get_plane_from_points(ps):
	pl_p = np.mean( ps, axis=0 )

	adj = ps - pl_p
	mat = np.cov( adj.T )
	(w, v) = np.linalg.eig( mat )

	sort = w.argsort()[::-1]
	#w = w[ sort ]
	v = v[ : , sort ]

	pl_v = v[ : , 2 ]

	if pl_v.dtype == np.complex128:
		return None  # !

	return ( pl_p, pl_v )

# EOF

numpy関連です。

ですが、今のところnumpyらしからぬ内容...

2020/NOV/27

numpyらしく、 ベクトル[ x, y, z ] と4元数[ x, y, z, w ]関連を追加しました。

2021/JAN/28

bytesデータのsave,load関連を追加しました。

2021/JAN/29

上記save,load関連にパス表示機能を追加しました。

2021/MAR/23

追加しました。

curve_new()を互換性を保ちながら改良しました。

2021/APR/06

追加しました。

2021/JUN/30

追加しました。

to_bytes_head(b)

引数bにbytes型のデータを指定します。

データのバイト数の情報を、ヘッダとしてbの先頭に付けて返します。

from_bytes_head(b)

引数bには、to_bytes_head()が返す形式のbytes型を指定します。

bの先頭のバイト数の情報から直後のデータを取り出します。

取り出したデータと、残った部分とのタプルの形式で返します。

to_bytes_dtype(lst, dtype)

引数lstにはnumpy.array()で変換可能なリストを指定します。

引数dtypeにはnumpy.array()で指定する型を指定します。

lstをdtype型を指定してnumpy.array()で変換し、 さらに.tobytes()メソッドでbytes型のデータに変換します。

変換したbytes型データをさらに、 to_bytes_head()でバイト数の情報を先頭に付けて、返します。

from_bytes_dtype(b, dtype)

引数bには、to_bytes_dtype()が返す形式のbytes型を指定します。

引数dtypeにはnumpy.frombuffer()で指定する型を指定します。

bの先頭のバイト数の情報から直後のデータ部分と、残りの部分に分離します。

データ部分をdtypeを指定してnumpy.frombuffer()でnumpy.arrayに変換します。

変換結果のarrayは読み出し専用なので、 .copy()メソッドで書き込み可能なものに変換します。

変換結果と、残った部分とのタプルの形式で返します。

to_bytes_yaml( o )

引数oにはYAML形式に変換可能な辞書やリストなどを指定します。

oをbytes型のデータに変換します。

変換したbytes型データをさらに、 to_bytes_head()でバイト数の情報を先頭に付けて、返します。

from_bytes_yaml( b )

引数bには、to_bytes_yaml()が返す形式のbytes型を指定します。

bの先頭のバイト数の情報から直後のデータ部分と、残りの部分に分離します。

データ部分をto_bytes_yaml()で変換する前の辞書やリストに戻します。

変換結果と、残った部分とのタプルの形式で返します。

to_bytes_arr( arr )

引数arrにはnumpy.array()の配列を指定します。

arrのdtypeやshapeの情報を含めてbytes型のデータに変換します。

変換したbytes型データをさらに、 to_bytes_head()でバイト数の情報を先頭に付けて、返します。

from_bytes_arr( b )

引数bには、to_bytes_arre()が返す形式のbytes型を指定します。

bの先頭のバイト数の情報から直後のデータ部分と、残りの部分に分離します。

データ部分をto_bytes_yaml()で変換する前の配列に戻します。

変換結果のarrayは読み出し専用なので、 .copy()メソッドで書き込み可能なものに変換します。

変換結果と、残った部分とのタプルの形式で返します。

save_bytes(path, b, show=True)

引数pathに書き込むファイルのパスを指定します。

引数bにbytes型のデータを指定します。

bをpathのファイルに書き込みます。

showがTrueならば、dbg.start_save()を使ってパスを表示します。

load_bytes(path, show=True)

引数pathに読み出すファイルのパスを指定します。

pathのファイルを読み出し、bytes型のデータとして返します。

showがTrueならば、dbg.start_load()を使ってパスを表示します。

save_bytes_dtype(path, lst, dtype, show=True)

引数pathに書き込むファイルのパスを指定します。

引数lstにはnumpy.array()で変換可能なリストを指定します。

引数dtypeにはnumpy.array()で指定する型を指定します。

lstとdtypeを指定してto_bytes_dtype()を呼び出し、 lstをbytes型に変換します。

pathと変換したbytes型のデータを指定してsave_bytes()を呼び出して、 pathのファイルに書き込みます。

showがTrueならば、dbg.start_save()を使ってパスを表示します。

load_bytes_dtype(path, dtype, show=True)

引数pathに読み出すファイルのパスを指定します。

引数dtypeにはnumpy.frombuffer()で指定する型を指定します。

pathを指定してload_bytes()を呼び出して、 pathのファイルを読み出し、bytes型のデータを取得します。

取得したbytes型のデータとdtypeを指定して、 from_bytes_dtype()を呼び出し、 numpy.arrayに変換した結果と、残った部分とのタプルの形式で返します。

showがTrueならば、dbg.start_save()を使ってパスを表示します。

write_bytes_head( f, b )

引数fには書き込み可能としてオープンしたファイルを指定します。

引数bにbytes型のデータを指定します。

to_bytes_head()でbにデータのバイト数情報を付加し、ファイルfに書き込みます。

read_bytes_head( f )

引数fには先頭にバイト数情報があり、 読み出し可能としてオープンしたファイルを指定します。

fから先頭のバイト数情を読み出した後、 以降そのバイト数分のデータを読み出し、bytes型のデータで返します。

save(path, o, s_yaml)

オブジェクトoの指定の属性(アトリビュート)を、指定の形式でpathのファイルに保存します。

ファイル保存する「値」は、一旦numpyのndarrayに変換した後、 ndarrayのtobytes()でバイト列のバイナリに変換してファイルに保存します。

保存したいoの属性(アトリビュート)の名前や形式を、 YAML形式の文字列s_yamlで指定します。

YAML形式の文字列s_yamlの基本は、リスト形式で要素は辞書です。

辞書では属性の名前をキーnameで、 保存するときのnumpyの型をキーtypeで文字列で指定します。

簡単な例

例えば

o = empty.new()
o.n = 3
o.pos = empty.new()
o.pos.x = 12.3
o.pos.y = 45.6

o.nをnp.int32で、o.posのx,yをnp.float32で保存したいとします。

YAML形式の文字列s_yamlは

- name: n
  type: np.int32
- name:pos
  type:
  - name: x
    type: np.float32
  - name: y
    type: np.float32

- { name: n, type: np.int32 }
- { name: pos, type: [ { name: x, type: np.float32 }, { name: y, type: np.float32 } ] }

あるいは

- { name: n, type: np.int32 }
- name: pos
  type:
  - { name: x, type: np.float32 }
  - { name: y, type: np.float32 }

を与えます。

ファイルの先頭には、 指定したYAML文字列の情報に、それぞれの属性のデータの個数情報を追加したYAML文字列を、 ヘッダとして書き込みます。

ファイルの先頭行には、ヘッダのYAML文字列の行数をアスキー形式で書き込みます。

本体のデータは、ヘッダ以降に、バイナリ形式で書き込まれます。

path に /tmp/smp1.dat を指定してsave()を実行すると

$ ls -l /tmp/smp1.dat
-rw-r--r--  1 kondoh  wheel  153  5 16 20:35 /tmp/smp1.dat
$ head -1 /tmp/smp1.dat
11

$ head -12 /tmp/smp1.dat
11
- n: 1
  name: n
  type: np.int32
- name: pos
  type:
  - n: 1
    name: x
    type: np.float32
  - n: 1
    name: y
    type: np.float32
$ tail +13 /tmp/smp1.dat | hd
0000000 03 00 00 00 cd cc 44 41 66 66 36 42
000000c

$ hd /tmp/smp1.dat
0000000 31 31 0a 2d 20 6e 3a 20 31 0a 20 20 6e 61 6d 65
0000010 3a 20 6e 0a 20 20 74 79 70 65 3a 20 6e 70 2e 69
0000020 6e 74 33 32 0a 2d 20 6e 61 6d 65 3a 20 70 6f 73
0000030 0a 20 20 74 79 70 65 3a 0a 20 20 2d 20 6e 3a 20
0000040 31 0a 20 20 20 20 6e 61 6d 65 3a 20 78 0a 20 20
0000050 20 20 74 79 70 65 3a 20 6e 70 2e 66 6c 6f 61 74
0000060 33 32 0a 20 20 2d 20 6e 3a 20 31 0a 20 20 20 20
0000070 6e 61 6d 65 3a 20 79 0a 20 20 20 20 74 79 70 65
0000080 3a 20 6e 70 2e 66 6c 6f 61 74 33 32 0a 03 00 00
0000090 00 cd cc 44 41 66 66 36 42
0000099

などと書き込まれます。

リストの場合

リストのデータを書き出す場合は、YAML文字列の型をキー'type_list'を使って指定します。

内部でnumpyのndarrayを使っているので、リストの要素は全て同じ型で書き出されます。

o = empty.new()
o.int_0_9 = list( range( 10 ) )
o.v_lst = [ 1, 0.5, 99 ]

YAML文字列を

- { name: int_0_9, type_list: np.int32 }
- { name: v_lst, type_list: np.float32 }

path に /tmp/smp2.dat を指定してsave()を実行すると

$ head -1 /tmp/smp2.dat
6

$ head -7 /tmp/smp2.dat
6
- n: 10
  name: int_0_9
  type_list: np.int32
- n: 3
  name: v_lst
  type_list: np.float32

$ tail +8 /tmp/smp2.dat | hd
0000000 00 00 00 00 01 00 00 00 02 00 00 00 03 00 00 00
0000010 04 00 00 00 05 00 00 00 06 00 00 00 07 00 00 00
0000020 08 00 00 00 09 00 00 00 00 00 80 3f 00 00 00 3f
0000030 00 00 c6 42
0000034

$ hd /tmp/smp2.dat
0000000 36 0a 2d 20 6e 3a 20 31 30 0a 20 20 6e 61 6d 65
0000010 3a 20 69 6e 74 5f 30 5f 39 0a 20 20 74 79 70 65
0000020 5f 6c 69 73 74 3a 20 6e 70 2e 69 6e 74 33 32 0a
0000030 2d 20 6e 3a 20 33 0a 20 20 6e 61 6d 65 3a 20 76
0000040 5f 6c 73 74 0a 20 20 74 79 70 65 5f 6c 69 73 74
0000050 3a 20 6e 70 2e 66 6c 6f 61 74 33 32 0a 00 00 00
0000060 00 01 00 00 00 02 00 00 00 03 00 00 00 04 00 00
0000070 00 05 00 00 00 06 00 00 00 07 00 00 00 08 00 00
0000080 00 09 00 00 00 00 00 80 3f 00 00 00 3f 00 00 c6
0000090 42
0000091
$

辞書の場合

今のところ対応してません。

ですが、 #empty.py の from_dic(o) を使えば、 辞書を再帰的に探索してEmptyクラスのオブジェクトに変換できます。

オブジェクトに変換してから、対応するYAML文字列を作成すれば、可能かと。

load(path)

save(path, o, s_yaml) で保存したファイルを読み込んで、 Emptyクラスのオブジェクトを生成して返します。

アトリビュートの値は、 保存するときに指定した、numpyの型の値、 あるいはその型のnumpy.ndarrayでセットされます。

ファイルのヘッダについてるYAML形式の文字列は、 特に返りません。

保存しなおす際に必要であれば、info(path)を使って取得できます。

簡単な例

save()の例で保存した/tmp/smp1.datの場合

>>> import np_ut

>>> o = np_ut.load( '/tmp/smp1.dat' )

>>> o.n
3
>>> o.pos.x
12.3
>>> o.pos.y
45.6

>>> import empty

>>> empty.to_dic( o )
{'n': 3, 'pos': {'x': 12.3, 'y': 45.6}}

リストの場合

save()の例で保存した/tmp/smp2.datの場合

>>> import np_ut

>>> o = np_ut.load( '/tmp/smp2.dat' )

>>> o.int_0_9
array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)

>>> o.int_0_9[3]
3

>>> o.v_lst
array([ 1. ,  0.5, 99. ], dtype=float32)

>>> o.v_lst[1]
0.5

>>> import empty

>>> empty.to_dic( o )
{'int_0_9': array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32), \
'v_lst': array([ 1. ,  0.5, 99. ], dtype=float32)}

info(path)

このような一般的な名前の関数にしてしまって、大丈夫だろうか... (^_^;

save(path, o, s_yaml) で保存したファイルのヘッダ部分を読み込んで、 YAML形式の文字列と、ファイル中のバイナリデータが始まるオフセットバイト数を、 タプルにして返します。

save()の例で保存した/tmp/smp1.datの場合

>>> import np_ut

>>> (s_yaml, pos) = np_ut.info( '/tmp/smp1.dat' )

>>> print( s_yaml )
- n: 1
  name: n
  type: np.int32
- name: pos
  type:
  - n: 1
    name: x
    type: np.float32
  - n: 1
    name: y
    type: np.float32

>>> pos
141

save()の例で保存した/tmp/smp2.datの場合

>>> (s_yaml, pos) = np_ut.info( '/tmp/smp2.dat' )

>>> pos
93

>>> print( s_yaml )
- n: 10
  name: int_0_9
  type_list: np.int32
- n: 3
  name: v_lst
  type_list: np.float32

>>>

サンプルプログラム

np_ut_test.py
#!/usr/bin/env python

import empty
import np_ut
import arg
import dbg

def data():
	o = empty.new()
	o.header = empty.new()
	o.header.seq = 1
	o.header.stamp = empty.new()
	o.header.stamp.secs = 2
	o.header.stamp.nsecs = 3

	p0 = empty.new()
	p0.stamp = empty.new()
	p0.stamp.secs = 4
	p0.stamp.nsecs = 5
	p0.data = b'\x06\x07\x08'

	p1 = empty.new()
	p1.stamp = empty.new()
	p1.stamp.secs = 9
	p1.stamp.nsecs = 10
	p1.data = b'\x0b\x0c\x0d'

	o.packets = [ p0, p1 ]

	s_yaml = '''
	- name: header
	  type:
	  - { name: seq, type: np.uint32 }
	  - name: stamp
	    type:
	    - { name: secs, type: np.int32 }
	    - { name: nsecs, type: np.int32 }
	- name: packets
	  type_list:
	  - name: stamp
	    type:
	    - { name: secs, type: np.int32 }
	    - { name: nsecs, type: np.int32 }
	  - { name: data, type_list: np.uint8 }
	'''.replace( '\t', '' )

	return ( o, s_yaml )

def data_1():
	o = empty.new()
	o.n = 3
	o.pos = empty.new()
	o.pos.x = 12.3
	o.pos.y = 45.6

	s_yaml = '''
	- { name: n, type: np.int32 }
	- name: pos
	  type:
	  - { name: x, type: np.float32 }
	  - { name: y, type: np.float32 }
	'''.replace( '\t', '' )

	return ( o, s_yaml )

def data_2():
	o = empty.new()
	o.int_0_9 = list( range( 10 ) )
	o.v_lst = [ 1, 0.5, 99 ]

	s_yaml = '''
	- { name: int_0_9, type_list: np.int32 }
	- { name: v_lst, type_list: np.float32 }
	'''.replace( '\t', '' )

	return ( o, s_yaml )

def test_obj_yaml(sample=None):
	s = 'data'
	if sample:
		s = 'data_{}'.format( sample )
	f = eval( s )
	return f()

def test_save(path, sample):
	(o, s_yaml) = test_obj_yaml( sample )
	np_ut.save( path, o, s_yaml )

def test_info(path):
	(s_yaml, pos) = np_ut.info( path )
	dbg.out( s_yaml )
	dbg.out( 'pos={}'.format( pos ) )

def test_load(path, sample):
	o = np_ut.load( path )

	d = empty.to_dic( o )
	dbg.out( 'd={}'.format( d ) )

	(o_, s_yaml) = test_obj_yaml( sample )
	d_ = empty.to_dic( o_ )
	dbg.out( 'd_={}'.format( d_ ) )

if __name__ == "__main__":
	cmds = [
		arg.cmd_new( 'test_save', [ 'path' ], opts={ 'sample': None } ),
		arg.cmd_new( 'test_info', [ 'path' ] ),
		arg.cmd_new( 'test_load', [ 'path' ], opts={ 'sample': None } ),
	]
	cmd = arg.get_name_args( cmds )

	f = eval( cmd.name )
	f( *cmd.args, **cmd.opts )
# EOF

$ ./np_ut_test.py
Usage: ./np_ut_test.py cmd arg ...
  test_save [ -sample None ] path
  test_info path
  test_load [ -sample None ] path
$ ./np_ut_test.py test_save /tmp/hoge.dat

$ ls -l /tmp/hoge.dat
-rw-r--r--  1 kondoh  wheel  437  5 16 23:37 /tmp/hoge.dat
$ ./np_ut_test.py test_info /tmp/hoge.dat
- name: header
  type:
  - n: 1
    name: seq
    type: np.uint32
  - name: stamp
    type:
    - n: 1
      name: secs
      type: np.int32
    - n: 1
      name: nsecs
      type: np.int32
- n: 2
  name: packets
  type_list:
  - name: stamp
    type:
    - n: 1
      name: secs
      type: np.int32
    - n: 1
      name: nsecs
      type: np.int32
  - n: 3
    name: data
    type_list: np.uint8

pos=403
$ ./np_ut_test.py test_load /tmp/hoge.dat
d={'header': {'seq': 1, 'stamp': {'secs': 2, 'nsecs': 3}}, \
'packets': [{'stamp': {'secs': 4, 'nsecs': 5}, 'data': b'\x06\x07\x08'}, \
{'stamp': {'secs': 9, 'nsecs': 10}, 'data': b'\x0b\x0c\r'}]}
d_={'header': {'seq': 1, 'stamp': {'secs': 2, 'nsecs': 3}}, \
'packets': [{'stamp': {'secs': 4, 'nsecs': 5}, 'data': b'\x06\x07\x08'}, \
{'stamp': {'secs': 9, 'nsecs': 10}, 'data': b'\x0b\x0c\r'}]}

以降2020/NOV/27 追加

ベクトル[ x, y, z ] と4元数[ x, y, z, w ]関連

arr_new(*args)

複数の引数の数値をnp.array()にまとめます。

arr_new = lambda *args: np.array( args )

次の4つの値を定数的に用意しております。

p000 = arr_new( 0, 0, 0 )
v100 = arr_new( 1, 0, 0 )
v010 = arr_new( 0, 1, 0 )
v001 = arr_new( 0, 0, 1 )

v_len(v)

ベクトルvの長さを返します。

np.dot() を使ってます。

v_len = lambda v: np.dot( v, v ) ** 0.5

あるいは

v_len = lambda v: ( v ** 2 ).sum() ** 0.5

にしようか迷いましたが、とりあえず前者の実装です。

v_unit(v)

ベクトルvの単位ベクトルを返します。

vの長さが0ならば、[ 0, 0, 1 ] を返します。

cross_unit(va, vb)

ベクトルva, vbの外積の向きの単位ベクトルを返します。

cross_y(vx)

[ 0, 0, 1 ]とvxとの外積の向きの単位ベクトルを返します。

cross_y = lambda vx: cross_unit( v001, vx )

cross_z(vx)

vxとcross_y(vx)との外積の向きの単位ベクトルを返します。

cross_z = lambda vx: cross_unit( vx, cross_y( vx ) )

get_dist_lst(ps, p)

点群psと点pとの距離のリストを返します。

q4_rev(q4)

4元数q4の逆回転のq4を返します。

q4_mat(q4)

4元数q4の回転行列を返します。

q4_rot(q4, q4_targ)

4元数q4_targを、4元数q4で回転した4元数を返します。

q4_rot_v(q4, v)

ベクトルvを、4元数q4で回転したベクトルを返します。

q4_cnv(targ_q4, base_q4)

4元数q4_targを、4元数base_q4回転座標系から見たときの 4元数に変換して返します。

pos_cnv(targ_pos, base_pos, base_q4)

位置座標targ_posを、原点base_posのbase_q4回転座標系から見たときの 位置座標に変換して返します。

pose_cnv(targ_pos, targ_q4, base_pos, base_q4)

原点targ_posのtarg_q4回転座標系を、 原点base_posのbase_q4回転座標系から見たときの 座標系に変換して、変換後の原点と回転座標系を返します。

q4_icnv(targ_q4, base_q4)

q4_cnv()の逆変換です。

pos_icnv(targ_pos, base_pos, base_q4)

pos_icnv()の逆変換です。

pose_icnv(targ_pos_on_base, targ_q4_on_base, base_pos, base_q4)

pose_cnv()の逆変換です。

q4_new_v_arg(v, arg)

ベクトルvを軸としてargラジアン回転する4元数を返します。

vの長さが0のときは、回転なしの[ 0, 0, 0, 1 ]を返します。

ypr_to_q4(yaw, pitch, roll)

Z軸回転yaw, Y軸回転pitch, X軸回転rollをする4元数を返します。

引数の単位はラジアンです。

q4_to_ypr(q4)

ypr_to_q4の逆変換です。

4元数q4の(yaw, pitch, roll)タプルを返します。

返り値の単位はラジアンです。

line_plane_cross(line_p, line_v, plane_p, plane_v)

位置line_pを通る方向ベクトルがline_vの直線と、 位置plane_pを通る法線ベクトルがplane_vの平面との、 交点の位置座標を返します。

line_plane_cross_point( li, pl )

line_plane_cross() と同様ですが、 引数を[ p, v ]の形式に変更して、 交点が無い場合はNoneを返すように変更しました。

plane2_cross_line( pl_a, pl_b )

2つの平面の交線を返します。

返す平面は[ p, v ]の形式です。

交線が無い場合はNoneを返します。

plane3_cross_point( pl3 )

3つの平面の交点を返します。

交点が無い場合はNoneを返します。

point3_circle_point( p3 )

3つの点を通る円弧の中心の位置座標を返します。

引数 p3 は [ p0, p1, p2 ] の形式で与えます。

point3_circle_touch_v( p3 )

3つの点を通る円弧を求め、各点の円弧に接っする方向ベクトルを返します。

返すベクトルの方向は、p3の最初の点が次の点に向かう方向に従います。

point_plane_len(p, plane_p, plane_v)

位置pと 位置plane_pを通る法線ベクトルがplane_vの平面との、 距離を返します。

get_plane_z( pl_p, pl_v, x, y )

位置pl_pを通る法線ベクトルがpl_vの平面上で、 X座標がx、Y座標がyのときのZ座標を返します。

pl_vがZ軸に垂直の場合、0割り算のエラーが発生するので、 呼び出し前のpl_vの確認は自己責任で。;-p)

get_arg_from_line_p2(line_p, line_v, ps, pe)

位置line_pを通る方向ベクトルline_vの直線と、位置psを含む平面を、 位置line_pを通る方向ベクトルline_vの直線と、位置peを含む平面まで、 直線を軸に回転させたときの、回転角度を返します。

返り値の単位はラジアンです。

get_q4_from_vx_vy(vx, vy)

X軸をvxベクトルに、Y軸をvyベクトルに回転させたときの、回転座標系の4元数を返します。

newton( f, a, b, lmt )

引数fには数値を1つ引数にとり、数値を返す関数を指定します。

引数aには、関数fに与える引数を指定します。

引数bには、関数fに与える引数を指定します。

引数lmtにはaとbが同一とみなせる時の差を指定します。

ニュートン法で f(c) = 0 となるcを求めて返します。

ただし、f(a)とf(b)の結果は符号が同一の場合は、Noneを返します。

curve_new(plst, vs=None, ve=None)

引数の位置座標のリストを通過する滑らかな曲線で補間した、座標を作成するためのオブジェクトを返します。

引数plstは [ [ x, y, z ], [ x, y, z], ... ] の形式の位置座標のリストを与えます。

引数vs, veは、始点、終点の方向を示すベクトルを指定します。

指定しなければ、従来通り端の3点から算出した傾きを使用します。

オブジェクトのメソッド

get_pos_lst( dist=0.5, strict_spc=False )

plstを含む曲線の位置座標のリストを返します。

distで各点間のおよその距離を指定します。

デフォルトのstrict_spc=Falseの場合は、従来通り plstの点を全て含むリストを返します。

strict_spc=Trueの場合は、 点の感覚がなるべくdistの距離となるようなリストを返します。 そのため、plstの点を必ずしも全て含みません。

get_plane_from_points(ps)

点群psにフィッティングする平面( pl_p, pl_v )を返します。


snap_ut.py

snap_ut.py
#!/usr/bin/env python

import os

import empty
import tm_ut
import base
import cmd_ut

to_str = base.to_str
cmd_call = lambda cmd: cmd_ut.call( cmd, b2s=True )
cmd_lst = lambda cmd: sorted( cmd_call( cmd ).strip().split('\n') )


# fs

# #define S_IFMT 0170000           /* type of file */
# #define        S_IFIFO  0010000  /* named pipe (fifo) */
# #define        S_IFCHR  0020000  /* character special */
# #define        S_IFDIR  0040000  /* directory */
# #define        S_IFBLK  0060000  /* block special */
# #define        S_IFREG  0100000  /* regular */
# #define        S_IFLNK  0120000  /* symbolic link */
# #define        S_IFSOCK 0140000  /* socket */
# #define        S_IFWHT  0160000  /* whiteout */
# #define S_ISUID 0004000  /* set user id on execution */
# #define S_ISGID 0002000  /* set group id on execution */
# #define S_ISVTX 0001000  /* save swapped text even after use */
# #define S_IRUSR 0000400  /* read permission, owner */
# #define S_IWUSR 0000200  /* write permission, owner */
# #define S_IXUSR 0000100  /* execute/search permission, owner */

mode_type_dic = {
	'S_IFMT':   0o170000,
	'S_IFIFO':  0o010000,
	'S_IFCHR':  0o020000,
	'S_IFDIR':  0o040000,
	'S_IFBLK':  0o060000,
	'S_IFREG':  0o100000,
	'S_IFLNK':  0o120000,
	'S_IFSOCK': 0o140000,
	'S_IFWHT':  0o160000,
	'S_ISUID':  0o004000,
	'S_ISGID':  0o002000,
	'S_ISVTX':  0o001000,
	'S_IRUSR':  0o000400,
	'S_IWUSR':  0o000200,
	'S_IXUSR':  0o000100,
}

def is_mode_type(mode, name):
	mask = mode_type_dic.get( 'S_IFMT', 0 )
	v = mode_type_dic.get( name, 0 )
	return mode & mask == v

is_mode_dir = lambda mode: is_mode_type( mode, 'S_IFDIR' )
is_mode_slink = lambda mode: is_mode_type( mode, 'S_IFLNK' )


def fs_new(mode, sec, size, path):
	def to_txt():
		lst = ( oct( mode ), tm_ut.sec_to_str( sec, tm_ut.sample ), str( size ), path )
		return to_str( lst, delim=' ' )

	def eq_mode(o_mode):
		(a, b) = ( mode, o_mode )

		is_allow_grp = ( lambda :
			( is_mode_slink( a ) and is_mode_slink( b ) ) or
			( is_mode_type( a, 'S_IFREG' ) and is_mode_type( b, 'S_IFREG' ) ) )

		if is_allow_grp():
			# 0120777 2018-10-13_00.36.06 13 argmnt/ido.sh
			# 0o120755 2018-10-13_00.36.06 13 argmnt/ido.sh
			mask_clr = 0o22
			a &= ~mask_clr
			b &= ~mask_clr

		return a == b

	eq = lambda o: eq_mode( o.mode ) and o.sec == sec and o.size == size and o.path == path

	return empty.new( locals() )

def fs_new_path(dir_path, path):
	st = os.lstat( os.path.join( dir_path, path ) )
	return fs_new( st.st_mode, int( st.st_mtime ), st.st_size, path )

def fs_new_str(s):
	lst = s.split( ' ' )
	(mode, sec, size, path) = lst[:3] + [ to_str( lst[3:], delim=' ' ) ]
	return fs_new( int( mode, 8 ), tm_ut.str_to_sec( sec ), int( size ), path )


# snap

def new_fss(fss, sec):
	get_paths = lambda : list( map( lambda fs: fs.path, fss ) )
	get_txt = lambda: to_str( map( lambda fs: fs.to_txt(), fss ), last=True )

	def write(fn):
		s = tm_ut.sec_to_str( sec, tm_ut.sample ) + '\n' + get_txt()
		with open( fn, 'w' ) as f:
			f.write( s )

	return empty.new( locals() )

def new_dir(dir_path):
	sec = tm_ut.now_sec()
	dir_paths = cmd_lst( 'find {} -type f -o -type l'.format( dir_path ) ) # files and symlinks
	dir_paths = filter( lambda s: s, dir_paths )
	paths = list( map( lambda s: os.path.relpath( s, start=dir_path ), dir_paths ) )
	fss = list( map( lambda path: fs_new_path( dir_path, path ), paths ) )
	return new_fss( fss, sec )

def new_fn(fn):
	s = ''
	with open( fn, 'r' ) as f:
		s = f.read().strip()
	lst = s.split( '\n' )
	(sec, lst) = ( tm_ut.str_to_sec( lst[ 0 ] ), lst[ 1: ] )
	fss = list( map( fs_new_str, lst ) )
	return new_fss( fss, sec )

def cmp_new(snap_old, snap_new):
	old = snap_old.get_paths()
	new = snap_new.get_paths()

	rm = list( filter( lambda p: p not in new, old ) )
	add = list( filter( lambda p: p not in old, new ) )
	com = list( filter( lambda p: p in old, new ) )

	fss_old = snap_old.fss
	fss_new = snap_new.fss
	fss_old_com = list( filter( lambda fs: fs.path in com, fss_old ) )
	fss_new_com = list( filter( lambda fs: fs.path in com, fss_new ) )

	(same, chg) = ( [], [] )
	for (fs_a, fs_b) in zip( fss_old_com, fss_new_com ):
		if fs_a.eq( fs_b ):
			same.append( fs_b.path )
		else:
			chg.append( fs_b.path )

	is_same = lambda : not ( rm + add + chg )

	e = empty.new()

	get_k_str = lambda k: to_str( getattr( e, k, '' ), pre=k+' ' )

	def get_ks_str(ks):
		ks = filter( lambda k: getattr(e, k), ks )
		return to_str( map( get_k_str, ks ) )

	rm_add_chg_str = lambda : get_ks_str( ( 'rm', 'add', 'chg' ) )

	return empty.to_attr( e, locals() )
# EOF

ファイルのサイズや日付などの情報関連です。

fs_new(mode, sec, size, path)

引数のモード、日付時刻用の秒数、サイズ、パス名をまとめてオブジェクトにして返します。

主なメソッド

メソッド 内容
to_txt() オブジェクトの内容を文字列にして返します
eq(o) 引数oの他のオブジェクトと内容が同一か比較して結果を返します

eq(o)のmode判定

2020/MAY/17

シンボリックリンクの場合のモードの一致判定を、ゆるくしました。

グループとその他の書き込み権限のビットは、無視して判定するようにしました。

簡易なファイルツリーのバージョン管理ツール(gitの真似事) 2020春 からの利用で不具合が出たので、とりあえずここでの判定を緩めて対応してます。

不具合は、リンク切れのシンボリックリンクを、別の環境にtarで展開した時に、 上記のビットの違いが出て、更新されたように判定されてしまう現象です。

2020/MAY/20

追加したコメント部分の影響で、動作しない環境がありました。

コメント箇所のみ修正しました。

2020/OCT/30

環境によってシンポリックリンク以外でも、ちょいちょい eq_mode()でgroup write bitの違いでひっかかってました。 通常のファイルの場合も、シンボリックリンク同様に判定を緩めました。

fs_new_path(dir_path, path)

dir_path と path を連結したパスについてlstatコマンドを実行して、 モード、日付時刻、サイズの情報を取得し、 fs_new() を呼び出してオブジェクトを返します。

>>> import snap_ut

>>> fs = snap_ut.fs_new_path( '.', 'snap_ut.py' )

>>> fs.to_txt()
'0o100644 2020-04-05_17.34.10 2714 snap_ut.py'

fs_new_str(s)

文字列sから情報を取得して、fs_new()を呼び出してブジェクトを返します。 文字列sには、fs_new()で返るオブジェクトのto_txt()メソッドで返る形式の文字列を指定します。

>>> fs = snap_ut.fs_new_path( '.', 'snap_ut.py' )

>>> fs2 = snap_ut.fs_new_str( '0o100644 2020-04-05_17.34.10 2714 snap_ut.py' )

>>> fs.eq( fs2 )
True

new_fss(fss, sec)

fs_new()で返るオブエクトのリストfssと、日付時刻の秒数secから、 スナップショットオブジェクトを生成して返します。

主なメソッド

メソッド 内容
write(fn) スナップショットの内容をファイル名fnで書き出します。
先頭行は sec から生成した日付時刻の文字列になります。
以降は、リストのfsのto_txt()による文字列になります。

new_dir(dir_path)

ディレクトリ dir_path 以下のファイルとシンボリック・リンクについて、 new_fss()を呼び出しててスナップを生成して返します。

>>> snap = snap_ut.new_dir( '.' )

>>> snap.write( 'snap-1' )

>>> ^D

$ cat snap-1
2020-04-05_21.02.09
0o100644 2020-02-01_12.25.46 846 arg.py
0o100644 2020-04-05_11.38.34 791 base.py
0o100755 2020-02-01_14.53.38 7724 chat_conn.py
0o100755 2020-04-01_00.32.11 3181 cmd_ut.py
0o100755 2020-02-15_12.42.12 468 dbg.py
  :

new_fn(fn)

ファイル名fnの内容を読み込み、 new_ss()を呼び出しててスナップを生成して返します。

>>> snap = snap_ut.new_fn( 'snap-1' )

>>> snap.sec
1586088129.0

>>> snap.fss[10].to_txt()
'0o100755 2020-02-01_14.53.38 7724 chat_conn.py'

cmp_new(snap_old, snap_new)

あるディレクトリ以下の、古いスナップショットと新しいスナップショットの内容を比較して、 その結果のオブジェクトを返します。

rmには削除されたファイルパスのリストがセットされます。

addには追加されたファイルパスのリストがセットされます。

chgには内容が更新されたファイルパスのリストがセットされます。

主なメソッド

メソッド 内容
is_same() 古いスナップショットと新しいスナップショットが、同じか判定して返します。
rm, add, chgのリストが全て空であれば、同じ内容であるとしてTrueを返します。


snd_ut.py

snd_ut.py
#!/usr/bin/env python

import os
import yaml
import numpy as np

import empty
import thr
import cmd_ut
import yaml_ut
import dbg

thr_gc = thr.gc_new()

silent = False

def dbg_out(s, tail='\n'):
	if not silent:
		dbg.out( s, tail )

def wait_msg_new(msg, tail='OK'):
	dbg_out( msg + ' ', '' )
	th = thr.loop_new( dbg_out, ( '.', '' ), wait_tmout=1.0, gc=thr_gc )
	th.start()

	def stop():
		th.stop()
		dbg_out( ' ' + tail )

	return empty.new( locals() )


def cmd_wait_msg(cmd, msg):
	wait_msg = wait_msg_new( msg )
	cmd_ut.call( cmd )
	wait_msg.stop()


def rm(name):
	cmd = 'rm -f ' + name
	cmd_wait_msg( cmd, 'del ' + name )


def load_bytes(name):
	wait_msg = wait_msg_new( 'load ' + name )
	b = bytes( [] )
	with open( name, 'rb' ) as f:
		b = f.read()
	wait_msg.stop()
	return b

def load_bytes_part( name, from_byte=None, to_byte=None ):
	if from_byte is None and to_byte is None:
		return load_bytes( name )

	if from_byte is None:
		from_byte = 0

	if to_byte is None:
		st = os.lstat( name )
		to_byte = st.st_size

	b = bytes( [] )
	with open( name, 'rb' ) as f:
		if from_byte > 0:
			f.seek( from_byte )
		b = f.read( to_byte - from_byte )
	return b

def save_bytes(name, b):
	wait_msg = wait_msg_new( 'save ' + name )
	with open( name, 'wb' ) as f:
		f.write( b )
	wait_msg.stop()


def cvt(name, from_ext, to_ext, raw_opt=''):
	cmd = ''

	if from_ext == 'mp3' and to_ext == 'wav':
		cmd = 'lame --decode {}.mp3 > /dev/null 2>&1'.format( name )

	elif from_ext == 'wav' and to_ext == 'raw':
		cmd = 'sox {}.wav {}.raw'.format( name, name )

	elif from_ext == 'wav' and to_ext == 'inf':
		cmd = 'sox --info {}.wav > {}.inf'.format( name, name )

	elif from_ext == 'raw' and to_ext == 'wav':  # raw_opt
		cmd = 'sox {} {}.raw {}.wav'.format( raw_opt, name, name )

	elif from_ext == 'wav' and to_ext == 'mp3':
		cmd = 'lame {}.wav {}.mp3 > /dev/null 2>&1'.format( name, name )

	if cmd:
		cmd_wait_msg( cmd, 'cvt {}.{} to {}.{}'.format( name, from_ext, name, to_ext ) )


def make(name, ext, direc='load', raw_opt=''):
	if os.path.exists( '{}.{}'.format( name, ext ) ):
		return

	rule = {
		'load': { 'raw': 'wav', 'inf': 'wav', 'wav': 'mp3' },
		'save': { 'mp3': 'wav', 'wav': 'raw' },
	}

	from_ext = rule.get( direc, {} ).get( ext )
	if not from_ext:
		dbg.err_exit( 'not found {}.{}'.format( name, ext ) )

	make( name, from_ext, direc, raw_opt )
	cvt( name, from_ext, ext, raw_opt )

	if direc == 'save':
		rm( '{}.{}'.format( name, from_ext ) )


def inf_new(name):
	make( name, 'inf', 'load' )

	dic = yaml_ut.load_fn( name + '.inf', {} )

	r = float( dic.get( 'Sample Rate', 44100 ) )

	b = 16
	s = dic.get( 'Precision' )
	if s.endswith( '-bit' ):
		b = int( s[ : -len( '-bit' ) ] )

	c = int( dic.get( 'Channels', 2 ) )

	e = 'signed-integer'
	s = dic.get( 'Sample Encoding', 'Signed' )
	if 'Signed' not in s:
		e = 'unsigned-integer'

	get_raw_opt = lambda r_rate=1.0: '-t raw -r {} -b {} -c {} -e {}'.format( r * r_rate, b, c, e )

	lst = dic.get( 'Duration', '' ).split()
	k = 'samples'
	i = lst.index( k ) if k in lst else -1
	smp_n = int( lst[ i - 1 ] ) if i > 0 else -1

	byte_per_smp = c * ( b // 8 )
	smp_sec = lambda sec: int( sec * r )
	smp_to_sec = lambda m: m / r
	byte_sec = lambda sec: smp_sec( sec ) * byte_per_smp
	byte_to_sec = lambda n: smp_to_sec( n // byte_per_smp )

	is_unsigned = lambda : e == 'unsigned-integer'
	get_dtype = lambda : '{}int{}'.format( 'u' if is_unsigned() else '', b )

	return empty.new( locals() )


def data_new(name_mp3):

	(name, mp3) = os.path.splitext( name_mp3 )

	inf = inf_new( name )

	def bytes_to_arr(b):
		arr = np.frombuffer( b, inf.get_dtype() )
		arr = np.array( arr, dtype='float64' )

		half = 2 ** ( inf.b - 1 )
		if inf.is_unsigned():
			arr -= half
		return arr / half

	def arr_to_bytes(arr):
		half = 2 ** ( inf.b - 1 )
		arr = arr * half

		arr = np.maximum( arr, -half )
		arr = np.minimum( arr, half - 1 )

		if inf.is_unsigned():
			arr += half

		arr = np.array( arr, dtype=inf.get_dtype() )
		return arr.tobytes()

	e = empty.new()
	e.raw_bytes = None
	e.exists_raw = False

	def make_raw():
		if not e.exists_raw:
			make( name, 'raw', 'load' )
			e.exists_raw = True

	def get_bytes_cache(start_sec=0, end_sec=-1):
		if e.raw_bytes == None:
			make_raw()
			e.raw_bytes = load_bytes( name + '.raw' )

		if start_sec == 0 and end_sec < 0:
			return e.raw_bytes

		start = inf.byte_sec( start_sec )
		end = inf.byte_sec( end_sec ) if end_sec >= 0 else len( e.raw_bytes )
		return e.raw_bytes[ start : end ]

	def get_bytes( start_sec=0, end_sec=-1, cache=True ):
		if cache:
			return get_bytes_cache( start_sec, end_sec )

		make_raw()
		from_byte = inf.byte_sec( start_sec )
		to_byte = inf.byte_sec( end_sec ) if end_sec >= 0 else None
		return load_bytes_part( name + '.raw', from_byte, to_byte )

	def get_arr(start_sec=0, end_sec=-1):
		b = get_bytes( start_sec, end_sec )
		return bytes_to_arr( b )

	def save_bytes_to(b, save_name):
		(sname, ext) = os.path.splitext( save_name )
		if not ext or ext not in ( 'raw', 'wav', 'mp3' ):
			ext = 'mp3'

		rm( save_name )
		save_bytes( sname + '.raw', b )
		if ext != 'raw':
			make( sname, ext, 'save', inf.get_raw_opt() )

	def save_arr(arr, save_name):
		b = arr_to_bytes( arr )
		save_bytes_to( b, save_name )

	return empty.add( e, locals() )


# EOF

音声ファイル関連です。

音楽ファイルの分割 2020冬

から、適当に切り出してみました。

soxコマンド、lameコマンドが使える環境が前提です。

問題なければ、そのうちにsnddiv.py側から本snd_ut.pyを使うように更新してみます。

snddiv 更新してみました。2020/SEP/23

data_new(name_mp3)

音声ファイルのパスをname_mp3に指定します。

パスの拡張子は小文字で '.mp3' または '.wav' を期待してます。

ファイルからデータを読み込み、dataオブジェクトを返します。

dataオブジェクトの主なメソッド

メソッド 内容
get_arr(start_sec=0, end_sec=-1) データをfloat64の値(-1から1の範囲)に変換して、numpyのarrayで返します。
秒範囲を指定すると、その範囲のデータを返します。
save_arr(arr, save_name) arrのデータをsave_nameのパスのファイルでセーブします。
パスの拡張子は'.mp3', '.wav', '.raw'を期待してます。
get_bytes(start_sec=0, end_sec=-1, cache=True) rawデータをbytesのリストで返します。
秒範囲を指定すると、その範囲のデータを返します。
cache=True(デフォルト)では、初回にファイル全体をメモリにロードして保持します。
cache=Falseを指定すると、毎回、指定の範囲をファイルからロードします。
save_bytes_to(b, save_name) rawデータbをsave_nameのパスのファイルでセーブします。
パスの拡張子は'.mp3', '.wav', '.raw'を期待してます。
bytes_to_arr(b) rawデータbをfloat64の値(-1から1の範囲)に変換して、numpyのarrayで返します。
arr_to_bytes(arr) arrのデータをrawデータに変換してbytesのリストで返します。

dataオブジェクトの.inf属性data.infには、 inf_new()関数で生成されたinfオブジェクトがセットされます。

infオブジェクトの主なメソッドと値

メソッド 内容
r サンプリング周波数
b ビット数
c チャンネル数
is_unsigned() データ形式が符合なしかどうかを返す
smp_n 全体のサンプリング数
byte_per_smp 1サンプルあたりのバイト数
smp_sec( sec ) sec秒あたりのサンプリング数を返す
smp_to_sec( m ) mサンプリング数あたりの秒数を返す
byte_sec( sec ) sec秒あたりのバイト数を返す
byte_to_sec( n ) nバイト数あたりの秒数を返す
get_dtype() 数値のデータ形式をnumpyで使用する次の文字列の形式で返す
'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32'
get_raw_opt( r_rate=1.0 ) soxコマンドで再生するときに指定するオプションの文字列を返す
r_rateで再速度の倍率を指定する

変数silent

初期値Falseの変数です。

Trueに変更すると、load, saveやコマンド実行時に、 標準出力にメッセージを出力しません。

サンプルプログラム

snd_ut_sample.py

#!/usr/bin/env python

import snd_ut

if __name__ == "__main__":

	data = snd_ut.data_new( 'ptmsk-14.mp3' )
	arr = data.get_arr()

	if data.inf.c == 2:
		arr = arr.reshape( ( -1, 2 ) )

	idxs = list( map( data.inf.smp_sec, [ 40, 50 ] ) )
	clip = arr[ idxs[0] : idxs[1] ]

	if data.inf.c == 2:
		clip = clip.reshape( ( -1, ) )

	data.save_arr( clip, 'clip.mp3' )

# EOF

実行例

$ wget http://kondoh.html.xdomain.jp/midi_mp3/ptmsk-14.mp3

$ ./snd_ut_sample.py
cvt ptmsk-14.mp3 to ptmsk-14.wav . OK
cvt ptmsk-14.wav to ptmsk-14.inf  OK
cvt ptmsk-14.wav to ptmsk-14.raw  OK
load ptmsk-14.raw  OK
del clip.mp3  OK
save clip.raw  OK
cvt clip.raw to clip.wav  OK
del clip.raw  OK
cvt clip.wav to clip.mp3  OK
del clip.wav  OK

$ ls -lt
total 129528
-rw-r--r--  1 kondoh  wheel    160913  8 20 21:01 clip.mp3
-rw-r--r--  1 kondoh  wheel       259  8 20 21:01 ptmsk-14.inf
-rw-r--r--  1 kondoh  wheel  31635576  8 20 21:01 ptmsk-14.raw
-rw-r--r--  1 kondoh  wheel  31635620  8 20 21:01 ptmsk-14.wav
-rwxr-xr-x  1 kondoh  wheel       369  8 20 21:01 snd_ut_sample.py
-rw-r--r--@ 1 kondoh  wheel   2870542 10  5  2015 ptmsk-14.mp3
$


pty_spawn.py

pty.spawn()実行用です。

pty_spawn.py
#!/usr/bin/env python

import os
import sys
import pty
import dbg

if __name__ == "__main__":
	cmd = sys.argv[1:]
	if cmd:
		pty.spawn( cmd )
	else:
		dbg.help_exit( 'cmd ...' )
# EOF

import用じゃなく実行用です。

$ ./pty_spawn.py
Usage: ./pty_spawn.py cmd ...

$ python -m pty_spawn
Usage: /Users/kondoh/kon_page/kon_ut/pty_spawn.py cmd ...

引数に実行したいコマンドやパラメータを与えると、 子プロセスで実行します。

以降は、まぁpty.spawn()の動作のままですが、 子プロセスの標準入力と標準出力は、擬似端末のマスター側に接続されます。

実行しているpty_spwan.pyそのものである親プロセス側の、 標準入力と標準出力は、擬似端末のスレーブ側に接続されます。


cmds_out.py

cmds_out.py
#!/usr/bin/env python

import empty
import thr
import cmd_ut
import yaml_ut
import base
import dbg

thr_gc = thr.gc_new()

def ecmd_new(cmd, id, que):
	proc = cmd_ut.proc_new( cmd, stdout=cmd_ut.PIPE )
	que.put( 'start cmd ' + cmd )
	f = proc.get_stdout()

	def stop():
		proc.kill()
		proc.wait()
		que.put( 'stop cmd ' + cmd )

	def th_f():
		while True:
			s = f.readline().decode()
			if not s:
				break
			s = s.strip()
			if s:
				que.put( id + ': ' + s )

	thr.th_new( th_f, gc=thr_gc ).start()

	return empty.new( locals() )

def run():
	data_fn = 'cmds_out.yaml'
	fn = base.find_path( data_fn, __file__ )
	if not fn:
		dbg.err_exit( data_fn + ' ?' )

	d = yaml_ut.load_fn( fn )
	lst = d.get( 'cmds', [] )
	rdic = d.get( 'replace', {} )

	th_show = thr.loop_que_new( dbg.out, 1.0, gc=thr_gc )
	th_show.start()

	ecmds = []
	for d in lst:
		o = empty.new( d, cmd='', id='' )
		if o.cmd:
			o.cmd = base.str_replace_dic( o.cmd, rdic )

			ecmd = ecmd_new( o.cmd, o.id, th_show.que )
			ecmds.append( ecmd )

	sigint = thr.sigint_new()
	sigint.wait()
	sigint.fini()

	for ecmd in ecmds:
		ecmd.stop()
	th_show.stop()
	thr_gc.stop()

if __name__ == "__main__":
	run()
# EOF

import用じゃなく実行用です。

複数のコマンドを実行して表示するだけのツールです。

コマンドはデータファイル cmds_out.yaml に記述しておきます。

データファイル例

cmds_out.yaml
cmds:
- id: ping
  cmd: ping $HOST

- id: date
  cmd: while true; do date; sleep $(WAIT); done

- id: header file
  cmd: find /usr/include -type f | sed -e "s/^/echo /" -e "s/$/; sleep ${FIND_WAIT}/" | sh

replace:
  HOST: www.google.com
  WAIT: "5"
  FIND_WAIT: "2"

# EOF

cmds に、実行するコマンドの情報のリストを記述します。

コマンドの情報は辞書形式で、idにID文字列、cmdに実行するコマンド文字列を記述します。

それぞれのコマンドを実行したときの標準出力が、 行単位で行頭に "ID文字列: " が挿入されて表示されます。

replaceに辞書を用意しておいて、 $xxx, $(xxx), ${xxx} による簡単な変換ができます。

実行例

$ ./cmds_out.py
start cmd ping www.google.com
start cmd while true; do date; sleep 5; done
date: 2020年 6月20日 土曜日 14時45分11秒 JST
start cmd find /usr/include -type f | sed -e "s/^/echo /" -e "s/$/; sleep 2/" | sh
header file: /usr/include/_locale.h
ping: PING www.google.com (172.217.26.100): 56 data bytes
ping: 64 bytes from 172.217.26.100: icmp_seq=0 ttl=115 time=12.074 ms
ping: 64 bytes from 172.217.26.100: icmp_seq=1 ttl=115 time=5.165 ms
ping: 64 bytes from 172.217.26.100: icmp_seq=2 ttl=115 time=5.485 ms
header file: /usr/include/_types/_intmax_t.h
ping: 64 bytes from 172.217.26.100: icmp_seq=3 ttl=115 time=4.933 ms
ping: 64 bytes from 172.217.26.100: icmp_seq=4 ttl=115 time=4.470 ms
header file: /usr/include/_types/_nl_item.h
date: 2020年 6月20日 土曜日 14時45分16秒 JST
ping: 64 bytes from 172.217.26.100: icmp_seq=5 ttl=115 time=5.630 ms
ping: 64 bytes from 172.217.26.100: icmp_seq=6 ttl=115 time=5.345 ms
header file: /usr/include/_types/_uint16_t.h
ping: 64 bytes from 172.217.26.100: icmp_seq=7 ttl=115 time=7.188 ms
ping: 64 bytes from 172.217.26.100: icmp_seq=8 ttl=115 time=5.204 ms
header file: /usr/include/_types/_uint32_t.h
ping: 64 bytes from 172.217.26.100: icmp_seq=9 ttl=115 time=5.601 ms
date: 2020年 6月20日 土曜日 14時45分21秒 JST
  C-c C-cheader file: /usr/include/_types/_uint64_t.h
stop cmd ping www.google.com
stop cmd while true; do date; sleep 5; done
stop cmd find /usr/include -type f | sed -e "s/^/echo /" -e "s/$/; sleep 2/" | sh
$