OS命令注入漏洞是一个十分严重,但是又十分容易被人忽视的一个问题。

漏洞原理

在后台开发过程中,经常会遇到需要执行一些shell命令,但是命令的参数需要通过Restful API传入,举个简单的例子:
后台实现了一个ping的功能,用户可以指定ping的目标,假设后台代码:

def ping(target):
    return subprocess.getstatusoutput('ping -c 1 %s' % target)

如果用户正确传target值,那么没啥问题,比如用户想ping下百度,最终调用是:

ping("www.baidu.com")

执行的shell命令是:

ping -c 1 www.baidu.com

但是如果有一个用户,恶意的传了这么一个target: baidu.com; rm -rf /,后台直接填充到ping命令中去,那么后台执行的命令就是:

ping -c 1 baidu.com; rm -rf /

如果你是以root用户执行的话,那就只能赶紧跑路了。

如何防御

防御的办法有这么几个思路:

  1. 对传入的值做安全检查,比如必须符合IP或域名的格式,尤其要警惕那些命令连接符号,比如:|,&&,$(),;
  2. 尽量不要用root用户执行,对程序的权限做合理的控制。
  3. 使用带有安全机制的函数库,在上面那个例子中,subprocess.getstatusoutput底层是调用的subprocess.check_output并且参数中设置了shell=True:
def getstatusoutput(cmd):
    try:
        data = check_output(cmd, shell=True, universal_newlines=True, stderr=STDOUT)
        exitcode = 0
    except CalledProcessError as ex:
        data = ex.output
        exitcode = ex.returncode
    if data[-1:] == '\n':
        data = data[:-1]
    return exitcode, data

问题就出在shell=True上,设置为True就是直接把命令丢给shell处理的,就会出现上述问题。如果设置成False,再把subprocess接收的Command参数改成一个List,例如:["ls", "-l"],在运行时,subprocess会把Command参数List的第一个元素作为执行命令,后面的项都强制作为参数去处理,也就没有注入的风险了。

通过阅读源码,发现getstatusoutput下面是调用的subprocess.check_output,我们可以仿照其实现改写一下getstatusoutput函数,最终要的是设置shell=False


def getstatusoutput_safe(cmd):
    try:
        data = check_output(cmd, shell=False, universal_newlines=True, stderr=STDOUT)
        exitcode = 0
    except CalledProcessError as ex:
        data = ex.output
        exitcode = ex.returncode
    if data[-1:] == '\n':
        data = data[:-1]
    return exitcode, data


def ping(target):
    return getstatusoutput_safe(["ping", "-c", "1", target])

但上面的用法有1个弊端,就是无法使用管道,而管道又是shell命令中十分重要的东西,使用频率很高,这里简单提供一个函数,实现shell命令的管道功能。

def exit_processes(process_list):
    for process in process_list:
        try:
            LOG.debug("exit process: %s" % process)
            if process.stdout:
                process.stdout.close()
            if process.stderr:
                process.stderr.close()
            try:  # Flushing a BufferedWriter may raise an error
                if process.stdin:
                    process.stdin.close()
            finally:
                # Wait for the process to terminate, to avoid zombies.
                process.wait()
        except:
            pass


def shell(cmds, check_result=False, success_code=0, shell=False, timeout=30):
    LOG.debug("cmd: %s" % cmds)
    args_list = []
    process_list = []
    args = []
    for arg in cmds:
        if arg == "|":
            args_list.append(args)
            args = []
            continue

        args.append(arg)

    if args:
        args_list.append(args)

    try:
        pre_process = None
        for args in args_list:
            if pre_process:
                process = subprocess.Popen(args,
                                           stdin=pre_process.stdout,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           shell=shell)
            else:
                process = subprocess.Popen(args,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           shell=shell)
            process_list.append(process)
            pre_process = process

        try:
            output, errorput = pre_process.communicate(timeout=timeout)
        except Exception as e:
            pre_process.kill()
            pre_process.wait()
            raise e

        if output:
            output = output.decode().rstrip()

        returncode = pre_process.returncode

        if check_result and returncode != success_code:
            if errorput:
                errorput = errorput.decode().rstrip()
            raise Exception("shell command run failed.")

        return returncode, output
    finally:
        exit_processes(process_list)

使用方法:

shell(["ls", "-l", "/", "|", "grep", "usr"])