IPC
This commit is contained in:
@ -36,12 +36,10 @@ namespace IACore
|
|||||||
const auto fd = (INT32) ((UINT64) std::get<0>(handles));
|
const auto fd = (INT32) ((UINT64) std::get<0>(handles));
|
||||||
if (fd != -1)
|
if (fd != -1)
|
||||||
::close(fd);
|
::close(fd);
|
||||||
#else
|
|
||||||
# error "IACore FileOps does not support this platform"
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
Expected<PCUINT8, String> FileOps::MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner)
|
Expected<PUINT8, String> FileOps::MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner)
|
||||||
{
|
{
|
||||||
#if IA_PLATFORM_WINDOWS
|
#if IA_PLATFORM_WINDOWS
|
||||||
int wchars_num = MultiByteToWideChar(CP_UTF8, 0, name.c_str(), -1, NULL, 0);
|
int wchars_num = MultiByteToWideChar(CP_UTF8, 0, name.c_str(), -1, NULL, 0);
|
||||||
@ -59,7 +57,7 @@ namespace IACore
|
|||||||
return MakeUnexpected(
|
return MakeUnexpected(
|
||||||
std::format("Failed to {} shared memory '{}'", isOwner ? "owner" : "consumer", name.c_str()));
|
std::format("Failed to {} shared memory '{}'", isOwner ? "owner" : "consumer", name.c_str()));
|
||||||
|
|
||||||
const auto result = static_cast<PCUINT8>(MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, size));
|
const auto result = static_cast<PUINT8>(MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, size));
|
||||||
if (result == NULL)
|
if (result == NULL)
|
||||||
{
|
{
|
||||||
CloseHandle(hMap);
|
CloseHandle(hMap);
|
||||||
@ -98,18 +96,18 @@ namespace IACore
|
|||||||
return MakeUnexpected(std::format("Failed to mmap shared memory '{}'", name.c_str()));
|
return MakeUnexpected(std::format("Failed to mmap shared memory '{}'", name.c_str()));
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto result = static_cast<PCUINT8>(addr);
|
const auto result = static_cast<PUINT8>(addr);
|
||||||
|
|
||||||
s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) fd), (PVOID) addr, (PVOID) size);
|
s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) fd), (PVOID) addr, (PVOID) size);
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
#else
|
|
||||||
# error "IACore FileOps does not support this platform"
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
VOID FileOps::UnlinkSharedMemory(IN CONST String &name)
|
VOID FileOps::UnlinkSharedMemory(IN CONST String &name)
|
||||||
{
|
{
|
||||||
|
if (name.empty())
|
||||||
|
return;
|
||||||
#if IA_PLATFORM_UNIX
|
#if IA_PLATFORM_UNIX
|
||||||
shm_unlink(name.c_str());
|
shm_unlink(name.c_str());
|
||||||
#endif
|
#endif
|
||||||
@ -182,9 +180,6 @@ namespace IACore
|
|||||||
madvise(addr, size, MADV_SEQUENTIAL);
|
madvise(addr, size, MADV_SEQUENTIAL);
|
||||||
s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) handle), (PVOID) addr, (PVOID) size);
|
s_mappedFiles[result] = std::make_tuple((PVOID) ((UINT64) handle), (PVOID) addr, (PVOID) size);
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
#else
|
|
||||||
# error "IACore FileOps does not support this platform"
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -260,17 +255,23 @@ namespace IACore
|
|||||||
|
|
||||||
FilePath FileOps::NormalizeExecutablePath(IN CONST FilePath &path)
|
FilePath FileOps::NormalizeExecutablePath(IN CONST FilePath &path)
|
||||||
{
|
{
|
||||||
|
FilePath result = path;
|
||||||
|
|
||||||
#if IA_PLATFORM_WINDOWS
|
#if IA_PLATFORM_WINDOWS
|
||||||
return path;
|
if (!result.has_extension())
|
||||||
|
result.replace_extension(".exe");
|
||||||
|
|
||||||
#elif IA_PLATFORM_UNIX
|
#elif IA_PLATFORM_UNIX
|
||||||
String result(path);
|
if (result.extension() == ".exe")
|
||||||
if (path.extension() == ".exe")
|
result.replace_extension("");
|
||||||
result.resize(result.size() - 4);
|
|
||||||
if (!path.is_absolute() && !path.string().starts_with("./"))
|
if (result.is_relative())
|
||||||
result = "./" + result;
|
{
|
||||||
return FilePath(result);
|
String pathStr = result.string();
|
||||||
#else
|
if (!pathStr.starts_with("./") && !pathStr.starts_with("../"))
|
||||||
# error "IACore FileOps does not support this platform"
|
result = "./" + pathStr;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
} // namespace IACore
|
} // namespace IACore
|
||||||
@ -21,13 +21,13 @@
|
|||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
HRTimePoint g_startTime{};
|
HighResTimePoint g_startTime{};
|
||||||
std::thread::id g_mainThreadID{};
|
std::thread::id g_mainThreadID{};
|
||||||
|
|
||||||
VOID Initialize()
|
VOID Initialize()
|
||||||
{
|
{
|
||||||
g_mainThreadID = std::this_thread::get_id();
|
g_mainThreadID = std::this_thread::get_id();
|
||||||
g_startTime = HRClock::now();
|
g_startTime = HighResClock::now();
|
||||||
Logger::Initialize();
|
Logger::Initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,12 +38,12 @@ namespace IACore
|
|||||||
|
|
||||||
UINT64 GetTicksCount()
|
UINT64 GetTicksCount()
|
||||||
{
|
{
|
||||||
return std::chrono::duration_cast<std::chrono::milliseconds>(HRClock::now() - g_startTime).count();
|
return std::chrono::duration_cast<std::chrono::milliseconds>(HighResClock::now() - g_startTime).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
FLOAT64 GetSecondsCount()
|
FLOAT64 GetSecondsCount()
|
||||||
{
|
{
|
||||||
return (HRClock::now() - g_startTime).count();
|
return (HighResClock::now() - g_startTime).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
UINT32 GetRandom(IN UINT32 seed)
|
UINT32 GetRandom(IN UINT32 seed)
|
||||||
|
|||||||
@ -16,7 +16,398 @@
|
|||||||
|
|
||||||
#include <IACore/IPC.hpp>
|
#include <IACore/IPC.hpp>
|
||||||
|
|
||||||
|
#include <IACore/FileOps.hpp>
|
||||||
|
#include <IACore/StringOps.hpp>
|
||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
|
struct IPC_ConnectionDescriptor
|
||||||
}
|
{
|
||||||
|
String SocketPath;
|
||||||
|
String SharedMemPath;
|
||||||
|
UINT32 SharedMemSize;
|
||||||
|
|
||||||
|
String Serialize() CONST
|
||||||
|
{
|
||||||
|
return std::format("{}|{}|{}|", SocketPath, SharedMemPath, SharedMemSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
STATIC IPC_ConnectionDescriptor Deserialize(IN CONST String &data)
|
||||||
|
{
|
||||||
|
enum class EParseState
|
||||||
|
{
|
||||||
|
SocketPath,
|
||||||
|
SharedMemPath,
|
||||||
|
SharedMemSize
|
||||||
|
};
|
||||||
|
|
||||||
|
IPC_ConnectionDescriptor result{};
|
||||||
|
|
||||||
|
SIZE_T t{};
|
||||||
|
EParseState state{EParseState::SocketPath};
|
||||||
|
for (SIZE_T i = 0; i < data.size(); i++)
|
||||||
|
{
|
||||||
|
if (data[i] != '|')
|
||||||
|
continue;
|
||||||
|
|
||||||
|
switch (state)
|
||||||
|
{
|
||||||
|
case EParseState::SocketPath:
|
||||||
|
result.SocketPath = data.substr(t, i - t);
|
||||||
|
state = EParseState::SharedMemPath;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case EParseState::SharedMemPath:
|
||||||
|
result.SharedMemPath = data.substr(t, i - t);
|
||||||
|
state = EParseState::SharedMemSize;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case EParseState::SharedMemSize: {
|
||||||
|
if (std::from_chars(&data[t], &data[i], result.SharedMemSize).ec != std::errc{})
|
||||||
|
return {};
|
||||||
|
goto done_parsing;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t = i + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
done_parsing:
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace IACore
|
||||||
|
|
||||||
|
namespace IACore
|
||||||
|
{
|
||||||
|
IPC_Node::~IPC_Node()
|
||||||
|
{
|
||||||
|
SocketOps::Close(m_socket); // SocketOps gracefully handles INVALID_SOCKET
|
||||||
|
}
|
||||||
|
|
||||||
|
Expected<VOID, String> IPC_Node::Connect(IN PCCHAR connectionString)
|
||||||
|
{
|
||||||
|
auto desc = IPC_ConnectionDescriptor::Deserialize(connectionString);
|
||||||
|
m_shmName = desc.SharedMemPath;
|
||||||
|
|
||||||
|
m_socket = SocketOps::CreateUnixSocket();
|
||||||
|
if (!SocketOps::ConnectUnixSocket(m_socket, desc.SocketPath.c_str()))
|
||||||
|
return MakeUnexpected("Failed to create an unix socket");
|
||||||
|
|
||||||
|
auto mapRes = FileOps::MapSharedMemory(desc.SharedMemPath, desc.SharedMemSize, FALSE);
|
||||||
|
if (!mapRes.has_value())
|
||||||
|
return MakeUnexpected("Failed to map the shared memory");
|
||||||
|
|
||||||
|
m_sharedMemory = mapRes.value();
|
||||||
|
|
||||||
|
auto *layout = reinterpret_cast<IPC_SharedMemoryLayout *>(m_sharedMemory);
|
||||||
|
|
||||||
|
if (layout->Meta.Magic != 0x49414950) // "IAIP"
|
||||||
|
return MakeUnexpected("Invalid shared memory header signature");
|
||||||
|
|
||||||
|
if (layout->Meta.Version != 1)
|
||||||
|
return MakeUnexpected("IPC version mismatch");
|
||||||
|
|
||||||
|
PUINT8 moniDataPtr = m_sharedMemory + layout->MONI_DataOffset;
|
||||||
|
PUINT8 minoDataPtr = m_sharedMemory + layout->MINO_DataOffset;
|
||||||
|
|
||||||
|
MONI = std::make_unique<RingBufferView>(
|
||||||
|
&layout->MONI_Control, Span<UINT8>(moniDataPtr, static_cast<size_t>(layout->MONI_DataSize)), FALSE);
|
||||||
|
|
||||||
|
MINO = std::make_unique<RingBufferView>(
|
||||||
|
&layout->MINO_Control, Span<UINT8>(minoDataPtr, static_cast<size_t>(layout->MINO_DataSize)), FALSE);
|
||||||
|
|
||||||
|
#if IA_PLATFORM_WINDOWS
|
||||||
|
u_long mode = 1;
|
||||||
|
ioctlsocket(m_socket, FIONBIO, &mode);
|
||||||
|
#else
|
||||||
|
fcntl(m_socket, F_SETFL, O_NONBLOCK);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
m_recieveBuffer.resize(UINT16_MAX + 1);
|
||||||
|
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Node::Update()
|
||||||
|
{
|
||||||
|
if (!MONI)
|
||||||
|
return;
|
||||||
|
|
||||||
|
RingBufferView::PacketHeader header;
|
||||||
|
|
||||||
|
// Process all available messages from Manager
|
||||||
|
while (MONI->Pop(header, Span<UINT8>(m_recieveBuffer.data(), m_recieveBuffer.size())))
|
||||||
|
OnPacket(header.ID, {m_recieveBuffer.data(), header.PayloadSize});
|
||||||
|
|
||||||
|
UINT8 signal;
|
||||||
|
const auto res = recv(m_socket, &signal, 1, 0);
|
||||||
|
if (res == 1)
|
||||||
|
OnSignal(signal);
|
||||||
|
else if (res == 0 || (res < 0 && errno != EWOULDBLOCK))
|
||||||
|
{
|
||||||
|
SocketOps::Close(m_socket);
|
||||||
|
FileOps::UnlinkSharedMemory(m_shmName);
|
||||||
|
|
||||||
|
// Manager disconnected, exit immediately
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Node::SendSignal(IN UINT8 signal)
|
||||||
|
{
|
||||||
|
if (IS_VALID_SOCKET(m_socket))
|
||||||
|
send(m_socket, (const char *) &signal, sizeof(signal), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Node::SendPacket(IN UINT16 packetID, IN Span<CONST UINT8> payload)
|
||||||
|
{
|
||||||
|
MINO->Push(packetID, payload);
|
||||||
|
}
|
||||||
|
} // namespace IACore
|
||||||
|
|
||||||
|
namespace IACore
|
||||||
|
{
|
||||||
|
VOID IPC_Manager::NodeSession::SendSignal(IN UINT8 signal)
|
||||||
|
{
|
||||||
|
if (IS_VALID_SOCKET(DataSocket))
|
||||||
|
send(DataSocket, (const char *) &signal, sizeof(signal), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Manager::NodeSession::SendPacket(IN UINT16 packetID, IN Span<CONST UINT8> payload)
|
||||||
|
{
|
||||||
|
MONI->Push(packetID, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
IPC_Manager::IPC_Manager()
|
||||||
|
{
|
||||||
|
// SocketOps is smart enough to handle multiple inits
|
||||||
|
SocketOps::Initialize();
|
||||||
|
|
||||||
|
m_recieveBuffer.resize(UINT16_MAX + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
IPC_Manager::~IPC_Manager()
|
||||||
|
{
|
||||||
|
for (auto &session : m_activeSessions)
|
||||||
|
{
|
||||||
|
ProcessOps::TerminateProcess(session->ProcessHandle);
|
||||||
|
FileOps::UnmapFile(session->MappedPtr);
|
||||||
|
FileOps::UnlinkSharedMemory(session->SharedMemName);
|
||||||
|
SocketOps::Close(session->DataSocket);
|
||||||
|
}
|
||||||
|
m_activeSessions.clear();
|
||||||
|
|
||||||
|
for (auto &session : m_pendingSessions)
|
||||||
|
{
|
||||||
|
ProcessOps::TerminateProcess(session->ProcessHandle);
|
||||||
|
FileOps::UnmapFile(session->MappedPtr);
|
||||||
|
FileOps::UnlinkSharedMemory(session->SharedMemName);
|
||||||
|
SocketOps::Close(session->ListenerSocket);
|
||||||
|
}
|
||||||
|
m_pendingSessions.clear();
|
||||||
|
|
||||||
|
// SocketOps is smart enough to handle multiple terminates
|
||||||
|
SocketOps::Terminate();
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Manager::Update()
|
||||||
|
{
|
||||||
|
const auto now = SteadyClock::now();
|
||||||
|
|
||||||
|
for (INT32 i = m_pendingSessions.size() - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
auto &session = m_pendingSessions[i];
|
||||||
|
|
||||||
|
if (now - session->CreationTime > std::chrono::seconds(5))
|
||||||
|
{
|
||||||
|
ProcessOps::TerminateProcess(session->ProcessHandle);
|
||||||
|
|
||||||
|
FileOps::UnmapFile(session->MappedPtr);
|
||||||
|
FileOps::UnlinkSharedMemory(session->SharedMemName);
|
||||||
|
SocketOps::Close(session->DataSocket);
|
||||||
|
|
||||||
|
m_pendingSessions.erase(m_pendingSessions.begin() + i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SocketHandle newSock = accept(session->ListenerSocket, NULL, NULL);
|
||||||
|
|
||||||
|
if (IS_VALID_SOCKET(newSock))
|
||||||
|
{
|
||||||
|
session->DataSocket = newSock;
|
||||||
|
session->IsReady = TRUE;
|
||||||
|
|
||||||
|
// Set Data Socket to Non-Blocking
|
||||||
|
#if IA_PLATFORM_WINDOWS
|
||||||
|
u_long mode = 1;
|
||||||
|
ioctlsocket(session->DataSocket, FIONBIO, &mode);
|
||||||
|
#else
|
||||||
|
fcntl(session->DataSocket, F_SETFL, O_NONBLOCK);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
SocketOps::Close(session->ListenerSocket);
|
||||||
|
session->ListenerSocket = INVALID_SOCKET;
|
||||||
|
|
||||||
|
m_activeSessions.push_back(std::move(session));
|
||||||
|
m_pendingSessions.erase(m_pendingSessions.begin() + i);
|
||||||
|
m_activeSessionMap[session->ProcessHandle->ID.load()] = session.get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (INT32 i = m_activeSessions.size() - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
auto &node = m_activeSessions[i];
|
||||||
|
|
||||||
|
auto nodeID = node->ProcessHandle->ID.load();
|
||||||
|
|
||||||
|
RingBufferView::PacketHeader header;
|
||||||
|
|
||||||
|
while (node->MINO->Pop(header, Span<UINT8>(m_recieveBuffer.data(), m_recieveBuffer.size())))
|
||||||
|
OnPacket(nodeID, header.ID, {m_recieveBuffer.data(), header.PayloadSize});
|
||||||
|
|
||||||
|
UINT8 signal;
|
||||||
|
const auto res = recv(node->DataSocket, &signal, 1, 0);
|
||||||
|
|
||||||
|
if (res == 1)
|
||||||
|
OnSignal(nodeID, signal);
|
||||||
|
|
||||||
|
if (res == 0 || (res < 0 && errno != EWOULDBLOCK))
|
||||||
|
{
|
||||||
|
ProcessOps::TerminateProcess(node->ProcessHandle);
|
||||||
|
|
||||||
|
FileOps::UnmapFile(node->MappedPtr);
|
||||||
|
FileOps::UnlinkSharedMemory(node->SharedMemName);
|
||||||
|
SocketOps::Close(node->DataSocket);
|
||||||
|
|
||||||
|
m_activeSessions.erase(m_activeSessions.begin() + i);
|
||||||
|
m_activeSessionMap.erase(node->ProcessHandle->ID.load());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Expected<NativeProcessID, String> IPC_Manager::SpawnNode(IN CONST FilePath &executablePath,
|
||||||
|
IN UINT32 sharedMemorySize)
|
||||||
|
{
|
||||||
|
auto session = std::make_unique<NodeSession>();
|
||||||
|
|
||||||
|
static Atomic<UINT32> s_idGen{0};
|
||||||
|
UINT32 sid = ++s_idGen;
|
||||||
|
|
||||||
|
#if IA_PLATFORM_WINDOWS
|
||||||
|
char tempPath[MAX_PATH];
|
||||||
|
GetTempPathA(MAX_PATH, tempPath);
|
||||||
|
String sockPath = std::format("{}\\ia_sess_{}.sock", tempPath, sid);
|
||||||
|
#else
|
||||||
|
String sockPath = std::format("/tmp/ia_sess_{}.sock", sid);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
session->ListenerSocket = SocketOps::CreateUnixSocket();
|
||||||
|
if (!SocketOps::BindUnixSocket(session->ListenerSocket, sockPath.c_str()))
|
||||||
|
return MakeUnexpected("Failed to bind unique socket");
|
||||||
|
|
||||||
|
if (listen(session->ListenerSocket, 1) != 0)
|
||||||
|
return MakeUnexpected("Failed to listen on unqiue socket");
|
||||||
|
|
||||||
|
#if IA_PLATFORM_WINDOWS
|
||||||
|
u_long mode = 1;
|
||||||
|
ioctlsocket(session->ListenerSocket, FIONBIO, &mode);
|
||||||
|
#else
|
||||||
|
fcntl(session->ListenerSocket, F_SETFL, O_NONBLOCK);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
String shmName = std::format("ia_shm_{}", sid);
|
||||||
|
auto mapRes = FileOps::MapSharedMemory(shmName, sharedMemorySize, TRUE);
|
||||||
|
if (!mapRes.has_value())
|
||||||
|
return MakeUnexpected("Failed to map shared memory");
|
||||||
|
|
||||||
|
PUINT8 mappedPtr = mapRes.value();
|
||||||
|
|
||||||
|
auto *layout = reinterpret_cast<IPC_SharedMemoryLayout *>(mappedPtr);
|
||||||
|
|
||||||
|
layout->Meta.Magic = 0x49414950;
|
||||||
|
layout->Meta.Version = 1;
|
||||||
|
layout->Meta.TotalSize = sharedMemorySize;
|
||||||
|
|
||||||
|
UINT64 headerSize = IPC_SharedMemoryLayout::GetHeaderSize();
|
||||||
|
UINT64 usableBytes = sharedMemorySize - headerSize;
|
||||||
|
|
||||||
|
UINT64 halfSize = (usableBytes / 2);
|
||||||
|
halfSize -= (halfSize % 64);
|
||||||
|
|
||||||
|
layout->MONI_DataOffset = headerSize;
|
||||||
|
layout->MONI_DataSize = halfSize;
|
||||||
|
|
||||||
|
layout->MINO_DataOffset = headerSize + halfSize;
|
||||||
|
layout->MINO_DataSize = halfSize;
|
||||||
|
|
||||||
|
session->MONI = std::make_unique<RingBufferView>(
|
||||||
|
&layout->MONI_Control, Span<UINT8>(mappedPtr + layout->MONI_DataOffset, layout->MONI_DataSize), TRUE);
|
||||||
|
|
||||||
|
session->MINO = std::make_unique<RingBufferView>(
|
||||||
|
&layout->MINO_Control, Span<UINT8>(mappedPtr + layout->MINO_DataOffset, layout->MINO_DataSize), TRUE);
|
||||||
|
|
||||||
|
IPC_ConnectionDescriptor desc;
|
||||||
|
desc.SocketPath = sockPath;
|
||||||
|
desc.SharedMemPath = shmName;
|
||||||
|
desc.SharedMemSize = sharedMemorySize;
|
||||||
|
|
||||||
|
String args = std::format("\"{}\"", desc.Serialize());
|
||||||
|
|
||||||
|
session->ProcessHandle = ProcessOps::SpawnProcessAsync(
|
||||||
|
FileOps::NormalizeExecutablePath(executablePath), args,
|
||||||
|
[sid](IN StringView line) {
|
||||||
|
UNUSED(sid);
|
||||||
|
UNUSED(line);
|
||||||
|
},
|
||||||
|
[sid](IN Expected<INT32, String> result) {
|
||||||
|
UNUSED(sid);
|
||||||
|
UNUSED(result);
|
||||||
|
});
|
||||||
|
|
||||||
|
session->CreationTime = SteadyClock::now();
|
||||||
|
m_pendingSessions.push_back(std::move(session));
|
||||||
|
|
||||||
|
return session->ProcessHandle->ID.load();
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Manager::ShutdownNode(IN NativeProcessID pid)
|
||||||
|
{
|
||||||
|
const auto itNode = m_activeSessionMap.find(pid);
|
||||||
|
if (itNode == m_activeSessionMap.end())
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto &node = itNode->second;
|
||||||
|
|
||||||
|
ProcessOps::TerminateProcess(node->ProcessHandle);
|
||||||
|
FileOps::UnmapFile(node->MappedPtr);
|
||||||
|
FileOps::UnlinkSharedMemory(node->SharedMemName);
|
||||||
|
SocketOps::Close(node->DataSocket);
|
||||||
|
|
||||||
|
for (auto it = m_activeSessions.begin(); it != m_activeSessions.end(); it++)
|
||||||
|
{
|
||||||
|
if (it->get() == node)
|
||||||
|
{
|
||||||
|
m_activeSessions.erase(it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m_activeSessionMap.erase(itNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Manager::SendSignal(IN NativeProcessID node, IN UINT8 signal)
|
||||||
|
{
|
||||||
|
const auto itNode = m_activeSessionMap.find(node);
|
||||||
|
if (itNode == m_activeSessionMap.end())
|
||||||
|
return;
|
||||||
|
itNode->second->SendSignal(signal);
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID IPC_Manager::SendPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span<CONST UINT8> payload)
|
||||||
|
{
|
||||||
|
const auto itNode = m_activeSessionMap.find(node);
|
||||||
|
if (itNode == m_activeSessionMap.end())
|
||||||
|
return;
|
||||||
|
itNode->second->SendPacket(packetID, payload);
|
||||||
|
}
|
||||||
|
} // namespace IACore
|
||||||
@ -34,6 +34,15 @@ namespace IACore
|
|||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
|
NativeProcessID ProcessOps::GetCurrentProcessID()
|
||||||
|
{
|
||||||
|
#if IA_PLATFORM_WINDOWS
|
||||||
|
return ::GetCurrentProcessId();
|
||||||
|
#else
|
||||||
|
return getpid();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
Expected<INT32, String> ProcessOps::SpawnProcessSync(IN CONST String &command, IN CONST String &args,
|
Expected<INT32, String> ProcessOps::SpawnProcessSync(IN CONST String &command, IN CONST String &args,
|
||||||
IN Function<VOID(IN StringView line)> onOutputLineCallback)
|
IN Function<VOID(IN StringView line)> onOutputLineCallback)
|
||||||
{
|
{
|
||||||
@ -52,24 +61,25 @@ namespace IACore
|
|||||||
SharedPtr<ProcessHandle> handle = std::make_shared<ProcessHandle>();
|
SharedPtr<ProcessHandle> handle = std::make_shared<ProcessHandle>();
|
||||||
handle->IsRunning = true;
|
handle->IsRunning = true;
|
||||||
|
|
||||||
handle->ThreadHandle = JoiningThread([=, h = handle.get(), cmd = IA_MOVE(command), args = std::move(args)]() mutable {
|
handle->ThreadHandle =
|
||||||
|
JoiningThread([=, h = handle.get(), cmd = IA_MOVE(command), args = std::move(args)]() mutable {
|
||||||
|
|
||||||
#if IA_PLATFORM_WINDOWS
|
#if IA_PLATFORM_WINDOWS
|
||||||
auto result = SpawnProcessWindows(cmd, args, onOutputLineCallback, h->ID);
|
auto result = SpawnProcessWindows(cmd, args, onOutputLineCallback, h->ID);
|
||||||
#else
|
#else
|
||||||
auto result = SpawnProcessPosix(cmd, args, onOutputLineCallback, h->ID);
|
auto result = SpawnProcessPosix(cmd, args, onOutputLineCallback, h->ID);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
h->IsRunning = false;
|
h->IsRunning = false;
|
||||||
|
|
||||||
if (!onFinishCallback)
|
if (!onFinishCallback)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (!result)
|
if (!result)
|
||||||
onFinishCallback(MakeUnexpected(result.error()));
|
onFinishCallback(MakeUnexpected(result.error()));
|
||||||
else
|
else
|
||||||
onFinishCallback(*result);
|
onFinishCallback(*result);
|
||||||
});
|
});
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -18,6 +18,15 @@
|
|||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
|
INT32 SocketOps::s_initCount{0};
|
||||||
|
|
||||||
|
VOID SocketOps::Close(IN SocketHandle sock)
|
||||||
|
{
|
||||||
|
if (sock == INVALID_SOCKET)
|
||||||
|
return;
|
||||||
|
CLOSE_SOCKET(sock);
|
||||||
|
}
|
||||||
|
|
||||||
BOOL SocketOps::Listen(IN SocketHandle sock, IN INT32 queueSize)
|
BOOL SocketOps::Listen(IN SocketHandle sock, IN INT32 queueSize)
|
||||||
{
|
{
|
||||||
return listen(sock, queueSize) == 0;
|
return listen(sock, queueSize) == 0;
|
||||||
|
|||||||
@ -27,14 +27,22 @@ namespace IACore
|
|||||||
|
|
||||||
struct ControlBlock
|
struct ControlBlock
|
||||||
{
|
{
|
||||||
alignas(64) Atomic<UINT32> WriteOffset{0};
|
struct alignas(64)
|
||||||
alignas(64) Atomic<UINT32> ReadOffset{0};
|
{
|
||||||
UINT32 Capacity;
|
Atomic<UINT32> WriteOffset{0};
|
||||||
|
} Producer;
|
||||||
|
|
||||||
// Padding to ensure data starts on a cache line
|
struct alignas(64)
|
||||||
UINT8 _padding[64 - sizeof(UINT32) * 3];
|
{
|
||||||
|
Atomic<UINT32> ReadOffset{0};
|
||||||
|
// Capacity is effectively constant after init,
|
||||||
|
// so it doesn't cause false sharing invalidations.
|
||||||
|
UINT32 Capacity{0};
|
||||||
|
} Consumer;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static_assert(offsetof(ControlBlock, Consumer) == 64, "False sharing detected in ControlBlock");
|
||||||
|
|
||||||
// All of the data in ring buffer will be stored as packets
|
// All of the data in ring buffer will be stored as packets
|
||||||
struct PacketHeader
|
struct PacketHeader
|
||||||
{
|
{
|
||||||
@ -43,9 +51,10 @@ namespace IACore
|
|||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
INLINE RingBufferView(IN Span<UINT8> buffer);
|
INLINE RingBufferView(IN Span<UINT8> buffer, IN BOOL isOwner);
|
||||||
|
INLINE RingBufferView(IN ControlBlock *controlBlock, IN Span<UINT8> buffer, IN BOOL isOwner);
|
||||||
|
|
||||||
INLINE BOOL Pop(OUT PacketHeader &outHeader, OUT Span<UINT8> outBuffer);
|
INLINE INT32 Pop(OUT PacketHeader &outHeader, OUT Span<UINT8> outBuffer);
|
||||||
INLINE BOOL Push(IN UINT16 packetID, IN Span<CONST UINT8> data);
|
INLINE BOOL Push(IN UINT16 packetID, IN Span<CONST UINT8> data);
|
||||||
|
|
||||||
INLINE ControlBlock *GetControlBlock();
|
INLINE ControlBlock *GetControlBlock();
|
||||||
@ -56,77 +65,108 @@ namespace IACore
|
|||||||
ControlBlock *m_controlBlock{};
|
ControlBlock *m_controlBlock{};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
INLINE VOID WritePacket(IN UINT32 offset, IN UINT16 id, IN PCVOID data, IN UINT16 dataSize);
|
INLINE VOID WriteWrapped(IN UINT32 offset, IN PCVOID data, IN UINT32 size);
|
||||||
|
INLINE VOID ReadWrapped(IN UINT32 offset, OUT PVOID outData, IN UINT32 size);
|
||||||
};
|
};
|
||||||
} // namespace IACore
|
} // namespace IACore
|
||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
RingBufferView::RingBufferView(IN Span<UINT8> buffer)
|
RingBufferView::RingBufferView(IN Span<UINT8> buffer, IN BOOL isOwner)
|
||||||
{
|
{
|
||||||
IA_ASSERT(buffer.size() > sizeof(ControlBlock));
|
IA_ASSERT(buffer.size() > sizeof(ControlBlock));
|
||||||
|
|
||||||
m_controlBlock = reinterpret_cast<ControlBlock *>(buffer.data());
|
m_controlBlock = reinterpret_cast<ControlBlock *>(buffer.data());
|
||||||
m_dataPtr = buffer.data() + sizeof(ControlBlock);
|
m_dataPtr = buffer.data() + sizeof(ControlBlock);
|
||||||
m_controlBlock->Capacity = m_capacity = buffer.size() - sizeof(ControlBlock);
|
|
||||||
|
m_capacity = static_cast<UINT32>(buffer.size()) - sizeof(ControlBlock);
|
||||||
|
|
||||||
|
if (isOwner)
|
||||||
|
{
|
||||||
|
m_controlBlock->Consumer.Capacity = m_capacity;
|
||||||
|
m_controlBlock->Producer.WriteOffset.store(0, std::memory_order_release);
|
||||||
|
m_controlBlock->Consumer.ReadOffset.store(0, std::memory_order_release);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
IA_ASSERT(m_controlBlock->Consumer.Capacity == m_capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
BOOL RingBufferView::Pop(OUT PacketHeader &outHeader, OUT Span<UINT8> outBuffer)
|
RingBufferView::RingBufferView(IN ControlBlock *controlBlock, IN Span<UINT8> buffer, IN BOOL isOwner)
|
||||||
{
|
{
|
||||||
UINT32 currentWriteOffset = m_controlBlock->WriteOffset.load(std::memory_order_acquire);
|
IA_ASSERT(controlBlock != nullptr);
|
||||||
UINT32 currentReadOffset = m_controlBlock->ReadOffset.load(std::memory_order_acquire);
|
IA_ASSERT(buffer.size() > 0);
|
||||||
|
|
||||||
if (currentReadOffset == currentWriteOffset)
|
m_controlBlock = controlBlock;
|
||||||
return false;
|
m_dataPtr = buffer.data();
|
||||||
|
m_capacity = static_cast<UINT32>(buffer.size());
|
||||||
|
|
||||||
const auto header = reinterpret_cast<PacketHeader *>(m_dataPtr + currentReadOffset);
|
if (isOwner)
|
||||||
|
|
||||||
if (header->ID == PACKET_ID_SKIP)
|
|
||||||
{
|
{
|
||||||
m_controlBlock->ReadOffset.store(0, std::memory_order_release);
|
m_controlBlock->Consumer.Capacity = m_capacity;
|
||||||
return Pop(outHeader, outBuffer);
|
m_controlBlock->Producer.WriteOffset.store(0, std::memory_order_release);
|
||||||
|
m_controlBlock->Consumer.ReadOffset.store(0, std::memory_order_release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
INT32 RingBufferView::Pop(OUT PacketHeader &outHeader, OUT Span<UINT8> outBuffer)
|
||||||
|
{
|
||||||
|
UINT32 write = m_controlBlock->Producer.WriteOffset.load(std::memory_order_acquire);
|
||||||
|
UINT32 read = m_controlBlock->Consumer.ReadOffset.load(std::memory_order_relaxed);
|
||||||
|
UINT32 cap = m_capacity;
|
||||||
|
|
||||||
|
if (read == write)
|
||||||
|
return 0; // Empty
|
||||||
|
|
||||||
|
ReadWrapped(read, &outHeader, sizeof(PacketHeader));
|
||||||
|
|
||||||
|
if (outHeader.PayloadSize > outBuffer.size())
|
||||||
|
return -static_cast<INT32>(outHeader.PayloadSize);
|
||||||
|
|
||||||
|
if (outHeader.PayloadSize > 0)
|
||||||
|
{
|
||||||
|
UINT32 dataReadOffset = (read + sizeof(PacketHeader)) % cap;
|
||||||
|
ReadWrapped(dataReadOffset, outBuffer.data(), outHeader.PayloadSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
outHeader = *header;
|
// Move read pointer forward
|
||||||
if (outHeader.PayloadSize > outBuffer.size())
|
UINT32 newReadOffset = (read + sizeof(PacketHeader) + outHeader.PayloadSize) % cap;
|
||||||
return false;
|
m_controlBlock->Consumer.ReadOffset.store(newReadOffset, std::memory_order_release);
|
||||||
|
|
||||||
memcpy(outBuffer.data(), m_dataPtr + currentReadOffset + sizeof(PacketHeader), outHeader.PayloadSize);
|
return outHeader.PayloadSize;
|
||||||
m_controlBlock->ReadOffset.store(currentReadOffset + sizeof(PacketHeader) + outHeader.PayloadSize, std::memory_order_release);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
BOOL RingBufferView::Push(IN UINT16 packetID, IN Span<CONST UINT8> data)
|
BOOL RingBufferView::Push(IN UINT16 packetID, IN Span<CONST UINT8> data)
|
||||||
{
|
{
|
||||||
IA_ASSERT(data.size() <= UINT16_MAX);
|
IA_ASSERT(data.size() <= UINT16_MAX);
|
||||||
|
|
||||||
UINT32 currentWriteOffset = m_controlBlock->WriteOffset.load(std::memory_order_acquire);
|
const UINT32 totalSize = sizeof(PacketHeader) + static_cast<UINT32>(data.size());
|
||||||
UINT32 currentReadOffset = m_controlBlock->ReadOffset.load(std::memory_order_acquire);
|
|
||||||
|
|
||||||
UINT32 requiredSpace = sizeof(PacketHeader) + data.size();
|
UINT32 read = m_controlBlock->Consumer.ReadOffset.load(std::memory_order_acquire);
|
||||||
if (currentWriteOffset + requiredSpace <= m_capacity)
|
UINT32 write = m_controlBlock->Producer.WriteOffset.load(std::memory_order_relaxed);
|
||||||
|
UINT32 cap = m_capacity;
|
||||||
|
|
||||||
|
UINT32 freeSpace = (read <= write)
|
||||||
|
? (m_capacity - write) + read
|
||||||
|
: (read - write);
|
||||||
|
|
||||||
|
// Ensure to always leave 1 byte empty to prevent Read == Write ambiguity (Wait-Free Ring Buffer standard)
|
||||||
|
if (freeSpace <= totalSize)
|
||||||
|
return FALSE;
|
||||||
|
|
||||||
|
PacketHeader header{packetID, static_cast<UINT16>(data.size())};
|
||||||
|
WriteWrapped(write, &header, sizeof(PacketHeader));
|
||||||
|
|
||||||
|
UINT32 dataWriteOffset = (write + sizeof(PacketHeader)) % cap;
|
||||||
|
|
||||||
|
if (data.size() > 0)
|
||||||
{
|
{
|
||||||
WritePacket(currentWriteOffset, packetID, data.data(), static_cast<UINT16>(data.size()));
|
WriteWrapped(dataWriteOffset, data.data(), static_cast<UINT32>(data.size()));
|
||||||
m_controlBlock->WriteOffset.store(currentWriteOffset + requiredSpace, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UINT32 remainingAtEnd = m_capacity - currentWriteOffset;
|
UINT32 newWriteOffset = (dataWriteOffset + data.size()) % cap;
|
||||||
if (requiredSpace < currentReadOffset)
|
m_controlBlock->Producer.WriteOffset.store(newWriteOffset, std::memory_order_release);
|
||||||
{
|
|
||||||
if (remainingAtEnd >= sizeof(PacketHeader))
|
|
||||||
{
|
|
||||||
const auto p = PacketHeader{PACKET_ID_SKIP, 0};
|
|
||||||
memcpy(m_dataPtr + currentWriteOffset, &p, sizeof(p));
|
|
||||||
}
|
|
||||||
|
|
||||||
WritePacket(0, packetID, data.data(), static_cast<UINT16>(data.size()));
|
return TRUE;
|
||||||
m_controlBlock->WriteOffset.store(requiredSpace, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RingBufferView::ControlBlock *RingBufferView::GetControlBlock()
|
RingBufferView::ControlBlock *RingBufferView::GetControlBlock()
|
||||||
@ -134,10 +174,43 @@ namespace IACore
|
|||||||
return m_controlBlock;
|
return m_controlBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
VOID RingBufferView::WritePacket(IN UINT32 offset, IN UINT16 id, IN PCVOID data, IN UINT16 dataSize)
|
VOID RingBufferView::WriteWrapped(IN UINT32 offset, IN PCVOID data, IN UINT32 size)
|
||||||
{
|
{
|
||||||
PacketHeader h = {id, dataSize};
|
if (offset + size <= m_capacity)
|
||||||
memcpy(m_dataPtr + offset, &h, sizeof(PacketHeader));
|
{
|
||||||
memcpy(m_dataPtr + offset + sizeof(PacketHeader), data, dataSize);
|
// Contiguous write
|
||||||
|
memcpy(m_dataPtr + offset, data, size);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Split write
|
||||||
|
UINT32 firstChunk = m_capacity - offset;
|
||||||
|
UINT32 secondChunk = size - firstChunk;
|
||||||
|
|
||||||
|
const UINT8 *src = static_cast<const UINT8 *>(data);
|
||||||
|
|
||||||
|
memcpy(m_dataPtr + offset, src, firstChunk);
|
||||||
|
memcpy(m_dataPtr, src + firstChunk, secondChunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
VOID RingBufferView::ReadWrapped(IN UINT32 offset, OUT PVOID outData, IN UINT32 size)
|
||||||
|
{
|
||||||
|
if (offset + size <= m_capacity)
|
||||||
|
{
|
||||||
|
// Contiguous read
|
||||||
|
memcpy(outData, m_dataPtr + offset, size);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Split read
|
||||||
|
UINT32 firstChunk = m_capacity - offset;
|
||||||
|
UINT32 secondChunk = size - firstChunk;
|
||||||
|
|
||||||
|
UINT8 *dst = static_cast<UINT8 *>(outData);
|
||||||
|
|
||||||
|
memcpy(dst, m_dataPtr + offset, firstChunk);
|
||||||
|
memcpy(dst + firstChunk, m_dataPtr, secondChunk);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} // namespace IACore
|
} // namespace IACore
|
||||||
@ -31,7 +31,7 @@ namespace IACore
|
|||||||
STATIC Expected<PCUINT8, String> MapFile(IN CONST FilePath &path, OUT SIZE_T &size);
|
STATIC Expected<PCUINT8, String> MapFile(IN CONST FilePath &path, OUT SIZE_T &size);
|
||||||
|
|
||||||
// @param `isOwner` TRUE to allocate/truncate. FALSE to just open.
|
// @param `isOwner` TRUE to allocate/truncate. FALSE to just open.
|
||||||
STATIC Expected<PCUINT8, String> MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner);
|
STATIC Expected<PUINT8, String> MapSharedMemory(IN CONST String &name, IN SIZE_T size, IN BOOL isOwner);
|
||||||
STATIC VOID UnlinkSharedMemory(IN CONST String &name);
|
STATIC VOID UnlinkSharedMemory(IN CONST String &name);
|
||||||
|
|
||||||
STATIC Expected<StreamReader, String> StreamFromFile(IN CONST FilePath &path);
|
STATIC Expected<StreamReader, String> StreamFromFile(IN CONST FilePath &path);
|
||||||
|
|||||||
@ -16,19 +16,130 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <IACore/PCH.hpp>
|
#include <IACore/ADT/RingBuffer.hpp>
|
||||||
|
#include <IACore/ProcessOps.hpp>
|
||||||
|
#include <IACore/SocketOps.hpp>
|
||||||
|
|
||||||
namespace IACore
|
namespace IACore
|
||||||
{
|
{
|
||||||
class IPC_Node
|
struct alignas(64) IPC_SharedMemoryLayout
|
||||||
{
|
{
|
||||||
public:
|
// =========================================================
|
||||||
|
// SECTION 1: METADATA & HANDSHAKE
|
||||||
|
// =========================================================
|
||||||
|
struct Header
|
||||||
|
{
|
||||||
|
UINT32 Magic; // 0x49414950 ("IAIP")
|
||||||
|
UINT32 Version; // 1
|
||||||
|
UINT64 TotalSize; // Total size of SHM block
|
||||||
|
} Meta;
|
||||||
|
|
||||||
|
// Pad to ensure MONI starts on a fresh cache line (64 bytes)
|
||||||
|
UINT8 _pad0[64 - sizeof(Header)];
|
||||||
|
|
||||||
|
// =========================================================
|
||||||
|
// SECTION 2: RING BUFFER CONTROL BLOCKS
|
||||||
|
// =========================================================
|
||||||
|
|
||||||
|
// RingBufferView::ControlBlock is already 64-byte aligned internally.
|
||||||
|
RingBufferView::ControlBlock MONI_Control;
|
||||||
|
RingBufferView::ControlBlock MINO_Control;
|
||||||
|
|
||||||
|
// =========================================================
|
||||||
|
// SECTION 3: DATA BUFFER OFFSETS
|
||||||
|
// =========================================================
|
||||||
|
|
||||||
|
UINT64 MONI_DataOffset;
|
||||||
|
UINT64 MONI_DataSize;
|
||||||
|
|
||||||
|
UINT64 MINO_DataOffset;
|
||||||
|
UINT64 MINO_DataSize;
|
||||||
|
|
||||||
|
// Pad to ensure the actual Data Buffer starts on a fresh cache line
|
||||||
|
UINT8 _pad1[64 - (sizeof(UINT64) * 4)];
|
||||||
|
|
||||||
|
static constexpr size_t GetHeaderSize()
|
||||||
|
{
|
||||||
|
return sizeof(IPC_SharedMemoryLayout);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
class IPC_Server
|
// Static assert to ensure manual padding logic is correct
|
||||||
|
static_assert(sizeof(IPC_SharedMemoryLayout) % 64 == 0, "IPC Layout is not cache-line aligned!");
|
||||||
|
|
||||||
|
class IPC_Node
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
~IPC_Node();
|
||||||
|
|
||||||
|
// When Manager spawns a node, `connectionString` is passed
|
||||||
|
// as the first command line argument
|
||||||
|
Expected<VOID, String> Connect(IN PCCHAR connectionString);
|
||||||
|
|
||||||
|
VOID Update();
|
||||||
|
|
||||||
|
VOID SendSignal(IN UINT8 signal);
|
||||||
|
VOID SendPacket(IN UINT16 packetID, IN Span<CONST UINT8> payload);
|
||||||
|
|
||||||
|
PURE_VIRTUAL(VOID OnSignal(IN UINT8 signal));
|
||||||
|
PURE_VIRTUAL(VOID OnPacket(IN UINT16 packetID, IN Span<CONST UINT8> payload));
|
||||||
|
|
||||||
|
private:
|
||||||
|
String m_shmName;
|
||||||
|
PUINT8 m_sharedMemory{};
|
||||||
|
Vector<UINT8> m_recieveBuffer;
|
||||||
|
SocketHandle m_socket{INVALID_SOCKET};
|
||||||
|
|
||||||
|
UniquePtr<RingBufferView> MONI; // Manager Out, Node In
|
||||||
|
UniquePtr<RingBufferView> MINO; // Manager In, Node Out
|
||||||
};
|
};
|
||||||
}
|
|
||||||
|
class IPC_Manager
|
||||||
|
{
|
||||||
|
struct NodeSession
|
||||||
|
{
|
||||||
|
SteadyTimePoint CreationTime{};
|
||||||
|
SharedPtr<ProcessHandle> ProcessHandle;
|
||||||
|
|
||||||
|
String SharedMemName;
|
||||||
|
PUINT8 MappedPtr{};
|
||||||
|
|
||||||
|
SocketHandle ListenerSocket{INVALID_SOCKET};
|
||||||
|
SocketHandle DataSocket{INVALID_SOCKET};
|
||||||
|
|
||||||
|
UniquePtr<RingBufferView> MONI; // Manager Out, Node In
|
||||||
|
UniquePtr<RingBufferView> MINO; // Manager In, Node Out
|
||||||
|
|
||||||
|
BOOL IsReady{FALSE};
|
||||||
|
|
||||||
|
VOID SendSignal(IN UINT8 signal);
|
||||||
|
VOID SendPacket(IN UINT16 packetID, IN Span<CONST UINT8> payload);
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
|
STATIC CONSTEXPR UINT32 DEFAULT_NODE_SHARED_MEMORY_SIZE = SIZE_MB(4);
|
||||||
|
|
||||||
|
public:
|
||||||
|
IPC_Manager();
|
||||||
|
~IPC_Manager();
|
||||||
|
|
||||||
|
VOID Update();
|
||||||
|
|
||||||
|
Expected<NativeProcessID, String> SpawnNode(IN CONST FilePath &executablePath,
|
||||||
|
IN UINT32 sharedMemorySize = DEFAULT_NODE_SHARED_MEMORY_SIZE);
|
||||||
|
|
||||||
|
VOID ShutdownNode(IN NativeProcessID pid);
|
||||||
|
|
||||||
|
VOID SendSignal(IN NativeProcessID node, IN UINT8 signal);
|
||||||
|
VOID SendPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span<CONST UINT8> payload);
|
||||||
|
|
||||||
|
PURE_VIRTUAL(VOID OnSignal(IN NativeProcessID node, IN UINT8 signal));
|
||||||
|
PURE_VIRTUAL(VOID OnPacket(IN NativeProcessID node, IN UINT16 packetID, IN Span<CONST UINT8> payload));
|
||||||
|
|
||||||
|
private:
|
||||||
|
Vector<UINT8> m_recieveBuffer;
|
||||||
|
Vector<UniquePtr<NodeSession>> m_activeSessions;
|
||||||
|
Vector<UniquePtr<NodeSession>> m_pendingSessions;
|
||||||
|
UnorderedMap<NativeProcessID, NodeSession *> m_activeSessionMap;
|
||||||
|
};
|
||||||
|
} // namespace IACore
|
||||||
@ -293,6 +293,10 @@
|
|||||||
#define IA_CONCAT(x, y) IA_CONCAT_IMPL(x, y)
|
#define IA_CONCAT(x, y) IA_CONCAT_IMPL(x, y)
|
||||||
#define IA_UNIQUE_NAME(prefix) IA_CONCAT(prefix, __LINE__)
|
#define IA_UNIQUE_NAME(prefix) IA_CONCAT(prefix, __LINE__)
|
||||||
|
|
||||||
|
#define SIZE_KB(v) (v * 1024)
|
||||||
|
#define SIZE_MB(v) (v * 1024 * 1024)
|
||||||
|
#define SIZE_GB(v) (v * 1024 * 1024 * 1024)
|
||||||
|
|
||||||
#define ENSURE_BINARY_COMPATIBILITY(A, B) \
|
#define ENSURE_BINARY_COMPATIBILITY(A, B) \
|
||||||
static_assert(sizeof(A) == sizeof(B), \
|
static_assert(sizeof(A) == sizeof(B), \
|
||||||
#A ", " #B " size mismatch! Do not add virtual functions or new member variables.");
|
#A ", " #B " size mismatch! Do not add virtual functions or new member variables.");
|
||||||
@ -560,8 +564,10 @@ using String = std::string;
|
|||||||
using StringView = std::string_view;
|
using StringView = std::string_view;
|
||||||
using StringStream = std::stringstream;
|
using StringStream = std::stringstream;
|
||||||
|
|
||||||
using HRClock = std::chrono::high_resolution_clock;
|
using SteadyClock = std::chrono::steady_clock;
|
||||||
using HRTimePoint = std::chrono::time_point<HRClock>;
|
using SteadyTimePoint = std::chrono::time_point<SteadyClock>;
|
||||||
|
using HighResClock = std::chrono::high_resolution_clock;
|
||||||
|
using HighResTimePoint = std::chrono::time_point<HighResClock>;
|
||||||
|
|
||||||
using Mutex = std::mutex;
|
using Mutex = std::mutex;
|
||||||
using StopToken = std::stop_token;
|
using StopToken = std::stop_token;
|
||||||
|
|||||||
@ -47,6 +47,8 @@ namespace IACore
|
|||||||
class ProcessOps
|
class ProcessOps
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
STATIC NativeProcessID GetCurrentProcessID();
|
||||||
|
|
||||||
STATIC Expected<INT32, String> SpawnProcessSync(IN CONST String &command, IN CONST String &args,
|
STATIC Expected<INT32, String> SpawnProcessSync(IN CONST String &command, IN CONST String &args,
|
||||||
IN Function<VOID(IN StringView line)> onOutputLineCallback);
|
IN Function<VOID(IN StringView line)> onOutputLineCallback);
|
||||||
STATIC SharedPtr<ProcessHandle> SpawnProcessAsync(IN CONST String &command, IN CONST String &args,
|
STATIC SharedPtr<ProcessHandle> SpawnProcessAsync(IN CONST String &command, IN CONST String &args,
|
||||||
@ -57,8 +59,10 @@ namespace IACore
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
STATIC Expected<INT32, String> SpawnProcessWindows(IN CONST String &command, IN CONST String &args,
|
STATIC Expected<INT32, String> SpawnProcessWindows(IN CONST String &command, IN CONST String &args,
|
||||||
IN Function<VOID(StringView)> onOutputLineCallback, OUT Atomic<NativeProcessID>& id);
|
IN Function<VOID(StringView)> onOutputLineCallback,
|
||||||
|
OUT Atomic<NativeProcessID> &id);
|
||||||
STATIC Expected<INT32, String> SpawnProcessPosix(IN CONST String &command, IN CONST String &args,
|
STATIC Expected<INT32, String> SpawnProcessPosix(IN CONST String &command, IN CONST String &args,
|
||||||
IN Function<VOID(StringView)> onOutputLineCallback, OUT Atomic<NativeProcessID>& id);
|
IN Function<VOID(StringView)> onOutputLineCallback,
|
||||||
|
OUT Atomic<NativeProcessID> &id);
|
||||||
};
|
};
|
||||||
} // namespace IACore
|
} // namespace IACore
|
||||||
@ -52,16 +52,26 @@ namespace IACore
|
|||||||
class SocketOps
|
class SocketOps
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
// SocketOps correctly handles multiple calls to Initialize and Terminate. Make sure
|
||||||
|
// every Initialize call is paired with a corresponding Terminate call
|
||||||
STATIC VOID Initialize()
|
STATIC VOID Initialize()
|
||||||
{
|
{
|
||||||
|
s_initCount++;
|
||||||
|
if (s_initCount > 1)
|
||||||
|
return;
|
||||||
#if IA_PLATFORM_WINDOWS
|
#if IA_PLATFORM_WINDOWS
|
||||||
WSADATA wsaData;
|
WSADATA wsaData;
|
||||||
WSAStartup(MAKEWORD(2, 2), &wsaData);
|
WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SocketOps correctly handles multiple calls to Initialize and Terminate. Make sure
|
||||||
|
// every Initialize call is paired with a corresponding Terminate call
|
||||||
STATIC VOID Terminate()
|
STATIC VOID Terminate()
|
||||||
{
|
{
|
||||||
|
s_initCount--;
|
||||||
|
if (s_initCount > 0)
|
||||||
|
return;
|
||||||
#if IA_PLATFORM_WINDOWS
|
#if IA_PLATFORM_WINDOWS
|
||||||
WSACleanup();
|
WSACleanup();
|
||||||
#endif
|
#endif
|
||||||
@ -77,6 +87,8 @@ namespace IACore
|
|||||||
return IsPortAvailable(port, SOCK_DGRAM);
|
return IsPortAvailable(port, SOCK_DGRAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STATIC VOID Close(IN SocketHandle sock);
|
||||||
|
|
||||||
STATIC BOOL Listen(IN SocketHandle sock, IN INT32 queueSize = 5);
|
STATIC BOOL Listen(IN SocketHandle sock, IN INT32 queueSize = 5);
|
||||||
|
|
||||||
STATIC SocketHandle CreateUnixSocket();
|
STATIC SocketHandle CreateUnixSocket();
|
||||||
@ -86,5 +98,8 @@ namespace IACore
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
STATIC BOOL IsPortAvailable(IN UINT16 port, IN INT32 type);
|
STATIC BOOL IsPortAvailable(IN UINT16 port, IN INT32 type);
|
||||||
|
|
||||||
|
private:
|
||||||
|
STATIC INT32 s_initCount;
|
||||||
};
|
};
|
||||||
} // namespace IACore
|
} // namespace IACore
|
||||||
Reference in New Issue
Block a user