blob: 86e6c05a02c4eeb5b2f7063b207ea63f020fd0d8 [file] [log] [blame]
//
// Copyright © 2020 Arm Ltd and Contributors. All rights reserved.
// SPDX-License-Identifier: MIT
//
5
6#include "SendThread.hpp"
Sadik Armagan3896b472020-02-10 12:24:15 +00007#include "ProfilingUtils.hpp"
8
9#include <armnn/Exceptions.hpp>
10#include <armnn/Conversion.hpp>
11#include <Processes.hpp>
12
13#include <boost/format.hpp>
14#include <boost/numeric/conversion/cast.hpp>
Sadik Armagan3896b472020-02-10 12:24:15 +000015
16#include <cstring>
17
18namespace armnn
19{
20
21namespace profiling
22{
23
24using boost::numeric_cast;
25
26SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
Jim Flynnbbfe6032020-07-20 16:57:44 +010027 armnn::profiling::IBufferManager& buffer,
28 armnn::profiling::ISendCounterPacket& sendCounterPacket,
29 int timeout)
Sadik Armagan3896b472020-02-10 12:24:15 +000030 : m_StateMachine(profilingStateMachine)
31 , m_BufferManager(buffer)
32 , m_SendCounterPacket(sendCounterPacket)
33 , m_Timeout(timeout)
34 , m_IsRunning(false)
35 , m_KeepRunning(false)
36 , m_SendThreadException(nullptr)
37{
38 m_BufferManager.SetConsumer(this);
39}
40
41void SendThread::SetReadyToRead()
42{
43 // We need to wait for the send thread to release its mutex
44 {
45 std::lock_guard<std::mutex> lck(m_WaitMutex);
46 m_ReadyToRead = true;
47 }
48 // Signal the send thread that there's something to read in the buffer
49 m_WaitCondition.notify_one();
50}
51
52void SendThread::Start(IProfilingConnection& profilingConnection)
53{
54 // Check if the send thread is already running
55 if (m_IsRunning.load())
56 {
57 // The send thread is already running
58 return;
59 }
60
61 if (m_SendThread.joinable())
62 {
63 m_SendThread.join();
64 }
65
66 // Mark the send thread as running
67 m_IsRunning.store(true);
68
69 // Keep the send procedure going until the send thread is signalled to stop
70 m_KeepRunning.store(true);
71
72 // Make sure the send thread will not flush the buffer until signaled to do so
73 // no need for a mutex as the send thread can not be running at this point
74 m_ReadyToRead = false;
75
76 m_PacketSent = false;
77
78 // Start the send thread
79 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
80}
81
82void SendThread::Stop(bool rethrowSendThreadExceptions)
83{
84 // Signal the send thread to stop
85 m_KeepRunning.store(false);
86
87 // Check that the send thread is running
88 if (m_SendThread.joinable())
89 {
90 // Kick the send thread out of the wait condition
91 SetReadyToRead();
92 // Wait for the send thread to complete operations
93 m_SendThread.join();
94 }
95
96 // Check if the send thread exception has to be rethrown
97 if (!rethrowSendThreadExceptions)
98 {
99 // No need to rethrow the send thread exception, return immediately
100 return;
101 }
102
103 // Check if there's an exception to rethrow
104 if (m_SendThreadException)
105 {
106 // Rethrow the send thread exception
107 std::rethrow_exception(m_SendThreadException);
108
109 // Nullify the exception as it has been rethrown
110 m_SendThreadException = nullptr;
111 }
112}
113
114void SendThread::Send(IProfilingConnection& profilingConnection)
115{
116 // Run once and keep the sending procedure looping until the thread is signalled to stop
117 do
118 {
119 // Check the current state of the profiling service
120 ProfilingState currentState = m_StateMachine.GetCurrentState();
121 switch (currentState)
122 {
123 case ProfilingState::Uninitialised:
124 case ProfilingState::NotConnected:
125
126 // The send thread cannot be running when the profiling service is uninitialized or not connected,
127 // stop the thread immediately
128 m_KeepRunning.store(false);
129 m_IsRunning.store(false);
130
131 // An exception should be thrown here, save it to be rethrown later from the main thread so that
132 // it can be caught by the consumer
133 m_SendThreadException =
134 std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
135 "profiling service not yet initialized or connected"));
136
137 return;
138 case ProfilingState::WaitingForAck:
139
140 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
141 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
142 // updated by the command handler
143
144 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
145 m_SendCounterPacket.SendStreamMetaDataPacket();
146
147 // Flush the buffer manually to send the packet
148 FlushBuffer(profilingConnection);
149
150 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
151 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
152 // StreamMetadata again.
153
154 // Wait condition lock scope - Begin
155 {
156 std::unique_lock<std::mutex> lock(m_WaitMutex);
157
158 bool timeout = m_WaitCondition.wait_for(lock,
Finn Williams7a16dcf2020-02-10 16:59:58 +0000159 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
Sadik Armagan3896b472020-02-10 12:24:15 +0000160 [&]{ return m_ReadyToRead; });
161 // If we get notified we need to flush the buffer again
162 if(timeout)
163 {
164 // Otherwise if we just timed out don't flush the buffer
165 continue;
166 }
167 //reset condition variable predicate for next use
168 m_ReadyToRead = false;
169 }
170 // Wait condition lock scope - End
171 break;
172 case ProfilingState::Active:
173 default:
174 // Wait condition lock scope - Begin
175 {
176 std::unique_lock<std::mutex> lock(m_WaitMutex);
177
178 // Normal working state for the send thread
179 // Check if the send thread is required to enforce a timeout wait policy
180 if (m_Timeout < 0)
181 {
182 // Wait indefinitely until notified that something to read has become available in the buffer
183 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
184 }
185 else
186 {
187 // Wait until the thread is notified of something to read from the buffer,
188 // or check anyway after the specified number of milliseconds
189 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
190 }
191
192 //reset condition variable predicate for next use
193 m_ReadyToRead = false;
194 }
195 // Wait condition lock scope - End
196 break;
197 }
198
199 // Send all the available packets in the buffer
200 FlushBuffer(profilingConnection);
201 } while (m_KeepRunning.load());
202
203 // Ensure that all readable data got written to the profiling connection before the thread is stopped
204 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
205 FlushBuffer(profilingConnection, false);
206
207 // Mark the send thread as not running
208 m_IsRunning.store(false);
209}
210
211void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
212{
213 // Get the first available readable buffer
214 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
215
216 // Initialize the flag that indicates whether at least a packet has been sent
217 bool packetsSent = false;
218
219 while (packetBuffer != nullptr)
220 {
221 // Get the data to send from the buffer
222 const unsigned char* readBuffer = packetBuffer->GetReadableData();
223 unsigned int readBufferSize = packetBuffer->GetSize();
224
225 if (readBuffer == nullptr || readBufferSize == 0)
226 {
227 // Nothing to send, get the next available readable buffer and continue
228 m_BufferManager.MarkRead(packetBuffer);
229 packetBuffer = m_BufferManager.GetReadableBuffer();
230
231 continue;
232 }
233
234 // Check that the profiling connection is open, silently drop the data and continue if it's closed
235 if (profilingConnection.IsOpen())
236 {
237 // Write a packet to the profiling connection. Silently ignore any write error and continue
238 profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
239
240 // Set the flag that indicates whether at least a packet has been sent
241 packetsSent = true;
242 }
243
244 // Mark the packet buffer as read
245 m_BufferManager.MarkRead(packetBuffer);
246
247 // Get the next available readable buffer
248 packetBuffer = m_BufferManager.GetReadableBuffer();
249 }
250 // Check whether at least a packet has been sent
251 if (packetsSent && notifyWatchers)
252 {
253 // Wait for the parent thread to release its mutex if necessary
254 {
255 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
256 m_PacketSent = true;
257 }
258 // Notify to any watcher that something has been sent
259 m_PacketSentWaitCondition.notify_one();
260 }
261}
262
263bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
264{
265 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
266 // Blocks until notified that at least a packet has been sent or until timeout expires.
267 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
268 std::chrono::milliseconds(timeout),
269 [&] { return m_PacketSent; });
270
271 m_PacketSent = false;
272
273 return timedOut;
274}
275
276} // namespace profiling
277
278} // namespace armnn