7 #include "PacketVersionResolver.hpp" 10 #include <common/include/Constants.hpp> 11 #include <common/include/ProfilingException.hpp> 25 std::vector<uint32_t> headers;
26 headers.push_back(m_MetaDataPacketHeader);
32 if (packet.GetHeader() != m_MetaDataPacketHeader)
34 throw arm::pipe::ProfilingException(
"StreamMetaDataProcessor can only handle Stream Meta Data Packets");
48 throw arm::pipe::ProfilingException(
"Protocol read error. Unable to read the PIPE_MAGIC value.");
52 std::unique_ptr<unsigned char[]> uniqueNullPtr =
nullptr;
53 arm::pipe::Packet returnPacket(0x10000, 0, uniqueNullPtr);
54 m_FileOnlyProfilingConnection->
ReturnPacket(returnPacket);
// StreamMetaDataProcessor::ToUint32
// Builds a 32-bit unsigned value from the four bytes data[0]..data[3].
// data       - pointer to the bytes to decode; exactly indices 0..3 are read and
//              no length is passed in, so the caller must supply at least 4
//              readable bytes.
// endianness - presumably selects which of the two return statements below is
//              taken; the statement that tests it is not present in this
//              listing (TODO confirm against the full file).
// Returns the assembled uint32_t.
57 uint32_t StreamMetaDataProcessor::ToUint32(
const unsigned char* data,
TargetEndianness endianness)
// First form: data[0] lands in the most significant byte (big-endian byte order).
// Each byte is cast to uint32_t before it is shifted, so the shifts are done on
// an unsigned 32-bit operand rather than on a promoted int.
63 return static_cast<uint32_t
>(data[0]) << 24 | static_cast<uint32_t>(data[1]) << 16 |
64 static_cast<uint32_t
>(data[2]) << 8 | static_cast<uint32_t>(data[3]);
// Second form: data[3] lands in the most significant byte (little-endian byte order).
68 return static_cast<uint32_t
>(data[3]) << 24 | static_cast<uint32_t>(data[2]) << 16 |
69 static_cast<uint32_t
>(data[1]) << 8 | static_cast<uint32_t>(data[0]);
94 size_t initialSize = m_PacketQueue.size();
95 for (
size_t i = 0; i < initialSize; ++i)
100 m_KeepRunning.store(
false);
101 if (m_LocalHandlersThread.joinable())
104 m_ConditionPacketReadable.notify_one();
105 m_LocalHandlersThread.join();
113 ForwardPacketToHandlers(packet);
120 std::lock_guard<std::mutex> lck(m_PacketAvailableMutex);
121 m_PacketQueue.push(std::move(packet));
123 m_ConditionPacketAvailable.notify_one();
128 std::unique_lock<std::mutex> lck(m_PacketAvailableMutex);
132 if (!m_ConditionPacketAvailable.wait_for(lck,
133 std::chrono::milliseconds(timeout),
134 [&]{return !m_PacketQueue.empty();}))
136 arm::pipe::Packet empty;
140 arm::pipe::Packet returnedPacket = std::move(m_PacketQueue.front());
142 return returnedPacket;
145 void FileOnlyProfilingConnection::Fail(
const std::string& errorMessage)
157 m_PacketHandlers.push_back(std::move(localPacketHandler));
159 localCopy->SetConnection(
this);
160 if (localCopy->GetHeadersAccepted().empty())
163 m_UniversalHandlers.push_back(localCopy);
167 for (uint32_t header : localCopy->GetHeadersAccepted())
169 auto iter = m_IndexedHandlers.find(header);
170 if (iter == m_IndexedHandlers.end())
172 std::vector<ILocalPacketHandlerSharedPtr> handlers;
173 handlers.push_back(localCopy);
174 m_IndexedHandlers.emplace(std::make_pair(header, handlers));
178 iter->second.push_back(localCopy);
// FileOnlyProfilingConnection::StartProcessingThread
// Starts the worker thread that executes ServiceLocalHandlers on this object.
// Takes no parameters and returns nothing.
184 void FileOnlyProfilingConnection::StartProcessingThread()
// Tests whether the worker is already flagged as running. The statement guarded
// by this check is not present in this listing (presumably an early return, so
// that a second thread is not started - TODO confirm).
187 if (m_IsRunning.load())
// A still-joinable thread object is joined first: assigning a new std::thread
// to a joinable one would call std::terminate.
192 if (m_LocalHandlersThread.joinable())
194 m_LocalHandlersThread.join();
// Both flags are set before the thread is created, so they are already set
// when ServiceLocalHandlers starts executing.
196 m_IsRunning.store(
true);
197 m_KeepRunning.store(
true);
198 m_LocalHandlersThread = std::thread(&FileOnlyProfilingConnection::ServiceLocalHandlers,
this);
// FileOnlyProfilingConnection::ForwardPacketToHandlers
// Queues a packet on m_ReadableList and signals m_ConditionPacketReadable.
// packet - taken by non-const reference and moved into the queue, so the
//          caller's object is left in a moved-from state once it is queued.
// The statements guarded by the three checks below are not present in this
// listing (presumably early returns - TODO confirm).
201 void FileOnlyProfilingConnection::ForwardPacketToHandlers(arm::pipe::Packet& packet)
// Checks whether any handlers have been registered.
203 if (m_PacketHandlers.empty())
// Checks the run flag before taking the lock.
207 if (!m_KeepRunning.load())
212 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
// The run flag is tested a second time now that m_ReadableMutex is held.
213 if (!m_KeepRunning.load())
217 m_ReadableList.push(std::move(packet));
// Wakes one thread waiting on m_ConditionPacketReadable.
// NOTE(review): whether the lock has been released by this point cannot be
// determined from this listing.
219 m_ConditionPacketReadable.notify_one();
// FileOnlyProfilingConnection::ServiceLocalHandlers
// Worker loop (the function StartProcessingThread hands to std::thread).
// It waits for m_ReadableList to become non-empty, takes the front packet
// and passes it to DispatchPacketToHandlers, repeating while m_KeepRunning
// is set. m_IsRunning is cleared once the loop has ended.
// Several lines of the original body (the loop opening, the choice between the
// two waits, and the assignment to readPacket) are not present in this listing.
222 void FileOnlyProfilingConnection::ServiceLocalHandlers()
226 arm::pipe::Packet returnedPacket;
227 bool readPacket =
false;
// m_ReadableList is only inspected and popped while m_ReadableMutex is held.
229 std::unique_lock<std::mutex> lck(m_ReadableMutex);
// Untimed wait: returns only once the predicate (queue non-empty) is true.
// NOTE(review): the predicate does not look at m_KeepRunning. Elsewhere in this
// file shutdown clears m_KeepRunning and calls notify_one() before join(); with
// an empty queue that notification would not satisfy this predicate. Confirm
// that the untimed path can actually be stopped.
232 m_ConditionPacketReadable.wait(lck,
233 [&] {
return !m_ReadableList.empty(); });
// Timed wait: the timeout is the larger of m_Timeout and 1000, in milliseconds,
// so this wait lasts at most that long even if no packet arrives.
237 m_ConditionPacketReadable.wait_for(lck,
238 std::chrono::milliseconds(std::max(m_Timeout, 1000)),
239 [&] {
return !m_ReadableList.empty(); });
// After waking, the run flag is checked again before the queue is touched.
241 if (m_KeepRunning.load())
// The queue may still be empty here if the timed wait expired.
243 if (!m_ReadableList.empty())
245 returnedPacket = std::move(m_ReadableList.front());
246 m_ReadableList.pop();
// Dispatch happens only if the loop is still meant to run and readPacket is
// true (presumably set when a packet was taken above - the assignment is not
// visible in this listing).
255 if (m_KeepRunning.load() && readPacket)
257 DispatchPacketToHandlers(returnedPacket);
259 }
while (m_KeepRunning.load());
// Cleared after the loop; StartProcessingThread tests this flag.
262 m_IsRunning.store(
false);
// FileOnlyProfilingConnection::ClearReadableList
// Removes from m_ReadableList as many elements as it held on entry.
// NOTE(review): no lock acquisition is visible in these lines; presumably the
// caller holds m_ReadableMutex or the worker thread is stopped - confirm.
265 void FileOnlyProfilingConnection::ClearReadableList()
// The size is captured once, so the loop bound does not change as elements are popped.
268 size_t initialSize = m_ReadableList.size();
269 for (
size_t i = 0; i < initialSize; ++i)
271 m_ReadableList.pop();
// FileOnlyProfilingConnection::DispatchPacketToHandlers
// Passes one packet to the registered handlers.
// packet - the packet to deliver; it is passed on to each handler by const reference.
// The `try` that pairs with the catch clauses below, and the bodies of those
// clauses, are not present in this listing.
275 void FileOnlyProfilingConnection::DispatchPacketToHandlers(
const arm::pipe::Packet& packet)
// Every handler in m_UniversalHandlers receives the packet, whatever its header.
277 for (
auto& delegate : m_UniversalHandlers)
279 delegate->HandlePacket(packet);
// Handlers stored under this packet's header value receive it as well.
281 auto iter = m_IndexedHandlers.find(packet.GetHeader());
282 if (iter != m_IndexedHandlers.end())
284 for (
auto& delegate : iter->second)
288 delegate->HandlePacket(packet);
// Two exception types are caught separately. Which calls the try block encloses
// and what each clause does is not visible here.
290 catch (
const arm::pipe::ProfilingException& ex)
294 catch (
const std::exception& ex)
// NOTE(review): the clause containing this Fail call is not visible in this
// listing (presumably a final catch-all - confirm).
300 Fail(
"handler failed");
std::shared_ptr< ILocalPacketHandler > ILocalPacketHandlerSharedPtr
~FileOnlyProfilingConnection() override
bool IsOpen() const override
Copyright (c) 2021 ARM Limited and Contributors.
arm::pipe::Packet ReceivePacket(const unsigned char *buffer, uint32_t length)
void SetEndianess(const TargetEndianness &endianness) override
#define ARMNN_ASSERT(COND)
void ReturnPacket(arm::pipe::Packet &packet) override
bool WritePacket(const unsigned char *buffer, uint32_t length) override
arm::pipe::Packet ReadPacket(uint32_t timeout) override