#!/usr/bin/env python import os import yaml import numpy as np import empty import thr import cmd_ut import yaml_ut import dbg thr_gc = thr.gc_new() silent = False def dbg_out(s, tail='\n'): if not silent: dbg.out( s, tail ) def wait_msg_new(msg, tail='OK'): dbg_out( msg + ' ', '' ) th = thr.loop_new( dbg_out, ( '.', '' ), wait_tmout=1.0, gc=thr_gc ) th.start() def stop(): th.stop() dbg_out( ' ' + tail ) return empty.new( locals() ) def cmd_wait_msg(cmd, msg): wait_msg = wait_msg_new( msg ) cmd_ut.call( cmd ) wait_msg.stop() def rm(name): cmd = 'rm -f ' + name cmd_wait_msg( cmd, 'del ' + name ) def load_bytes(name): wait_msg = wait_msg_new( 'load ' + name ) b = bytes( [] ) with open( name, 'rb' ) as f: b = f.read() wait_msg.stop() return b def load_bytes_part( name, from_byte=None, to_byte=None ): if from_byte is None and to_byte is None: return load_bytes( name ) if from_byte is None: from_byte = 0 if to_byte is None: st = os.lstat( name ) to_byte = st.st_size b = bytes( [] ) with open( name, 'rb' ) as f: if from_byte > 0: f.seek( from_byte ) b = f.read( to_byte - from_byte ) return b def save_bytes(name, b): wait_msg = wait_msg_new( 'save ' + name ) with open( name, 'wb' ) as f: f.write( b ) wait_msg.stop() def cvt(name, from_ext, to_ext, raw_opt=''): cmd = '' if from_ext == 'mp3' and to_ext == 'wav': cmd = 'lame --decode {}.mp3 > /dev/null 2>&1'.format( name ) elif from_ext == 'wav' and to_ext == 'raw': cmd = 'sox {}.wav {}.raw'.format( name, name ) elif from_ext == 'wav' and to_ext == 'inf': cmd = 'sox --info {}.wav > {}.inf'.format( name, name ) elif from_ext == 'raw' and to_ext == 'wav': # raw_opt cmd = 'sox {} {}.raw {}.wav'.format( raw_opt, name, name ) elif from_ext == 'wav' and to_ext == 'mp3': cmd = 'lame {}.wav {}.mp3 > /dev/null 2>&1'.format( name, name ) if cmd: cmd_wait_msg( cmd, 'cvt {}.{} to {}.{}'.format( name, from_ext, name, to_ext ) ) def make(name, ext, direc='load', raw_opt=''): if os.path.exists( '{}.{}'.format( name, ext ) ): return rule = { 'load': { 'raw': 'wav', 'inf': 'wav', 'wav': 'mp3' }, 'save': { 'mp3': 'wav', 'wav': 'raw' }, } from_ext = rule.get( direc, {} ).get( ext ) if not from_ext: dbg.err_exit( 'not found {}.{}'.format( name, ext ) ) make( name, from_ext, direc, raw_opt ) cvt( name, from_ext, ext, raw_opt ) if direc == 'save': rm( '{}.{}'.format( name, from_ext ) ) def inf_new(name): make( name, 'inf', 'load' ) dic = yaml_ut.load_fn( name + '.inf', {} ) r = float( dic.get( 'Sample Rate', 44100 ) ) b = 16 s = dic.get( 'Precision' ) if s.endswith( '-bit' ): b = int( s[ : -len( '-bit' ) ] ) c = int( dic.get( 'Channels', 2 ) ) e = 'signed-integer' s = dic.get( 'Sample Encoding', 'Signed' ) if 'Signed' not in s: e = 'unsigned-integer' get_raw_opt = lambda r_rate=1.0: '-t raw -r {} -b {} -c {} -e {}'.format( r * r_rate, b, c, e ) lst = dic.get( 'Duration', '' ).split() k = 'samples' i = lst.index( k ) if k in lst else -1 smp_n = int( lst[ i - 1 ] ) if i > 0 else -1 byte_per_smp = c * ( b // 8 ) smp_sec = lambda sec: int( sec * r ) smp_to_sec = lambda m: m / r byte_sec = lambda sec: smp_sec( sec ) * byte_per_smp byte_to_sec = lambda n: smp_to_sec( n // byte_per_smp ) is_unsigned = lambda : e == 'unsigned-integer' get_dtype = lambda : '{}int{}'.format( 'u' if is_unsigned() else '', b ) return empty.new( locals() ) def data_new(name_mp3): (name, mp3) = os.path.splitext( name_mp3 ) inf = inf_new( name ) def bytes_to_arr(b): arr = np.frombuffer( b, inf.get_dtype() ) arr = np.array( arr, dtype='float64' ) half = 2 ** ( inf.b - 1 ) if inf.is_unsigned(): arr -= half return arr / half def arr_to_bytes(arr): half = 2 ** ( inf.b - 1 ) arr = arr * half arr = np.maximum( arr, -half ) arr = np.minimum( arr, half - 1 ) if inf.is_unsigned(): arr += half arr = np.array( arr, dtype=inf.get_dtype() ) return arr.tobytes() e = empty.new() e.raw_bytes = None e.exists_raw = False def make_raw(): if not e.exists_raw: make( name, 'raw', 'load' ) e.exists_raw = True def get_bytes_cache(start_sec=0, end_sec=-1): if e.raw_bytes == None: make_raw() e.raw_bytes = load_bytes( name + '.raw' ) if start_sec == 0 and end_sec < 0: return e.raw_bytes start = inf.byte_sec( start_sec ) end = inf.byte_sec( end_sec ) if end_sec >= 0 else len( e.raw_bytes ) return e.raw_bytes[ start : end ] def get_bytes( start_sec=0, end_sec=-1, cache=True ): if cache: return get_bytes_cache( start_sec, end_sec ) make_raw() from_byte = inf.byte_sec( start_sec ) to_byte = inf.byte_sec( end_sec ) if end_sec >= 0 else None return load_bytes_part( name + '.raw', from_byte, to_byte ) def get_arr(start_sec=0, end_sec=-1): b = get_bytes( start_sec, end_sec ) return bytes_to_arr( b ) def save_bytes_to(b, save_name): (sname, ext) = os.path.splitext( save_name ) ext = ext[ 1 : ] if not ext or ext not in ( 'raw', 'wav', 'mp3' ): ext = 'mp3' rm( save_name ) save_bytes( sname + '.raw', b ) if ext != 'raw': make( sname, ext, 'save', inf.get_raw_opt() ) def save_arr(arr, save_name): b = arr_to_bytes( arr ) save_bytes_to( b, save_name ) return empty.add( e, locals() ) # EOF