-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
234 lines (196 loc) · 7.69 KB
/
main.py
File metadata and controls
234 lines (196 loc) · 7.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import re
import urllib
from fastapi import FastAPI, Depends, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from sqlalchemy import text
from database import get_db, engine
import models
import uvicorn
from loguru import logger
# 创建数据库表
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# 初始化数据
@app.on_event("startup")
def init_data():
db = next(get_db())
# 检查用户表是否有数据
if db.query(models.User).count() == 0:
# 添加测试用户
test_users = [
models.User(username="admin", password="admin123", email="admin@example.com"),
models.User(username="user1", password="password1", email="user1@example.com"),
models.User(username="user2", password="password2", email="user2@example.com")
]
db.add_all(test_users)
# 检查产品表是否有数据
if db.query(models.Product).count() == 0:
# 添加测试产品
test_products = [
models.Product(name="笔记本电脑", price=5999, description="高性能笔记本电脑"),
models.Product(name="智能手机", price=3999, description="最新款智能手机"),
models.Product(name="平板电脑", price=2999, description="轻便平板电脑")
]
db.add_all(test_products)
db.commit()
# 首页
@app.get("/", response_class=HTMLResponse)
def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
# 表单注入演示页面
@app.get("/form_injection", response_class=HTMLResponse)
def form_injection_page(request: Request):
return templates.TemplateResponse("form_injection.html", {"request": request})
@app.get("/union_injection/{product_id}")
def union_injection(product_id: str, db: Session = Depends(get_db)):
"""
1. 联合查询注入演示
不安全的查询方式 - 直接拼接用户输入
"""
query = f"SELECT id, name, price FROM products WHERE id = {product_id}"
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
# 将查询结果转换为字典列表
products = [dict(row) for row in result.mappings()]
return {"query": query, "results": products}
except Exception as e:
return {"error": str(e), "query": query}
@app.get("/boolean_blind/{input}")
def boolean_blind(input: str, db: Session = Depends(get_db)):
"""
2. 布尔盲注演示
"""
query = f"SELECT username, email FROM users WHERE username = '{input}' "
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
# 将查询结果转换为字典列表
products = [dict(row) for row in result.mappings()]
return {"query": query, "results": products}
except Exception as e:
return {"error": str(e), "query": query}
@app.get("/error_injection/")
def error_injection(input: str, db: Session = Depends(get_db)):
"""
3. 报错注入演示
* updatexml(目标xml, xpath路径,新值)这个函数本来是用于修改 XML 数据的。</br>
* extractvalue(xml片段, xpath表达式): 用于从 XML 文档中提取值。</br>
* concat():字符串拼接函数; </br>
* 0x5c:十六进制,表示字符 \
"""
query = f"SELECT * FROM products WHERE id = {input}"
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
# 将查询结果转换为字典列表
products = [dict(row) for row in result.mappings()]
return {"query": query, "results": products}
except Exception as e:
return {"error": str(e), "query": query}
@app.get("/time_blind/{input}")
def time_blind(input: str, db: Session = Depends(get_db)):
"""
4. 时间盲注演示
"""
query = f"SELECT * FROM products WHERE id = {input}"
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
# 将查询结果转换为字典列表
products = [dict(row) for row in result.mappings()]
return {"query": query, "results": products}
except Exception as e:
return {"error": str(e), "query": query}
@app.get("/wide_byte_injection/{input}")
def wide_byte_injection(input: str, db: Session = Depends(get_db)):
"""
5.模拟宽字节注入:从原始 URL 获取字节流
"""
# 模拟:如果 input 包含 %df%5c(URL编码),认为反斜杠被吃掉
if re.search(r'%df%5c', input, re.IGNORECASE):
# 模拟原始行为:程序想转义 ' → \'
# 但由于 %df%5c 吃掉了 \,最终变成:...'%df' OR 1=1#...
# 我们手动模拟这个“被吃掉”的过程
# 假设攻击载荷是:%df%5c%27 OR 1=1%23
# URL 解码后:%df\' OR 1=1#
# 转义后:%df\\\' OR 1=1# ← 但 \ 被吃掉,变成:%df\' OR 1=1#
# 所以我们手动构造最终 SQL
# 提取注入部分(简化:假设用户输入了 OR 1=1#)
injected_sql = input.replace('%df%5c%27', "'") # 模拟 \ 被吃掉,' 保留
injected_sql = urllib.parse.unquote(injected_sql) # 一次 URL 解码
# 构造最终 SQL(拼接)
query = f"SELECT * FROM users WHERE username = '{injected_sql}'"
# 实际可能是:SELECT * FROM users WHERE username = '' OR 1=1#'
else:
# 正常转义
escaped_input = input.replace("'", "\\'")
query = f"SELECT * FROM users WHERE username = '{escaped_input}'"
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
users = [dict(row) for row in result.mappings()]
return {
"query": query,
"input": input,
"results": users,
"injection_detected": "%df%5c" in input.lower()
}
except Exception as e:
return {
"error": str(e),
"query": query,
"input": input
}
@app.get("/stacked_queries/{input}")
def stacked_queries(input: str, db: Session = Depends(get_db)):
"""
演示堆叠查询注入
攻击者通过 ; 分隔,执行多条 SQL 语句
"""
# 模拟存在漏洞的拼接
query = f"SELECT * FROM products WHERE id = 1; {input}"
logger.debug(f"exe SQL: {query}")
try:
# 使用 raw connection 执行多语句
conn = engine.raw_connection()
cursor = conn.cursor()
# 执行多条语句
cursor.execute(query)
results = []
while True:
if cursor.description: # 有结果集
rows = cursor.fetchall()
results.append([dict(row) for row in rows])
# 移动到下一个结果集
if not cursor.nextset():
break
conn.close()
return {
"query": query,
"results": results,
"message": "Stacked queries executed"
}
except Exception as e:
return {
"error": str(e),
"query": query
}
# 7. 表单注入处理 (不安全)
@app.post("/form_injection_submit")
def form_injection_submit(username: str = Form(...), db: Session = Depends(get_db)):
# 不安全的查询方式
query = f"SELECT * FROM users WHERE username = '{username}'"
logger.debug(f"exe SQL: {query}")
try:
result = db.execute(text(query))
# 将查询结果转换为字典列表
users = [dict(row) for row in result.mappings()]
return {"query": query, "results": users}
except Exception as e:
return {"error": str(e), "query": query}
if __name__ == "__main__":
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)