Line data Source code
1 : /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2 :
3 : /***
4 : This file is part of systemd.
5 :
6 : Copyright 2014 David Herrmann <dh.herrmann@gmail.com>
7 :
8 : systemd is free software; you can redistribute it and/or modify it
9 : under the terms of the GNU Lesser General Public License as published by
10 : the Free Software Foundation; either version 2.1 of the License, or
11 : (at your option) any later version.
12 :
13 : systemd is distributed in the hope that it will be useful, but
14 : WITHOUT ANY WARRANTY; without even the implied warranty of
15 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 : Lesser General Public License for more details.
17 :
18 : You should have received a copy of the GNU Lesser General Public License
19 : along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 : ***/
21 :
22 : #include <errno.h>
23 : #include <fcntl.h>
24 : #include <poll.h>
25 : #include <stdbool.h>
26 : #include <stdint.h>
27 : #include <stdlib.h>
28 : #include <sys/eventfd.h>
29 : #include <sys/types.h>
30 : #include <unistd.h>
31 :
32 : #include "barrier.h"
33 : #include "macro.h"
34 : #include "util.h"
35 :
36 : /**
37 : * Barriers
38 : * This barrier implementation provides a simple synchronization method based
39 : * on file-descriptors that can safely be used between threads and processes. A
40 : * barrier object contains 2 shared counters based on eventfd. Both processes
41 : * can now place barriers and wait for the other end to reach a random or
42 : * specific barrier.
43 : * Barriers are numbered, so you can either wait for the other end to reach any
44 : * barrier or the last barrier that you placed. This way, you can use barriers
45 : * for one-way *and* full synchronization. Note that even though barriers are
46 : * numbered, these numbers are internal and recycled once both sides reached the
47 : * same barrier (implemented as a simple signed counter). It is thus not
48 : * possible to address barriers by their ID.
49 : *
50 : * Barrier-API: Both ends can place as many barriers via barrier_place() as
51 : * they want and each pair of barriers on both sides will be implicitly linked.
52 : * Each side can use the barrier_wait/sync_*() family of calls to wait for the
53 : * other side to place a specific barrier. barrier_wait_next() waits until the
54 : * other side calls barrier_place(). No links between the barriers are
55 : * considered and this simply serves as most basic asynchronous barrier.
56 : * barrier_sync_next() is like barrier_wait_next() and waits for the other side
57 : * to place their next barrier via barrier_place(). However, it only waits for
58 : * barriers that are linked to a barrier we already placed. If the other side
59 : * already placed more barriers than we did, barrier_sync_next() returns
60 : * immediately.
61 : * barrier_sync() extends barrier_sync_next() and waits until the other end
62 : * placed as many barriers via barrier_place() as we did. If they already placed
63 : * as many as we did (or more), it returns immediately.
64 : *
65 : * In addition to basic barriers, an abortion event is available.
66 : * barrier_abort() places an abortion event that cannot be undone. An abortion
67 : * immediately cancels all placed barriers and replaces them. Any running and
68 : * following wait/sync call besides barrier_wait_abortion() will immediately
69 : * return false on both sides (otherwise, they always return true).
70 : * barrier_abort() can be called multiple times on both ends and will be a
71 : * no-op if already called on this side.
72 : * barrier_wait_abortion() can be used to wait for the other side to call
73 : * barrier_abort() and is the only wait/sync call that does not return
74 : * immediately if we aborted ourselves. It only returns once the other side
75 : * called barrier_abort().
76 : *
77 : * Barriers can be used for in-process and inter-process synchronization.
78 : * However, for in-process synchronization you could just use mutexes.
79 : * Therefore, main target is IPC and we require both sides to *not* share the FD
80 : * table. If that's given, barriers provide target tracking: If the remote side
81 : * exit()s, an abortion event is implicitly queued on the other side. This way,
82 : * a sync/wait call will be woken up if the remote side crashed or exited
83 : * unexpectedly. However, note that these abortion events are only queued if the
84 : * barrier-queue has been drained. Therefore, it is safe to place a barrier and
85 : * exit. The other side can safely wait on the barrier even though the exit
86 : * queued an abortion event. Usually, the abortion event would overwrite the
87 : * barrier, however, that's not true for exit-abortion events. Those are only
88 : * queued if the barrier-queue is drained (thus, the receiving side has placed
89 : * more barriers than the remote side).
90 : */
91 :
92 : /**
93 : * barrier_create() - Initialize a barrier object
94 : * @obj: barrier to initialize
95 : *
96 : * This initializes a barrier object. The caller is responsible of allocating
97 : * the memory and keeping it valid. The memory does not have to be zeroed
98 : * beforehand.
99 : * Two eventfd objects are allocated for each barrier. If allocation fails, an
100 : * error is returned.
101 : *
102 : * If this function fails, the barrier is reset to an invalid state so it is
103 : * safe to call barrier_destroy() on the object regardless whether the
104 : * initialization succeeded or not.
105 : *
106 : * The caller is responsible to destroy the object via barrier_destroy() before
107 : * releasing the underlying memory.
108 : *
109 : * Returns: 0 on success, negative error code on failure.
110 : */
111 1000 : int barrier_create(Barrier *b) {
112 2000 : _cleanup_(barrier_destroyp) Barrier *staging = b;
113 : int r;
114 :
115 1000 : assert(b);
116 :
117 1000 : b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
118 1000 : if (b->me < 0)
119 0 : return -errno;
120 :
121 1000 : b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
122 1000 : if (b->them < 0)
123 0 : return -errno;
124 :
125 1000 : r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
126 1000 : if (r < 0)
127 0 : return -errno;
128 :
129 1000 : staging = NULL;
130 1000 : return 0;
131 : }
132 :
133 : /**
134 : * barrier_destroy() - Destroy a barrier object
135 : * @b: barrier to destroy or NULL
136 : *
137 : * This destroys a barrier object that has previously been passed to
138 : * barrier_create(). The object is released and reset to invalid
139 : * state. Therefore, it is safe to call barrier_destroy() multiple
140 : * times or even if barrier_create() failed. However, barrier must be
141 : * always initialized with BARRIER_NULL.
142 : *
143 : * If @b is NULL, this is a no-op.
144 : */
145 1000 : void barrier_destroy(Barrier *b) {
146 1000 : if (!b)
147 0 : return;
148 :
149 1000 : b->me = safe_close(b->me);
150 1000 : b->them = safe_close(b->them);
151 1000 : safe_close_pair(b->pipe);
152 1000 : b->barriers = 0;
153 : }
154 :
155 : /**
156 : * barrier_set_role() - Set the local role of the barrier
157 : * @b: barrier to operate on
158 : * @role: role to set on the barrier
159 : *
160 : * This sets the roles on a barrier object. This is needed to know
161 : * which side of the barrier you're on. Usually, the parent creates
162 : * the barrier via barrier_create() and then calls fork() or clone().
163 : * Therefore, the FDs are duplicated and the child retains the same
164 : * barrier object.
165 : *
166 : * Both sides need to call barrier_set_role() after fork() or clone()
167 : * are done. If this is not done, barriers will not work correctly.
168 : *
169 : * Note that barriers could be supported without fork() or clone(). However,
170 : * this is currently not needed so it hasn't been implemented.
171 : */
172 1000 : void barrier_set_role(Barrier *b, unsigned int role) {
173 : int fd;
174 :
175 1000 : assert(b);
176 1000 : assert(role == BARRIER_PARENT || role == BARRIER_CHILD);
177 : /* make sure this is only called once */
178 1000 : assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
179 :
180 1000 : if (role == BARRIER_PARENT)
181 0 : b->pipe[1] = safe_close(b->pipe[1]);
182 : else {
183 1000 : b->pipe[0] = safe_close(b->pipe[0]);
184 :
185 : /* swap me/them for children */
186 1000 : fd = b->me;
187 1000 : b->me = b->them;
188 1000 : b->them = fd;
189 : }
190 1000 : }
191 :
192 : /* places barrier; returns false if we aborted, otherwise true */
193 2000 : static bool barrier_write(Barrier *b, uint64_t buf) {
194 : ssize_t len;
195 :
196 : /* prevent new sync-points if we already aborted */
197 2000 : if (barrier_i_aborted(b))
198 0 : return false;
199 :
200 : do {
201 2000 : len = write(b->me, &buf, sizeof(buf));
202 2000 : } while (len < 0 && IN_SET(errno, EAGAIN, EINTR));
203 :
204 2000 : if (len != sizeof(buf))
205 0 : goto error;
206 :
207 : /* lock if we aborted */
208 2000 : if (buf >= (uint64_t)BARRIER_ABORTION) {
209 0 : if (barrier_they_aborted(b))
210 0 : b->barriers = BARRIER_WE_ABORTED;
211 : else
212 0 : b->barriers = BARRIER_I_ABORTED;
213 2000 : } else if (!barrier_is_aborted(b))
214 2000 : b->barriers += buf;
215 :
216 2000 : return !barrier_i_aborted(b);
217 :
218 : error:
219 : /* If there is an unexpected error, we have to make this fatal. There
220 : * is no way we can recover from sync-errors. Therefore, we close the
221 : * pipe-ends and treat this as abortion. The other end will notice the
222 : * pipe-close and treat it as abortion, too. */
223 :
224 0 : safe_close_pair(b->pipe);
225 0 : b->barriers = BARRIER_WE_ABORTED;
226 0 : return false;
227 : }
228 :
229 : /* waits for barriers; returns false if they aborted, otherwise true */
230 2000 : static bool barrier_read(Barrier *b, int64_t comp) {
231 2000 : if (barrier_they_aborted(b))
232 0 : return false;
233 :
234 6000 : while (b->barriers > comp) {
235 6000 : struct pollfd pfd[2] = {
236 2000 : { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
237 : .events = POLLHUP },
238 2000 : { .fd = b->them,
239 : .events = POLLIN }};
240 : uint64_t buf;
241 : int r;
242 :
243 2000 : r = poll(pfd, 2, -1);
244 2000 : if (r < 0 && IN_SET(errno, EAGAIN, EINTR))
245 0 : continue;
246 2000 : else if (r < 0)
247 0 : goto error;
248 :
249 2000 : if (pfd[1].revents) {
250 : ssize_t len;
251 :
252 : /* events on @them signal new data for us */
253 2000 : len = read(b->them, &buf, sizeof(buf));
254 2000 : if (len < 0 && IN_SET(errno, EAGAIN, EINTR))
255 0 : continue;
256 :
257 2000 : if (len != sizeof(buf))
258 0 : goto error;
259 0 : } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
260 : /* POLLHUP on the pipe tells us the other side exited.
261 : * We treat this as implicit abortion. But we only
262 : * handle it if there's no event on the eventfd. This
263 : * guarantees that exit-abortions do not overwrite real
264 : * barriers. */
265 0 : buf = BARRIER_ABORTION;
266 : else
267 0 : continue;
268 :
269 : /* lock if they aborted */
270 2000 : if (buf >= (uint64_t)BARRIER_ABORTION) {
271 0 : if (barrier_i_aborted(b))
272 0 : b->barriers = BARRIER_WE_ABORTED;
273 : else
274 0 : b->barriers = BARRIER_THEY_ABORTED;
275 2000 : } else if (!barrier_is_aborted(b))
276 2000 : b->barriers -= buf;
277 : }
278 :
279 2000 : return !barrier_they_aborted(b);
280 :
281 : error:
282 : /* If there is an unexpected error, we have to make this fatal. There
283 : * is no way we can recover from sync-errors. Therefore, we close the
284 : * pipe-ends and treat this as abortion. The other end will notice the
285 : * pipe-close and treat it as abortion, too. */
286 :
287 0 : safe_close_pair(b->pipe);
288 0 : b->barriers = BARRIER_WE_ABORTED;
289 0 : return false;
290 : }
291 :
292 : /**
293 : * barrier_place() - Place a new barrier
294 : * @b: barrier object
295 : *
296 : * This places a new barrier on the barrier object. If either side already
297 : * aborted, this is a no-op and returns "false". Otherwise, the barrier is
298 : * placed and this returns "true".
299 : *
300 : * Returns: true if barrier was placed, false if either side aborted.
301 : */
302 2000 : bool barrier_place(Barrier *b) {
303 2000 : assert(b);
304 :
305 2000 : if (barrier_is_aborted(b))
306 0 : return false;
307 :
308 2000 : barrier_write(b, BARRIER_SINGLE);
309 2000 : return true;
310 : }
311 :
312 : /**
313 : * barrier_abort() - Abort the synchronization
314 : * @b: barrier object to abort
315 : *
316 : * This aborts the barrier-synchronization. If barrier_abort() was already
317 : * called on this side, this is a no-op. Otherwise, the barrier is put into the
318 : * ABORT-state and will stay there. The other side is notified about the
319 : * abortion. Any following attempt to place normal barriers or to wait on normal
320 : * barriers will return immediately as "false".
321 : *
322 : * You can wait for the other side to call barrier_abort(), too. Use
323 : * barrier_wait_abortion() for that.
324 : *
325 : * Returns: false if the other side already aborted, true otherwise.
326 : */
327 0 : bool barrier_abort(Barrier *b) {
328 0 : assert(b);
329 :
330 0 : barrier_write(b, BARRIER_ABORTION);
331 0 : return !barrier_they_aborted(b);
332 : }
333 :
334 : /**
335 : * barrier_wait_next() - Wait for the next barrier of the other side
336 : * @b: barrier to operate on
337 : *
338 : * This waits until the other side places its next barrier. This is independent
339 : * of any barrier-links and just waits for any next barrier of the other side.
340 : *
341 : * If either side aborted, this returns false.
342 : *
343 : * Returns: false if either side aborted, true otherwise.
344 : */
345 0 : bool barrier_wait_next(Barrier *b) {
346 0 : assert(b);
347 :
348 0 : if (barrier_is_aborted(b))
349 0 : return false;
350 :
351 0 : barrier_read(b, b->barriers - 1);
352 0 : return !barrier_is_aborted(b);
353 : }
354 :
355 : /**
356 : * barrier_wait_abortion() - Wait for the other side to abort
357 : * @b: barrier to operate on
358 : *
359 : * This waits until the other side called barrier_abort(). This can be called
360 : * regardless whether the local side already called barrier_abort() or not.
361 : *
362 : * If the other side has already aborted, this returns immediately.
363 : *
364 : * Returns: false if the local side aborted, true otherwise.
365 : */
366 0 : bool barrier_wait_abortion(Barrier *b) {
367 0 : assert(b);
368 :
369 0 : barrier_read(b, BARRIER_THEY_ABORTED);
370 0 : return !barrier_i_aborted(b);
371 : }
372 :
373 : /**
374 : * barrier_sync_next() - Wait for the other side to place a next linked barrier
375 : * @b: barrier to operate on
376 : *
377 : * This is like barrier_wait_next() and waits for the other side to call
378 : * barrier_place(). However, this only waits for linked barriers. That means, if
379 : * the other side already placed more barriers than (or as much as) we did, this
380 : * returns immediately instead of waiting.
381 : *
382 : * If either side aborted, this returns false.
383 : *
384 : * Returns: false if either side aborted, true otherwise.
385 : */
386 0 : bool barrier_sync_next(Barrier *b) {
387 0 : assert(b);
388 :
389 0 : if (barrier_is_aborted(b))
390 0 : return false;
391 :
392 0 : barrier_read(b, MAX((int64_t)0, b->barriers - 1));
393 0 : return !barrier_is_aborted(b);
394 : }
395 :
396 : /**
397 : * barrier_sync() - Wait for the other side to place as many barriers as we did
398 : * @b: barrier to operate on
399 : *
400 : * This is like barrier_sync_next() but waits for the other side to call
401 : * barrier_place() as often as we did (in total). If they already placed as much
402 : * as we did (or more), this returns immediately instead of waiting.
403 : *
404 : * If either side aborted, this returns false.
405 : *
406 : * Returns: false if either side aborted, true otherwise.
407 : */
408 2000 : bool barrier_sync(Barrier *b) {
409 2000 : assert(b);
410 :
411 2000 : if (barrier_is_aborted(b))
412 0 : return false;
413 :
414 2000 : barrier_read(b, 0);
415 2000 : return !barrier_is_aborted(b);
416 : }
|