mirror of
https://github.com/Ryujinx/SDL.git
synced 2025-06-22 07:07:56 +00:00
dataqueue: Make thread safe.
Each data queue gets its own mutex and each function obtains it. Fixes #7390. (cherry picked from commit 8b9a9384132e39a49a14cf41b292018ce4f4f251)
This commit is contained in:
parent
f5bb286b76
commit
46945354cc
|
@ -33,6 +33,7 @@ typedef struct SDL_DataQueuePacket
|
||||||
|
|
||||||
struct SDL_DataQueue
|
struct SDL_DataQueue
|
||||||
{
|
{
|
||||||
|
SDL_mutex *lock;
|
||||||
SDL_DataQueuePacket *head; /* device fed from here. */
|
SDL_DataQueuePacket *head; /* device fed from here. */
|
||||||
SDL_DataQueuePacket *tail; /* queue fills to here. */
|
SDL_DataQueuePacket *tail; /* queue fills to here. */
|
||||||
SDL_DataQueuePacket *pool; /* these are unused packets. */
|
SDL_DataQueuePacket *pool; /* these are unused packets. */
|
||||||
|
@ -49,24 +50,26 @@ static void SDL_FreeDataQueueList(SDL_DataQueuePacket *packet)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* this all expects that you managed thread safety elsewhere. */
|
|
||||||
|
|
||||||
SDL_DataQueue *
|
SDL_DataQueue *
|
||||||
SDL_NewDataQueue(const size_t _packetlen, const size_t initialslack)
|
SDL_NewDataQueue(const size_t _packetlen, const size_t initialslack)
|
||||||
{
|
{
|
||||||
SDL_DataQueue *queue = (SDL_DataQueue *)SDL_malloc(sizeof(SDL_DataQueue));
|
SDL_DataQueue *queue = (SDL_DataQueue *)SDL_calloc(1, sizeof(SDL_DataQueue));
|
||||||
|
|
||||||
if (queue == NULL) {
|
if (queue == NULL) {
|
||||||
SDL_OutOfMemory();
|
SDL_OutOfMemory();
|
||||||
return NULL;
|
|
||||||
} else {
|
} else {
|
||||||
const size_t packetlen = _packetlen ? _packetlen : 1024;
|
const size_t packetlen = _packetlen ? _packetlen : 1024;
|
||||||
const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen;
|
const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen;
|
||||||
size_t i;
|
size_t i;
|
||||||
|
|
||||||
SDL_zerop(queue);
|
|
||||||
queue->packet_size = packetlen;
|
queue->packet_size = packetlen;
|
||||||
|
|
||||||
|
queue->lock = SDL_CreateMutex();
|
||||||
|
if (!queue->lock) {
|
||||||
|
SDL_free(queue);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
for (i = 0; i < wantpackets; i++) {
|
for (i = 0; i < wantpackets; i++) {
|
||||||
SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen);
|
SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen);
|
||||||
if (packet) { /* don't care if this fails, we'll deal later. */
|
if (packet) { /* don't care if this fails, we'll deal later. */
|
||||||
|
@ -86,6 +89,7 @@ void SDL_FreeDataQueue(SDL_DataQueue *queue)
|
||||||
if (queue) {
|
if (queue) {
|
||||||
SDL_FreeDataQueueList(queue->head);
|
SDL_FreeDataQueueList(queue->head);
|
||||||
SDL_FreeDataQueueList(queue->pool);
|
SDL_FreeDataQueueList(queue->pool);
|
||||||
|
SDL_DestroyMutex(queue->lock);
|
||||||
SDL_free(queue);
|
SDL_free(queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,6 +106,8 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_LockMutex(queue->lock);
|
||||||
|
|
||||||
packet = queue->head;
|
packet = queue->head;
|
||||||
|
|
||||||
/* merge the available pool and the current queue into one list. */
|
/* merge the available pool and the current queue into one list. */
|
||||||
|
@ -129,9 +135,12 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack)
|
||||||
queue->pool = NULL;
|
queue->pool = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
|
|
||||||
SDL_FreeDataQueueList(packet); /* free extra packets */
|
SDL_FreeDataQueueList(packet); /* free extra packets */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* You must hold queue->lock before calling this! */
|
||||||
static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue)
|
static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue)
|
||||||
{
|
{
|
||||||
SDL_DataQueuePacket *packet;
|
SDL_DataQueuePacket *packet;
|
||||||
|
@ -178,6 +187,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
|
||||||
return SDL_InvalidParamError("queue");
|
return SDL_InvalidParamError("queue");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_LockMutex(queue->lock);
|
||||||
|
|
||||||
orighead = queue->head;
|
orighead = queue->head;
|
||||||
origtail = queue->tail;
|
origtail = queue->tail;
|
||||||
origlen = origtail ? origtail->datalen : 0;
|
origlen = origtail ? origtail->datalen : 0;
|
||||||
|
@ -201,6 +212,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
|
||||||
queue->tail = origtail;
|
queue->tail = origtail;
|
||||||
queue->pool = NULL;
|
queue->pool = NULL;
|
||||||
|
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
SDL_FreeDataQueueList(packet); /* give back what we can. */
|
SDL_FreeDataQueueList(packet); /* give back what we can. */
|
||||||
return SDL_OutOfMemory();
|
return SDL_OutOfMemory();
|
||||||
}
|
}
|
||||||
|
@ -214,6 +226,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _
|
||||||
queue->queued_bytes += datalen;
|
queue->queued_bytes += datalen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,6 +243,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_LockMutex(queue->lock);
|
||||||
|
|
||||||
for (packet = queue->head; len && packet; packet = packet->next) {
|
for (packet = queue->head; len && packet; packet = packet->next) {
|
||||||
const size_t avail = packet->datalen - packet->startpos;
|
const size_t avail = packet->datalen - packet->startpos;
|
||||||
const size_t cpy = SDL_min(len, avail);
|
const size_t cpy = SDL_min(len, avail);
|
||||||
|
@ -239,6 +255,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
|
||||||
len -= cpy;
|
len -= cpy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
|
|
||||||
return (size_t)(ptr - buf);
|
return (size_t)(ptr - buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,6 +272,8 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_LockMutex(queue->lock);
|
||||||
|
|
||||||
while ((len > 0) && ((packet = queue->head) != NULL)) {
|
while ((len > 0) && ((packet = queue->head) != NULL)) {
|
||||||
const size_t avail = packet->datalen - packet->startpos;
|
const size_t avail = packet->datalen - packet->startpos;
|
||||||
const size_t cpy = SDL_min(len, avail);
|
const size_t cpy = SDL_min(len, avail);
|
||||||
|
@ -279,13 +299,27 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len)
|
||||||
queue->tail = NULL; /* in case we drained the queue entirely. */
|
queue->tail = NULL; /* in case we drained the queue entirely. */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
|
|
||||||
return (size_t)(ptr - buf);
|
return (size_t)(ptr - buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t
|
size_t
|
||||||
SDL_CountDataQueue(SDL_DataQueue *queue)
|
SDL_CountDataQueue(SDL_DataQueue *queue)
|
||||||
{
|
{
|
||||||
return queue ? queue->queued_bytes : 0;
|
size_t retval = 0;
|
||||||
|
if (queue) {
|
||||||
|
SDL_LockMutex(queue->lock);
|
||||||
|
retval = queue->queued_bytes;
|
||||||
|
SDL_UnlockMutex(queue->lock);
|
||||||
|
}
|
||||||
|
return retval;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDL_mutex *
|
||||||
|
SDL_GetDataQueueMutex(SDL_DataQueue *queue)
|
||||||
|
{
|
||||||
|
return queue ? queue->lock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* vi: set ts=4 sw=4 expandtab: */
|
/* vi: set ts=4 sw=4 expandtab: */
|
||||||
|
|
|
@ -33,6 +33,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *data, const size_t le
|
||||||
size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
|
size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
|
||||||
size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
|
size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len);
|
||||||
size_t SDL_CountDataQueue(SDL_DataQueue *queue);
|
size_t SDL_CountDataQueue(SDL_DataQueue *queue);
|
||||||
|
SDL_mutex *SDL_GetDataQueueMutex(SDL_DataQueue *queue); /* don't destroy this, obviously. */
|
||||||
|
|
||||||
#endif /* SDL_dataqueue_h_ */
|
#endif /* SDL_dataqueue_h_ */
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue