/*
 * Copyright (c) 2019-2022 Arm Limited. All rights reserved.
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the License); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an AS IS BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/****************************************************************************
 * Includes
 ****************************************************************************/
#include "tx_api.h"

#include <inttypes.h>
#include <stdio.h>
#include <vector>

#include "inference_process.hpp"

// Model data (defined in and changeable via the compile definitions in CMakeLists.txt)
#include "input.h"
#include "model.h"
#include "output.h"

using namespace std;
using namespace InferenceProcess;

/****************************************************************************
 * Defines
 ****************************************************************************/
// Number of threads to process inferences with. Each thread reserves a driver and
// runs inferences (normally one per NPU, but that is not a requirement)
#define NUM_INFERENCE_THREADS 1
// Number of threads that create jobs and receive responses
#define NUM_JOB_THREADS 2
// Number of jobs to create per job thread
#define NUM_JOBS_PER_THREAD 1
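
/*
 * Pipeline overview: the sender and process threads form a small job pipeline.
 * Each of the NUM_JOB_THREADS sender threads creates NUM_JOBS_PER_THREAD jobs
 * and posts pointers to them on the shared process queue; the
 * NUM_INFERENCE_THREADS inference threads pop jobs off that queue, run them,
 * and post each job back on the sender's own response queue:
 *
 *   senderThread[0..NUM_JOB_THREADS-1]
 *       |  tx_queue_send(&inferenceProcessQueue, &job)
 *       v
 *   inferenceProcessQueue ---> processThread[0..NUM_INFERENCE_THREADS-1]
 *       |  runJob(*job)
 *       v
 *   job->responseQueue ---> back to the originating sender thread
 */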

#define PROCESS_THREAD_STACK_SIZE (20 * 1024)
#define SENDER_THREAD_STACK_SIZE (2 * 1024)
#define PROCESS_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))
#define SENDER_THREAD_CONTEXT_SIZE (sizeof(TX_THREAD))

// Tensor arena size
#ifdef TENSOR_ARENA_SIZE // If defined in model.h
#define TENSOR_ARENA_SIZE_PER_INFERENCE TENSOR_ARENA_SIZE
#else // If not defined, default to 2 MB, split across the inference threads
#define TENSOR_ARENA_SIZE 2000000
#define TENSOR_ARENA_SIZE_PER_INFERENCE (TENSOR_ARENA_SIZE / NUM_INFERENCE_THREADS)
#endif

#define PROCESS_QUEUE_SIZE (NUM_JOBS_PER_THREAD * NUM_JOB_THREADS * sizeof(xInferenceJob *))
#define SENDER_QUEUE_SIZE (NUM_JOBS_PER_THREAD * sizeof(xInferenceJob *))
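
/*
 * Sizing note: the queue sizes above are in bytes, and every message is a
 * single xInferenceJob pointer. tx_queue_create() takes the message size in
 * 32-bit words, which is why the calls below pass
 * sizeof(xInferenceJob *) / sizeof(uint32_t) - one word per message on the
 * 32-bit target assumed here.
 */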

/* BYTE_POOL_SIZE_OVERHEAD is used to increase the memory byte pool size, as the number of
   allocatable bytes in a memory byte pool is slightly less than what was specified during creation */
#define BYTE_POOL_SIZE_OVERHEAD (512)
#define BYTE_POOL_SIZE \
    (((PROCESS_THREAD_CONTEXT_SIZE + PROCESS_THREAD_STACK_SIZE) * NUM_INFERENCE_THREADS) + \
     (SENDER_THREAD_CONTEXT_SIZE + SENDER_THREAD_STACK_SIZE + SENDER_QUEUE_SIZE) * NUM_JOB_THREADS + \
     PROCESS_QUEUE_SIZE + BYTE_POOL_SIZE_OVERHEAD)
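
/*
 * Worked example, assuming the default settings above and 32-bit pointers
 * (sizeof(TX_THREAD) is port-specific, typically a few hundred bytes):
 *   1 x (sizeof(TX_THREAD) + 20 KiB stack)              inference thread
 * + 2 x (sizeof(TX_THREAD) + 2 KiB stack + 4 B queue)   sender threads
 * + 8 B process queue + 512 B pool overhead
 * = roughly 26 KiB in total.
 */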

/****************************************************************************
 * Structures
 ****************************************************************************/
struct ProcessThreadParams {
    ProcessThreadParams() : queueHandle(nullptr), tensorArena(nullptr), arenaSize(0) {}
    ProcessThreadParams(TX_QUEUE *_queue, uint8_t *_tensorArena, size_t _arenaSize) :
        queueHandle(_queue), tensorArena(_tensorArena), arenaSize(_arenaSize) {}

    TX_QUEUE *queueHandle;
    uint8_t *tensorArena;
    size_t arenaSize;
};
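
// Note: a ProcessThreadParams instance is handed to each inference thread through
// the single ULONG entry argument of tx_thread_create() (see tx_application_define
// below) and cast back to a pointer inside inferenceProcessThread.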

// Wrapper around InferenceProcess::InferenceJob. Adds responseQueue and status for ThreadX multi-threaded purposes.
struct xInferenceJob : public InferenceJob {
    TX_QUEUE *responseQueue;
    bool status;

    xInferenceJob() : InferenceJob(), responseQueue(nullptr), status(false) {}
    xInferenceJob(const string &_name,
                  const DataPtr &_networkModel,
                  const vector<DataPtr> &_input,
                  const vector<DataPtr> &_output,
                  const vector<DataPtr> &_expectedOutput,
                  const size_t _numBytesToPrint,
                  void *_userArg,
                  TX_QUEUE *_queue) :
        InferenceJob(_name, _networkModel, _input, _output, _expectedOutput, _numBytesToPrint, _userArg),
        responseQueue(_queue), status(false) {}
};
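
// Note: jobs travel by pointer. A sender thread enqueues &job on the shared
// process queue, an inference thread fills in job->status via runJob(), and the
// same pointer is sent back on job->responseQueue so the sender can inspect the result.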

/****************************************************************************
 * Global and static variables
 ****************************************************************************/
namespace {
// Number of total completed jobs, needed to exit application correctly if NUM_JOB_THREADS > 1
int totalCompletedJobs = 0;

// TensorArena static initialisation
const size_t arenaSize = TENSOR_ARENA_SIZE_PER_INFERENCE;

TX_QUEUE inferenceProcessQueue;

ProcessThreadParams threadParams[NUM_INFERENCE_THREADS];

TX_BYTE_POOL bytePool;
ULONG memoryArea[BYTE_POOL_SIZE / sizeof(ULONG)];
} // namespace

__attribute__((section(".bss.tensor_arena"), aligned(16)))
uint8_t inferenceProcessTensorArena[NUM_INFERENCE_THREADS][arenaSize];
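
// Note: the dedicated ".bss.tensor_arena" section lets the linker script place
// the arenas in a suitable RAM region, and the 16-byte alignment matches what
// TensorFlow Lite Micro expects of a tensor arena.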

/****************************************************************************
 * Mutex & Semaphore
 * Overrides weak-linked symbols in ethosu_driver.c to implement thread handling
 ****************************************************************************/
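// Note: the driver calls these hooks to serialize access to shared driver state
// (mutex) and to block until inference completion is signalled, presumably from
// interrupt context (semaphore); tx_semaphore_put() is documented as ISR-safe.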
extern "C" {
void *ethosu_mutex_create(void) {
    UINT status;
    TX_MUTEX *mutex;

    mutex = new TX_MUTEX;
    status = tx_mutex_create(mutex, "mutex 0", TX_NO_INHERIT);
    if (status != TX_SUCCESS) {
        printf("mutex create failed, error - %d\n", status);
    }
    return (void *)mutex;
}

void ethosu_mutex_lock(void *mutex) {
    UINT status;
    status = tx_mutex_get(reinterpret_cast<TX_MUTEX *>(mutex), TX_WAIT_FOREVER);
    if (status != TX_SUCCESS) {
        printf("mutex get failed, error - %d\n", status);
    }
    return;
}

void ethosu_mutex_unlock(void *mutex) {
    UINT status;
    status = tx_mutex_put(reinterpret_cast<TX_MUTEX *>(mutex));
    if (status != TX_SUCCESS) {
        printf("mutex put failed, error - %d\n", status);
    }
    return;
}

void *ethosu_semaphore_create(void) {
    UINT status;
    TX_SEMAPHORE *semaphore;

    semaphore = new TX_SEMAPHORE;
    status = tx_semaphore_create(semaphore, "semaphore", 1);

    if (status != TX_SUCCESS) {
        printf("Semaphore create failed, error - %d\n", status);
    }

    return (void *)semaphore;
}

void ethosu_semaphore_take(void *sem) {
    UINT status;

    status = tx_semaphore_get(reinterpret_cast<TX_SEMAPHORE *>(sem), TX_WAIT_FOREVER);

    if (status != TX_SUCCESS) {
        printf("Semaphore get/take failed, error - %d\n", status);
    }

    return;
}

void ethosu_semaphore_give(void *sem) {
    UINT status;

    status = tx_semaphore_put(reinterpret_cast<TX_SEMAPHORE *>(sem));

    if (status != TX_SUCCESS) {
        printf("Semaphore put/give failed, error - %d\n", status);
    }

    return;
}
}

/****************************************************************************
 * Functions
 ****************************************************************************/
// inferenceProcessThread - Runs jobs from the process queue with the available driver
void inferenceProcessThread(ULONG pvParameters) {
    ProcessThreadParams params = *reinterpret_cast<ProcessThreadParams *>(pvParameters);
    UINT tx_status = TX_QUEUE_ERROR;

    class InferenceProcess inferenceProcess(params.tensorArena, params.arenaSize);

    for (;;) {
        xInferenceJob *xJob;

        // Get the job details from the process queue
        tx_status = tx_queue_receive(params.queueHandle, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process failed to receive from Queue, error - %d\n", tx_status);
            exit(1);
        }

        // Run the job
        bool status = inferenceProcess.runJob(*xJob);
        xJob->status = status;

        // Send a response for the job on its response queue
        tx_status = tx_queue_send(xJob->responseQueue, &xJob, TX_WAIT_FOREVER);
        if (tx_status != TX_SUCCESS) {
            printf("process inferenceProcessThread failed to send to Queue, error - %d\n", tx_status);
            exit(1);
        }
    }

    // Not reached: the loop above never exits; kept for completeness
    tx_status = tx_thread_terminate(nullptr);
    if (tx_status != TX_SUCCESS) {
        printf("process inferenceProcessThread failed to terminate thread, error - %d\n", tx_status);
        exit(1);
    }
}

// inferenceSenderThread - Creates NUM_JOBS_PER_THREAD jobs, queues them, and then listens for completion status
void inferenceSenderThread(ULONG pvParameters) {
    int ret = 0;
    TX_QUEUE senderQueue;
    UINT status = TX_QUEUE_ERROR;
    TX_QUEUE *inferenceProcessQueueLocal = reinterpret_cast<TX_QUEUE *>(pvParameters);
    xInferenceJob jobs[NUM_JOBS_PER_THREAD];
    CHAR *senderQueuePtr = nullptr;

    /* Allocate memory for this inference sender thread's response queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderQueuePtr), SENDER_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to allocate bytes for Queue, error - %d\n", status);
        exit(1);
    }

    /* Create the response queue for this inference sender thread */
    status = tx_queue_create(
        &senderQueue, "senderQueue", sizeof(xInferenceJob *) / sizeof(uint32_t), senderQueuePtr, SENDER_QUEUE_SIZE);

    if (status != TX_SUCCESS) {
        printf("Sender thread failed to create Queue, error - %d\n", status);
        exit(1);
    }

    /* Create the jobs and queue them on the inference process queue */
    for (int n = 0; n < NUM_JOBS_PER_THREAD; n++) {

        // Create job
        xInferenceJob *job = &jobs[n];
        job->name = string(modelName);
        job->networkModel = DataPtr(networkModelData, sizeof(networkModelData));
        job->input.push_back(DataPtr(inputData, sizeof(inputData)));
        job->expectedOutput.push_back(DataPtr(expectedOutputData, sizeof(expectedOutputData)));
        job->responseQueue = &senderQueue;

        // Queue job
        status = tx_queue_send(inferenceProcessQueueLocal, &job, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to send to Queue, error - %d\n", status);
            exit(1);
        }
    }

    /* Listen for completion status on the response queue */
    do {
        xInferenceJob *pSendJob;

        status = tx_queue_receive(&senderQueue, &pSendJob, TX_WAIT_FOREVER);
        if (status != TX_SUCCESS) {
            printf("Sender thread failed to receive from Queue, error - %d\n", status);
            exit(1);
        }

        totalCompletedJobs++;
        ret = (pSendJob->status);
        if (pSendJob->status != 0) {
            break;
        }
    } while (totalCompletedJobs < NUM_JOBS_PER_THREAD * NUM_JOB_THREADS);

    /* Delete the response queue */
    status = tx_queue_delete(&senderQueue);
    if (status != TX_SUCCESS) {
        printf("Sender thread failed to delete Queue, error - %d\n", status);
        exit(1);
    }

    // A non-zero job status indicates a failed job; propagate it as the process exit code
    exit(ret);
}

/****************************************************************************
 * Application
 ****************************************************************************/
int main() {
    /* Enter the ThreadX kernel. */
    tx_kernel_enter();
    return 0;
}
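
// tx_kernel_enter() does not return: it initialises ThreadX, invokes
// tx_application_define() below so the application can create its threads and
// queues, and then starts the scheduler.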

void tx_application_define(void *first_unused_memory) {
    UINT status;
    CHAR *senderThreadStackPtr[NUM_JOB_THREADS] = {nullptr};
    CHAR *processThreadStackPtr[NUM_INFERENCE_THREADS] = {nullptr};
    CHAR *processQueuePtr = nullptr;
    CHAR *senderThreadPtr[NUM_JOB_THREADS] = {nullptr};
    CHAR *processThreadPtr[NUM_INFERENCE_THREADS] = {nullptr};

    /* Create a byte memory pool from which to allocate the thread stacks and queues. */
    status = tx_byte_pool_create(&bytePool, "byte pool", memoryArea, BYTE_POOL_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to create byte pool, error - %d\n", status);
        exit(1);
    }

    /* Allocate memory for the inference process queue */
    status = tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processQueuePtr), PROCESS_QUEUE_SIZE, TX_NO_WAIT);
    if (status != TX_SUCCESS) {
        printf("Main failed to allocate bytes for process queue, error - %d\n", status);
        exit(1);
    }

    status = tx_queue_create(&inferenceProcessQueue,
                             "inferenceProcessQueue",
                             sizeof(xInferenceJob *) / sizeof(uint32_t),
                             processQueuePtr,
                             PROCESS_QUEUE_SIZE);
    if (status != TX_SUCCESS) {
        printf("Main failed to create Queue, error - %d\n", status);
        exit(1);
    }

    /* Create the inferenceSender threads that create and queue the jobs */
    for (int n = 0; n < NUM_JOB_THREADS; n++) {

        /* Allocate the thread context for the inference sender thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&senderThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread, error - %d\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference sender thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&senderThreadStackPtr[n]), SENDER_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for sender thread stack, error - %d\n", status);
            exit(1);
        }

        /* Create the inference sender thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(senderThreadPtr[n]),
                                  "senderThread",
                                  inferenceSenderThread,
                                  reinterpret_cast<ULONG>(&inferenceProcessQueue),
                                  senderThreadStackPtr[n],
                                  SENDER_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create Thread, error - %d\n", status);
            exit(1);
        }
    }

    /* Create the inferenceProcess threads to process the queued jobs */
    for (int n = 0; n < NUM_INFERENCE_THREADS; n++) {

        /* Allocate the thread context for the inference process thread. */
        status =
            tx_byte_allocate(&bytePool, reinterpret_cast<VOID **>(&processThreadPtr[n]), sizeof(TX_THREAD), TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process thread, error - %d\n", status);
            exit(1);
        }

        /* Allocate the stack for the inference process thread. */
        status = tx_byte_allocate(
            &bytePool, reinterpret_cast<VOID **>(&processThreadStackPtr[n]), PROCESS_THREAD_STACK_SIZE, TX_NO_WAIT);
        if (status != TX_SUCCESS) {
            printf("Main failed to allocate bytes for process thread stack, error - %d\n", status);
            exit(1);
        }

        threadParams[n] = ProcessThreadParams(&inferenceProcessQueue, inferenceProcessTensorArena[n], arenaSize);

        /* Create the inference process thread. */
        status = tx_thread_create(reinterpret_cast<TX_THREAD *>(processThreadPtr[n]),
                                  "processThread",
                                  inferenceProcessThread,
                                  reinterpret_cast<ULONG>(&threadParams[n]),
                                  processThreadStackPtr[n],
                                  PROCESS_THREAD_STACK_SIZE,
                                  1,
                                  1,
                                  TX_NO_TIME_SLICE,
                                  TX_AUTO_START);
        if (status != TX_SUCCESS) {
            printf("Main failed to create thread, error - %d\n", status);
            exit(1);
        }
    }

    printf("ThreadX application initialisation - Done\n");
    return;
}