toys

toys.git
git clone git://git.lenczewski.org/toys.git
Log | Files | Refs | README | LICENSE

queue.h (7938B)


      1 #ifndef QUEUE_H
      2 #define QUEUE_H
      3 
      4 #define _GNU_SOURCE 1
      5 
      6 #include <stdalign.h>
      7 #include <stddef.h>
      8 #include <stdint.h>
      9 #include <unistd.h>
     10 
     11 #include <sys/mman.h>
     12 
     13 #include "assert.h"
     14 #include "utils.h"
     15 
     16 #define PAGESZ_4K KiB(4)
     17 #define PAGESZ_2M MiB(2)
     18 #define PAGESZ_1G GiB(1)
     19 
     20 #define HW_CACHELINE_SZ 64
     21 
     22 /* This helper will map a memory region of the given size and pagesize
     23  * alignment (optionally at a given base address). It will then map a given
     24  * number of "mirrors", contiguous in the virtual address space but mapping
     25  * the same physical address space. This allows implementing efficient
     26  * circular buffers without needing any special logic in the consumer.
     27  */
     28 inline void *
     29 mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot)
     30 {
     31 	ASSERT(IS_ALIGNED(size, PAGESZ_4K));
     32 	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
     33 
     34 	size_t mirrors_size = mirrors * size;
     35 	size_t placeholder_size = (alignment - 1) + mirrors_size;
     36 
     37 	int mirror_flags = MAP_SHARED | MAP_FIXED;
     38 	int placeholder_flags = MAP_PRIVATE | MAP_ANONYMOUS;
     39 
     40 	if (base) {
     41 		placeholder_flags |= MAP_FIXED;
     42 	}
     43 
     44 	/* overallocate initial placeholder mapping */
     45 	void *placeholder = mmap(base, placeholder_size, prot, placeholder_flags, -1, 0);
     46 	if (placeholder == MAP_FAILED)
     47 		return NULL;
     48 
     49 	/* align within placeholder region, and unmap excess regions */
     50 	uintptr_t placeholder_ptr = (uintptr_t) placeholder;
     51 	uintptr_t placeholder_end = placeholder_ptr + placeholder_size;
     52 	uintptr_t aligned_ptr = ALIGN_NEXT(placeholder_ptr, alignment);
     53 	uintptr_t aligned_end = aligned_ptr + mirrors_size;
     54 
     55 	if (placeholder_ptr < aligned_ptr)
     56 		munmap((void *) placeholder_ptr, aligned_ptr - placeholder_ptr);
     57 	munmap((void *) aligned_end, placeholder_end - aligned_end);
     58 
     59 	/* create shared memory file and map mirrors */
     60 	int fd = memfd_create("mirrormap", MFD_CLOEXEC);
     61 	if (fd == -1)
     62 		goto error;
     63 
     64 	ftruncate(fd, mirrors_size);
     65 
     66 	for (uintptr_t ptr = aligned_ptr; ptr < aligned_end; ptr += size) {
     67 		void *mirror = mmap((void *) ptr, size, prot, mirror_flags, fd, 0);
     68 		ASSERT(mirror != MAP_FAILED);
     69 	}
     70 
     71 	madvise((void *) aligned_ptr, mirrors_size, MADV_HUGEPAGE);
     72 
     73 	/* shared memory file can now be closed, and pointer to first mirror returned */
     74 	close(fd);
     75 
     76 	return (void *) aligned_ptr;
     77 
     78 error:
     79 	munmap(placeholder, placeholder_size);
     80 
     81 	if (fd != -1)
     82 		close(fd);
     83 
     84 	return NULL;
     85 }
     86 
     87 /* queue
     88  * ---------------------------------------------------------------------------
     89  *  An unsynchronised, zero-copy, circular queue.
     90  */
     91 
struct queue {
	void *ptr;		/* base of first mirror of the mapped buffer */
	size_t cap, mask;	/* capacity in bytes (power of two) and cap - 1 index mask */

	/* free-running byte counters, masked on access; head - tail is the
	 * number of bytes currently queued (unsigned wrap is well-defined) */
	size_t head;		/* producer position: advanced by queue_write_commit */
	size_t tail;		/* consumer position: advanced by queue_read_commit */
};
     99 
    100 inline int
    101 queue_init(struct queue *queue, size_t capacity, size_t alignment)
    102 {
    103 	ASSERT(IS_POW2(capacity));
    104 	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
    105 	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
    106 
    107 	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
    108 	if (!queue->ptr)
    109 		return -1;
    110 
    111 	queue->cap = capacity;
    112 	queue->mask = capacity - 1;
    113 
    114 	queue->head = queue->tail = 0;
    115 
    116 	return 0;
    117 }
    118 
    119 inline void
    120 queue_free(struct queue *queue)
    121 {
    122 	munmap(queue->ptr, queue->cap * 2);
    123 }
    124 
    125 inline size_t
    126 queue_length(struct queue *queue)
    127 {
    128 	return queue->head - queue->tail;
    129 }
    130 
    131 inline size_t
    132 queue_capacity(struct queue *queue)
    133 {
    134 	return queue->cap - queue_length(queue);
    135 }
    136 
    137 inline void *
    138 queue_write(struct queue *queue, size_t len)
    139 {
    140 	if (queue_capacity(queue) < len)
    141 		return NULL;
    142 
    143 	size_t off = queue->head & queue->mask;
    144 	uintptr_t ptr = (uintptr_t) queue->ptr + off;
    145 
    146 	return (void *) ptr;
    147 }
    148 
    149 inline void
    150 queue_write_commit(struct queue *queue, size_t len)
    151 {
    152 	ASSERT(len <= queue_capacity(queue));
    153 
    154 	queue->head += len;
    155 }
    156 
    157 inline void *
    158 queue_read(struct queue *queue, size_t len)
    159 {
    160 	if (queue_length(queue) < len)
    161 		return NULL;
    162 
    163 	size_t off = queue->tail & queue->mask;
    164 	uintptr_t ptr = (uintptr_t) queue->ptr + off;
    165 
    166 	return (void *) ptr;
    167 }
    168 
    169 inline void
    170 queue_read_commit(struct queue *queue, size_t len)
    171 {
    172 	ASSERT(len <= queue_length(queue));
    173 
    174 	queue->tail += len;
    175 }
    176 
    177 /* spsc queue
    178  * ---------------------------------------------------------------------------
    179  *  A lockless, zero-copy, circular queue.
    180  */
    181 
    182 #include <stdatomic.h>
    183 
struct spsc_queue {
	void *ptr;		/* base of first mirror of the mapped buffer */
	size_t cap, mask;	/* capacity in bytes (power of two) and cap - 1 index mask */

	// writer cacheline state: alignas padding keeps producer-written fields
	// off the reader's cacheline to avoid false sharing
	alignas(HW_CACHELINE_SZ) atomic_size_t head;
	size_t cached_tail;	/* producer's last observed tail; refreshed lazily */

	// reader cacheline state
	alignas(HW_CACHELINE_SZ) atomic_size_t tail;
	size_t cached_head;	/* consumer's last observed head; refreshed lazily */
};
    196 
    197 inline int
    198 spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment)
    199 {
    200 	ASSERT(IS_POW2(capacity));
    201 	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
    202 	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
    203 
    204 	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
    205 	if (!queue->ptr)
    206 		return -1;
    207 
    208 	queue->cap = capacity;
    209 	queue->mask = capacity - 1;
    210 
    211 	queue->head = queue->tail = 0;
    212 	queue->cached_head = queue->cached_tail = 0;
    213 
    214 	return 0;
    215 }
    216 
    217 inline void
    218 spsc_queue_free(struct spsc_queue *queue)
    219 {
    220 	munmap(queue->ptr, queue->cap * 2);
    221 }
    222 
    223 inline size_t
    224 spsc_queue_length(struct spsc_queue *queue)
    225 {
    226 	size_t head = atomic_load_explicit(&queue->head, memory_order_acquire);
    227 	size_t tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
    228 	return head - tail;
    229 }
    230 
    231 inline size_t
    232 spsc_queue_capacity(struct spsc_queue *queue)
    233 {
    234 	return queue->cap - spsc_queue_length(queue);
    235 }
    236 
    237 inline void *
    238 spsc_queue_write(struct spsc_queue *queue, size_t len)
    239 {
    240 	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
    241 	if (queue->cap - (head - queue->cached_tail) < len) {
    242 		queue->cached_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
    243 		if (queue->cap - (head - queue->cached_tail) < len)
    244 			return NULL;
    245 	}
    246 
    247 	size_t off = head & queue->mask;
    248 	uintptr_t ptr = (uintptr_t) queue->ptr + off;
    249 
    250 	return (void *) ptr;
    251 }
    252 
    253 inline void
    254 spsc_queue_write_commit(struct spsc_queue *queue, size_t len)
    255 {
    256 	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
    257 	atomic_store_explicit(&queue->head, head + len, memory_order_release);
    258 }
    259 
    260 inline void *
    261 spsc_queue_read(struct spsc_queue *queue, size_t len)
    262 {
    263 	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
    264 	if (queue->cached_head - tail < len) {
    265 		queue->cached_head = atomic_load_explicit(&queue->head, memory_order_acquire);
    266 		if (queue->cached_head - tail < len)
    267 			return NULL;
    268 	}
    269 
    270 	size_t off = tail & queue->mask;
    271 	uintptr_t ptr = (uintptr_t) queue->ptr + off;
    272 
    273 	return (void *) ptr;
    274 }
    275 
    276 inline void
    277 spsc_queue_read_commit(struct spsc_queue *queue, size_t len)
    278 {
    279 	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
    280 	atomic_store_explicit(&queue->tail, tail + len, memory_order_release);
    281 }
    282 
    283 #endif /* QUEUE_H */
    284 
#ifdef HEADER_IMPL

/* C99 inline model: the `inline` definitions above only provide inlining
 * candidates. Exactly one translation unit must define HEADER_IMPL before
 * including this header so that these `extern inline` declarations emit
 * the one external (linkable) definition of each function. This section
 * deliberately sits outside the include guard. */

extern inline void *
mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot);

extern inline int
queue_init(struct queue *queue, size_t capacity, size_t alignment);

extern inline void
queue_free(struct queue *queue);

extern inline size_t
queue_length(struct queue *queue);

extern inline size_t
queue_capacity(struct queue *queue);

extern inline void *
queue_write(struct queue *queue, size_t len);

extern inline void
queue_write_commit(struct queue *queue, size_t len);

extern inline void *
queue_read(struct queue *queue, size_t len);

extern inline void
queue_read_commit(struct queue *queue, size_t len);

extern inline int
spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment);

extern inline void
spsc_queue_free(struct spsc_queue *queue);

extern inline size_t
spsc_queue_length(struct spsc_queue *queue);

extern inline size_t
spsc_queue_capacity(struct spsc_queue *queue);

extern inline void *
spsc_queue_write(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_write_commit(struct spsc_queue *queue, size_t len);

extern inline void *
spsc_queue_read(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_read_commit(struct spsc_queue *queue, size_t len);

#endif /* HEADER_IMPL */