blob: 5962f2fc5d51c713c2582a044c1d127637e87a37 [file] [log] [blame]
Sadik Armagan3896b472020-02-10 12:24:15 +00001//
2// Copyright © 2020 Arm Ltd. All rights reserved.
3// SPDX-License-Identifier: MIT
4//
5
6#include "SendThread.hpp"
7#include "EncodeVersion.hpp"
8#include "ProfilingUtils.hpp"
9
10#include <armnn/Exceptions.hpp>
11#include <armnn/Conversion.hpp>
12#include <Processes.hpp>
13
14#include <boost/format.hpp>
15#include <boost/numeric/conversion/cast.hpp>
Sadik Armagan3896b472020-02-10 12:24:15 +000016
17#include <cstring>
18
19namespace armnn
20{
21
22namespace profiling
23{
24
25using boost::numeric_cast;
26
27SendThread::SendThread(armnn::profiling::ProfilingStateMachine& profilingStateMachine,
28 armnn::profiling::IBufferManager& buffer, armnn::profiling::ISendCounterPacket& sendCounterPacket, int timeout)
29 : m_StateMachine(profilingStateMachine)
30 , m_BufferManager(buffer)
31 , m_SendCounterPacket(sendCounterPacket)
32 , m_Timeout(timeout)
33 , m_IsRunning(false)
34 , m_KeepRunning(false)
35 , m_SendThreadException(nullptr)
36{
37 m_BufferManager.SetConsumer(this);
38}
39
40void SendThread::SetReadyToRead()
41{
42 // We need to wait for the send thread to release its mutex
43 {
44 std::lock_guard<std::mutex> lck(m_WaitMutex);
45 m_ReadyToRead = true;
46 }
47 // Signal the send thread that there's something to read in the buffer
48 m_WaitCondition.notify_one();
49}
50
51void SendThread::Start(IProfilingConnection& profilingConnection)
52{
53 // Check if the send thread is already running
54 if (m_IsRunning.load())
55 {
56 // The send thread is already running
57 return;
58 }
59
60 if (m_SendThread.joinable())
61 {
62 m_SendThread.join();
63 }
64
65 // Mark the send thread as running
66 m_IsRunning.store(true);
67
68 // Keep the send procedure going until the send thread is signalled to stop
69 m_KeepRunning.store(true);
70
71 // Make sure the send thread will not flush the buffer until signaled to do so
72 // no need for a mutex as the send thread can not be running at this point
73 m_ReadyToRead = false;
74
75 m_PacketSent = false;
76
77 // Start the send thread
78 m_SendThread = std::thread(&SendThread::Send, this, std::ref(profilingConnection));
79}
80
81void SendThread::Stop(bool rethrowSendThreadExceptions)
82{
83 // Signal the send thread to stop
84 m_KeepRunning.store(false);
85
86 // Check that the send thread is running
87 if (m_SendThread.joinable())
88 {
89 // Kick the send thread out of the wait condition
90 SetReadyToRead();
91 // Wait for the send thread to complete operations
92 m_SendThread.join();
93 }
94
95 // Check if the send thread exception has to be rethrown
96 if (!rethrowSendThreadExceptions)
97 {
98 // No need to rethrow the send thread exception, return immediately
99 return;
100 }
101
102 // Check if there's an exception to rethrow
103 if (m_SendThreadException)
104 {
105 // Rethrow the send thread exception
106 std::rethrow_exception(m_SendThreadException);
107
108 // Nullify the exception as it has been rethrown
109 m_SendThreadException = nullptr;
110 }
111}
112
113void SendThread::Send(IProfilingConnection& profilingConnection)
114{
115 // Run once and keep the sending procedure looping until the thread is signalled to stop
116 do
117 {
118 // Check the current state of the profiling service
119 ProfilingState currentState = m_StateMachine.GetCurrentState();
120 switch (currentState)
121 {
122 case ProfilingState::Uninitialised:
123 case ProfilingState::NotConnected:
124
125 // The send thread cannot be running when the profiling service is uninitialized or not connected,
126 // stop the thread immediately
127 m_KeepRunning.store(false);
128 m_IsRunning.store(false);
129
130 // An exception should be thrown here, save it to be rethrown later from the main thread so that
131 // it can be caught by the consumer
132 m_SendThreadException =
133 std::make_exception_ptr(RuntimeException("The send thread should not be running with the "
134 "profiling service not yet initialized or connected"));
135
136 return;
137 case ProfilingState::WaitingForAck:
138
139 // Send out a StreamMetadata packet and wait for the profiling connection to be acknowledged.
140 // When a ConnectionAcknowledged packet is received, the profiling service state will be automatically
141 // updated by the command handler
142
143 // Prepare a StreamMetadata packet and write it to the Counter Stream buffer
144 m_SendCounterPacket.SendStreamMetaDataPacket();
145
146 // Flush the buffer manually to send the packet
147 FlushBuffer(profilingConnection);
148
149 // Wait for a connection ack from the remote server. We should expect a response within timeout value.
150 // If not, drop back to the start of the loop and detect somebody closing the thread. Then send the
151 // StreamMetadata again.
152
153 // Wait condition lock scope - Begin
154 {
155 std::unique_lock<std::mutex> lock(m_WaitMutex);
156
157 bool timeout = m_WaitCondition.wait_for(lock,
Finn Williams7a16dcf2020-02-10 16:59:58 +0000158 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
Sadik Armagan3896b472020-02-10 12:24:15 +0000159 [&]{ return m_ReadyToRead; });
160 // If we get notified we need to flush the buffer again
161 if(timeout)
162 {
163 // Otherwise if we just timed out don't flush the buffer
164 continue;
165 }
166 //reset condition variable predicate for next use
167 m_ReadyToRead = false;
168 }
169 // Wait condition lock scope - End
170 break;
171 case ProfilingState::Active:
172 default:
173 // Wait condition lock scope - Begin
174 {
175 std::unique_lock<std::mutex> lock(m_WaitMutex);
176
177 // Normal working state for the send thread
178 // Check if the send thread is required to enforce a timeout wait policy
179 if (m_Timeout < 0)
180 {
181 // Wait indefinitely until notified that something to read has become available in the buffer
182 m_WaitCondition.wait(lock, [&] { return m_ReadyToRead; });
183 }
184 else
185 {
186 // Wait until the thread is notified of something to read from the buffer,
187 // or check anyway after the specified number of milliseconds
188 m_WaitCondition.wait_for(lock, std::chrono::milliseconds(m_Timeout), [&] { return m_ReadyToRead; });
189 }
190
191 //reset condition variable predicate for next use
192 m_ReadyToRead = false;
193 }
194 // Wait condition lock scope - End
195 break;
196 }
197
198 // Send all the available packets in the buffer
199 FlushBuffer(profilingConnection);
200 } while (m_KeepRunning.load());
201
202 // Ensure that all readable data got written to the profiling connection before the thread is stopped
203 // (do not notify any watcher in this case, as this is just to wrap up things before shutting down the send thread)
204 FlushBuffer(profilingConnection, false);
205
206 // Mark the send thread as not running
207 m_IsRunning.store(false);
208}
209
210void SendThread::FlushBuffer(IProfilingConnection& profilingConnection, bool notifyWatchers)
211{
212 // Get the first available readable buffer
213 IPacketBufferPtr packetBuffer = m_BufferManager.GetReadableBuffer();
214
215 // Initialize the flag that indicates whether at least a packet has been sent
216 bool packetsSent = false;
217
218 while (packetBuffer != nullptr)
219 {
220 // Get the data to send from the buffer
221 const unsigned char* readBuffer = packetBuffer->GetReadableData();
222 unsigned int readBufferSize = packetBuffer->GetSize();
223
224 if (readBuffer == nullptr || readBufferSize == 0)
225 {
226 // Nothing to send, get the next available readable buffer and continue
227 m_BufferManager.MarkRead(packetBuffer);
228 packetBuffer = m_BufferManager.GetReadableBuffer();
229
230 continue;
231 }
232
233 // Check that the profiling connection is open, silently drop the data and continue if it's closed
234 if (profilingConnection.IsOpen())
235 {
236 // Write a packet to the profiling connection. Silently ignore any write error and continue
237 profilingConnection.WritePacket(readBuffer, boost::numeric_cast<uint32_t>(readBufferSize));
238
239 // Set the flag that indicates whether at least a packet has been sent
240 packetsSent = true;
241 }
242
243 // Mark the packet buffer as read
244 m_BufferManager.MarkRead(packetBuffer);
245
246 // Get the next available readable buffer
247 packetBuffer = m_BufferManager.GetReadableBuffer();
248 }
249 // Check whether at least a packet has been sent
250 if (packetsSent && notifyWatchers)
251 {
252 // Wait for the parent thread to release its mutex if necessary
253 {
254 std::lock_guard<std::mutex> lck(m_PacketSentWaitMutex);
255 m_PacketSent = true;
256 }
257 // Notify to any watcher that something has been sent
258 m_PacketSentWaitCondition.notify_one();
259 }
260}
261
262bool SendThread::WaitForPacketSent(uint32_t timeout = 1000)
263{
264 std::unique_lock<std::mutex> lock(m_PacketSentWaitMutex);
265 // Blocks until notified that at least a packet has been sent or until timeout expires.
266 bool timedOut = m_PacketSentWaitCondition.wait_for(lock,
267 std::chrono::milliseconds(timeout),
268 [&] { return m_PacketSent; });
269
270 m_PacketSent = false;
271
272 return timedOut;
273}
274
275} // namespace profiling
276
277} // namespace armnn