From 46945354cc81a761d508d27f7b866bd960c80ae6 Mon Sep 17 00:00:00 2001 From: "Ryan C. Gordon" Date: Thu, 2 Mar 2023 16:27:32 -0500 Subject: [PATCH] dataqueue: Make thread safe. Each data queue gets its own mutex and each function obtains it. Fixes #7390. (cherry picked from commit 8b9a9384132e39a49a14cf41b292018ce4f4f251) --- src/SDL_dataqueue.c | 46 +++++++++++++++++++++++++++++++++++++++------ src/SDL_dataqueue.h | 1 + 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/SDL_dataqueue.c b/src/SDL_dataqueue.c index 8abe7f7a9..4f63fcad9 100644 --- a/src/SDL_dataqueue.c +++ b/src/SDL_dataqueue.c @@ -33,6 +33,7 @@ typedef struct SDL_DataQueuePacket struct SDL_DataQueue { + SDL_mutex *lock; SDL_DataQueuePacket *head; /* device fed from here. */ SDL_DataQueuePacket *tail; /* queue fills to here. */ SDL_DataQueuePacket *pool; /* these are unused packets. */ @@ -49,24 +50,26 @@ static void SDL_FreeDataQueueList(SDL_DataQueuePacket *packet) } } -/* this all expects that you managed thread safety elsewhere. */ - SDL_DataQueue * SDL_NewDataQueue(const size_t _packetlen, const size_t initialslack) { - SDL_DataQueue *queue = (SDL_DataQueue *)SDL_malloc(sizeof(SDL_DataQueue)); + SDL_DataQueue *queue = (SDL_DataQueue *)SDL_calloc(1, sizeof(SDL_DataQueue)); if (queue == NULL) { SDL_OutOfMemory(); - return NULL; } else { const size_t packetlen = _packetlen ? _packetlen : 1024; const size_t wantpackets = (initialslack + (packetlen - 1)) / packetlen; size_t i; - SDL_zerop(queue); queue->packet_size = packetlen; + queue->lock = SDL_CreateMutex(); + if (!queue->lock) { + SDL_free(queue); + return NULL; + } + for (i = 0; i < wantpackets; i++) { SDL_DataQueuePacket *packet = (SDL_DataQueuePacket *)SDL_malloc(sizeof(SDL_DataQueuePacket) + packetlen); if (packet) { /* don't care if this fails, we'll deal later. */ @@ -86,6 +89,7 @@ void SDL_FreeDataQueue(SDL_DataQueue *queue) if (queue) { SDL_FreeDataQueueList(queue->head); SDL_FreeDataQueueList(queue->pool); + SDL_DestroyMutex(queue->lock); SDL_free(queue); } } @@ -102,6 +106,8 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack) return; } + SDL_LockMutex(queue->lock); + packet = queue->head; /* merge the available pool and the current queue into one list. */ @@ -129,9 +135,12 @@ void SDL_ClearDataQueue(SDL_DataQueue *queue, const size_t slack) queue->pool = NULL; } + SDL_UnlockMutex(queue->lock); + SDL_FreeDataQueueList(packet); /* free extra packets */ } +/* You must hold queue->lock before calling this! */ static SDL_DataQueuePacket *AllocateDataQueuePacket(SDL_DataQueue *queue) { SDL_DataQueuePacket *packet; @@ -178,6 +187,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _ return SDL_InvalidParamError("queue"); } + SDL_LockMutex(queue->lock); + orighead = queue->head; origtail = queue->tail; origlen = origtail ? origtail->datalen : 0; @@ -201,6 +212,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _ queue->tail = origtail; queue->pool = NULL; + SDL_UnlockMutex(queue->lock); SDL_FreeDataQueueList(packet); /* give back what we can. */ return SDL_OutOfMemory(); } @@ -214,6 +226,8 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *_data, const size_t _ queue->queued_bytes += datalen; } + SDL_UnlockMutex(queue->lock); + return 0; } @@ -229,6 +243,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len) return 0; } + SDL_LockMutex(queue->lock); + for (packet = queue->head; len && packet; packet = packet->next) { const size_t avail = packet->datalen - packet->startpos; const size_t cpy = SDL_min(len, avail); @@ -239,6 +255,8 @@ SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len) len -= cpy; } + SDL_UnlockMutex(queue->lock); + return (size_t)(ptr - buf); } @@ -254,6 +272,8 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len) return 0; } + SDL_LockMutex(queue->lock); + while ((len > 0) && ((packet = queue->head) != NULL)) { const size_t avail = packet->datalen - packet->startpos; const size_t cpy = SDL_min(len, avail); @@ -279,13 +299,27 @@ SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *_buf, const size_t _len) queue->tail = NULL; /* in case we drained the queue entirely. */ } + SDL_UnlockMutex(queue->lock); + return (size_t)(ptr - buf); } size_t SDL_CountDataQueue(SDL_DataQueue *queue) { - return queue ? queue->queued_bytes : 0; + size_t retval = 0; + if (queue) { + SDL_LockMutex(queue->lock); + retval = queue->queued_bytes; + SDL_UnlockMutex(queue->lock); + } + return retval; +} + +SDL_mutex * +SDL_GetDataQueueMutex(SDL_DataQueue *queue) +{ + return queue ? queue->lock : NULL; } /* vi: set ts=4 sw=4 expandtab: */ diff --git a/src/SDL_dataqueue.h b/src/SDL_dataqueue.h index 45ef607cf..aee2cda62 100644 --- a/src/SDL_dataqueue.h +++ b/src/SDL_dataqueue.h @@ -33,6 +33,7 @@ int SDL_WriteToDataQueue(SDL_DataQueue *queue, const void *data, const size_t le size_t SDL_ReadFromDataQueue(SDL_DataQueue *queue, void *buf, const size_t len); size_t SDL_PeekIntoDataQueue(SDL_DataQueue *queue, void *buf, const size_t len); size_t SDL_CountDataQueue(SDL_DataQueue *queue); +SDL_mutex *SDL_GetDataQueueMutex(SDL_DataQueue *queue); /* don't destroy this, obviously. */ #endif /* SDL_dataqueue_h_ */