Restored old thread scheduler code

This commit is contained in:
Mr-Wiseguy 2023-10-23 18:01:29 -04:00
parent 1037bb5206
commit cdd397fe8f
5 changed files with 338 additions and 482 deletions

View File

@ -5,58 +5,6 @@
#include "multilibultra.hpp" #include "multilibultra.hpp"
#include "recomp.h" #include "recomp.h"
#if defined(_M_X64)
static inline void spinlock_pause() {
_mm_pause();
}
#elif defined(__x86_64__)
static inline void spinlock_pause() {
__builtin_ia32_pause();
}
#else
#error "No spinlock_pause implementation for current architecture"
#endif
template <typename T>
class atomic_spinlock {
static_assert(sizeof(std::atomic<T>) == sizeof(T), "atomic_spinlock must be used with a type that is the same size as its atomic counterpart");
static_assert(std::atomic<T>::is_always_lock_free, "atomic_spinlock must be used with an always lock-free atomic type");
std::atomic_ref<T> locked_;
public:
atomic_spinlock(T& flag) : locked_{ flag } {}
void lock() {
// Loop until the lock is acquired.
while (true) {
// Try to acquire the lock.
if (!locked_.exchange(true, std::memory_order_acquire)) {
// If it was acquired then exit the loop.
break;
}
// Otherwise, wait until the lock is no longer acquired.
// Doing this instead of constantly trying to acquire the lock reduces cache coherency traffic.
while (locked_.load(std::memory_order_relaxed)) {
// Add a platform-specific pause instruction to reduce load unit traffic.
spinlock_pause();
}
}
}
void unlock() {
// Release the lock by setting it to false.
locked_.store(false, std::memory_order_release);
}
};
class mesg_queue_lock {
OSMesgQueue* queue_;
atomic_spinlock<uint8_t> spinlock_;
public:
mesg_queue_lock(OSMesgQueue* mq) : queue_{ mq }, spinlock_{ mq->lock } {}
void lock() { spinlock_.lock(); }
void unlock() { spinlock_.unlock(); }
};
extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) { extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_); OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
mq->blocked_on_recv = NULLPTR; mq->blocked_on_recv = NULLPTR;
@ -65,7 +13,6 @@ extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) ms
mq->msg = msg; mq->msg = msg;
mq->validCount = 0; mq->validCount = 0;
mq->first = 0; mq->first = 0;
mq->lock = false;
} }
s32 MQ_GET_COUNT(OSMesgQueue *mq) { s32 MQ_GET_COUNT(OSMesgQueue *mq) {
@ -100,159 +47,148 @@ bool thread_queue_empty(RDRAM_ARG PTR(OSThread)* queue) {
return *queue == NULLPTR; return *queue == NULLPTR;
} }
std::mutex test_mutex{}; extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
// Attempts to put a message into a queue. // Prevent accidentally blocking anything that isn't a game thread
// If the queue is not full, returns true and pops a thread from the blocked on receive list. if (!Multilibultra::is_game_thread()) {
// If the queue is full and this is a blocking send, places the current thread into the blocked on send list flags = OS_MESG_NOBLOCK;
// for the message queue, marks the current thread as being blocked on a queue and returns false. }
bool mesg_queue_try_insert(RDRAM_ARG OSMesgQueue* mq, OSMesg msg, OSThread*& to_run, bool jam, bool blocking) {
//mesg_queue_lock lock{ mq };
std::lock_guard guard{ test_mutex };
// If the queue is full, insert this thread into the blocked on send queue and return false. Multilibultra::disable_preemption();
if (flags == OS_MESG_NOBLOCK) {
// If non-blocking, fail if the queue is full
if (MQ_IS_FULL(mq)) { if (MQ_IS_FULL(mq)) {
if (blocking) { Multilibultra::enable_preemption();
thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread()); return -1;
// TODO is it safe to use the schedule queue here while in the message queue lock? }
Multilibultra::block_self(PASS_RDRAM1); } else {
// Otherwise, yield this thread until the queue has room
while (MQ_IS_FULL(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, Multilibultra::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread());
Multilibultra::enable_preemption();
Multilibultra::pause_self(PASS_RDRAM1);
Multilibultra::disable_preemption();
} }
to_run = nullptr;
return false;
} }
// The queue wasn't full, so place the message into it.
if (jam) {
// Insert this message at the start of the queue.
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
mq->validCount++;
}
else {
// Insert this message at the end of the queue.
s32 last = (mq->first + mq->validCount) % mq->msgCount; s32 last = (mq->first + mq->validCount) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[last] = msg; TO_PTR(OSMesg, mq->msg)[last] = msg;
mq->validCount++; mq->validCount++;
}
// Pop a thread from the blocked on recv queue to wake afterwards. OSThread* to_run = nullptr;
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) { if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv); to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv);
} }
return true; Multilibultra::enable_preemption();
if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
if (Multilibultra::is_game_thread()) {
OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread());
if (to_run->priority > self->priority) {
Multilibultra::swap_to_thread(PASS_RDRAM to_run);
} else {
Multilibultra::schedule_running_thread(to_run);
}
} else {
Multilibultra::schedule_running_thread(to_run);
}
}
return 0;
} }
// Attempts to remove a message from a queue. extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
// If the queue is not empty, returns true and pops a thread from the blocked on send list. OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
// If the queue is empty and this is a blocking receive, places the current thread into the blocked on receive list Multilibultra::disable_preemption();
// for the message queue, marks the current thread as being blocked on a queue and returns false.
bool mesg_queue_try_remove(RDRAM_ARG OSMesgQueue* mq, PTR(OSMesg) msg_out, OSThread*& to_run, bool blocking) {
//mesg_queue_lock lock{ mq };
std::lock_guard guard{ test_mutex };
// If the queue is full, insert this thread into the blocked on receive queue and return false. if (flags == OS_MESG_NOBLOCK) {
// If non-blocking, fail if the queue is full
if (MQ_IS_FULL(mq)) {
Multilibultra::enable_preemption();
return -1;
}
} else {
// Otherwise, yield this thread in a loop until the queue is no longer full
while (MQ_IS_FULL(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on jam\n", TO_PTR(OSThread, Multilibultra::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_send, Multilibultra::this_thread());
Multilibultra::enable_preemption();
Multilibultra::pause_self(PASS_RDRAM1);
Multilibultra::disable_preemption();
}
}
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
mq->validCount++;
OSThread *to_run = nullptr;
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_recv)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_recv);
}
Multilibultra::enable_preemption();
if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread());
if (to_run->priority > self->priority) {
Multilibultra::swap_to_thread(PASS_RDRAM to_run);
} else {
Multilibultra::schedule_running_thread(to_run);
}
}
return 0;
}
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
OSMesg *msg = TO_PTR(OSMesg, msg_);
Multilibultra::disable_preemption();
if (flags == OS_MESG_NOBLOCK) {
// If non-blocking, fail if the queue is empty
if (MQ_IS_EMPTY(mq)) { if (MQ_IS_EMPTY(mq)) {
if (blocking) { Multilibultra::enable_preemption();
thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, Multilibultra::this_thread()); return -1;
// TODO is it safe to use the schedule queue here while in the message queue lock? }
Multilibultra::block_self(PASS_RDRAM1); } else {
// Otherwise, yield this thread in a loop until the queue is no longer full
while (MQ_IS_EMPTY(mq)) {
debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, Multilibultra::this_thread())->id);
thread_queue_insert(PASS_RDRAM &mq->blocked_on_recv, Multilibultra::this_thread());
Multilibultra::enable_preemption();
Multilibultra::pause_self(PASS_RDRAM1);
Multilibultra::disable_preemption();
} }
to_run = nullptr;
return false;
} }
// The queue wasn't empty, so remove the first message from it. if (msg_ != NULLPTR) {
if (msg_out != NULLPTR) { *msg = TO_PTR(OSMesg, mq->msg)[mq->first];
*TO_PTR(OSMesg, msg_out) = TO_PTR(OSMesg, mq->msg)[mq->first];
} }
mq->first = (mq->first + 1) % mq->msgCount; mq->first = (mq->first + 1) % mq->msgCount;
mq->validCount--; mq->validCount--;
// Pop a thread from the blocked on send queue to wake afterwards. OSThread *to_run = nullptr;
if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) { if (!thread_queue_empty(PASS_RDRAM &mq->blocked_on_send)) {
to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send); to_run = thread_queue_pop(PASS_RDRAM &mq->blocked_on_send);
} }
return true; Multilibultra::enable_preemption();
}
enum class MesgQueueActionType {
Send,
Jam,
Receive
};
s32 mesg_queue_action(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, PTR(OSMesg) msg_out, s32 flags, MesgQueueActionType action) {
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
OSThread* this_thread = TO_PTR(OSThread, Multilibultra::this_thread());
bool is_blocking = flags != OS_MESG_NOBLOCK;
// Prevent accidentally blocking anything that isn't a game thread
if (!Multilibultra::is_game_thread()) {
is_blocking = false;
}
OSThread* to_run = nullptr;
// Repeatedly attempt to send the message until it's successful.
while (true) {
// Try to insert/remove the message into the queue depending on the action.
bool success = false;
switch (action) {
case MesgQueueActionType::Send:
success = mesg_queue_try_insert(PASS_RDRAM mq, msg, to_run, false, is_blocking);
break;
case MesgQueueActionType::Jam:
success = mesg_queue_try_insert(PASS_RDRAM mq, msg, to_run, true, is_blocking);
break;
case MesgQueueActionType::Receive:
success = mesg_queue_try_remove(PASS_RDRAM mq, msg_out, to_run, is_blocking);
break;
}
// If successful, don't block.
if (success) {
//goto after;
break;
}
// Otherwise if the action was unsuccessful but wasn't blocking, return -1 to indicate a failure.
if (!is_blocking) {
return -1;
}
// The action failed, so pause this thread until unblocked by the queue.
debug_printf("[Message Queue] Thread %d is blocked on %s\n", this_thread->id, action == MesgQueueActionType::Receive ? "receive" : "send");
// Wait for it this thread be resumed.
Multilibultra::wait_for_resumed(PASS_RDRAM1);
}
//after:
// If any thread was blocked on receiving from this queue, wake it.
if (to_run) { if (to_run) {
debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id); debug_printf("[Message Queue] Thread %d is unblocked\n", to_run->id);
Multilibultra::unblock_thread(to_run); OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread());
if (to_run->priority > self->priority) {
// If the unblocked thread is higher priority than this one, pause this thread so it can take over. Multilibultra::swap_to_thread(PASS_RDRAM to_run);
if (Multilibultra::is_game_thread() && to_run->priority > this_thread->priority) { } else {
Multilibultra::yield_self(PASS_RDRAM1); Multilibultra::schedule_running_thread(to_run);
Multilibultra::wait_for_resumed(PASS_RDRAM1);
} }
} }
return 0; return 0;
} }
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
return mesg_queue_action(PASS_RDRAM mq_, msg, NULLPTR, flags, MesgQueueActionType::Send);
}
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
return mesg_queue_action(PASS_RDRAM mq_, msg, NULLPTR, flags, MesgQueueActionType::Jam);
}
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_out_, s32 flags) {
return mesg_queue_action(PASS_RDRAM mq_, NULLPTR, msg_out_, flags, MesgQueueActionType::Receive);
}

View File

@ -23,8 +23,7 @@
struct UltraThreadContext { struct UltraThreadContext {
std::thread host_thread; std::thread host_thread;
std::atomic_bool scheduled; std::atomic_bool running;
std::atomic_bool descheduled;
std::atomic_bool initialized; std::atomic_bool initialized;
}; };
@ -52,13 +51,16 @@ void save_init();
void init_scheduler(); void init_scheduler();
void init_events(uint8_t* rdram, uint8_t* rom, WindowHandle window_handle); void init_events(uint8_t* rdram, uint8_t* rom, WindowHandle window_handle);
void init_timers(RDRAM_ARG1); void init_timers(RDRAM_ARG1);
void set_self_paused(RDRAM_ARG1);
void yield_self(RDRAM_ARG1); void yield_self(RDRAM_ARG1);
void block_self(RDRAM_ARG1); void block_self(RDRAM_ARG1);
void unblock_thread(OSThread* t); void unblock_thread(OSThread* t);
void wait_for_resumed(RDRAM_ARG1); void wait_for_resumed(RDRAM_ARG1);
void swap_to_thread(RDRAM_ARG OSThread *to); void swap_to_thread(RDRAM_ARG OSThread *to);
void pause_thread_impl(OSThread *t);
void resume_thread_impl(OSThread* t); void resume_thread_impl(OSThread* t);
void schedule_running_thread(OSThread *t); void schedule_running_thread(OSThread *t);
void pause_self(RDRAM_ARG1);
void halt_self(RDRAM_ARG1); void halt_self(RDRAM_ARG1);
void stop_thread(OSThread *t); void stop_thread(OSThread *t);
void cleanup_thread(OSThread *t); void cleanup_thread(OSThread *t);
@ -74,6 +76,8 @@ enum class ThreadPriority {
void set_native_thread_name(const std::string& name); void set_native_thread_name(const std::string& name);
void set_native_thread_priority(ThreadPriority pri); void set_native_thread_priority(ThreadPriority pri);
PTR(OSThread) this_thread(); PTR(OSThread) this_thread();
void disable_preemption();
void enable_preemption();
void notify_scheduler(); void notify_scheduler();
void reprioritize_thread(OSThread *t, OSPri pri); void reprioritize_thread(OSThread *t, OSPri pri);
void set_main_thread(); void set_main_thread();
@ -118,11 +122,19 @@ struct gfx_callbacks_t {
}; };
void set_gfx_callbacks(const gfx_callbacks_t* callbacks); void set_gfx_callbacks(const gfx_callbacks_t* callbacks);
class preemption_guard {
public:
preemption_guard();
~preemption_guard();
private:
std::lock_guard<std::mutex> lock;
};
} // namespace Multilibultra } // namespace Multilibultra
#define MIN(a, b) ((a) < (b) ? (a) : (b)) #define MIN(a, b) ((a) < (b) ? (a) : (b))
//#define debug_printf(...) #define debug_printf(...)
#define debug_printf(...) printf(__VA_ARGS__); //#define debug_printf(...) printf(__VA_ARGS__);
#endif #endif

View File

@ -2,10 +2,7 @@
#include <queue> #include <queue>
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <variant>
#include <algorithm>
#include "blockingconcurrentqueue.h"
#include "multilibultra.hpp" #include "multilibultra.hpp"
class OSThreadComparator { class OSThreadComparator {
@ -25,102 +22,64 @@ public:
return false; return false;
} }
if (it == this->c.begin()) {
// deque the top element
this->pop();
} else {
// remove element and re-heap // remove element and re-heap
this->c.erase(it); this->c.erase(it);
std::make_heap(this->c.begin(), this->c.end(), this->comp); std::make_heap(this->c.begin(), this->c.end(), this->comp);
}
return true; return true;
} }
void print() {
std::vector<OSThread*> backup = this->c;
debug_printf("[Scheduler] Scheduled Threads:\n");
while (!empty()) {
OSThread* t = top();
pop();
debug_printf(" %d: pri %d state %d\n", t->id, t->priority, t->state);
}
this->c = backup;
}
bool contains(OSThread* t) {
return std::find(this->c.begin(), this->c.end(), t) != this->c.end();
}
}; };
struct NotifySchedulerAction {
};
struct ScheduleThreadAction {
OSThread* t;
};
struct StopThreadAction {
OSThread* t;
};
struct CleanupThreadAction {
OSThread* t;
};
struct ReprioritizeThreadAction {
OSThread* t;
OSPri pri;
};
struct YieldedThreadAction {
OSThread* t;
};
struct BlockedThreadAction {
OSThread* t;
};
struct UnblockThreadAction {
OSThread* t;
};
using ThreadAction = std::variant<std::monostate, NotifySchedulerAction, ScheduleThreadAction, StopThreadAction, CleanupThreadAction, ReprioritizeThreadAction, YieldedThreadAction, BlockedThreadAction, UnblockThreadAction>;
static struct { static struct {
moodycamel::BlockingConcurrentQueue<ThreadAction> action_queue{}; std::vector<OSThread*> to_schedule;
OSThread* running_thread; std::vector<OSThread*> to_stop;
std::vector<OSThread*> to_cleanup;
std::vector<std::pair<OSThread*, OSPri>> to_reprioritize;
std::mutex mutex;
// OSThread* running_thread;
std::atomic_int notify_count;
std::atomic_int action_count;
bool can_preempt;
std::mutex premption_mutex;
} scheduler_context{}; } scheduler_context{};
void handle_thread_queueing(thread_queue_t& running_thread_queue, const ScheduleThreadAction& action) { void handle_thread_queueing(thread_queue_t& running_thread_queue) {
OSThread* to_schedule = action.t; std::lock_guard lock{scheduler_context.mutex};
debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id);
// Do not schedule the thread if it's waiting on a message queue if (!scheduler_context.to_schedule.empty()) {
if (to_schedule->state == OSThreadState::BLOCKED_STOPPED) { OSThread* to_schedule = scheduler_context.to_schedule.back();
to_schedule->state = OSThreadState::BLOCKED_PAUSED; scheduler_context.to_schedule.pop_back();
} scheduler_context.action_count.fetch_sub(1);
else { debug_printf("[Scheduler] Scheduling thread %d\n", to_schedule->id);
to_schedule->state = OSThreadState::PAUSED;
running_thread_queue.push(to_schedule); running_thread_queue.push(to_schedule);
} }
} }
void handle_thread_stopping(thread_queue_t& running_thread_queue, const StopThreadAction& action) { void handle_thread_stopping(thread_queue_t& running_thread_queue) {
OSThread* to_stop = action.t; std::lock_guard lock{scheduler_context.mutex};
while (!scheduler_context.to_stop.empty()) {
OSThread* to_stop = scheduler_context.to_stop.back();
scheduler_context.to_stop.pop_back();
scheduler_context.action_count.fetch_sub(1);
debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id); debug_printf("[Scheduler] Stopping thread %d\n", to_stop->id);
running_thread_queue.remove(to_stop); running_thread_queue.remove(to_stop);
if (running_thread_queue.contains(to_stop)) {
assert(false);
}
if (to_stop->state == OSThreadState::BLOCKED_PAUSED) {
to_stop->state = OSThreadState::BLOCKED_STOPPED;
}
else {
to_stop->state = OSThreadState::STOPPED;
} }
} }
void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread, const CleanupThreadAction& action) { void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) {
OSThread* to_cleanup = action.t; std::lock_guard lock{scheduler_context.mutex};
while (!scheduler_context.to_cleanup.empty()) {
OSThread* to_cleanup = scheduler_context.to_cleanup.back();
scheduler_context.to_cleanup.pop_back();
scheduler_context.action_count.fetch_sub(1);
debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id); debug_printf("[Scheduler] Destroying thread %d\n", to_cleanup->id);
running_thread_queue.remove(to_cleanup); running_thread_queue.remove(to_cleanup);
@ -138,88 +97,44 @@ void handle_thread_cleanup(thread_queue_t& running_thread_queue, OSThread*& cur_
to_cleanup->context->host_thread.join(); to_cleanup->context->host_thread.join();
delete to_cleanup->context; delete to_cleanup->context;
to_cleanup->context = nullptr; to_cleanup->context = nullptr;
}
} }
void handle_thread_reprioritization(thread_queue_t& running_thread_queue, const ReprioritizeThreadAction& action) { void handle_thread_reprioritization(thread_queue_t& running_thread_queue) {
OSThread* to_reprioritize = action.t; std::lock_guard lock{scheduler_context.mutex};
OSPri pri = action.pri;
debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize->id, pri); while (!scheduler_context.to_reprioritize.empty()) {
running_thread_queue.remove(to_reprioritize); const std::pair<OSThread*, OSPri> to_reprioritize = scheduler_context.to_reprioritize.back();
to_reprioritize->priority = pri; scheduler_context.to_reprioritize.pop_back();
running_thread_queue.push(to_reprioritize); scheduler_context.action_count.fetch_sub(1);
debug_printf("[Scheduler] Reprioritizing thread %d to %d\n", to_reprioritize.first->id, to_reprioritize.second);
running_thread_queue.remove(to_reprioritize.first);
to_reprioritize.first->priority = to_reprioritize.second;
running_thread_queue.push(to_reprioritize.first);
}
} }
void handle_thread_yielded(thread_queue_t& running_thread_queue, const YieldedThreadAction& action) { void handle_scheduler_notifications() {
OSThread* yielded = action.t; std::lock_guard lock{scheduler_context.mutex};
int32_t notify_count = scheduler_context.notify_count.exchange(0);
debug_printf("[Scheduler] Thread %d has yielded\n", yielded->id); if (notify_count) {
// Remove the yielded thread from the thread queue. If it was in the queue then re-add it so that it's placed after any other threads with the same priority. debug_printf("Received %d notifications\n", notify_count);
if (running_thread_queue.remove(yielded)) { scheduler_context.action_count.fetch_sub(notify_count);
running_thread_queue.push(yielded);
}
yielded->state = OSThreadState::PAUSED;
debug_printf("[Scheduler] Set thread %d to PAUSED\n", yielded->id);
}
void handle_thread_blocked(thread_queue_t& running_thread_queue, const BlockedThreadAction& action) {
OSThread* blocked = action.t;
debug_printf("[Scheduler] Thread %d has been blocked\n", blocked->id);
// Remove the thread from the running queue.
running_thread_queue.remove(blocked);
// Update the thread's state accordingly.
if (blocked->state == OSThreadState::STOPPED) {
blocked->state = OSThreadState::BLOCKED_STOPPED;
}
else if (blocked->state == OSThreadState::RUNNING) {
blocked->state = OSThreadState::BLOCKED_PAUSED;
}
else {
assert(false);
}
running_thread_queue.remove(blocked);
}
void handle_thread_unblocking(thread_queue_t& running_thread_queue, const UnblockThreadAction& action) {
OSThread* unblocked = action.t;
// Do nothing if this thread has already been unblocked.
if (unblocked->state != OSThreadState::BLOCKED_STOPPED && unblocked->state != OSThreadState::BLOCKED_PAUSED) {
return;
}
debug_printf("[Scheduler] Thread %d has been unblocked\n", unblocked->id);
// Update the thread's state accordingly.
if (unblocked->state == OSThreadState::BLOCKED_STOPPED) {
unblocked->state = OSThreadState::STOPPED;
}
else if (unblocked->state == OSThreadState::BLOCKED_PAUSED) {
// The thread wasn't stopped, so put it back in the running queue now that it's been unblocked.
unblocked->state = OSThreadState::PAUSED;
running_thread_queue.push(unblocked);
}
else {
assert(false);
} }
} }
void swap_running_thread(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) { void swap_running_thread(thread_queue_t& running_thread_queue, OSThread*& cur_running_thread) {
if (running_thread_queue.size() > 0) { if (running_thread_queue.size() > 0) {
OSThread* new_running_thread = running_thread_queue.top(); OSThread* new_running_thread = running_thread_queue.top();
// If the running thread has changed or the running thread is paused, run the running thread if (cur_running_thread != new_running_thread) {
if (cur_running_thread != new_running_thread || (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING)) {
if (cur_running_thread && cur_running_thread->state == OSThreadState::RUNNING) { if (cur_running_thread && cur_running_thread->state == OSThreadState::RUNNING) {
debug_printf("[Scheduler] Need to wait for thread %d to pause itself\n", cur_running_thread->id); debug_printf("[Scheduler] Need to wait for thread %d to pause itself\n", cur_running_thread->id);
return; return;
} } else {
debug_printf("[Scheduler] Switching execution to thread %d (%d)\n", new_running_thread->id, new_running_thread->priority); debug_printf("[Scheduler] Switching execution to thread %d (%d)\n", new_running_thread->id, new_running_thread->priority);
Multilibultra::resume_thread_impl(new_running_thread);
if (cur_running_thread) {
cur_running_thread->context->descheduled.store(true);
cur_running_thread->context->descheduled.notify_all();
} }
Multilibultra::resume_thread_impl(new_running_thread);
cur_running_thread = new_running_thread; cur_running_thread = new_running_thread;
} else if (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING) { } else if (cur_running_thread && cur_running_thread->state != OSThreadState::RUNNING) {
Multilibultra::resume_thread_impl(cur_running_thread); Multilibultra::resume_thread_impl(cur_running_thread);
@ -233,45 +148,26 @@ void scheduler_func() {
thread_queue_t running_thread_queue{}; thread_queue_t running_thread_queue{};
OSThread* cur_running_thread = nullptr; OSThread* cur_running_thread = nullptr;
Multilibultra::set_native_thread_name("Scheduler Thread");
Multilibultra::set_native_thread_priority(Multilibultra::ThreadPriority::VeryHigh);
while (true) { while (true) {
using namespace std::chrono_literals;
ThreadAction action{};
OSThread* old_running_thread = cur_running_thread; OSThread* old_running_thread = cur_running_thread;
//scheduler_context.action_queue.wait_dequeue_timed(action, 1ms); scheduler_context.action_count.wait(0);
scheduler_context.action_queue.wait_dequeue(action);
if (std::get_if<std::monostate>(&action) == nullptr) { std::lock_guard lock{scheduler_context.premption_mutex};
// Determine the action type and act on it
if (const auto* notify_action = std::get_if<NotifySchedulerAction>(&action)) {
// Nothing to do
}
else if (const auto* stop_action = std::get_if<StopThreadAction>(&action)) {
handle_thread_stopping(running_thread_queue, *stop_action);
}
else if (const auto* cleanup_action = std::get_if<CleanupThreadAction>(&action)) {
handle_thread_cleanup(running_thread_queue, cur_running_thread, *cleanup_action);
}
else if (const auto* schedule_action = std::get_if<ScheduleThreadAction>(&action)) {
handle_thread_queueing(running_thread_queue, *schedule_action);
}
else if (const auto* reprioritize_action = std::get_if<ReprioritizeThreadAction>(&action)) {
handle_thread_reprioritization(running_thread_queue, *reprioritize_action);
}
else if (const auto* yielded_action = std::get_if<YieldedThreadAction>(&action)) {
handle_thread_yielded(running_thread_queue, *yielded_action);
}
else if (const auto* blocked_action = std::get_if<BlockedThreadAction>(&action)) {
handle_thread_blocked(running_thread_queue, *blocked_action);
}
else if (const auto* unblock_action = std::get_if<UnblockThreadAction>(&action)) {
handle_thread_unblocking(running_thread_queue, *unblock_action);
}
}
running_thread_queue.print(); // Handle notifications
handle_scheduler_notifications();
// Handle stopping threads
handle_thread_stopping(running_thread_queue);
// Handle cleaning up threads
handle_thread_cleanup(running_thread_queue, cur_running_thread);
// Handle queueing threads to run
handle_thread_queueing(running_thread_queue);
// Handle threads that have changed priority
handle_thread_reprioritization(running_thread_queue);
// Determine which thread to run, stopping the current running thread if necessary // Determine which thread to run, stopping the current running thread if necessary
swap_running_thread(running_thread_queue, cur_running_thread); swap_running_thread(running_thread_queue, cur_running_thread);
@ -291,90 +187,97 @@ extern "C" void do_yield() {
namespace Multilibultra { namespace Multilibultra {
void init_scheduler() { void init_scheduler() {
scheduler_context.can_preempt = true;
std::thread scheduler_thread{scheduler_func}; std::thread scheduler_thread{scheduler_func};
scheduler_thread.detach(); scheduler_thread.detach();
} }
void schedule_running_thread(OSThread *t) { void schedule_running_thread(OSThread *t) {
debug_printf("[Thread] Queuing Thread %d to be scheduled\n", t->id); debug_printf("[Scheduler] Queuing Thread %d to be scheduled\n", t->id);
scheduler_context.action_queue.enqueue(ScheduleThreadAction{t}); std::lock_guard lock{scheduler_context.mutex};
scheduler_context.to_schedule.push_back(t);
scheduler_context.action_count.fetch_add(1);
scheduler_context.action_count.notify_all();
} }
void swap_to_thread(RDRAM_ARG OSThread *to) { void swap_to_thread(RDRAM_ARG OSThread *to) {
OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread()); OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread());
debug_printf("[Thread] Scheduling swap from thread %d to %d\n", self->id, to->id); debug_printf("[Scheduler] Scheduling swap from thread %d to %d\n", self->id, to->id);
{
// Tell the scheduler that the swapped-to thread is ready to run and that this thread is yielding. std::lock_guard lock{scheduler_context.mutex};
schedule_running_thread(to); scheduler_context.to_schedule.push_back(to);
yield_self(PASS_RDRAM1); Multilibultra::set_self_paused(PASS_RDRAM1);
scheduler_context.action_count.fetch_add(1);
// Wait for the scheduler to resume this thread. scheduler_context.action_count.notify_all();
wait_for_resumed(PASS_RDRAM1); }
Multilibultra::wait_for_resumed(PASS_RDRAM1);
} }
void reprioritize_thread(OSThread *t, OSPri pri) { void reprioritize_thread(OSThread *t, OSPri pri) {
debug_printf("[Thread] Adjusting Thread %d priority to %d\n", t->id, pri); debug_printf("[Scheduler] Adjusting Thread %d priority to %d\n", t->id, pri);
std::lock_guard lock{scheduler_context.mutex};
scheduler_context.action_queue.enqueue(ReprioritizeThreadAction{t, pri}); scheduler_context.to_reprioritize.emplace_back(t, pri);
scheduler_context.action_count.fetch_add(1);
scheduler_context.action_count.notify_all();
} }
void stop_thread(OSThread *t) { void pause_self(RDRAM_ARG1) {
debug_printf("[Thread] Queueing stopping of thread %d\n", t->id); OSThread *self = TO_PTR(OSThread, Multilibultra::this_thread());
debug_printf("[Scheduler] Thread %d pausing itself\n", self->id);
scheduler_context.action_queue.enqueue(StopThreadAction{t}); {
} std::lock_guard lock{scheduler_context.mutex};
Multilibultra::set_self_paused(PASS_RDRAM1);
void Multilibultra::yield_self(RDRAM_ARG1) { scheduler_context.to_stop.push_back(self);
OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread()); scheduler_context.action_count.fetch_add(1);
debug_printf("[Thread] Thread %d yielding itself\n", self->id); scheduler_context.action_count.notify_all();
}
scheduler_context.action_queue.enqueue(YieldedThreadAction{ self }); Multilibultra::wait_for_resumed(PASS_RDRAM1);
}
void Multilibultra::block_self(RDRAM_ARG1) {
OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread());
debug_printf("[Thread] Thread %d has been blocked\n", self->id);
scheduler_context.action_queue.enqueue(BlockedThreadAction{ self });
}
void Multilibultra::unblock_thread(OSThread *t) {
debug_printf("[Thread] Unblocking thread %d\n", t->id);
scheduler_context.action_queue.enqueue(UnblockThreadAction{ t });
}
void halt_self(RDRAM_ARG1) {
OSThread* self = TO_PTR(OSThread, Multilibultra::this_thread());
debug_printf("[Thread] Thread %d pausing itself\n", self->id);
stop_thread(self);
yield_self(PASS_RDRAM1);
wait_for_resumed(PASS_RDRAM1);
} }
void cleanup_thread(OSThread *t) { void cleanup_thread(OSThread *t) {
scheduler_context.action_queue.enqueue(CleanupThreadAction{t}); std::lock_guard lock{scheduler_context.mutex};
scheduler_context.to_cleanup.push_back(t);
scheduler_context.action_count.fetch_add(1);
scheduler_context.action_count.notify_all();
}
void disable_preemption() {
scheduler_context.premption_mutex.lock();
if (Multilibultra::is_game_thread()) {
scheduler_context.can_preempt = false;
}
}
void enable_preemption() {
if (Multilibultra::is_game_thread()) {
scheduler_context.can_preempt = true;
}
#pragma warning(push)
#pragma warning( disable : 26110)
scheduler_context.premption_mutex.unlock();
#pragma warning( pop )
}
// lock's constructor is called first, so can_preempt is set after locking
preemption_guard::preemption_guard() : lock{scheduler_context.premption_mutex} {
scheduler_context.can_preempt = false;
}
// lock's destructor is called last, so can_preempt is set before unlocking
preemption_guard::~preemption_guard() {
scheduler_context.can_preempt = true;
} }
void notify_scheduler() { void notify_scheduler() {
scheduler_context.action_queue.enqueue(NotifySchedulerAction{}); std::lock_guard lock{scheduler_context.mutex};
} scheduler_context.notify_count.fetch_add(1);
scheduler_context.action_count.fetch_add(1);
void resume_thread_impl(OSThread* t) { scheduler_context.action_count.notify_all();
if (t->state == OSThreadState::PREEMPTED) {
// Nothing to do here
}
t->state = OSThreadState::RUNNING;
debug_printf("[Scheduler] Set thread %d to RUNNING\n", t->id);
t->context->scheduled.store(true);
t->context->scheduled.notify_all();
} }
} }
extern "C" void pause_self(uint8_t* rdram) { extern "C" void pause_self(uint8_t* rdram) {
Multilibultra::halt_self(rdram); Multilibultra::pause_self(rdram);
} }

View File

@ -131,6 +131,7 @@ static void _thread_func(RDRAM_ARG PTR(OSThread) self_, PTR(thread_func_t) entry
debug_printf("[Thread] Thread waiting to be started: %d\n", self->id); debug_printf("[Thread] Thread waiting to be started: %d\n", self->id);
// Wait until the thread is marked as running. // Wait until the thread is marked as running.
Multilibultra::set_self_paused(PASS_RDRAM1);
Multilibultra::wait_for_resumed(PASS_RDRAM1); Multilibultra::wait_for_resumed(PASS_RDRAM1);
debug_printf("[Thread] Thread started: %d\n", self->id); debug_printf("[Thread] Thread started: %d\n", self->id);
@ -150,7 +151,7 @@ extern "C" void osStartThread(RDRAM_ARG PTR(OSThread) t_) {
OSThread* t = TO_PTR(OSThread, t_); OSThread* t = TO_PTR(OSThread, t_);
debug_printf("[os] Start Thread %d\n", t->id); debug_printf("[os] Start Thread %d\n", t->id);
// Wait until the thread is initialized to indicate that it's queued to be started. // Wait until the thread is initialized to indicate that it's action_queued to be started.
t->context->initialized.wait(false); t->context->initialized.wait(false);
debug_printf("[os] Thread %d is ready to be started\n", t->id); debug_printf("[os] Thread %d is ready to be started\n", t->id);
@ -175,33 +176,20 @@ extern "C" void osCreateThread(RDRAM_ARG PTR(OSThread) t_, OSId id, PTR(thread_f
t->next = NULLPTR; t->next = NULLPTR;
t->priority = pri; t->priority = pri;
t->id = id; t->id = id;
t->state = OSThreadState::STOPPED; t->state = OSThreadState::PAUSED;
t->sp = sp - 0x10; // Set up the first stack frame t->sp = sp - 0x10; // Set up the first stack frame
t->destroyed = false; t->destroyed = false;
// Spawn a new thread, which will immediately pause itself and wait until it's been started. // Spawn a new thread, which will immediately pause itself and wait until it's been started.
t->context = new UltraThreadContext{}; t->context = new UltraThreadContext{};
t->context->initialized.store(false); t->context->initialized.store(false);
t->context->scheduled.store(false); t->context->running.store(false);
t->context->descheduled.store(true);
t->context->host_thread = std::thread{_thread_func, PASS_RDRAM t_, entrypoint, arg}; t->context->host_thread = std::thread{_thread_func, PASS_RDRAM t_, entrypoint, arg};
} }
extern "C" void osStopThread(RDRAM_ARG PTR(OSThread) t_) { extern "C" void osStopThread(RDRAM_ARG PTR(OSThread) t_) {
// If null is passed in as the thread then the calling thread is stopping itself. assert(false);
if (t_ == NULLPTR) {
t_ = Multilibultra::this_thread();
}
// Remove the thread in question from the scheduler so it doesn't get scheduled again.
OSThread* t = TO_PTR(OSThread, t_);
Multilibultra::stop_thread(t);
// If a thread is stopping itself, tell the scheduler that it has yielded.
if (t_ == Multilibultra::this_thread()) {
Multilibultra::yield_self(PASS_RDRAM1);
}
} }
extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) { extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) {
@ -217,12 +205,6 @@ extern "C" void osDestroyThread(RDRAM_ARG PTR(OSThread) t_) {
} }
} }
// TODO make the thread queue stable to ensure correct yielding behavior
extern "C" void osYieldThread(RDRAM_ARG1) {
Multilibultra::yield_self(PASS_RDRAM1);
Multilibultra::wait_for_resumed(PASS_RDRAM1);
}
extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) { extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) {
if (t == NULLPTR) { if (t == NULLPTR) {
t = thread_self; t = thread_self;
@ -230,12 +212,13 @@ extern "C" void osSetThreadPri(RDRAM_ARG PTR(OSThread) t, OSPri pri) {
bool pause_self = false; bool pause_self = false;
if (pri > TO_PTR(OSThread, thread_self)->priority) { if (pri > TO_PTR(OSThread, thread_self)->priority) {
pause_self = true; pause_self = true;
Multilibultra::set_self_paused(PASS_RDRAM1);
} else if (t == thread_self && pri < TO_PTR(OSThread, thread_self)->priority) { } else if (t == thread_self && pri < TO_PTR(OSThread, thread_self)->priority) {
pause_self = true; pause_self = true;
Multilibultra::set_self_paused(PASS_RDRAM1);
} }
Multilibultra::reprioritize_thread(TO_PTR(OSThread, t), pri); Multilibultra::reprioritize_thread(TO_PTR(OSThread, t), pri);
if (pause_self) { if (pause_self) {
Multilibultra::yield_self(PASS_RDRAM1);
Multilibultra::wait_for_resumed(PASS_RDRAM1); Multilibultra::wait_for_resumed(PASS_RDRAM1);
} }
} }
@ -254,6 +237,15 @@ extern "C" OSId osGetThreadId(RDRAM_ARG PTR(OSThread) t) {
return TO_PTR(OSThread, t)->id; return TO_PTR(OSThread, t)->id;
} }
// TODO yield thread, need a stable priority queue in the scheduler
void Multilibultra::set_self_paused(RDRAM_ARG1) {
debug_printf("[Thread] Thread pausing itself: %d\n", TO_PTR(OSThread, thread_self)->id);
TO_PTR(OSThread, thread_self)->state = OSThreadState::PAUSED;
TO_PTR(OSThread, thread_self)->context->running.store(false);
TO_PTR(OSThread, thread_self)->context->running.notify_all();
}
void check_destroyed(OSThread* t) { void check_destroyed(OSThread* t) {
if (t->destroyed) { if (t->destroyed) {
throw thread_terminated{}; throw thread_terminated{};
@ -262,13 +254,25 @@ void check_destroyed(OSThread* t) {
void Multilibultra::wait_for_resumed(RDRAM_ARG1) { void Multilibultra::wait_for_resumed(RDRAM_ARG1) {
check_destroyed(TO_PTR(OSThread, thread_self)); check_destroyed(TO_PTR(OSThread, thread_self));
//TO_PTR(OSThread, thread_self)->context->descheduled.wait(false); TO_PTR(OSThread, thread_self)->context->running.wait(false);
//TO_PTR(OSThread, thread_self)->context->descheduled.store(false);
TO_PTR(OSThread, thread_self)->context->scheduled.wait(false);
TO_PTR(OSThread, thread_self)->context->scheduled.store(false);
check_destroyed(TO_PTR(OSThread, thread_self)); check_destroyed(TO_PTR(OSThread, thread_self));
} }
void Multilibultra::pause_thread_impl(OSThread* t) {
t->state = OSThreadState::PREEMPTED;
t->context->running.store(false);
t->context->running.notify_all();
}
void Multilibultra::resume_thread_impl(OSThread *t) {
if (t->state == OSThreadState::PREEMPTED) {
// Nothing to do here
}
t->state = OSThreadState::RUNNING;
t->context->running.store(true);
t->context->running.notify_all();
}
PTR(OSThread) Multilibultra::this_thread() { PTR(OSThread) Multilibultra::this_thread() {
return thread_self; return thread_self;
} }

View File

@ -29,7 +29,8 @@ extern "C" void osDestroyThread_recomp(uint8_t * rdram, recomp_context * ctx) {
} }
extern "C" void osYieldThread_recomp(uint8_t * rdram, recomp_context * ctx) { extern "C" void osYieldThread_recomp(uint8_t * rdram, recomp_context * ctx) {
osYieldThread(rdram); assert(false);
// osYieldThread(rdram);
} }
extern "C" void osSetThreadPri_recomp(uint8_t* rdram, recomp_context* ctx) { extern "C" void osSetThreadPri_recomp(uint8_t* rdram, recomp_context* ctx) {