toys

toys.git
git clone git://git.lenczewski.org/toys.git

commit babca69b3485ac8d3434490773c07419849e62b0
parent 457012a136f9a1c00d34fa74cf0ecfa6cf5d8e22
Author: Mikołaj Lenczewski <mblenczewski@gmail.com>
Date:   Mon, 13 Oct 2025 00:31:45 +0100

Add queue implementation

Diffstat:
M allocators.c | 4 ++--
M allocators.h | 27 ++++++++++++++++-----------
M build.sh | 2 +-
A queue.c | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A queue.h | 338 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A utils.h | 19 +++++++++++++++++++
6 files changed, 499 insertions(+), 14 deletions(-)

diff --git a/allocators.c b/allocators.c
@@ -8,8 +8,8 @@ struct mystruct {
 	int a, b, c;
 };
 
-struct alignas(64) mystruct2 {
-	char buf[17];
+struct mystruct2 {
+	alignas(64) char buf[17];
 };
 
 static void
diff --git a/allocators.h b/allocators.h
@@ -6,14 +6,7 @@
 #include <stdint.h>
 
 #include "assert.h"
-
-/* helper macros to allow us to use bitwise tricks to quickly and efficiently
- * calculate aligned addresses and sizes.
- */
-#define IS_POW2(v) (((v) & ((v) - 1)) == 0)
-#define IS_ALIGNED(v, align) (((v) & ((align) - 1)) == 0)
-#define ALIGN_PREV(v, align) ((v) & ~((align) - 1))
-#define ALIGN_NEXT(v, align) ALIGN_PREV(((v) + ((align) - 1)), (align))
+#include "utils.h"
 
 /* arena allocator
  * ---------------------------------------------------------------------------
@@ -189,12 +182,11 @@ pool_free(struct pool_allocator *allocator, void *res)
 	allocator->freelist.next = block;
 }
 
-/* heap allocator
+/* freelist-based heap allocator
  * ---------------------------------------------------------------------------
  * A general-purpose allocator. Supports allocations of arbitrary size, and
  * with power-of-two alignments. Supports unordered frees. Suffers from
- * internal fragmentation. Supports both a freelist implementation, and a
- * red-black tree implementation.
+ * internal fragmentation.
  */
 
 struct freelist_allocator_header {
@@ -338,6 +330,19 @@ freelist_free(struct freelist_allocator *allocator, void *res)
 	}
 }
 
+/* rb-tree-based heap allocator
+ * ---------------------------------------------------------------------------
+ * A general-purpose allocator. Supports allocations of arbitrary size, and
+ * with power-of-two alignments. Supports unordered frees. Suffers less from
+ * internal fragmentation than a freelist implementation. Has better average
+ * time complexity than a freelist implementation.
+ */
+
+struct rbtree_allocator {
+	void *ptr;
+	size_t cap;
+};
+
 /* buddy allocator
  * ---------------------------------------------------------------------------
 * A block-based allocator. Supports allocating out of a tree of memory blocks,
diff --git a/build.sh b/build.sh
@@ -1,7 +1,7 @@
 #!/bin/sh
 
 WARNINGS="-Wall -Wextra -Wno-format-pedantic -Wno-unused-variable"
-FLAGS="$WARNINGS -std=c99 -O0 -g3"
+FLAGS="$WARNINGS -std=c11 -O0 -g3"
 
 set -ex
diff --git a/queue.c b/queue.c
@@ -0,0 +1,123 @@
+#define HEADER_IMPL
+#include "queue.h"
+
+#include <assert.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <time.h>
+
+static const size_t limit = 1000000000;
+
+static void *
+bench_writer(void *data)
+{
+	struct spsc_queue *queue = data;
+
+	for (size_t i = 0; i < limit; i++) {
+		size_t *ptr;
+		do {
+			ptr = spsc_queue_write(queue, sizeof *ptr);
+		} while (!ptr);
+
+		*ptr = i;
+
+		spsc_queue_write_commit(queue, sizeof *ptr);
+	}
+
+	return NULL;
+}
+
+static void *
+bench_reader(void *data)
+{
+	struct spsc_queue *queue = data;
+
+	for (size_t i = 0; i < limit; i++) {
+		size_t *ptr;
+		do {
+			ptr = spsc_queue_read(queue, sizeof *ptr);
+		} while (!ptr);
+
+		ASSERT(*ptr == i);
+
+		spsc_queue_read_commit(queue, sizeof *ptr);
+	}
+
+	return NULL;
+}
+
+static void
+benchmark(void)
+{
+	struct spsc_queue queue;
+	int res = spsc_queue_init(&queue, PAGESZ_4K, PAGESZ_4K);
+	assert(res == 0);
+
+	struct timespec start, end;
+
+	pthread_t writer_thread, reader_thread;
+	int reader = pthread_create(&reader_thread, NULL, bench_reader, &queue);
+	assert(reader == 0);
+
+	int writer = pthread_create(&writer_thread, NULL, bench_writer, &queue);
+	assert(writer == 0);
+
+	clock_gettime(CLOCK_MONOTONIC_RAW, &start);
+
+	pthread_join(writer_thread, NULL);
+	pthread_join(reader_thread, NULL);
+
+	clock_gettime(CLOCK_MONOTONIC_RAW, &end);
+
+	uint64_t ns = ((end.tv_sec - start.tv_sec) * 1000000000) + (end.tv_nsec - start.tv_nsec);
+
+	printf("Iters: %zu\n", limit);
+	printf("Elapsed time: ms: %lu, ns: %lu\n", ns / 1000000, ns);
+	printf("Ops/sec: %lu\n", (limit * 1000000000) / ns);
+}
+
+int
+main(void)
+{
+	int *foo = mirrormap(NULL, PAGESZ_4K, PAGESZ_4K, 2, PROT_READ | PROT_WRITE);
+	int *bar = (void *) ((uintptr_t) foo + PAGESZ_4K);
+
+	*foo = 42;
+	assert(*bar == 42);
+
+	munmap(foo, 0);
+
+	struct queue queue;
+	int res = queue_init(&queue, PAGESZ_4K, PAGESZ_4K);
+	assert(res == 0);
+
+	assert(queue_length(&queue) == 0);
+	assert(queue_capacity(&queue) == PAGESZ_4K);
+
+	char buf[] = "Hello, World!";
+
+	char *mydst = queue_write(&queue, sizeof buf);
+	assert(mydst);
+	queue_write_commit(&queue, sizeof buf);
+
+	strcpy(mydst, buf);
+
+	assert(queue_length(&queue) == sizeof buf);
+	assert(queue_capacity(&queue) == (PAGESZ_4K - sizeof buf));
+
+	char *mysrc = queue_read(&queue, sizeof buf);
+	assert(mysrc);
+	queue_read_commit(&queue, sizeof buf);
+
+	printf("%s\n", mysrc);
+
+	assert(queue_length(&queue) == 0);
+	assert(queue_capacity(&queue) == PAGESZ_4K);
+
+	queue_free(&queue);
+
+	benchmark();
+
+	return 0;
+}
diff --git a/queue.h b/queue.h
@@ -0,0 +1,338 @@
+#ifndef QUEUE_H
+#define QUEUE_H
+
+#define _GNU_SOURCE 1
+
+#include <stdalign.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include <sys/mman.h>
+
+#include "assert.h"
+#include "utils.h"
+
+#define PAGESZ_4K KiB(4)
+#define PAGESZ_2M MiB(2)
+#define PAGESZ_1G GiB(1)
+
+#define HW_CACHELINE_SZ 64
+
+/* This helper will map a memory region of the given size and pagesize
+ * alignment (optionally at a given base address). It will then map a given
+ * number of "mirrors", contiguous in the virtual address space but mapping
+ * the same physical address space. This allows implementing efficient
+ * circular buffers without needing any special logic in the consumer.
+ */
+inline void *
+mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot)
+{
+	ASSERT(IS_ALIGNED(size, PAGESZ_4K));
+	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
+
+	size_t mirrors_size = mirrors * size;
+	size_t placeholder_size = (alignment - 1) + mirrors_size;
+
+	int mirror_flags = MAP_SHARED | MAP_FIXED;
+	int placeholder_flags = MAP_PRIVATE | MAP_ANONYMOUS;
+
+	if (base) {
+		placeholder_flags |= MAP_FIXED;
+	}
+
+	/* overallocate initial placeholder mapping */
+	void *placeholder = mmap(base, placeholder_size, prot, placeholder_flags, -1, 0);
+	if (placeholder == MAP_FAILED)
+		return NULL;
+
+	/* align within placeholder region, and unmap excess regions */
+	uintptr_t placeholder_ptr = (uintptr_t) placeholder;
+	uintptr_t placeholder_end = placeholder_ptr + placeholder_size;
+	uintptr_t aligned_ptr = ALIGN_NEXT(placeholder_ptr, alignment);
+	uintptr_t aligned_end = aligned_ptr + mirrors_size;
+
+	if (placeholder_ptr < aligned_ptr)
+		munmap((void *) placeholder_ptr, aligned_ptr - placeholder_ptr);
+	munmap((void *) aligned_end, placeholder_end - aligned_end);
+
+	/* create shared memory file and map mirrors */
+	int fd = memfd_create("mirrormap", MFD_CLOEXEC);
+	if (fd == -1)
+		goto error;
+
+	ftruncate(fd, mirrors_size);
+
+	for (uintptr_t ptr = aligned_ptr; ptr < aligned_end; ptr += size) {
+		void *mirror = mmap((void *) ptr, size, prot, mirror_flags, fd, 0);
+		ASSERT(mirror != MAP_FAILED);
+	}
+
+	madvise((void *) aligned_ptr, mirrors_size, MADV_HUGEPAGE);
+
+	/* shared memory file can now be closed, and pointer to first mirror returned */
+	close(fd);
+
+	return (void *) aligned_ptr;
+
+error:
+	munmap(placeholder, placeholder_size);
+
+	if (fd != -1)
+		close(fd);
+
+	return NULL;
+}
+
+/* queue
+ * ---------------------------------------------------------------------------
+ * An unsynchronised, zero-copy, circular queue.
+ */
+
+struct queue {
+	void *ptr;
+	size_t cap, mask;
+
+	size_t head;
+	size_t tail;
+};
+
+inline int
+queue_init(struct queue *queue, size_t capacity, size_t alignment)
+{
+	ASSERT(IS_POW2(capacity));
+	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
+	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
+
+	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
+	if (!queue->ptr)
+		return -1;
+
+	queue->cap = capacity;
+	queue->mask = capacity - 1;
+
+	queue->head = queue->tail = 0;
+
+	return 0;
+}
+
+inline void
+queue_free(struct queue *queue)
+{
+	munmap(queue->ptr, queue->cap * 2);
+}
+
+inline size_t
+queue_length(struct queue *queue)
+{
+	return queue->head - queue->tail;
+}
+
+inline size_t
+queue_capacity(struct queue *queue)
+{
+	return queue->cap - queue_length(queue);
+}
+
+inline void *
+queue_write(struct queue *queue, size_t len)
+{
+	if (queue_capacity(queue) < len)
+		return NULL;
+
+	size_t off = queue->head & queue->mask;
+	uintptr_t ptr = (uintptr_t) queue->ptr + off;
+
+	return (void *) ptr;
+}
+
+inline void
+queue_write_commit(struct queue *queue, size_t len)
+{
+	ASSERT(len <= queue_capacity(queue));
+
+	queue->head += len;
+}
+
+inline void *
+queue_read(struct queue *queue, size_t len)
+{
+	if (queue_length(queue) < len)
+		return NULL;
+
+	size_t off = queue->tail & queue->mask;
+	uintptr_t ptr = (uintptr_t) queue->ptr + off;
+
+	return (void *) ptr;
+}
+
+inline void
+queue_read_commit(struct queue *queue, size_t len)
+{
+	ASSERT(len <= queue_length(queue));
+
+	queue->tail += len;
+}
+
+/* spsc queue
+ * ---------------------------------------------------------------------------
+ * A lockless, zero-copy, circular queue.
+ */
+
+#include <stdatomic.h>
+
+struct spsc_queue {
+	void *ptr;
+	size_t cap, mask;
+
+	// writer cacheline state
+	alignas(HW_CACHELINE_SZ) atomic_size_t head;
+	size_t cached_tail;
+
+	// reader cacheline state
+	alignas(HW_CACHELINE_SZ) atomic_size_t tail;
+	size_t cached_head;
+};
+
+inline int
+spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment)
+{
+	ASSERT(IS_POW2(capacity));
+	ASSERT(IS_ALIGNED(capacity, PAGESZ_4K));
+	ASSERT(IS_ALIGNED(alignment, PAGESZ_4K));
+
+	queue->ptr = mirrormap(NULL, capacity, alignment, 2, PROT_READ | PROT_WRITE);
+	if (!queue->ptr)
+		return -1;
+
+	queue->cap = capacity;
+	queue->mask = capacity - 1;
+
+	queue->head = queue->tail = 0;
+	queue->cached_head = queue->cached_tail = 0;
+
+	return 0;
+}
+
+inline void
+spsc_queue_free(struct spsc_queue *queue)
+{
+	munmap(queue->ptr, queue->cap * 2);
+}
+
+inline size_t
+spsc_queue_length(struct spsc_queue *queue)
+{
+	size_t head = atomic_load_explicit(&queue->head, memory_order_acquire);
+	size_t tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
+	return head - tail;
+}
+
+inline size_t
+spsc_queue_capacity(struct spsc_queue *queue)
+{
+	return queue->cap - spsc_queue_length(queue);
+}
+
+inline void *
+spsc_queue_write(struct spsc_queue *queue, size_t len)
+{
+	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
+	if (queue->cap - (head - queue->cached_tail) < len) {
+		queue->cached_tail = atomic_load_explicit(&queue->tail, memory_order_acquire);
+		if (queue->cap - (head - queue->cached_tail) < len)
+			return NULL;
+	}
+
+	size_t off = head & queue->mask;
+	uintptr_t ptr = (uintptr_t) queue->ptr + off;
+
+	return (void *) ptr;
+}
+
+inline void
+spsc_queue_write_commit(struct spsc_queue *queue, size_t len)
+{
+	size_t head = atomic_load_explicit(&queue->head, memory_order_relaxed);
+	atomic_store_explicit(&queue->head, head + len, memory_order_release);
+}
+
+inline void *
+spsc_queue_read(struct spsc_queue *queue, size_t len)
+{
+	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
+	if (queue->cached_head - tail < len) {
+		queue->cached_head = atomic_load_explicit(&queue->head, memory_order_acquire);
+		if (queue->cached_head - tail < len)
+			return NULL;
+	}
+
+	size_t off = tail & queue->mask;
+	uintptr_t ptr = (uintptr_t) queue->ptr + off;
+
+	return (void *) ptr;
+}
+
+inline void
+spsc_queue_read_commit(struct spsc_queue *queue, size_t len)
+{
+	size_t tail = atomic_load_explicit(&queue->tail, memory_order_relaxed);
+	atomic_store_explicit(&queue->tail, tail + len, memory_order_release);
+}
+
+#endif /* QUEUE_H */
+
+#ifdef HEADER_IMPL
+
+extern inline void *
+mirrormap(void *base, size_t size, size_t alignment, size_t mirrors, int prot);
+
+extern inline int
+queue_init(struct queue *queue, size_t capacity, size_t alignment);
+
+extern inline void
+queue_free(struct queue *queue);
+
+extern inline size_t
+queue_length(struct queue *queue);
+
+extern inline size_t
+queue_capacity(struct queue *queue);
+
+extern inline void *
+queue_write(struct queue *queue, size_t len);
+
+extern inline void
+queue_write_commit(struct queue *queue, size_t len);
+
+extern inline void *
+queue_read(struct queue *queue, size_t len);
+
+extern inline void
+queue_read_commit(struct queue *queue, size_t len);
+
+extern inline int
+spsc_queue_init(struct spsc_queue *queue, size_t capacity, size_t alignment);
+
+extern inline void
+spsc_queue_free(struct spsc_queue *queue);
+
+extern inline size_t
+spsc_queue_length(struct spsc_queue *queue);
+
+extern inline size_t
+spsc_queue_capacity(struct spsc_queue *queue);
+
+extern inline void *
+spsc_queue_write(struct spsc_queue *queue, size_t len);
+
+extern inline void
+spsc_queue_write_commit(struct spsc_queue *queue, size_t len);
+
+extern inline void *
+spsc_queue_read(struct spsc_queue *queue, size_t len);
+
+extern inline void
+spsc_queue_read_commit(struct spsc_queue *queue, size_t len);
+
+#endif /* HEADER_IMPL */
diff --git a/utils.h b/utils.h
@@ -0,0 +1,19 @@
+#ifndef UTILS_H
+#define UTILS_H
+
+#define KiB(v) (1024ull * (v))
+#define MiB(v) (1024 * KiB(v))
+#define GiB(v) (1024 * MiB(v))
+#define TiB(v) (1024 * GiB(v))
+
+#define ARRLEN(arr) (sizeof (arr) / sizeof (arr)[0])
+
+/* helper macros to allow us to use bitwise tricks to quickly and efficiently
+ * calculate aligned addresses and sizes.
+ */
+#define IS_POW2(v) (((v) & ((v) - 1)) == 0)
+#define IS_ALIGNED(v, align) (((v) & ((align) - 1)) == 0)
+#define ALIGN_PREV(v, align) ((v) & ~((align) - 1))
+#define ALIGN_NEXT(v, align) ALIGN_PREV(((v) + ((align) - 1)), (align))
+
+#endif /* UTILS_H */
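
The mirrored mapping is what makes queue_write and queue_read zero-copy at the
wrap point: the second mirror aliases the same physical pages, so a message
that straddles the end of the buffer is still one contiguous range of virtual
memory. A minimal sketch of that wrap-around case, assuming Linux
(memfd_create) and building the same way queue.c does:

#define HEADER_IMPL
#include "queue.h"

#include <assert.h>
#include <string.h>

int
main(void)
{
	struct queue queue;
	assert(queue_init(&queue, PAGESZ_4K, PAGESZ_4K) == 0);

	/* advance head and tail to 8 bytes before the physical end */
	(void) queue_write(&queue, PAGESZ_4K - 8);
	queue_write_commit(&queue, PAGESZ_4K - 8);
	(void) queue_read(&queue, PAGESZ_4K - 8);
	queue_read_commit(&queue, PAGESZ_4K - 8);

	/* this 16-byte message straddles the mirror boundary, yet the
	 * returned pointer is a single contiguous range */
	char *dst = queue_write(&queue, 16);
	assert(dst);
	memset(dst, 'x', 16);
	queue_write_commit(&queue, 16);

	char *src = queue_read(&queue, 16);
	assert(src && src[15] == 'x');
	queue_read_commit(&queue, 16);

	queue_free(&queue);
	return 0;
}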
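
Note that head and tail are free-running byte counters: they are masked only
when converted into an offset, never reduced modulo the capacity on commit.
Because the arithmetic is unsigned, length = head - tail stays exact even when
the counters themselves overflow. A small standalone check of that invariant
(the wrap values below are chosen purely for illustration):

#include <assert.h>
#include <stddef.h>

int
main(void)
{
	/* free-running indices, as in struct queue: only their difference
	 * matters, so overflow of the counters is harmless */
	size_t head = (size_t) -4;   /* 4 bytes before the counter wraps */
	size_t tail = (size_t) -4;

	head += 16;                  /* a 16-byte write commits across the wrap */

	assert(head - tail == 16);   /* length is still exact */
	assert((head & 4095) == 12); /* masked offset stays in [0, cap) */

	return 0;
}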
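
The alignment macros moved into utils.h rely on the usual power-of-two bit
tricks: clearing the low bits rounds an address down to a boundary, and
biasing by align - 1 first rounds it up. A quick sanity check of the
arithmetic, assuming utils.h is on the include path:

#include <assert.h>
#include "utils.h"

int
main(void)
{
	/* the masks only work for power-of-two alignments */
	assert(IS_POW2(8) && !IS_POW2(12));

	assert(ALIGN_PREV(13, 8) == 8);  /* 13 & ~7 = 8 */
	assert(ALIGN_NEXT(13, 8) == 16); /* (13 + 7) & ~7 = 16 */
	assert(ALIGN_NEXT(16, 8) == 16); /* already-aligned values are unchanged */

	assert(IS_ALIGNED(KiB(4), KiB(4)));

	return 0;
}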