189 lines
6.4 KiB
C++
189 lines
6.4 KiB
C++
#include <thread>
|
|
|
|
#include "blockingconcurrentqueue.h"
|
|
|
|
#include "ultra64.h"
|
|
#include "ultramodern.hpp"
|
|
#include "recomp.h"
|
|
|
|
struct QueuedMessage {
|
|
PTR(OSMesgQueue) mq;
|
|
OSMesg mesg;
|
|
bool jam;
|
|
};
|
|
|
|
static moodycamel::BlockingConcurrentQueue<QueuedMessage> external_messages {};
|
|
|
|
void enqueue_external_message(PTR(OSMesgQueue) mq, OSMesg msg, bool jam) {
|
|
external_messages.enqueue({mq, msg, jam});
|
|
}
|
|
|
|
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block);
|
|
|
|
void dequeue_external_messages(RDRAM_ARG1) {
|
|
QueuedMessage to_send;
|
|
while (external_messages.try_dequeue(to_send)) {
|
|
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
|
|
}
|
|
}
|
|
|
|
void ultramodern::wait_for_external_message(RDRAM_ARG1) {
|
|
QueuedMessage to_send;
|
|
external_messages.wait_dequeue(to_send);
|
|
do_send(PASS_RDRAM to_send.mq, to_send.mesg, to_send.jam, false);
|
|
}
|
|
|
|
extern "C" void osCreateMesgQueue(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg, s32 count) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
mq->blocked_on_recv = NULLPTR;
|
|
mq->blocked_on_send = NULLPTR;
|
|
mq->msgCount = count;
|
|
mq->msg = msg;
|
|
mq->validCount = 0;
|
|
mq->first = 0;
|
|
}
|
|
|
|
s32 MQ_GET_COUNT(OSMesgQueue *mq) {
|
|
return mq->validCount;
|
|
}
|
|
|
|
s32 MQ_IS_EMPTY(OSMesgQueue *mq) {
|
|
return mq->validCount == 0;
|
|
}
|
|
|
|
s32 MQ_IS_FULL(OSMesgQueue* mq) {
|
|
return MQ_GET_COUNT(mq) >= mq->msgCount;
|
|
}
|
|
|
|
bool do_send(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, bool jam, bool block) {
|
|
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
|
|
if (!block) {
|
|
// If non-blocking, fail if the queue is full.
|
|
if (MQ_IS_FULL(mq)) {
|
|
return false;
|
|
}
|
|
}
|
|
else {
|
|
// Otherwise, yield this thread until the queue has room.
|
|
while (MQ_IS_FULL(mq)) {
|
|
debug_printf("[Message Queue] Thread %d is blocked on send\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
|
|
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_send), ultramodern::this_thread());
|
|
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
|
|
}
|
|
}
|
|
|
|
if (jam) {
|
|
// Jams insert at the head of the message queue's buffer.
|
|
mq->first = (mq->first + mq->msgCount - 1) % mq->msgCount;
|
|
TO_PTR(OSMesg, mq->msg)[mq->first] = msg;
|
|
mq->validCount++;
|
|
}
|
|
else {
|
|
// Sends insert at the tail of the message queue's buffer.
|
|
s32 last = (mq->first + mq->validCount) % mq->msgCount;
|
|
TO_PTR(OSMesg, mq->msg)[last] = msg;
|
|
mq->validCount++;
|
|
}
|
|
|
|
// If any threads were blocked on receiving from this message queue, pop the first one and schedule it.
|
|
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv);
|
|
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
|
|
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool do_recv(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, bool block) {
|
|
OSMesgQueue* mq = TO_PTR(OSMesgQueue, mq_);
|
|
if (!block) {
|
|
// If non-blocking, fail if the queue is empty
|
|
if (MQ_IS_EMPTY(mq)) {
|
|
return false;
|
|
}
|
|
} else {
|
|
// Otherwise, yield this thread in a loop until the queue is no longer full
|
|
while (MQ_IS_EMPTY(mq)) {
|
|
debug_printf("[Message Queue] Thread %d is blocked on receive\n", TO_PTR(OSThread, ultramodern::this_thread())->id);
|
|
ultramodern::thread_queue_insert(PASS_RDRAM GET_MEMBER(OSMesgQueue, mq_, blocked_on_recv), ultramodern::this_thread());
|
|
ultramodern::run_next_thread_and_wait(PASS_RDRAM1);
|
|
}
|
|
}
|
|
|
|
if (msg_ != NULLPTR) {
|
|
*TO_PTR(OSMesg, msg_) = TO_PTR(OSMesg, mq->msg)[mq->first];
|
|
}
|
|
|
|
mq->first = (mq->first + 1) % mq->msgCount;
|
|
mq->validCount--;
|
|
|
|
// If any threads were blocked on sending to this message queue, pop the first one and schedule it.
|
|
PTR(PTR(OSThread)) blocked_queue = GET_MEMBER(OSMesgQueue, mq_, blocked_on_send);
|
|
if (!ultramodern::thread_queue_empty(PASS_RDRAM blocked_queue)) {
|
|
ultramodern::schedule_running_thread(PASS_RDRAM ultramodern::thread_queue_pop(PASS_RDRAM blocked_queue));
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
extern "C" s32 osSendMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
bool jam = false;
|
|
|
|
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
|
|
if (!ultramodern::is_game_thread()) {
|
|
enqueue_external_message(mq_, msg, jam);
|
|
return 0;
|
|
}
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to send the message.
|
|
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return sent ? 0 : -1;
|
|
}
|
|
|
|
extern "C" s32 osJamMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, OSMesg msg, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
bool jam = true;
|
|
|
|
// Don't directly send to the message queue if this isn't a game thread to avoid contention.
|
|
if (!ultramodern::is_game_thread()) {
|
|
enqueue_external_message(mq_, msg, jam);
|
|
return 0;
|
|
}
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to send the message.
|
|
bool sent = do_send(PASS_RDRAM mq_, msg, jam, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return sent ? 0 : -1;
|
|
}
|
|
|
|
extern "C" s32 osRecvMesg(RDRAM_ARG PTR(OSMesgQueue) mq_, PTR(OSMesg) msg_, s32 flags) {
|
|
OSMesgQueue *mq = TO_PTR(OSMesgQueue, mq_);
|
|
|
|
assert(ultramodern::is_game_thread() && "RecvMesg not allowed outside of game threads.");
|
|
|
|
// Handle any messages that have been received from an external thread.
|
|
dequeue_external_messages(PASS_RDRAM1);
|
|
|
|
// Try to receive a message.
|
|
bool received = do_recv(PASS_RDRAM mq_, msg_, flags == OS_MESG_BLOCK);
|
|
|
|
// Check the queue to see if this thread should swap execution to another.
|
|
ultramodern::check_running_queue(PASS_RDRAM1);
|
|
|
|
return received ? 0 : -1;
|
|
}
|