• 正文
    • 設(shè)計(jì)思想
    • 代碼實(shí)現(xiàn)
  • 相關(guān)推薦
申請入駐 產(chǎn)業(yè)圖譜

如何設(shè)計(jì)實(shí)現(xiàn)一個(gè)無鎖高并發(fā)的環(huán)形連續(xù)內(nèi)存緩沖隊(duì)列

05/01 08:25
373
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

隊(duì)列,也稱作FIFO,常用數(shù)據(jù)結(jié)構(gòu)之一,特點(diǎn)是先進(jìn)先出。

隊(duì)列是一種特殊的線性表,特殊之處在于它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作,和棧一樣,隊(duì)列是一種操作受限制的線性表。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。

設(shè)計(jì)思想

首先從理想的無限緩沖區(qū)到實(shí)際的使用進(jìn)行設(shè)計(jì)思考。

理想化的無限緩沖區(qū)

在理想情況下,寫入器(數(shù)據(jù)生產(chǎn)者)和讀取器(數(shù)據(jù)消費(fèi)者)在使用過程中,能夠訪問相同的、連續(xù)的、并且是無限的緩沖區(qū)。寫入器和讀取器各自保存一個(gè)索引,分別指向緩沖區(qū)中寫和讀的位置,即與之對齊的“寫”和“讀”指針開始進(jìn)行操作。

當(dāng)寫入器想要在末端加入數(shù)據(jù)時(shí),它會(huì)將數(shù)據(jù)追加到“寫索引”后面的位置,之后將“寫索引”移動(dòng)到新寫數(shù)據(jù)塊的末尾。而讀取器在頂端讀取數(shù)據(jù)時(shí),如果“寫索引”比“讀索引”位置大時(shí),讀寫器就可以對已有數(shù)據(jù)進(jìn)行讀取操作。完成之后,讀寫器將“讀索引”移動(dòng)到讀取數(shù)據(jù)塊的末尾,以跟蹤記錄已經(jīng)處理至緩沖區(qū)的哪一部分。

讀取器永遠(yuǎn)不會(huì)試圖讀取超過“寫索引”位置的數(shù)據(jù),因?yàn)椴荒鼙WC那里有有效的數(shù)據(jù),即寫入器在那里寫入了東西。這也意味著“讀索引”永遠(yuǎn)不能超過“寫索引”。目前為止,我們假設(shè)這個(gè)理想內(nèi)存系統(tǒng)總是連貫一致的,也就是說一旦執(zhí)行了寫操作,數(shù)據(jù)可以立即地、順序地進(jìn)行讀取出來。

有界限的緩沖區(qū)

而現(xiàn)實(shí)中計(jì)算機(jī)并沒有神奇的無限緩沖區(qū)。我們必須分配有限的內(nèi)存空間,以供寫入器和讀取器之間進(jìn)行或許無限的使用。在循環(huán)緩沖區(qū)中,“寫索引”可以在抵達(dá)緩沖區(qū)末尾時(shí)跨越緩沖區(qū)的邊界回繞到緩沖區(qū)的開始位置。當(dāng)“寫索引”接近緩沖區(qū)末尾并且又有新數(shù)據(jù)輸入進(jìn)來時(shí),會(huì)將寫操作分成兩個(gè)數(shù)據(jù)塊:一個(gè)寫入緩沖區(qū)末尾剩余的空間,另一個(gè)將剩下的數(shù)據(jù)寫入緩沖區(qū)開頭。請注意,如果此時(shí)“讀索引”位置接近緩沖區(qū)開頭,那么這個(gè)寫入操作可能會(huì)破壞尚未被讀取器處理的數(shù)據(jù)。由于這個(gè)原因,“寫索引”在被回繞后不允許超過“讀索引”。

這樣下來,可能會(huì)得到兩種內(nèi)存分布情況:

  • “寫索引”在前面,“讀索引”在后面,即在索引移動(dòng)方向上,“寫索引”超前于“讀索引”,已寫入但未被讀取器處理的有效數(shù)據(jù)位于“讀索引”和“寫索引”之間的緩沖區(qū)內(nèi);
  • “讀索引”在前面,“寫索引”在后面,即在索引移動(dòng)方向上,“讀索引”超前于“寫索引”,有效數(shù)據(jù)開始于“讀索引”,結(jié)束于緩沖區(qū)末尾,并回繞到緩沖區(qū)開頭直至“寫索引”位置。

注意:上述第二種情況下,“寫索引”和“讀索引”可能存在重合,為了區(qū)分這兩種情況,一般規(guī)定第二種情況下“寫索引”和“讀索引”不允許重合,即“寫索引”位置必須落后于“讀索引”一步。

因此,在循環(huán)緩沖區(qū)中,不斷地從內(nèi)存分布情況 1 進(jìn)行到內(nèi)存分布情況 2,又從 2 再回到 1,如此循環(huán)往復(fù),當(dāng)“讀索引”到達(dá)緩沖區(qū)的末尾時(shí),也回繞到緩沖區(qū)開頭繼續(xù)進(jìn)行讀取。

并發(fā)性設(shè)計(jì)

通常在使用緩沖區(qū)隊(duì)列時(shí),讀數(shù)據(jù)和寫數(shù)據(jù)大部分情況都是多線程或者前后臺(tái)(中斷)分別處理的,為了減少寫入器、讀取器兩個(gè)線程之間或者前后臺(tái)系統(tǒng)之間需要發(fā)生的協(xié)調(diào),一種常見策略是,將讀寫變量獨(dú)立出來,分別由讀取器和寫入器進(jìn)行改變。這也簡化了并發(fā)性設(shè)計(jì)中的邏輯推理,因?yàn)檎l負(fù)責(zé)更改哪個(gè)變量總是很清楚。

讓我們從一個(gè)簡單的循環(huán)緩沖區(qū)開始,它具有原始的“寫索引”和“讀索引”。唯有寫入器能更改“寫索引”,而唯有讀取器能更改“讀索引”。

這樣就可以不用鎖進(jìn)行操作,提高效率。

如何保證地址的連續(xù)性

在上述提到的有界緩沖區(qū)內(nèi)存分布情況,第二種情況無法保證地址的連續(xù)性,因?yàn)橛行﹫鼍靶枰褂玫竭B續(xù)的內(nèi)存塊地址,解決這種場景的辦法有:可以對緩存區(qū)進(jìn)行分塊,每一塊固定的長度,即固定長度的隊(duì)列,這樣就能在一定程度上保證了地址的連續(xù)性。

代碼實(shí)現(xiàn)

隊(duì)列結(jié)構(gòu)體定義先定義

一個(gè)隊(duì)列結(jié)構(gòu)體,包含了每個(gè)塊的大小、數(shù)目、寫入塊索引、讀取塊索引等,為了解決“寫索引”和“讀索引”可能存在重合的兩種情況,加入狀態(tài)變量用來區(qū)分。

typedef?uint16_t?queuesize_t;

typedef?struct{
volatile?uint8_t?state;?/*!<?控制狀態(tài)?*/

queuesize_t?end;????????/*!<?循環(huán)隊(duì)列尾哨兵?*/

queuesize_t?head;???????/*!<?循環(huán)隊(duì)列首哨兵?*/

queuesize_t?num;????????/*!<?循環(huán)隊(duì)列中能存儲(chǔ)的最多組數(shù)?*/

queuesize_t?size;???????/*!<?單組內(nèi)存大小?*/

char??*pBufMem;?????????/*!<?Buf?指針?*/
}?QueueCtrl_t;

隊(duì)列初始化

定義結(jié)構(gòu)體后一定要對數(shù)據(jù)初始化,同時(shí)為接口提供一些入?yún)⒆兞吭O(shè)置隊(duì)列的信息進(jìn)行封裝,如緩存區(qū)地址、隊(duì)列的組數(shù)目、組內(nèi)存大小和是否內(nèi)存覆蓋等信息。

/**
*?@brief??????隊(duì)列控制初始化,?可采用的是動(dòng)態(tài)/靜態(tài)內(nèi)存分配方式
*
*?@param[in,out]?pCtrl?隊(duì)列控制句柄
*?@param[in]??buf??????buf?地址
*?@param[in]??queueNum?隊(duì)列數(shù)目大小
*?@param[in]??size?????隊(duì)列中單個(gè)內(nèi)存大小
*?@param[in]??isCover??false,不覆蓋;?true,隊(duì)列滿了覆蓋未讀取的最早舊數(shù)據(jù)
*?@return?????0,成功;?-1,失敗
*/
int?Queue_Init(QueueCtrl_t?*pCtrl,?const?void?*pBufMem,?queuesize_t?queueNum,?queuesize_t?size,?bool?isCover)
{
if?(pCtrl?==?NULL?||?pBufMem?==?NULL?||?queueNum?==?0?||?size?==?0)
{
return?-1;
}

pCtrl->end?????=?0;
pCtrl->head????=?0;
pCtrl->pBufMem?=?(char?*)pBufMem;
pCtrl->num??=?queueNum;
pCtrl->size????=?size;
pCtrl->state???=?0x00;

if?(isCover)
{
QUEUE_STATE_SET(pCtrl,?QUEUE_ENABLE_COVER);
}

return?0;
}

隊(duì)列寫操作

隊(duì)列都是在末端增加數(shù)據(jù),因?yàn)殛?duì)列組的大小已經(jīng)固定,因此在寫操作數(shù)據(jù)時(shí)可以省略新數(shù)據(jù)的內(nèi)存大小。

/**
*?@brief??????在隊(duì)列末尾加入新的數(shù)據(jù)
*
*?@param[in,out]?pCtrl?隊(duì)列控制句柄
*?@param[in]?????src???新的數(shù)據(jù)
*?@retval?????返回的值含義如下
*?????????????@arg?0:?寫入成功
*?????????????@arg?-1:?寫入失敗
*/
extern?int?Queue_Push(QueueCtrl_t?*pCtrl,?const?void?*src)
{
if?(pCtrl?==?NULL?||?src?==?NULL)
{
return?-1;
}

if?(IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DISABLE_PUSH))
{
return?-1;
}

memcpy(&pCtrl->pBufMem[pCtrl->end?*?pCtrl->size],?src,?pCtrl->size);
pCtrl->end++;
QUEUE_STATE_SET(pCtrl,?QUEUE_EXIT_DATA);

if?((pCtrl->end)?>=?(pCtrl->num))
{
(pCtrl->end)?=?0;
}

if?(IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL))
{
(pCtrl->head)?=?(pCtrl->end);
}
else?if?((pCtrl->end)?==?(pCtrl->head))
{
QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL);

if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_ENABLE_COVER))
{
QUEUE_STATE_SET(pCtrl,?QUEUE_DISABLE_PUSH);
}
}

return?0;
}

隊(duì)列讀操作

隊(duì)列都是在頂端讀取數(shù)據(jù),因?yàn)殛?duì)列組的大小已經(jīng)固定,因此在都操作數(shù)據(jù)時(shí)可以省略數(shù)據(jù)讀取存入的內(nèi)存大?。ū仨毐WC讀取的內(nèi)存大小足夠)。

/**
*?@brief??????讀取并彈出隊(duì)列頂端的數(shù)據(jù)
*
*?@param[in,out]?pCtrl?隊(duì)列控制句柄
*?@param[in]?????dest??讀取的數(shù)據(jù)
*?@retval?????返回的值含義如下
*?????????????@arg?0:?成功
*?????????????@arg?-1:?失敗
*/
int?Queue_Pop(QueueCtrl_t?*pCtrl,?void?*dest)
{
if?(pCtrl?==?NULL)
{
return?-1;
}

if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_EXIT_DATA))
{
return?-1;
}

memcpy((char?*)dest,?&pCtrl->pBufMem[pCtrl->head?*?pCtrl->size],?pCtrl->size);
pCtrl->head++;

if?((pCtrl->head)?>=?(pCtrl->num))
{
pCtrl->head?=?0;
}

if?((pCtrl->head)?==?(pCtrl->end))
{
if?(!IS_QUEUE_STATE_SET(pCtrl,?QUEUE_DATA_FULL))
{
QUEUE_STATE_CLEAR(pCtrl,?QUEUE_EXIT_DATA);
}
}

QUEUE_STATE_CLEAR(pCtrl,?QUEUE_DISABLE_PUSH);
QUEUE_STATE_CLEAR(pCtrl,?QUEUE_DATA_FULL);

return?0;
}

相關(guān)推薦