# Copyright © 2020 Arm Ltd. All rights reserved.
# SPDX-License-Identifier: MIT
| 3 | import os |
| 4 | |
| 5 | import pytest |
| 6 | import pyarmnn as ann |
| 7 | import numpy as np |
| 8 | |
| 9 | |
@pytest.fixture()
def parser(shared_data_folder):
    """
    Provide an ITfParser that has already parsed the mock test network.

    The model is read from 'mock_model.pb' inside ``shared_data_folder``,
    with an input called 'input' of shape [1, 28, 28, 1] and 'output' as the
    single requested output.
    """
    model_file = os.path.join(shared_data_folder, 'mock_model.pb')
    input_shapes = {'input': ann.TensorShape((1, 28, 28, 1))}
    outputs = ["output"]

    tf_parser = ann.ITfParser()

    # Parse the tf binary and create the network. The returned network is
    # discarded; only the parser itself is handed to the test.
    tf_parser.CreateNetworkFromBinaryFile(model_file, input_shapes, outputs)

    yield tf_parser
| 32 | |
| 33 | |
def test_tf_parser_swig_destroy():
    """ITfParser exposes a SWIG destructor and it is named 'delete_ITfParser'."""
    destructor = ann.ITfParser.__swig_destroy__

    assert destructor, "There is a swig python destructor defined"
    assert destructor.__name__ == "delete_ITfParser"
| 37 | |
| 38 | |
def test_check_tf_parser_swig_ownership(parser):
    """The SWIG ownership flag (``thisown``) must be set on the parser."""
    # When SWIG owns the wrapped value, it can be garbage-collected
    # automatically once it is no longer in use.
    swig_owns_parser = parser.thisown
    assert swig_owns_parser
| 44 | |
| 45 | |
def test_tf_parser_get_network_input_binding_info(parser):
    """Check the tensor description bound to the network input 'input'."""
    # Element [1] of the binding info is the object describing the tensor.
    input_tensor_info = parser.GetNetworkInputBindingInfo("input")[1]

    # NOTE(review): data type 1 is presumably Float32 - confirm against the
    # armnn DataType enum.
    assert input_tensor_info.GetDataType() == 1
    assert input_tensor_info.GetNumDimensions() == 4
    # The fixture parses the model with input shape [1, 28, 28, 1].
    assert input_tensor_info.GetNumElements() == 28 * 28 * 1
    assert input_tensor_info.GetQuantizationOffset() == 0
    assert input_tensor_info.GetQuantizationScale() == 0
| 55 | |
| 56 | |
def test_tf_parser_get_network_output_binding_info(parser):
    """Check the tensor description bound to the network output 'output'."""
    # Element [1] of the binding info is the object describing the tensor.
    output_tensor_info = parser.GetNetworkOutputBindingInfo("output")[1]

    # NOTE(review): data type 1 is presumably Float32 - confirm against the
    # armnn DataType enum.
    assert output_tensor_info.GetDataType() == 1
    assert output_tensor_info.GetNumDimensions() == 2
    assert output_tensor_info.GetNumElements() == 10
    assert output_tensor_info.GetQuantizationOffset() == 0
    assert output_tensor_info.GetQuantizationScale() == 0
| 66 | |
| 67 | |
def test_tf_filenotfound_exception(shared_data_folder):
    """Parsing 'some_unknown_model.pb' must raise a RuntimeError mentioning 'failed to open'."""
    unknown_model = os.path.join(shared_data_folder, 'some_unknown_model.pb')
    input_shapes = {'input': ann.TensorShape((1, 1, 1, 1))}  # shape [1, 1, 1, 1]
    outputs = [""]

    tf_parser = ann.ITfParser()

    # Attempt to parse the tf binary and create the network.
    with pytest.raises(RuntimeError) as err:
        tf_parser.CreateNetworkFromBinaryFile(unknown_model, input_shapes, outputs)

    # Match only a fragment of the message: the full text contains an
    # absolute path, which differs from machine to machine.
    assert 'failed to open' in str(err.value)
| 88 | |
| 89 | |
def test_tf_parser_end_to_end(shared_data_folder):
    """
    Run the mock TensorFlow model end to end and compare with the golden output.

    Steps: parse 'mock_model.pb', optimize the network, load it into a
    runtime, enqueue one workload fed from 'tf_parser/input_tf.npy', and
    check the first output against 'tf_parser/golden_output_tf.npy'.
    """
    # Bind the new parser to a local name only. The previous chained
    # assignment (`parser = ann.ITfParser = ann.ITfParser()`) also rebound the
    # module attribute `ann.ITfParser` to this instance, replacing the class
    # for any code that used the module afterwards in the same process.
    parser = ann.ITfParser()

    tensorshape = {'input': ann.TensorShape((1, 28, 28, 1))}
    requested_outputs = ["output"]

    network = parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'mock_model.pb'),
                                                 tensorshape, requested_outputs)

    input_binding_info = parser.GetNetworkInputBindingInfo("input")

    # Load test image data stored in input_tf.npy and cast it to float32.
    input_tensor_data = np.load(os.path.join(shared_data_folder, 'tf_parser/input_tf.npy')).astype(np.float32)

    preferred_backends = [ann.BackendId('CpuAcc'), ann.BackendId('CpuRef')]

    options = ann.CreationOptions()
    runtime = ann.IRuntime(options)

    opt_network, messages = ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(), ann.OptimizerOptions())

    # Optimization is expected to produce no messages.
    assert 0 == len(messages)

    net_id, messages = runtime.LoadNetwork(opt_network)

    # Loading is expected to return an empty message string.
    assert "" == messages

    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])

    # One binding info per requested output, in the same order.
    outputs_binding_info = [parser.GetNetworkOutputBindingInfo(output_name)
                            for output_name in requested_outputs]

    output_tensors = ann.make_output_tensors(outputs_binding_info)

    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

    # Load golden output file for result comparison.
    golden_output = np.load(os.path.join(shared_data_folder, 'tf_parser/golden_output_tf.npy'))

    # Check that output matches golden output to 4 decimal places (there are slight rounding differences after this)
    np.testing.assert_almost_equal(output_vectors[0], golden_output, decimal=4)