//
// Copyright © 2019 Arm Ltd. All rights reserved.
// SPDX-License-Identifier: MIT
//
5
6#include "FileOnlyProfilingConnection.hpp"
7#include "PacketVersionResolver.hpp"
8
9#include <armnn/Exceptions.hpp>
Finn Williamse09fc822020-04-29 13:17:30 +010010#include <common/include/Constants.hpp>
Keith Davis3201eea2019-10-24 17:30:41 +010011
Jim Flynn4e755a52020-03-29 17:48:26 +010012#include <algorithm>
Keith Davis3201eea2019-10-24 17:30:41 +010013#include <boost/numeric/conversion/cast.hpp>
14#include <iostream>
15#include <thread>
16
17namespace armnn
18{
19
20namespace profiling
21{
22
23FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
24{
25 Close();
26}
27
28bool FileOnlyProfilingConnection::IsOpen() const
29{
30 // This type of connection is always open.
31 return true;
32}
33
34void FileOnlyProfilingConnection::Close()
35{
36 // Dump any unread packets out of the queue.
Jim Flynn4e755a52020-03-29 17:48:26 +010037 size_t initialSize = m_PacketQueue.size();
38 for (size_t i = 0; i < initialSize; ++i)
Keith Davis3201eea2019-10-24 17:30:41 +010039 {
40 m_PacketQueue.pop();
41 }
Jim Flynn4e755a52020-03-29 17:48:26 +010042 // dispose of the processing thread
43 m_KeepRunning.store(false);
44 if (m_LocalHandlersThread.joinable())
45 {
46 // make sure the thread wakes up and sees it has to stop
47 m_ConditionPacketReadable.notify_one();
48 m_LocalHandlersThread.join();
49 }
Keith Davis3201eea2019-10-24 17:30:41 +010050}
51
52bool FileOnlyProfilingConnection::WaitForStreamMeta(const unsigned char* buffer, uint32_t length)
53{
Jan Eilers8eb25602020-03-09 12:13:48 +000054 IgnoreUnused(length);
Derek Lamberti1dd75b32019-12-10 21:23:23 +000055
Keith Davis3201eea2019-10-24 17:30:41 +010056 // The first word, stream_metadata_identifer, should always be 0.
57 if (ToUint32(buffer, TargetEndianness::BeWire) != 0)
58 {
59 Fail("Protocol error. The stream_metadata_identifer was not 0.");
60 }
61
62 // Before we interpret the length we need to read the pipe_magic word to determine endianness.
Finn Williamse09fc822020-04-29 13:17:30 +010063 if (ToUint32(buffer + 8, TargetEndianness::BeWire) == armnnProfiling::PIPE_MAGIC)
Keith Davis3201eea2019-10-24 17:30:41 +010064 {
65 m_Endianness = TargetEndianness::BeWire;
66 }
Finn Williamse09fc822020-04-29 13:17:30 +010067 else if (ToUint32(buffer + 8, TargetEndianness::LeWire) == armnnProfiling::PIPE_MAGIC)
Keith Davis3201eea2019-10-24 17:30:41 +010068 {
69 m_Endianness = TargetEndianness::LeWire;
70 }
71 else
72 {
73 Fail("Protocol read error. Unable to read PIPE_MAGIC value.");
74 }
75 return true;
76}
77
78void FileOnlyProfilingConnection::SendConnectionAck()
79{
80 if (!m_QuietOp)
81 {
82 std::cout << "Sending connection acknowledgement." << std::endl;
83 }
84 std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
Colm Donelan4cace322019-11-20 14:59:12 +000085 {
86 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
87 m_PacketQueue.push(Packet(0x10000, 0, uniqueNullPtr));
88 }
89 m_ConditionPacketAvailable.notify_one();
Keith Davis3201eea2019-10-24 17:30:41 +010090}
91
92bool FileOnlyProfilingConnection::SendCounterSelectionPacket()
93{
94 uint32_t uint16_t_size = sizeof(uint16_t);
95 uint32_t uint32_t_size = sizeof(uint32_t);
96
97 uint32_t offset = 0;
98 uint32_t bodySize = uint32_t_size + boost::numeric_cast<uint32_t>(m_IdList.size()) * uint16_t_size;
99
100 auto uniqueData = std::make_unique<unsigned char[]>(bodySize);
101 unsigned char* data = reinterpret_cast<unsigned char*>(uniqueData.get());
102
103 // Copy capturePeriod
104 WriteUint32(data, offset, m_Options.m_CapturePeriod);
105
106 // Copy m_IdList
107 offset += uint32_t_size;
108 for (const uint16_t& id : m_IdList)
109 {
110 WriteUint16(data, offset, id);
111 offset += uint16_t_size;
112 }
113
Colm Donelan4cace322019-11-20 14:59:12 +0000114 {
115 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
116 m_PacketQueue.push(Packet(0x40000, bodySize, uniqueData));
117 }
118 m_ConditionPacketAvailable.notify_one();
Keith Davis3201eea2019-10-24 17:30:41 +0100119
120 return true;
121}
122
123bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
124{
Narumol Prangnawaratac2770a2020-04-01 16:51:23 +0100125 ARMNN_ASSERT(buffer);
Jim Flynn4e755a52020-03-29 17:48:26 +0100126 Packet packet = ReceivePacket(buffer, length);
Keith Davis3201eea2019-10-24 17:30:41 +0100127
128 // Read Header and determine case
129 uint32_t outgoingHeaderAsWords[2];
Jim Flynn4e755a52020-03-29 17:48:26 +0100130 PackageActivity packageActivity = GetPackageActivity(packet, outgoingHeaderAsWords);
Keith Davis3201eea2019-10-24 17:30:41 +0100131
132 switch (packageActivity)
133 {
134 case PackageActivity::StreamMetaData:
135 {
136 if (!WaitForStreamMeta(buffer, length))
137 {
138 return EXIT_FAILURE;
139 }
140
141 SendConnectionAck();
142 break;
143 }
144 case PackageActivity::CounterDirectory:
145 {
146 std::unique_ptr<unsigned char[]> uniqueCounterData = std::make_unique<unsigned char[]>(length - 8);
147
148 std::memcpy(uniqueCounterData.get(), buffer + 8, length - 8);
149
150 Packet directoryPacket(outgoingHeaderAsWords[0], length - 8, uniqueCounterData);
151
152 armnn::profiling::PacketVersionResolver packetVersionResolver;
153 DirectoryCaptureCommandHandler directoryCaptureCommandHandler(
154 0, 2, packetVersionResolver.ResolvePacketVersion(0, 2).GetEncodedValue());
155 directoryCaptureCommandHandler.operator()(directoryPacket);
156 const ICounterDirectory& counterDirectory = directoryCaptureCommandHandler.GetCounterDirectory();
157 for (auto& category : counterDirectory.GetCategories())
158 {
159 // Remember we need to translate the Uid's from our CounterDirectory instance to the parent one.
160 std::vector<uint16_t> translatedCounters;
161 for (auto const& copyUid : category->m_Counters)
162 {
163 translatedCounters.emplace_back(directoryCaptureCommandHandler.TranslateUIDCopyToOriginal(copyUid));
164 }
165 m_IdList.insert(std::end(m_IdList), std::begin(translatedCounters), std::end(translatedCounters));
166 }
167 SendCounterSelectionPacket();
168 break;
169 }
170 default:
171 {
172 break;
173 }
174 }
Jim Flynn4e755a52020-03-29 17:48:26 +0100175 ForwardPacketToHandlers(packet);
Keith Davis3201eea2019-10-24 17:30:41 +0100176 return true;
177}
178
179Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
180{
Colm Donelan4cace322019-11-20 14:59:12 +0000181 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
Finn Williams09ad6f92019-12-19 17:05:18 +0000182
183 // Here we are using m_PacketQueue.empty() as a predicate variable
184 // The conditional variable will wait until packetQueue is not empty or until a timeout
185 if(!m_ConditionPacketAvailable.wait_for(lck,
186 std::chrono::milliseconds(timeout),
187 [&]{return !m_PacketQueue.empty();}))
Keith Davis3201eea2019-10-24 17:30:41 +0100188 {
Finn Williams09ad6f92019-12-19 17:05:18 +0000189 throw armnn::TimeoutException("Thread has timed out as per requested time limit");
Keith Davis3201eea2019-10-24 17:30:41 +0100190 }
Finn Williams09ad6f92019-12-19 17:05:18 +0000191
Keith Davis3201eea2019-10-24 17:30:41 +0100192 Packet returnedPacket = std::move(m_PacketQueue.front());
193 m_PacketQueue.pop();
194 return returnedPacket;
195}
196
Jim Flynn4e755a52020-03-29 17:48:26 +0100197PackageActivity FileOnlyProfilingConnection::GetPackageActivity(const Packet& packet, uint32_t headerAsWords[2])
Keith Davis3201eea2019-10-24 17:30:41 +0100198{
Jim Flynn4e755a52020-03-29 17:48:26 +0100199 headerAsWords[0] = packet.GetHeader();
200 headerAsWords[1] = packet.GetLength();
Keith Davis3201eea2019-10-24 17:30:41 +0100201 if (headerAsWords[0] == 0x20000) // Packet family = 0 Packet Id = 2
202 {
203 return PackageActivity::CounterDirectory;
204 }
205 else if (headerAsWords[0] == 0) // Packet family = 0 Packet Id = 0
206 {
207 return PackageActivity::StreamMetaData;
208 }
209 else
210 {
211 return PackageActivity::Unknown;
212 }
213}
214
215uint32_t FileOnlyProfilingConnection::ToUint32(const unsigned char* data, TargetEndianness endianness)
216{
217 // Extract the first 4 bytes starting at data and push them into a 32bit integer based on the
218 // specified endianness.
219 if (endianness == TargetEndianness::BeWire)
220 {
221 return static_cast<uint32_t>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
222 static_cast<uint32_t>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
223 }
224 else
225 {
226 return static_cast<uint32_t>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
227 static_cast<uint32_t>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
228 }
229}
230
231void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
232{
233 Close();
234 throw RuntimeException(errorMessage);
235}
236
Jim Flynn4e755a52020-03-29 17:48:26 +0100237/// Adds a local packet handler to the FileOnlyProfilingConnection. Invoking this will start
238/// a processing thread that will ensure that processing of packets will happen on a separate
239/// thread from the profiling services send thread and will therefore protect against the
240/// profiling message buffer becoming exhausted because packet handling slows the dispatch.
241void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
242{
243 m_PacketHandlers.push_back(std::move(localPacketHandler));
244 ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
245 localCopy->SetConnection(this);
246 if (localCopy->GetHeadersAccepted().empty())
247 {
248 //this is a universal handler
249 m_UniversalHandlers.push_back(localCopy);
250 }
251 else
252 {
253 for (uint32_t header : localCopy->GetHeadersAccepted())
254 {
255 auto iter = m_IndexedHandlers.find(header);
256 if (iter == m_IndexedHandlers.end())
257 {
258 std::vector<ILocalPacketHandlerSharedPtr> handlers;
259 handlers.push_back(localCopy);
260 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
261 }
262 else
263 {
264 iter->second.push_back(localCopy);
265 }
266 }
267 }
268}
269
270void FileOnlyProfilingConnection::StartProcessingThread()
271{
272 // check if the thread has already started
273 if (m_IsRunning.load())
274 {
275 return;
276 }
277 // make sure if there was one running before it is joined
278 if (m_LocalHandlersThread.joinable())
279 {
280 m_LocalHandlersThread.join();
281 }
282 m_IsRunning.store(true);
283 m_KeepRunning.store(true);
284 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
285}
286
287void FileOnlyProfilingConnection::ForwardPacketToHandlers(Packet& packet)
288{
289 if (m_PacketHandlers.empty())
290 {
291 return;
292 }
293 if (m_KeepRunning.load() == false)
294 {
295 return;
296 }
297 {
298 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
299 if (m_KeepRunning.load() == false)
300 {
301 return;
302 }
303 m_ReadableList.push(std::move(packet));
304 }
305 m_ConditionPacketReadable.notify_one();
306}
307
308void FileOnlyProfilingConnection::ServiceLocalHandlers()
309{
310 do
311 {
312 Packet returnedPacket;
313 bool readPacket = false;
314 { // only lock while we are taking the packet off the incoming list
315 std::unique_lock<std::mutex> lck(m_ReadableMutex);
316 if (m_Timeout < 0)
317 {
318 m_ConditionPacketReadable.wait(lck,
319 [&] { return !m_ReadableList.empty(); });
320 }
321 else
322 {
323 m_ConditionPacketReadable.wait_for(lck,
324 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
325 [&] { return !m_ReadableList.empty(); });
326 }
327 if (m_KeepRunning.load())
328 {
329 if (!m_ReadableList.empty())
330 {
331 returnedPacket = std::move(m_ReadableList.front());
332 m_ReadableList.pop();
333 readPacket = true;
334 }
335 }
336 else
337 {
338 ClearReadableList();
339 }
340 }
341 if (m_KeepRunning.load() && readPacket)
342 {
343 DispatchPacketToHandlers(returnedPacket);
344 }
345 } while (m_KeepRunning.load());
346 // make sure the readable list is cleared
347 ClearReadableList();
348 m_IsRunning.store(false);
349}
350
351void FileOnlyProfilingConnection::ClearReadableList()
352{
353 // make sure the incoming packet queue gets emptied
354 size_t initialSize = m_ReadableList.size();
355 for (size_t i = 0; i < initialSize; ++i)
356 {
357 m_ReadableList.pop();
358 }
359}
360
361void FileOnlyProfilingConnection::DispatchPacketToHandlers(const Packet& packet)
362{
363 for (auto& delegate : m_UniversalHandlers)
364 {
365 delegate->HandlePacket(packet);
366 }
367 auto iter = m_IndexedHandlers.find(packet.GetHeader());
368 if (iter != m_IndexedHandlers.end())
369 {
370 for (auto &delegate : iter->second)
371 {
372 delegate->HandlePacket(packet);
373 }
374 }
375}
376
Keith Davis3201eea2019-10-24 17:30:41 +0100377} // namespace profiling
378
379} // namespace armnn