queue.h
#ifndef QUEUE_H
#define QUEUE_H

#define _GNU_SOURCE 1

#include <stdalign.h>
#include <stddef.h>
#include <stdint.h>
#include <unistd.h>

#include <sys/mman.h>

#include "assert.h"
#include "utils.h"

#define PAGESZ_4K KiB(4)
#define PAGESZ_2M MiB(2)
#define PAGESZ_1G GiB(1)

#define HW_CACHELINE_SZ 64

/* This helper maps a memory region of the given size and pagesize alignment
 * (optionally at a given base address). It then maps a given number of
 * "mirrors", contiguous in the virtual address space but backed by the same
 * physical pages. This allows implementing efficient circular buffers
 * without needing any special wrap-around logic in the consumer.
 */
inline void *
mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot)
{
	ASSERT(IS_ALIGNED(size, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	size_t mirrors_size = mirrors * size;
	size_t placeholder_size = (alignment - 1) + mirrors_size;

	int mirror_flags = MAP_SHARED | MAP_FIXED;
	int placeholder_flags = MAP_PRIVATE | MAP_ANONYMOUS;

	if (base) {
		placeholder_flags |= MAP_FIXED;
	}

	/* overallocate initial placeholder mapping */
	void *placeholder = mmap(base, placeholder_size, prot, placeholder_flags, -1, 0);
	if (placeholder == MAP_FAILED)
		return NULL;

	/* align within placeholder region, and unmap excess head/tail */
	uintptr_t placeholder_ptr = (uintptr_t) placeholder;
	uintptr_t placeholder_end = placeholder_ptr + placeholder_size;
	uintptr_t aligned_ptr = ALIGN_NEXT(placeholder_ptr, alignment);
	uintptr_t aligned_end = aligned_ptr + mirrors_size;

	if (placeholder_ptr < aligned_ptr)
		munmap((void *) placeholder_ptr, aligned_ptr - placeholder_ptr);
	if (aligned_end < placeholder_end)
		munmap((void *) aligned_end, placeholder_end - aligned_end);

	/* create shared memory file and map mirrors */
	int fd = memfd_create("mirrormap", MFD_CLOEXEC);
	if (fd == -1)
		goto error;

	if (ftruncate(fd, (off_t) mirrors_size) == -1)
		goto error;

	for (uintptr_t ptr = aligned_ptr; ptr < aligned_end; ptr += size) {
		void *mirror = mmap((void *) ptr, size, prot, mirror_flags, fd, 0);
		ASSERT(mirror != MAP_FAILED);
	}

	madvise((void *) aligned_ptr, mirrors_size, MADV_HUGEPAGE);

	/* the shared memory file can now be closed; return the first mirror */
	close(fd);

	return (void *) aligned_ptr;

error:
	munmap(placeholder, placeholder_size);

	if (fd != -1)
		close(fd);

	return NULL;
}
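
/* Example (sketch): map one page of buffer twice and observe that a store
 * through the first mirror is visible through the second. The sizes chosen
 * here are arbitrary illustrations.
 *
 *	char *buf = mirrormap(NULL, PAGESZ_4K, PAGESZ_4K, 2, PROT_READ | PROT_WRITE);
 *	if (buf) {
 *		buf[0] = 'x';
 *		ASSERT(buf[PAGESZ_4K] == 'x');	// same physical page, next virtual page
 *		munmap(buf, 2 * PAGESZ_4K);	// unmap both mirrors at once
 *	}
 */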

/* queue
 * ---------------------------------------------------------------------------
 * An unsynchronised, zero-copy, circular queue.
 */

struct queue {
	void *ptr;
	size_t cap, mask;

	size_t head;
	size_t tail;
};

inline int
queue_init(struct queue *queue, size_t capacity, size_t alignment)
{
	ASSERT(IS_POW2(capacity));
	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
	if (!queue->ptr)
		return -1;

	queue->cap = capacity;
	queue->mask = capacity - 1;

	queue->head = queue->tail = 0;

	return 0;
}

inline void
queue_free(struct queue *queue)
{
	munmap(queue->ptr, queue->cap * 2);
}

inline size_t
queue_length(struct queue *queue)
{
	return queue->head - queue->tail;
}

inline size_t
queue_capacity(struct queue *queue)
{
	return queue->cap - queue_length(queue);
}

inline void *
queue_write(struct queue *queue, size_t len)
{
	if (queue_capacity(queue) < len)
		return NULL;

	size_t off = queue->head & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
queue_write_commit(struct queue *queue, size_t len)
{
	ASSERT(len <= queue_capacity(queue));

	queue->head += len;
}

inline void *
queue_read(struct queue *queue, size_t len)
{
	if (queue_length(queue) < len)
		return NULL;

	size_t off = queue->tail & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
queue_read_commit(struct queue *queue, size_t len)
{
	ASSERT(len <= queue_length(queue));

	queue->tail += len;
}
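
/* Usage sketch, assuming single-threaded access: reserve space, write into
 * the mapping directly, then commit; reads mirror this. Thanks to the double
 * mapping, any reservation of up to `capacity` bytes is contiguous, even
 * when it straddles the wrap point.
 *
 *	struct queue q;
 *	if (queue_init(&q, KiB(64), PAGESZ_4K) == 0) {
 *		char *w = queue_write(&q, 5);	// reserve 5 contiguous bytes
 *		if (w) {
 *			memcpy(w, "hello", 5);	// needs <string.h>
 *			queue_write_commit(&q, 5);
 *		}
 *
 *		char *r = queue_read(&q, 5);	// peek without consuming
 *		if (r)
 *			queue_read_commit(&q, 5);	// consume after use
 *
 *		queue_free(&q);
 *	}
 */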

/* spsc queue
 * ---------------------------------------------------------------------------
 * A lockless, zero-copy, circular queue.
 */

#include <stdatomic.h>

struct spsc_queue {
	void *ptr;
	size_t cap, mask;

	// writer cacheline state
	alignas(HW_CACHELINE_SZ) atomic_size_t head;
	size_t cached_tail;

	// reader cacheline state
	alignas(HW_CACHELINE_SZ) atomic_size_t tail;
	size_t cached_head;
};

inline int
spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment)
{
	ASSERT(IS_POW2(capacity));
	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));

	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
	if (!queue->ptr)
		return -1;

	queue->cap = capacity;
	queue->mask = capacity - 1;

	queue->head = queue->tail = 0;
	queue->cached_head = queue->cached_tail = 0;

	return 0;
}

inline void
spsc_queue_free(struct spsc_queue *queue)
{
	munmap(queue->ptr, queue->cap * 2);
}

inline size_t
spsc_queue_length(struct spsc_queue *queue)
{
	size_t head = atomic_load_explicit(&queue->head, memory_order_acquire);
	size_t tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
	return head - tail;
}

inline size_t
spsc_queue_capacity(struct spsc_queue *queue)
{
	return queue->cap - spsc_queue_length(queue);
}

inline void *
spsc_queue_write(struct spsc_queue *queue, size_t len)
{
	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
	if (queue->cap - (head - queue->cached_tail) < len) {
		/* looks full against the cached tail: refresh it and re-check */
		queue->cached_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
		if (queue->cap - (head - queue->cached_tail) < len)
			return NULL;
	}

	size_t off = head & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
spsc_queue_write_commit(struct spsc_queue *queue, size_t len)
{
	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
	/* release pairs with the consumer's acquire load of head */
	atomic_store_explicit(&queue->head, head + len, memory_order_release);
}

inline void *
spsc_queue_read(struct spsc_queue *queue, size_t len)
{
	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
	if (queue->cached_head - tail < len) {
		/* looks empty against the cached head: refresh it and re-check */
		queue->cached_head = atomic_load_explicit(&queue->head, memory_order_acquire);
		if (queue->cached_head - tail < len)
			return NULL;
	}

	size_t off = tail & queue->mask;
	uintptr_t ptr = (uintptr_t) queue->ptr + off;

	return (void *) ptr;
}

inline void
spsc_queue_read_commit(struct spsc_queue *queue, size_t len)
{
	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
	/* release pairs with the producer's acquire load of tail */
	atomic_store_explicit(&queue->tail, tail + len, memory_order_release);
}
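
/* Usage sketch, assuming a `struct spsc_queue q` already initialised via
 * spsc_queue_init and shared between exactly one producer thread and one
 * consumer thread. Fixed 8-byte messages keep every slot naturally aligned;
 * the busy-wait loops are illustrative only. The cached indices mean the
 * fast path reloads the other side's atomic only when the queue looks full
 * (writer) or empty (reader).
 *
 *	// producer thread: the only writer of head
 *	uint64_t *slot;
 *	while (!(slot = spsc_queue_write(&q, sizeof *slot)))
 *		;				// spin until space frees up
 *	*slot = value;
 *	spsc_queue_write_commit(&q, sizeof *slot);
 *
 *	// consumer thread: the only writer of tail
 *	uint64_t *msg;
 *	while (!(msg = spsc_queue_read(&q, sizeof *msg)))
 *		;				// spin until data arrives
 *	uint64_t value = *msg;
 *	spsc_queue_read_commit(&q, sizeof *msg);
 */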

#endif /* QUEUE_H */

#ifdef HEADER_IMPL

extern inline void *
mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot);

extern inline int
queue_init(struct queue *queue, size_t capacity, size_t alignment);

extern inline void
queue_free(struct queue *queue);

extern inline size_t
queue_length(struct queue *queue);

extern inline size_t
queue_capacity(struct queue *queue);

extern inline void *
queue_write(struct queue *queue, size_t len);

extern inline void
queue_write_commit(struct queue *queue, size_t len);

extern inline void *
queue_read(struct queue *queue, size_t len);

extern inline void
queue_read_commit(struct queue *queue, size_t len);

extern inline int
spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment);

extern inline void
spsc_queue_free(struct spsc_queue *queue);

extern inline size_t
spsc_queue_length(struct spsc_queue *queue);

extern inline size_t
spsc_queue_capacity(struct spsc_queue *queue);

extern inline void *
spsc_queue_write(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_write_commit(struct spsc_queue *queue, size_t len);

extern inline void *
spsc_queue_read(struct spsc_queue *queue, size_t len);

extern inline void
spsc_queue_read_commit(struct spsc_queue *queue, size_t len);

#endif /* HEADER_IMPL */
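
/* Build note (sketch): the functions above use C99 `inline`, so exactly one
 * translation unit must emit their external definitions. Define HEADER_IMPL
 * before including this header in that one file, e.g. (file name arbitrary):
 *
 *	// queue.c
 *	#define HEADER_IMPL
 *	#include "queue.h"
 */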