fio_ut

2021/JUL/19

更新履歴
日付 変更内容
2021/JUL/18 新規作成
2021/JUL/19 入力用のスレッドについて 追加

目次


概要

sock_ut の v10 から、のれん分けしました。

をなるべく同じように扱えるように、くくりだして、まとめてみました。


ソースコード

fio_ut.py
#!/usr/bin/env python

import sys
import select
import socket

import empty
import thr
import cmd_ut
import dbg

def fs_default_new():
	g_b1 = lambda f: f.read( 1 )

	def g_b( f ):
		fs = get_fs( f )
		b = b''
		while not b or fs.c_g( f ):
			b1 = fs.g_b1( f )
			if not b1:
				break
			b += b1
			return b

	c_g = lambda f, tmout=0: select.select( [ f ], [], [], tmout )[ 0 ] == [ f ]
	p_b = lambda f, b: f.write( b )

	return empty.new( locals() )

fs_df = fs_default_new()

def b1_th_new( f, g_b1_ ):
	q = thr.que_new()

	def th_f():
		b1 = g_b1_( f )
		q.put( b1 )

	th = thr.loop_new( th_f )
	thr.ths.start( th )

	def stop():
		th.stop()

	def g_b1( f ):
		return q.get()

	g_b = fs_df.g_b

	def c_g( f ):
		return not q.is_empty()

	return empty.new( locals() )

fdic = {}

def set_fs( fi, fo, g_b1=fs_df.g_b1, g_b=fs_df.g_b, c_g=fs_df.c_g, new_b1_th=False, p_b=fs_df.p_b ):
	b1_th=None
	if new_b1_th:
		b1_th = b1_th_new( fi, g_b1 )
		g_b1 = b1_th.g_b1
		g_b = b1_th.g_b
		c_g = b1_th.c_g

	if fi == fo:
		fdic[ fi ] = empty.new( g_b1=g_b1, g_b=g_b, c_g=c_g, b1_th=b1_th, p_b=p_b )
	else:
		fdic[ fi ] = empty.new( g_b1=g_b1, g_b=g_b, c_g=c_g, b1_th=b1_th, p_b=None )
		fdic[ fo ] = empty.new( g_b1=None, g_b=None, c_b=None, b1_th=None, p_b=p_b )

def set_fs_sock( f ):
	g_b1 = lambda f: f.recv( 1 )
	g_b = lambda f: f.recv( 1024 * 16 )
	p_b = lambda f, b: f.sendall( b )
	set_fs( f, f, g_b1=g_b1, g_b=g_b, p_b=p_b )

def set_fs_sys_stdio():
	get_buffer = lambda f: f.buffer if hasattr( f, 'buffer' ) else f
	fi = get_buffer( sys.stdin )
	fo = get_buffer( sys.stdout )
	set_fs( fi, fo )
	return ( fi, fo )

def set_fs_proc_stdio( proc ):
	fi = proc.stdout
	fo = proc.stdin
	set_fs( fi, fo, new_b1_th=True )
	return ( fi, fo )

def get_fs( f ):
	if f not in fdic:
		set_fs( f, f )
	return fdic.get( f )

def del_fs( f ):
	if f not in fdic:
		return
	fs = get_fs( f )
	if fs.b1_th:
		# need kill proc of f, for return blocked get_b1
		fs.b1_th.stop()
	fdic.pop( f )

def close( f ):
	del_fs( f )
	f.close()

def g_b1( f ):
	fs = get_fs( f )
	b = b''
	if not fs.g_b1:
		return b

	b1 = fs.g_b1( f )
	if b1:
		b = b1
	return b

def g_b( f ):
	fs = get_fs( f )
	b = b''
	if not fs.g_b:
		return b

	b = fs.g_b( f )
	return b

def g_b_line( f ):
	fs = get_fs( f )
	b = b''
	if not fs.g_b1:
		return b

	while True:
		b1 = fs.g_b1( f )
		if not b1:
			break
		b += b1
		if b1 == b'\n':
			break
	return b

def g_b_all( f ):
	fs = get_fs( f )
	b = b''
	if not fs.g_b or not fs.c_g:
		return b

	while not b or fs.c_g( f ):
		b_ = fs.g_b( f )
		if not b_:
			break
		b += b_
	return b

def c_g( f ):
	fs = get_fs( f )
	if not fs.c_g:
		return False

	return fs.c_g( f )

def p_b( f, b ):
	fs = get_fs( f )
	if not fs.p_b:
		return False

	try:
		fs.p_b( f, b )
	except:
		return False
	if hasattr( f, 'flush' ):
		f.flush()
	return True


to_s = lambda bt: bt.decode( 'utf-8' )
to_b = lambda s: s.encode( 'utf-8' )

g_s1 = lambda f : to_s( g_b1( f ) )
g_s = lambda f: to_s( g_b( f ) )
g_s_line = lambda f: to_s( g_b_line( f ) )
g_s_all = lambda f: to_s( g_b_all( f ) )
p_s = lambda f, s: p_b( f, to_b( s ) )


def proc_new( cmd ):
	proc = cmd_ut.proc_new( cmd, stdin=cmd_ut.PIPE, stdout=cmd_ut.PIPE )
	( fi, fo ) = set_fs_proc_stdio( proc.proc )

	def stop():
		proc.kill()
		proc.wait()
		del_fs( fi )
		del_fs( fo )

	return empty.new( locals() )


def run():
	( fi, fo ) = set_fs_sys_stdio()


	cmd = 'cat'
	if len( sys.argv ) >= 2:
		cmd = ' '.join( sys.argv[ 1 : ] )

	proc = proc_new( cmd )


	port = 13579
	host = 'localhost'

	e = empty.new()
	e.cs = None

	def srv():
		ss = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
		ss.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
		ss.bind( ( '', port ) )
		ss.listen( 5 )
		( e.cs, adr ) = ss.accept()

	th = thr.th_new( srv )
	thr.ths.start( th )

	sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
	sc.connect( ( host, port ) )
	set_fs_sock( sc )

	th.stop()
	set_fs_sock( e.cs )


	while True:
		b = g_b_line( fi )
		if not b:
			break

		if not p_b( proc.fo, b ):
			break

		b = g_b_all( proc.fi )
		if not b:
			break

		if not p_b( sc, b ):
			break

		b = g_b_all( e.cs )
		if not b:
			break

		if not p_b( e.cs, b ):
			break

		b = g_b_all( sc )
		if not b:
			break

		if not p_b( fo, b ):
			break

	close( sc )
	close( e.cs )

	proc.stop()

	del_fs( fi )
	del_fs( fo )

if __name__ == "__main__":
	run()
# EOF


関数

ファイルディスクリプタ登録関数

set_fs_sock( f )

f ソケット
返り値 なし

fをソケットとして登録する。

set_fs_sys_stdio()

返り値 ( fi, fo ) の組
fi 入力用のファイルディスクリプタ
fo 出力用のファイルディスクリプタ

標準入出力 sys.stdin/stdout を登録して、組で返す。

set_fs_proc_stdio( proc )

proc subprocssで生成したプロセスオブジェクト
返り値 ( fi, fo ) の組
fi 入力用のファイルディスクリプタ
fo 出力用のファイルディスクリプタ

プロセスの標準入出力 proc.stdout/proc.stdin を登録して、組で返す。

fiはプロセスの標準出力結果を入力するディスクリプタ。

foはプロセスの標準入力に向けて出力するディスクリプタ。

set_fs( fi, fo, g_b1=fs_df.g_b1, g_b=fs_df.g_b, c_g=fs_df.c_g, new_b1_th=False, p_b=fs_df.p_b )

fi 入力用のファイルディスクリプタ
fo 出力用のファイルディスクリプタ
g_b1 fiを引数として1バイト入力する関数
g_b fiを引数としてバイト列を入力する関数
c_g fiを引数として入力可能かどうかをboolで返す関数(can get)
new_b1_th 1バイト入力を繰り返すスレッドを生成するかどうかの指定
p_b foを引数としてバイト列を出力する関数
返り値 なし

fi, foについて、指定の関数を使用するように登録する。

del_fs( f )

f 登録済みのファイルディスクリプタ

登録を解除する。

close( f )

f 登録済みのファイルディスクリプタ

登録を解除して、f.close() を実行する。

入力用の関数

g_b1( f )

f 登録済みのファイルディスクリプタ

1バイト入力して取得した1バイトを返す。

closeにより入力できなかった場合などは b'' を返す。

g_b( f )

f 登録済みのファイルディスクリプタ

取得したバイト列を返す。

closeにより入力できなかった場合などは b'' を返す。

g_b_line( f )

f 登録済みのファイルディスクリプタ

改行コードまでの1行分のバイト列を返す。(末尾の改行コードを含む)

closeにより入力できなかった場合などは b'' を返す。

g_b_all( f )

f 登録済みのファイルディスクリプタ

c_g() 判定して、入力可能な限り取得したバイト列を返す。

closeにより入力できなかった場合などは b'' を返す。

c_g( f )

f 登録済みのファイルディスクリプタ

入力可能かどうかをboolで返す。(can get)

出力用の関数

p_b( f, b )

f 登録済みのファイルディスクリプタ
b 出力するバイト列

バイト列を出力する。

文字列用の一式

to_s = lambda bt: bt.decode( 'utf-8' )

bt バイト列

バイト列btを'utf-8'でデコードして文字列を返す。

to_b = lambda s: s.encode( 'utf-8' )

s 文字列

文字列sを'utf-8'でエンコードドしてバイト列を返す。

g_s1 = lambda f : to_s( g_b1( f ) )

f 登録済みのファイルディスクリプタ

1バイト入力して取得した1バイトを、文字に変換して返す。

closeにより入力できなかった場合などは '' を返す。

g_s = lambda f: to_s( g_b( f ) )

f 登録済みのファイルディスクリプタ

取得したバイト列を、文字列に変換して返す。

closeにより入力できなかった場合などは '' を返す。

g_s_line = lambda f: to_s( g_b_line( f ) )

f 登録済みのファイルディスクリプタ

改行コードまでの1行分のバイト列を取得し、 文字列に変換して返す。(末尾の改行コードを含む)

closeにより入力できなかった場合などは '' を返す。

g_s_all = lambda f: to_s( g_b_all( f ) )

f 登録済みのファイルディスクリプタ

c_g() 判定して、入力可能な限り取得したバイト列を、 文字列に変換して返す。

closeにより入力できなかった場合などは '' を返す。

p_s = lambda f, s: p_b( f, to_b( s ) )

f 登録済みのファイルディスクリプタ
s 出力する文字列

指定の文字列をバイト列に変換して出力する。

プロセス用の関数

proc_new( cmd )

cmd コマンド文字列
返り値 オブジェクト

コマンド文字列を実行するプロセスを生成し、 生成プロセスの標準入出力を登録し、 オブジェクトを返す。

オブジェクトの属性

fi 入力用のファイルディスクリプタ
fo 出力用のファイルディスクリプタ
stop() プロセス停止用のメソッド

stop()メソッドを実行すると、プロセスにSIGKILLを発行し、wait()で終了を待つ。

さらに、del_fs() を呼び出し fi, fo の登録を解除する。


使用例

fio_ut.pyをコマンドとして実行したときに呼び出すrun()の例。

run()
def run():
	( fi, fo ) = set_fs_sys_stdio()


	cmd = 'cat'
	if len( sys.argv ) >= 2:
		cmd = ' '.join( sys.argv[ 1 : ] )

	proc = proc_new( cmd )


	port = 13579
	host = 'localhost'

	e = empty.new()
	e.cs = None

	def srv():
		ss = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
		ss.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 )
		ss.bind( ( '', port ) )
		ss.listen( 5 )
		( e.cs, adr ) = ss.accept()

	th = thr.th_new( srv )
	thr.ths.start( th )

	sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
	sc.connect( ( host, port ) )
	set_fs_sock( sc )

	th.stop()
	set_fs_sock( e.cs )


	while True:
		b = g_b_line( fi )
		if not b:
			break

		if not p_b( proc.fo, b ):
			break

		b = g_b_all( proc.fi )
		if not b:
			break

		if not p_b( sc, b ):
			break

		b = g_b_all( e.cs )
		if not b:
			break

		if not p_b( e.cs, b ):
			break

		b = g_b_all( sc )
		if not b:
			break

		if not p_b( fo, b ):
			break

	close( sc )
	close( e.cs )

	proc.stop()

	del_fs( fi )
	del_fs( fo )

登録処理

$ ./sock_ut.py

を実行すると

( fi, fo ) = set_fs_sys_stdio()

標準入出力を登録して、fi, fo として保持して

cmd = 'cat'
if len( sys.argv ) >= 2:
	cmd = ' '.join( sys.argv[ 1 : ] )

proc = proc_new( cmd )

catコマンドを実行するプオセスを生成。

プロセスの標準入出力を登録し、オブジェクトprocを取得。

proc.fo, proc.fi のファイルディスクリプタで、 catコマンドへの出力、catコマンドからの入力が可能。

port = 13579
host = 'localhost'
  :
sc = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
sc.connect( ( host, port ) )
set_fs_sock( sc )

th.stop()
set_fs_sock( e.cs )

サーバソケットを生成し、クライアントソケットを接続。

クライアント側のソケット sc を登録。

サーバ側のソケット e.cs を登録。

入出力ループ

while True:
	b = g_b_line( fi )
	if not b:
		break

	if not p_b( proc.fo, b ):
		break

	b = g_b_all( proc.fi )
	if not b:
		break

	if not p_b( sc, b ):
		break

	b = g_b_all( e.cs )
	if not b:
		break

	if not p_b( e.cs, b ):
		break

	b = g_b_all( sc )
	if not b:
		break

	if not p_b( fo, b ):
		break

標準入力から1行分のバイト列を入力。

catプロセスにバイト列を出力。

catプロセスからバイト列を入力。

クライアントソケットにバイト列を出力。

サーバソケットからバイト列を入力。

サーバソケットにバイト列を出力。

クライアントソケットからバイト列を入力。

標準出力にバイト列を出力。

という一連の処理を繰り返す。

^Dを入力すると、標準入力からの1行入力で b'' が返り、ループを抜ける。

終了処理

close( sc )
close( e.cs )

proc.stop()

del_fs( fi )
del_fs( fo )

クライアントソケット、サーバソケットを close() で 登録を解除して、クローズする。

catプロセスを終了して、catプロセスの標準入出力の登録を解除する。

fio_ut.pyコマンドの標準入出力の登録を解除する。

プロセスで実行するコマンドの指定

コマンドライン引数を指定すると、catコマンドの代わりとしてプロセスを生成して実行する。

実行例

$ ./fio_ut.py
hello
hello
world
world
^D
$
$ ./fio_ut.py bc -l
1+1
2
2+3
5
quit
$
$ ./fio_ut.py sed -l s/o/0/g
hellow
hell0w
world
w0rld
^D
$


入力用のスレッドについて

スレッドを生成する場合

このプログラムは

をなるべく同じように扱える事を目指してます。

の関数は、内部的に

set_fs( fi, fo, g_b1=fs_df.g_b1, g_b=fs_df.g_b, c_g=fs_df.c_g, new_b1_th=False, p_b=fs_df.p_b )

を呼び出しています。

boolの引数

new_b1_th 1バイト入力を繰り返すスレッドを生成するかどうかの指定

ここでしれっとスレッドを生成すると述べてます。

この指定をTrueにしてるのは、

set_fs_proc_stdio()からの呼び出しで

def set_fs_proc_stdio( proc ):
	fi = proc.stdout
	fo = proc.stdin
	set_fs( fi, fo, new_b1_th=True )
	return ( fi, fo )

subprocessのPIPE指定したprocのstdin/stdoutの場合です。

なにゆえこの場合だけ専用スレッドを生成するようにしたのか?

データ取得時のブロック

ファイルディスクリプタからデータを取得する時のブロックする動作が関連してます。

そもそも最初は、ソケットからの受信処理から出発してます。

sock.recv( bufmax指定 ) は基本、ブロックします。

一通りデータを受信して、すぐには次のデータが来ない状況になるか、 受信バイト数がbufmaxに達するかで、呼元に返ります。

この動作をAPIの g_b() (get bytes) としました。

g_b1() は、同様にブロックして1バイト受信して返るタイプ。

g_b_line() は、改行コードが現れると、そこで返るタイプ。

g_b_all() は、データが来てるうちは、g_b()を繰り返して、bufmaxの限度以上も返せるようにしたタイプ。

さて、subprocessのPIPE指定したprocのstdin/stdoutの場合について。

プロセスのstdoutを引数指定なしでread()してみると、ブロックします。

そして、コマンドが完全に終了してから全ての出力が、まとめて一気に返ってきました。

これでは、ソケットのように対話的に使えません。

communiate() を使って試した場合も、プロセスのstdinに一度に書き込んで、プロセスのstdoutから一度に読み出す動作前提のようでした。

ところが、read( 1 )の読み出しを繰り返すと、1バイトづつ返ってきて取得できてます。

すぐに返せるデータがあるうちは返ってきて、なくなったのにread( 1 )に突っ込むとブロックします。

これは使える。

でもselectがー

ならば select でリード可能な限りread( 1 )でループを回せば、目的の動作になりそうです。

最後に不用意にread( 1 )に突っ込んでブロックする前に、selectでリード不可が返れば良いはず。

と、そのように組んでみたのですが、これが、うまく動作しません。

初回のread( 1 )のあと、select がリード不可を返して1回しかループがまわりません。

ですがですが、そのままread( 1 )してみると、ちゃんと次の1バイトが取得できてます。

これは、select が期待したようには動いてくれてない様子。

そこで、read( 1 )専用のスレッドです。

動きが明らかな安全なqueueを1つ用意して、専用スレッドを生成します。

そこで select もなにもせずに、ひたすらループでブロックする read( 1 ) を呼び出して、 1バイト取得しては queue に入れていきます。

queueのワンクッションを経てデータを取得することで、扱いが簡単になります。

データが取得可能かどうかは、queueが空かどうかを判定すれば良くなります。

というような事情があって

new_b1_th 1バイト入力を繰り返すスレッドを生成するかどうかの指定

導入に至った事を、ここに記録しておきます。