2021/JUL/19
更新履歴
日付
変更内容 2021/JUL/18
新規作成 2021/JUL/19
入力用のスレッドについて
追加
sock_ut の v10 から、のれん分けしました。
をなるべく同じように扱えるように、くくりだして、まとめてみました。
fio_ut.py
#!/usr/bin/env python
import sys
import select
import socket
import empty
import thr
import cmd_ut
import dbg
def fs_default_new():
g_b1 = lambda f: f.read( 1 )
def g_b( f ):
fs = get_fs( f )
b = b''
while not b or fs.c_g( f ):
b1 = fs.g_b1( f )
if not b1:
break
b += b1
return b
c_g = lambda f, tmout=0: select.select( [ f ], [], [], tmout )[ 0 ] == [ f ]
p_b = lambda f, b: f.write( b )
return empty.new( locals() )
fs_df = fs_default_new()
def b1_th_new( f, g_b1_ ):
q = thr.que_new()
def th_f():
b1 = g_b1_( f )
q.put( b1 )
th = thr.loop_new( th_f )
thr.ths.start( th )
def stop():
th.stop()
def g_b1( f ):
return q.get()
g_b = fs_df.g_b
def c_g( f ):
return not q.is_empty()
return empty.new( locals() )
fdic = {}
def set_fs( fi, fo, g_b1=fs_df.g_b1, g_b=fs_df.g_b, c_g=fs_df.c_g, new_b1_th=False, p_b=fs_df.p_b ):
b1_th=None
if new_b1_th:
b1_th = b1_th_new( fi, g_b1 )
g_b1 = b1_th.g_b1
g_b = b1_th.g_b
c_g = b1_th.c_g
if fi == fo:
fdic[ fi ] = empty.new( g_b1=g_b1, g_b=g_b, c_g=c_g, b1_th=b1_th, p_b=p_b )
else:
fdic[ fi ] = empty.new( g_b1=g_b1, g_b=g_b, c_g=c_g, b1_th=b1_th, p_b=None )
fdic[ fo ] = empty.new( g_b1=None, g_b=None, c_b=None, b1_th=None, p_b=p_b )
def set_fs_sock( f ):
g_b1 = lambda f: f.recv( 1 )
g_b = lambda f: f.recv( 1024 * 16 )
p_b = lambda f, b: f.sendall( b )
set_fs( f, f, g_b1=g_b1, g_b=g_b, p_b=p_b )
def set_fs_sys_stdio():
get_buffer = lambda f: f.buffer if hasattr( f, 'buffer' ) else f
fi = get_buffer( sys.stdin )
fo = get_buffer( sys.stdout )
set_fs( fi, fo )
return ( fi, fo )
def set_fs_proc_stdio( proc ):
fi = proc.stdout
fo = proc.stdin
set_fs( fi, fo, new_b1_th=True )
return ( fi, fo )
def get_fs( f ):
if f not in fdic:
set_fs( f, f )
return fdic.get( f )
def del_fs( f ):
if f not in fdic:
return
fs = get_fs( f )
if fs.b1_th:
# need kill proc of f, for return blocked get_b1
fs.b1_th.stop()
fdic.pop( f )
def close( f ):
del_fs( f )
f.close()
def g_b1( f ):
fs = get_fs( f )
b = b''
if not fs.g_b1:
return b
b1 = fs.g_b1( f )
if b1:
b = b1
return b
def g_b( f ):
fs = get_fs( f )
b = b''
if not fs.g_b:
return b
b = fs.g_b( f )
return b
def g_b_line( f ):
fs = get_fs( f )
b = b''
if not fs.g_b1:
return b
while True:
b1 = fs.g_b1( f )
if not b1:
break
b += b1
if b1 == b'\n':
break
return b
def g_b_all( f ):
fs = get_fs( f )
b = b''
if not fs.g_b or not fs.c_g:
return b
while not b or fs.c_g( f ):
b_ = fs.g_b( f )
if not b_:
break
b += b_
return b
def c_g( f ):
fs = get_fs( f )
if not fs.c_g:
return False
return fs.c_g( f )
def p_b( f, b ):
fs = get_fs( f )
if not fs.p_b:
return False
try:
fs.p_b( f, b )
except:
return False
if hasattr( f, 'flush' ):
f.flush()
return True
to_s = lambda bt: bt.decode( 'utf-8' )
to_b = lambda s: s.encode( 'utf-8' )
g_s1 = lambda f : to_s( g_b1( f ) )
g_s = lambda f: to_s( g_b( f ) )
g_s_line = lambda f: to_s( g_b_line( f ) )
g_s_all = lambda f: to_s( g_b_all( f ) )
p_s = lambda f, s: p_b( f, to_b( s ) )
def proc_new( cmd ):
proc = cmd_ut.proc_new( cmd, stdin=cmd_ut.PIPE, stdout=cmd_ut.PIPE )
( fi, fo ) = set_fs_proc_stdio( proc.proc )
def stop():
proc.kill()
proc.wait()
del_fs( fi )
del_fs( fo )
return empty.new( locals() )
def run():
( fi, fo ) = set_fs_sys_stdio()
cmd = 'cat'
if len( sys.argv ) >= 2:
cmd = ' '.join( sys.argv[ 1 : ] )
proc = proc_new( cmd )
port = 13579
host = 'localhost'
e = empty.new()
e.cs = None
def srv():
ss = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
ss.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
ss.bind( ( '', port ) )
ss.listen( 5 )
( e.cs, adr ) = ss.accept()
th = thr.th_new( srv )
thr.ths.start( th )
sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
sc.connect( ( host, port ) )
set_fs_sock( sc )
th.stop()
set_fs_sock( e.cs )
while True:
b = g_b_line( fi )
if not b:
break
if not p_b( proc.fo, b ):
break
b = g_b_all( proc.fi )
if not b:
break
if not p_b( sc, b ):
break
b = g_b_all( e.cs )
if not b:
break
if not p_b( e.cs, b ):
break
b = g_b_all( sc )
if not b:
break
if not p_b( fo, b ):
break
close( sc )
close( e.cs )
proc.stop()
del_fs( fi )
del_fs( fo )
if __name__ == "__main__":
run()
# EOF
f | ソケット |
返り値 | なし |
fをソケットとして登録する。
返り値 | ( fi, fo ) の組 | |
fi | 入力用のファイルディスクリプタ | |
fo | 出力用のファイルディスクリプタ |
標準入出力 sys.stdin/stdout を登録して、組で返す。
proc | subprocssで生成したプロセスオブジェクト | |
返り値 | ( fi, fo ) の組 | |
fi | 入力用のファイルディスクリプタ | |
fo | 出力用のファイルディスクリプタ |
プロセスの標準入出力 proc.stdout/proc.stdin を登録して、組で返す。
fiはプロセスの標準出力結果を入力するディスクリプタ。
foはプロセスの標準入力に向けて出力するディスクリプタ。
fi | 入力用のファイルディスクリプタ |
fo | 出力用のファイルディスクリプタ |
g_b1 | fiを引数として1バイト入力する関数 |
g_b | fiを引数としてバイト列を入力する関数 |
c_g | fiを引数として入力可能かどうかをboolで返す関数(can get) |
new_b1_th | 1バイト入力を繰り返すスレッドを生成するかどうかの指定 |
p_b | foを引数としてバイト列を出力する関数 |
返り値 | なし |
fi, foについて、指定の関数を使用するように登録する。
f | 登録済みのファイルディスクリプタ |
登録を解除する。
f | 登録済みのファイルディスクリプタ |
登録を解除して、f.close() を実行する。
f | 登録済みのファイルディスクリプタ |
1バイト入力して取得した1バイトを返す。
closeにより入力できなかった場合などは b'' を返す。
f | 登録済みのファイルディスクリプタ |
取得したバイト列を返す。
closeにより入力できなかった場合などは b'' を返す。
f | 登録済みのファイルディスクリプタ |
改行コードまでの1行分のバイト列を返す。(末尾の改行コードを含む)
closeにより入力できなかった場合などは b'' を返す。
f | 登録済みのファイルディスクリプタ |
c_g() 判定して、入力可能な限り取得したバイト列を返す。
closeにより入力できなかった場合などは b'' を返す。
f | 登録済みのファイルディスクリプタ |
入力可能かどうかをboolで返す。(can get)
f | 登録済みのファイルディスクリプタ |
b | 出力するバイト列 |
バイト列を出力する。
bt | バイト列 |
バイト列btを'utf-8'でデコードして文字列を返す。
s | 文字列 |
文字列sを'utf-8'でエンコードドしてバイト列を返す。
f | 登録済みのファイルディスクリプタ |
1バイト入力して取得した1バイトを、文字に変換して返す。
closeにより入力できなかった場合などは '' を返す。
f | 登録済みのファイルディスクリプタ |
取得したバイト列を、文字列に変換して返す。
closeにより入力できなかった場合などは '' を返す。
f | 登録済みのファイルディスクリプタ |
改行コードまでの1行分のバイト列を取得し、 文字列に変換して返す。(末尾の改行コードを含む)
closeにより入力できなかった場合などは '' を返す。
f | 登録済みのファイルディスクリプタ |
c_g() 判定して、入力可能な限り取得したバイト列を、 文字列に変換して返す。
closeにより入力できなかった場合などは '' を返す。
f | 登録済みのファイルディスクリプタ |
s | 出力する文字列 |
指定の文字列をバイト列に変換して出力する。
cmd | コマンド文字列 |
返り値 | オブジェクト |
コマンド文字列を実行するプロセスを生成し、 生成プロセスの標準入出力を登録し、 オブジェクトを返す。
オブジェクトの属性
fi | 入力用のファイルディスクリプタ |
fo | 出力用のファイルディスクリプタ |
stop() | プロセス停止用のメソッド |
stop()メソッドを実行すると、プロセスにSIGKILLを発行し、wait()で終了を待つ。
さらに、del_fs() を呼び出し fi, fo の登録を解除する。
fio_ut.pyをコマンドとして実行したときに呼び出すrun()の例。
run()
def run():
( fi, fo ) = set_fs_sys_stdio()
cmd = 'cat'
if len( sys.argv ) >= 2:
cmd = ' '.join( sys.argv[ 1 : ] )
proc = proc_new( cmd )
port = 13579
host = 'localhost'
e = empty.new()
e.cs = None
def srv():
ss = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
ss.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
ss.bind( ( '', port ) )
ss.listen( 5 )
( e.cs, adr ) = ss.accept()
th = thr.th_new( srv )
thr.ths.start( th )
sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
sc.connect( ( host, port ) )
set_fs_sock( sc )
th.stop()
set_fs_sock( e.cs )
while True:
b = g_b_line( fi )
if not b:
break
if not p_b( proc.fo, b ):
break
b = g_b_all( proc.fi )
if not b:
break
if not p_b( sc, b ):
break
b = g_b_all( e.cs )
if not b:
break
if not p_b( e.cs, b ):
break
b = g_b_all( sc )
if not b:
break
if not p_b( fo, b ):
break
close( sc )
close( e.cs )
proc.stop()
del_fs( fi )
del_fs( fo )
$ ./sock_ut.py
を実行すると
( fi, fo ) = set_fs_sys_stdio()
標準入出力を登録して、fi, fo として保持して
cmd = 'cat' if len( sys.argv ) >= 2: cmd = ' '.join( sys.argv[ 1 : ] ) proc = proc_new( cmd )
catコマンドを実行するプオセスを生成。
プロセスの標準入出力を登録し、オブジェクトprocを取得。
proc.fo, proc.fi のファイルディスクリプタで、 catコマンドへの出力、catコマンドからの入力が可能。
port = 13579 host = 'localhost' : sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) sc.connect( ( host, port ) ) set_fs_sock( sc ) th.stop() set_fs_sock( e.cs )
サーバソケットを生成し、クライアントソケットを接続。
クライアント側のソケット sc を登録。
サーバ側のソケット e.cs を登録。
while True: b = g_b_line( fi ) if not b: break if not p_b( proc.fo, b ): break b = g_b_all( proc.fi ) if not b: break if not p_b( sc, b ): break b = g_b_all( e.cs ) if not b: break if not p_b( e.cs, b ): break b = g_b_all( sc ) if not b: break if not p_b( fo, b ): break
標準入力から1行分のバイト列を入力。
catプロセスにバイト列を出力。
catプロセスからバイト列を入力。
クライアントソケットにバイト列を出力。
サーバソケットからバイト列を入力。
サーバソケットにバイト列を出力。
クライアントソケットからバイト列を入力。
標準出力にバイト列を出力。
という一連の処理を繰り返す。
^Dを入力すると、標準入力からの1行入力で b'' が返り、ループを抜ける。
close( sc ) close( e.cs ) proc.stop() del_fs( fi ) del_fs( fo )
クライアントソケット、サーバソケットを close() で 登録を解除して、クローズする。
catプロセスを終了して、catプロセスの標準入出力の登録を解除する。
fio_ut.pyコマンドの標準入出力の登録を解除する。
コマンドライン引数を指定すると、catコマンドの代わりとしてプロセスを生成して実行する。
$ ./fio_ut.py hello hello world world ^D $
$ ./fio_ut.py bc -l 1+1 2 2+3 5 quit $
$ ./fio_ut.py sed -l s/o/0/g hellow hell0w world w0rld ^D $
このプログラムは
をなるべく同じように扱える事を目指してます。
の関数は、内部的に
set_fs( fi, fo, g_b1=fs_df.g_b1, g_b=fs_df.g_b, c_g=fs_df.c_g, new_b1_th=False, p_b=fs_df.p_b )
を呼び出しています。
boolの引数
new_b1_th | 1バイト入力を繰り返すスレッドを生成するかどうかの指定 |
ここでしれっとスレッドを生成すると述べてます。
この指定をTrueにしてるのは、
set_fs_proc_stdio()からの呼び出しで
def set_fs_proc_stdio( proc ): fi = proc.stdout fo = proc.stdin set_fs( fi, fo, new_b1_th=True ) return ( fi, fo )
subprocessのPIPE指定したprocのstdin/stdoutの場合です。
なにゆえこの場合だけ専用スレッドを生成するようにしたのか?
ファイルディスクリプタからデータを取得する時のブロックする動作が関連してます。
そもそも最初は、ソケットからの受信処理から出発してます。
sock.recv( bufmax指定 ) は基本、ブロックします。
一通りデータを受信して、すぐには次のデータが来ない状況になるか、 受信バイト数がbufmaxに達するかで、呼元に返ります。
この動作をAPIの g_b() (get bytes) としました。
g_b1() は、同様にブロックして1バイト受信して返るタイプ。
g_b_line() は、改行コードが現れると、そこで返るタイプ。
g_b_all() は、データが来てるうちは、g_b()を繰り返して、bufmaxの限度以上も返せるようにしたタイプ。
さて、subprocessのPIPE指定したprocのstdin/stdoutの場合について。
プロセスのstdoutを引数指定なしでread()してみると、ブロックします。
そして、コマンドが完全に終了してから全ての出力が、まとめて一気に返ってきました。
これでは、ソケットのように対話的に使えません。
communiate() を使って試した場合も、プロセスのstdinに一度に書き込んで、プロセスのstdoutから一度に読み出す動作前提のようでした。
ところが、read( 1 )の読み出しを繰り返すと、1バイトづつ返ってきて取得できてます。
すぐに返せるデータがあるうちは返ってきて、なくなったのにread( 1 )に突っ込むとブロックします。
これは使える。
ならば select でリード可能な限りread( 1 )でループを回せば、目的の動作になりそうです。
最後に不用意にread( 1 )に突っ込んでブロックする前に、selectでリード不可が返れば良いはず。
と、そのように組んでみたのですが、これが、うまく動作しません。
初回のread( 1 )のあと、select がリード不可を返して1回しかループがまわりません。
ですがですが、そのままread( 1 )してみると、ちゃんと次の1バイトが取得できてます。
これは、select が期待したようには動いてくれてない様子。
そこで、read( 1 )専用のスレッドです。
動きが明らかな安全なqueueを1つ用意して、専用スレッドを生成します。
そこで select もなにもせずに、ひたすらループでブロックする read( 1 ) を呼び出して、 1バイト取得しては queue に入れていきます。
queueのワンクッションを経てデータを取得することで、扱いが簡単になります。
データが取得可能かどうかは、queueが空かどうかを判定すれば良くなります。
というような事情があって
new_b1_th | 1バイト入力を繰り返すスレッドを生成するかどうかの指定 |
導入に至った事を、ここに記録しておきます。