event_listener/
std.rs

//! libstd-based implementation of `event-listener`.
//!
//! This implementation creates an intrusive linked list of listeners.
4
5use crate::notify::{GenericNotify, Internal, Notification};
6use crate::sync::atomic::Ordering;
7use crate::sync::cell::{Cell, UnsafeCell};
8use crate::sync::{Mutex, MutexGuard};
9use crate::{RegisterResult, State, TaskRef};
10
11use core::marker::PhantomPinned;
12use core::mem;
13use core::ops::{Deref, DerefMut};
14use core::pin::Pin;
15use core::ptr::NonNull;
16
17pub(super) struct List<T>(Mutex<Inner<T>>);
18
19struct Inner<T> {
20    /// The head of the linked list.
21    head: Option<NonNull<Link<T>>>,
22
23    /// The tail of the linked list.
24    tail: Option<NonNull<Link<T>>>,
25
26    /// The first unnotified listener.
27    next: Option<NonNull<Link<T>>>,
28
29    /// Total number of listeners.
30    len: usize,
31
32    /// The number of notified listeners.
33    notified: usize,
34}
35
36impl<T> List<T> {
37    /// Create a new, empty event listener list.
38    pub(super) fn new() -> Self {
39        Self(Mutex::new(Inner {
40            head: None,
41            tail: None,
42            next: None,
43            len: 0,
44            notified: 0,
45        }))
46    }
47
48    /// Get the total number of listeners without blocking.
49    pub(crate) fn try_total_listeners(&self) -> Option<usize> {
50        self.0.try_lock().ok().map(|list| list.len)
51    }
52
53    /// Get the total number of listeners with blocking.
54    pub(crate) fn total_listeners(&self) -> usize {
55        self.0.lock().unwrap_or_else(|e| e.into_inner()).len
56    }
57}
58
59impl<T> crate::Inner<T> {
60    fn lock(&self) -> ListLock<'_, '_, T> {
61        ListLock {
62            inner: self,
63            lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()),
64        }
65    }
66
67    /// Add a new listener to the list.
68    pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
69        let mut inner = self.lock();
70
71        listener.as_mut().set(Some(Listener {
72            link: UnsafeCell::new(Link {
73                state: Cell::new(State::Created),
74                prev: Cell::new(inner.tail),
75                next: Cell::new(None),
76            }),
77            _pin: PhantomPinned,
78        }));
79        let listener = listener.as_pin_mut().unwrap();
80
81        {
82            let entry_guard = listener.link.get();
83            // SAFETY: We are locked, so we can access the inner `link`.
84            let entry = unsafe { entry_guard.deref() };
85
86            // Replace the tail with the new entry.
87            match mem::replace(&mut inner.tail, Some(entry.into())) {
88                None => inner.head = Some(entry.into()),
89                Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
90            };
91        }
92
93        // If there are no unnotified entries, this is the first one.
94        if inner.next.is_none() {
95            inner.next = inner.tail;
96        }
97
98        // Bump the entry count.
99        inner.len += 1;
100    }
101
102    /// Remove a listener from the list.
103    pub(crate) fn remove(
104        &self,
105        listener: Pin<&mut Option<Listener<T>>>,
106        propagate: bool,
107    ) -> Option<State<T>> {
108        self.lock().remove(listener, propagate)
109    }
110
111    /// Notifies a number of entries.
112    #[cold]
113    pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
114        self.lock().notify(notify)
115    }
116
117    /// Register a task to be notified when the event is triggered.
118    ///
119    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
120    /// isn't inserted, returns `None`.
121    pub(crate) fn register(
122        &self,
123        mut listener: Pin<&mut Option<Listener<T>>>,
124        task: TaskRef<'_>,
125    ) -> RegisterResult<T> {
126        let mut inner = self.lock();
127        let entry_guard = match listener.as_mut().as_pin_mut() {
128            Some(listener) => listener.link.get(),
129            None => return RegisterResult::NeverInserted,
130        };
131        // SAFETY: We are locked, so we can access the inner `link`.
132        let entry = unsafe { entry_guard.deref() };
133
134        // Take out the state and check it.
135        match entry.state.replace(State::NotifiedTaken) {
136            State::Notified { tag, .. } => {
137                // We have been notified, remove the listener.
138                inner.remove(listener, false);
139                RegisterResult::Notified(tag)
140            }
141
142            State::Task(other_task) => {
143                // Only replace the task if it's different.
144                entry.state.set(State::Task({
145                    if !task.will_wake(other_task.as_task_ref()) {
146                        task.into_task()
147                    } else {
148                        other_task
149                    }
150                }));
151
152                RegisterResult::Registered
153            }
154
155            _ => {
156                // We have not been notified, register the task.
157                entry.state.set(State::Task(task.into_task()));
158                RegisterResult::Registered
159            }
160        }
161    }
162}
163
164impl<T> Inner<T> {
165    fn remove(
166        &mut self,
167        mut listener: Pin<&mut Option<Listener<T>>>,
168        propagate: bool,
169    ) -> Option<State<T>> {
170        let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
171        let entry = unsafe { entry_guard.deref() };
172
173        let prev = entry.prev.get();
174        let next = entry.next.get();
175
176        // Unlink from the previous entry.
177        match prev {
178            None => self.head = next,
179            Some(p) => unsafe {
180                p.as_ref().next.set(next);
181            },
182        }
183
184        // Unlink from the next entry.
185        match next {
186            None => self.tail = prev,
187            Some(n) => unsafe {
188                n.as_ref().prev.set(prev);
189            },
190        }
191
192        // If this was the first unnotified entry, update the next pointer.
193        if self.next == Some(entry.into()) {
194            self.next = next;
195        }
196
197        // The entry is now fully unlinked, so we can now take it out safely.
198        let entry = unsafe {
199            listener
200                .get_unchecked_mut()
201                .take()
202                .unwrap()
203                .link
204                .into_inner()
205        };
206
207        // This State::Created is immediately dropped and exists as a workaround for the absence of
208        // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
209        //
210        // refs: https://github.com/tokio-rs/loom/pull/341
211        let mut state = entry.state.replace(State::Created);
212
213        // Update the notified count.
214        if state.is_notified() {
215            self.notified -= 1;
216
217            if propagate {
218                let state = mem::replace(&mut state, State::NotifiedTaken);
219                if let State::Notified { additional, tag } = state {
220                    let tags = {
221                        let mut tag = Some(tag);
222                        move || tag.take().expect("tag already taken")
223                    };
224                    self.notify(GenericNotify::new(1, additional, tags));
225                }
226            }
227        }
228        self.len -= 1;
229
230        Some(state)
231    }
232
233    #[cold]
234    fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
235        let mut n = notify.count(Internal::new());
236        let is_additional = notify.is_additional(Internal::new());
237
238        if !is_additional {
239            if n < self.notified {
240                return 0;
241            }
242            n -= self.notified;
243        }
244
245        let original_count = n;
246        while n > 0 {
247            n -= 1;
248
249            // Notify the next entry.
250            match self.next {
251                None => return original_count - n - 1,
252
253                Some(e) => {
254                    // Get the entry and move the pointer forwards.
255                    let entry = unsafe { e.as_ref() };
256                    self.next = entry.next.get();
257
258                    // Set the state to `Notified` and notify.
259                    let tag = notify.next_tag(Internal::new());
260                    if let State::Task(task) = entry.state.replace(State::Notified {
261                        additional: is_additional,
262                        tag,
263                    }) {
264                        task.wake();
265                    }
266
267                    // Bump the notified count.
268                    self.notified += 1;
269                }
270            }
271        }
272
273        original_count - n
274    }
275}
276
277struct ListLock<'a, 'b, T> {
278    lock: MutexGuard<'a, Inner<T>>,
279    inner: &'b crate::Inner<T>,
280}
281
282impl<T> Deref for ListLock<'_, '_, T> {
283    type Target = Inner<T>;
284
285    fn deref(&self) -> &Self::Target {
286        &self.lock
287    }
288}
289
290impl<T> DerefMut for ListLock<'_, '_, T> {
291    fn deref_mut(&mut self) -> &mut Self::Target {
292        &mut self.lock
293    }
294}
295
296impl<T> Drop for ListLock<'_, '_, T> {
297    fn drop(&mut self) {
298        let list = &mut **self;
299
300        // Update the notified count.
301        let notified = if list.notified < list.len {
302            list.notified
303        } else {
304            usize::MAX
305        };
306
307        self.inner.notified.store(notified, Ordering::Release);
308    }
309}
310
311pub(crate) struct Listener<T> {
312    /// The inner link in the linked list.
313    ///
314    /// # Safety
315    ///
316    /// This can only be accessed while the central mutex is locked.
317    link: UnsafeCell<Link<T>>,
318
319    /// This listener cannot be moved after being pinned.
320    _pin: PhantomPinned,
321}
322
323struct Link<T> {
324    /// The current state of the listener.
325    state: Cell<State<T>>,
326
327    /// The previous link in the linked list.
328    prev: Cell<Option<NonNull<Link<T>>>>,
329
330    /// The next link in the linked list.
331    next: Cell<Option<NonNull<Link<T>>>>,
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use futures_lite::pin;

    #[cfg(target_family = "wasm")]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Declares each identifier as a pinned, initially empty listener slot.
    macro_rules! make_listeners {
        ($($id:ident),*) => {
            $(
                let $id = Option::<Listener<()>>::None;
                pin!($id);
            )*
        };
    }

    /// Inserting and removing listeners keeps the listener count in step.
    #[test]
    fn insert() {
        let inner = crate::Inner::new();
        make_listeners!(listen1, listen2, listen3);

        // Register the listeners.
        inner.insert(listen1.as_mut());
        inner.insert(listen2.as_mut());
        inner.insert(listen3.as_mut());

        assert_eq!(inner.lock().len, 3);

        // Remove one.
        assert_eq!(inner.remove(listen2, false), Some(State::Created));
        assert_eq!(inner.lock().len, 2);

        // Remove another.
        assert_eq!(inner.remove(listen1, false), Some(State::Created));
        assert_eq!(inner.lock().len, 1);
    }

    /// Removing listeners with propagation passes a pending notification down the list and
    /// keeps both counters consistent.
    #[test]
    fn drop_non_notified() {
        let inner = crate::Inner::new();
        make_listeners!(listen1, listen2, listen3);

        // Register the listeners.
        inner.insert(listen1.as_mut());
        inner.insert(listen2.as_mut());
        inner.insert(listen3.as_mut());

        // Notify one: only the first listener is notified.
        assert_eq!(inner.notify(GenericNotify::new(1, false, || ())), 1);
        assert_eq!(inner.lock().notified, 1);

        // Remove a listener that was never notified: nothing to propagate.
        assert_eq!(inner.remove(listen3, true), Some(State::Created));
        {
            let list = inner.lock();
            assert_eq!(list.len, 2);
            assert_eq!(list.notified, 1);
        }

        // Remove the notified listener: its notification moves on to the second listener.
        assert_eq!(inner.remove(listen1, true), Some(State::NotifiedTaken));
        {
            let list = inner.lock();
            assert_eq!(list.len, 1);
            assert_eq!(list.notified, 1);
        }

        // Remove the last listener: there is nobody left to propagate to.
        assert_eq!(inner.remove(listen2, true), Some(State::NotifiedTaken));
        let list = inner.lock();
        assert_eq!(list.len, 0);
        assert_eq!(list.notified, 0);
    }
}