/*
 * Copyright (c) 2019-2022 Arm Limited. All rights reserved.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the License); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an AS IS BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/****************************************************************************
 * Includes
 ****************************************************************************/
#include "tx_api.h"

#include <inttypes.h>
#include <stdio.h>
#include <vector>

#include "inference_process.hpp"

// Model data (defined and changeable by modifying the compile definitions in CMakeLists.txt)
#include "input.h"
#include "model.h"
#include "output.h"

using namespace std;
using namespace InferenceProcess;

/****************************************************************************
 * Defines
 ****************************************************************************/
// Nr. of threads to process inferences with. Each thread reserves a driver and
// runs inference (normally one per NPU, but not a must)
#define NUM_INFERENCE_THREADS 1
// Nr. of threads to create jobs and receive responses
#define NUM_JOB_THREADS 2
// Nr. of jobs to create per job thread
#define NUM_JOBS_PER_THREAD 1
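// With the defaults above, the example runs NUM_JOB_THREADS * NUM_JOBS_PER_THREAD
// = 2 * 1 = 2 inferences in total.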

#define MAX_THREAD_NAME_SIZE 128

#define PROCESS_THREAD_STACK_SIZE (32 * 1024)
#define SENDER_THREAD_STACK_SIZE (2 * 1024)
#define PROCESS_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))
#define SENDER_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))

// Tensor arena size
#ifdef TENSOR_ARENA_SIZE // If defined in model.h
#define TENSOR_ARENA_SIZE_PER_INFERENCE TENSOR_ARENA_SIZE
#else // If not defined, fall back to a 2 MB arena
#define TENSOR_ARENA_SIZE 2000000
#define TENSOR_ARENA_SIZE_PER_INFERENCE (TENSOR_ARENA_SIZE / NUM_INFERENCE_THREADS)
#endif
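// Note: a TENSOR_ARENA_SIZE supplied by model.h is used per inference thread
// as-is, while the 2 MB fallback is split evenly across the inference threads.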

#define PROCESS_QUEUE_SIZE (NUM_JOBS_PER_THREAD * NUM_JOB_THREADS * sizeof(xInferenceJob *))
#define SENDER_QUEUE_SIZE (NUM_JOBS_PER_THREAD * sizeof(xInferenceJob *))
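// ThreadX expresses queue message sizes in 32-bit words, which is why the
// queues below are created with a message size of
// sizeof(xInferenceJob *) / sizeof(uint32_t), i.e. one job pointer per message.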

/* BYTE_POOL_SIZE_OVERHEAD is used to increase the memory byte pool size, as the number of
   allocatable bytes in a memory byte pool is slightly less than what was specified during creation */
#define BYTE_POOL_SIZE_OVERHEAD (512)
#define BYTE_POOL_SIZE \
    (((PROCESS_THREAD_CONTEXT_SIZE + PROCESS_THREAD_STACK_SIZE) * NUM_INFERENCE_THREADS) + \
     (SENDER_THREAD_CONTEXT_SIZE + SENDER_THREAD_STACK_SIZE + SENDER_QUEUE_SIZE) * NUM_JOB_THREADS + \
     PROCESS_QUEUE_SIZE + BYTE_POOL_SIZE_OVERHEAD)
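/* Rough worked example for the pool size, assuming the defaults above and
   4-byte pointers on a 32-bit Cortex-M target:
       (sizeof(TX_THREAD) + 32768) * 1      // process thread context + stack
     + (sizeof(TX_THREAD) + 2048 + 4) * 2   // sender contexts + stacks + queues
     + 8 + 512                              // process queue + overhead
   i.e. roughly 37 kB plus three thread control blocks. */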

/****************************************************************************
 * Structures
 ****************************************************************************/
struct ProcessThreadParams {
    ProcessThreadParams() : queueHandle(nullptr), tensorArena(nullptr), arenaSize(0) {}
    ProcessThreadParams(TX_QUEUE *_queue, uint8_t *_tensorArena, size_t _arenaSize) :
        queueHandle(_queue), tensorArena(_tensorArena), arenaSize(_arenaSize) {}

    TX_QUEUE *queueHandle;
    uint8_t *tensorArena;
    size_t arenaSize;
};

// Wrapper around InferenceProcess::InferenceJob. Adds a response queue and a
// status flag for ThreadX multithreading.
struct xInferenceJob : public InferenceJob {
    TX_QUEUE *responseQueue;
    bool status;

    xInferenceJob() : InferenceJob(), responseQueue(nullptr), status(false) {}
    xInferenceJob(const string &_name,
                  const DataPtr &_networkModel,
                  const vector<DataPtr> &_input,
                  const vector<DataPtr> &_output,
                  const vector<DataPtr> &_expectedOutput,
                  const size_t _numBytesToPrint,
                  void *_userArg,
                  TX_QUEUE *_queue) :
        InferenceJob(_name, _networkModel, _input, _output, _expectedOutput, _numBytesToPrint, _userArg),
        responseQueue(_queue), status(false) {}
};

/****************************************************************************
 * Global and static variables
 ****************************************************************************/
namespace {
// Total number of completed jobs, needed to exit the application correctly
// if NUM_JOB_THREADS > 1
int totalCompletedJobs = 0;

// TensorArena static initialisation
const size_t arenaSize = TENSOR_ARENA_SIZE_PER_INFERENCE;

TX_QUEUE inferenceProcessQueue;
char inferenceProcessQueueName[] = "inferenceProcessQueue";

ProcessThreadParams threadParams[NUM_INFERENCE_THREADS];

TX_BYTE_POOL bytePool;
char bytePoolName[] = "byte pool";

ULONG memoryArea[BYTE_POOL_SIZE / sizeof(ULONG)];
} // namespace

__attribute__((section(".bss.tensor_arena"), aligned(16)))
uint8_t inferenceProcessTensorArena[NUM_INFERENCE_THREADS][arenaSize];
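// The dedicated .bss.tensor_arena section lets the linker script place the
// arenas in NPU-accessible memory, and the 16-byte alignment matches the
// buffer alignment TensorFlow Lite Micro expects for its tensor arena.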

/****************************************************************************
 * Mutex & Semaphore
 * Overrides the weakly-linked symbols in ethosu_driver.c to implement thread
 * handling
 ****************************************************************************/
extern "C" {
void *ethosu_mutex_create(void) {
    UINT status;
    TX_MUTEX *mutex;

    mutex = new TX_MUTEX;
    status = tx_mutex_create(mutex, nullptr, TX_NO_INHERIT);
    if (status != TX_SUCCESS) {
        printf("mutex create failed, error - %u\n", status);
    }
    return (void *)mutex;
}

int ethosu_mutex_lock(void *mutex) {
    UINT status;
    status = tx_mutex_get(reinterpret_cast<TX_MUTEX *>(mutex), TX_WAIT_FOREVER);
    if (status != TX_SUCCESS) {
        printf("mutex get failed, error - %u\n", status);
        return -1;
    }
    return 0;
}

int ethosu_mutex_unlock(void *mutex) {
    UINT status;
    status = tx_mutex_put(reinterpret_cast<TX_MUTEX *>(mutex));
    if (status != TX_SUCCESS) {
        printf("mutex put failed, error - %u\n", status);
        return -1;
    }
    return 0;
}

void *ethosu_semaphore_create(void) {
    UINT status;
    TX_SEMAPHORE *semaphore;

    semaphore = new TX_SEMAPHORE;
    status = tx_semaphore_create(semaphore, nullptr, 0);

    if (status != TX_SUCCESS) {
        printf("Semaphore create failed, error - %u\n", status);
    }

    return (void *)semaphore;
}

int ethosu_semaphore_take(void *sem) {
    UINT status;

    status = tx_semaphore_get(reinterpret_cast<TX_SEMAPHORE *>(sem), TX_WAIT_FOREVER);

    if (status != TX_SUCCESS) {
        printf("Semaphore get/take failed, error - %u\n", status);
        return -1;
    }

    return 0;
}

int ethosu_semaphore_give(void *sem) {
    UINT status;

    status = tx_semaphore_put(reinterpret_cast<TX_SEMAPHORE *>(sem));

    if (status != TX_SUCCESS) {
        printf("Semaphore put/give failed, error - %u\n", status);
        return -1;
    }

    return 0;
}
}
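// Being strongly linked, the definitions above replace the weak defaults in
// ethosu_driver.c, routing the driver's locking and completion signalling
// through ThreadX. Note that tx_semaphore_put() is ISR-safe, which matters
// because the driver typically gives the completion semaphore from its IRQ
// handler.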

/****************************************************************************
 * Functions
 ****************************************************************************/
// inferenceProcessThread - Runs jobs from the process queue with an available driver
void inferenceProcessThread(ULONG pvParameters) {
    ProcessThreadParams params = *reinterpret_cast<ProcessThreadParams *>(pvParameters);
    UINT tx_status = TX_QUEUE_ERROR;

    class InferenceProcess inferenceProcess(params.tensorArena, params.arenaSize);

    for (;;) {
        xInferenceJob *xJob;

        // Get the job details from the process queue
        tx_status = tx_queue_receive(params.queueHandle, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process failed to receive from Queue, error - %u\n", tx_status);
            exit(1);
        }

        // Run the job
        bool status = inferenceProcess.runJob(*xJob);
        xJob->status = status;

        // Send a response for the job on the job's response queue
        tx_status = tx_queue_send(xJob->responseQueue, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process inferenceProcessThread failed to send to Queue, error - %u\n", tx_status);
            exit(1);
        }
    }

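    // Not reached: the loop above never exits. Kept so the thread would
    // terminate cleanly if an exit condition were ever added.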
    tx_status = tx_thread_terminate(nullptr);
    if (tx_status != TX_SUCCESS) {
        printf("process inferenceProcessThread failed to terminate thread, error - %u\n", tx_status);
        exit(1);
    }
}

// inferenceSenderThread - Creates NUM_JOBS_PER_THREAD jobs, queues them, and then listens for completion status
void inferenceSenderThread(ULONG pvParameters) {
    int ret = 0;
    TX_QUEUE senderQueue;
    UINT status = TX_QUEUE_ERROR;
    TX_QUEUE *inferenceProcessQueueLocal = reinterpret_cast<TX_QUEUE *>(pvParameters);
    xInferenceJob jobs[NUM_JOBS_PER_THREAD];
    CHAR *senderQueuePtr = nullptr;
    char senderQueueName[] = "senderQueue";

    /* Allocate memory for this inference sender thread's response queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderQueuePtr), SENDER_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to allocate bytes for Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the response queue for this inference sender thread */
    status = tx_queue_create(
        &senderQueue, senderQueueName, sizeof(xInferenceJob *) / sizeof(uint32_t), senderQueuePtr, SENDER_QUEUE_SIZE);

    if (status != TX_SUCCESS) {
        printf("Sender thread failed to create Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the jobs and queue them in the inference process queue */
    for (int n = 0; n < NUM_JOBS_PER_THREAD; n++) {

        // Create job
        xInferenceJob *job = &jobs[n];
        job->name = string(modelName);
        job->networkModel = DataPtr(networkModelData, sizeof(networkModelData));
        job->input.push_back(DataPtr(inputData, sizeof(inputData)));
        job->expectedOutput.push_back(DataPtr(expectedOutputData, sizeof(expectedOutputData)));
        job->responseQueue = &senderQueue;

        // Queue the job
        status = tx_queue_send(inferenceProcessQueueLocal, &job, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to send to Queue, error - %u\n", status);
            exit(1);
        }
    }

    /* Listen for completion status on the response queue */
    do {
        xInferenceJob *pSendJob;

        status = tx_queue_receive(&senderQueue, &pSendJob, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to receive from Queue, error - %u\n", status);
            exit(1);
        }

        totalCompletedJobs++;
        ret = (pSendJob->status);
        if (pSendJob->status != 0) {
            break;
        }
    } while (totalCompletedJobs < NUM_JOBS_PER_THREAD * NUM_JOB_THREADS);

    /* Delete the response queue */
    status = tx_queue_delete(&senderQueue);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to delete Queue, error - %u\n", status);
        exit(1);
    }

    exit(ret);
}

/****************************************************************************
 * Application
 ****************************************************************************/
int main() {
    /* Enter the ThreadX kernel. */
    tx_kernel_enter();
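    // tx_kernel_enter() transfers control to the ThreadX scheduler and does
    // not return, so the statement below is never reached.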
    return 0;
}

void tx_application_define(void *first_unused_memory) {
    UINT status;
    CHAR *senderThreadStackPtr[NUM_JOB_THREADS] = {nullptr};
    CHAR *processThreadStackPtr[NUM_INFERENCE_THREADS] = {nullptr};
    CHAR *processQueuePtr = nullptr;
    CHAR *senderThreadPtr[NUM_JOB_THREADS] = {nullptr};
    CHAR senderThreadNames[NUM_JOB_THREADS][MAX_THREAD_NAME_SIZE];
    CHAR *processThreadPtr[NUM_INFERENCE_THREADS] = {nullptr};
    CHAR processThreadNames[NUM_INFERENCE_THREADS][MAX_THREAD_NAME_SIZE];

    /* Create a byte memory pool from which to allocate the thread stacks and queues. */
    status = tx_byte_pool_create(&bytePool, bytePoolName, memoryArea, BYTE_POOL_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to allocate pool of bytes, error - %u\n", status);
        exit(1);
    }

    /* Allocate memory for the inference process queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processQueuePtr), PROCESS_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Main failed to allocate bytes for process queue, error - %u\n", status);
        exit(1);
    }

    status = tx_queue_create(&inferenceProcessQueue,
                             inferenceProcessQueueName,
                             sizeof(xInferenceJob *) / sizeof(uint32_t),
                             processQueuePtr,
                             PROCESS_QUEUE_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to create Queue, error - %u\n", status);
        exit(1);
    }

    /* Create the inferenceSender threads that create and queue the jobs */
    for (int n = 0; n < NUM_JOB_THREADS; n++) {

        /* Allocate the thread context for the inference sender thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread, error - %u\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference sender thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&senderThreadStackPtr[n]), SENDER_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread stack, error - %u\n", status);
            exit(1);
        }

        snprintf(senderThreadNames[n], MAX_THREAD_NAME_SIZE, "senderThread-%d", n);

        /* Create the inference sender thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(senderThreadPtr[n]),
                                  senderThreadNames[n],
                                  inferenceSenderThread,
                                  reinterpret_cast<ULONG>(&inferenceProcessQueue),
                                  senderThreadStackPtr[n],
                                  SENDER_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create Thread, error - %u\n", status);
            exit(1);
        }
    }

    /* Create the inferenceProcess threads that process the queued jobs */
    for (int n = 0; n < NUM_INFERENCE_THREADS; n++) {

        /* Allocate the thread context for the inference process thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process thread, error - %u\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference process thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&processThreadStackPtr[n]), PROCESS_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process thread stack, error - %u\n", status);
            exit(1);
        }

        threadParams[n] = ProcessThreadParams(&inferenceProcessQueue, inferenceProcessTensorArena[n], arenaSize);
        snprintf(processThreadNames[n], MAX_THREAD_NAME_SIZE, "processThread-%d", n);

        /* Create the inference process thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(processThreadPtr[n]),
                                  processThreadNames[n],
                                  inferenceProcessThread,
                                  reinterpret_cast<ULONG>(&threadParams[n]),
                                  processThreadStackPtr[n],
                                  PROCESS_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create thread, error - %u\n", status);
            exit(1);
        }
    }

    printf("ThreadX application initialisation - Done\n");
    return;
}