queue.h
#ifndef QUEUE_H
#define QUEUE_H

#define _GNU_SOURCE 1

#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>
#include <unistd.h>

#include <sys/mman.h>

#include "assert.h"
#include "utils.h"

#define PAGESZ_4K KiB(4)
#define PAGESZ_2M MiB(2)
#define PAGESZ_1G GiB(1)

#define HW_CACHELINE_SZ 64

/* This helper maps a memory region of the given size and pagesize alignment
 * (optionally at a given base address). It then maps a given number of
 * "mirrors": regions contiguous in the virtual address space but backed by
 * the same physical pages. This allows implementing efficient circular
 * buffers without any special wrap-around logic in the consumer.
 */
inline void *
mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot)
{
	ASSERT(IS_ALIGNED(size, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	size_t mirrors_size = mirrors * size;
	size_t placeholder_size = (alignment - 1) + mirrors_size;

	int mirror_flags = MAP_SHARED | MAP_FIXED;
	int placeholder_flags = MAP_PRIVATE | MAP_ANONYMOUS;

	if (base) {
		placeholder_flags |= MAP_FIXED;
	}

	/* overallocate initial placeholder mapping */
	void *placeholder = mmap(base, placeholder_size, prot, placeholder_flags, -1, 0);
	if (placeholder == MAP_FAILED)
		return NULL;

	/* align within placeholder region, and unmap excess regions */
	uintptr_t placeholder_ptr = (uintptr_t) placeholder;
	uintptr_t placeholder_end = placeholder_ptr + placeholder_size;
	uintptr_t aligned_ptr = ALIGN_NEXT(placeholder_ptr, alignment);
	uintptr_t aligned_end = aligned_ptr + mirrors_size;

	if (placeholder_ptr < aligned_ptr)
		munmap((void *) placeholder_ptr, aligned_ptr - placeholder_ptr);
	munmap((void *) aligned_end, placeholder_end - aligned_end);

	/* create shared memory file and map mirrors over the placeholder */
	int fd = memfd_create("mirrormap", MFD_CLOEXEC);
	if (fd == -1)
		goto error;

	if (ftruncate(fd, mirrors_size) == -1)
		goto error;

	for (uintptr_t ptr = aligned_ptr; ptr < aligned_end; ptr += size) {
		void *mirror = mmap((void *) ptr, size, prot, mirror_flags, fd, 0);
		ASSERT(mirror != MAP_FAILED);
	}

	madvise((void *) aligned_ptr, mirrors_size, MADV_HUGEPAGE);

	/* the shared memory file can now be closed; return a pointer to the
	 * first mirror */
	close(fd);

	return (void *) aligned_ptr;

error:
	munmap(placeholder, placeholder_size);

	if (fd != -1)
		close(fd);

	return NULL;
}
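/* Usage sketch (illustrative only, not part of this header): with two
 * mirrors, byte i of the region is also visible at byte i + size, so an
 * access running past the end of the first mirror wraps around "for free".
 * Error handling is elided; PAGESZ_4K and ASSERT come from the includes
 * above.
 *
 *	#include <string.h>
 *
 *	size_t size = PAGESZ_4K;
 *	char *buf = mirrormap(NULL, size, PAGESZ_4K, 2, PROT_READ | PROT_WRITE);
 *
 *	memset(buf, 0xAB, size);	// write through the first mirror
 *	ASSERT(buf[0] == buf[size]);	// same physical byte, second mirror
 *
 *	munmap(buf, 2 * size);		// unmap both mirrors at once
 */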
/* queue
 * ---------------------------------------------------------------------------
 * An unsynchronised, zero-copy, circular queue.
 */

struct queue {
	void *ptr;
	size_t cap, mask;

	size_t head;	/* write index */
	size_t tail;	/* read index */
};

inline int
queue_init(struct queue *queue, size_t capacity, size_t alignment)
{
	ASSERT(IS_POW2(capacity));
	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
	if (!queue->ptr)
		return -1;

	queue->cap = capacity;
	queue->mask = capacity - 1;

	queue->head = queue->tail = 0;

	return 0;
}

inline void
queue_free(struct queue *queue)
{
	munmap(queue->ptr, queue->cap * 2);
}

/* number of bytes currently queued */
inline size_t
queue_length(struct queue *queue)
{
	return queue->head - queue->tail;
}

/* number of bytes that can still be written */
inline size_t
queue_capacity(struct queue *queue)
{
	return queue->cap - queue_length(queue);
}

/* reserve a contiguous len-byte write buffer; NULL if the queue is too full */
inline void *
queue_write(struct queue *queue, size_t len)
{
	if (queue_capacity(queue) < len)
		return NULL;

	size_t off = queue->head & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
queue_write_commit(struct queue *queue, size_t len)
{
	ASSERT(len <= queue_capacity(queue));

	queue->head += len;
}

/* reserve a contiguous len-byte read buffer; NULL if too little is queued */
inline void *
queue_read(struct queue *queue, size_t len)
{
	if (queue_length(queue) < len)
		return NULL;

	size_t off = queue->tail & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
queue_read_commit(struct queue *queue, size_t len)
{
	ASSERT(len <= queue_length(queue));

	queue->tail += len;
}
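/* Usage sketch (illustrative only): the zero-copy pattern is reserve, fill,
 * commit. queue_write()/queue_read() merely return a pointer into the
 * mirrored buffer; the indices only move on the matching *_commit(). Thanks
 * to the mirroring, the returned buffer is always contiguous, even when the
 * reservation straddles the wrap-around point.
 *
 *	struct queue q;
 *	if (queue_init(&q, PAGESZ_4K, PAGESZ_4K) == -1)
 *		return -1;
 *
 *	void *wbuf = queue_write(&q, 128);	// reserve 128 bytes
 *	if (wbuf) {
 *		memset(wbuf, 0, 128);		// fill in place
 *		queue_write_commit(&q, 128);	// publish them
 *	}
 *
 *	void *rbuf = queue_read(&q, 128);	// peek at 128 bytes
 *	if (rbuf) {
 *		// ... consume rbuf ...
 *		queue_read_commit(&q, 128);	// release them
 *	}
 *
 *	queue_free(&q);
 */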
/* spsc queue
 * ---------------------------------------------------------------------------
 * A lockless, zero-copy, circular queue for a single producer and a single
 * consumer.
 */

#include <stdatomic.h>

struct spsc_queue {
	void *ptr;
	size_t cap, mask;

	// writer cacheline state
	alignas(HW_CACHELINE_SZ) atomic_size_t head;
	size_t cached_tail;

	// reader cacheline state
	alignas(HW_CACHELINE_SZ) atomic_size_t tail;
	size_t cached_head;
};

inline int
spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment)
{
	ASSERT(IS_POW2(capacity));
	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
	if (!queue->ptr)
		return -1;

	queue->cap = capacity;
	queue->mask = capacity - 1;

	queue->head = queue->tail = 0;
	queue->cached_head = queue->cached_tail = 0;

	return 0;
}

inline void
spsc_queue_free(struct spsc_queue *queue)
{
	munmap(queue->ptr, queue->cap * 2);
}

// NOTE: this (spsc_queue_length()) and its cousin (spsc_queue_capacity()) are
//       not a great interface. For one, they are slow as molasses, and to be
//       honest I'm not sure they are either correct or useful (since in
//       high-contention scenarios the length of the queue at the moment of
//       calling the function will become stale pretty quickly). It might be
//       far better to simply attempt a read or write, and detect a full (or
//       empty) queue by the return value (a return value of NULL means it
//       was not possible to reserve a buffer of the given size).
//
//       Included solely for "feature parity" with the unsynchronised queue.

inline size_t
spsc_queue_length(struct spsc_queue *queue)
{
	size_t head, tail;

	do {
		head = atomic_load_explicit(&queue->head, memory_order_acquire);
		tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
	} while (atomic_load_explicit(&queue->head, memory_order_acquire) != head);

	return head - tail;
}

inline size_t
spsc_queue_capacity(struct spsc_queue *queue)
{
	return queue->cap - spsc_queue_length(queue);
}

inline void *
spsc_queue_write(struct spsc_queue *queue, size_t len)
{
	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
	if (queue->cap - (head - queue->cached_tail) < len) {
		queue->cached_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
		if (queue->cap - (head - queue->cached_tail) < len)
			return NULL;
	}

	size_t off = head & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
spsc_queue_write_commit(struct spsc_queue *queue, size_t len)
{
	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
	atomic_store_explicit(&queue->head, head + len, memory_order_release);
}

inline void *
spsc_queue_read(struct spsc_queue *queue, size_t len)
{
	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
	if (queue->cached_head - tail < len) {
		queue->cached_head = atomic_load_explicit(&queue->head, memory_order_acquire);
		if (queue->cached_head - tail < len)
			return NULL;
	}

	size_t off = tail & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
spsc_queue_read_commit(struct spsc_queue *queue, size_t len)
{
	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
	atomic_store_explicit(&queue->tail, tail + len, memory_order_release);
}
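/* Usage sketch (illustrative only; fill() and consume() are hypothetical
 * helpers): exactly one producer thread calls the write side and exactly one
 * consumer thread calls the read side, on a struct spsc_queue `q` they
 * share. The cached_tail/cached_head fields keep the fast path on each
 * side's own cacheline; the opposite index is only (re)loaded when the
 * cached value no longer proves there is room (or data).
 *
 *	// producer thread
 *	void *w;
 *	while ((w = spsc_queue_write(&q, len)) == NULL)
 *		;			// full: spin, or back off
 *	fill(w, len);
 *	spsc_queue_write_commit(&q, len);
 *
 *	// consumer thread
 *	void *r;
 *	while ((r = spsc_queue_read(&q, len)) == NULL)
 *		;			// empty: spin, or back off
 *	consume(r, len);
 *	spsc_queue_read_commit(&q, len);
 */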
#endif /* QUEUE_H */

#ifdef HEADER_IMPL

extern inline void *
mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot);

extern inline int
queue_init(struct queue *queue, size_t capacity, size_t alignment);

extern inline void
queue_free(struct queue *queue);

extern inline size_t
queue_length(struct queue *queue);

extern inline size_t
queue_capacity(struct queue *queue);

extern inline void *
queue_write(struct queue *queue, size_t len);

extern inline void
queue_write_commit(struct queue *queue, size_t len);

extern inline void *
queue_read(struct queue *queue, size_t len);

extern inline void
queue_read_commit(struct queue *queue, size_t len);

extern inline int
spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment);

extern inline void
spsc_queue_free(struct spsc_queue *queue);

extern inline size_t
spsc_queue_length(struct spsc_queue *queue);

extern inline size_t
spsc_queue_capacity(struct spsc_queue *queue);

extern inline void *
spsc_queue_write(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_write_commit(struct spsc_queue *queue, size_t len);

extern inline void *
spsc_queue_read(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_read_commit(struct spsc_queue *queue, size_t len);

#endif /* HEADER_IMPL */
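/* Conventional single-header usage (an assumption about the surrounding
 * build, not something this file mandates): include "queue.h" normally
 * everywhere, and in exactly one translation unit define HEADER_IMPL first,
 * so that the extern inline declarations above emit the out-of-line
 * definitions that C99/C11 inline semantics require:
 *
 *	#define HEADER_IMPL
 *	#include "queue.h"
 */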