Add batcher module.

This can be used to batch up simple operation commands for later use by another
thread.
This commit is contained in:
David Goldblatt 2024-01-22 15:34:58 -08:00 committed by David Goldblatt
parent 86f4851f5d
commit 70c94d7474
13 changed files with 404 additions and 9 deletions

View File

@ -98,6 +98,7 @@ C_SRCS := $(srcroot)src/jemalloc.c \
$(srcroot)src/arena.c \
$(srcroot)src/background_thread.c \
$(srcroot)src/base.c \
$(srcroot)src/batcher.c \
$(srcroot)src/bin.c \
$(srcroot)src/bin_info.c \
$(srcroot)src/bitmap.c \
@ -204,6 +205,7 @@ TESTS_UNIT := \
$(srcroot)test/unit/background_thread_enable.c \
$(srcroot)test/unit/base.c \
$(srcroot)test/unit/batch_alloc.c \
$(srcroot)test/unit/batcher.c \
$(srcroot)test/unit/binshard.c \
$(srcroot)test/unit/bitmap.c \
$(srcroot)test/unit/bit_util.c \

View File

@ -0,0 +1,44 @@
#ifndef JEMALLOC_INTERNAL_BATCHER_H
#define JEMALLOC_INTERNAL_BATCHER_H
#include "jemalloc/internal/jemalloc_preamble.h"
#include "jemalloc/internal/atomic.h"
#include "jemalloc/internal/mutex.h"
/*
 * A batcher is a fixed-capacity staging area used to batch up simple
 * operation commands for later use by another thread.  The batcher tracks
 * only the element count and hands out slot indices; the caller owns the
 * actual element storage.
 */
/* Sentinel meaning "no slot available" (push) or "nothing to pop" (pop). */
#define BATCHER_NO_IDX ((size_t)-1)
typedef struct batcher_s batcher_t;
struct batcher_s {
/*
 * Optimize for locality -- nelems_max and nelems are always touched
 * together, along with the front of the mutex. The end of the mutex is
 * only touched if there's contention.
 */
atomic_zu_t nelems;
size_t nelems_max;
malloc_mutex_t mtx;
};
/* Initializes an empty batcher with capacity nelems_max. */
void batcher_init(batcher_t *batcher, size_t nelems_max);
/*
 * Returns an index (into some user-owned array) to use for pushing, or
 * BATCHER_NO_IDX if no index is free. If the former, the caller must call
 * batcher_push_end once done.
 */
size_t batcher_push_begin(tsdn_t *tsdn, batcher_t *batcher,
size_t elems_to_push);
void batcher_push_end(tsdn_t *tsdn, batcher_t *batcher);
/*
 * Returns the number of items to pop, or BATCHER_NO_IDX if there are none.
 * If the former, must be followed by a call to batcher_pop_end.
 */
size_t batcher_pop_begin(tsdn_t *tsdn, batcher_t *batcher);
void batcher_pop_end(tsdn_t *tsdn, batcher_t *batcher);
/* Fork hooks; these forward to the embedded mutex's fork handling. */
void batcher_prefork(tsdn_t *tsdn, batcher_t *batcher);
void batcher_postfork_parent(tsdn_t *tsdn, batcher_t *batcher);
void batcher_postfork_child(tsdn_t *tsdn, batcher_t *batcher);
#endif /* JEMALLOC_INTERNAL_BATCHER_H */

View File

@ -64,9 +64,10 @@ enum witness_rank_e {
WITNESS_RANK_BASE,
WITNESS_RANK_ARENA_LARGE,
WITNESS_RANK_HOOK,
WITNESS_RANK_BIN,
WITNESS_RANK_LEAF=0x1000,
WITNESS_RANK_BIN = WITNESS_RANK_LEAF,
WITNESS_RANK_BATCHER=WITNESS_RANK_LEAF,
WITNESS_RANK_ARENA_STATS = WITNESS_RANK_LEAF,
WITNESS_RANK_COUNTER_ACCUM = WITNESS_RANK_LEAF,
WITNESS_RANK_DSS = WITNESS_RANK_LEAF,

View File

@ -38,6 +38,7 @@
<ClCompile Include="..\..\..\..\src\arena.c" />
<ClCompile Include="..\..\..\..\src\background_thread.c" />
<ClCompile Include="..\..\..\..\src\base.c" />
<ClCompile Include="..\..\..\..\src\batcher.c" />
<ClCompile Include="..\..\..\..\src\bin.c" />
<ClCompile Include="..\..\..\..\src\bin_info.c" />
<ClCompile Include="..\..\..\..\src\bitmap.c" />
@ -378,4 +379,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -16,6 +16,9 @@
<ClCompile Include="..\..\..\..\src\base.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\batcher.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\bin.c">
<Filter>Source Files</Filter>
</ClCompile>
@ -197,4 +200,4 @@
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
</Project>

View File

@ -38,6 +38,7 @@
<ClCompile Include="..\..\..\..\src\arena.c" />
<ClCompile Include="..\..\..\..\src\background_thread.c" />
<ClCompile Include="..\..\..\..\src\base.c" />
<ClCompile Include="..\..\..\..\src\batcher.c" />
<ClCompile Include="..\..\..\..\src\bin.c" />
<ClCompile Include="..\..\..\..\src\bin_info.c" />
<ClCompile Include="..\..\..\..\src\bitmap.c" />
@ -377,4 +378,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -16,6 +16,9 @@
<ClCompile Include="..\..\..\..\src\base.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\batcher.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\bin.c">
<Filter>Source Files</Filter>
</ClCompile>
@ -197,4 +200,4 @@
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
</Project>

View File

@ -38,6 +38,7 @@
<ClCompile Include="..\..\..\..\src\arena.c" />
<ClCompile Include="..\..\..\..\src\background_thread.c" />
<ClCompile Include="..\..\..\..\src\base.c" />
<ClCompile Include="..\..\..\..\src\batcher.c" />
<ClCompile Include="..\..\..\..\src\bin.c" />
<ClCompile Include="..\..\..\..\src\bin_info.c" />
<ClCompile Include="..\..\..\..\src\bitmap.c" />
@ -377,4 +378,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -16,6 +16,9 @@
<ClCompile Include="..\..\..\..\src\base.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\batcher.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\bin.c">
<Filter>Source Files</Filter>
</ClCompile>
@ -197,4 +200,4 @@
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
</Project>

View File

@ -38,6 +38,7 @@
<ClCompile Include="..\..\..\..\src\arena.c" />
<ClCompile Include="..\..\..\..\src\background_thread.c" />
<ClCompile Include="..\..\..\..\src\base.c" />
<ClCompile Include="..\..\..\..\src\batcher.c" />
<ClCompile Include="..\..\..\..\src\bin.c" />
<ClCompile Include="..\..\..\..\src\bin_info.c" />
<ClCompile Include="..\..\..\..\src\bitmap.c" />
@ -377,4 +378,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -16,6 +16,9 @@
<ClCompile Include="..\..\..\..\src\base.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\batcher.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\bin.c">
<Filter>Source Files</Filter>
</ClCompile>
@ -197,4 +200,4 @@
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
</Project>
</Project>

86
src/batcher.c Normal file
View File

@ -0,0 +1,86 @@
#include "jemalloc/internal/jemalloc_preamble.h"
#include "jemalloc/internal/batcher.h"
#include "jemalloc/internal/assert.h"
#include "jemalloc/internal/atomic.h"
/*
 * Prepares an empty batcher that can hold up to nelems_max elements.  Must
 * run before any push/pop traffic; no synchronization is provided here.
 */
void
batcher_init(batcher_t *batcher, size_t nelems_max) {
	batcher->nelems_max = nelems_max;
	atomic_store_zu(&batcher->nelems, 0, ATOMIC_RELAXED);
	malloc_mutex_init(&batcher->mtx, "batcher", WITNESS_RANK_BATCHER,
	    malloc_mutex_rank_exclusive);
}
/*
 * Returns an index (into some user-owned array) to use for pushing, or
 * BATCHER_NO_IDX if no index is free.  On success the batcher's mutex is
 * held; the caller must fill its slots and then call batcher_push_end.
 */
size_t
batcher_push_begin(tsdn_t *tsdn, batcher_t *batcher,
    size_t elems_to_push) {
	assert(elems_to_push > 0);
	/*
	 * Fast path: peek at the count without the lock so that full batchers
	 * are rejected with no mutex traffic.  The load is racy; the decision
	 * is re-validated under the lock below.  The check is written as a
	 * subtraction (nelems <= nelems_max is an invariant) so that a huge
	 * elems_to_push cannot overflow the addition and sneak past it.
	 */
	size_t nelems_guess = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED);
	assert(nelems_guess <= batcher->nelems_max);
	if (elems_to_push > batcher->nelems_max - nelems_guess) {
		return BATCHER_NO_IDX;
	}
	malloc_mutex_lock(tsdn, &batcher->mtx);
	size_t nelems = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED);
	assert(nelems <= batcher->nelems_max);
	if (elems_to_push > batcher->nelems_max - nelems) {
		malloc_mutex_unlock(tsdn, &batcher->mtx);
		return BATCHER_NO_IDX;
	}
	/*
	 * We update nelems at push time (instead of during pop) so that other
	 * racing accesses of the batcher can fail fast instead of trying to
	 * acquire a mutex only to discover that there's no space for them.
	 */
	atomic_store_zu(&batcher->nelems, nelems + elems_to_push,
	    ATOMIC_RELAXED);
	return nelems;
}
/*
 * Completes a push begun by a successful batcher_push_begin: releases the
 * batcher's mutex, publishing the caller's newly filled slots.
 */
void
batcher_push_end(tsdn_t *tsdn, batcher_t *batcher) {
malloc_mutex_assert_owner(tsdn, &batcher->mtx);
/* push_begin already bumped nelems, so the count must be nonzero here. */
assert(atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED) > 0);
malloc_mutex_unlock(tsdn, &batcher->mtx);
}
/*
 * Claims the batcher's entire contents.  Returns the number of items that
 * were present (the caller should consume slots [0, result)), or
 * BATCHER_NO_IDX if it was empty.  On success the mutex stays held until
 * batcher_pop_end is called.
 */
size_t
batcher_pop_begin(tsdn_t *tsdn, batcher_t *batcher) {
	/* Racy peek first, so empty batchers bail out without locking. */
	size_t peek = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED);
	assert(peek <= batcher->nelems_max);
	if (peek == 0) {
		return BATCHER_NO_IDX;
	}
	malloc_mutex_lock(tsdn, &batcher->mtx);
	/* Re-read under the lock; the peek may have gone stale. */
	size_t npopped = atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED);
	assert(npopped <= batcher->nelems_max);
	if (npopped == 0) {
		malloc_mutex_unlock(tsdn, &batcher->mtx);
		return BATCHER_NO_IDX;
	}
	atomic_store_zu(&batcher->nelems, 0, ATOMIC_RELAXED);
	return npopped;
}
/*
 * Completes a pop begun by a successful batcher_pop_begin; the caller must be
 * done reading the popped slots, since releasing the mutex lets pushers reuse
 * them.
 */
void
batcher_pop_end(tsdn_t *tsdn, batcher_t *batcher) {
	/* Mirror batcher_push_end: verify we actually hold the lock. */
	malloc_mutex_assert_owner(tsdn, &batcher->mtx);
	/* pop_begin zeroed the count before handing the contents to us. */
	assert(atomic_load_zu(&batcher->nelems, ATOMIC_RELAXED) == 0);
	malloc_mutex_unlock(tsdn, &batcher->mtx);
}
/* Fork hooks: forward to the embedded mutex so fork ordering is preserved. */
void
batcher_prefork(tsdn_t *tsdn, batcher_t *batcher) {
malloc_mutex_prefork(tsdn, &batcher->mtx);
}
void
batcher_postfork_parent(tsdn_t *tsdn, batcher_t *batcher) {
malloc_mutex_postfork_parent(tsdn, &batcher->mtx);
}
void
batcher_postfork_child(tsdn_t *tsdn, batcher_t *batcher) {
malloc_mutex_postfork_child(tsdn, &batcher->mtx);
}

246
test/unit/batcher.c Normal file
View File

@ -0,0 +1,246 @@
#include "test/jemalloc_test.h"
#include "jemalloc/internal/batcher.h"
/*
 * Single-threaded sanity check: for each capacity 0..NELEMS_MAX-1, fill the
 * batcher one element at a time, verify a full batcher rejects further
 * pushes, then pop everything and check contents and ordering.  Several runs
 * per capacity confirm that popping fully resets the batcher.
 */
TEST_BEGIN(test_simple) {
enum { NELEMS_MAX = 10, DATA_BASE_VAL = 100, NRUNS = 5 };
batcher_t batcher;
size_t data[NELEMS_MAX];
for (size_t nelems = 0; nelems < NELEMS_MAX; nelems++) {
batcher_init(&batcher, nelems);
for (int run = 0; run < NRUNS; run++) {
/* Poison every slot so stale or missing writes are detectable. */
for (int i = 0; i < NELEMS_MAX; i++) {
data[i] = (size_t)-1;
}
/* Indices should come back 0, 1, 2, ... in order. */
for (size_t i = 0; i < nelems; i++) {
size_t idx = batcher_push_begin(TSDN_NULL,
&batcher, 1);
assert_zu_eq(i, idx, "Wrong index");
assert_zu_eq((size_t)-1, data[idx],
"Expected uninitialized slot");
data[idx] = DATA_BASE_VAL + i;
batcher_push_end(TSDN_NULL, &batcher);
}
/* Once at capacity, one more push must be refused. */
if (nelems > 0) {
size_t idx = batcher_push_begin(TSDN_NULL,
&batcher, 1);
assert_zu_eq(BATCHER_NO_IDX, idx,
"Shouldn't be able to push into a full "
"batcher");
}
size_t npop = batcher_pop_begin(TSDN_NULL, &batcher);
if (nelems == 0) {
assert_zu_eq(npop, BATCHER_NO_IDX,
"Shouldn't get any items out of an empty "
"batcher");
} else {
assert_zu_eq(npop, nelems,
"Wrong number of elements popped");
}
for (size_t i = 0; i < nelems; i++) {
assert_zu_eq(data[i], DATA_BASE_VAL + i,
"Item popped out of order!");
}
/* pop_begin only leaves the mutex held when it found items. */
if (nelems != 0) {
batcher_pop_end(TSDN_NULL, &batcher);
}
}
}
}
TEST_END
/*
 * Checks multi-element reservations: batched pushes consume contiguous index
 * ranges, and the capacity check accounts for the full requested count
 * rather than just one element.
 */
TEST_BEGIN(test_multi_push) {
	size_t idx, nelems;
	batcher_t batcher;
	batcher_init(&batcher, 11);
	/* Push two at a time, 5 times, for 10 total. */
	for (int i = 0; i < 5; i++) {
		idx = batcher_push_begin(TSDN_NULL, &batcher, 2);
		assert_zu_eq(2 * i, idx, "Should push in order");
		batcher_push_end(TSDN_NULL, &batcher);
	}
	/* Pushing two more should fail -- would put us at 12 elems. */
	idx = batcher_push_begin(TSDN_NULL, &batcher, 2);
	assert_zu_eq(BATCHER_NO_IDX, idx, "Should be out of space");
	/* But one more should work */
	idx = batcher_push_begin(TSDN_NULL, &batcher, 1);
	assert_zu_eq(10, idx, "Should fit a single elem in the last slot");
	batcher_push_end(TSDN_NULL, &batcher);
	nelems = batcher_pop_begin(TSDN_NULL, &batcher);
	batcher_pop_end(TSDN_NULL, &batcher);
	assert_zu_eq(11, nelems, "Should have popped everything");
}
TEST_END
/* Stress-test tuning knobs. */
enum {
STRESS_TEST_ELEMS = 10,
STRESS_TEST_THREADS = 4,
STRESS_TEST_OPS = 1000 * 1000,
/* Threads pop on a 0 roll out of this range, otherwise push. */
STRESS_TEST_PUSH_TO_POP_RATIO = 5,
};
typedef struct stress_test_data_s stress_test_data_t;
struct stress_test_data_s {
batcher_t batcher;
/* NOTE(review): initialized in test_stress but never locked in the visible code -- confirm whether it's vestigial. */
mtx_t pop_mtx;
/* Source of unique thread ids; also used to seed each thread's PRNG. */
atomic_u32_t thread_id;
/* The user-owned slot array that batcher indices refer into. */
uint32_t elems_data[STRESS_TEST_ELEMS];
/* Per-element counts, written only by the element's current owner. */
size_t push_count[STRESS_TEST_ELEMS];
size_t pop_count[STRESS_TEST_ELEMS];
/* Same counts, maintained atomically, for cross-checking at the end. */
atomic_zu_t atomic_push_count[STRESS_TEST_ELEMS];
atomic_zu_t atomic_pop_count[STRESS_TEST_ELEMS];
};
/*
 * Returns the array index of the n'th set entry in elems_owned.
 * Note: 0-indexed. If one element is set and you want to find it, you call
 * get_nth_set(elems, 0).  It is a bug to ask for more set elements than
 * exist; that aborts via assert_not_reached.
 */
static size_t
get_nth_set(bool elems_owned[STRESS_TEST_ELEMS], size_t n) {
	size_t ntrue = 0;
	for (size_t i = 0; i < STRESS_TEST_ELEMS; i++) {
		if (elems_owned[i]) {
			ntrue++;
		}
		/* ntrue > n means slot i is the (n+1)'th set element. */
		if (ntrue > n) {
			return i;
		}
	}
	/*
	 * Report how many were actually set; previously both format args were
	 * n, which made the diagnostic claim "< n are set" even when exactly
	 * n were.
	 */
	assert_not_reached("Asked for the %zu'th set element when only %zu "
	    "are set", n, ntrue);
	/* Just to silence a compiler warning. */
	return 0;
}
/*
 * Worker body for test_stress.  Each thread repeatedly either pops the whole
 * batcher (taking ownership of every element it finds) or pushes back one
 * randomly chosen element it owns, tracking per-element push/pop counts both
 * locally and in the shared arrays.  Because each element has exactly one
 * owner at a time and ownership transfers through the batcher's mutex, the
 * non-atomic shared counts are properly synchronized.
 */
static void *
stress_test_thd(void *arg) {
	stress_test_data_t *data = arg;
	/*
	 * NOTE(review): PRNG state is seeded with the small thread id from a
	 * u32 fetch-add but held in a size_t -- confirm prng_range_zu accepts
	 * such seeds.
	 */
	size_t prng = atomic_fetch_add_u32(&data->thread_id, 1,
	    ATOMIC_RELAXED);
	size_t nelems_owned = 0;
	bool elems_owned[STRESS_TEST_ELEMS] = {0};
	size_t local_push_count[STRESS_TEST_ELEMS] = {0};
	size_t local_pop_count[STRESS_TEST_ELEMS] = {0};
	/* Renamed from i to op; the old name was shadowed by the pop loop. */
	for (int op = 0; op < STRESS_TEST_OPS; op++) {
		size_t rnd = prng_range_zu(&prng,
		    STRESS_TEST_PUSH_TO_POP_RATIO);
		if (rnd == 0 || nelems_owned == 0) {
			/* Pop path: claim everything currently batched. */
			size_t nelems = batcher_pop_begin(TSDN_NULL,
			    &data->batcher);
			if (nelems == BATCHER_NO_IDX) {
				continue;
			}
			for (size_t i = 0; i < nelems; i++) {
				uint32_t elem = data->elems_data[i];
				assert_false(elems_owned[elem],
				    "Shouldn't already own what we just "
				    "popped");
				elems_owned[elem] = true;
				nelems_owned++;
				local_pop_count[elem]++;
				/* Safe: we hold the batcher mutex here. */
				data->pop_count[elem]++;
			}
			batcher_pop_end(TSDN_NULL, &data->batcher);
		} else {
			/* Push path: hand back one random owned element. */
			size_t elem_to_push_idx = prng_range_zu(&prng,
			    nelems_owned);
			size_t elem = get_nth_set(elems_owned,
			    elem_to_push_idx);
			assert_true(
			    elems_owned[elem],
			    "Should own element we're about to push");
			elems_owned[elem] = false;
			local_push_count[elem]++;
			data->push_count[elem]++;
			nelems_owned--;
			size_t idx = batcher_push_begin(TSDN_NULL,
			    &data->batcher, 1);
			assert_zu_ne(idx, BATCHER_NO_IDX,
			    "Batcher can't be full -- we have one of its "
			    "elems!");
			data->elems_data[idx] = (uint32_t)elem;
			batcher_push_end(TSDN_NULL, &data->batcher);
		}
	}
	/* Push all local elems back, flush local counts to the shared ones. */
	size_t push_idx = 0;
	if (nelems_owned != 0) {
		push_idx = batcher_push_begin(TSDN_NULL, &data->batcher,
		    nelems_owned);
		assert_zu_ne(BATCHER_NO_IDX, push_idx,
		    "Should be space to push");
	}
	for (size_t i = 0; i < STRESS_TEST_ELEMS; i++) {
		if (elems_owned[i]) {
			data->elems_data[push_idx] = (uint32_t)i;
			push_idx++;
			local_push_count[i]++;
			data->push_count[i]++;
		}
		atomic_fetch_add_zu(
		    &data->atomic_push_count[i], local_push_count[i],
		    ATOMIC_RELAXED);
		atomic_fetch_add_zu(
		    &data->atomic_pop_count[i], local_pop_count[i],
		    ATOMIC_RELAXED);
	}
	if (nelems_owned != 0) {
		batcher_push_end(TSDN_NULL, &data->batcher);
	}
	return NULL;
}
/*
 * Multi-threaded stress test: seed the batcher with every element, let
 * STRESS_TEST_THREADS workers shuffle ownership back and forth, then verify
 * the mutex-protected counts match the atomically maintained ones.
 */
TEST_BEGIN(test_stress) {
stress_test_data_t data;
batcher_init(&data.batcher, STRESS_TEST_ELEMS);
bool err = mtx_init(&data.pop_mtx);
assert_false(err, "mtx_init failure");
atomic_store_u32(&data.thread_id, 0, ATOMIC_RELAXED);
/* Start with every element in the batcher, counts zeroed. */
for (int i = 0; i < STRESS_TEST_ELEMS; i++) {
data.push_count[i] = 0;
data.pop_count[i] = 0;
atomic_store_zu(&data.atomic_push_count[i], 0, ATOMIC_RELAXED);
atomic_store_zu(&data.atomic_pop_count[i], 0, ATOMIC_RELAXED);
size_t idx = batcher_push_begin(TSDN_NULL, &data.batcher, 1);
assert_zu_eq(i, idx, "Should push in order");
data.elems_data[idx] = i;
batcher_push_end(TSDN_NULL, &data.batcher);
}
thd_t threads[STRESS_TEST_THREADS];
for (int i = 0; i < STRESS_TEST_THREADS; i++) {
thd_create(&threads[i], stress_test_thd, &data);
}
for (int i = 0; i < STRESS_TEST_THREADS; i++) {
thd_join(threads[i], NULL);
}
/* After the join, all counts are quiescent and safe to read directly. */
for (int i = 0; i < STRESS_TEST_ELEMS; i++) {
assert_zu_ne(0, data.push_count[i],
"Should have done something!");
assert_zu_eq(data.push_count[i], data.pop_count[i],
"every element should be pushed and popped an equal number "
"of times");
assert_zu_eq(data.push_count[i],
atomic_load_zu(&data.atomic_push_count[i], ATOMIC_RELAXED),
"atomic and non-atomic count should be equal given proper "
"synchronization");
assert_zu_eq(data.pop_count[i],
atomic_load_zu(&data.atomic_pop_count[i], ATOMIC_RELAXED),
"atomic and non-atomic count should be equal given proper "
"synchronization");
}
}
TEST_END
/* Entry point: registers the batcher unit tests with the test harness. */
int
main(void) {
return test_no_reentrancy(test_simple, test_multi_push, test_stress);
}