Skip to content

How to write a cross-function isAdditionalFlowStep while preserving context sensitive dataflow. #19308

Open
@hksdpc255

Description

@hksdpc255

For example:

import json

def my_dumps(obj):
    """Serialize ``obj`` to a JSON string by delegating to ``json.dumps``."""
    return json.dumps(obj)
def my_loads(s):
    """Parse the JSON document ``s`` by delegating to ``json.loads``."""
    return json.loads(s)

# Read the input document. The query that follows treats the value returned
# by open() as the taint source.
with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

# Round-trip the file-derived object through the wrapper functions.
json_obj = my_loads(my_dumps(json_obj))
# Round-trip a literal list that never touches json_file.
json_obj2 = my_loads(my_dumps([1,2,3]))

# The first argument of json.dump is the sink in the query that follows.
with open('out.json', 'w') as out:
    json.dump(json_obj, out)

with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

/**
 * Taint-tracking configuration: values returned by the builtin `open`
 * are sources, and the first argument of `json.dump` calls is the sink.
 */
module TConfig implements DataFlow::ConfigSig{
    /** Holds if `source` is the return value of a call to the builtin `open`. */
    predicate isSource(DataFlow::Node source) {
        source = API::builtin("open").getReturn().asSource()
    }
    /** Holds if `sink` is argument 0 of a call to `json.dump`. */
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
    }
}

// Global taint tracking instantiated with TConfig.
module TFlow = TaintTracking::Global<TConfig>;
// Path graph of TFlow, used together with the PathNode columns selected below.
import TFlow::PathGraph

// Report every source/sink pair connected by a taint path.
from TFlow::PathNode source, TFlow::PathNode sink
where
    TFlow::flowPath(source, sink)
select
    source.getNode(), source, sink, "root"

This works as expected: json_file only has a flow to json.dump(json_obj, out).

But when it comes to

import json

def my_dumps(obj):
    """Serialize ``obj`` to a JSON string by delegating to ``json.dumps``."""
    return json.dumps(obj)
def my_loads(s):
    """Parse the JSON document ``s`` by delegating to ``json.loads``."""
    return json.loads(s)

def mock_rpc_call(func_name, arg):
    """Call the module-level callable named ``func_name`` with ``arg``.

    The target is looked up by name in ``globals()``, so the callee is only
    known at run time; a missing name raises ``KeyError``.
    """
    # NOTE: dispatching on an arbitrary name is only acceptable here because
    # this is a mock; never feed untrusted names into such a lookup.
    return globals()[func_name](arg)

# Read the input document. The query that follows treats the value returned
# by open() as the taint source.
with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

# Same round-trips as calling my_dumps/my_loads directly, but the callee is
# selected by a string name passed to mock_rpc_call.
json_obj = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', json_obj))
# This chain starts from a literal list that never touches json_file.
json_obj2 = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', [1,2,3]))

# The first argument of json.dump is the sink in the query that follows.
with open('out.json', 'w') as out:
    json.dump(json_obj, out)

with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

/**
 * Taint-tracking configuration: values returned by the builtin `open`
 * are sources, and the first argument of `json.dump` calls is the sink.
 * Extra flow steps model the by-name dispatch done by `mock_rpc_call`.
 */
module TConfig implements DataFlow::ConfigSig{
    /** Holds if `source` is the return value of a call to the builtin `open`. */
    predicate isSource(DataFlow::Node source) {
        source = API::builtin("open").getReturn().asSource()
    }
    /**
     * Holds if data should flow from `nodeFrom` to `nodeTo` through a
     * `mock_rpc_call(name, arg)` call, where `name` is a string literal:
     *
     * - call edge: from argument 1 of the `mock_rpc_call` call to the first
     *   parameter of every function named `name`;
     * - return edge: from the returned expression of every function named
     *   `name` to the `mock_rpc_call` call node.
     *
     * The two edges are independent of each other: nothing ties a return
     * edge to the call edge that entered the function, which is why the
     * issue reports context-insensitive results.
     * NOTE(review): presumably the data-flow library does not treat additional
     * steps between different callables as matched call/return pairs - confirm
     * against the CodeQL data-flow documentation.
     */
    predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
        // Only connect nodes that live in the same source file.
        nodeFrom.getLocation().getFile() = nodeTo.getLocation().getFile() and (
            // Handle call
            exists(DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func, FunctionObject called_func|
                rpc_func.getName() = "mock_rpc_call" and
                rpc_call.getArg(1) = nodeFrom and
                rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
                called_func.getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
                called_func.getFunction().getArg(0) = nodeTo.asExpr()
            ) or
            // Handle return
            exists(Return ret , DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func|
                // Restrict the return edge to calls of `mock_rpc_call`, mirroring
                // the call edge; without this, any call whose first argument is a
                // string literal naming a function would receive a return edge.
                rpc_func.getName() = "mock_rpc_call" and
                ret.getValue() = nodeFrom.asExpr() and
                ret.getScope().(Function).getFunctionObject().getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
                rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
                rpc_call = nodeTo
            )
            // How can I match the call and return to preserve context sensitive?
        )
    }
    /** Holds if `sink` is argument 0 of a call to `json.dump`. */
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
    }
}

// Global taint tracking instantiated with TConfig.
module TFlow = TaintTracking::Global<TConfig>;
// Path graph of TFlow, used together with the PathNode columns selected below.
import TFlow::PathGraph

// Report every source/sink pair connected by a taint path.
from TFlow::PathNode source, TFlow::PathNode sink
where
    TFlow::flowPath(source, sink)
select
    source.getNode(), source, sink, "root"

It returns 4 flows, 3 of which are context-insensitive. As a result, json_file has flows to both json.dump(json_obj, out) and json.dump(json_obj2, out), although only the first is expected.

Metadata

Metadata

Assignees

No one assigned

    Labels

    question: Further information is requested

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions