AsyncOps
This commit is contained in:
@ -16,6 +16,19 @@ enable_language(C)
|
||||
# Default to ON if root, OFF if dependency
|
||||
option(IACore_BUILD_TESTS "Build unit tests" ${PROJECT_IS_TOP_LEVEL})
|
||||
|
||||
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
|
||||
message(STATUS "Configuring IACore for Debug..")
|
||||
add_compile_options(-O0 -g)
|
||||
add_compile_definitions("__IA_DEBUG=1")
|
||||
elseif(CMAKE_BUILD_TYPE STREQUAL "Release")
|
||||
message(STATUS "Configuring IACore for Release..")
|
||||
add_compile_options(-O3 -g0)
|
||||
add_compile_definitions("__IA_DEBUG=0")
|
||||
else()
|
||||
message(FATAL_ERROR "Unknown CMAKE_BUILD_TYPE \"${CMAKE_BUILD_TYPE}\"")
|
||||
endif()
|
||||
|
||||
|
||||
message(STATUS "Detected Compiler ID: ${CMAKE_CXX_COMPILER_ID}")
|
||||
# Check if the compiler is MSVC (cl.exe), but allow Clang acting like MSVC (clang-cl)
|
||||
if (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
|
||||
|
||||
@ -18,5 +18,133 @@
|
||||
|
||||
namespace IACore
|
||||
{
|
||||
Mutex AsyncOps::s_queueMutex;
|
||||
ConditionVariable AsyncOps::s_wakeCondition;
|
||||
Vector<JoiningThread> AsyncOps::s_scheduleWorkers;
|
||||
Deque<AsyncOps::ScheduledTask> AsyncOps::s_highPriorityQueue;
|
||||
Deque<AsyncOps::ScheduledTask> AsyncOps::s_normalPriorityQueue;
|
||||
|
||||
VOID AsyncOps::RunTask(IN Function<VOID()> task)
|
||||
{
|
||||
JoiningThread(task).detach();
|
||||
}
|
||||
|
||||
VOID AsyncOps::InitializeScheduler(IN UINT8 workerCount)
|
||||
{
|
||||
if (!workerCount)
|
||||
workerCount = std::max((UINT32) 2, std::thread::hardware_concurrency() - 2);
|
||||
for (UINT32 i = 0; i < workerCount; i++)
|
||||
s_scheduleWorkers.emplace_back(AsyncOps::ScheduleWorkerLoop);
|
||||
}
|
||||
|
||||
VOID AsyncOps::TerminateScheduler()
|
||||
{
|
||||
for (auto &w : s_scheduleWorkers)
|
||||
{
|
||||
w.request_stop();
|
||||
}
|
||||
|
||||
s_wakeCondition.notify_all();
|
||||
|
||||
for (auto &w : s_scheduleWorkers)
|
||||
{
|
||||
if (w.joinable())
|
||||
{
|
||||
w.join();
|
||||
}
|
||||
}
|
||||
|
||||
s_scheduleWorkers.clear();
|
||||
}
|
||||
|
||||
VOID AsyncOps::ScheduleTask(IN Function<VOID()> task, IN Schedule *schedule, IN Priority priority)
|
||||
{
|
||||
IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function");
|
||||
|
||||
schedule->Counter.fetch_add(1);
|
||||
{
|
||||
ScopedLock lock(s_queueMutex);
|
||||
if (priority == Priority::High)
|
||||
s_highPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule});
|
||||
else
|
||||
s_normalPriorityQueue.emplace_back(ScheduledTask{IA_MOVE(task), schedule});
|
||||
}
|
||||
s_wakeCondition.notify_one();
|
||||
}
|
||||
|
||||
VOID AsyncOps::WaitForScheduleCompletion(IN Schedule *schedule)
|
||||
{
|
||||
IA_ASSERT(s_scheduleWorkers.size() && "Scheduler must be initialized before calling this function");
|
||||
|
||||
while (schedule->Counter.load() > 0)
|
||||
{
|
||||
ScheduledTask task;
|
||||
BOOL foundTask{FALSE};
|
||||
{
|
||||
UniqueLock lock(s_queueMutex);
|
||||
if (!s_highPriorityQueue.empty())
|
||||
{
|
||||
task = IA_MOVE(s_highPriorityQueue.front());
|
||||
s_highPriorityQueue.pop_front();
|
||||
foundTask = TRUE;
|
||||
}
|
||||
else if (!s_normalPriorityQueue.empty())
|
||||
{
|
||||
task = IA_MOVE(s_normalPriorityQueue.front());
|
||||
s_normalPriorityQueue.pop_front();
|
||||
foundTask = TRUE;
|
||||
}
|
||||
}
|
||||
if (foundTask)
|
||||
{
|
||||
task.Task();
|
||||
if (task.ScheduleHandle->Counter.fetch_sub(1) == 1)
|
||||
task.ScheduleHandle->Counter.notify_all();
|
||||
}
|
||||
else
|
||||
{
|
||||
auto currentVal = schedule->Counter.load();
|
||||
if (currentVal > 0)
|
||||
schedule->Counter.wait(currentVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
VOID AsyncOps::ScheduleWorkerLoop(IN StopToken stopToken)
|
||||
{
|
||||
while (!stopToken.stop_requested())
|
||||
{
|
||||
ScheduledTask task;
|
||||
BOOL foundTask{FALSE};
|
||||
{
|
||||
UniqueLock lock(s_queueMutex);
|
||||
|
||||
s_wakeCondition.wait(lock, [&stopToken] {
|
||||
return !s_highPriorityQueue.empty() || !s_normalPriorityQueue.empty() || stopToken.stop_requested();
|
||||
});
|
||||
|
||||
if (stopToken.stop_requested() && s_highPriorityQueue.empty() && s_normalPriorityQueue.empty())
|
||||
return;
|
||||
|
||||
if (!s_highPriorityQueue.empty())
|
||||
{
|
||||
task = IA_MOVE(s_highPriorityQueue.front());
|
||||
s_highPriorityQueue.pop_front();
|
||||
foundTask = TRUE;
|
||||
}
|
||||
else if (!s_normalPriorityQueue.empty())
|
||||
{
|
||||
task = IA_MOVE(s_normalPriorityQueue.front());
|
||||
s_normalPriorityQueue.pop_front();
|
||||
foundTask = TRUE;
|
||||
}
|
||||
}
|
||||
if (foundTask)
|
||||
{
|
||||
task.Task();
|
||||
if (task.ScheduleHandle->Counter.fetch_sub(1) == 1)
|
||||
task.ScheduleHandle->Counter.notify_all();
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace IACore
|
||||
@ -20,5 +20,44 @@
|
||||
|
||||
namespace IACore
|
||||
{
|
||||
class AsyncOps
|
||||
{
|
||||
public:
|
||||
enum class Priority : UINT8
|
||||
{
|
||||
High,
|
||||
Normal
|
||||
};
|
||||
|
||||
}
|
||||
struct Schedule
|
||||
{
|
||||
Atomic<INT32> Counter{0};
|
||||
};
|
||||
|
||||
public:
|
||||
STATIC VOID InitializeScheduler(IN UINT8 workerCount = 0);
|
||||
STATIC VOID TerminateScheduler();
|
||||
|
||||
STATIC VOID ScheduleTask(IN Function<VOID()> task, IN Schedule *schedule,
|
||||
IN Priority priority = Priority::Normal);
|
||||
STATIC VOID WaitForScheduleCompletion(IN Schedule *schedule);
|
||||
|
||||
STATIC VOID RunTask(IN Function<VOID()> task);
|
||||
|
||||
private:
|
||||
struct ScheduledTask
|
||||
{
|
||||
Function<VOID()> Task{};
|
||||
Schedule *ScheduleHandle{};
|
||||
};
|
||||
|
||||
STATIC VOID ScheduleWorkerLoop(IN StopToken stopToken);
|
||||
|
||||
private:
|
||||
STATIC Mutex s_queueMutex;
|
||||
STATIC ConditionVariable s_wakeCondition;
|
||||
STATIC Vector<JoiningThread> s_scheduleWorkers;
|
||||
STATIC Deque<ScheduledTask> s_highPriorityQueue;
|
||||
STATIC Deque<ScheduledTask> s_normalPriorityQueue;
|
||||
};
|
||||
} // namespace IACore
|
||||
@ -40,9 +40,11 @@
|
||||
# include <functional>
|
||||
# include <type_traits>
|
||||
# include <initializer_list>
|
||||
# include <condition_variable>
|
||||
|
||||
# include <tuple>
|
||||
# include <array>
|
||||
# include <deque>
|
||||
# include <string>
|
||||
# include <vector>
|
||||
# include <format>
|
||||
@ -535,6 +537,7 @@ using UnorderedMap = ankerl::unordered_dense::map<_key_type, _value_type>;
|
||||
template<typename _value_type> using Atomic = std::atomic<_value_type>;
|
||||
template<typename _value_type> using SharedPtr = std::shared_ptr<_value_type>;
|
||||
template<typename _value_type> using UniquePtr = std::unique_ptr<_value_type>;
|
||||
template<typename _value_type> using Deque = std::deque<_value_type>;
|
||||
template<typename _type_a, typename _type_b> using Pair = std::pair<_type_a, _type_b>;
|
||||
template<typename... types> using Tuple = std::tuple<types...>;
|
||||
|
||||
@ -550,9 +553,11 @@ using HRClock = std::chrono::high_resolution_clock;
|
||||
using HRTimePoint = std::chrono::time_point<HRClock>;
|
||||
|
||||
using Mutex = std::mutex;
|
||||
using StopToken = std::stop_token;
|
||||
using ScopedLock = std::scoped_lock<Mutex>;
|
||||
using UniqueLock = std::unique_lock<Mutex>;
|
||||
using JoiningThread = std::jthread;
|
||||
using ConditionVariable = std::condition_variable;
|
||||
|
||||
namespace FileSystem = std::filesystem;
|
||||
using FilePath = FileSystem::path;
|
||||
|
||||
Reference in New Issue
Block a user