Open
Description
For example:
import json
def my_dumps(obj):
    """Serialize *obj* to a JSON string via ``json.dumps``."""
    return json.dumps(obj)
def my_loads(s):
    """Parse the JSON document *s* via ``json.loads`` and return the result."""
    return json.loads(s)
# Load the input document; the file handle is only needed for the read.
with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

# Round-trip both the file contents and an unrelated literal list through
# the helpers, so only json_obj is derived from the opened file.
json_obj = my_loads(my_dumps(json_obj))
json_obj2 = my_loads(my_dumps([1,2,3]))

# Write each result to its own output file.
with open('out.json', 'w') as out:
    json.dump(json_obj, out)
with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
/**
 * Data-flow configuration used by the query below: sources are values
 * returned by calls to the builtin `open`, and sinks are the first
 * positional argument of calls to `json.dump`.
 */
module TConfig implements DataFlow::ConfigSig{
// A source is the return value of a call to the builtin `open`.
predicate isSource(DataFlow::Node source) {
source = API::builtin("open").getReturn().asSource()
}
// A sink is argument 0 of a call to `json.dump`.
predicate isSink(DataFlow::Node sink) {
sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
}
}
// Instantiate global taint tracking with TConfig and import its path graph.
module TFlow = TaintTracking::Global<TConfig>;
import TFlow::PathGraph
// Select every (source, sink) pair of path nodes connected by a flow path.
from TFlow::PathNode source, TFlow::PathNode sink
where
TFlow::flowPath(source, sink)
select
source.getNode(), source, sink, "root"
This works as expected: `json_file` only has a flow to `json.dump(json_obj, out)`.
But when it comes to
import json
def my_dumps(obj):
    """Serialize *obj* to a JSON string via ``json.dumps``."""
    return json.dumps(obj)
def my_loads(s):
    """Parse the JSON document *s* via ``json.loads`` and return the result."""
    return json.loads(s)
def mock_rpc_call(func_name, arg):
    """Call the module-level callable named *func_name* with *arg*.

    The callable is looked up by name in this module's ``globals()``, so the
    call target is only known at run time. Raises ``KeyError`` if no global
    with that name exists.
    """
    return globals()[func_name](arg)
# Load the input document; the file handle is only needed for the read.
with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

# Same round-trips as a direct call would do, but every helper is reached
# through mock_rpc_call's by-name dispatch. Only json_obj is derived from
# the opened file; json_obj2 comes from a literal list.
json_obj = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', json_obj))
json_obj2 = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', [1,2,3]))

# Write each result to its own output file.
with open('out.json', 'w') as out:
    json.dump(json_obj, out)
with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
/**
 * Data-flow configuration for the `mock_rpc_call` example: sources are
 * values returned by calls to the builtin `open`, sinks are the first
 * positional argument of calls to `json.dump`, and `isAdditionalFlowStep`
 * adds edges that model the by-name dispatch done by `mock_rpc_call`.
 */
module TConfig implements DataFlow::ConfigSig{
// A source is the return value of a call to the builtin `open`.
predicate isSource(DataFlow::Node source) {
source = API::builtin("open").getReturn().asSource()
}
// Extra edges into and out of the function named by the string literal
// passed as argument 0 of a `mock_rpc_call(...)` call.
predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Both endpoints of an added step must be located in the same file.
nodeFrom.getLocation().getFile() = nodeTo.getLocation().getFile() and (
// Handle call: step from argument 1 of a call to the function named
// "mock_rpc_call" to `getArg(0)` of the function whose name equals the
// string literal given as argument 0 of that call.
exists(DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func, FunctionObject called_func|
rpc_func.getName() = "mock_rpc_call" and
rpc_call.getArg(1) = nodeFrom and
rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
called_func.getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
called_func.getFunction().getArg(0) = nodeTo.asExpr()
) or
// Handle return: step from the value of a `return` statement, located in a
// function whose name equals the string literal at argument 0 of
// `rpc_call`, to the `rpc_call` node itself.
// NOTE(review): unlike the call branch, `rpc_func` is not constrained to be
// named "mock_rpc_call" here - confirm whether that is intended.
exists(Return ret , DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func|
ret.getValue() = nodeFrom.asExpr() and
ret.getScope().(Function).getFunctionObject().getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
rpc_call = nodeTo
)
// Open question: how can the call step and the return step be matched so
// that context sensitivity is preserved? As written the two branches are
// independent; nothing ties a return step to the call whose argument
// entered the function.
)
}
// A sink is argument 0 of a call to `json.dump`.
predicate isSink(DataFlow::Node sink) {
sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
}
}
// Instantiate global taint tracking with TConfig and import its path graph.
module TFlow = TaintTracking::Global<TConfig>;
import TFlow::PathGraph
// Select every (source, sink) pair of path nodes connected by a flow path.
from TFlow::PathNode source, TFlow::PathNode sink
where
TFlow::flowPath(source, sink)
select
source.getNode(), source, sink, "root"
It will return 4 flows, and 3 of them are context-insensitive: `json_file` will have flows to both `json.dump(json_obj, out)` and `json.dump(json_obj2, out)`.