+"""Build conformance tests.
+- Specific input shapes (or tests) are specified and produced by using the
+  settings in the .json files.
+- Tests are selected to produce a good coverage.
+- Tests are run on the reference model to produce the correct output files.
+- Tests are converted into JSON format and saved to desired output directory.
+import argparse
+import json
+import logging
+import multiprocessing as mp
+import os
+import shlex
+import shutil
+import subprocess
+from functools import partial
+from itertools import tee
+from pathlib import Path
+from conformance.test_select import Operator
+from convert2conformance.convert2conformance import main as c2c_main
+from distutils.dir_util import copy_tree
+logger = logging.getLogger("tosa_verif_conformance_generator")
+# Configuration for each TOSA profile
+    "base": {
+        "operator_test_params": "tosa_base_profile_ops_info.json",
+        "framework_tests": "tosa_base_profile_framework_ops_info.json",
+        "exclude_types": ["float"],
+    }
+LOCATION_REF_MODEL_BINARY = Path("build/reference_model/tosa_reference_model")
+class GenConformanceError(Exception):
+    """Generation error reporting exception."""
+    pass
+def _run_sh_command(args, cwd, full_cmd):
+    """Run an external command and capture stdout/stderr."""
+    # Quote the command line for printing
+    full_cmd_esc = [shlex.quote(x) for x in full_cmd]
+    if args.capture_output:
+        logger.debug(f"Command: {full_cmd_esc}")
+    rc =
+        full_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd
+    )
+    if args.capture_output:
+        stdout = rc.stdout.decode("utf-8")
+        logger.debug(f"stdout: \n{stdout}")
+    if rc.returncode != 0:
+        raise Exception(
+            "Error running command: {}.\n{}".format(
+                " ".join(full_cmd_esc), rc.stderr.decode("utf-8")
+            )
+        )
+    return (rc.stdout, rc.stderr)
+def build_op_tests(args, operator, test_params):
+    """Build tests for a given operator.
+    Builds a set of tests based on the parameters defined in test_params
+    Returns operator output directory
+    """
+    assert operator in test_params
+    build_tests_cmd = "tosa_verif_build_tests"
+    op_build_dir = args.build_dir
+    ref_cmd_base = [
+        build_tests_cmd,
+        "--filter",
+        operator,
+        "-o",
+        str(op_build_dir),
+        "--seed",
+        "42",
+    ]
+    ref_cmds = []
+    if args.test_type in ["positive", "both"]:
+        # Append extra parameters and run test generator for each set of parameters.
+        for arglist in test_params[operator]["generator_args"]:
+            ref_cmd_pos_test = ref_cmd_base.copy()
+            ref_cmd_pos_test.extend(arglist)
+            ref_cmds.append(ref_cmd_pos_test)
+    if args.test_type in ["negative", "both"]:
+        ref_cmd_neg_test = ref_cmd_base.copy()
+        ref_cmd_neg_test.extend(["--test-type", "negative"])
+        ref_cmds.append(ref_cmd_neg_test)
+    logger.debug(f"Creating {operator} tests with {len(ref_cmds)} parameter(s)")
+    error = False
+    for i, cmd in enumerate(ref_cmds):
+        try:
+            _run_sh_command(args, args.ref_model_dir.absolute(), cmd)
+                f"{operator} test batch {(i+1)}/{len(ref_cmds)} created successfully"
+            )
+        except Exception as e:
+            logger.error(
+                f"{operator} test batch {(i+1)}/{len(ref_cmds)} unsuccessful, skipping"
+            )
+            logger.error(f" build_op_tests error: {e} ")
+            error = True
+    if error:
+        raise (GenConformanceError())
+    return op_build_dir
+def _check_to_include_test(profile, test_name, exclude_negative_tests=False):
+    """Check test name for exclusions, return False to indicate excluded."""
+    excludes = ["ERRORIF"] if exclude_negative_tests else []
+    excludes.extend(PROFILE_OPS_INFO[profile]["exclude_types"])
+    for exclusion in excludes:
+        if f"_{exclusion}_" in test_name:
+            return False
+    return True
+def _get_all_tests_list(
+    profile, test_root_dir, operator, exclude_negative_tests=False, include_all=False
+    """Create test list based on tests in the test_dir."""
+    test_dir = test_root_dir / operator
+    if not test_dir.is_dir():
+        # Tests are split into multiple dirs, for example: conv2d_1x1, conv2d_3x3
+        test_dir = test_root_dir
+        directories = [
+            tdir for tdir in test_dir.glob("*") if
+        ]
+    else:
+        directories = [test_dir]
+    tests = []
+    for tdir in directories:
+        tests.extend(
+            [
+                test
+                for test in tdir.glob("*")
+                if include_all
+                or _check_to_include_test(profile,, exclude_negative_tests)
+            ]
+        )
+    return tests
+def generate_results(args, operator, op_build_dir, tests=None):
+    """Run tests on reference model and save result to the test directory."""
+    num_cores = args.num_cores
+    run_tests_cmd = "tosa_verif_run_tests"
+    ref_model_path = args.ref_model_dir / LOCATION_REF_MODEL_BINARY
+    ref_cmd_base = ref_cmd = [
+        run_tests_cmd,
+        "--ref-model-path",
+        str(ref_model_path.absolute()),
+        "-j",
+        str(num_cores),
+        "-v",
+        "-t",
+    ]
+    ref_cmds = []
+    if not tests:
+        # Do not need to run ERRORIF tests as they don't have result files
+        tests = _get_all_tests_list(
+            args.profile, op_build_dir, operator, exclude_negative_tests=True
+        )
+    for test in tests:
+        ref_cmd = ref_cmd_base.copy()
+        ref_cmd.append(str(test))
+        ref_cmds.append(ref_cmd)
+    fail_string = "UNEXPECTED_FAILURE"
+    failed_counter = 0
+    job_pool = mp.Pool(args.num_cores)
+    sh_partial = partial(_run_sh_command, args, args.ref_model_dir.absolute())
+    pool_results =, ref_cmds)
+    job_pool.close()
+    job_pool.join()
+    # Use captured output for run_sh_command to work out if test passed.
+    for i, rc in enumerate(pool_results):
+        if fail_string in str(rc[0]):
+            logger.error(f"Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed.")
+            failed_counter += 1
+        else:
+  "Test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} passed.")
+"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests passed")
+"Ran tests on model and saved results of passing tests")
+def convert_tests(
+    args,
+    operator,
+    op_build_dir,
+    output_dir,
+    tests=None,
+    group=None,
+    trim_op_subdir=False,
+    """Convert tests to JSON and save to output directory."""
+    ref_model_dir = args.ref_model_dir
+    if group:
+        output_dir = output_dir / group
+    ref_cmd_base = ["--ref-model-directory", str(ref_model_dir)]
+    if args.framework_schema:
+        ref_cmd_base.extend(["--framework-schema", str(args.framework_schema)])
+    ref_cmd_base.append("--output-directory")
+    ref_cmds = []
+    if not tests:
+        tests = _get_all_tests_list(args.profile, op_build_dir, operator)
+"Converting all {args.profile} profile tests")
+    # Controls if we copy the tests in their operator sub-directory or not
+    output_dir_relative_pos = -1 if trim_op_subdir else -2
+    for test in tests:
+"Test chosen: {test}")
+        ref_cmd = ref_cmd_base.copy()
+        full_output_directory = output_dir / test.relative_to(
+            *[:output_dir_relative_pos]
+        )
+        ref_cmd.append(str(full_output_directory))
+        ref_cmd.append(str(test))
+        ref_cmds.append(ref_cmd)
+    if len(ref_cmds) == 0:
+        logger.warning("No tests found. Nothing to convert")
+        return
+    job_pool = mp.Pool(args.num_cores)
+    pool_results =, ref_cmds)
+    job_pool.close()
+    job_pool.join()
+    failed_counter = 0
+    for i, result in enumerate(pool_results):
+        if result != 0:
+            logger.error(
+                f"test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} failed to convert."
+            )
+            failed_counter += 1
+        else:
+  "test {i+1}/{len(ref_cmds)}: {ref_cmds[i][-1]} converted")
+        f"{len(ref_cmds)-failed_counter}/{len(ref_cmds)} tests successfully converted"
+    )
+    if failed_counter > 0:
+        logger.error(f"Stopping due to {failed_counter} test conversion errors")
+        raise (GenConformanceError())
+"Converted tests to JSON and saved to output directory")
+    return output_dir
+def get_op_tests_selection(args, operator, op_build_dir, test_params, negative=False):
+    """Use test picker to get subsection of tests generated."""
+    assert operator in test_params
+    try:
+        op_params = test_params[operator]
+        op = Operator.registry[operator](
+            op_build_dir,
+            op_params,
+            negative,
+            exclude_types=PROFILE_OPS_INFO[args.profile]["exclude_types"],
+        )
+    except KeyError:
+        logger.error(f"{operator} operator is not supported by test_select")
+        raise (GenConformanceError())
+    return op.select_tests()
+def check_op_tests(args, operator, output_dir):
+    """Move test folders than contain files larger than 30MB to new directory."""
+    destination_dir = str(args.output_dir) + "_large_files"
+    tests = _get_all_tests_list(args.profile, output_dir, operator, include_all=True)
+    if not tests:
+        logger.error(
+            f"Couldn't find any tests to size check for {operator} in {output_dir}"
+        )
+        raise (GenConformanceError())
+    for tdir in tests:
+        move_dir = False
+        test_files = [file for file in tdir.glob("*")]
+        for file in test_files:
+            file_size = os.stat(file).st_size / 1024**2
+            if file_size > 30:
+                move_dir = True
+        if move_dir:
+            move_destination = destination_dir / tdir.relative_to(output_dir)
+            logger.warning(
+                f"{tdir.relative_to(output_dir)} contains files that are too large (>30MB), test moved to new folder: {destination_dir}"
+            )
+            if move_destination.is_dir():
+                logger.warning(
+                    f"{move_destination} directory already exists, deleting existing."
+                )
+                shutil.rmtree(str(move_destination))
+            shutil.move(str(tdir), move_destination)
+def copy_rename_framework_tests(args, operator, test_picks):
+    """Copy framework tests into new folder and rename them if needed.
+    The tests are renamed to match the framework operator names if an
+    alternate name has been used instead.
+    """
+    framework_tests_dir = args.framework_tests_dir
+    new_tests_dir = args.build_dir / "frameworks" / operator
+    os.makedirs(new_tests_dir, exist_ok=True)
+    # Get the framework tests operator name
+    if "alternate_names" in test_picks[operator]:
+        alternate_names = test_picks[operator]["alternate_names"]
+    else:
+        alternate_names = [operator]
+    # Get the alternate named test directories for the operator
+    for alt_name in alternate_names:
+        test_prefix = f"test_{alt_name}"
+        test_dirs = list(framework_tests_dir.glob(f"{test_prefix}_*"))
+        # Copy tests to new directory and rename to match framework operator names
+        # - if there is just 1 alternate name, replace the full test prefix
+        #       test_add_... -> add_...
+        # - if there are multiple alternate names, just replace the "test"
+        #       test_concatv2_... -> concatenation_concatv2_...
+        old_prefix = test_prefix if len(alternate_names) == 1 else "test"
+        for tdir in test_dirs:
+            new_test_name =, operator)
+            copy_destination = new_tests_dir / new_test_name
+            logger.debug(f"copying test folder {tdir} to {copy_destination}")
+            copy_tree(str(tdir), str(copy_destination))
+"Copied and renamed {len(test_dirs)} framework test folders")
+    return new_tests_dir.parent
+def get_framework_tests_selection(args, operator, test_picks, op_build_dir):
+    """Get the list of pre-chosen tests with relative paths."""
+    try:
+        tests = test_picks[operator]["tests"]
+    except KeyError:
+        logger.error(f"Framework test selection not defined for {operator} operator")
+        raise (GenConformanceError())
+    test_paths = [op_build_dir / operator / test for test in tests]
+    return test_paths
+def parse_args(argv=None):
+    """Parse the arguments."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--operators",
+        type=str,
+        nargs="*",
+        help="The operator(s) to create tests for, if not supplied all tests will be created",
+    )
+    parser.add_argument(
+        "--ref-model-directory",
+        dest="ref_model_dir",
+        type=Path,
+        required=True,
+        help="Reference Model directory (must be pre-built)",
+    )
+    script_dir = Path(__file__).parent.absolute()
+    parser.add_argument(
+        "--test-param-json-directory",
+        dest="param_json_dir",
+        type=Path,
+        default=script_dir,
+        help="Test parameters (ops info) JSON file directory",
+    )
+    parser.add_argument(
+        "--convert-all-tests",
+        action="store_true",
+        help="Converts all tests instead of those picked by test_select",
+    )
+    parser.add_argument(
+        "--keep-large-files",
+        action="store_true",
+        help="Keeps tests that contain files larger than 30MB in output directory",
+    )
+    parser.add_argument(
+        "--capture-output",
+        action="store_true",
+        help="Prints output of running sh commands",
+    )
+    parser.add_argument(
+        "--build-directory",
+        dest="build_dir",
+        type=Path,
+        default=Path.cwd() / "conformance_build",
+        help="Temporary build directory for files created during this process (default is conformance_build)",
+    )
+    parser.add_argument(
+        "--output-directory",
+        dest="output_dir",
+        type=Path,
+        default=Path.cwd() / "conformance",
+        help="Output directory (default is conformance)",
+    )
+    parser.add_argument(
+        "-j",
+        dest="num_cores",
+        type=int,
+        default=6,
+        help="Number of simultaneous jobs to split the tasks into for multiprocessing",
+    )
+    parser.add_argument(
+        "-v",
+        dest="verbosity",
+        action="count",
+        default=0,
+        help="Verbosity (can be used multiple times for more details)",
+    )
+    parser.add_argument(
+        "--unit-tests",
+        dest="unit_tests",
+        choices=["operator", "framework", "both"],
+        default="operator",
+        type=str,
+        help="Which unit tests are produced: operator, framework, or both",
+    )
+    parser.add_argument(
+        "--test-type",
+        dest="test_type",
+        choices=["positive", "negative", "both"],
+        default="both",
+        type=str,
+        help="Type of tests produced: positive, negative, or both",
+    )
+    profiles = list(PROFILE_OPS_INFO.keys())
+    parser.add_argument(
+        "--profile",
+        dest="profile",
+        choices=profiles,
+        default=profiles[0],
+        type=str,
+        help="TOSA profile",
+    )
+    parser.add_argument(
+        "--framework-tests-directory",
+        dest="framework_tests_dir",
+        type=Path,
+        default=Path.cwd() / "tests",
+        help="The pre-built framework tests directory (default is tests)",
+    )
+    parser.add_argument(
+        "--framework-schema",
+        dest="framework_schema",
+        type=Path,
+        help="Framework flatbuffers schema needed to convert framework models",
+    )
+    args = parser.parse_args(argv)
+    return args
+def main():
+    args = parse_args()
+    if not args.ref_model_dir.is_dir():
+        logger.error(
+            f"Missing or invalid reference model directory: {args.ref_model_dir}"
+        )
+        return 2
+    else:
+        ref_model = args.ref_model_dir / LOCATION_REF_MODEL_BINARY
+        if not ref_model.is_file():
+            logger.error(
+                f"{LOCATION_REF_MODEL_BINARY} not found in {args.ref_model_dir}\nHave you built the reference model?"
+            )
+            return 2
+    if args.unit_tests in ["framework", "both"]:
+        if not args.framework_schema:
+            logger.error(
+                "Need to supply location of Framework flatbuffers schema via --framework-schema"
+            )
+            return 2
+        if not args.framework_tests_dir.is_dir():
+            logger.error(
+                f"Missing or invalid framework tests directory: {args.framework_tests_dir}"
+            )
+            return 2
+    loglevels = (logging.WARNING, logging.INFO, logging.DEBUG)
+    loglevel = loglevels[min(args.verbosity, len(loglevels) - 1)]
+    logger.setLevel(loglevel)
+    # Set other loggers the same
+    logging.getLogger("test_select").setLevel(loglevel)
+    logging.getLogger("convert2conformance").setLevel(loglevel)
+    print(f"Creating conformance tests for TOSA {args.profile} profile")
+    print(f"Output directory: {args.output_dir}")
+    args.build_dir = args.build_dir.resolve()
+    logger.debug(f"Creating build directory: {args.build_dir}")
+    args.build_dir.mkdir(parents=True, exist_ok=True)
+    try:
+        # Framework unit tests
+        if args.unit_tests in ["framework", "both"]:
+            logger.debug("Creating FRAMEWORK unit tests")
+            test_picks_file = (
+                args.param_json_dir / PROFILE_OPS_INFO[args.profile]["framework_tests"]
+            )
+            try:
+                with open(test_picks_file, "r") as fd:
+                    test_picks = json.load(fd)
+            except Exception as e:
+                logger.error(
+                    f"Couldn't load framework tests info - {test_picks_file}: {e}"
+                )
+                return 1
+            operators = args.operators
+            if not operators:
+                # Create tests for all the operators
+                operators = list(test_picks.keys())
+            root_output_dir = args.output_dir / "frameworks" / "tflite" / "operators"
+            for op in operators:
+                if op not in test_picks:
+                    logger.warning(
+                        f"Framework op {op} not found in {test_picks_file} - skipping"
+                    )
+                    continue
+                logger.debug(f"Copying and renaming {op}")
+                framework_test_dir = copy_rename_framework_tests(args, op, test_picks)
+                if args.convert_all_tests:
+                    logger.debug("Running and converting all framework tests")
+                    convert_tests(
+                        args,
+                        op,
+                        framework_test_dir,
+                        root_output_dir,
+                        trim_op_subdir=True,
+                    )
+                else:
+                    framework_tests = get_framework_tests_selection(
+                        args, op, test_picks, framework_test_dir
+                    )
+                    convert_tests(
+                        args,
+                        op,
+                        framework_test_dir,
+                        root_output_dir,
+                        tests=framework_tests,
+                        trim_op_subdir=True,
+                    )
+        # Operator unit tests
+        if args.unit_tests in ["operator", "both"]:
+            logger.debug("Creating OPERATOR unit tests")
+            test_params_file = (
+                args.param_json_dir
+                / PROFILE_OPS_INFO[args.profile]["operator_test_params"]
+            )
+            try:
+                with open(test_params_file, "r") as fd:
+                    test_params = json.load(fd)
+            except Exception as e:
+                logger.error(
+                    f"Couldn't load operator test params - {test_params_file}: {e}"
+                )
+                return 1
+            operators = args.operators
+            if not operators:
+                # Create tests for all the operators
+                operators = list(test_params.keys())
+            for op in operators:
+                if op not in test_params:
+                    logger.warning(
+                        f"{op} operator parameters not found in {test_params_file} - skipping"
+                    )
+                    continue
+                if (
+                    args.test_type == "negative"
+                    and "no_negative_tests" in test_params[op]
+                    and test_params[op]["no_negative_tests"]
+                ):
+                    logger.warning(f"No negative tests for {op}")
+                    continue
+                op_build_dir = build_op_tests(args, op, test_params)
+                operator_group = test_params[op]["group"]
+                root_output_dir = args.output_dir / "operators"
+                if args.convert_all_tests:
+                    logger.debug(f"Running and converting all {op} tests")
+                    generate_results(args, op, op_build_dir)
+                    output_dir = convert_tests(
+                        args, op, op_build_dir, root_output_dir, group=operator_group
+                    )
+                else:
+                    if args.test_type in ["positive", "both"]:
+                        tests_gen1, tests_gen2 = tee(
+                            get_op_tests_selection(args, op, op_build_dir, test_params)
+                        )
+                        generate_results(args, op, op_build_dir, tests_gen1)
+                        output_dir = convert_tests(
+                            args,
+                            op,
+                            op_build_dir,
+                            root_output_dir,
+                            tests=tests_gen2,
+                            group=operator_group,
+                        )
+                    if args.test_type in ["negative", "both"] and (
+                        "no_negative_tests" not in test_params[op]
+                        or not test_params[op]["no_negative_tests"]
+                    ):
+                        negative_tests = get_op_tests_selection(
+                            args, op, op_build_dir, test_params, negative=True
+                        )
+                        output_dir = convert_tests(
+                            args,
+                            op,
+                            op_build_dir,
+                            root_output_dir,
+                            tests=negative_tests,
+                            group=operator_group,
+                        )
+                if not args.keep_large_files:
+                    check_op_tests(args, op, output_dir)
+    except GenConformanceError:
+        return 1
+    return 0
+if __name__ == "__main__":
+    exit(main())