diff --git a/CMakeLists.txt b/CMakeLists.txt index fb297d0..01e92d4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,19 @@ enable_language(C) # Default to ON if root, OFF if dependency option(IACore_BUILD_TESTS "Build unit tests" ${PROJECT_IS_TOP_LEVEL}) +if(CMAKE_BUILD_TYPE STREQUAL "Debug") + message(STATUS "Configuring IACore for Debug..") + add_compile_options(-O0 -g) + add_compile_definitions("__IA_DEBUG=1") +elseif(CMAKE_BUILD_TYPE STREQUAL "Release") + message(STATUS "Configuring IACore for Release..") + add_compile_options(-O3 -g0) + add_compile_definitions("__IA_DEBUG=0") +else() + message(FATAL_ERROR "Unknown CMAKE_BUILD_TYPE \"${CMAKE_BUILD_TYPE}\"") +endif() + + message(STATUS "Detected Compiler ID: ${CMAKE_CXX_COMPILER_ID}") # Check if the compiler is MSVC (cl.exe), but allow Clang acting like MSVC (clang-cl) if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") diff --git a/Src/IACore/imp/cpp/AsyncOps.cpp b/Src/IACore/imp/cpp/AsyncOps.cpp index 770ac97..409c377 100644 --- a/Src/IACore/imp/cpp/AsyncOps.cpp +++ b/Src/IACore/imp/cpp/AsyncOps.cpp @@ -18,5 +18,133 @@ namespace IACore { - -} \ No newline at end of file + Mutex AsyncOps::s_queueMutex; + ConditionVariable AsyncOps::s_wakeCondition; + Vector AsyncOps::s_scheduleWorkers; + Deque AsyncOps::s_highPriorityQueue; + Deque AsyncOps::s_normalPriorityQueue; + + VOID AsyncOps::RunTask(IN Function task) + { + JoiningThread(task).detach(); + } + + VOID AsyncOps::InitializeScheduler(IN UINT8 workerCount) + { + if (!workerCount) + workerCount = std::max((UINT32) 2, std::thread::hardware_concurrency() - 2); + for (UINT32 i = 0; i < workerCount; i++) + s_scheduleWorkers.emplace_back(AsyncOps::ScheduleWorkerLoop); + } + + VOID AsyncOps::TerminateScheduler() + { + for (auto &w : s_scheduleWorkers) + { + w.request_stop(); + } + + s_wakeCondition.notify_all(); + + for (auto &w : s_scheduleWorkers) + { + if (w.joinable()) + { + w.join(); + } + } + + s_scheduleWorkers.clear(); + } + + VOID AsyncOps::ScheduleTask(IN Function task, IN Schedule *schedule, IN Priority priority) + { + IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function"); + + schedule->Counter.fetch_add(1); + { + ScopedLock lock(s_queueMutex); + if (priority == Priority::High) + s_highPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule}); + else + s_normalPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule}); + } + s_wakeCondition.notify_one(); + } + + VOID AsyncOps::WaitForScheduleCompletion(IN Schedule *schedule) + { + IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function"); + + while (schedule->Counter.load() > 0) + { + ScheduledTask task; + BOOL foundTask{FALSE}; + { + UniqueLock lock(s_queueMutex); + if (!s_highPriorityQueue.empty()) + { + task = IA_MOVE(s_highPriorityQueue.front()); + s_highPriorityQueue.pop_front(); + foundTask = TRUE; + } + else if (!s_normalPriorityQueue.empty()) + { + task = IA_MOVE(s_normalPriorityQueue.front()); + s_normalPriorityQueue.pop_front(); + foundTask = TRUE; + } + } + if (foundTask) + { + task.Task(); + if (task.ScheduleHandle->Counter.fetch_sub(1) == 1) + task.ScheduleHandle->Counter.notify_all(); + } + else + { + auto currentVal = schedule->Counter.load(); + if (currentVal > 0) + schedule->Counter.wait(currentVal); + } + } + } + + VOID AsyncOps::ScheduleWorkerLoop(IN StopToken stopToken) + { + while (!stopToken.stop_requested()) + { + ScheduledTask task; + BOOL foundTask{FALSE}; + { + UniqueLock lock(s_queueMutex); + + s_wakeCondition.wait(lock, [&stopToken] { + return !s_highPriorityQueue.empty() || !s_normalPriorityQueue.empty() || stopToken.stop_requested(); + }); + + if (stopToken.stop_requested() && s_highPriorityQueue.empty() && s_normalPriorityQueue.empty()) + return; + + if (!s_highPriorityQueue.empty()) + { + task = IA_MOVE(s_highPriorityQueue.front()); + s_highPriorityQueue.pop_front(); + foundTask = TRUE; + } + else if (!s_normalPriorityQueue.empty()) + { + task = IA_MOVE(s_normalPriorityQueue.front()); + s_normalPriorityQueue.pop_front(); + foundTask = TRUE; + } + } + if (foundTask) + { + task.Task(); + if (task.ScheduleHandle->Counter.fetch_sub(1) == 1) + task.ScheduleHandle->Counter.notify_all(); + } + } + } +} // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/AsyncOps.hpp b/Src/IACore/inc/IACore/AsyncOps.hpp index 822a5f6..6b85d20 100644 --- a/Src/IACore/inc/IACore/AsyncOps.hpp +++ b/Src/IACore/inc/IACore/AsyncOps.hpp @@ -20,5 +20,44 @@ namespace IACore { - -} \ No newline at end of file + class AsyncOps + { + public: + enum class Priority : UINT8 + { + High, + Normal + }; + + struct Schedule + { + Atomic Counter{0}; + }; + + public: + STATIC VOID InitializeScheduler(IN UINT8 workerCount = 0); + STATIC VOID TerminateScheduler(); + + STATIC VOID ScheduleTask(IN Function task, IN Schedule *schedule, + IN Priority priority = Priority::Normal); + STATIC VOID WaitForScheduleCompletion(IN Schedule *schedule); + + STATIC VOID RunTask(IN Function task); + + private: + struct ScheduledTask + { + Function Task{}; + Schedule *ScheduleHandle{}; + }; + + STATIC VOID ScheduleWorkerLoop(IN StopToken stopToken); + + private: + STATIC Mutex s_queueMutex; + STATIC ConditionVariable s_wakeCondition; + STATIC Vector s_scheduleWorkers; + STATIC Deque s_highPriorityQueue; + STATIC Deque s_normalPriorityQueue; + }; +} // namespace IACore \ No newline at end of file diff --git a/Src/IACore/inc/IACore/PCH.hpp b/Src/IACore/inc/IACore/PCH.hpp index a405e5c..5e4ec4b 100644 --- a/Src/IACore/inc/IACore/PCH.hpp +++ b/Src/IACore/inc/IACore/PCH.hpp @@ -40,9 +40,11 @@ # include # include # include +# include # include # include +# include # include # include # include @@ -535,6 +537,7 @@ using UnorderedMap = ankerl::unordered_dense::map<_key_type, _value_type>; template using Atomic = std::atomic<_value_type>; template using SharedPtr = std::shared_ptr<_value_type>; template using UniquePtr = std::unique_ptr<_value_type>; +template using Deque = std::deque<_value_type>; template using Pair = std::pair<_type_a, _type_b>; template using Tuple = std::tuple; @@ -550,9 +553,11 @@ using HRClock = std::chrono::high_resolution_clock; using HRTimePoint = std::chrono::time_point; using Mutex = std::mutex; +using StopToken = std::stop_token; using ScopedLock = std::scoped_lock; using UniqueLock = std::unique_lock; using JoiningThread = std::jthread; +using ConditionVariable = std::condition_variable; namespace FileSystem = std::filesystem; using FilePath = FileSystem::path;