From bd9d75e2a2c722ba8d7f55e2f57209dec9348c34 Mon Sep 17 00:00:00 2001 From: dev0 Date: Mon, 1 Dec 2025 19:36:26 +0530 Subject: [PATCH] Fixes --- Src/IACore/imp/cpp/AsyncOps.cpp | 43 +++++++++++++++++++++++----- Src/IACore/imp/cpp/HttpClient.cpp | 23 +++++++++------ Src/IACore/inc/IACore/AsyncOps.hpp | 17 +++++++++-- Src/IACore/inc/IACore/HttpClient.hpp | 2 ++ 4 files changed, 67 insertions(+), 18 deletions(-) diff --git a/Src/IACore/imp/cpp/AsyncOps.cpp b/Src/IACore/imp/cpp/AsyncOps.cpp index 409c377..7db4b04 100644 --- a/Src/IACore/imp/cpp/AsyncOps.cpp +++ b/Src/IACore/imp/cpp/AsyncOps.cpp @@ -34,7 +34,7 @@ namespace IACore if (!workerCount) workerCount = std::max((UINT32) 2, std::thread::hardware_concurrency() - 2); for (UINT32 i = 0; i < workerCount; i++) - s_scheduleWorkers.emplace_back(AsyncOps::ScheduleWorkerLoop); + s_scheduleWorkers.emplace_back(AsyncOps::ScheduleWorkerLoop, i + 1); } VOID AsyncOps::TerminateScheduler() @@ -57,7 +57,8 @@ namespace IACore s_scheduleWorkers.clear(); } - VOID AsyncOps::ScheduleTask(IN Function task, IN Schedule *schedule, IN Priority priority) + VOID AsyncOps::ScheduleTask(IN Function task, IN TaskTag tag, IN Schedule *schedule, + IN Priority priority) { IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function"); @@ -65,13 +66,36 @@ namespace IACore { ScopedLock lock(s_queueMutex); if (priority == Priority::High) - s_highPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule}); + s_highPriorityQueue.emplace_back(ScheduledTask{tag, schedule, IA_MOVE(task)}); else - s_normalPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule}); + s_normalPriorityQueue.emplace_back(ScheduledTask{tag, schedule, IA_MOVE(task)}); } s_wakeCondition.notify_one(); } + VOID AsyncOps::CancelTasksOfTag(IN TaskTag tag) + { + ScopedLock lock(s_queueMutex); + + auto cancelFromQueue = [&](Deque &queue) { + for (auto it = queue.begin(); it != queue.end(); /* no increment here */) + { + if (it->Tag == tag) + { + if (it->ScheduleHandle->Counter.fetch_sub(1) == 1) + it->ScheduleHandle->Counter.notify_all(); + + it = queue.erase(it); + } + else + ++it; + } + }; + + cancelFromQueue(s_highPriorityQueue); + cancelFromQueue(s_normalPriorityQueue); + } + VOID AsyncOps::WaitForScheduleCompletion(IN Schedule *schedule) { IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function"); @@ -97,7 +121,7 @@ namespace IACore } if (foundTask) { - task.Task(); + task.Task(MainThreadWorkerID); if (task.ScheduleHandle->Counter.fetch_sub(1) == 1) task.ScheduleHandle->Counter.notify_all(); } @@ -110,7 +134,12 @@ namespace IACore } } - VOID AsyncOps::ScheduleWorkerLoop(IN StopToken stopToken) + AsyncOps::WorkerID AsyncOps::GetWorkerCount() + { + return static_cast(s_scheduleWorkers.size() + 1); // +1 for MainThread (Work Stealing) + } + + VOID AsyncOps::ScheduleWorkerLoop(IN StopToken stopToken, IN WorkerID workerID) { while (!stopToken.stop_requested()) { @@ -141,7 +170,7 @@ namespace IACore } if (foundTask) { - task.Task(); + task.Task(workerID); if (task.ScheduleHandle->Counter.fetch_sub(1) == 1) task.ScheduleHandle->Counter.notify_all(); } diff --git a/Src/IACore/imp/cpp/HttpClient.cpp b/Src/IACore/imp/cpp/HttpClient.cpp index 0258f18..6c66ace 100644 --- a/Src/IACore/imp/cpp/HttpClient.cpp +++ b/Src/IACore/imp/cpp/HttpClient.cpp @@ -27,7 +27,7 @@ namespace IACore for (const auto &h : headers) { - std::string key = HttpClient::HeaderTypeToString(h.first); // Your existing helper + std::string key = HttpClient::HeaderTypeToString(h.first); out.emplace(key, h.second); if (h.first == HttpClient::EHeaderType::CONTENT_TYPE) @@ -53,8 +53,8 @@ namespace IACore { auto httpHeaders = BuildHeaders(headers, defaultContentType); - static_cast(m_client)->enable_server_certificate_verification(false); - auto res = static_cast(m_client)->Get(path.c_str(), httpHeaders); + static_cast(m_client)->enable_server_certificate_verification(false); + auto res = static_cast(m_client)->Get(path.c_str(), httpHeaders); if (res) { @@ -62,7 +62,7 @@ namespace IACore if (res->status >= 200 && res->status < 300) return res->body; else - return MakeUnexpected(std::format("HTTP Error {}", res->status)); + return MakeUnexpected(std::format("HTTP Error {} : {}", res->status, res->body)); } return MakeUnexpected(std::format("Network Error: {}", httplib::to_string(res.error()))); @@ -76,11 +76,13 @@ namespace IACore String contentType = defaultContentType; if (httpHeaders.count("Content-Type")) { - contentType = httpHeaders.find("Content-Type")->second; + const auto t = httpHeaders.find("Content-Type"); + contentType = t->second; + httpHeaders.erase(t); } - static_cast(m_client)->enable_server_certificate_verification(false); - auto res = static_cast(m_client)->Post(path.c_str(), httpHeaders, body, contentType.c_str()); + static_cast(m_client)->enable_server_certificate_verification(false); + auto res = static_cast(m_client)->Post(path.c_str(), httpHeaders, body, contentType.c_str()); if (res) { @@ -88,7 +90,7 @@ namespace IACore if (res->status >= 200 && res->status < 300) return res->body; else - return MakeUnexpected(std::format("HTTP Error {}", res->status)); + return MakeUnexpected(std::format("HTTP Error {} : {}", res->status, res->body)); } return MakeUnexpected(std::format("Network Error: {}", httplib::to_string(res.error()))); @@ -202,4 +204,9 @@ namespace IACore return ""; } } + + BOOL HttpClient::IsSuccessResponseCode(IN EResponseCode code) + { + return (INT32) code >= 200 && (INT32) code < 300; + } } // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/AsyncOps.hpp b/Src/IACore/inc/IACore/AsyncOps.hpp index 6b85d20..856bc2d 100644 --- a/Src/IACore/inc/IACore/AsyncOps.hpp +++ b/Src/IACore/inc/IACore/AsyncOps.hpp @@ -23,6 +23,11 @@ namespace IACore class AsyncOps { public: + using TaskTag = UINT64; + using WorkerID = UINT16; + + STATIC CONSTEXPR WorkerID MainThreadWorkerID = 0; + enum class Priority : UINT8 { High, @@ -38,20 +43,26 @@ namespace IACore STATIC VOID InitializeScheduler(IN UINT8 workerCount = 0); STATIC VOID TerminateScheduler(); - STATIC VOID ScheduleTask(IN Function task, IN Schedule *schedule, + STATIC VOID ScheduleTask(IN Function task, IN TaskTag tag, IN Schedule *schedule, IN Priority priority = Priority::Normal); + + STATIC VOID CancelTasksOfTag(IN TaskTag tag); + STATIC VOID WaitForScheduleCompletion(IN Schedule *schedule); STATIC VOID RunTask(IN Function task); + STATIC WorkerID GetWorkerCount(); + private: struct ScheduledTask { - Function Task{}; + TaskTag Tag{}; Schedule *ScheduleHandle{}; + Function Task{}; }; - STATIC VOID ScheduleWorkerLoop(IN StopToken stopToken); + STATIC VOID ScheduleWorkerLoop(IN StopToken stopToken, IN WorkerID workerID); private: STATIC Mutex s_queueMutex; diff --git a/Src/IACore/inc/IACore/HttpClient.hpp b/Src/IACore/inc/IACore/HttpClient.hpp index 0cd85ee..6174794 100644 --- a/Src/IACore/inc/IACore/HttpClient.hpp +++ b/Src/IACore/inc/IACore/HttpClient.hpp @@ -158,6 +158,8 @@ namespace IACore STATIC String HeaderTypeToString(IN EHeaderType type); STATIC Header CreateHeader(IN EHeaderType key, IN CONST String &value); + STATIC BOOL IsSuccessResponseCode(IN EResponseCode code); + public: EResponseCode LastResponseCode() {