blob: e932eba3e938af08ae597365111e4697bfa8547f [file] [log] [blame]
Sadik Armagan3896b472020-02-10 12:24:15 +00001//
Jim Flynnbbfe6032020-07-20 16:57:44 +01002// Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
Sadik Armagan3896b472020-02-10 12:24:15 +00003// SPDX-License-Identifier: MIT
4//
5
6#include "SendThread.hpp"
Sadik Armagan3896b472020-02-10 12:24:15 +00007#include "ProfilingUtils.hpp"
8
Sadik Armagan3896b472020-02-10 12:24:15 +00009#include <armnn/Conversion.hpp>
Matthew Sloyan371b70e2020-09-11 10:14:57 +010010#include <armnn/utility/NumericCast.hpp>
11
Jim Flynnf9db3ef2022-03-08 21:23:44 +000012#include <common/include/ProfilingException.hpp>
13
Sadik Armagan3896b472020-02-10 12:24:15 +000014#include <Processes.hpp>
15
Sadik Armagan3896b472020-02-10 12:24:15 +000016#include <cstring>
17
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000018namespace arm
Sadik Armagan3896b472020-02-10 12:24:15 +000019{
20
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000021namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +000022{
23
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000024SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
25 IBufferManager& buffer,
26 ISendCounterPacket& sendCounterPacket,
Jim Flynnbbfe6032020-07-20 16:57:44 +010027 int timeout)
Sadik Armagan3896b472020-02-10 12:24:15 +000028 : m_StateMachine(profilingStateMachine)
29 , m_BufferManager(buffer)
30 , m_SendCounterPacket(sendCounterPacket)
31 , m_Timeout(timeout)
32 , m_IsRunning(false)
33 , m_KeepRunning(false)
34 , m_SendThreadException(nullptr)
35{
36 m_BufferManager.SetConsumer(this);
37}
38
39void SendThread::SetReadyToRead()
40{
41 // We need to wait for the send thread to release its mutex
42 {
43 std::lock_guard<std::mutex> lck(m_WaitMutex);
44 m_ReadyToRead = true;
45 }
46 // Signal the send thread that there's something to read in the buffer
47 m_WaitCondition.notify_one();
48}
49
50void SendThread::Start(IProfilingConnection& profilingConnection)
51{
52 // Check if the send thread is already running
53 if (m_IsRunning.load())
54 {
55 // The send thread is already running
56 return;
57 }
58
59 if (m_SendThread.joinable())
60 {
61 m_SendThread.join();
62 }
63
64 // Mark the send thread as running
65 m_IsRunning.store(true);
66
67 // Keep the send procedure going until the send thread is signalled to stop
68 m_KeepRunning.store(true);
69
70 // Make sure the send thread will not flush the buffer until signaled to do so
71 // no need for a mutex as the send thread can not be running at this point
72 m_ReadyToRead = false;
73
74 m_PacketSent = false;
75
76 // Start the send thread
77 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
78}
79
80void SendThread::Stop(bool rethrowSendThreadExceptions)
81{
82 // Signal the send thread to stop
83 m_KeepRunning.store(false);
84
85 // Check that the send thread is running
86 if (m_SendThread.joinable())
87 {
88 // Kick the send thread out of the wait condition
89 SetReadyToRead();
90 // Wait for the send thread to complete operations
91 m_SendThread.join();
92 }
93
94 // Check if the send thread exception has to be rethrown
95 if (!rethrowSendThreadExceptions)
96 {
97 // No need to rethrow the send thread exception, return immediately
98 return;
99 }
100
101 // Check if there's an exception to rethrow
102 if (m_SendThreadException)
103 {
104 // Rethrow the send thread exception
105 std::rethrow_exception(m_SendThreadException);
106
107 // Nullify the exception as it has been rethrown
108 m_SendThreadException = nullptr;
109 }
110}
111
112void SendThread::Send(IProfilingConnection& profilingConnection)
113{
114 // Run once and keep the sending procedure looping until the thread is signalled to stop
115 do
116 {
117 // Check the current state of the profiling service
118 ProfilingState currentState = m_StateMachine.GetCurrentState();
119 switch (currentState)
120 {
121 case ProfilingState::Uninitialised:
122 case ProfilingState::NotConnected:
123
124 // The send thread cannot be running when the profiling service is uninitialized or not connected,
125 // stop the thread immediately
126 m_KeepRunning.store(false);
127 m_IsRunning.store(false);
128
129 // An exception should be thrown here, save it to be rethrown later from the main thread so that
130 // it can be caught by the consumer
131 m_SendThreadException =
Jim Flynnf9db3ef2022-03-08 21:23:44 +0000132 std::make_exception_ptr(arm::pipe::ProfilingException(
133 "The send thread should not be running with the profiling service not yet initialized or connected"));
Sadik Armagan3896b472020-02-10 12:24:15 +0000134
135 return;
136 case ProfilingState::WaitingForAck:
137
138 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
139 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
140 // updated by the command handler
141
142 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
143 m_SendCounterPacket.SendStreamMetaDataPacket();
144
145 // Flush the buffer manually to send the packet
146 FlushBuffer(profilingConnection);
147
148 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
149 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
150 // StreamMetadata again.
151
152 // Wait condition lock scope - Begin
153 {
154 std::unique_lock<std::mutex> lock(m_WaitMutex);
155
156 bool timeout = m_WaitCondition.wait_for(lock,
Finn Williams7a16dcf2020-02-10 16:59:58 +0000157 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
Sadik Armagan3896b472020-02-10 12:24:15 +0000158 [&]{ return m_ReadyToRead; });
159 // If we get notified we need to flush the buffer again
160 if(timeout)
161 {
162 // Otherwise if we just timed out don't flush the buffer
163 continue;
164 }
165 //reset condition variable predicate for next use
166 m_ReadyToRead = false;
167 }
168 // Wait condition lock scope - End
169 break;
170 case ProfilingState::Active:
171 default:
172 // Wait condition lock scope - Begin
173 {
174 std::unique_lock<std::mutex> lock(m_WaitMutex);
175
176 // Normal working state for the send thread
177 // Check if the send thread is required to enforce a timeout wait policy
178 if (m_Timeout < 0)
179 {
180 // Wait indefinitely until notified that something to read has become available in the buffer
181 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
182 }
183 else
184 {
185 // Wait until the thread is notified of something to read from the buffer,
186 // or check anyway after the specified number of milliseconds
187 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
188 }
189
190 //reset condition variable predicate for next use
191 m_ReadyToRead = false;
192 }
193 // Wait condition lock scope - End
194 break;
195 }
196
197 // Send all the available packets in the buffer
198 FlushBuffer(profilingConnection);
199 } while (m_KeepRunning.load());
200
201 // Ensure that all readable data got written to the profiling connection before the thread is stopped
202 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
203 FlushBuffer(profilingConnection, false);
204
205 // Mark the send thread as not running
206 m_IsRunning.store(false);
207}
208
209void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
210{
211 // Get the first available readable buffer
212 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
213
214 // Initialize the flag that indicates whether at least a packet has been sent
215 bool packetsSent = false;
216
217 while (packetBuffer != nullptr)
218 {
219 // Get the data to send from the buffer
220 const unsigned char* readBuffer = packetBuffer->GetReadableData();
221 unsigned int readBufferSize = packetBuffer->GetSize();
222
223 if (readBuffer == nullptr || readBufferSize == 0)
224 {
225 // Nothing to send, get the next available readable buffer and continue
226 m_BufferManager.MarkRead(packetBuffer);
227 packetBuffer = m_BufferManager.GetReadableBuffer();
228
229 continue;
230 }
231
232 // Check that the profiling connection is open, silently drop the data and continue if it's closed
233 if (profilingConnection.IsOpen())
234 {
235 // Write a packet to the profiling connection. Silently ignore any write error and continue
Matthew Sloyan371b70e2020-09-11 10:14:57 +0100236 profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
Sadik Armagan3896b472020-02-10 12:24:15 +0000237
238 // Set the flag that indicates whether at least a packet has been sent
239 packetsSent = true;
240 }
241
242 // Mark the packet buffer as read
243 m_BufferManager.MarkRead(packetBuffer);
244
245 // Get the next available readable buffer
246 packetBuffer = m_BufferManager.GetReadableBuffer();
247 }
248 // Check whether at least a packet has been sent
249 if (packetsSent && notifyWatchers)
250 {
251 // Wait for the parent thread to release its mutex if necessary
252 {
253 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
254 m_PacketSent = true;
255 }
256 // Notify to any watcher that something has been sent
257 m_PacketSentWaitCondition.notify_one();
258 }
259}
260
261bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
262{
263 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
264 // Blocks until notified that at least a packet has been sent or until timeout expires.
265 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
266 std::chrono::milliseconds(timeout),
267 [&] { return m_PacketSent; });
268
269 m_PacketSent = false;
270
271 return timedOut;
272}
273
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000274} // namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +0000275
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000276} // namespace arm