ESPResSo
Extensible Simulation Package for Research on Soft Matter Systems
Loading...
Searching...
No Matches
MpiCallbacks.hpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2010-2022 The ESPResSo project
3 * Copyright (C) 2002,2003,2004,2005,2006,2007,2008,2009,2010
4 * Max-Planck-Institute for Polymer Research, Theory Group
5 *
6 * This file is part of ESPResSo.
7 *
8 * ESPResSo is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
12 *
13 * ESPResSo is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 */
21
22#ifndef COMMUNICATION_MPI_CALLBACKS
23#define COMMUNICATION_MPI_CALLBACKS
24
25/**
26 * @file
27 *
28 * @ref Communication::MpiCallbacks manages MPI communication using a
29 * visitor pattern. The program runs on the head node and is responsible
30 * for calling callback functions on the worker nodes when necessary,
31 * e.g. to broadcast global variables or run an algorithm in parallel.
32 *
33 * Callbacks are registered on the head node as function pointers via
34 * the @ref REGISTER_CALLBACK. The visitor pattern allows using arbitrary
35 * function signatures.
36 */
37
#include <utils/tuple.hpp>
#include <utils/type_traits.hpp>

#include <boost/mpi/collectives/broadcast.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/environment.hpp>
#include <boost/mpi/packed_iarchive.hpp>
#include <boost/mpi/packed_oarchive.hpp>
#include <boost/range/algorithm/remove_if.hpp>

#include <cassert>
#include <memory>
#include <stdexcept>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
54
55namespace Communication {
56
57class MpiCallbacks;
58
59namespace detail {
/**
 * @brief Check if a type can be used as a callback argument.
 *
 * This checks if a type can be a parameter type for an MPI callback.
 * Not allowed are pointers and non-const lvalue references, as output
 * parameters can not work across ranks.
 *
 * The result is exposed as a `std::integral_constant<bool, ...>`, i.e.
 * via the nested `value` member.
 */
template <class T>
using is_allowed_argument = std::bool_constant<
    !(std::is_pointer_v<T> ||
      (std::is_lvalue_reference_v<T> &&
       !std::is_const_v<std::remove_reference_t<T>>))>;
73
74template <class... Args>
75using are_allowed_arguments =
77
78/**
79 * @brief Invoke a callable with arguments from an mpi buffer.
80 *
81 * @tparam F A Callable that can be called with Args as parameters.
82 * @tparam Args Pack of arguments for @p F
83 *
84 * @param f Functor to be called
85 * @param ia Buffer to extract the parameters from
86 *
87 * @return Return value of calling @p f.
88 */
89template <class F, class... Args>
90auto invoke(F f, boost::mpi::packed_iarchive &ia) {
91 static_assert(are_allowed_arguments<Args...>::value,
92 "Pointers and non-const references are not allowed as "
93 "arguments for callbacks.");
94
95 /* This is the local receive buffer for the parameters. We have to strip
96 away const so we can actually deserialize into it. */
97 std::tuple<std::remove_const_t<std::remove_reference_t<Args>>...> params;
98 Utils::for_each([&ia](auto &e) { ia >> e; }, params);
99
100 /* We add const here, so that parameters can only be by value
101 or const reference. Output parameters on callbacks are not
102 sensible because the changes are not propagated back, so
103 we make sure this does not compile. */
104 return std::apply(f, std::as_const(params));
105}
106
107/**
108 * @brief Type-erased interface for callbacks.
109 *
110 * This encapsulates the signature of the callback
111 * and the parameter transfer, so that it can be
112 * called without any type information on the parameters.
113 */
114struct callback_concept_t {
115 /**
116 * @brief Execute the callback.
117 *
118 * Unpack parameters for this callback, and then call it.
119 */
120 virtual void operator()(boost::mpi::communicator const &,
121 boost::mpi::packed_iarchive &) const = 0;
122 virtual ~callback_concept_t() = default;
123};
124
125/**
126 * @brief Callback without a return value.
127 *
128 * This is an implementation of a callback for a specific callable
129 * @p F and a set of arguments to call it with.
130 */
131template <class F, class... Args>
132struct callback_void_t final : public callback_concept_t {
133 F m_f;
134
135 callback_void_t(callback_void_t const &) = delete;
136 callback_void_t(callback_void_t &&) = delete;
137
138 template <class FRef>
139 explicit callback_void_t(FRef &&f) : m_f(std::forward<FRef>(f)) {}
140 void operator()(boost::mpi::communicator const &,
141 boost::mpi::packed_iarchive &ia) const override {
142 detail::invoke<F, Args...>(m_f, ia);
143 }
144};
145
/**
 * @brief Bundle of the types that make up a functor's call signature.
 *
 * @tparam F Type of the functor.
 * @tparam R Return type of its call operator.
 * @tparam Args Parameter types of its call operator.
 */
template <class F, class R, class... Args> struct FunctorTypes {
  using functor_type = F;
  using return_type = R;
  using argument_types = std::tuple<Args...>;
};

/**
 * @brief Deduce @ref FunctorTypes from a pointer to a const call operator.
 *
 * Only used in unevaluated context by @ref functor_types.
 */
template <class C, class R, class... Args>
auto functor_types_impl(R (C::*)(Args...) const) {
  return FunctorTypes<C, R, Args...>{};
}

/**
 * @brief Signature information of the functor type @p F.
 *
 * Taking the address of `operator()` requires that @p F (after removing
 * a reference) has exactly one, const-qualified, non-template call
 * operator.
 */
template <class F>
using functor_types =
    decltype(functor_types_impl(&std::remove_reference_t<F>::operator()));
160
161template <class CRef, class C, class R, class... Args>
162auto make_model_impl(CRef &&c, FunctorTypes<C, R, Args...>) {
163 return std::make_unique<callback_void_t<C, Args...>>(std::forward<CRef>(c));
164}
165
166/**
167 * @brief Make a @ref callback_model_t for a functor or lambda.
168 *
169 * The signature is deduced from F::operator() const, which has
170 * to exist and can not be overloaded.
171 */
172template <typename F> auto make_model(F &&f) {
173 return make_model_impl(std::forward<F>(f), functor_types<F>{});
174}
175
176/**
177 * @brief Make a @ref callback_model_t for a function pointer.
178 */
179template <class... Args> auto make_model(void (*f_ptr)(Args...)) {
180 return std::make_unique<callback_void_t<void (*)(Args...), Args...>>(f_ptr);
181}
182} // namespace detail
183
184/**
185 * @brief The interface of the MPI callback mechanism.
186 */
188public:
189 /**
190 * @brief RAII handle for a callback.
191 *
192 * This is what the client gets for registering a
193 * dynamic (= not function pointer) callback.
194 * It manages the lifetime of the callback handle
195 * needed to call it. The handle has a type derived
196 * from the signature of the callback, which makes
197 * it possible to do static type checking on the
198 * arguments.
199 */
200 template <class... Args> class CallbackHandle {
201 public:
202 template <typename F, class = std::enable_if_t<std::is_same_v<
203 typename detail::functor_types<F>::argument_types,
204 std::tuple<Args...>>>>
205 CallbackHandle(std::shared_ptr<MpiCallbacks> cb, F &&f)
206 : m_id(cb->add(std::forward<F>(f))), m_cb(std::move(cb)) {}
207
209 CallbackHandle(CallbackHandle &&rhs) noexcept = default;
211 CallbackHandle &operator=(CallbackHandle &&rhs) noexcept = default;
212
213 private:
214 int m_id;
215 std::shared_ptr<MpiCallbacks> m_cb;
216
217 public:
218 /**
219 * @brief Call the callback managed by this handle.
220 *
221 * The arguments are passed to the remote callees, it
222 * must be possible to call the function with the provided
223 * arguments, otherwise this will not compile.
224 */
225 template <class... ArgRef>
226 auto operator()(ArgRef &&...args) const
227 /* Enable if a hypothetical function with signature void(Args..)
228 * could be called with the provided arguments. */
229 -> std::enable_if_t<
230 std::is_void_v<decltype(std::declval<void (*)(Args...)>()(
231 std::forward<ArgRef>(args)...))>> {
232 if (m_cb)
233 m_cb->call(m_id, std::forward<ArgRef>(args)...);
234 }
235
237 if (m_cb)
238 m_cb->remove(m_id);
239 }
240
241 int id() const { return m_id; }
242 };
243
244 /* Avoid accidental copy, leads to mpi deadlock or split brain */
245 MpiCallbacks(MpiCallbacks const &) = delete;
247
248private:
249 static auto &static_callbacks() {
250 static std::vector<
251 std::pair<void (*)(), std::unique_ptr<detail::callback_concept_t>>>
252 callbacks;
253
254 return callbacks;
255 }
256
257public:
258 MpiCallbacks(boost::mpi::communicator comm,
259 std::shared_ptr<boost::mpi::environment> mpi_env)
260 : m_comm(std::move(comm)), m_mpi_env(std::move(mpi_env)) {
261 /* Add a dummy at id 0 for loop abort. */
262 m_callback_map.add(nullptr);
263
264 for (auto &kv : static_callbacks()) {
265 m_func_ptr_to_id[kv.first] = m_callback_map.add(kv.second.get());
266 }
267 }
268
270 /* Release the clients on exit */
271 if (m_comm.rank() == 0) {
272 try {
273 abort_loop();
274 } catch (...) { // NOLINT(bugprone-empty-catch)
275 }
276 }
277 }
278
279private:
280 /**
281 * @brief Add a new callback.
282 *
283 * Add a new callback to the system. This is a collective
284 * function that must be run on all nodes.
285 *
286 * @tparam F An object with a const call operator.
287 *
288 * @param f The callback function to add.
289 * @return A handle with which the callback can be called.
290 */
291 template <typename F> auto add(F &&f) {
292 m_callbacks.emplace_back(detail::make_model(std::forward<F>(f)));
293 return m_callback_map.add(m_callbacks.back().get());
294 }
295
296public:
297 /**
298 * @brief Add a new callback.
299 *
300 * Add a new callback to the system. This is a collective
301 * function that must be run on all nodes.
302 *
303 * @param fp Pointer to the static callback function to add.
304 */
305 template <class... Args> void add(void (*fp)(Args...)) {
306 m_callbacks.emplace_back(detail::make_model(fp));
307 const int id = m_callback_map.add(m_callbacks.back().get());
308 m_func_ptr_to_id[reinterpret_cast<void (*)()>(fp)] = id;
309 }
310
311 /**
312 * @brief Add a new callback.
313 *
314 * Add a new callback to the system. This is a collective
315 * function that must be run on all nodes.
316 *
317 * @param fp Pointer to the static callback function to add.
318 */
319 template <class... Args> static void add_static(void (*fp)(Args...)) {
320 static_callbacks().emplace_back(reinterpret_cast<void (*)()>(fp),
321 detail::make_model(fp));
322 }
323
324private:
325 /**
326 * @brief Remove callback.
327 *
328 * Remove the callback id from the callback list.
329 * This is a collective call that must be run on all nodes.
330 *
331 * @param id Identifier of the callback to remove.
332 */
333 void remove(int id) {
334 m_callbacks.erase(
335 boost::remove_if(m_callbacks,
336 [ptr = m_callback_map[id]](auto const &e) {
337 return e.get() == ptr;
338 }),
339 m_callbacks.end());
340 m_callback_map.remove(id);
341 }
342
343private:
344 /**
345 * @brief call a callback.
346 *
347 * Call the callback id.
348 * The method can only be called on the head node
349 * and has the prerequisite that the other nodes are
350 * in the MPI loop.
351 *
352 * @param id The callback to call.
353 * @param args Arguments for the callback.
354 */
355 template <class... Args> void call(int id, Args &&...args) const {
356 if (m_comm.rank() != 0) {
357 throw std::logic_error("Callbacks can only be invoked on rank 0.");
358 }
359
360 assert(m_callback_map.find(id) != m_callback_map.end() &&
361 "m_callback_map and m_func_ptr_to_id disagree");
362
363 /* Send request to worker nodes */
364 boost::mpi::packed_oarchive oa(m_comm);
365 oa << id;
366
367 /* Pack the arguments into a packed mpi buffer. */
368 Utils::for_each([&oa](auto &&e) { oa << e; },
369 std::forward_as_tuple(std::forward<Args>(args)...));
370
371 boost::mpi::broadcast(m_comm, oa, 0);
372 }
373
374public:
375 /**
376 * @brief Call a callback on worker nodes.
377 *
378 * The callback is **not** called on the head node.
379 *
380 * This method can only be called on the head node.
381 *
382 * @param fp Pointer to the function to call.
383 * @param args Arguments for the callback.
384 */
385 template <class... Args, class... ArgRef>
386 auto call(void (*fp)(Args...), ArgRef &&...args) const ->
387 /* enable only if fp can be called with the provided arguments */
388 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
389 const int id = m_func_ptr_to_id.at(reinterpret_cast<void (*)()>(fp));
390
391 call(id, std::forward<ArgRef>(args)...);
392 }
393
394 /**
395 * @brief Call a callback on all nodes.
396 *
397 * This calls a callback on all nodes, including the head node.
398 *
399 * This method can only be called on the head node.
400 *
401 * @param fp Pointer to the function to call.
402 * @param args Arguments for the callback.
403 */
404 template <class... Args, class... ArgRef>
405 auto call_all(void (*fp)(Args...), ArgRef &&...args) const ->
406 /* enable only if fp can be called with the provided arguments */
407 std::enable_if_t<std::is_void_v<decltype(fp(args...))>> {
408 call(fp, args...);
409 fp(args...);
410 }
411
412 /**
413 * @brief Start the MPI loop.
414 *
415 * This is the callback loop for the worker nodes. They block
416 * on the MPI call and wait for a new callback request
417 * coming from the head node.
418 * This should be run on the worker nodes and must be running
419 * so that the head node can issue call().
420 */
421 void loop() const {
422 for (;;) {
423 /* Communicate callback id and parameters */
424 boost::mpi::packed_iarchive ia(m_comm);
425 boost::mpi::broadcast(m_comm, ia, 0);
426
427 int request;
428 ia >> request;
429
430 if (request == LOOP_ABORT) {
431 break;
432 }
433 /* Call the callback */
434 m_callback_map[request]->operator()(m_comm, ia);
435 }
436 }
437
438 /**
439 * @brief Abort the MPI loop.
440 *
441 * Make the worker nodes exit the MPI loop.
442 */
443 void abort_loop() { call(LOOP_ABORT); }
444
445 /**
446 * @brief The boost mpi communicator used by this instance
447 */
448 boost::mpi::communicator const &comm() const { return m_comm; }
449
450 std::shared_ptr<boost::mpi::environment> share_mpi_env() const {
451 return m_mpi_env;
452 }
453
454private:
455 /**
456 * @brief Id for the @ref abort_loop. Has to be 0.
457 */
458 static constexpr int LOOP_ABORT = 0;
459
460 /**
461 * The MPI communicator used for the callbacks.
462 */
463 boost::mpi::communicator m_comm;
464
465 /**
466 * The MPI environment used for the callbacks.
467 */
468 std::shared_ptr<boost::mpi::environment> m_mpi_env;
469
470 /**
471 * Internal storage for the callback functions.
472 */
473 std::vector<std::unique_ptr<detail::callback_concept_t>> m_callbacks;
474
475 /**
476 * Map of ids to callbacks.
477 */
479
480 /**
481 * Mapping of function pointers to ids, so static callbacks can be
482 * called by their pointer.
483 */
484 std::unordered_map<void (*)(), int> m_func_ptr_to_id;
485};
486
487template <class... Args>
489
490/**
491 * @brief Helper class to add callbacks before main.
492 *
493 * Should not be used directly, but via @ref REGISTER_CALLBACK.
494 */
496
497public:
499
500 template <class... Args> explicit RegisterCallback(void (*cb)(Args...)) {
502 }
503};
504} /* namespace Communication */
505
/**
 * @brief Register a static callback without return value.
 *
 * This registers a function as an mpi callback.
 * The macro should be used at global scope.
 * It defines a static object named `register_<cb>`, so @p cb has to be
 * a plain identifier.
 *
 * @param cb A function
 */
#define REGISTER_CALLBACK(cb)                                                  \
  namespace Communication {                                                    \
  static ::Communication::RegisterCallback register_##cb(&(cb));               \
  }
518
519#endif
Keep an enumerated list of T objects, managed by the class.
float f[3]
auto operator()(ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(std::declval< void(*)(Args...)>()(std::forward< ArgRef >(args)...))> >
Call the callback managed by this handle.
CallbackHandle(CallbackHandle &&rhs) noexcept=default
CallbackHandle(CallbackHandle const &)=delete
CallbackHandle & operator=(CallbackHandle &&rhs) noexcept=default
CallbackHandle(std::shared_ptr< MpiCallbacks > cb, F &&f)
CallbackHandle & operator=(CallbackHandle const &)=delete
The interface of the MPI callback mechanism.
auto call_all(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on all nodes.
auto call(void(*fp)(Args...), ArgRef &&...args) const -> std::enable_if_t< std::is_void_v< decltype(fp(args...))> >
Call a callback on worker nodes.
MpiCallbacks(boost::mpi::communicator comm, std::shared_ptr< boost::mpi::environment > mpi_env)
void add(void(*fp)(Args...))
Add a new callback.
std::shared_ptr< boost::mpi::environment > share_mpi_env() const
boost::mpi::communicator const & comm() const
The boost mpi communicator used by this instance.
void abort_loop()
Abort the MPI loop.
static void add_static(void(*fp)(Args...))
Add a new callback.
MpiCallbacks & operator=(MpiCallbacks const &)=delete
MpiCallbacks(MpiCallbacks const &)=delete
void loop() const
Start the MPI loop.
Helper class to add callbacks before main.
RegisterCallback(void(*cb)(Args...))
Container for objects that are identified by a numeric id.
void for_each(F &&f, Tuple &t)
Definition tuple.hpp:46
static SteepestDescentParameters params
Currently active steepest descent instance.
Algorithms for tuple-like inhomogeneous containers.