blob: 9a13dae05719baa9f2b2c4734a4d7ef62f147037 [file] [log] [blame]
Sadik Armagan3896b472020-02-10 12:24:15 +00001//
Jim Flynnbbfe6032020-07-20 16:57:44 +01002// Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
Sadik Armagan3896b472020-02-10 12:24:15 +00003// SPDX-License-Identifier: MIT
4//
5
6#include "SendThread.hpp"
Sadik Armagan3896b472020-02-10 12:24:15 +00007#include "ProfilingUtils.hpp"
8
Jim Flynn75c14f42022-03-10 22:05:42 +00009#include <common/include/NumericCast.hpp>
Jim Flynnf9db3ef2022-03-08 21:23:44 +000010#include <common/include/ProfilingException.hpp>
11
Jim Flynne195a042022-04-12 17:19:28 +010012#if defined(ARMNN_DISABLE_THREADS)
13#include <common/include/IgnoreUnused.hpp>
14#endif
15
Sadik Armagan3896b472020-02-10 12:24:15 +000016#include <cstring>
17
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000018namespace arm
Sadik Armagan3896b472020-02-10 12:24:15 +000019{
20
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000021namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +000022{
23
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000024SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
25 IBufferManager& buffer,
26 ISendCounterPacket& sendCounterPacket,
Jim Flynnbbfe6032020-07-20 16:57:44 +010027 int timeout)
Sadik Armagan3896b472020-02-10 12:24:15 +000028 : m_StateMachine(profilingStateMachine)
29 , m_BufferManager(buffer)
30 , m_SendCounterPacket(sendCounterPacket)
31 , m_Timeout(timeout)
32 , m_IsRunning(false)
33 , m_KeepRunning(false)
34 , m_SendThreadException(nullptr)
35{
36 m_BufferManager.SetConsumer(this);
37}
38
39void SendThread::SetReadyToRead()
40{
41 // We need to wait for the send thread to release its mutex
42 {
Jim Flynne195a042022-04-12 17:19:28 +010043#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +000044 std::lock_guard<std::mutex> lck(m_WaitMutex);
Jim Flynne195a042022-04-12 17:19:28 +010045#endif
Sadik Armagan3896b472020-02-10 12:24:15 +000046 m_ReadyToRead = true;
47 }
48 // Signal the send thread that there's something to read in the buffer
Jim Flynne195a042022-04-12 17:19:28 +010049#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +000050 m_WaitCondition.notify_one();
Jim Flynne195a042022-04-12 17:19:28 +010051#endif
Sadik Armagan3896b472020-02-10 12:24:15 +000052}
53
54void SendThread::Start(IProfilingConnection& profilingConnection)
55{
56 // Check if the send thread is already running
57 if (m_IsRunning.load())
58 {
59 // The send thread is already running
60 return;
61 }
62
Jim Flynne195a042022-04-12 17:19:28 +010063#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +000064 if (m_SendThread.joinable())
65 {
66 m_SendThread.join();
67 }
Jim Flynne195a042022-04-12 17:19:28 +010068#endif
Sadik Armagan3896b472020-02-10 12:24:15 +000069
70 // Mark the send thread as running
71 m_IsRunning.store(true);
72
73 // Keep the send procedure going until the send thread is signalled to stop
74 m_KeepRunning.store(true);
75
76 // Make sure the send thread will not flush the buffer until signaled to do so
77 // no need for a mutex as the send thread can not be running at this point
78 m_ReadyToRead = false;
79
80 m_PacketSent = false;
81
82 // Start the send thread
Jim Flynne195a042022-04-12 17:19:28 +010083#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +000084 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
Jim Flynne195a042022-04-12 17:19:28 +010085#else
86 IgnoreUnused(profilingConnection);
87#endif
Sadik Armagan3896b472020-02-10 12:24:15 +000088}
89
90void SendThread::Stop(bool rethrowSendThreadExceptions)
91{
92 // Signal the send thread to stop
93 m_KeepRunning.store(false);
94
95 // Check that the send thread is running
Jim Flynne195a042022-04-12 17:19:28 +010096#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +000097 if (m_SendThread.joinable())
98 {
99 // Kick the send thread out of the wait condition
100 SetReadyToRead();
101 // Wait for the send thread to complete operations
102 m_SendThread.join();
103 }
Jim Flynne195a042022-04-12 17:19:28 +0100104#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000105
106 // Check if the send thread exception has to be rethrown
107 if (!rethrowSendThreadExceptions)
108 {
109 // No need to rethrow the send thread exception, return immediately
110 return;
111 }
112
113 // Check if there's an exception to rethrow
114 if (m_SendThreadException)
115 {
116 // Rethrow the send thread exception
117 std::rethrow_exception(m_SendThreadException);
118
119 // Nullify the exception as it has been rethrown
120 m_SendThreadException = nullptr;
121 }
122}
123
124void SendThread::Send(IProfilingConnection& profilingConnection)
125{
126 // Run once and keep the sending procedure looping until the thread is signalled to stop
127 do
128 {
129 // Check the current state of the profiling service
130 ProfilingState currentState = m_StateMachine.GetCurrentState();
131 switch (currentState)
132 {
133 case ProfilingState::Uninitialised:
134 case ProfilingState::NotConnected:
135
136 // The send thread cannot be running when the profiling service is uninitialized or not connected,
137 // stop the thread immediately
138 m_KeepRunning.store(false);
139 m_IsRunning.store(false);
140
141 // An exception should be thrown here, save it to be rethrown later from the main thread so that
142 // it can be caught by the consumer
143 m_SendThreadException =
Jim Flynnf9db3ef2022-03-08 21:23:44 +0000144 std::make_exception_ptr(arm::pipe::ProfilingException(
145 "The send thread should not be running with the profiling service not yet initialized or connected"));
Sadik Armagan3896b472020-02-10 12:24:15 +0000146
147 return;
148 case ProfilingState::WaitingForAck:
149
150 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
151 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
152 // updated by the command handler
153
154 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
155 m_SendCounterPacket.SendStreamMetaDataPacket();
156
157 // Flush the buffer manually to send the packet
158 FlushBuffer(profilingConnection);
159
160 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
161 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
162 // StreamMetadata again.
163
164 // Wait condition lock scope - Begin
165 {
Jim Flynne195a042022-04-12 17:19:28 +0100166#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000167 std::unique_lock<std::mutex> lock(m_WaitMutex);
168
169 bool timeout = m_WaitCondition.wait_for(lock,
Finn Williams7a16dcf2020-02-10 16:59:58 +0000170 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
Sadik Armagan3896b472020-02-10 12:24:15 +0000171 [&]{ return m_ReadyToRead; });
172 // If we get notified we need to flush the buffer again
Jim Flynne195a042022-04-12 17:19:28 +0100173 if (timeout)
Sadik Armagan3896b472020-02-10 12:24:15 +0000174 {
175 // Otherwise if we just timed out don't flush the buffer
176 continue;
177 }
Jim Flynne195a042022-04-12 17:19:28 +0100178#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000179 //reset condition variable predicate for next use
180 m_ReadyToRead = false;
181 }
182 // Wait condition lock scope - End
183 break;
184 case ProfilingState::Active:
185 default:
186 // Wait condition lock scope - Begin
187 {
Jim Flynne195a042022-04-12 17:19:28 +0100188#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000189 std::unique_lock<std::mutex> lock(m_WaitMutex);
Jim Flynne195a042022-04-12 17:19:28 +0100190#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000191 // Normal working state for the send thread
192 // Check if the send thread is required to enforce a timeout wait policy
193 if (m_Timeout < 0)
194 {
195 // Wait indefinitely until notified that something to read has become available in the buffer
Jim Flynne195a042022-04-12 17:19:28 +0100196#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000197 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
Jim Flynne195a042022-04-12 17:19:28 +0100198#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000199 }
200 else
201 {
202 // Wait until the thread is notified of something to read from the buffer,
203 // or check anyway after the specified number of milliseconds
Jim Flynne195a042022-04-12 17:19:28 +0100204#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000205 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
Jim Flynne195a042022-04-12 17:19:28 +0100206#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000207 }
208
209 //reset condition variable predicate for next use
210 m_ReadyToRead = false;
211 }
212 // Wait condition lock scope - End
213 break;
214 }
215
216 // Send all the available packets in the buffer
217 FlushBuffer(profilingConnection);
218 } while (m_KeepRunning.load());
219
220 // Ensure that all readable data got written to the profiling connection before the thread is stopped
221 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
222 FlushBuffer(profilingConnection, false);
223
224 // Mark the send thread as not running
225 m_IsRunning.store(false);
226}
227
228void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
229{
230 // Get the first available readable buffer
231 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
232
233 // Initialize the flag that indicates whether at least a packet has been sent
234 bool packetsSent = false;
235
236 while (packetBuffer != nullptr)
237 {
238 // Get the data to send from the buffer
239 const unsigned char* readBuffer = packetBuffer->GetReadableData();
240 unsigned int readBufferSize = packetBuffer->GetSize();
241
242 if (readBuffer == nullptr || readBufferSize == 0)
243 {
244 // Nothing to send, get the next available readable buffer and continue
245 m_BufferManager.MarkRead(packetBuffer);
246 packetBuffer = m_BufferManager.GetReadableBuffer();
247
248 continue;
249 }
250
251 // Check that the profiling connection is open, silently drop the data and continue if it's closed
252 if (profilingConnection.IsOpen())
253 {
254 // Write a packet to the profiling connection. Silently ignore any write error and continue
Jim Flynn75c14f42022-03-10 22:05:42 +0000255 profilingConnection.WritePacket(readBuffer, arm::pipe::numeric_cast<uint32_t>(readBufferSize));
Sadik Armagan3896b472020-02-10 12:24:15 +0000256
257 // Set the flag that indicates whether at least a packet has been sent
258 packetsSent = true;
259 }
260
261 // Mark the packet buffer as read
262 m_BufferManager.MarkRead(packetBuffer);
263
264 // Get the next available readable buffer
265 packetBuffer = m_BufferManager.GetReadableBuffer();
266 }
267 // Check whether at least a packet has been sent
268 if (packetsSent && notifyWatchers)
269 {
270 // Wait for the parent thread to release its mutex if necessary
271 {
Jim Flynne195a042022-04-12 17:19:28 +0100272#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000273 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
Jim Flynne195a042022-04-12 17:19:28 +0100274#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000275 m_PacketSent = true;
276 }
277 // Notify to any watcher that something has been sent
Jim Flynne195a042022-04-12 17:19:28 +0100278#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000279 m_PacketSentWaitCondition.notify_one();
Jim Flynne195a042022-04-12 17:19:28 +0100280#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000281 }
282}
283
284bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
285{
Jim Flynne195a042022-04-12 17:19:28 +0100286#if !defined(ARMNN_DISABLE_THREADS)
Sadik Armagan3896b472020-02-10 12:24:15 +0000287 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
288 // Blocks until notified that at least a packet has been sent or until timeout expires.
289 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
290 std::chrono::milliseconds(timeout),
291 [&] { return m_PacketSent; });
Sadik Armagan3896b472020-02-10 12:24:15 +0000292 m_PacketSent = false;
293
294 return timedOut;
Jim Flynne195a042022-04-12 17:19:28 +0100295#else
296 IgnoreUnused(timeout);
297 return false;
298#endif
Sadik Armagan3896b472020-02-10 12:24:15 +0000299}
300
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000301} // namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +0000302
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000303} // namespace arm