Skip to content

Python: Decompression Bombs #13557

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
748e96d
V1 Bombs
am0o0 Jun 18, 2023
a38405e
fix formatting error/warnings
am0o0 Jun 26, 2023
b506b7d
better documents, remove separate PyZipFile
am0o0 Jun 26, 2023
8fccd65
fix a mistake :(
am0o0 Jun 26, 2023
7aa002f
fix an accident :)
am0o0 Jun 29, 2023
bcfc28a
add sources to detect CVE completely
am0o0 Sep 6, 2023
6ee5865
add sources to detect CVE completely
am0o0 Sep 7, 2023
3175db2
upgrade fastAPI remote sources
am0o0 Oct 9, 2023
1318afd
modularize
am0o0 Oct 9, 2023
9d86e79
move library file to experimental lib directory
am0o0 Oct 9, 2023
4283bb7
clean up unused vars,fix tests
am0o0 Oct 9, 2023
2d0067d
fix some qldocs, change Sink extenstion model, deduct some not necess…
am0o0 Dec 7, 2023
6ebdae3
Merge branch 'main' into amammad-python-bombs
am0o0 Dec 7, 2023
5795c72
added inline tests
am0o0 Dec 7, 2023
9399258
Merge branch 'main' into amammad-python-bombs
RasmusWL Feb 14, 2024
ad39b8c
Python: Accept .expected changes
RasmusWL Feb 14, 2024
e7772f1
Python: Use `Unit` class
RasmusWL Feb 14, 2024
d8fd457
Python: Use helper predicate
RasmusWL Feb 14, 2024
9ae3ea8
Python: Remove spurious results in stdlib
RasmusWL Feb 14, 2024
ba7dd38
Python: Delete duplicated file
RasmusWL Feb 14, 2024
69c8ef9
Python: Use dataflow instead of taint-tracking
RasmusWL Feb 14, 2024
e5bd633
Python: Change name/id to `Decompression Bomb`
RasmusWL Feb 14, 2024
cd596f5
Python: Reformat test-file
RasmusWL Feb 14, 2024
5901478
Python: Fix DataflowQueryTest
RasmusWL Feb 14, 2024
eb401a2
Python: Fix test exclusion for stdlib Python 3.12
RasmusWL Feb 14, 2024
09d8a75
Fix QLDoc issues
am0o0 Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<!DOCTYPE qhelp PUBLIC
"-//Semmle//qhelp//EN"
"qhelp.dtd">
<qhelp>
<overview>
<p>Extracting Compressed files with any compression algorithm like gzip can cause to denial of service attacks.</p>
<p>Attackers can compress a huge file which created by repeated similiar byte and convert it to a small compressed file.</p>

</overview>
<recommendation>

<p>When you want to decompress a user-provided compressed file you must be careful about the decompression ratio or read these files within a loop byte by byte to be able to manage the decompressed size in each cycle of the loop.</p>

</recommendation>
<example>
<p>python ZipFile library is vulnerable by default</p>
<sample src="example_bad.py" />

<p>By checking the decompressed size from input zipped file you can check the decompression ratio. attackers can forge this decompressed size header too.
So can't rely on file_size attribute of ZipInfo class. this is recommended to use "ZipFile.open" method to be able to manage decompressed size.</p>
<p>Reading decompressed file byte by byte and verifying the total current size in each loop cycle in recommended to use in any decompression library.</p>
<sample src="example_good.py" />
</example>
<references>

<li>
<a href="https://nvd.nist.gov/vuln/detail/CVE-2023-22898">CVE-2023-22898</a>
</li>
<li>
<a href="https://www.bamsoftware.com/hacks/zipbomb/">A great research to gain more impact by this kind of attack</a>
</li>

</references>
</qhelp>
123 changes: 123 additions & 0 deletions python/ql/src/experimental/Security/CWE-409/DecompressionBombs.ql
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* @name Uncontrolled file decompression
* @description Uncontrolled data that flows into decompression library APIs without checking the compression rate is dangerous
* @kind path-problem
* @problem.severity error
* @security-severity 7.8
* @precision high
* @id py/uncontrolled-file-decompression
* @tags security
* experimental
* external/cwe/cwe-409
*/

import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.internal.DataFlowPublic
import experimental.semmle.python.security.DecompressionBomb

/**
* `io.TextIOWrapper(ip, encoding='utf-8')` like following:
* ```python
* with gzip.open(bomb_input, 'rb') as ip:
* with io.TextIOWrapper(ip, encoding='utf-8') as decoder:
* content = decoder.read()
* print(content)
* ```
* I saw this builtin method many places so I added it as a AdditionalTaintStep.
* it would be nice if it is added as a global AdditionalTaintStep
*/
predicate isAdditionalTaintStepTextIOWrapper(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
exists(API::CallNode textIOWrapper |
textIOWrapper = API::moduleImport("io").getMember("TextIOWrapper").getACall()
|
nodeFrom = textIOWrapper.getParameter(0, "input").asSink() and
nodeTo = textIOWrapper
)
}

module FileAndFormRemoteFlowSource {
class FastAPI extends RemoteFlowSource::Range {
FastAPI() {
exists(API::Node fastApiParam, Expr fastApiUploadFile |
fastApiParam =
API::moduleImport("fastapi")
.getMember("FastAPI")
.getReturn()
.getMember("post")
.getReturn()
.getParameter(0)
.getKeywordParameter(_) and
fastApiUploadFile =
API::moduleImport("fastapi")
.getMember("UploadFile")
.getASubclass*()
.getAValueReachableFromSource()
.asExpr()
|
fastApiUploadFile =
fastApiParam.asSource().asExpr().(Parameter).getAnnotation().getASubExpression*() and
// Multiple Uploaded files as list of fastapi.UploadFile
exists(For f, Attribute attr |
fastApiParam.getAValueReachableFromSource().asExpr() = f.getIter().getASubExpression*()
|
TaintTracking::localExprTaint(f.getIter(), attr.getObject()) and
attr.getName() = ["filename", "content_type", "headers", "file", "read"] and
this.asExpr() = attr
)
or
// one Uploaded file as fastapi.UploadFile
this =
[
fastApiParam.getMember(["filename", "content_type", "headers"]).asSource(),
fastApiParam
.getMember("file")
.getMember(["readlines", "readline", "read"])
.getReturn()
.asSource(), fastApiParam.getMember("read").getReturn().asSource()
]
)
}

override string getSourceType() { result = "fastapi HTTP FORM files" }
}
}

module BombsConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
(
source instanceof RemoteFlowSource
or
source instanceof FileAndFormRemoteFlowSource::FastAPI
) and
not source.getLocation().getFile().inStdlib() and
not source.getLocation().getFile().getRelativePath().matches("%venv%")
}

predicate isSink(DataFlow::Node sink) {
sink instanceof DecompressionBomb::Sink and
not sink.getLocation().getFile().inStdlib() and
not sink.getLocation().getFile().getRelativePath().matches("%venv%")
}

predicate isAdditionalFlowStep(DataFlow::Node pred, DataFlow::Node succ) {
(
any(DecompressionBomb::AdditionalTaintStep a).isAdditionalTaintStep(pred, succ) or
isAdditionalTaintStepTextIOWrapper(pred, succ)
) and
not succ.getLocation().getFile().inStdlib() and
not succ.getLocation().getFile().getRelativePath().matches("%venv%")
}
}

module Bombs = TaintTracking::Global<BombsConfig>;

import Bombs::PathGraph

from Bombs::PathNode source, Bombs::PathNode sink
where Bombs::flowPath(source, sink)
select sink.getNode(), source, sink, "This uncontrolled file extraction is $@.", source.getNode(),
"depends on this user controlled data"
5 changes: 5 additions & 0 deletions python/ql/src/experimental/Security/CWE-409/example_bad.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import zipfile


def Bad(zip_path):
zipfile.ZipFile(zip_path, "r").extractall()
21 changes: 21 additions & 0 deletions python/ql/src/experimental/Security/CWE-409/example_good.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import zipfile


def safeUnzip(zipFileName):
buffer_size = 2 ** 10 * 8
total_size = 0
SIZE_THRESHOLD = 2 ** 10 * 100
with zipfile.ZipFile(zipFileName) as myzip:
for fileinfo in myzip.infolist():
content = b''
with myzip.open(fileinfo.filename, mode="r") as myfile:
chunk = myfile.read(buffer_size)
while chunk:
chunk = myfile.read(buffer_size)
total_size += buffer_size
if total_size > SIZE_THRESHOLD:
print("Bomb detected")
return
content += chunk
print(bytes.decode(content, 'utf-8'))
return "No Bombs!"
Loading