blob: ee9b8b1701d3fe8ed0ecea6d11d0432d7e5d4b9a [file] [log] [blame]
//
// Copyright © 2019 Arm Ltd and Contributors. All rights reserved.
// SPDX-License-Identifier: MIT
//
#include "FileOnlyProfilingConnection.hpp"
#include <armnn/Exceptions.hpp>
#include <common/include/Constants.hpp>
#include <common/include/ProfilingException.hpp>
#include <common/include/PacketVersionResolver.hpp>
#include <algorithm>
#include <iostream>
#include <thread>
namespace arm
{
namespace pipe
{
/// Reports which packet headers this processor wants to be given.
/// @return A vector containing exactly one entry: m_MetaDataPacketHeader.
std::vector<uint32_t> StreamMetaDataProcessor::GetHeadersAccepted()
{
    // One element, initialised with the stream metadata header.
    return std::vector<uint32_t>(1, m_MetaDataPacketHeader);
}
/// Processes a Stream Meta Data packet.
///
/// The first four payload bytes are compared against arm::pipe::PIPE_MAGIC in both
/// byte orders to work out the endianness of the stream. The detected endianness is
/// stored on the owning FileOnlyProfilingConnection and an acknowledgement packet
/// (header 0x10000, no payload) is handed back to that connection.
///
/// @param packet The packet to process. Its header must equal m_MetaDataPacketHeader.
/// @throws arm::pipe::ProfilingException if the header does not match, if the payload
///         is missing or shorter than four bytes, or if PIPE_MAGIC cannot be read in
///         either byte order.
void StreamMetaDataProcessor::HandlePacket(const arm::pipe::Packet& packet)
{
    if (packet.GetHeader() != m_MetaDataPacketHeader)
    {
        throw arm::pipe::ProfilingException("StreamMetaDataProcessor can only handle Stream Meta Data Packets");
    }
    // ToUint32 reads four bytes unconditionally, so reject packets that cannot supply
    // them instead of reading past the end of (or through a null) payload buffer.
    const unsigned char* const data = packet.GetData();
    if (data == nullptr || packet.GetLength() < sizeof(uint32_t))
    {
        throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
    }
    // determine the endianness of the protocol
    TargetEndianness endianness;
    if (ToUint32(data, TargetEndianness::BeWire) == arm::pipe::PIPE_MAGIC)
    {
        endianness = TargetEndianness::BeWire;
    }
    else if (ToUint32(data, TargetEndianness::LeWire) == arm::pipe::PIPE_MAGIC)
    {
        endianness = TargetEndianness::LeWire;
    }
    else
    {
        throw arm::pipe::ProfilingException("Protocol read error. Unable to read the PIPE_MAGIC value.");
    }
    m_FileOnlyProfilingConnection->SetEndianess(endianness);
    // send back the acknowledgement
    std::unique_ptr<unsigned char[]> uniqueNullPtr = nullptr;
    arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
    m_FileOnlyProfilingConnection->ReturnPacket(returnPacket);
}
/// Assembles a 32-bit unsigned integer from the four bytes starting at data.
/// @param data       Pointer to at least four readable bytes.
/// @param endianness TargetEndianness::BeWire treats data[0] as the most significant
///                   byte; any other value treats data[3] as the most significant byte.
/// @return The assembled 32-bit value.
uint32_t StreamMetaDataProcessor::ToUint32(const unsigned char* data, TargetEndianness endianness)
{
    const bool bigEndian = (endianness == TargetEndianness::BeWire);
    uint32_t value = 0;
    // Feed the bytes in most-significant-first order, shifting the accumulator each step.
    for (unsigned int step = 0; step < 4; ++step)
    {
        const unsigned int index = bigEndian ? step : 3 - step;
        value = (value << 8) | static_cast<uint32_t>(data[index]);
    }
    return value;
}
/// Closes the connection when the object is destroyed.
FileOnlyProfilingConnection::~FileOnlyProfilingConnection()
{
    // Nothing may escape a destructor, so whatever Close() throws is deliberately dropped.
    try
    {
        Close();
    }
    catch (...)
    {
    }
}
/// Reports whether the connection is open.
/// @return Always true: a file-only connection is considered permanently open.
bool FileOnlyProfilingConnection::IsOpen() const
{
    return true;
}
void FileOnlyProfilingConnection::Close()
{
// Dump any unread packets out of the queue.
size_t initialSize = m_PacketQueue.size();
for (size_t i = 0; i < initialSize; ++i)
{
m_PacketQueue.pop();
}
// dispose of the processing thread
m_KeepRunning.store(false);
if (m_LocalHandlersThread.joinable())
{
// make sure the thread wakes up and sees it has to stop
m_ConditionPacketReadable.notify_one();
m_LocalHandlersThread.join();
}
}
bool FileOnlyProfilingConnection::WritePacket(const unsigned char* buffer, uint32_t length)
{
ARMNN_ASSERT(buffer);
arm::pipe::Packet packet = ReceivePacket(buffer, length);
ForwardPacketToHandlers(packet);
return true;
}
/// Queues a packet so that a later ReadPacket call can pick it up.
/// @param packet The packet to queue; it is moved from and must not be reused by the caller.
void FileOnlyProfilingConnection::ReturnPacket(arm::pipe::Packet& packet)
{
    std::unique_lock<std::mutex> queueLock(m_PacketAvailableMutex);
    m_PacketQueue.push(std::move(packet));
    // Release the mutex before notifying so the woken reader can take it immediately.
    queueLock.unlock();
    m_ConditionPacketAvailable.notify_one();
}
/// Takes the next packet from m_PacketQueue, waiting for one to arrive if necessary.
/// @param timeout Maximum time to wait, in milliseconds.
/// @return The packet at the front of the queue, or a default-constructed Packet if
///         the queue is still empty when the timeout expires.
arm::pipe::Packet FileOnlyProfilingConnection::ReadPacket(uint32_t timeout)
{
    std::unique_lock<std::mutex> queueLock(m_PacketAvailableMutex);
    // The wait ends as soon as the queue holds something, or when the timeout runs out.
    const auto queueHasPacket = [&] { return !m_PacketQueue.empty(); };
    const bool packetArrived = m_ConditionPacketAvailable.wait_for(queueLock,
                                                                   std::chrono::milliseconds(timeout),
                                                                   queueHasPacket);
    if (packetArrived)
    {
        arm::pipe::Packet nextPacket = std::move(m_PacketQueue.front());
        m_PacketQueue.pop();
        return nextPacket;
    }
    // Timed out with nothing queued.
    arm::pipe::Packet noPacket;
    return noPacket;
}
/// Closes the connection and reports a failure to the caller.
/// @param errorMessage Text carried by the exception that is thrown.
/// @throws armnn::RuntimeException always, after Close() has been called.
void FileOnlyProfilingConnection::Fail(const std::string& errorMessage)
{
    // Shut down first so the connection is closed by the time the exception propagates.
    Close();
    throw armnn::RuntimeException(errorMessage);
}
/// Adds a local packet handler to the FileOnlyProfilingConnection.
///
/// The handler is stored in m_PacketHandlers, told which connection it belongs to via
/// SetConnection(this), and then registered for dispatch:
///  - a handler that accepts no specific headers is treated as universal and is stored
///    in m_UniversalHandlers;
///  - otherwise it is stored in m_IndexedHandlers under every header it accepts.
///
/// Packets are delivered to handlers by the processing thread so that handling happens
/// on a separate thread from the profiling service's send thread and slow handlers do
/// not exhaust the profiling message buffer. This function only registers the handler;
/// it does not start that thread itself (see StartProcessingThread).
///
/// @param localPacketHandler The handler to register.
void FileOnlyProfilingConnection::AddLocalPacketHandler(ILocalPacketHandlerSharedPtr localPacketHandler)
{
    m_PacketHandlers.push_back(std::move(localPacketHandler));
    ILocalPacketHandlerSharedPtr localCopy = m_PacketHandlers.back();
    localCopy->SetConnection(this);
    // Ask the handler once and reuse the answer for both the check and the loop.
    const auto acceptedHeaders = localCopy->GetHeadersAccepted();
    if (acceptedHeaders.empty())
    {
        // this is a universal handler
        m_UniversalHandlers.push_back(localCopy);
        return;
    }
    for (uint32_t header : acceptedHeaders)
    {
        // operator[] creates the (empty) handler list the first time a header is seen.
        m_IndexedHandlers[header].push_back(localCopy);
    }
}
/// Launches the thread that runs ServiceLocalHandlers, unless m_IsRunning says one is
/// already active.
void FileOnlyProfilingConnection::StartProcessingThread()
{
    const bool alreadyRunning = m_IsRunning.load();
    if (alreadyRunning)
    {
        // Nothing to do, a processing thread is already active.
        return;
    }
    // A previous thread may have finished without having been joined yet; reap it
    // before the thread object is reused.
    if (m_LocalHandlersThread.joinable())
    {
        m_LocalHandlersThread.join();
    }
    // Both flags are raised before the new thread is created.
    m_IsRunning.store(true);
    m_KeepRunning.store(true);
    m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers, this);
}
/// Queues a packet on m_ReadableList for the processing thread and wakes that thread.
/// The packet is silently dropped when no handlers are registered or when
/// m_KeepRunning is false.
/// @param packet The packet to queue; it is moved from when it is queued.
void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
{
    // Cheap checks first, without taking the lock.
    if (m_PacketHandlers.empty() || !m_KeepRunning.load())
    {
        return;
    }
    {
        std::lock_guard<std::mutex> listGuard(m_ReadableMutex);
        // The flag may have been cleared while we were waiting for the mutex.
        if (!m_KeepRunning.load())
        {
            return;
        }
        m_ReadableList.push(std::move(packet));
    }
    m_ConditionPacketReadable.notify_one();
}
/// Body of the processing thread. Repeatedly takes one packet off m_ReadableList and
/// passes it to DispatchPacketToHandlers until m_KeepRunning becomes false, then
/// empties the list and clears m_IsRunning.
///
/// A negative m_Timeout means "wait indefinitely for work"; otherwise each wait is
/// bounded by max(m_Timeout, 1000) milliseconds.
void FileOnlyProfilingConnection::ServiceLocalHandlers()
{
    // Close() clears m_KeepRunning and notifies m_ConditionPacketReadable. The wait
    // predicate therefore has to become true on a stop request as well as on new work;
    // otherwise the wait would swallow that notification and, with an infinite timeout,
    // Close() would block in join() forever.
    const auto workOrStopRequested = [&] { return !m_ReadableList.empty() || !m_KeepRunning.load(); };
    do
    {
        arm::pipe::Packet returnedPacket;
        bool readPacket = false;
        {   // only lock while we are taking the packet off the incoming list
            std::unique_lock<std::mutex> lck(m_ReadableMutex);
            if (m_Timeout < 0)
            {
                m_ConditionPacketReadable.wait(lck, workOrStopRequested);
            }
            else
            {
                m_ConditionPacketReadable.wait_for(lck,
                                                   std::chrono::milliseconds(std::max(m_Timeout, 1000)),
                                                   workOrStopRequested);
            }
            if (m_KeepRunning.load())
            {
                if (!m_ReadableList.empty())
                {
                    returnedPacket = std::move(m_ReadableList.front());
                    m_ReadableList.pop();
                    readPacket = true;
                }
            }
            else
            {
                ClearReadableList();
            }
        }
        // Dispatch outside the lock so producers are not blocked while handlers run.
        if (m_KeepRunning.load() && readPacket)
        {
            DispatchPacketToHandlers(returnedPacket);
        }
    } while (m_KeepRunning.load());
    // make sure the readable list is cleared; ForwardPacketToHandlers may still be
    // touching the list, so the mutex is held for this as well
    {
        std::lock_guard<std::mutex> lck(m_ReadableMutex);
        ClearReadableList();
    }
    m_IsRunning.store(false);
}
/// Discards every packet currently held in m_ReadableList.
/// This function takes no lock itself.
void FileOnlyProfilingConnection::ClearReadableList()
{
    // Keep popping until the incoming packet queue is empty.
    while (!m_ReadableList.empty())
    {
        m_ReadableList.pop();
    }
}
/// Delivers a packet to every universal handler and then to every handler registered
/// for the packet's header.
///
/// Any exception thrown by a header-specific handler is converted into a call to
/// Fail(), which closes the connection and throws. Universal handlers are invoked
/// without that protection, so their exceptions propagate unchanged.
///
/// @param packet The packet to deliver.
void FileOnlyProfilingConnection::DispatchPacketToHandlers(const arm::pipe::Packet& packet)
{
    for (auto& universalHandler : m_UniversalHandlers)
    {
        universalHandler->HandlePacket(packet);
    }

    const auto registered = m_IndexedHandlers.find(packet.GetHeader());
    if (registered == m_IndexedHandlers.end())
    {
        // No handler asked for this header.
        return;
    }

    for (auto& handler : registered->second)
    {
        try
        {
            handler->HandlePacket(packet);
        }
        catch (const arm::pipe::ProfilingException& profilingError)
        {
            Fail(profilingError.what());
        }
        catch (const std::exception& stdError)
        {
            Fail(stdError.what());
        }
        catch (...)
        {
            Fail("handler failed");
        }
    }
}
} // namespace pipe
} // namespace arm