blob: 41385c5cc8f258b6e466b71e44ab3d3a93f4d8f3 [file] [log] [blame]
Sadik Armagan3896b472020-02-10 12:24:15 +00001//
Jim Flynnbbfe6032020-07-20 16:57:44 +01002// Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
Sadik Armagan3896b472020-02-10 12:24:15 +00003// SPDX-License-Identifier: MIT
4//
5
6#include "SendThread.hpp"
Sadik Armagan3896b472020-02-10 12:24:15 +00007#include "ProfilingUtils.hpp"
8
9#include <armnn/Exceptions.hpp>
10#include <armnn/Conversion.hpp>
Matthew Sloyan371b70e2020-09-11 10:14:57 +010011#include <armnn/utility/NumericCast.hpp>
12
Sadik Armagan3896b472020-02-10 12:24:15 +000013#include <Processes.hpp>
14
Sadik Armagan3896b472020-02-10 12:24:15 +000015#include <cstring>
16
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000017namespace arm
Sadik Armagan3896b472020-02-10 12:24:15 +000018{
19
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000020namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +000021{
22
Cathal Corbett5aa9fd72022-02-25 15:33:28 +000023SendThread::SendThread(ProfilingStateMachine& profilingStateMachine,
24 IBufferManager& buffer,
25 ISendCounterPacket& sendCounterPacket,
Jim Flynnbbfe6032020-07-20 16:57:44 +010026 int timeout)
Sadik Armagan3896b472020-02-10 12:24:15 +000027 : m_StateMachine(profilingStateMachine)
28 , m_BufferManager(buffer)
29 , m_SendCounterPacket(sendCounterPacket)
30 , m_Timeout(timeout)
31 , m_IsRunning(false)
32 , m_KeepRunning(false)
33 , m_SendThreadException(nullptr)
34{
35 m_BufferManager.SetConsumer(this);
36}
37
38void SendThread::SetReadyToRead()
39{
40 // We need to wait for the send thread to release its mutex
41 {
42 std::lock_guard<std::mutex> lck(m_WaitMutex);
43 m_ReadyToRead = true;
44 }
45 // Signal the send thread that there's something to read in the buffer
46 m_WaitCondition.notify_one();
47}
48
49void SendThread::Start(IProfilingConnection& profilingConnection)
50{
51 // Check if the send thread is already running
52 if (m_IsRunning.load())
53 {
54 // The send thread is already running
55 return;
56 }
57
58 if (m_SendThread.joinable())
59 {
60 m_SendThread.join();
61 }
62
63 // Mark the send thread as running
64 m_IsRunning.store(true);
65
66 // Keep the send procedure going until the send thread is signalled to stop
67 m_KeepRunning.store(true);
68
69 // Make sure the send thread will not flush the buffer until signaled to do so
70 // no need for a mutex as the send thread can not be running at this point
71 m_ReadyToRead = false;
72
73 m_PacketSent = false;
74
75 // Start the send thread
76 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
77}
78
79void SendThread::Stop(bool rethrowSendThreadExceptions)
80{
81 // Signal the send thread to stop
82 m_KeepRunning.store(false);
83
84 // Check that the send thread is running
85 if (m_SendThread.joinable())
86 {
87 // Kick the send thread out of the wait condition
88 SetReadyToRead();
89 // Wait for the send thread to complete operations
90 m_SendThread.join();
91 }
92
93 // Check if the send thread exception has to be rethrown
94 if (!rethrowSendThreadExceptions)
95 {
96 // No need to rethrow the send thread exception, return immediately
97 return;
98 }
99
100 // Check if there's an exception to rethrow
101 if (m_SendThreadException)
102 {
103 // Rethrow the send thread exception
104 std::rethrow_exception(m_SendThreadException);
105
106 // Nullify the exception as it has been rethrown
107 m_SendThreadException = nullptr;
108 }
109}
110
111void SendThread::Send(IProfilingConnection& profilingConnection)
112{
113 // Run once and keep the sending procedure looping until the thread is signalled to stop
114 do
115 {
116 // Check the current state of the profiling service
117 ProfilingState currentState = m_StateMachine.GetCurrentState();
118 switch (currentState)
119 {
120 case ProfilingState::Uninitialised:
121 case ProfilingState::NotConnected:
122
123 // The send thread cannot be running when the profiling service is uninitialized or not connected,
124 // stop the thread immediately
125 m_KeepRunning.store(false);
126 m_IsRunning.store(false);
127
128 // An exception should be thrown here, save it to be rethrown later from the main thread so that
129 // it can be caught by the consumer
130 m_SendThreadException =
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000131 std::make_exception_ptr(armnn::RuntimeException("The send thread should not be running with the "
Sadik Armagan3896b472020-02-10 12:24:15 +0000132 "profiling service not yet initialized or connected"));
133
134 return;
135 case ProfilingState::WaitingForAck:
136
137 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
138 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
139 // updated by the command handler
140
141 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
142 m_SendCounterPacket.SendStreamMetaDataPacket();
143
144 // Flush the buffer manually to send the packet
145 FlushBuffer(profilingConnection);
146
147 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
148 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
149 // StreamMetadata again.
150
151 // Wait condition lock scope - Begin
152 {
153 std::unique_lock<std::mutex> lock(m_WaitMutex);
154
155 bool timeout = m_WaitCondition.wait_for(lock,
Finn Williams7a16dcf2020-02-10 16:59:58 +0000156 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
Sadik Armagan3896b472020-02-10 12:24:15 +0000157 [&]{ return m_ReadyToRead; });
158 // If we get notified we need to flush the buffer again
159 if(timeout)
160 {
161 // Otherwise if we just timed out don't flush the buffer
162 continue;
163 }
164 //reset condition variable predicate for next use
165 m_ReadyToRead = false;
166 }
167 // Wait condition lock scope - End
168 break;
169 case ProfilingState::Active:
170 default:
171 // Wait condition lock scope - Begin
172 {
173 std::unique_lock<std::mutex> lock(m_WaitMutex);
174
175 // Normal working state for the send thread
176 // Check if the send thread is required to enforce a timeout wait policy
177 if (m_Timeout < 0)
178 {
179 // Wait indefinitely until notified that something to read has become available in the buffer
180 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
181 }
182 else
183 {
184 // Wait until the thread is notified of something to read from the buffer,
185 // or check anyway after the specified number of milliseconds
186 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
187 }
188
189 //reset condition variable predicate for next use
190 m_ReadyToRead = false;
191 }
192 // Wait condition lock scope - End
193 break;
194 }
195
196 // Send all the available packets in the buffer
197 FlushBuffer(profilingConnection);
198 } while (m_KeepRunning.load());
199
200 // Ensure that all readable data got written to the profiling connection before the thread is stopped
201 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
202 FlushBuffer(profilingConnection, false);
203
204 // Mark the send thread as not running
205 m_IsRunning.store(false);
206}
207
208void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
209{
210 // Get the first available readable buffer
211 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
212
213 // Initialize the flag that indicates whether at least a packet has been sent
214 bool packetsSent = false;
215
216 while (packetBuffer != nullptr)
217 {
218 // Get the data to send from the buffer
219 const unsigned char* readBuffer = packetBuffer->GetReadableData();
220 unsigned int readBufferSize = packetBuffer->GetSize();
221
222 if (readBuffer == nullptr || readBufferSize == 0)
223 {
224 // Nothing to send, get the next available readable buffer and continue
225 m_BufferManager.MarkRead(packetBuffer);
226 packetBuffer = m_BufferManager.GetReadableBuffer();
227
228 continue;
229 }
230
231 // Check that the profiling connection is open, silently drop the data and continue if it's closed
232 if (profilingConnection.IsOpen())
233 {
234 // Write a packet to the profiling connection. Silently ignore any write error and continue
Matthew Sloyan371b70e2020-09-11 10:14:57 +0100235 profilingConnection.WritePacket(readBuffer, armnn::numeric_cast<uint32_t>(readBufferSize));
Sadik Armagan3896b472020-02-10 12:24:15 +0000236
237 // Set the flag that indicates whether at least a packet has been sent
238 packetsSent = true;
239 }
240
241 // Mark the packet buffer as read
242 m_BufferManager.MarkRead(packetBuffer);
243
244 // Get the next available readable buffer
245 packetBuffer = m_BufferManager.GetReadableBuffer();
246 }
247 // Check whether at least a packet has been sent
248 if (packetsSent && notifyWatchers)
249 {
250 // Wait for the parent thread to release its mutex if necessary
251 {
252 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
253 m_PacketSent = true;
254 }
255 // Notify to any watcher that something has been sent
256 m_PacketSentWaitCondition.notify_one();
257 }
258}
259
260bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
261{
262 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
263 // Blocks until notified that at least a packet has been sent or until timeout expires.
264 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
265 std::chrono::milliseconds(timeout),
266 [&] { return m_PacketSent; });
267
268 m_PacketSent = false;
269
270 return timedOut;
271}
272
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000273} // namespace pipe
Sadik Armagan3896b472020-02-10 12:24:15 +0000274
Cathal Corbett5aa9fd72022-02-25 15:33:28 +0000275} // namespace arm