# SPDX-FileCopyrightText: Copyright 2020-2023 Arm Limited and/or its affiliates <open-source-office@arm.com>
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Description:
# Internal representation of a Neural Network Operation.
# Enables class name forward references in the type annotations (see PEP 563).
from __future__ import annotations
import copy
from collections import namedtuple
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from .api import NpuRoundingMode
from .errors import VelaError
from .ethos_u55_regs.ethos_u55_regs import resampling_mode
from .numeric_util import full_shape
from .shape4d import Shape4D
# Import needed for type annotations. Only import when type checking to avoid run-time errors due to the cyclic import.
if TYPE_CHECKING:
from .tensor import Tensor
PointXY = namedtuple("PointXY", "x y")
PointXYZ = namedtuple("PointXYZ", "x y z")
class NpuBlockType(Enum):
Default = 0
ConvolutionMxN = 1
VectorProduct = 2
Pooling = 3
ConvolutionDepthWise = 4
ElementWise = 5
ReduceSum = 6
Dma = 7
class Kernel:
"""
Kernel information for NPU operations
"""
def __init__(
self,
w: int,
h: int,
stride_x: int = 1,
stride_y: int = 1,
dilation_x: int = 1,
dilation_y: int = 1,
valid_padding=False,
):
assert stride_x > 0 and stride_y > 0
assert dilation_x > 0 and dilation_y > 0
self.width = w
self.height = h
self.stride = PointXY(stride_x, stride_y)
self.dilation = PointXY(dilation_x, dilation_y)
self.valid_padding = valid_padding
def elements_wh(self) -> int:
return self.width * self.height
def area_width(self) -> int:
return (self.width - 1) * self.dilation.x + 1
def area_height(self) -> int:
return (self.height - 1) * self.dilation.y + 1
def dilated_wh(self) -> Tuple[int, int]:
"""Returns the dilated kernel width/height"""
return self.dilation.x * (self.width - 1) + 1, self.dilation.y * (self.height - 1) + 1
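    # Illustrative example (hypothetical values): Kernel(3, 3, dilation_x=2, dilation_y=2)
    # has area_width() == area_height() == 5 and dilated_wh() == (5, 5), since
    # (3 - 1) * 2 + 1 = 5, while elements_wh() is still 3 * 3 = 9.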
def __str__(self):
return f"w={self.width}, h={self.height}, stride={tuple(self.stride)}, dilation={tuple(self.dilation)}"
# Classifies operators of type Custom
class CustomType(Enum):
ThirdPartyOp = 0 # Third party custom op
NpuOp = 1 # NPU op
ExistingNpuOp = 2 # NPU op that was part of the input network
TensorIndices = namedtuple("TensorIndices", ["ifms", "weights", "biases"])
NNG_NO_INDICES = TensorIndices([], [], [])
NNG_IFM_INDICES = TensorIndices([0], [], [])
NNG_IFM_WEIGHTS_INDICES = TensorIndices([0], [1], [])
NNG_IFM_WEIGHTS_BIAS_INDICES = TensorIndices([0], [1], [2])
NNG_IFM_IFM2_INDICES = TensorIndices([0, 1], [], [])
NNG_CONV2D_BACKPROP_INDICES = TensorIndices([2], [1], [3])
NNG_TRANSPOSE_CONV_INDICES = TensorIndices([0], [1], [3])
NNG_CONCAT_INDICES = TensorIndices([1, 2], [], [])
NNG_SPLIT_IFM_INDICES = TensorIndices([1], [], [])
NNG_BLOCK_LSTM_INDICES = TensorIndices([3], [4], [])
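# Example reading (for illustration): NNG_IFM_WEIGHTS_BIAS_INDICES means that, for an operator
# using it (e.g. Conv2DBias below), inputs[0] is the IFM, inputs[1] the weights and inputs[2]
# the bias tensor.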
# Static information related to operation codes
class OperatorInfo:
__slots__ = ("id", "block_type", "indices", "is_unary")
_id = 0
def __init__(self, block_type=NpuBlockType.Default, indices=NNG_NO_INDICES, is_unary=False):
OperatorInfo._id += 1
self.id = OperatorInfo._id
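        # The id is a monotonically increasing counter; Op.__lt__ below uses it to give
        # operator codes a stable ordering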
self.block_type = block_type
self.indices = indices # Indices of the different tensor purposes
self.is_unary = is_unary # Classifies elementwise operators
# Internally used operation codes
class Op(Enum):
Abs = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_INDICES, is_unary=True)
Add = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES)
AddN = OperatorInfo()
Any = OperatorInfo()
ArgMax = OperatorInfo()
ArgMin = OperatorInfo()
AvgPool = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
Atan2 = OperatorInfo(indices=NNG_IFM_IFM2_INDICES)
BatchMatMul = OperatorInfo()
BatchToSpaceND = OperatorInfo()
BidirectionalSequenceLstm = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
BidirectionalSequenceRnn = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
BlockLSTM = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_BLOCK_LSTM_INDICES)
CLZ = OperatorInfo(
block_type=NpuBlockType.ElementWise, indices=NNG_IFM_INDICES, is_unary=True
) # NPU specific operation
Call = OperatorInfo()
Cast = OperatorInfo()
Ceil = OperatorInfo()
Clamp = OperatorInfo(indices=NNG_IFM_INDICES) # TOSA specific
Clip = OperatorInfo() # NPU specific fused activation function for clipping between activation.min/max
Concat = OperatorInfo(indices=NNG_CONCAT_INDICES)
ConcatEmbeddings = OperatorInfo()
ConcatSliceWrite = OperatorInfo(indices=NNG_IFM_INDICES)
ConcatTFLite = OperatorInfo(indices=NNG_CONCAT_INDICES)
Const = OperatorInfo() # Constant tensor, only used in CPU subgraphs
Conv2D = OperatorInfo(block_type=NpuBlockType.ConvolutionMxN, indices=NNG_IFM_WEIGHTS_INDICES)
Conv2DBackpropInput = OperatorInfo(block_type=NpuBlockType.ConvolutionMxN, indices=NNG_CONV2D_BACKPROP_INDICES)
Conv2DBackpropInputSwitchedBias = OperatorInfo(
block_type=NpuBlockType.ConvolutionMxN, indices=NNG_TRANSPOSE_CONV_INDICES
)
Conv2DBias = OperatorInfo(block_type=NpuBlockType.ConvolutionMxN, indices=NNG_IFM_WEIGHTS_BIAS_INDICES)
Cos = OperatorInfo()
Cumsum = OperatorInfo()
Custom = OperatorInfo() # Custom 3rd party operator, only used in CPU subgraphs
CustomNpuOp = OperatorInfo() # NPU custom operator, only used in CPU subgraphs
Delegate = OperatorInfo()
Densify = OperatorInfo()
DepthToSpace = OperatorInfo()
DepthwiseConv2DBias = OperatorInfo(
block_type=NpuBlockType.ConvolutionDepthWise, indices=NNG_IFM_WEIGHTS_BIAS_INDICES
)
Dequantize = OperatorInfo(indices=NNG_IFM_INDICES)
Div = OperatorInfo()
Memcpy = OperatorInfo(block_type=NpuBlockType.Dma, indices=NNG_IFM_INDICES)
Elu = OperatorInfo()
EmbeddingLookup = OperatorInfo()
EmbeddingLookupSparse = OperatorInfo()
Equal = OperatorInfo()
Exp = OperatorInfo()
ExpandDims = OperatorInfo(indices=NNG_IFM_INDICES)
FakeQuantWithMinMaxArgs = OperatorInfo()
Fill = OperatorInfo()
Floor = OperatorInfo()
FloorDiv = OperatorInfo()
FloorMod = OperatorInfo()
FullyConnected = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_BIAS_INDICES)
GatherNd = OperatorInfo()
GatherV2 = OperatorInfo()
Greater = OperatorInfo()
GreaterEqual = OperatorInfo()
HardSwish = OperatorInfo(indices=NNG_IFM_INDICES)
HashtableLookup = OperatorInfo()
Identity = OperatorInfo(indices=NNG_IFM_INDICES)
If = OperatorInfo()
L2Norm = OperatorInfo()
L2Pool2D = OperatorInfo()
LRN = OperatorInfo()
LSHProjection = OperatorInfo()
LeakyRelu = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_INDICES, is_unary=True)
Less = OperatorInfo()
LessEqual = OperatorInfo()
Log = OperatorInfo()
LogSoftmax = OperatorInfo()
LogicalAnd = OperatorInfo()
LogicalNot = OperatorInfo()
LogicalOr = OperatorInfo()
Lstm = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
LUT = OperatorInfo() # NPU specific, operator has LUT, only used in fused activation functions
MatMul = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
MatrixDiag = OperatorInfo()
MatrixSetDiag = OperatorInfo()
Max = OperatorInfo()
MaxPool = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
Maximum = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES)
Mean = OperatorInfo(indices=NNG_IFM_INDICES)
Min = OperatorInfo()
Minimum = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES)
MirrorPad = OperatorInfo()
Mul = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES)
Neg = OperatorInfo()
NonMaxSuppressionV4 = OperatorInfo()
NonMaxSuppressionV5 = OperatorInfo()
NotEqual = OperatorInfo()
OneHot = OperatorInfo()
Pack = OperatorInfo(indices=NNG_IFM_INDICES)
PackReshaped = OperatorInfo(indices=NNG_IFM_INDICES)
Pad = OperatorInfo(indices=NNG_IFM_INDICES)
PadV2 = OperatorInfo()
Placeholder = OperatorInfo() # Only used in CPU subgraphs
Pow = OperatorInfo()
Prelu = OperatorInfo(indices=NNG_IFM_IFM2_INDICES)
Prod = OperatorInfo()
Quantize = OperatorInfo(indices=NNG_IFM_INDICES)
QuantizedAvgPool = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
QuantizedConv2D = OperatorInfo(block_type=NpuBlockType.ConvolutionMxN, indices=NNG_IFM_WEIGHTS_INDICES)
QuantizedMatMul = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
QuantizedMaxPool = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
QuantizedReshape = OperatorInfo(indices=NNG_IFM_INDICES)
Range = OperatorInfo()
Rank = OperatorInfo()
ReduceSum = OperatorInfo(block_type=NpuBlockType.ReduceSum, indices=NNG_IFM_INDICES)
Relu = OperatorInfo(indices=NNG_IFM_INDICES)
Relu0To1 = OperatorInfo(indices=NNG_IFM_INDICES)
Relu6 = OperatorInfo(indices=NNG_IFM_INDICES)
ReluN1To1 = OperatorInfo(indices=NNG_IFM_INDICES)
ReluN = OperatorInfo(indices=NNG_IFM_INDICES) # TOSA specific
Rescale = OperatorInfo(indices=NNG_IFM_INDICES) # TOSA specific
Reshape = OperatorInfo(indices=NNG_IFM_INDICES)
# resize ops map to pooling operations unless explicitly converted to other operations in the graph optimiser
ResizeBilinear = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
ResizeNearestNeighbor = OperatorInfo(block_type=NpuBlockType.Pooling, indices=NNG_IFM_INDICES)
ReverseSequence = OperatorInfo()
ReverseV2 = OperatorInfo()
Rnn = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
Round = OperatorInfo()
Rsqrt = OperatorInfo()
SHL = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES) # NPU specific operation
SHR = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES) # NPU specific operation
ScatterNd = OperatorInfo()
SegmentSum = OperatorInfo()
Select = OperatorInfo()
SelectV2 = OperatorInfo()
Shape = OperatorInfo(indices=NNG_IFM_INDICES)
Sigmoid = OperatorInfo(indices=NNG_IFM_INDICES)
Sign = OperatorInfo(indices=NNG_IFM_INDICES)
SignBit = OperatorInfo()
Sin = OperatorInfo()
SkipGram = OperatorInfo()
Slice = OperatorInfo(indices=NNG_IFM_INDICES)
Softmax = OperatorInfo(indices=NNG_IFM_INDICES)
SpaceToBatchND = OperatorInfo()
SpaceToDepth = OperatorInfo()
SparseToDense = OperatorInfo()
Split = OperatorInfo(indices=NNG_SPLIT_IFM_INDICES)
SplitSliceRead = OperatorInfo(indices=NNG_IFM_INDICES)
SplitV = OperatorInfo(indices=NNG_IFM_INDICES)
Sqrt = OperatorInfo()
Square = OperatorInfo()
SquaredDifference = OperatorInfo()
Squeeze = OperatorInfo(indices=NNG_IFM_INDICES)
StridedSlice = OperatorInfo(indices=NNG_IFM_INDICES)
Sub = OperatorInfo(block_type=NpuBlockType.ElementWise, indices=NNG_IFM_IFM2_INDICES)
SubgraphInput = OperatorInfo() # Only used in CPU subgraphs
Sum = OperatorInfo()
Svdf = OperatorInfo()
Table = OperatorInfo(indices=NNG_IFM_INDICES)
Tanh = OperatorInfo(indices=NNG_IFM_INDICES)
Tile = OperatorInfo()
TopKV2 = OperatorInfo()
Transpose = OperatorInfo(indices=NNG_IFM_IFM2_INDICES)
UnidirectionalSequenceLstm = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
UnidirectionalSequenceRnn = OperatorInfo(block_type=NpuBlockType.VectorProduct, indices=NNG_IFM_WEIGHTS_INDICES)
Unique = OperatorInfo()
Unpack = OperatorInfo(indices=NNG_IFM_INDICES)
UnpackReshaped = OperatorInfo(indices=NNG_IFM_INDICES)
Where = OperatorInfo()
While = OperatorInfo()
ZerosLike = OperatorInfo()
CallOnce = OperatorInfo()
BroadcastTo = OperatorInfo()
Rfft2D = OperatorInfo()
Conv3D = OperatorInfo()
Imag = OperatorInfo()
Real = OperatorInfo()
ComplexAbs = OperatorInfo()
Hashtable = OperatorInfo()
HashtableFind = OperatorInfo()
HashtableImport = OperatorInfo()
HashtableSize = OperatorInfo()
ReduceAll = OperatorInfo()
Conv3DTranspose = OperatorInfo()
VarHandle = OperatorInfo()
ReadVariable = OperatorInfo()
AssignVariable = OperatorInfo()
BroadcastArgs = OperatorInfo()
RandomStandardNormal = OperatorInfo()
Bucketize = OperatorInfo()
RandomUniform = OperatorInfo()
Multinomial = OperatorInfo()
Gelu = OperatorInfo()
DynamicUpdateSlice = OperatorInfo()
UnsortedSegmentProd = OperatorInfo()
UnsortedSegmentMax = OperatorInfo()
UnsortedSegmentMin = OperatorInfo()
UnsortedSegmentSum = OperatorInfo()
@property
def info(self):
return self.value
@property
def npu_block_type(self):
return self.info.block_type
def is_conv2d_op(self):
return self.info.block_type == NpuBlockType.ConvolutionMxN
def is_depthwise_conv2d_op(self):
return self.info.block_type == NpuBlockType.ConvolutionDepthWise
def is_pool_op(self):
return self.info.block_type == NpuBlockType.Pooling
def is_maxpool_op(self):
return self in (Op.MaxPool, Op.QuantizedMaxPool)
def is_avgpool_op(self):
return self in (Op.QuantizedAvgPool, Op.AvgPool)
def is_elementwise_op(self):
return self.info.block_type == NpuBlockType.ElementWise
def is_unary_elementwise_op(self):
return self.info.block_type == NpuBlockType.ElementWise and self.info.is_unary
def is_binary_elementwise_op(self):
return self.info.block_type == NpuBlockType.ElementWise and not self.info.is_unary
def is_relu_op(self):
return self in (Op.Relu, Op.Relu6, Op.ReluN1To1, Op.ReluN, Op.Clip, Op.Clamp)
def is_activation_op(self):
return self.is_relu_op() or self in (Op.Tanh, Op.Sigmoid, Op.Softmax, Op.LUT, Op.HardSwish)
def is_split_op(self):
return self in (Op.Split, Op.SplitV, Op.StridedSlice, Op.Slice, Op.UnpackReshaped, Op.Unpack)
def is_concat_op(self):
return self in (Op.Concat, Op.ConcatTFLite, Op.PackReshaped, Op.Pack)
def is_resize_op(self):
return self in (Op.ResizeBilinear, Op.ResizeNearestNeighbor)
def is_memcpy_op(self):
return self.info.block_type == NpuBlockType.Dma
def needs_bias(self):
return bool(self.info.indices.biases)
def needs_shapes(self):
return bool(self.info.indices.ifms)
@classmethod
def op_set(cls, predicate):
# Returns the set of all operator codes that fulfill the given predicate
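        # e.g. (illustrative) Op.op_set(Op.is_relu_op) returns
        # {Op.Relu, Op.Relu6, Op.ReluN1To1, Op.ReluN, Op.Clip, Op.Clamp}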
return {op_type for op_type in Op if predicate(op_type)}
def __str__(self):
return self.name
__repr__ = __str__
def __lt__(self, other):
return self.value.id < other.value.id
class Padding(Enum):
SAME = 0
VALID = 1
EXPLICIT = 2 # Padding is specified in a PAD operation (only used for NPU operations)
TILE = 3 # Uses hardware tiles to pad by 1 with edge values on two sides of the IFM specified in explicit_padding
class ActivationFunction:
"""Fused activation function"""
def __init__(self, op_type: Op):
self.op_type = op_type # The activation operation to be performed
# min/max are optional; if present they are non-quantized values
self.min: Optional[float] = None
self.max: Optional[float] = None
# Table lookup index, only applicable for Op.LUT activation, 0-7
self.lut_index: int = 0
def clone(self):
res = copy.copy(self)
return res
class ExplicitScaling:
"""Explicit scaling parameters"""
def __init__(self, per_channel, shift, multiplier):
self.per_channel = per_channel
self.shift = shift
self.multiplier = multiplier
def clone(self):
res = copy.copy(self)
return res
def create_activation_function(op_type: Op, min=None, max=None) -> ActivationFunction:
"""Creates activation function with min/max depending on op_type"""
act = ActivationFunction(op_type)
if op_type == Op.Relu:
act.min = 0.0
elif op_type == Op.Relu6:
act.min = 0.0
act.max = 6.0
elif op_type == Op.ReluN1To1:
act.min = -1.0
act.max = 1.0
elif op_type == Op.Tanh:
act.min = -1.0
act.max = 1.0
elif op_type == Op.Sigmoid:
act.min = 0.0
act.max = 1.0
elif op_type == Op.Clamp:
assert min is not None and max is not None
act.min = min
act.max = max
elif op_type == Op.ReluN:
assert max is not None
act.min = 0.0
act.max = max
return act
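# Usage sketch (illustrative): create_activation_function(Op.Relu6) returns an ActivationFunction
# with min=0.0 and max=6.0, whereas for Op.Clamp and Op.ReluN the caller must supply min/max
# (respectively max) explicitly.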
class Operation:
"""Class representing a Neural Network operation. Has a name, a type,
input and output tensors, as well as an attribute dictionary."""
__slots__ = (
"type",
"_original_type",
"name",
"op_index",
"attrs",
"inputs",
"outputs",
"intermediates",
"flops",
"scheduled_pass",
"run_on_npu",
"activation",
"memory_function",
"forced_input_quantization",
"forced_output_quantization",
"activation_lut",
"_kernel",
"ifm_shapes",
"ofm_shapes",
"rescale",
"read_offsets",
"read_shapes",
"rounding_mode",
"explicit_scaling",
"write_offset",
"write_shape",
"ifm_resampling_mode",
"tile_base_offsets_ifm",
"tile_base_offsets_ofm",
"ofm_stride_multiplier",
)
def __init__(self, op_type: Op, name: str):
self.type = op_type
        self._original_type = op_type  # The original type of the operation; once set it should not be changed
self.name = name
self.attrs: Dict[str, Any] = {}
self.inputs: List[Optional[Tensor]] = []
self.outputs: List[Tensor] = []
self.intermediates: List[Tensor] = []
self.flops = 0
self.run_on_npu = True
        # Fused activation function. If not None, the ActivationFunction to apply
self.activation: Optional[ActivationFunction] = None
# Fused memory function, if not None: operator code
self.memory_function: Optional[Op] = None
        # If not None: QuantizationParameters that override the ifm/ofm tensor's quantization
        # (used e.g. for LUT activation functions)
self.forced_input_quantization = None
self.forced_output_quantization = None
self.scheduled_pass = None
self.op_index = None # input network operator index
self.activation_lut = None
self._kernel = None
self.ifm_shapes: List[Shape4D] = []
self.ofm_shapes: List[Shape4D] = []
self.read_offsets: List[Optional[Shape4D]] = [None, None] # offset for [ifm, ifm2]
self.read_shapes: List[Optional[Shape4D]] = [None, None] # read shape for [ifm, ifm2]
self.rounding_mode: Optional[NpuRoundingMode] = None
# Rescale op in TOSA supplies explicit multiplier and shift values
self.explicit_scaling: Optional[ExplicitScaling] = None
# Write offset, for operations that only produce a part of the OFM
self.write_offset: Optional[Shape4D] = None
# The amount of OFM that is produced by the operation (only if write_offset is not None).
# E.g. an operation that only fills the bottom row of an OFM of size 1x10x8x1 would have
# write_offset 0,9,0,0, write_shape 1,1,8,1
self.write_shape: Optional[Shape4D] = None
self.ifm_resampling_mode: resampling_mode = resampling_mode.NONE
# ifm (nhwc), ifm2 (nhwc)
self.tile_base_offsets_ifm: List[List[int]] = [[0, 0, 0, 0], [0, 0, 0, 0]]
# ofm (nhwc)
self.tile_base_offsets_ofm: List[int] = [0, 0, 0, 0]
# For interleaved/sparse outputs - stride is multiplied with the stride factor of the corresponding axis
# Order is [C, H, W] - default is no multiplication
self.ofm_stride_multiplier: List[int] = [1, 1, 1]
def clone(self, suffix="_clone"):
res = Operation(self.type, self.name + suffix)
# maintain the original type, in cases where the type was changed to something different
res._original_type = self._original_type
res.attrs = dict(self.attrs)
res.inputs = list(self.inputs)
res.outputs = list(self.outputs)
res.intermediates = list(self.intermediates)
res.flops = self.flops
res.run_on_npu = self.run_on_npu
res.activation = None if self.activation is None else self.activation.clone()
res.memory_function = self.memory_function
res.forced_input_quantization = self.forced_input_quantization
res.forced_output_quantization = self.forced_output_quantization
res.scheduled_pass = self.scheduled_pass
res.op_index = None # not relevant as not part of input network
res.read_offsets = list(self.read_offsets)
res.read_shapes = list(self.read_shapes)
res.write_offset = Shape4D(*self.write_offset) if self.write_offset else None
res.write_shape = Shape4D(*self.write_shape) if self.write_shape else None
res.rounding_mode = self.rounding_mode
res.explicit_scaling = self.explicit_scaling
res.ifm_resampling_mode = self.ifm_resampling_mode
res.tile_base_offsets_ifm = [_ifm.copy() for _ifm in self.tile_base_offsets_ifm]
res.tile_base_offsets_ofm = self.tile_base_offsets_ofm.copy()
res.ofm_stride_multiplier = self.ofm_stride_multiplier.copy()
return res
def __str__(self):
return "<nng.Operation '{}' type={}>".format(self.name, self.type)
__repr__ = __str__
@property
def original_type(self):
return self._original_type
@property
def type_changed(self):
return self.type != self.original_type
def get_kernel_size(self):
weights = self.weights
if weights and self.type.npu_block_type in (NpuBlockType.ConvolutionDepthWise, NpuBlockType.ConvolutionMxN):
weight_shape = full_shape(4, weights.shape, 1)
h = weight_shape[-4]
w = weight_shape[-3]
elif self.type.npu_block_type in (NpuBlockType.Pooling, NpuBlockType.ReduceSum) and "ksize" in self.attrs:
h, w = self.attrs["ksize"][1:3]
else:
h = self.attrs.get("filter_height", 1)
w = self.attrs.get("filter_width", 1)
return w, h
def get_kernel_stride(self):
if "strides" in self.attrs:
_, h, w, _ = self.attrs["strides"]
else:
h = self.attrs.get("stride_h", 1)
w = self.attrs.get("stride_w", 1)
return w, h
def get_kernel_dilation(self):
if "dilation" in self.attrs:
_, h, w, _ = self.attrs["dilation"]
else:
h = self.attrs.get("dilation_h_factor", 1)
w = self.attrs.get("dilation_w_factor", 1)
return w, h
@property
def kernel(self):
k_w, k_h = self.get_kernel_size()
s_w, s_h = self.get_kernel_stride()
d_w, d_h = self.get_kernel_dilation()
self._kernel = Kernel(k_w, k_h, s_w, s_h, d_w, d_h)
return self._kernel
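    # Illustrative example (hypothetical attrs): if get_kernel_size() yields (3, 3) and attrs
    # contain {"strides": (1, 2, 2, 1)} with no dilation attribute, op.kernel is equivalent to
    # Kernel(3, 3, stride_x=2, stride_y=2, dilation_x=1, dilation_y=1).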
def get_ifm_ifm2_weights_ofm(self):
return self.ifm, self.ifm2, self.weights, self.ofm
def get_ifm_ifm2_ofm(self):
return self.ifm, self.ifm2, self.ofm
def get_ifm_weights_biases_ofm(self):
return self.ifm, self.weights, self.bias, self.ofm
def get_ifm_ifm2_weights_biases_ofm(self):
return self.ifm, self.ifm2, self.weights, self.bias, self.ofm
def get_ifm_ofm(self):
return self.ifm, self.ofm
@property
def ifm(self):
# Gets the IFM tensor, or None if not applicable
return self.get_input(self.type.info.indices.ifms, 0)
@property
def ifm2(self):
# Gets the IFM2 tensor, or None if not applicable
return self.get_input(self.type.info.indices.ifms, 1)
@property
def bias(self):
# Gets the bias tensor, or None if not applicable
return self.get_input(self.type.info.indices.biases, 0)
@property
def weights(self):
# Gets the weight tensor, or None if not applicable
return self.get_input(self.type.info.indices.weights, 0)
def get_ifm_tensors(self):
# Gets the IFM tensors, or empty list if not applicable
return self._index_list_to_tensors(self.type.info.indices.ifms)
def get_weight_tensors(self):
# Gets the weight tensors, or empty list if not applicable
return self._index_list_to_tensors(self.type.info.indices.weights)
def get_bias_tensors(self):
# Gets the bias tensors, or empty list if not applicable
return self._index_list_to_tensors(self.type.info.indices.biases)
def _index_list_to_tensors(self, index_list):
return [self.inputs[ix] for ix in index_list if ix < len(self.inputs)]
def get_input(self, index_list, ix):
if ix >= len(index_list):
return None
if index_list[ix] >= len(self.inputs):
return None
return self.inputs[index_list[ix]]
@property
def ofm(self):
# Gets the OFM tensor, or None if not applicable
return self.outputs[0] if self.outputs else None
def get_concat_inputs_axis(self):
assert self.type.is_concat_op()
        if self.type == Op.Concat:
            axis_tensor = self.inputs[0]
            inputs = self.inputs[1:]
            assert len(axis_tensor.ops) == 1 and axis_tensor.ops[0].type == Op.Const
            axis = int(axis_tensor.values)
        elif self.type == Op.ConcatTFLite:
            inputs = self.inputs
            axis = self.attrs["axis"]
        elif self.type == Op.PackReshaped:
            # Requires fixup_pack_input to be called before this point
            inputs = self.inputs
            axis = self.attrs["axis"]
            assert len(self.inputs) == self.attrs["values_count"]
        else:
            assert False
return inputs, axis
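    # Illustrative example (hypothetical attrs): for an Op.ConcatTFLite with attrs {"axis": 3},
    # get_concat_inputs_axis() returns (self.inputs, 3).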
def get_dilation_h_w(self):
_, dilation_h, dilation_w, _ = self.attrs.get("dilation", (1, 1, 1, 1))
return dilation_h, dilation_w
def get_split_inputs_axis(self):
assert self.type.is_split_op()
offset_start = None
offset_end = None
axis = None
if self.type == Op.Split:
num_splits = self.attrs.get("num_splits")
axis_tens = self.inputs[0]
assert len(axis_tens.ops) == 1 and axis_tens.ops[0].type == Op.Const
axis = int(axis_tens.values)
input_tens = self.inputs[1]
outputs = self.outputs
assert num_splits == len(outputs)
elif self.type == Op.SplitV:
num_splits = self.attrs.get("num_splits")
input_tens = self.inputs[0]
size_tens = self.inputs[1]
assert len(size_tens.ops) == 1 and size_tens.ops[0].type == Op.Const
sizes = size_tens.values
axis_tens = self.inputs[2]
assert len(axis_tens.ops) == 1 and axis_tens.ops[0].type == Op.Const
axis = int(axis_tens.values)
for idx, size in enumerate(sizes):
                # At most one size may be set to -1, in which case that size is inferred from the input shape
if size == -1:
sizes[idx] = input_tens.shape[axis] - (sum(sizes) + 1)
break
outputs = self.outputs
assert num_splits == len(outputs)
assert sum(sizes) == input_tens.shape[axis]
elif self.type == Op.Slice:
input_tens, begin_tens, size_tens = self.inputs
outputs = self.outputs
offset_start = [0] * len(input_tens.shape)
offset_end = [0] * len(input_tens.shape)
for idx in range(len(begin_tens.values)):
offset_start[idx] = begin_tens.values[idx]
offset_end[idx] = size_tens.values[idx] + offset_start[idx]
elif self.type == Op.StridedSlice:
input_tens, begin_tens, end_tens, strides_tens = self.inputs
outputs = self.outputs
# Extract masks
ellipsis_mask = self.attrs["ellipsis_mask"]
new_axis_mask = self.attrs["new_axis_mask"]
shrink_axis_mask = self.attrs["shrink_axis_mask"]
            # shrink_axis_mask/new_axis_mask/ellipsis_mask are not supported by the Operation class, but the
            # operation may have these attributes modified and handled in the graph optimization phase
assert shrink_axis_mask == new_axis_mask == ellipsis_mask == 0
            # Use the begin and end values that were calculated in the model semantic check, because the end
            # values can be affected (ignored) by the shrink_axis_mask, and this mask may have been changed in
            # the graph optimizer (see the assert above)
offset_start = self.attrs["offset_begin"]
offset_end = self.attrs["offset_end"]
elif self.type == Op.UnpackReshaped:
# Requires fixup_unpack_output to be called before this point
input_tens = self.inputs[0]
outputs = self.outputs
axis = self.attrs["axis"]
num_splits = self.attrs["num"]
            # The number of outputs has to equal the size of the dimension to unpack
assert num_splits == len(outputs) == input_tens.shape[axis]
else:
assert False
return input_tens, outputs, axis, offset_start, offset_end
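    # Illustrative example (hypothetical values): for an Op.Slice with begin = [0, 1, 0, 0] and
    # size = [1, 2, 4, 8], get_split_inputs_axis() returns offset_start = [0, 1, 0, 0] and
    # offset_end = [1, 3, 4, 8]; axis stays None for Slice.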
def set_activation_lut(self, lut_tensor):
self.activation = ActivationFunction(Op.LUT)
self.activation_lut = lut_tensor
self.add_input_tensor(lut_tensor)
def add_input_tensor(self, tens):
self.inputs.append(tens)
if self not in tens.consumer_list:
tens.consumer_list.append(self)
def set_input_tensor(self, tens, idx):
        tens_to_remove = self.inputs[idx]
        if tens_to_remove is not None and self in tens_to_remove.consumer_list:
            tens_to_remove.consumer_list.remove(self)
self.inputs[idx] = tens
if self not in tens.consumer_list:
tens.consumer_list.append(self)
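    # Note: add_input_tensor/set_input_tensor keep each tensor's consumer_list in sync with this
    # operation's inputs list, so producer/consumer traversal of the graph stays consistent.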
def get_input_quantization(self):
if self.forced_input_quantization is not None:
return self.forced_input_quantization
return self.ifm.quantization
def set_output_tensor(self, tens):
tens.ops = [self]
self.outputs = [tens]
def get_output_quantization(self):
if self.forced_output_quantization is not None:
return self.forced_output_quantization
return self.ofm.quantization
def error(self, msg):
"""
Raises a VelaError exception for errors encountered when parsing an Operation
:param self: Operation object that resulted in the error
:param msg: str object that contains a description of the specific error encountered
"""
def _print_tensors(tensors):
lines = []
for idx, tens in enumerate(tensors):
tens_name = getattr(tens, "name", "Not a Tensor")
lines.append(f" {idx} = {tens_name}")
return lines
if self.op_index is None:
lines = [f"Invalid {self.type} (name = {self.name}) operator in the internal representation. {msg}"]
else:
lines = [f"Invalid {self.type} (op_index = {self.op_index}) operator in the input network. {msg}"]
lines += [" Input tensors:"]
lines += _print_tensors(self.inputs)
lines += [" Output tensors:"]
lines += _print_tensors(self.outputs)
raise VelaError("\n".join(lines))
def set_ifm_ofm_shapes(self):
self.ifm_shapes = []
self.ofm_shapes = []
ifm_tensor, ifm2_tensor, ofm_tensor = self.get_ifm_ifm2_ofm()
if self.type == Op.Reshape:
# Set ofm shape
if len(self.inputs) > 1 and self.inputs[1].values is not None:
ofm_tensor.shape = self.inputs[1].values.flatten().tolist()
ofm_elements = ofm_tensor.elements()
# Stretch dimension
if ofm_elements < 0:
ofm_tensor.shape[ofm_tensor.shape.index(-1)] = int(ifm_tensor.elements() / abs(ofm_elements))
        # Set all shapes on the op, as 4D
if self.type == Op.FullyConnected:
if len(self.ifm.shape) == 2:
self.ifm_shapes.append(Shape4D([self.ifm.shape[0], 1, 1, self.ifm.shape[1]]))
else:
# Special case, handled in graph optimization
self.ifm_shapes.append(Shape4D(ifm_tensor.get_full_shape()))
self.ofm_shapes.append(Shape4D(ofm_tensor.get_full_shape()))
elif self.type == Op.Softmax:
self.ifm_shapes.append(Shape4D(ifm_tensor.get_full_shape()))
self.ofm_shapes.append(Shape4D(ofm_tensor.get_full_shape()))
elif self.type.is_split_op() or self.type.is_concat_op():
for inp in self.inputs:
if inp is not None:
self.ifm_shapes.append(Shape4D(full_shape(4, inp.shape, 1)))
else:
self.ifm_shapes.append(None)
for out in self.outputs:
if out is not None:
self.ofm_shapes.append(Shape4D(full_shape(4, out.shape, 1)))
else:
self.ofm_shapes.append(None)
else:
if ifm_tensor is not None:
self.ifm_shapes.append(Shape4D(full_shape(4, ifm_tensor.shape, 1)))
if ifm2_tensor is not None:
self.ifm_shapes.append(Shape4D(full_shape(4, ifm2_tensor.shape, 1)))
if ofm_tensor is not None:
self.ofm_shapes.append(Shape4D(full_shape(4, ofm_tensor.shape, 1)))
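    # Illustrative example: a FullyConnected operation with a 2D IFM of shape [8, 100] gets
    # ifm_shapes == [Shape4D([8, 1, 1, 100])], i.e. the batch ends up in N and the feature
    # dimension in C of the 4D shape.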
def has_scaling(self):
scaled = True
for tensor in [self.ifm, self.ifm2, self.ofm]:
if tensor is not None:
if tensor.quantization is None:
scaled = False
break
return scaled