Arm backend: Add test pipeline for run_transform_for_annotation_pipeline #9488

Merged: 2 commits, Mar 21, 2025
Changes from all commits
274 changes: 164 additions & 110 deletions backends/arm/test/ops/test_scalars.py
@@ -5,11 +5,16 @@

import unittest

from typing import Tuple

import common
import torch

from executorch.backends.arm.test import common
from executorch.backends.arm.test.tester.arm_tester import ArmTester
from parameterized import parameterized
from executorch.backends.arm.test.tester.test_pipeline import (
TosaPipelineBI,
TosaPipelineMI,
TransformAnnotationPassPipeline,
)

"""
Summary of non-working cases.
@@ -24,6 +29,7 @@
# MLETORCH-408
Sub or inplace-sub with an integer input.
"""
input_t1 = Tuple[torch.Tensor, torch.scalar_tensor] # Input x, Input y


class TestScalars(unittest.TestCase):
@@ -92,112 +98,160 @@ def forward(self, x):
x -= 10
return x

# Inplace ops end with '_' (from aten naming)
ops = [
("Add", Add()),
("Sub", Sub()),
("Mul", Mul()),
("Div", Div()),
("Add_", AddInplace()),
("Sub_", SubInplace()),
("Mul_", MulInplace()),
("Div_", DivInplace()),
("MulScalar", MulScalar()),
("DivScalar", DivScalar()),
("AddScalar", AddScalar()),
("SubScalar", SubScalar()),
]

const_ops = [("Add", AddConst())]

dtypes = [("int", 3), ("float", 3.0)]
sizes = [("r1", (1)), ("r4", (2, 4, 5, 3))]

# Create combinations of tests
tensor_scalar_tests = []
for op in ops:
for dtype in dtypes:
for size in sizes:
test_name = f"{op[0]}_{dtype[0]}_{size[0]}"
tensor = torch.rand(size[1])
scalar = dtype[1]
tensor_scalar_tests.append((test_name + "_ts", op[1], tensor, scalar))

# Don't add (scalar, tensor) test case for .Scalar ops.
if op[0][-6:] == "Scalar":
continue

tensor_scalar_tests.append((test_name + "_st", op[1], scalar, tensor))

tensor_const_tests = []
for op in const_ops:

# Inplace ops end with '_' (from aten naming)
ops = [
("Add", TestScalars.Add()),
("Sub", TestScalars.Sub()),
("Mul", TestScalars.Mul()),
("Div", TestScalars.Div()),
("Add_", TestScalars.AddInplace()),
("Sub_", TestScalars.SubInplace()),
("Mul_", TestScalars.MulInplace()),
("Div_", TestScalars.DivInplace()),
("MulScalar", TestScalars.MulScalar()),
("DivScalar", TestScalars.DivScalar()),
("AddScalar", TestScalars.AddScalar()),
("SubScalar", TestScalars.SubScalar()),
]

const_ops = [("Add", TestScalars.AddConst())]

dtypes = [("int", 3), ("float", 3.0)]
sizes = [("r1", (1)), ("r4", (2, 4, 5, 3))]

# Create combinations of tests
tensor_scalar_tests = {}
for op in ops:
for dtype in dtypes:
for size in sizes:
test_name = f"{op[0]}_{size[0]}"
test_name = f"{op[0]}_{dtype[0]}_{size[0]}"
tensor = torch.rand(size[1])
tensor_const_tests.append((test_name, op[1], tensor))

def _test_add_tosa_MI_pipeline(self, module: torch.nn.Module, test_data: tuple):
(
ArmTester(
module,
example_inputs=test_data,
compile_spec=common.get_tosa_compile_spec("TOSA-0.80+MI"),
)
.export()
.to_edge()
.partition()
.to_executorch()
.run_method_and_compare_outputs(inputs=test_data)
)

def _test_add_tosa_BI_pipeline(self, module: torch.nn.Module, test_data: tuple):
(
ArmTester(
module,
example_inputs=test_data,
compile_spec=common.get_tosa_compile_spec("TOSA-0.80+BI"),
)
.quantize()
.export()
.to_edge()
.partition()
.to_executorch()
.run_method_and_compare_outputs(inputs=test_data)
)

@parameterized.expand(tensor_scalar_tests)
def test_MI(self, test_name: str, op: torch.nn.Module, x, y):
expected_exception = None
if any(token in test_name for token in ("Sub_int", "Sub__int")):
expected_exception = AssertionError
if test_name.endswith("_st"):
expected_exception = AttributeError

if expected_exception:
with self.assertRaises(
expected_exception, msg=f"Test {test_name} is expected to fail."
):
self._test_add_tosa_MI_pipeline(op, (x, y))
return

self._test_add_tosa_MI_pipeline(op, (x, y))

# op(Scalar float, tensor) works if the scalar is constant.
@parameterized.expand(tensor_const_tests)
def test_MI_const(self, test_name: str, op: torch.nn.Module, x):
self._test_add_tosa_MI_pipeline(op, (x,))

@parameterized.expand(tensor_scalar_tests)
def test_BI(self, test_name: str, op: torch.nn.Module, x, y):
self._test_add_tosa_BI_pipeline(op, (x, y))

# op(Scalar float, tensor) works if the scalar is constant.
@parameterized.expand(tensor_const_tests)
def test_BI_const(self, test_name: str, op: torch.nn.Module, x):
self._test_add_tosa_BI_pipeline(op, (x,))

def test_shift_sub_inplace_tosa_MI(self):
self._test_add_tosa_MI_pipeline(self.ShiftInplaceSub(), (torch.IntTensor(5),))

def test_shift_sub_inplace_tosa_BI(self):
self._test_add_tosa_BI_pipeline(self.ShiftInplaceSub(), (torch.IntTensor(5),))
scalar = dtype[1]
tensor_scalar_tests[test_name + "_ts"] = (op[1], tensor, scalar)
# Don't add (scalar, tensor) test case for .Scalar ops.
if op[0][-6:] == "Scalar":
continue

tensor_scalar_tests[test_name + "_st"] = (op[1], scalar, tensor)

tensor_const_tests = {}
for op in const_ops:
for size in sizes:
test_name = f"{op[0]}_{size[0]}"
tensor = torch.rand(size[1])
tensor_const_tests[test_name] = (op[1], tensor)


def _test_add_tosa_MI_pipeline(module: torch.nn.Module, test_data: tuple):
pipeline = TosaPipelineMI[input_t1](module, test_data, aten_op=[], exir_op=[])
pipeline.run()


def _test_add_tosa_BI_pipeline(
module: torch.nn.Module, test_data: tuple, check_quant_nodes=True
):
pipeline = TosaPipelineBI[input_t1](module, test_data, aten_op=[], exir_op=[])
if not check_quant_nodes:
pipeline.pop_stage("check.quant_nodes")
pipeline.run()


fail_str = "MLETORCH-408: Arithmetic ops can't handle scalars first for MI"
MI_xfails = {
"Add_int_r1_st": fail_str,
"Add_int_r4_st": fail_str,
"Add_float_r1_st": fail_str,
"Add_float_r4_st": fail_str,
"Sub_int_r1_ts": fail_str,
"Sub_int_r1_st": fail_str,
"Sub_int_r4_ts": fail_str,
"Sub_int_r4_st": fail_str,
"Sub_float_r1_st": fail_str,
"Sub_float_r4_st": fail_str,
"Mul_int_r1_st": fail_str,
"Mul_int_r4_st": fail_str,
"Mul_float_r1_st": fail_str,
"Mul_float_r4_st": fail_str,
"Div_int_r1_st": fail_str,
"Div_int_r4_st": fail_str,
"Div_float_r1_st": fail_str,
"Div_float_r4_st": fail_str,
"Add__int_r1_st": fail_str,
"Add__float_r1_st": fail_str,
"Add__float_r4_st": fail_str,
"Add__int_r4_st": fail_str,
"Sub__int_r1_ts": fail_str,
"Sub__int_r1_st": fail_str,
"Sub__int_r4_ts": fail_str,
"Sub__int_r4_st": fail_str,
"Sub__float_r1_st": fail_str,
"Sub__float_r4_st": fail_str,
"Mul__int_r1_st": fail_str,
"Mul__int_r4_st": fail_str,
"Mul__float_r1_st": fail_str,
"Mul__float_r4_st": fail_str,
"Div__int_r1_st": fail_str,
"Div__int_r4_st": fail_str,
"Div__float_r1_st": fail_str,
"Div__float_r4_st": fail_str,
}


@common.parametrize("tensor_scalar_tests", tensor_scalar_tests, MI_xfails)
def test_MI(tensor_scalar_tests: list):
op, x, y = tensor_scalar_tests
_test_add_tosa_MI_pipeline(op, (x, y))


def _test_passes_tosa_BI_pipeline(module: torch.nn.Module, test_data: tuple):
pipeline = TransformAnnotationPassPipeline[input_t1](
module, test_data, tosa_version="TOSA-0.80+BI"
)
pipeline.run()


fail_str = "MLETORCH-770: Numerical issues on Div Scalar."
passes_xfails = {
"Div__int_r1_ts": fail_str,
"Div__int_r4_ts": fail_str,
"Div__float_r1_ts": fail_str,
"Div__float_r4_ts": fail_str,
}


@common.parametrize("tensor_scalar_tests", tensor_scalar_tests, passes_xfails)
def test_passes_BI(tensor_scalar_tests: list):
op, x, y = tensor_scalar_tests
_test_passes_tosa_BI_pipeline(op, (x, y))


# op(Scalar float, tensor) works if the scalar is constant.
@common.parametrize("tensor_const_tests", tensor_const_tests)
def test_MI_const(tensor_const_tests: list):
op, x = tensor_const_tests
_test_add_tosa_MI_pipeline(op, (x,))


@common.parametrize("tensor_scalar_tests", tensor_scalar_tests)
def test_BI(tensor_scalar_tests: list):
op, x, y = tensor_scalar_tests
_test_add_tosa_BI_pipeline(op, (x, y))


# op(Scalar float, tensor) works if the scalar is constant.
@common.parametrize("tensor_const_tests", tensor_const_tests)
def test_BI_const(tensor_const_tests: list):
op, x = tensor_const_tests
_test_add_tosa_BI_pipeline(op, (x,))


def test_shift_sub_inplace_tosa_MI():
_test_add_tosa_MI_pipeline(TestScalars.ShiftInplaceSub(), (torch.IntTensor(5),))


# Do not check for quant nodes in the graph for rshift.
def test_shift_sub_inplace_tosa_BI():
_test_add_tosa_BI_pipeline(
TestScalars.ShiftInplaceSub(), (torch.IntTensor(5),), check_quant_nodes=False
)
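
For context, here is a minimal sketch of how a further scalar case could be wired up with the pipeline classes introduced in this change. The test names are illustrative only, not part of the PR, and the sketch assumes the names already defined in test_scalars.py above (input_t1, TestScalars, TosaPipelineMI, TransformAnnotationPassPipeline) plus the torch import.

def test_scalar_add_r4_float_MI_example():
    # Any of the operator modules from the ops list above works here.
    op = TestScalars.Add()
    test_data = (torch.rand(2, 4, 5, 3), 3.0)  # (tensor, scalar) input pair

    # TosaPipelineMI replaces the old ArmTester chain; judging by the removed
    # code, it presumably covers export, to_edge, partition, to_executorch and
    # output comparison in a single run() call.
    pipeline = TosaPipelineMI[input_t1](op, test_data, aten_op=[], exir_op=[])
    pipeline.run()


def test_scalar_add_r4_float_passes_BI_example():
    op = TestScalars.Add()
    test_data = (torch.rand(2, 4, 5, 3), 3.0)

    # TransformAnnotationPassPipeline exercises run_transform_for_annotation_pipeline
    # on the TOSA-0.80+BI profile, which is the new coverage this PR adds.
    pipeline = TransformAnnotationPassPipeline[input_t1](
        op, test_data, tosa_version="TOSA-0.80+BI"
    )
    pipeline.run()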