代码解析 - envoy

envoy 是对 Python 标准库中 subprocess 模块的封装, 虽然功能对比原生 subprocess 来说少了很多,但是更易于使用,Github 项目地址

代码解析

envoy 只有一个文件,两百多行代码,非常的简单

$ tree envoy
envoy
├── __init__.py
└── core.py

envoy 可以非常轻松通过管道传递数据

from envoy.core import run

r = run('ifconfig|wc -l')

>>> r
<Response [wc]>

>>> r.command
['wc', '-l']

>>> r.std_out
' 52\n'

>>> r.history
[<Response [ifconfig]>]

# 或者像这样
>>> run('wc -l', data='1\n2\n3\n').std_out
' 3\n'

我们先看看 run 函数

run 函数接接收 命令,管道数据,超时时间,kill 超时时间,扩展环境变量,执行目录 作为参数

run 函数是对 Command.run 的封装,首先通过 expand_args 函数解析命令,通过 Command.run 执行多个命令,如果有多个命令的话,将上个命令的输出传递给下一个命令,实现管道间的数据传递,并返回一个 Response 对象

def run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None):
"""Executes a given commmand and returns Response.

Blocks until process is complete, or timeout is reached.
"""

command = expand_args(command)

history = []
for c in command:

if len(history):
# due to broken pipe problems pass only first 10 KiB
data = history[-1].std_out[0:10*1024]

cmd = Command(c)
try:
out, err = cmd.run(data, timeout, kill_timeout, env, cwd)
status_code = cmd.returncode
except OSError as e:
out, err = '', u"\n".join([e.strerror, traceback.format_exc()])
status_code = 127

r = Response(process=cmd)

r.command = c
r.std_out = out
r.std_err = err
r.status_code = status_code

history.append(r)

r = history.pop()
r.history = history

return r

我们再看看 envoy 是如何解析我们传入的命令的

如果传入的类型的是 str 或者 unicode, 则通过 shelx 模块对其进行解析成 subprocess 需要的格式

def expand_args(command):
"""Parses command strings and returns a Popen-ready list."""

# Prepare arguments.
if isinstance(command, (str, unicode)):
splitter = shlex.shlex(command.encode('utf-8'))
splitter.whitespace = '|'
splitter.whitespace_split = True
command = []

while True:
token = splitter.get_token()
if token:
command.append(token)
else:
break

command = list(map(shlex.split, command))

return command

我们手动传入一条命令,看一下它的返回

from envoy.core import expand_args

>>> expand_args('ls -l|wc -l')
[['ls', '-l'], ['wc', '-l']]

我们再仔细看一下,envoy 里面是如何通过管道传递数据的

首先 初始化一个 history 列表,保存每一次执行的命令,如果 len(history) > 0,也说明执行的命令超过一条,则将上条命令的 std_out10 KiB 的数据赋值给 data

再将 data 传递给 Command.run 方法,也就是下个命令的 std_in

def run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None):
...
...
...

history = []
for c in command:

if len(history):
# due to broken pipe problems pass only first 10 KiB
data = history[-1].std_out[0:10*1024]

cmd = Command(c)
try:
out, err = cmd.run(data, timeout, kill_timeout, env, cwd)
status_code = cmd.returncode
except OSError as e:
....
...
...

envoy 中最关键的就是 Command.run 这个方法了,我们来看看它是如何封装 subprocess

Command 的构造函数接收一个命令作为参数

run 方法则比较复杂,里面定义了一个 target 函数,target 函数封装 subprocess.Popen,然后将管道中的数据传入 communicate 方法

然后启动了一个新的线程,等待线程执行完成或者超时,并做一些检查 以保证进程完全退出,最后返回 std_outstd_err

class Command(object):
def __init__(self, cmd):
self.cmd = cmd
self.process = None
self.out = None
self.err = None
self.returncode = None
self.data = None
self.exc = None

def run(self, data, timeout, kill_timeout, env, cwd):
self.data = data
environ = dict(os.environ)
environ.update(env or {})

def target():

try:
self.process = subprocess.Popen(self.cmd,
universal_newlines=True,
shell=False,
env=environ,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
cwd=cwd,
)

if sys.version_info[0] >= 3:
self.out, self.err = self.process.communicate(
input = bytes(self.data, "UTF-8") if self.data else None
)
else:
self.out, self.err = self.process.communicate(self.data)
except Exception as exc:
self.exc = exc


thread = threading.Thread(target=target)
thread.start()

thread.join(timeout)
if self.exc:
raise self.exc
if _is_alive(thread) :
_terminate_process(self.process)
thread.join(kill_timeout)
if _is_alive(thread):
_kill_process(self.process)
thread.join()
self.returncode = self.process.returncode
return self.out, self.err

总结

envoy 的代码还是很简单的,代码里面也没有用什么黑魔法,毕竟只是一个简单的封装