Richard Burton | dc0c6ed | 2020-04-08 16:39:05 +0100 | [diff] [blame] | 1 | # Copyright © 2020 Arm Ltd. All rights reserved. |
| 2 | # SPDX-License-Identifier: MIT |
| 3 | import os |
| 4 | |
| 5 | import pytest |
| 6 | import numpy as np |
| 7 | |
| 8 | import pyarmnn as ann |
| 9 | |
| 10 | |
@pytest.fixture(scope="function")
def random_runtime(shared_data_folder):
    """Parse the mock TFLite model and prepare everything a test needs to run
    it: preferred backends, the parsed network, a CpuRef runtime, random uint8
    input tensors and empty output tensors.

    Yields:
        (preferred_backends, network, runtime, input_tensors, output_tensors)
    """
    tflite_parser = ann.ITfLiteParser()
    network = tflite_parser.CreateNetworkFromBinaryFile(
        os.path.join(shared_data_folder, 'mock_model.tflite'))
    preferred_backends = [ann.BackendId('CpuRef')]

    runtime = ann.IRuntime(ann.CreationOptions())

    # Use the last subgraph (for the mock model this is the only one).
    graph_id = tflite_parser.GetSubgraphCount() - 1
    input_names = tflite_parser.GetSubgraphInputTensorNames(graph_id)

    input_binding_info = tflite_parser.GetNetworkInputBindingInfo(graph_id, input_names[0])
    input_tensor_id = input_binding_info[0]
    input_tensor_info = input_binding_info[1]

    # Fill the (single) input with random uint8 data sized to the tensor.
    input_data = np.random.randint(255, size=input_tensor_info.GetNumElements(), dtype=np.uint8)
    input_tensors = [(input_tensor_id, ann.ConstTensor(input_tensor_info, input_data))]

    # Pair each network output binding id with an empty tensor to receive results.
    output_names = tflite_parser.GetSubgraphOutputTensorNames(graph_id)
    output_tensors = [
        (binding[0], ann.Tensor(binding[1]))
        for binding in (tflite_parser.GetNetworkOutputBindingInfo(graph_id, name)
                        for name in output_names)
    ]

    yield preferred_backends, network, runtime, input_tensors, output_tensors
| 50 | |
| 51 | |
@pytest.fixture(scope='function')
def mock_model_runtime(shared_data_folder):
    """Load and optimize the mock TFLite model onto a CpuRef runtime.

    Yields:
        (runtime, net_id, input_tensors, output_tensors) ready for
        EnqueueWorkload calls.
    """
    tflite_parser = ann.ITfLiteParser()
    network = tflite_parser.CreateNetworkFromBinaryFile(
        os.path.join(shared_data_folder, 'mock_model.tflite'))
    graph_id = 0

    input_binding_info = tflite_parser.GetNetworkInputBindingInfo(graph_id, "input_1")
    input_tensor_data = np.load(os.path.join(shared_data_folder, 'tflite_parser/input_lite.npy'))

    preferred_backends = [ann.BackendId('CpuRef')]
    runtime = ann.IRuntime(ann.CreationOptions())

    opt_network, messages = ann.Optimize(network, preferred_backends,
                                         runtime.GetDeviceSpec(), ann.OptimizerOptions())
    print(messages)

    net_id, messages = runtime.LoadNetwork(opt_network)
    print(messages)

    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])

    # Collect a binding info entry for every network output, in name order.
    outputs_binding_info = [
        tflite_parser.GetNetworkOutputBindingInfo(graph_id, name)
        for name in tflite_parser.GetSubgraphOutputTensorNames(graph_id)
    ]
    output_tensors = ann.make_output_tensors(outputs_binding_info)

    yield runtime, net_id, input_tensors, output_tensors
| 86 | |
| 87 | |
def test_python_disowns_network(random_runtime):
    """After LoadNetwork, ownership of the optimized network moves to the
    ArmNN runtime, so the Python proxy's SWIG `thisown` flag must be False."""
    preferred_backends, network, runtime, _, _ = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    runtime.LoadNetwork(opt_network)

    assert not opt_network.thisown
| 98 | |
def test_load_network(random_runtime):
    """Loading an optimized network succeeds with no error messages and the
    first loaded network gets id 0."""
    preferred_backends, network, runtime, _, _ = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, messages = runtime.LoadNetwork(opt_network)
    assert "" == messages
    assert net_id == 0
| 110 | |
def test_create_runtime_with_external_profiling_enabled():
    """A runtime can be constructed with a fully populated, consistent set of
    external profiling options."""
    options = ann.CreationOptions()

    # Configure the nested profiling options struct in one place.
    profiling = options.m_ProfilingOptions
    profiling.m_FileOnly = True
    profiling.m_EnableProfiling = True
    profiling.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    profiling.m_IncomingCaptureFile = "/tmp/incoming.txt"
    profiling.m_TimelineEnabled = True
    profiling.m_CapturePeriod = 1000
    profiling.m_FileFormat = "JSON"

    runtime = ann.IRuntime(options)

    assert runtime is not None
| 126 | |
def test_create_runtime_with_external_profiling_enabled_invalid_options():
    """Requesting timeline reporting while profiling itself is disabled is an
    invalid combination and must raise at runtime construction."""
    options = ann.CreationOptions()

    profiling = options.m_ProfilingOptions
    profiling.m_FileOnly = True
    profiling.m_EnableProfiling = False  # inconsistent with m_TimelineEnabled below
    profiling.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    profiling.m_IncomingCaptureFile = "/tmp/incoming.txt"
    profiling.m_TimelineEnabled = True
    profiling.m_CapturePeriod = 1000
    profiling.m_FileFormat = "JSON"

    with pytest.raises(RuntimeError) as err:
        runtime = ann.IRuntime(options)

    expected_error_message = "It is not possible to enable timeline reporting without profiling being enabled"
    assert expected_error_message in str(err.value)
| 144 | |
Richard Burton | dc0c6ed | 2020-04-08 16:39:05 +0100 | [diff] [blame] | 145 | |
def test_load_network_properties_provided(random_runtime):
    """LoadNetwork also accepts an explicit INetworkProperties argument and
    still loads cleanly (id 0, no messages)."""
    preferred_backends, network, runtime, _, _ = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    properties = ann.INetworkProperties(True, True)
    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert "" == messages
    assert net_id == 0
| 158 | |
| 159 | |
def test_unload_network_fails_for_invalid_net_id(random_runtime):
    """Unloading a network id that was never loaded raises a RuntimeError."""
    preferred_backends, network, runtime, _, _ = random_runtime

    ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(), ann.OptimizerOptions())

    # Nothing was loaded, so id 9 cannot exist.
    with pytest.raises(RuntimeError) as err:
        runtime.UnloadNetwork(9)

    expected_error_message = "Failed to unload network."
    assert expected_error_message in str(err.value)
| 172 | |
| 173 | |
def test_enqueue_workload(random_runtime):
    """A loaded network accepts a full set of input/output tensors and runs
    an inference without raising."""
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
| 186 | |
| 187 | |
def test_enqueue_workload_fails_with_empty_input_tensors(random_runtime):
    """EnqueueWorkload must reject an input-tensor list whose length does not
    match the network's input count."""
    preferred_backends, network, runtime, _, output_tensors = random_runtime
    input_tensors = []  # deliberately wrong: the network expects one input

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    with pytest.raises(RuntimeError) as err:
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    expected_error_message = "Number of inputs provided does not match network."
    assert expected_error_message in str(err.value)
| 204 | |
| 205 | |
@pytest.mark.x86_64
@pytest.mark.parametrize('count', [5])
def test_multiple_inference_runs_yield_same_result(count, mock_model_runtime):
    """
    Test that results remain consistent among multiple runs of the same inference.

    Bug fix: the previous assertion was
    `output_vectors[i].all() == expected_results[i].all()`, which reduces each
    array to a scalar *independently* and compares the two scalars — it passes
    even when the arrays differ. Use np.array_equal for a real element-wise
    comparison.
    """
    runtime, net_id, input_tensors, output_tensors = mock_model_runtime

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    for _ in range(count):
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

        output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

        # Every run must reproduce the expected output exactly.
        for i in range(len(expected_results)):
            assert np.array_equal(output_vectors[i], expected_results[i])
| 226 | |
| 227 | |
@pytest.mark.aarch64
def test_aarch64_inference_results(mock_model_runtime):
    """Run one inference on aarch64 and check the outputs match the reference.

    Bug fixes: (1) the redundant chained assignment
    `expected_outputs = expected_results = ...` left one name unused;
    (2) `a.all() == b.all()` compares two independently reduced scalars and
    passes for unequal arrays — replaced with np.array_equal.
    """
    runtime, net_id, input_tensors, output_tensors = mock_model_runtime

    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    # Element-wise comparison of each output against its reference vector.
    for i in range(len(expected_results)):
        assert np.array_equal(output_vectors[i], expected_results[i])
| 244 | |
| 245 | |
def test_enqueue_workload_with_profiler(random_runtime):
    """
    Tests ArmNN's profiling extension: profiling is off by default, can be
    enabled per-network, and after an inference both the JSON dump and the
    event-log analysis contain the expected markers.
    """
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    net_id, _ = runtime.LoadNetwork(opt_network)

    profiler = runtime.GetProfiler(net_id)
    # By default profiling should be turned off:
    assert profiler.IsProfilingEnabled() is False

    # Enable profiling:
    profiler.EnableProfiling(True)
    assert profiler.IsProfilingEnabled() is True

    # Run the inference:
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    # The JSON dump must be non-empty and contain the top-level ArmNN marker:
    profile_json = profiler.as_json()
    assert len(profile_json) != 0
    assert profile_json.find('\"ArmNN\": {') > 0

    # The events analysis must carry the expected table header:
    events_analysis = profiler.event_log()
    assert "Event Sequence - Name | Duration (ms) | Start (ms) | Stop (ms) | Device" in events_analysis

    # The profiler proxy is owned by the runtime, not by Python:
    assert profiler.thisown == 0
| 284 | |
| 285 | |
def test_check_runtime_swig_ownership(random_runtime):
    """The runtime proxy created in Python must be owned by SWIG (`thisown`
    truthy) so it is garbage-collected automatically when no longer in use."""
    _, _, runtime, _, _ = random_runtime
    assert runtime.thisown