diff --git a/Src/IACore/imp/cpp/FileOps.cpp b/Src/IACore/imp/cpp/FileOps.cpp index 0cdcb28..df27304 100644 --- a/Src/IACore/imp/cpp/FileOps.cpp +++ b/Src/IACore/imp/cpp/FileOps.cpp @@ -36,12 +36,10 @@ namespace IACore const auto fd = (INT32) ((UINT64) std::get<0>(handles)); if (fd != -1) ::close(fd); -#else -# error "IACore FileOps does not support this platform" #endif } - Expected FileOps::MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner) + Expected FileOps::MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner) { #if IA_PLATFORM_WINDOWS int wchars_num = MultiByteToWideChar(CP_UTF8, 0, name.c_str(), -1, NULL, 0); @@ -59,7 +57,7 @@ namespace IACore return MakeUnexpected( std::format("Failed to {} shared memory '{}'", isOwner ? "owner" : "consumer", name.c_str())); - const auto result = static_cast(MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, size)); + const auto result = static_cast(MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, size)); if (result == NULL) { CloseHandle(hMap); @@ -98,18 +96,18 @@ namespace IACore return MakeUnexpected(std::format("Failed to mmap shared memory '{}'", name.c_str())); } - const auto result = static_cast(addr); + const auto result = static_cast(addr); s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) fd), (PVOID) addr, (PVOID) size); return result; -#else -# error "IACore FileOps does not support this platform" #endif } VOID FileOps::UnlinkSharedMemory(IN CONST String &name) { + if (name.empty()) + return; #if IA_PLATFORM_UNIX shm_unlink(name.c_str()); #endif @@ -182,9 +180,6 @@ namespace IACore madvise(addr, size, MADV_SEQUENTIAL); s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) handle), (PVOID) addr, (PVOID) size); return result; - -#else -# error "IACore FileOps does not support this platform" #endif } @@ -260,17 +255,23 @@ namespace IACore FilePath FileOps::NormalizeExecutablePath(IN CONST FilePath &path) { + FilePath result = path; + #if IA_PLATFORM_WINDOWS - return path; + if (!result.has_extension()) + result.replace_extension(".exe"); + #elif IA_PLATFORM_UNIX - String result(path); - if (path.extension() == ".exe") - result.resize(result.size() - 4); - if (!path.is_absolute() && !path.string().starts_with("./")) - result = "./" + result; - return FilePath(result); -#else -# error "IACore FileOps does not support this platform" + if (result.extension() == ".exe") + result.replace_extension(""); + + if (result.is_relative()) + { + String pathStr = result.string(); + if (!pathStr.starts_with("./") && !pathStr.starts_with("../")) + result = "./" + pathStr; + } #endif + return result; } } // namespace IACore \ No newline at end of file diff --git a/Src/IACore/imp/cpp/IACore.cpp b/Src/IACore/imp/cpp/IACore.cpp index bd2edf8..5311c29 100644 --- a/Src/IACore/imp/cpp/IACore.cpp +++ b/Src/IACore/imp/cpp/IACore.cpp @@ -21,13 +21,13 @@ namespace IACore { - HRTimePoint g_startTime{}; + HighResTimePoint g_startTime{}; std::thread::id g_mainThreadID{}; VOID Initialize() { g_mainThreadID = std::this_thread::get_id(); - g_startTime = HRClock::now(); + g_startTime = HighResClock::now(); Logger::Initialize(); } @@ -38,12 +38,12 @@ namespace IACore UINT64 GetTicksCount() { - return std::chrono::duration_cast(HRClock::now() - g_startTime).count(); + return std::chrono::duration_cast(HighResClock::now() - g_startTime).count(); } FLOAT64 GetSecondsCount() { - return (HRClock::now() - g_startTime).count(); + return (HighResClock::now() - g_startTime).count(); } UINT32 GetRandom(IN UINT32 seed) diff --git a/Src/IACore/imp/cpp/IPC.cpp b/Src/IACore/imp/cpp/IPC.cpp index 6bdf851..3984bed 100644 --- a/Src/IACore/imp/cpp/IPC.cpp +++ b/Src/IACore/imp/cpp/IPC.cpp @@ -16,7 +16,398 @@ #include +#include +#include + namespace IACore { - -} \ No newline at end of file + struct IPC_ConnectionDescriptor + { + String SocketPath; + String SharedMemPath; + UINT32 SharedMemSize; + + String Serialize() CONST + { + return std::format("{}|{}|{}|", SocketPath, SharedMemPath, SharedMemSize); + } + + STATIC IPC_ConnectionDescriptor Deserialize(IN CONST String &data) + { + enum class EParseState + { + SocketPath, + SharedMemPath, + SharedMemSize + }; + + IPC_ConnectionDescriptor result{}; + + SIZE_T t{}; + EParseState state{EParseState::SocketPath}; + for (SIZE_T i = 0; i < data.size(); i++) + { + if (data[i] != '|') + continue; + + switch (state) + { + case EParseState::SocketPath: + result.SocketPath = data.substr(t, i - t); + state = EParseState::SharedMemPath; + break; + + case EParseState::SharedMemPath: + result.SharedMemPath = data.substr(t, i - t); + state = EParseState::SharedMemSize; + break; + + case EParseState::SharedMemSize: { + if (std::from_chars(&data[t], &data[i], result.SharedMemSize).ec != std::errc{}) + return {}; + goto done_parsing; + } + } + t = i + 1; + } + + done_parsing: + return result; + } + }; +} // namespace IACore + +namespace IACore +{ + IPC_Node::~IPC_Node() + { + SocketOps::Close(m_socket); // SocketOps gracefully handles INVALID_SOCKET + } + + Expected IPC_Node::Connect(IN PCCHAR connectionString) + { + auto desc = IPC_ConnectionDescriptor::Deserialize(connectionString); + m_shmName = desc.SharedMemPath; + + m_socket = SocketOps::CreateUnixSocket(); + if (!SocketOps::ConnectUnixSocket(m_socket, desc.SocketPath.c_str())) + return MakeUnexpected("Failed to create an unix socket"); + + auto mapRes = FileOps::MapSharedMemory(desc.SharedMemPath, desc.SharedMemSize, FALSE); + if (!mapRes.has_value()) + return MakeUnexpected("Failed to map the shared memory"); + + m_sharedMemory = mapRes.value(); + + auto *layout = reinterpret_cast(m_sharedMemory); + + if (layout->Meta.Magic != 0x49414950) // "IAIP" + return MakeUnexpected("Invalid shared memory header signature"); + + if (layout->Meta.Version != 1) + return MakeUnexpected("IPC version mismatch"); + + PUINT8 moniDataPtr = m_sharedMemory + layout->MONI_DataOffset; + PUINT8 minoDataPtr = m_sharedMemory + layout->MINO_DataOffset; + + MONI = std::make_unique( + &layout->MONI_Control, Span(moniDataPtr, static_cast(layout->MONI_DataSize)), FALSE); + + MINO = std::make_unique( + &layout->MINO_Control, Span(minoDataPtr, static_cast(layout->MINO_DataSize)), FALSE); + +#if IA_PLATFORM_WINDOWS + u_long mode = 1; + ioctlsocket(m_socket, FIONBIO, &mode); +#else + fcntl(m_socket, F_SETFL, O_NONBLOCK); +#endif + + m_recieveBuffer.resize(UINT16_MAX + 1); + + return {}; + } + + VOID IPC_Node::Update() + { + if (!MONI) + return; + + RingBufferView::PacketHeader header; + + // Process all available messages from Manager + while (MONI->Pop(header, Span(m_recieveBuffer.data(), m_recieveBuffer.size()))) + OnPacket(header.ID, {m_recieveBuffer.data(), header.PayloadSize}); + + UINT8 signal; + const auto res = recv(m_socket, &signal, 1, 0); + if (res == 1) + OnSignal(signal); + else if (res == 0 || (res < 0 && errno != EWOULDBLOCK)) + { + SocketOps::Close(m_socket); + FileOps::UnlinkSharedMemory(m_shmName); + + // Manager disconnected, exit immediately + exit(-1); + } + } + + VOID IPC_Node::SendSignal(IN UINT8 signal) + { + if (IS_VALID_SOCKET(m_socket)) + send(m_socket, (const char *) &signal, sizeof(signal), 0); + } + + VOID IPC_Node::SendPacket(IN UINT16 packetID, IN Span payload) + { + MINO->Push(packetID, payload); + } +} // namespace IACore + +namespace IACore +{ + VOID IPC_Manager::NodeSession::SendSignal(IN UINT8 signal) + { + if (IS_VALID_SOCKET(DataSocket)) + send(DataSocket, (const char *) &signal, sizeof(signal), 0); + } + + VOID IPC_Manager::NodeSession::SendPacket(IN UINT16 packetID, IN Span payload) + { + MONI->Push(packetID, payload); + } + + IPC_Manager::IPC_Manager() + { + // SocketOps is smart enough to handle multiple inits + SocketOps::Initialize(); + + m_recieveBuffer.resize(UINT16_MAX + 1); + } + + IPC_Manager::~IPC_Manager() + { + for (auto &session : m_activeSessions) + { + ProcessOps::TerminateProcess(session->ProcessHandle); + FileOps::UnmapFile(session->MappedPtr); + FileOps::UnlinkSharedMemory(session->SharedMemName); + SocketOps::Close(session->DataSocket); + } + m_activeSessions.clear(); + + for (auto &session : m_pendingSessions) + { + ProcessOps::TerminateProcess(session->ProcessHandle); + FileOps::UnmapFile(session->MappedPtr); + FileOps::UnlinkSharedMemory(session->SharedMemName); + SocketOps::Close(session->ListenerSocket); + } + m_pendingSessions.clear(); + + // SocketOps is smart enough to handle multiple terminates + SocketOps::Terminate(); + } + + VOID IPC_Manager::Update() + { + const auto now = SteadyClock::now(); + + for (INT32 i = m_pendingSessions.size() - 1; i >= 0; i--) + { + auto &session = m_pendingSessions[i]; + + if (now - session->CreationTime > std::chrono::seconds(5)) + { + ProcessOps::TerminateProcess(session->ProcessHandle); + + FileOps::UnmapFile(session->MappedPtr); + FileOps::UnlinkSharedMemory(session->SharedMemName); + SocketOps::Close(session->DataSocket); + + m_pendingSessions.erase(m_pendingSessions.begin() + i); + continue; + } + + SocketHandle newSock = accept(session->ListenerSocket, NULL, NULL); + + if (IS_VALID_SOCKET(newSock)) + { + session->DataSocket = newSock; + session->IsReady = TRUE; + + // Set Data Socket to Non-Blocking +#if IA_PLATFORM_WINDOWS + u_long mode = 1; + ioctlsocket(session->DataSocket, FIONBIO, &mode); +#else + fcntl(session->DataSocket, F_SETFL, O_NONBLOCK); +#endif + + SocketOps::Close(session->ListenerSocket); + session->ListenerSocket = INVALID_SOCKET; + + m_activeSessions.push_back(std::move(session)); + m_pendingSessions.erase(m_pendingSessions.begin() + i); + m_activeSessionMap[session->ProcessHandle->ID.load()] = session.get(); + } + } + + for (INT32 i = m_activeSessions.size() - 1; i >= 0; i--) + { + auto &node = m_activeSessions[i]; + + auto nodeID = node->ProcessHandle->ID.load(); + + RingBufferView::PacketHeader header; + + while (node->MINO->Pop(header, Span(m_recieveBuffer.data(), m_recieveBuffer.size()))) + OnPacket(nodeID, header.ID, {m_recieveBuffer.data(), header.PayloadSize}); + + UINT8 signal; + const auto res = recv(node->DataSocket, &signal, 1, 0); + + if (res == 1) + OnSignal(nodeID, signal); + + if (res == 0 || (res < 0 && errno != EWOULDBLOCK)) + { + ProcessOps::TerminateProcess(node->ProcessHandle); + + FileOps::UnmapFile(node->MappedPtr); + FileOps::UnlinkSharedMemory(node->SharedMemName); + SocketOps::Close(node->DataSocket); + + m_activeSessions.erase(m_activeSessions.begin() + i); + m_activeSessionMap.erase(node->ProcessHandle->ID.load()); + } + } + } + + Expected IPC_Manager::SpawnNode(IN CONST FilePath &executablePath, + IN UINT32 sharedMemorySize) + { + auto session = std::make_unique(); + + static Atomic s_idGen{0}; + UINT32 sid = ++s_idGen; + +#if IA_PLATFORM_WINDOWS + char tempPath[MAX_PATH]; + GetTempPathA(MAX_PATH, tempPath); + String sockPath = std::format("{}\\ia_sess_{}.sock", tempPath, sid); +#else + String sockPath = std::format("/tmp/ia_sess_{}.sock", sid); +#endif + + session->ListenerSocket = SocketOps::CreateUnixSocket(); + if (!SocketOps::BindUnixSocket(session->ListenerSocket, sockPath.c_str())) + return MakeUnexpected("Failed to bind unique socket"); + + if (listen(session->ListenerSocket, 1) != 0) + return MakeUnexpected("Failed to listen on unqiue socket"); + +#if IA_PLATFORM_WINDOWS + u_long mode = 1; + ioctlsocket(session->ListenerSocket, FIONBIO, &mode); +#else + fcntl(session->ListenerSocket, F_SETFL, O_NONBLOCK); +#endif + + String shmName = std::format("ia_shm_{}", sid); + auto mapRes = FileOps::MapSharedMemory(shmName, sharedMemorySize, TRUE); + if (!mapRes.has_value()) + return MakeUnexpected("Failed to map shared memory"); + + PUINT8 mappedPtr = mapRes.value(); + + auto *layout = reinterpret_cast(mappedPtr); + + layout->Meta.Magic = 0x49414950; + layout->Meta.Version = 1; + layout->Meta.TotalSize = sharedMemorySize; + + UINT64 headerSize = IPC_SharedMemoryLayout::GetHeaderSize(); + UINT64 usableBytes = sharedMemorySize - headerSize; + + UINT64 halfSize = (usableBytes / 2); + halfSize -= (halfSize % 64); + + layout->MONI_DataOffset = headerSize; + layout->MONI_DataSize = halfSize; + + layout->MINO_DataOffset = headerSize + halfSize; + layout->MINO_DataSize = halfSize; + + session->MONI = std::make_unique( + &layout->MONI_Control, Span(mappedPtr + layout->MONI_DataOffset, layout->MONI_DataSize), TRUE); + + session->MINO = std::make_unique( + &layout->MINO_Control, Span(mappedPtr + layout->MINO_DataOffset, layout->MINO_DataSize), TRUE); + + IPC_ConnectionDescriptor desc; + desc.SocketPath = sockPath; + desc.SharedMemPath = shmName; + desc.SharedMemSize = sharedMemorySize; + + String args = std::format("\"{}\"", desc.Serialize()); + + session->ProcessHandle = ProcessOps::SpawnProcessAsync( + FileOps::NormalizeExecutablePath(executablePath), args, + [sid](IN StringView line) { + UNUSED(sid); + UNUSED(line); + }, + [sid](IN Expected result) { + UNUSED(sid); + UNUSED(result); + }); + + session->CreationTime = SteadyClock::now(); + m_pendingSessions.push_back(std::move(session)); + + return session->ProcessHandle->ID.load(); + } + + VOID IPC_Manager::ShutdownNode(IN NativeProcessID pid) + { + const auto itNode = m_activeSessionMap.find(pid); + if (itNode == m_activeSessionMap.end()) + return; + + auto &node = itNode->second; + + ProcessOps::TerminateProcess(node->ProcessHandle); + FileOps::UnmapFile(node->MappedPtr); + FileOps::UnlinkSharedMemory(node->SharedMemName); + SocketOps::Close(node->DataSocket); + + for (auto it = m_activeSessions.begin(); it != m_activeSessions.end(); it++) + { + if (it->get() == node) + { + m_activeSessions.erase(it); + break; + } + } + + m_activeSessionMap.erase(itNode); + } + + VOID IPC_Manager::SendSignal(IN NativeProcessID node, IN UINT8 signal) + { + const auto itNode = m_activeSessionMap.find(node); + if (itNode == m_activeSessionMap.end()) + return; + itNode->second->SendSignal(signal); + } + + VOID IPC_Manager::SendPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span payload) + { + const auto itNode = m_activeSessionMap.find(node); + if (itNode == m_activeSessionMap.end()) + return; + itNode->second->SendPacket(packetID, payload); + } +} // namespace IACore \ No newline at end of file diff --git a/Src/IACore/imp/cpp/ProcessOps.cpp b/Src/IACore/imp/cpp/ProcessOps.cpp index 18f9d17..8fd8def 100644 --- a/Src/IACore/imp/cpp/ProcessOps.cpp +++ b/Src/IACore/imp/cpp/ProcessOps.cpp @@ -34,6 +34,15 @@ namespace IACore namespace IACore { + NativeProcessID ProcessOps::GetCurrentProcessID() + { +#if IA_PLATFORM_WINDOWS + return ::GetCurrentProcessId(); +#else + return getpid(); +#endif + } + Expected ProcessOps::SpawnProcessSync(IN CONST String &command, IN CONST String &args, IN Function onOutputLineCallback) { @@ -52,24 +61,25 @@ namespace IACore SharedPtr handle = std::make_shared(); handle->IsRunning = true; - handle->ThreadHandle = JoiningThread([=, h = handle.get(), cmd = IA_MOVE(command), args = std::move(args)]() mutable { + handle->ThreadHandle = + JoiningThread([=, h = handle.get(), cmd = IA_MOVE(command), args = std::move(args)]() mutable { #if IA_PLATFORM_WINDOWS - auto result = SpawnProcessWindows(cmd, args, onOutputLineCallback, h->ID); + auto result = SpawnProcessWindows(cmd, args, onOutputLineCallback, h->ID); #else - auto result = SpawnProcessPosix(cmd, args, onOutputLineCallback, h->ID); + auto result = SpawnProcessPosix(cmd, args, onOutputLineCallback, h->ID); #endif - h->IsRunning = false; + h->IsRunning = false; - if (!onFinishCallback) - return; + if (!onFinishCallback) + return; - if (!result) - onFinishCallback(MakeUnexpected(result.error())); - else - onFinishCallback(*result); - }); + if (!result) + onFinishCallback(MakeUnexpected(result.error())); + else + onFinishCallback(*result); + }); return handle; } diff --git a/Src/IACore/imp/cpp/SocketOps.cpp b/Src/IACore/imp/cpp/SocketOps.cpp index 324f45c..145dc09 100644 --- a/Src/IACore/imp/cpp/SocketOps.cpp +++ b/Src/IACore/imp/cpp/SocketOps.cpp @@ -18,6 +18,15 @@ namespace IACore { + INT32 SocketOps::s_initCount{0}; + + VOID SocketOps::Close(IN SocketHandle sock) + { + if (sock == INVALID_SOCKET) + return; + CLOSE_SOCKET(sock); + } + BOOL SocketOps::Listen(IN SocketHandle sock, IN INT32 queueSize) { return listen(sock, queueSize) == 0; diff --git a/Src/IACore/inc/IACore/ADT/RingBuffer.hpp b/Src/IACore/inc/IACore/ADT/RingBuffer.hpp index 81386d7..3c71632 100644 --- a/Src/IACore/inc/IACore/ADT/RingBuffer.hpp +++ b/Src/IACore/inc/IACore/ADT/RingBuffer.hpp @@ -27,14 +27,22 @@ namespace IACore struct ControlBlock { - alignas(64) Atomic WriteOffset{0}; - alignas(64) Atomic ReadOffset{0}; - UINT32 Capacity; + struct alignas(64) + { + Atomic WriteOffset{0}; + } Producer; - // Padding to ensure data starts on a cache line - UINT8 _padding[64 - sizeof(UINT32) * 3]; + struct alignas(64) + { + Atomic ReadOffset{0}; + // Capacity is effectively constant after init, + // so it doesn't cause false sharing invalidations. + UINT32 Capacity{0}; + } Consumer; }; + static_assert(offsetof(ControlBlock, Consumer) == 64, "False sharing detected in ControlBlock"); + // All of the data in ring buffer will be stored as packets struct PacketHeader { @@ -43,9 +51,10 @@ namespace IACore }; public: - INLINE RingBufferView(IN Span buffer); + INLINE RingBufferView(IN Span buffer, IN BOOL isOwner); + INLINE RingBufferView(IN ControlBlock *controlBlock, IN Span buffer, IN BOOL isOwner); - INLINE BOOL Pop(OUT PacketHeader &outHeader, OUT Span outBuffer); + INLINE INT32 Pop(OUT PacketHeader &outHeader, OUT Span outBuffer); INLINE BOOL Push(IN UINT16 packetID, IN Span data); INLINE ControlBlock *GetControlBlock(); @@ -56,77 +65,108 @@ namespace IACore ControlBlock *m_controlBlock{}; private: - INLINE VOID WritePacket(IN UINT32 offset, IN UINT16 id, IN PCVOID data, IN UINT16 dataSize); + INLINE VOID WriteWrapped(IN UINT32 offset, IN PCVOID data, IN UINT32 size); + INLINE VOID ReadWrapped(IN UINT32 offset, OUT PVOID outData, IN UINT32 size); }; } // namespace IACore namespace IACore { - RingBufferView::RingBufferView(IN Span buffer) + RingBufferView::RingBufferView(IN Span buffer, IN BOOL isOwner) { IA_ASSERT(buffer.size() > sizeof(ControlBlock)); m_controlBlock = reinterpret_cast(buffer.data()); m_dataPtr = buffer.data() + sizeof(ControlBlock); - m_controlBlock->Capacity = m_capacity = buffer.size() - sizeof(ControlBlock); + + m_capacity = static_cast(buffer.size()) - sizeof(ControlBlock); + + if (isOwner) + { + m_controlBlock->Consumer.Capacity = m_capacity; + m_controlBlock->Producer.WriteOffset.store(0, std::memory_order_release); + m_controlBlock->Consumer.ReadOffset.store(0, std::memory_order_release); + } + else + IA_ASSERT(m_controlBlock->Consumer.Capacity == m_capacity); } - BOOL RingBufferView::Pop(OUT PacketHeader &outHeader, OUT Span outBuffer) + RingBufferView::RingBufferView(IN ControlBlock *controlBlock, IN Span buffer, IN BOOL isOwner) { - UINT32 currentWriteOffset = m_controlBlock->WriteOffset.load(std::memory_order_acquire); - UINT32 currentReadOffset = m_controlBlock->ReadOffset.load(std::memory_order_acquire); + IA_ASSERT(controlBlock != nullptr); + IA_ASSERT(buffer.size() > 0); - if (currentReadOffset == currentWriteOffset) - return false; + m_controlBlock = controlBlock; + m_dataPtr = buffer.data(); + m_capacity = static_cast(buffer.size()); - const auto header = reinterpret_cast(m_dataPtr + currentReadOffset); - - if (header->ID == PACKET_ID_SKIP) + if (isOwner) { - m_controlBlock->ReadOffset.store(0, std::memory_order_release); - return Pop(outHeader, outBuffer); + m_controlBlock->Consumer.Capacity = m_capacity; + m_controlBlock->Producer.WriteOffset.store(0, std::memory_order_release); + m_controlBlock->Consumer.ReadOffset.store(0, std::memory_order_release); + } + } + + INT32 RingBufferView::Pop(OUT PacketHeader &outHeader, OUT Span outBuffer) + { + UINT32 write = m_controlBlock->Producer.WriteOffset.load(std::memory_order_acquire); + UINT32 read = m_controlBlock->Consumer.ReadOffset.load(std::memory_order_relaxed); + UINT32 cap = m_capacity; + + if (read == write) + return 0; // Empty + + ReadWrapped(read, &outHeader, sizeof(PacketHeader)); + + if (outHeader.PayloadSize > outBuffer.size()) + return -static_cast(outHeader.PayloadSize); + + if (outHeader.PayloadSize > 0) + { + UINT32 dataReadOffset = (read + sizeof(PacketHeader)) % cap; + ReadWrapped(dataReadOffset, outBuffer.data(), outHeader.PayloadSize); } - outHeader = *header; - if (outHeader.PayloadSize > outBuffer.size()) - return false; + // Move read pointer forward + UINT32 newReadOffset = (read + sizeof(PacketHeader) + outHeader.PayloadSize) % cap; + m_controlBlock->Consumer.ReadOffset.store(newReadOffset, std::memory_order_release); - memcpy(outBuffer.data(), m_dataPtr + currentReadOffset + sizeof(PacketHeader), outHeader.PayloadSize); - m_controlBlock->ReadOffset.store(currentReadOffset + sizeof(PacketHeader) + outHeader.PayloadSize, std::memory_order_release); - - return true; + return outHeader.PayloadSize; } BOOL RingBufferView::Push(IN UINT16 packetID, IN Span data) { IA_ASSERT(data.size() <= UINT16_MAX); - UINT32 currentWriteOffset = m_controlBlock->WriteOffset.load(std::memory_order_acquire); - UINT32 currentReadOffset = m_controlBlock->ReadOffset.load(std::memory_order_acquire); + const UINT32 totalSize = sizeof(PacketHeader) + static_cast(data.size()); - UINT32 requiredSpace = sizeof(PacketHeader) + data.size(); - if (currentWriteOffset + requiredSpace <= m_capacity) + UINT32 read = m_controlBlock->Consumer.ReadOffset.load(std::memory_order_acquire); + UINT32 write = m_controlBlock->Producer.WriteOffset.load(std::memory_order_relaxed); + UINT32 cap = m_capacity; + + UINT32 freeSpace = (read <= write) + ? (m_capacity - write) + read + : (read - write); + + // Ensure to always leave 1 byte empty to prevent Read == Write ambiguity (Wait-Free Ring Buffer standard) + if (freeSpace <= totalSize) + return FALSE; + + PacketHeader header{packetID, static_cast(data.size())}; + WriteWrapped(write, &header, sizeof(PacketHeader)); + + UINT32 dataWriteOffset = (write + sizeof(PacketHeader)) % cap; + + if (data.size() > 0) { - WritePacket(currentWriteOffset, packetID, data.data(), static_cast(data.size())); - m_controlBlock->WriteOffset.store(currentWriteOffset + requiredSpace, std::memory_order_release); - return true; + WriteWrapped(dataWriteOffset, data.data(), static_cast(data.size())); } - UINT32 remainingAtEnd = m_capacity - currentWriteOffset; - if (requiredSpace < currentReadOffset) - { - if (remainingAtEnd >= sizeof(PacketHeader)) - { - const auto p = PacketHeader{PACKET_ID_SKIP, 0}; - memcpy(m_dataPtr + currentWriteOffset, &p, sizeof(p)); - } + UINT32 newWriteOffset = (dataWriteOffset + data.size()) % cap; + m_controlBlock->Producer.WriteOffset.store(newWriteOffset, std::memory_order_release); - WritePacket(0, packetID, data.data(), static_cast(data.size())); - m_controlBlock->WriteOffset.store(requiredSpace, std::memory_order_release); - return true; - } - - return false; + return TRUE; } RingBufferView::ControlBlock *RingBufferView::GetControlBlock() @@ -134,10 +174,43 @@ namespace IACore return m_controlBlock; } - VOID RingBufferView::WritePacket(IN UINT32 offset, IN UINT16 id, IN PCVOID data, IN UINT16 dataSize) + VOID RingBufferView::WriteWrapped(IN UINT32 offset, IN PCVOID data, IN UINT32 size) { - PacketHeader h = {id, dataSize}; - memcpy(m_dataPtr + offset, &h, sizeof(PacketHeader)); - memcpy(m_dataPtr + offset + sizeof(PacketHeader), data, dataSize); + if (offset + size <= m_capacity) + { + // Contiguous write + memcpy(m_dataPtr + offset, data, size); + } + else + { + // Split write + UINT32 firstChunk = m_capacity - offset; + UINT32 secondChunk = size - firstChunk; + + const UINT8 *src = static_cast(data); + + memcpy(m_dataPtr + offset, src, firstChunk); + memcpy(m_dataPtr, src + firstChunk, secondChunk); + } + } + + VOID RingBufferView::ReadWrapped(IN UINT32 offset, OUT PVOID outData, IN UINT32 size) + { + if (offset + size <= m_capacity) + { + // Contiguous read + memcpy(outData, m_dataPtr + offset, size); + } + else + { + // Split read + UINT32 firstChunk = m_capacity - offset; + UINT32 secondChunk = size - firstChunk; + + UINT8 *dst = static_cast(outData); + + memcpy(dst, m_dataPtr + offset, firstChunk); + memcpy(dst + firstChunk, m_dataPtr, secondChunk); + } } } // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/FileOps.hpp b/Src/IACore/inc/IACore/FileOps.hpp index 2e2b483..c867c80 100644 --- a/Src/IACore/inc/IACore/FileOps.hpp +++ b/Src/IACore/inc/IACore/FileOps.hpp @@ -31,7 +31,7 @@ namespace IACore STATIC Expected MapFile(IN CONST FilePath &path, OUT SIZE_T &size); // @param `isOwner` TRUE to allocate/truncate. FALSE to just open. - STATIC Expected MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner); + STATIC Expected MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner); STATIC VOID UnlinkSharedMemory(IN CONST String &name); STATIC Expected StreamFromFile(IN CONST FilePath &path); diff --git a/Src/IACore/inc/IACore/IPC.hpp b/Src/IACore/inc/IACore/IPC.hpp index cc3d3d4..20fc8d0 100644 --- a/Src/IACore/inc/IACore/IPC.hpp +++ b/Src/IACore/inc/IACore/IPC.hpp @@ -16,19 +16,130 @@ #pragma once -#include +#include +#include +#include namespace IACore { - class IPC_Node + struct alignas(64) IPC_SharedMemoryLayout { - public: - + // ========================================================= + // SECTION 1: METADATA & HANDSHAKE + // ========================================================= + struct Header + { + UINT32 Magic; // 0x49414950 ("IAIP") + UINT32 Version; // 1 + UINT64 TotalSize; // Total size of SHM block + } Meta; + + // Pad to ensure MONI starts on a fresh cache line (64 bytes) + UINT8 _pad0[64 - sizeof(Header)]; + + // ========================================================= + // SECTION 2: RING BUFFER CONTROL BLOCKS + // ========================================================= + + // RingBufferView::ControlBlock is already 64-byte aligned internally. + RingBufferView::ControlBlock MONI_Control; + RingBufferView::ControlBlock MINO_Control; + + // ========================================================= + // SECTION 3: DATA BUFFER OFFSETS + // ========================================================= + + UINT64 MONI_DataOffset; + UINT64 MONI_DataSize; + + UINT64 MINO_DataOffset; + UINT64 MINO_DataSize; + + // Pad to ensure the actual Data Buffer starts on a fresh cache line + UINT8 _pad1[64 - (sizeof(UINT64) * 4)]; + + static constexpr size_t GetHeaderSize() + { + return sizeof(IPC_SharedMemoryLayout); + } }; - class IPC_Server + // Static assert to ensure manual padding logic is correct + static_assert(sizeof(IPC_SharedMemoryLayout) % 64 == 0, "IPC Layout is not cache-line aligned!"); + + class IPC_Node { - public: - + public: + ~IPC_Node(); + + // When Manager spawns a node, `connectionString` is passed + // as the first command line argument + Expected Connect(IN PCCHAR connectionString); + + VOID Update(); + + VOID SendSignal(IN UINT8 signal); + VOID SendPacket(IN UINT16 packetID, IN Span payload); + + PURE_VIRTUAL(VOID OnSignal(IN UINT8 signal)); + PURE_VIRTUAL(VOID OnPacket(IN UINT16 packetID, IN Span payload)); + + private: + String m_shmName; + PUINT8 m_sharedMemory{}; + Vector m_recieveBuffer; + SocketHandle m_socket{INVALID_SOCKET}; + + UniquePtr MONI; // Manager Out, Node In + UniquePtr MINO; // Manager In, Node Out }; -} \ No newline at end of file + + class IPC_Manager + { + struct NodeSession + { + SteadyTimePoint CreationTime{}; + SharedPtr ProcessHandle; + + String SharedMemName; + PUINT8 MappedPtr{}; + + SocketHandle ListenerSocket{INVALID_SOCKET}; + SocketHandle DataSocket{INVALID_SOCKET}; + + UniquePtr MONI; // Manager Out, Node In + UniquePtr MINO; // Manager In, Node Out + + BOOL IsReady{FALSE}; + + VOID SendSignal(IN UINT8 signal); + VOID SendPacket(IN UINT16 packetID, IN Span payload); + }; + + public: + STATIC CONSTEXPR UINT32 DEFAULT_NODE_SHARED_MEMORY_SIZE = SIZE_MB(4); + + public: + IPC_Manager(); + ~IPC_Manager(); + + VOID Update(); + + Expected SpawnNode(IN CONST FilePath &executablePath, + IN UINT32 sharedMemorySize = DEFAULT_NODE_SHARED_MEMORY_SIZE); + + VOID ShutdownNode(IN NativeProcessID pid); + + VOID SendSignal(IN NativeProcessID node, IN UINT8 signal); + VOID SendPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span payload); + + PURE_VIRTUAL(VOID OnSignal(IN NativeProcessID node, IN UINT8 signal)); + PURE_VIRTUAL(VOID OnPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span payload)); + + private: + Vector m_recieveBuffer; + Vector> m_activeSessions; + Vector> m_pendingSessions; + UnorderedMap m_activeSessionMap; + }; +} // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/PCH.hpp b/Src/IACore/inc/IACore/PCH.hpp index 3002961..86390ad 100644 --- a/Src/IACore/inc/IACore/PCH.hpp +++ b/Src/IACore/inc/IACore/PCH.hpp @@ -293,6 +293,10 @@ #define IA_CONCAT(x, y) IA_CONCAT_IMPL(x, y) #define IA_UNIQUE_NAME(prefix) IA_CONCAT(prefix, __LINE__) +#define SIZE_KB(v) (v * 1024) +#define SIZE_MB(v) (v * 1024 * 1024) +#define SIZE_GB(v) (v * 1024 * 1024 * 1024) + #define ENSURE_BINARY_COMPATIBILITY(A, B) \ static_assert(sizeof(A) == sizeof(B), \ #A ", " #B " size mismatch! Do not add virtual functions or new member variables."); @@ -560,8 +564,10 @@ using String = std::string; using StringView = std::string_view; using StringStream = std::stringstream; -using HRClock = std::chrono::high_resolution_clock; -using HRTimePoint = std::chrono::time_point; +using SteadyClock = std::chrono::steady_clock; +using SteadyTimePoint = std::chrono::time_point; +using HighResClock = std::chrono::high_resolution_clock; +using HighResTimePoint = std::chrono::time_point; using Mutex = std::mutex; using StopToken = std::stop_token; diff --git a/Src/IACore/inc/IACore/ProcessOps.hpp b/Src/IACore/inc/IACore/ProcessOps.hpp index 9d8104a..df6ee24 100644 --- a/Src/IACore/inc/IACore/ProcessOps.hpp +++ b/Src/IACore/inc/IACore/ProcessOps.hpp @@ -47,6 +47,8 @@ namespace IACore class ProcessOps { public: + STATIC NativeProcessID GetCurrentProcessID(); + STATIC Expected SpawnProcessSync(IN CONST String &command, IN CONST String &args, IN Function onOutputLineCallback); STATIC SharedPtr SpawnProcessAsync(IN CONST String &command, IN CONST String &args, @@ -57,8 +59,10 @@ namespace IACore private: STATIC Expected SpawnProcessWindows(IN CONST String &command, IN CONST String &args, - IN Function onOutputLineCallback, OUT Atomic& id); + IN Function onOutputLineCallback, + OUT Atomic &id); STATIC Expected SpawnProcessPosix(IN CONST String &command, IN CONST String &args, - IN Function onOutputLineCallback, OUT Atomic& id); + IN Function onOutputLineCallback, + OUT Atomic &id); }; } // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/SocketOps.hpp b/Src/IACore/inc/IACore/SocketOps.hpp index 2fd76d6..794b500 100644 --- a/Src/IACore/inc/IACore/SocketOps.hpp +++ b/Src/IACore/inc/IACore/SocketOps.hpp @@ -52,16 +52,26 @@ namespace IACore class SocketOps { public: + // SocketOps correctly handles multiple calls to Initialize and Terminate. Make sure + // every Initialize call is paired with a corresponding Terminate call STATIC VOID Initialize() { + s_initCount++; + if (s_initCount > 1) + return; #if IA_PLATFORM_WINDOWS WSADATA wsaData; WSAStartup(MAKEWORD(2, 2), &wsaData); #endif } + // SocketOps correctly handles multiple calls to Initialize and Terminate. Make sure + // every Initialize call is paired with a corresponding Terminate call STATIC VOID Terminate() { + s_initCount--; + if (s_initCount > 0) + return; #if IA_PLATFORM_WINDOWS WSACleanup(); #endif @@ -77,6 +87,8 @@ namespace IACore return IsPortAvailable(port, SOCK_DGRAM); } + STATIC VOID Close(IN SocketHandle sock); + STATIC BOOL Listen(IN SocketHandle sock, IN INT32 queueSize = 5); STATIC SocketHandle CreateUnixSocket(); @@ -86,5 +98,8 @@ namespace IACore private: STATIC BOOL IsPortAvailable(IN UINT16 port, IN INT32 type); + + private: + STATIC INT32 s_initCount; }; } // namespace IACore \ No newline at end of file