From 4bd1b8f42868b8ec1abb203987bb0cf5c27e9564 Mon Sep 17 00:00:00 2001 From: Dnomd343 Date: Sat, 1 Jun 2024 11:32:08 +0800 Subject: [PATCH] feat: async builder of BasicRanges --- src/core/all_cases/all_cases.h | 13 +-- src/core/all_cases/internal/basic_ranges.cc | 70 ++++++++++++++++- src/core/benchmark/group.cc | 15 +++- src/core/group/group.h | 2 +- src/core/main.cc | 46 ++++++++++- src/core/utils/utility.h | 87 +++++++++++++++------ src/core/utils/worker.inl | 39 +++++++++ src/core_test/cases/basic_ranges.cc | 2 +- 8 files changed, 228 insertions(+), 46 deletions(-) create mode 100644 src/core/utils/worker.inl diff --git a/src/core/all_cases/all_cases.h b/src/core/all_cases/all_cases.h index 7486071..d718267 100644 --- a/src/core/all_cases/all_cases.h +++ b/src/core/all_cases/all_cases.h @@ -39,10 +39,7 @@ #include #include -#include -#include #include -#include #include "utils/utility.h" #include "ranges/ranges.h" @@ -53,9 +50,6 @@ namespace klotski::cases { typedef std::array RangesUnion; -typedef std::function Notifier; -typedef std::function&&)> Executor; - // ------------------------------------------------------------------------------------- // constexpr auto BASIC_RANGES_NUM = 7311885; @@ -78,6 +72,9 @@ public: /// Execute the build process and ensure thread safety. void build(); + /// Execute the build process in parallel without blocking. + void build_async(Executor &&executor, Notifier &&callback); + /// Get the basic-ranges and make sure the result is available. const Ranges& fetch(); @@ -104,10 +101,6 @@ public: /// Execute the build process and ensure thread safety. void build(); - /// TODO: remove this interface - /// Execute the build process with parallel support and ensure thread safety. - // void build_parallel(Executor &&executor); - /// Execute the build process in parallel without blocking. void build_parallel_async(Executor &&executor, Notifier &&callback); diff --git a/src/core/all_cases/internal/basic_ranges.cc b/src/core/all_cases/internal/basic_ranges.cc index bd35dcc..94e4280 100644 --- a/src/core/all_cases/internal/basic_ranges.cc +++ b/src/core/all_cases/internal/basic_ranges.cc @@ -48,7 +48,7 @@ void BasicRanges::build_ranges(Ranges &ranges) { ranges.clear(); ranges.reserve(BASIC_RANGES_NUM); - std::list flags { ranges.begin() }; + std::list flags {ranges.begin()}; // TODO: flags can be constexpr for (auto [n, n_2x1, n_1x1] : range_types()) { ranges.spawn(n, n_2x1, n_1x1); flags.emplace_back(ranges.end()); // mark ordered interval @@ -62,8 +62,70 @@ void BasicRanges::build_ranges(Ranges &ranges) { begin = end; } } while (flags.size() > 2); // merge until only one interval remains +} + +void do_sort(klotski::Executor &&executor, klotski::Notifier notifier, std::shared_ptr> flags) { + + klotski::Worker worker {std::move(executor)}; + + decltype(flags->begin()) begin = flags->begin(), mid, end; + while (++(mid = begin) != flags->end() && ++(end = mid) != flags->end()) { + + worker.post([begin = *begin, mid = *mid, end = *end]() { + inplace_merge(begin, mid, end); // merge two ordered interval + }); + + flags->erase(mid); + begin = end; + } + + worker.then([flags, notifier](klotski::Executor &&executor) { + + if (flags->size() == 2) { + notifier(); + return; + } + + do_sort(std::move(executor), notifier, flags); + + }); + +} + +void BasicRanges::build_async(Executor &&executor, Notifier &&callback) { + + // TODO: add mutex protect here + + Worker worker {std::move(executor)}; + auto cache = std::make_shared>(); + + for (uint32_t i = 0; i < 203; ++i) { + worker.post([cache, i] { + auto [n, n_2x1, n_1x1] = range_types()[i]; + cache->operator[](i).spawn(n, n_2x1, n_1x1); + }); + } + + // auto all_done = std::make_shared(std::move(callback)); + + worker.then([cache, this, callback](Executor &&executor) { + + auto &ranges = get_ranges(); + + ranges.clear(); + ranges.reserve(BASIC_RANGES_NUM); + + const auto flags = std::make_shared>(); + flags->emplace_back(ranges.end()); + + for (auto &tmp : *cache) { + ranges.insert(ranges.end(), tmp.begin(), tmp.end()); + flags->emplace_back(ranges.end()); // mark ordered interval + } + + do_sort(std::move(executor), callback, flags); + + available_ = true; - // for (auto &x : ranges) { - // x = range_reverse(x); // flip every 2-bit - // } + }); } diff --git a/src/core/benchmark/group.cc b/src/core/benchmark/group.cc index 7d24096..3d3d157 100644 --- a/src/core/benchmark/group.cc +++ b/src/core/benchmark/group.cc @@ -167,9 +167,20 @@ static void SpawnRanges(benchmark::State &state) { } static void OriginBasicRanges(benchmark::State &state) { + + BS::thread_pool pool {4}; + for (auto _ : state) { auto &kk = klotski::cases::BasicRanges::instance(); kk.build_ranges(kk.get_ranges()); + + // kk.build_async([](auto func) {func();}, [](){}); + + // kk.build_async([&pool](auto func) { + // pool.submit_task(func); + // }, [] {}); + // pool.wait(); + } } @@ -270,9 +281,9 @@ static void RangesDerive(benchmark::State &state) { // BENCHMARK(SpawnRanges)->Unit(benchmark::kMillisecond); -// BENCHMARK(OriginBasicRanges)->Unit(benchmark::kMillisecond); +BENCHMARK(OriginBasicRanges)->Unit(benchmark::kMillisecond); -BENCHMARK(OriginAllCases)->Unit(benchmark::kMillisecond); +// BENCHMARK(OriginAllCases)->Unit(benchmark::kMillisecond); // BENCHMARK(RangesDerive)->Unit(benchmark::kMillisecond); diff --git a/src/core/group/group.h b/src/core/group/group.h index ba3fdcf..313f524 100644 --- a/src/core/group/group.h +++ b/src/core/group/group.h @@ -164,7 +164,7 @@ public: static Group from_raw_code(codec::RawCode raw_code); static Group from_common_code(codec::CommonCode common_code); -private: +// private: uint32_t type_id_; uint32_t group_id_; diff --git a/src/core/main.cc b/src/core/main.cc index 6b6c4cf..4bc9c3d 100644 --- a/src/core/main.cc +++ b/src/core/main.cc @@ -29,18 +29,58 @@ using klotski::codec::SHORT_CODE_LIMIT; int main() { // const auto start = clock(); + // auto ret = klotski::cases::Group::extend(RawCode::from_common_code(0xDAAFE0C00).value()); + // std::cout << ret.size() << std::endl; + const auto start = std::chrono::system_clock::now(); + // klotski::cases::BasicRanges::instance().build(); + BS::thread_pool pool {}; - klotski::cases::BasicRanges::instance().build(); + // klotski::cases::BasicRanges::instance().build(); - klotski::cases::AllCases::instance().build_parallel_async([&pool](auto func) { + klotski::cases::BasicRanges::instance().build_async([&pool](auto &&func) { pool.submit_task(func); - }, [] {}); + }, [] { + std::cout << "all done" << std::endl; + }); + + // klotski::cases::BasicRanges::instance().build(); + // + // klotski::cases::AllCases::instance().build_parallel_async([&pool](auto func) { + // pool.submit_task(func); + // }, [] {}); + + // std::cout << "start call" << std::endl; + // klotski::Notifier kk {}; + // kk(); + // std::cout << "end call" << std::endl; + + // { + // klotski::Worker worker {[&pool](auto &&func) { pool.submit_task(func); }}; + // + // for (int i = 1; i < 3; ++i) { + // worker.post([i] { + // std::cout << std::format("task {} begin\n", i); + // std::this_thread::sleep_for(std::chrono::seconds(i)); + // std::cout << std::format("task {} complete\n", i); + // }); + // } + // + // worker.then([](klotski::Executor &&executor){ + // std::cout << "all tasks done\n"; + // }); + // + // std::cout << "worker start release\n"; + // } + // + // std::cout << "block exit\n"; pool.wait(); + // std::cout << BasicRanges::instance().fetch().size() << std::endl; + std::cerr << std::chrono::system_clock::now() - start << std::endl; // auto raw_code = RawCode::from_common_code(0x1A9BF0C00)->unwrap(); diff --git a/src/core/utils/utility.h b/src/core/utils/utility.h index 3db8b73..e0adb42 100644 --- a/src/core/utils/utility.h +++ b/src/core/utils/utility.h @@ -1,6 +1,8 @@ #pragma once -#include +#include +#include +#include /// Mark target class as a singleton. #define KLSK_INSTANCE(T) \ @@ -16,42 +18,77 @@ return ins; \ } -#define KLSK_INLINE __attribute__((always_inline)) - +/// Marking compiler assumptions. #define KLSK_ASSUME(expr) __builtin_assume(expr) +/// Force function declaration to be inline. +#define KLSK_INLINE __attribute__((always_inline)) + namespace klotski { /// Get the number of consecutive `0` in the low bits. -inline int low_zero_num(const uint32_t bin) { - return __builtin_ctzl(bin); - - // TODO: using (bin ^ (bin - 1)) when non-builtin - - // WARN: be aware of serious performance issues - // return __builtin_popcount(~(bin ^ -bin)) - 1; -} +// inline int low_zero_num(const uint32_t bin) { +// return __builtin_ctzl(bin); +// +// // TODO: using (bin ^ (bin - 1)) when non-builtin +// +// // WARN: be aware of serious performance issues +// // return __builtin_popcount(~(bin ^ -bin)) - 1; +// } /// Get the number of consecutive `0` in the low bits. -inline int low_zero_num(const uint64_t bin) { - return __builtin_ctzll(bin); - - // WARN: be aware of serious performance issues - // return __builtin_popcount(~(bin ^ -bin)) - 1; -} +// inline int low_zero_num(const uint64_t bin) { +// return __builtin_ctzll(bin); +// +// // WARN: be aware of serious performance issues +// // return __builtin_popcount(~(bin ^ -bin)) - 1; +// } /// Flips the input u32 every two bits in low-high symmetry. inline uint32_t range_reverse(uint32_t bin) { -#if defined(__GNUC__) || defined(__clang__) - bin = __builtin_bswap32(bin); -#else - // FIXME: `_byteswap_ulong` under MSVC - // TODO: using `std::byteswap` (c++23) - bin = ((bin << 16) & 0xFFFF0000) | ((bin >> 16) & 0x0000FFFF); - bin = ((bin << 8) & 0xFF00FF00) | ((bin >> 8) & 0x00FF00FF); -#endif + bin = std::byteswap(bin); +// #if defined(__GNUC__) || defined(__clang__) +// bin = __builtin_bswap32(bin); +// #else +// // FIXME: `_byteswap_ulong` under MSVC +// // TODO: using `std::byteswap` (c++23) +// bin = ((bin << 16) & 0xFFFF0000) | ((bin >> 16) & 0x0000FFFF); +// bin = ((bin << 8) & 0xFF00FF00) | ((bin >> 8) & 0x00FF00FF); +// #endif bin = ((bin << 4) & 0xF0F0F0F0) | ((bin >> 4) & 0x0F0F0F0F); return ((bin << 2) & 0xCCCCCCCC) | ((bin >> 2) & 0x33333333); } +/// Empty function calls that generally used for callbacks. +typedef std::function Notifier; + +/// Execute the passed function that generally used for concurrency. +typedef std::function &&)> Executor; + +/// Perform multiple tasks without blocking and trigger callback upon completion. +class Worker final { +public: + using Task = std::function; + using After = std::function; + + /// Construction based on executor. + explicit Worker(Executor &&executor); + + /// Post new task into the queue. + void post(Task &&task); + + /// Setting up callback entry. + void then(After &&after); + + /// Tasks will be triggered at destruction. + ~Worker(); + +private: + Notifier after_; + Executor executor_; + std::list tasks_; +}; + } // namespace klotski + +#include "worker.inl" diff --git a/src/core/utils/worker.inl b/src/core/utils/worker.inl new file mode 100644 index 0000000..e9e93c2 --- /dev/null +++ b/src/core/utils/worker.inl @@ -0,0 +1,39 @@ +#pragma once + +#include + +namespace klotski { + +inline Worker::Worker(Executor &&executor) + : after_([] {}), executor_(executor) {} + +inline void Worker::post(Task &&task) { + tasks_.emplace_back(std::move(task)); +} + +inline void Worker::then(After &&after) { + after_ = [after = std::move(after), executor = executor_]() mutable { + after(std::move(executor)); + }; +} + +inline Worker::~Worker() { + if (tasks_.empty()) { + executor_([after = after_] { + after(); // callback directly + }); + return; + } + + auto counter = std::make_shared(tasks_.size()); + for (auto &&task : tasks_) { + executor_([counter, after = after_, task = std::move(task)] { + task(); + if (counter->fetch_sub(1) == 1) { + after(); // all tasks done + } + }); + } +} + +} // namespace klotski diff --git a/src/core_test/cases/basic_ranges.cc b/src/core_test/cases/basic_ranges.cc index b85aad9..e0ced0d 100644 --- a/src/core_test/cases/basic_ranges.cc +++ b/src/core_test/cases/basic_ranges.cc @@ -1,7 +1,7 @@ #include "hash.h" #include "helper.h" -static constexpr uint64_t BASIC_RANGES_XXH3 = 0x2ced674494fe904d; +static constexpr uint64_t BASIC_RANGES_XXH3 = 0x34fce9da6a052533; class BasicRangesTest : public testing::Test, public Concurrent { protected: