git: 90cd9c203eb5 - main - buf_ring: Use atomic accesses for head/tail values

From: Andrew Turner <andrew_at_FreeBSD.org>
Date: Mon, 30 Sep 2024 12:23:47 UTC
The branch main has been updated by andrew:

URL: https://cgit.FreeBSD.org/src/commit/?id=90cd9c203eb536581e19a6fdfe43e6dedb22089f

commit 90cd9c203eb536581e19a6fdfe43e6dedb22089f
Author:     Andrew Turner <andrew@FreeBSD.org>
AuthorDate: 2024-09-27 15:15:32 +0000
Commit:     Andrew Turner <andrew@FreeBSD.org>
CommitDate: 2024-09-30 12:04:24 +0000

    buf_ring: Use atomic accesses for head/tail values
    
    Use explicit atomic load/store operations for all producer and consumer
    head and tail accesses. This allows us to remove the volatile
    annotation from these variables.
    
    Reviewed by:    alc, imp, kib, markj
    Sponsored by:   Arm Ltd
    Differential Revision:  https://reviews.freebsd.org/D46380
---
 sys/sys/buf_ring.h | 59 ++++++++++++++++++++++++++++++------------------------
 1 file changed, 33 insertions(+), 26 deletions(-)

diff --git a/sys/sys/buf_ring.h b/sys/sys/buf_ring.h
index c99cf81d8b6d..9a4bfa9fb549 100644
--- a/sys/sys/buf_ring.h
+++ b/sys/sys/buf_ring.h
@@ -51,13 +51,13 @@
  *
  */
 struct buf_ring {
-	volatile uint32_t	br_prod_head;
-	volatile uint32_t	br_prod_tail;	
+	uint32_t		br_prod_head;
+	uint32_t		br_prod_tail;
 	int              	br_prod_size;
 	int              	br_prod_mask;
 	uint64_t		br_drops;
-	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
-	volatile uint32_t	br_cons_tail;
+	uint32_t		br_cons_head __aligned(CACHE_LINE_SIZE);
+	uint32_t		br_cons_tail;
 	int		 	br_cons_size;
 	int              	br_cons_mask;
 #if defined(DEBUG_BUFRING) && defined(_KERNEL)
@@ -83,10 +83,12 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
 	 * via drbr_peek(), and then re-added via drbr_putback() and
 	 * trigger a spurious panic.
 	 */
-	for (uint32_t i = br->br_cons_head; i != br->br_prod_head; i++)
+	for (uint32_t i = atomic_load_32(&br->br_cons_head);
+	    i != atomic_load_32(&br->br_prod_head); i++)
 		if (br->br_ring[i & mask] == buf)
 			panic("buf=%p already enqueue at %d prod=%d cons=%d",
-			    buf, i, br->br_prod_tail, br->br_cons_tail);
+			    buf, i, atomic_load_32(&br->br_prod_tail),
+			    atomic_load_32(&br->br_cons_tail));
 #endif	
 	critical_enter();
 	do {
@@ -106,8 +108,8 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
 
 		if ((int32_t)(cons_tail + br->br_prod_size - prod_next) < 1) {
 			rmb();
-			if (prod_head == br->br_prod_head &&
-			    cons_tail == br->br_cons_tail) {
+			if (prod_head == atomic_load_32(&br->br_prod_head) &&
+			    cons_tail == atomic_load_32(&br->br_cons_tail)) {
 				br->br_drops++;
 				critical_exit();
 				return (ENOBUFS);
@@ -127,7 +129,7 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
 	 * that preceded us, we need to wait for them
 	 * to complete 
 	 */   
-	while (br->br_prod_tail != prod_head)
+	while (atomic_load_32(&br->br_prod_tail) != prod_head)
 		cpu_spinwait();
 	atomic_store_rel_32(&br->br_prod_tail, prod_next);
 	critical_exit();
@@ -173,7 +175,7 @@ buf_ring_dequeue_mc(struct buf_ring *br)
 	 * that preceded us, we need to wait for them
 	 * to complete 
 	 */   
-	while (br->br_cons_tail != cons_head)
+	while (atomic_load_32(&br->br_cons_tail) != cons_head)
 		cpu_spinwait();
 
 	atomic_store_rel_32(&br->br_cons_tail, cons_next);
@@ -195,7 +197,7 @@ buf_ring_dequeue_sc(struct buf_ring *br)
 	void *buf;
 
 	mask = br->br_cons_mask;
-	cons_head = br->br_cons_head;
+	cons_head = atomic_load_32(&br->br_cons_head);
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 
 	cons_next = cons_head + 1;
@@ -204,7 +206,7 @@ buf_ring_dequeue_sc(struct buf_ring *br)
 		return (NULL);
 
 	cons_idx = cons_head & mask;
-	br->br_cons_head = cons_next;
+	atomic_store_32(&br->br_cons_head, cons_next);
 	buf = br->br_ring[cons_idx];
 
 #ifdef DEBUG_BUFRING
@@ -213,9 +215,9 @@ buf_ring_dequeue_sc(struct buf_ring *br)
 	if (!mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif
-	if (br->br_cons_tail != cons_head)
+	if (atomic_load_32(&br->br_cons_tail) != cons_head)
 		panic("inconsistent list cons_tail=%d cons_head=%d",
-		    br->br_cons_tail, cons_head);
+		    atomic_load_32(&br->br_cons_tail), cons_head);
 #endif
 	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	return (buf);
@@ -235,13 +237,13 @@ buf_ring_advance_sc(struct buf_ring *br)
 
 	mask = br->br_cons_mask;
 #endif
-	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
+	cons_head = atomic_load_32(&br->br_cons_head);
+	prod_tail = atomic_load_32(&br->br_prod_tail);
 
 	cons_next = cons_head + 1;
 	if (cons_head == prod_tail)
 		return;
-	br->br_cons_head = cons_next;
+	atomic_store_32(&br->br_cons_head, cons_next);
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head & mask] = NULL;
 #endif
@@ -267,12 +269,13 @@ buf_ring_advance_sc(struct buf_ring *br)
 static __inline void
 buf_ring_putback_sc(struct buf_ring *br, void *new)
 {
-	uint32_t mask;
+	uint32_t cons_idx, mask;
 
 	mask = br->br_cons_mask;
-	KASSERT((br->br_cons_head & mask) != (br->br_prod_tail & mask),
+	cons_idx = atomic_load_32(&br->br_cons_head) & mask;
+	KASSERT(cons_idx != (atomic_load_32(&br->br_prod_tail) & mask),
 		("Buf-Ring has none in putback")) ;
-	br->br_ring[br->br_cons_head & mask] = new;
+	br->br_ring[cons_idx] = new;
 }
 
 /*
@@ -291,7 +294,7 @@ buf_ring_peek(struct buf_ring *br)
 #endif	
 	mask = br->br_cons_mask;
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
-	cons_head = br->br_cons_head;
+	cons_head = atomic_load_32(&br->br_cons_head);
 
 	if (cons_head == prod_tail)
 		return (NULL);
@@ -312,7 +315,7 @@ buf_ring_peek_clear_sc(struct buf_ring *br)
 
 	mask = br->br_cons_mask;
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
-	cons_head = br->br_cons_head;
+	cons_head = atomic_load_32(&br->br_cons_head);
 
 	if (cons_head == prod_tail)
 		return (NULL);
@@ -332,22 +335,26 @@ static __inline int
 buf_ring_full(struct buf_ring *br)
 {
 
-	return (br->br_prod_head == br->br_cons_tail + br->br_cons_size - 1);
+	return (atomic_load_32(&br->br_prod_head) ==
+	    atomic_load_32(&br->br_cons_tail) + br->br_cons_size - 1);
 }
 
 static __inline int
 buf_ring_empty(struct buf_ring *br)
 {
 
-	return (br->br_cons_head == br->br_prod_tail);
+	return (atomic_load_32(&br->br_cons_head) ==
+	    atomic_load_32(&br->br_prod_tail));
 }
 
 static __inline int
 buf_ring_count(struct buf_ring *br)
 {
+	uint32_t cons_tail, prod_tail;
 
-	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
-	    & br->br_prod_mask);
+	cons_tail = atomic_load_32(&br->br_cons_tail);
+	prod_tail = atomic_load_32(&br->br_prod_tail);
+	return ((br->br_prod_size + prod_tail - cons_tail) & br->br_prod_mask);
 }
 
 #ifdef _KERNEL