# SPDX-FileCopyrightText: Copyright 2020-2022 Arm Limited and/or its affiliates <open-source-office@arm.com>
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Description:
# Functions used to write to a TensorFlow Lite format file. Supports adding in file identifiers.
import flatbuffers
import flatbuffers.number_types as N
import numpy as np
from flatbuffers import encode
from flatbuffers.builder import UOffsetTFlags
from .errors import VelaError
from .nn_graph import PassPlacement
from .operation import Op
from .reader_util import align_inputs_indices
from .tensor import MemType
from .tensor import shape_num_elements
from .tensor import TensorPurpose
from .tflite import Buffer
from .tflite import Metadata
from .tflite import Model
from .tflite import Operator
from .tflite import OperatorCode
from .tflite import QuantizationParameters
from .tflite import SubGraph
from .tflite import Tensor
from .tflite_mapping import builtin_operator_inv_map
from .tflite_mapping import BuiltinOperator
from .tflite_mapping import datatype_inv_map

# The Python flatbuffers interface is missing a method for adding a file
# identifier, so one is patched in here:
tflite_version = 3
tflite_file_identifier = "TFL" + str(tflite_version)


def FinishWithFileIdentifier(self, rootTable, fid):
if fid is None or len(fid) != 4:
raise VelaError("FileIdentifier must be 4 chars")
flags = N.Uint8Flags
prepSize = 4
self.Prep(self.minalign, prepSize + len(fid))
for i in range(3, -1, -1):
self.head = self.head - flags.bytewidth
encode.Write(flags.packer_type, self.Bytes, self.Head(), ord(fid[i]))
return self.Finish(rootTable)


flatbuffers.Builder.FinishWithFileIdentifier = FinishWithFileIdentifier
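
# Illustrative check (hypothetical usage): the file identifier lands directly
# after the 4-byte root offset in the finished buffer, so:
#   buf = TFLiteSerialiser(nng).serialise()
#   assert buf[4:8] == b"TFL3"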


def make_vector(v):
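    # Wrap a scalar so it can always be serialised as a flatbuffer vector,
    # e.g. make_vector(0.5) -> [0.5]; sequences are returned unchanged.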
try:
len(v)
return v
except TypeError:
return [v]


class TFLiteSerialiser:
BUF_IDX_SCRATCH = 0 # Always assign scratch to buffer 0
BUF_IDX_SCRATCH_FAST = 1 # Always assign scratch_fast to buffer 1
BUF_IDX_START = 2 # Unique buffer id for every tensor in all subgraphs
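    # e.g. a subgraph with one arena-allocated tensor, one Scratch_fast tensor
    # and two constant tensors ends up using buffer ids 0, 1, 2 and 3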

    def __init__(self, nng):
self.builder = flatbuffers.Builder(0)
self.nng = nng
self.buf_idx = TFLiteSerialiser.BUF_IDX_START
        self.buffers_to_write = []  # buffer data is filled in at the indices handed out by assign_buffers_to_tensors
self.tensor_map_all = [] # Keep track of all subgraphs
self.tensor_map_sg = [] # Keep track of one subgraph
self.ops_to_ignore = (Op.Const, Op.Placeholder, Op.SubgraphInput)
self.tensors_to_reshape = {}
self.subgraphs_to_write = [sg for sg in self.nng.subgraphs if sg.placement == PassPlacement.Cpu]
all_ops = []
for sg in self.subgraphs_to_write:
for ps in sg.passes:
for op in ps.ops:
if op.type not in self.ops_to_ignore:
# swap from nng input indexing to TensorFlow Lite input indexing
self.align_nng_inputs_to_tflite(op)
all_ops.append(op)
if op.type.is_conv2d_op() or op.type.is_depthwise_conv2d_op():
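                            # transpose the constant weights back into the
                            # layout that the TensorFlow Lite schema expects
                            # (the reader applied the inverse permutation)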
                            # If values are None, the op has non-constant weights
if op.inputs[1].values is not None:
self.tensors_to_reshape[op.inputs[1]] = (3, 0, 1, 2)
if op.type == Op.FullyConnected:
                            # If values are None, the op has non-constant weights
if op.inputs[1].values is not None:
self.tensors_to_reshape[op.inputs[1]] = (1, 0)
# list of tuple(Op, string); the custom code is only used for 3rd party custom operators
self.operator_codes = sorted(set((op.type, op.attrs.get("custom_code", "")) for op in all_ops))
self.operator_code_map = {}

    def align_nng_inputs_to_tflite(self, op):
from_indices = op.type.info.indices
_, _, to_indices = builtin_operator_inv_map[op.type]
op.inputs = align_inputs_indices(from_indices, to_indices, op.inputs)
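
    # flatbuffers builds buffers back-to-front, so the write_*_vector helpers
    # below prepend the elements in reverse (v[::-1]) to preserve their order
    # in the serialised vector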
def write_byte_vector(self, v, alignment=1):
builder = self.builder
builder.StartVector(1, len(v), alignment)
for e in v[::-1]:
builder.PrependByte(e)
return builder.EndVector()

    def write_int_vector(self, v):
builder = self.builder
builder.StartVector(4, len(v), 4)
for e in v[::-1]:
builder.PrependInt32(e)
return builder.EndVector()

    def write_long_vector(self, v):
builder = self.builder
builder.StartVector(8, len(v), 8)
for e in v[::-1]:
builder.PrependInt64(e)
return builder.EndVector()

    def write_float_vector(self, v):
builder = self.builder
builder.StartVector(4, len(v), 4)
for e in v[::-1]:
builder.PrependFloat32(e)
return builder.EndVector()

    def write_offset_vector(self, v):
builder = self.builder
builder.StartVector(4, len(v), 4)
for e in v[::-1]:
builder.PrependUOffsetTRelative(e)
return builder.EndVector()

    def assign_buffers_to_tensors(self, tensors, scratch_tensor):
if scratch_tensor is not None:
scratch_tensor_mem_area = scratch_tensor.mem_area
else:
scratch_tensor_mem_area = None # all tensors are initialised to MemArea.Unknown
buffer_map = {}
for tens in tensors:
# Set buffer ids depending on allocation
if tens.is_allocated_in_tensor_arena(scratch_tensor_mem_area):
buffer_map[tens] = TFLiteSerialiser.BUF_IDX_SCRATCH
elif tens.mem_type == MemType.Scratch_fast:
# For Scratch_fast when not co-allocated with scratch in the TensorArena:
buffer_map[tens] = TFLiteSerialiser.BUF_IDX_SCRATCH_FAST
else:
buffer_map[tens] = self.buf_idx
self.buf_idx += 1
        # Extend buffers_to_write so that its length equals the total number of
        # buffers; buffer data can then be written at the correct index during
        # tensor serialization
        self.buffers_to_write += [None] * (self.buf_idx - len(self.buffers_to_write))
return buffer_map

    def serialise_operator_code(self, idx, op_type, custom_code):
builder = self.builder
custom_code_offset = None
if op_type == Op.Custom:
tf_code, opt_serializer, _ = builtin_operator_inv_map[op_type]
custom_code_offset = builder.CreateString(custom_code)
else:
assert (
op_type in builtin_operator_inv_map
), "Vela does not contain a mapping to serialise {} operator to a TensorFlow Lite operator".format(op_type)
tf_code, opt_serializer, _ = builtin_operator_inv_map[op_type]
if op_type == Op.CustomNpuOp:
assert (
tf_code == BuiltinOperator.CUSTOM
), "Vela only supports serialising NpuOp operators as TensorFlow Lite Custom operators"
custom_code_offset = builder.CreateString("ethos-u")
        # There can be multiple types of 3rd party custom operators (i.e. non-"ethos-u" ones), so this
        # particular entry in the operator_code_map needs an extra level of indirection to allow the
        # correct lookup later on
if op_type == Op.Custom:
if op_type not in self.operator_code_map:
self.operator_code_map[op_type] = {}
self.operator_code_map[op_type][custom_code] = (idx, tf_code, opt_serializer)
else:
self.operator_code_map[op_type] = (idx, tf_code, opt_serializer)
OperatorCode.OperatorCodeStart(builder)
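        # the schema carries two code fields: the legacy int8
        # deprecated_builtin_code saturates at 127, while the int32
        # builtin_code written below holds the real value for newer operators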
OperatorCode.OperatorCodeAddDeprecatedBuiltinCode(builder, tf_code if tf_code < 127 else 127)
OperatorCode.OperatorCodeAddBuiltinCode(builder, tf_code)
if custom_code_offset is not None:
OperatorCode.OperatorCodeAddCustomCode(builder, custom_code_offset)
return OperatorCode.OperatorCodeEnd(builder)

    def serialise_quantization_parameters(self, quant):
builder = self.builder
qp = None
min = None
max = None
scale = None
zero_point = None
if quant is not None:
if quant.min is not None:
min = self.write_float_vector(make_vector(quant.min))
if quant.max is not None:
max = self.write_float_vector(make_vector(quant.max))
if quant.scale_f32 is not None:
scale = self.write_float_vector(make_vector(quant.scale_f32))
if quant.zero_point is not None:
zero_point = self.write_long_vector(make_vector(quant.zero_point))
QuantizationParameters.QuantizationParametersStart(builder)
if min is not None:
QuantizationParameters.QuantizationParametersAddMin(builder, min)
if max is not None:
QuantizationParameters.QuantizationParametersAddMax(builder, max)
if scale is not None:
QuantizationParameters.QuantizationParametersAddScale(builder, scale)
if zero_point is not None:
QuantizationParameters.QuantizationParametersAddZeroPoint(builder, zero_point)
if quant.quant_dim is not None:
QuantizationParameters.QuantizationParametersAddQuantizedDimension(builder, quant.quant_dim)
qp = QuantizationParameters.QuantizationParametersEnd(builder)
return qp

    def serialise_tensor(self, tens):
builder = self.builder
if shape_num_elements(tens.original_shape) != shape_num_elements(tens.shape):
# shapes have changed size, therefore assume that the latest (modified) shape is correct
tens_shape = tens.shape
else:
# shapes have not changed size, therefore the original shape is valid
tens_shape = tens.original_shape
values = tens.values
if values is None:
values = np.empty(shape=(0), dtype=np.uint8)
if tens in self.tensors_to_reshape:
reorder = self.tensors_to_reshape[tens]
tens_shape = [tens_shape[idx] for idx in reorder]
values = values.transpose(reorder)
buf_id = self.buffer_map[tens]
self.buffers_to_write[buf_id] = values.flatten().view(np.uint8)
shape = self.write_int_vector(tens_shape)
name = builder.CreateString(tens.name)
quant = self.serialise_quantization_parameters(tens.quantization)
Tensor.TensorStart(builder)
Tensor.TensorAddShape(builder, shape)
Tensor.TensorAddType(builder, datatype_inv_map[tens.dtype])
# All tensors must have a valid backing buffer, even if it is empty.
# Empty buffers should be kept unique for TensorFlow Lite Micro
Tensor.TensorAddBuffer(builder, buf_id)
Tensor.TensorAddName(builder, name)
if quant is not None:
Tensor.TensorAddQuantization(builder, quant)
Tensor.TensorAddIsVariable(builder, tens.is_variable)
res = Tensor.TensorEnd(builder)
return res

    def serialise_operator(self, op):
builder = self.builder
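        # an optional input that has no connected tensor is serialised as -1,
        # which the TensorFlow Lite runtime treats as "tensor not present"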
inputs_offset = self.write_int_vector(
[self.tensor_map_sg[tens] if tens in self.tensor_map_sg else -1 for tens in op.inputs]
)
outputs_offset = self.write_int_vector(
[self.tensor_map_sg[tens] for tens in op.outputs if tens in self.tensor_map_sg]
)
intermediates_offset = self.write_int_vector(
[self.tensor_map_sg[tens] for tens in op.intermediates if tens in self.tensor_map_sg]
)
if op.type == Op.Custom:
op_idx, tflop, opt_serializer = self.operator_code_map[op.type][op.attrs.get("custom_code", "")]
else:
op_idx, tflop, opt_serializer = self.operator_code_map[op.type]
builtin_opt_offset = None
custom_opt_offset = None
if opt_serializer is not None:
attrs = dict(op.attrs)
if "strides" in attrs:
attrs["stride_h"] = attrs["strides"][1]
attrs["stride_w"] = attrs["strides"][2]
if "ksize" in attrs:
attrs["filter_height"] = attrs["ksize"][1]
attrs["filter_width"] = attrs["ksize"][2]
if "dilation" in attrs:
attrs["dilation_h_factor"] = attrs["dilation"][1]
attrs["dilation_w_factor"] = attrs["dilation"][2]
if "channel_multiplier" in attrs:
attrs["depth_multiplier"] = attrs["channel_multiplier"]
attrs["fused_activation_function"] = op.activation.op_type if op.activation is not None else None
builtin_opt_offset, custom_opt_offset = opt_serializer.serialize(builder, attrs)
mutating_variable_inputs_offset = self.write_byte_vector([])
Operator.OperatorStart(builder)
Operator.OperatorAddOpcodeIndex(builder, op_idx)
Operator.OperatorAddInputs(builder, inputs_offset)
Operator.OperatorAddOutputs(builder, outputs_offset)
Operator.OperatorAddIntermediates(builder, intermediates_offset)
if builtin_opt_offset is not None:
Operator.OperatorAddBuiltinOptionsType(builder, opt_serializer.builtin_opt_type)
Operator.OperatorAddBuiltinOptions(builder, builtin_opt_offset)
if custom_opt_offset is not None:
Operator.OperatorAddCustomOptions(builder, custom_opt_offset)
Operator.OperatorAddCustomOptionsFormat(builder, opt_serializer.custom_opt_format)
Operator.OperatorAddMutatingVariableInputs(builder, mutating_variable_inputs_offset)
return Operator.OperatorEnd(builder)

    def serialise_subgraph(self, sg, name):
builder = self.builder
all_ops = []
placeholder_ops = []
for ps in sg.passes:
for op in ps.ops:
if op.type not in self.ops_to_ignore:
all_ops.append(op)
elif op.type == Op.Placeholder:
placeholder_ops.append(op)
        # Make sure all original tensors are written back; this is a special
        # case for ops with connected subgraphs. Even though not all inputs are
        # used, the reference kernel expects them all to be present in the
        # tflite file. Because the graph is traversed starting from the
        # outputs, those are always added, but an input that is never
        # referenced by an op would otherwise not be written.
tensor_set = set(sg.original_inputs)
# Add the tensors from all valid ops, as well as the tensors from placeholder ops
        # This allows us to serialise tensors which aren't attached to any specific ops,
# e.g. due to an empty graph containing no ops
for op in all_ops + placeholder_ops:
for tens in op.inputs + op.outputs + op.intermediates:
if tens is not None:
tensor_set.add(tens)
all_tensors = [tens for nm, idx, tens in sorted((tens.name, idx, tens) for idx, tens in enumerate(tensor_set))]
scratch_tensors = [tens for tens in all_tensors if tens.purpose is TensorPurpose.Scratch]
if len(scratch_tensors) == 0:
scratch_tensor = None
else:
assert len(scratch_tensors) == 1, "Multiple scratch tensors"
scratch_tensor = scratch_tensors[0]
self.tensor_map_sg = {tens: idx for idx, tens in enumerate(all_tensors)}
self.buffer_map = self.assign_buffers_to_tensors(all_tensors, scratch_tensor)
self.tensor_map_all.append(self.tensor_map_sg)
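        # remembered so that serialise_model can emit per-tensor address
        # offsets in the OfflineMemoryAllocation metadata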
tensors_offset = self.write_offset_vector([self.serialise_tensor(tens) for tens in all_tensors])
# Make sure the input_tensors haven't been modified
assert all(inp in sg.original_inputs for inp in sg.input_tensors)
inputs = [self.tensor_map_sg[tens] for tens in sg.original_inputs if tens in self.tensor_map_sg]
inputs_offset = self.write_int_vector(inputs)
outputs_offset = self.write_int_vector(
[self.tensor_map_sg[tens] for tens in sg.output_tensors if tens in self.tensor_map_sg]
)
operators_offset = self.write_offset_vector([self.serialise_operator(op) for op in all_ops])
SubGraph.SubGraphStart(builder)
SubGraph.SubGraphAddTensors(builder, tensors_offset)
SubGraph.SubGraphAddInputs(builder, inputs_offset)
SubGraph.SubGraphAddOutputs(builder, outputs_offset)
SubGraph.SubGraphAddOperators(builder, operators_offset)
SubGraph.SubGraphAddName(builder, name)
return SubGraph.SubGraphEnd(builder)

    def write_aligned_bytes(self, buf):
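        # Emit a uint8 vector by hand so that the payload can be padded to a
        # 16-byte boundary, which allows tensor data to be used in place,
        # e.g. when the file is memory-mapped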
builder = self.builder
builder.assertNotNested()
builder.nested = True
data = bytes(buf)
length_bytes = UOffsetTFlags.py_type(len(data))
builder.vectorNumElems = length_bytes
builder.Prep(16, length_bytes) # Reserve aligned storage
builder.head = UOffsetTFlags.py_type(builder.Head() - length_bytes) # Update FlatBuffer internal pointer
builder.Bytes[builder.Head() : builder.Head() + length_bytes] = data # Assign bytes to aligned area
return builder.EndVector()

    def serialise_buffer(self, buf):
builder = self.builder
data = None
if buf is not None:
data = self.write_aligned_bytes(buf)
Buffer.BufferStart(builder)
if data is not None:
Buffer.BufferAddData(builder, data)
return Buffer.BufferEnd(builder)

    def serialise_metadata(self, metadata):
builder = self.builder
name = builder.CreateString(metadata[0])
Metadata.MetadataStart(builder)
Metadata.MetadataAddName(builder, name)
Metadata.MetadataAddBuffer(builder, metadata[1])
return Metadata.MetadataEnd(builder)

    def serialise_model(self):
builder = self.builder
operator_code_offset = self.write_offset_vector(
[self.serialise_operator_code(idx, optype, code) for idx, (optype, code) in enumerate(self.operator_codes)]
)
description = builder.CreateString("Vela Optimised")
subgraph_offset = self.write_offset_vector(
[self.serialise_subgraph(sg, builder.CreateString(sg.name)) for sg in self.subgraphs_to_write]
)
# Fill the metadata buffer
version = np.int32(0)
subgraph_idx = np.int32(len(self.subgraphs_to_write))
nbr_tensors_all = np.sum([len(tensor_map_sg) for tensor_map_sg in self.tensor_map_all], dtype=np.int32)
offlineAlloc = [version, subgraph_idx, nbr_tensors_all]
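        # The metadata buffer is a flat int32 array laid out as
        # [version, subgraph count, total tensor count, per-tensor offsets...]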
if not any([name == b"OfflineMemoryAllocation" for name, _ in self.nng.metadata]):
for tensor_map_sg in self.tensor_map_all:
nbr_tensors_sg = np.int32(len(tensor_map_sg))
                # An offset of -1 indicates that the tensor will be allocated online by TensorFlow Lite Micro
offsets = [np.int32(-1)] * nbr_tensors_sg
                # Ensure that the order of the offsets matches the order of the tensors
for tens, idx in tensor_map_sg.items():
# Set offsets for tensor allocated in Tensor Arena or in the scratch_fast area
if tens.mem_type in (MemType.Scratch, MemType.Scratch_fast):
offsets[idx] = np.int32(tens.address) if tens.address is not None else np.int32(0)
offlineAlloc += offsets
self.nng.metadata.append(("OfflineMemoryAllocation", np.array(offlineAlloc)))
metadata_list = []
for name, buffer in self.nng.metadata:
self.buffers_to_write.append(buffer)
metadata_list.append((name, len(self.buffers_to_write) - 1))
buffers_offset = self.write_offset_vector([self.serialise_buffer(buf) for buf in self.buffers_to_write])
metadata_offset = self.write_offset_vector([self.serialise_metadata(metadata) for metadata in metadata_list])
Model.ModelStart(builder)
Model.ModelAddVersion(builder, tflite_version)
Model.ModelAddOperatorCodes(builder, operator_code_offset)
Model.ModelAddSubgraphs(builder, subgraph_offset)
Model.ModelAddDescription(builder, description)
Model.ModelAddBuffers(builder, buffers_offset)
Model.ModelAddMetadata(builder, metadata_offset)
return Model.ModelEnd(builder)

    def serialise(self):
model = self.serialise_model()
self.builder.FinishWithFileIdentifier(model, tflite_file_identifier)
return self.builder.Output()

    def write(self, filename):
        with open(filename, "wb") as f:
            f.write(self.serialise())


def write_tflite(nng, filename):
writer = TFLiteSerialiser(nng)
buf = writer.serialise()
with open(filename, "wb") as f:
f.write(buf)
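

# Usage sketch (hypothetical; nng is a compiled Vela graph):
#   write_tflite(nng, "model_vela.tflite")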