# blob: 933828329b7742b63061b1b0cea5a7dd6e73114e [file] [log] [blame]
# Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
# SPDX-License-Identifier: MIT
import os
import stat
import numpy as np
import pytest
import pyarmnn as ann
def test_optimizer_options_default_values():
    """A freshly constructed OptimizerOptions carries the expected defaults."""
    options = ann.OptimizerOptions()
    assert not options.m_ReduceFp32ToFp16
    assert not options.m_Debug
    assert not options.m_ReduceFp32ToBf16
    assert not options.m_ImportEnabled
    assert options.m_shapeInferenceMethod == ann.ShapeInferenceMethod_ValidateOnly
def test_optimizer_options_set_values1():
    """The first two positional arguments toggle fp16 reduction and debug mode."""
    options = ann.OptimizerOptions(True, True)
    assert options.m_ReduceFp32ToFp16
    assert options.m_Debug
    # Everything not passed explicitly keeps its default.
    assert not options.m_ReduceFp32ToBf16
    assert not options.m_ImportEnabled
    assert options.m_shapeInferenceMethod == ann.ShapeInferenceMethod_ValidateOnly
def test_optimizer_options_set_values2():
    """The third positional argument enables fp32-to-bf16 reduction."""
    options = ann.OptimizerOptions(False, False, True)
    assert not options.m_ReduceFp32ToFp16
    assert not options.m_Debug
    assert options.m_ReduceFp32ToBf16
    # Import flag and shape-inference method keep their defaults.
    assert not options.m_ImportEnabled
    assert options.m_shapeInferenceMethod == ann.ShapeInferenceMethod_ValidateOnly
def test_optimizer_options_set_values3():
    """Shape-inference method and import flag can both be set via the constructor."""
    options = ann.OptimizerOptions(False, False, True,
                                   ann.ShapeInferenceMethod_InferAndValidate, True)
    assert not options.m_ReduceFp32ToFp16
    assert not options.m_Debug
    assert options.m_ReduceFp32ToBf16
    assert options.m_ImportEnabled
    assert options.m_shapeInferenceMethod == ann.ShapeInferenceMethod_InferAndValidate
@pytest.fixture(scope="function")
def get_runtime(shared_data_folder, network_file):
    """Yield (preferred_backends, parsed network, runtime) for a single test.

    The network is parsed from the TFLite file named by the `network_file`
    parametrization, looked up inside `shared_data_folder`.
    """
    tflite_parser = ann.ITfLiteParser()
    backends = [ann.BackendId('CpuAcc'), ann.BackendId('CpuRef')]
    model_path = os.path.join(shared_data_folder, network_file)
    parsed_network = tflite_parser.CreateNetworkFromBinaryFile(model_path)
    runtime = ann.IRuntime(ann.CreationOptions())
    yield backends, parsed_network, runtime
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite',
                         ],
                         ids=['mock_model'])
def test_optimize_executes_successfully(network_file, get_runtime):
    """Optimizing for CpuRef alone must succeed and emit no warnings."""
    # CpuRef is always available, so this is architecture-independent.
    cpu_ref_only = [ann.BackendId('CpuRef')]
    _, network, runtime = get_runtime
    opt_network, messages = ann.Optimize(network, cpu_ref_only,
                                         runtime.GetDeviceSpec(),
                                         ann.OptimizerOptions())
    assert len(messages) == 0, 'With only CpuRef, there should be no warnings irrelevant of architecture.'
    assert opt_network
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite',
                         ],
                         ids=['mock_model'])
def test_optimize_owned_by_python(network_file, get_runtime):
    """The optimized network wrapper must be owned by Python (SWIG `thisown`)."""
    backends, network, runtime = get_runtime
    optimized, _ = ann.Optimize(network, backends, runtime.GetDeviceSpec(),
                                ann.OptimizerOptions())
    assert optimized.thisown
@pytest.mark.aarch64
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite'
                         ],
                         ids=['mock_model'])
def test_optimize_executes_successfully_for_neon_backend_only(network_file, get_runtime):
    """On aarch64 hosts, optimizing with only CpuAcc (Neon) must succeed cleanly."""
    neon_only = [ann.BackendId('CpuAcc')]
    _, network, runtime = get_runtime
    optimized, messages = ann.Optimize(network, neon_only,
                                       runtime.GetDeviceSpec(),
                                       ann.OptimizerOptions())
    assert not messages
    assert optimized
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite'
                         ],
                         ids=['mock_model'])
def test_optimize_fails_for_invalid_backends(network_file, get_runtime):
    """Optimize must raise RuntimeError when no requested backend is supported."""
    unknown_backend = [ann.BackendId('Unknown')]
    _, network, runtime = get_runtime
    with pytest.raises(RuntimeError) as err:
        ann.Optimize(network, unknown_backend, runtime.GetDeviceSpec(),
                     ann.OptimizerOptions())
    assert "None of the preferred backends [Unknown ] are supported." in str(err.value)
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite'
                         ],
                         ids=['mock_model'])
def test_optimize_fails_for_no_backends_specified(network_file, get_runtime):
    """Optimize must raise RuntimeError when the backend list is empty."""
    _, network, runtime = get_runtime
    with pytest.raises(RuntimeError) as err:
        ann.Optimize(network, [], runtime.GetDeviceSpec(), ann.OptimizerOptions())
    assert "Invoked Optimize with no backends specified" in str(err.value)
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite'
                         ],
                         ids=['mock_model'])
def test_serialize_to_dot(network_file, get_runtime, tmpdir):
    """SerializeToDot must create a non-empty Graphviz file describing the graph.

    Fixes: the serialized contents were previously bound to a misleadingly
    named variable (`expected_data` — it holds the ACTUAL output), and the
    explanatory text was a bare string statement (executed at runtime) rather
    than a comment.
    """
    preferred_backends, network, runtime = get_runtime
    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    dot_file_path = os.path.join(tmpdir, 'mock_model.dot')
    # The dot file must not exist beforehand, must be created by SerializeToDot,
    # and must be non-empty, containing the network's input tensor shape.
    assert not os.path.exists(dot_file_path)
    opt_network.SerializeToDot(dot_file_path)
    assert os.path.exists(dot_file_path)
    with open(dot_file_path) as dot_file:
        serialized_graph = dot_file.read()
    assert len(serialized_graph) > 1
    assert '[label=< [1,28,28,1] >]' in serialized_graph
@pytest.mark.x86_64
@pytest.mark.parametrize("network_file",
                         [
                             'mock_model.tflite'
                         ],
                         ids=['mock_model'])
def test_serialize_to_dot_mode_readonly(network_file, get_runtime, tmpdir):
    """SerializeToDot must raise RuntimeError when the target file is read-only.

    Fixes: the seed file was opened with a bare open()/close() pair, which
    leaks the handle if write() raises; it now uses a context manager. The
    explanatory bare string statement is now a real comment.
    """
    preferred_backends, network, runtime = get_runtime
    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    # Create the file, write to it, then flip it to read-only so that
    # SerializeToDot cannot reopen it for writing.
    dot_file_path = os.path.join(tmpdir, 'mock_model.dot')
    with open(dot_file_path, "w+") as seed_file:
        seed_file.write("test")
    os.chmod(dot_file_path, stat.S_IREAD)
    assert os.path.exists(dot_file_path)
    with pytest.raises(RuntimeError) as err:
        opt_network.SerializeToDot(dot_file_path)
    assert "Failed to open dot file" in str(err.value)
@pytest.mark.parametrize("method", [
    'AddActivationLayer',
    'AddAdditionLayer',
    'AddArgMinMaxLayer',
    'AddBatchMatMulLayer',
    'AddBatchNormalizationLayer',
    'AddBatchToSpaceNdLayer',
    'AddBroadcastToLayer',
    'AddCastLayer',
    'AddChannelShuffleLayer',
    'AddComparisonLayer',
    'AddConcatLayer',
    'AddConstantLayer',
    'AddConvolution2dLayer',
    'AddConvolution3dLayer',
    'AddDepthToSpaceLayer',
    'AddDepthwiseConvolution2dLayer',
    'AddDequantizeLayer',
    'AddDetectionPostProcessLayer',
    'AddDivisionLayer',
    'AddElementwiseBinaryLayer',
    'AddElementwiseUnaryLayer',
    'AddFloorLayer',
    'AddFillLayer',
    'AddFullyConnectedLayer',
    'AddFusedLayer',
    'AddGatherLayer',
    'AddGatherNdLayer',
    'AddInputLayer',
    'AddInstanceNormalizationLayer',
    'AddLogicalBinaryLayer',
    'AddLogSoftmaxLayer',
    'AddL2NormalizationLayer',
    'AddLstmLayer',
    'AddMaximumLayer',
    'AddMeanLayer',
    'AddMergeLayer',
    'AddMinimumLayer',
    'AddMultiplicationLayer',
    'AddNormalizationLayer',
    'AddOutputLayer',
    'AddPadLayer',
    'AddPermuteLayer',
    'AddPooling2dLayer',
    'AddPooling3dLayer',
    'AddPreluLayer',
    'AddQuantizeLayer',
    'AddQuantizedLstmLayer',
    'AddRankLayer',
    'AddReduceLayer',
    'AddReshapeLayer',
    'AddResizeLayer',
    'AddReverseV2Layer',
    'AddScatterNdLayer',
    'AddShapeLayer',
    'AddSliceLayer',
    'AddSoftmaxLayer',
    'AddSpaceToBatchNdLayer',
    'AddSpaceToDepthLayer',
    'AddSplitterLayer',
    'AddStackLayer',
    'AddStandInLayer',
    'AddStridedSliceLayer',
    'AddSubtractionLayer',
    'AddSwitchLayer',
    'AddTileLayer',
    'AddTransposeConvolution2dLayer',
    'AddTransposeLayer'
])
def test_network_method_exists(method):
    """Every expected AddXxxLayer factory must be exposed on INetwork."""
    assert hasattr(ann.INetwork, method)
def test_Convolution2d_layer_optional_none():
    """AddConvolution2dLayer works when only the descriptor is supplied."""
    network = ann.INetwork()
    descriptor = ann.Convolution2dDescriptor()
    conv_layer = network.AddConvolution2dLayer(convolution2dDescriptor=descriptor)
    assert conv_layer
def test_Convolution2d_layer_all_args():
    """AddConvolution2dLayer honours the optional layer name."""
    network = ann.INetwork()
    conv_layer = network.AddConvolution2dLayer(
        convolution2dDescriptor=ann.Convolution2dDescriptor(),
        name='NAME1')
    assert conv_layer
    assert conv_layer.GetName() == 'NAME1'
def test_add_constant_layer_to_fully_connected():
    """End-to-end: constant weight/bias layers feeding a FullyConnected layer.

    Builds a graph `input -> FullyConnected(5 -> 3) -> output` where weights
    and biases are supplied through AddConstantLayer (slots 1 and 2 of the FC
    layer), runs it on CpuRef and compares against a hand-computed result.

    Fixes: the local holding the input ndarray shadowed the builtin `input`.
    """
    inputWidth = 1
    inputHeight = 1
    inputChannels = 5
    inputNum = 2

    outputChannels = 3
    outputNum = 2

    inputShape = (inputNum, inputChannels, inputHeight, inputWidth)
    outputShape = (outputNum, outputChannels)
    weightsShape = (inputChannels, outputChannels)
    biasShape = (outputChannels,)

    input_data = np.array([
        [1.0, 2.0, 3.0, 4.0, 5.0],
        [5.0, 4.0, 3.0, 2.0, 1.0]
    ], dtype=np.float32)

    weights = np.array([
        [.5, 2., .5],
        [.5, 2., 1.],
        [.5, 2., 2.],
        [.5, 2., 3.],
        [.5, 2., 4.]
    ], dtype=np.float32)

    biasValues = np.array([10, 20, 30], dtype=np.float32)

    # Expected output of input_data @ weights + biasValues, spelled out
    # term-by-term so each row/column contribution is visible.
    expectedOutput = np.array([
        [0.5 + 1.0 + 1.5 + 2.0 + 2.5 + biasValues[0],
         2.0 + 4.0 + 6.0 + 8.0 + 10. + biasValues[1],
         0.5 + 2.0 + 6.0 + 12. + 20. + biasValues[2]],
        [2.5 + 2.0 + 1.5 + 1.0 + 0.5 + biasValues[0],
         10.0 + 8.0 + 6.0 + 4.0 + 2. + biasValues[1],
         2.5 + 4.0 + 6.0 + 6. + 4. + biasValues[2]]
    ], dtype=np.float32)

    network = ann.INetwork()

    # Input layer and its const tensor (last ctor arg marks it constant).
    input_info = ann.TensorInfo(ann.TensorShape(inputShape), ann.DataType_Float32, 0, 0, True)
    input_tensor = ann.ConstTensor(input_info, input_data)
    input_layer = network.AddInputLayer(0, "input")

    # Weights and bias enter the graph as constant layers.
    w_info = ann.TensorInfo(ann.TensorShape(weightsShape), ann.DataType_Float32, 0, 0, True)
    w_tensor = ann.ConstTensor(w_info, weights)
    w_layer = network.AddConstantLayer(w_tensor, "weights")

    b_info = ann.TensorInfo(ann.TensorShape(biasShape), ann.DataType_Float32, 0, 0, True)
    b_tensor = ann.ConstTensor(b_info, biasValues)
    b_layer = network.AddConstantLayer(b_tensor, "bias")

    fc_descriptor = ann.FullyConnectedDescriptor()
    fc_descriptor.m_BiasEnabled = True
    fc_descriptor.m_ConstantWeights = True
    fully_connected = network.AddFullyConnectedLayer(fc_descriptor, "fc")

    output_info = ann.TensorInfo(ann.TensorShape(outputShape), ann.DataType_Float32)
    # NOTE(review): the backing buffer shape [1, 1] does not match outputShape
    # (2, 3); presumably ann.Tensor sizes its storage from output_info — confirm.
    output_tensor = ann.Tensor(output_info, np.zeros([1, 1], dtype=np.float32))
    output = network.AddOutputLayer(0, "output")

    # Wire the graph: input -> FC slot 0, weights -> slot 1, bias -> slot 2.
    input_layer.GetOutputSlot(0).Connect(fully_connected.GetInputSlot(0))
    w_layer.GetOutputSlot(0).Connect(fully_connected.GetInputSlot(1))
    b_layer.GetOutputSlot(0).Connect(fully_connected.GetInputSlot(2))
    fully_connected.GetOutputSlot(0).Connect(output.GetInputSlot(0))

    input_layer.GetOutputSlot(0).SetTensorInfo(input_info)
    w_layer.GetOutputSlot(0).SetTensorInfo(w_info)
    b_layer.GetOutputSlot(0).SetTensorInfo(b_info)
    fully_connected.GetOutputSlot(0).SetTensorInfo(output_info)

    # Optimize for CpuRef, load and run the workload.
    preferred_backends = [ann.BackendId('CpuRef')]
    options = ann.CreationOptions()
    runtime = ann.IRuntime(options)
    opt_network, messages = ann.Optimize(network, preferred_backends,
                                         runtime.GetDeviceSpec(), ann.OptimizerOptions())
    net_id, messages = runtime.LoadNetwork(opt_network)

    input_tensors = [(0, input_tensor)]
    output_tensors = [(0, output_tensor)]
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)
    assert (output_vectors == expectedOutput).all()