tower_lsp/service/client/pending.rs

//! Types for tracking server-to-client JSON-RPC requests.

use std::fmt::{self, Debug, Formatter};
use std::future::Future;

use dashmap::{mapref::entry::Entry, DashMap};
use futures::channel::oneshot;
use tracing::warn;

use crate::jsonrpc::{Id, Response};

/// A hashmap containing pending client requests, keyed by request ID.
pub struct Pending(DashMap<Id, Vec<oneshot::Sender<Response>>>);

impl Pending {
    /// Creates a new pending client requests map.
    #[inline]
    pub fn new() -> Self {
        Pending(DashMap::new())
    }

    /// Inserts the given response into the map.
    ///
    /// The corresponding `.wait()` future will then resolve to the given value.
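    ///
    /// Responses with a `null` or unknown request ID are logged with a warning and dropped.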
    pub fn insert(&self, r: Response) {
        match r.id() {
            Id::Null => warn!("received response with request ID of `null`, ignoring"),
            id => match self.0.entry(id.clone()) {
                Entry::Vacant(_) => warn!("received response with unknown request ID: {}", id),
                Entry::Occupied(mut entry) => {
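                    // If this is the last queued sender, remove the map entry entirely;
                    // otherwise pop the frontmost sender so waiters are served in FIFO order.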
                    let tx = match entry.get().len() {
                        1 => entry.remove().remove(0),
                        _ => entry.get_mut().remove(0),
                    };

                    tx.send(r).expect("receiver already dropped");
                }
            },
        }
    }

    /// Marks the given request ID as pending and waits for its corresponding response to arrive.
    ///
    /// If the same request ID is awaited in multiple locations, incoming responses are routed to
    /// the waiting callers on a first-come, first-served basis. To ensure correct routing of
    /// JSON-RPC requests, each identifier value used _must_ be unique.
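    ///
    /// A minimal usage sketch mirroring the tests at the bottom of this module (marked `ignore`
    /// since awaiting the returned future requires an async context):
    ///
    /// ```ignore
    /// let pending = Pending::new();
    /// let response_fut = pending.wait(Id::Number(1));
    ///
    /// // Delivered once the client's response arrives; injected directly here for illustration.
    /// pending.insert(Response::from_ok(Id::Number(1), serde_json::json!(null)));
    /// assert_eq!(response_fut.await, Response::from_ok(Id::Number(1), serde_json::json!(null)));
    /// ```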
    pub fn wait(&self, id: Id) -> impl Future<Output = Response> + Send + 'static {
        let (tx, rx) = oneshot::channel();

        match self.0.entry(id) {
            Entry::Vacant(entry) => {
                entry.insert(vec![tx]);
            }
            Entry::Occupied(mut entry) => {
                let txs = entry.get_mut();
                txs.reserve(1); // We assume concurrent waits are rare, so reserve one by one.
                txs.push(tx);
            }
        }

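        // The sender stays in the map until `insert()` delivers the matching response.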
        async { rx.await.expect("sender already dropped") }
    }
}

impl Debug for Pending {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
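        // Newtype so the debug output shows only the number of queued waiters per request ID.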
        #[derive(Debug)]
        struct Waiters(usize);

        let iter = self
            .0
            .iter()
            .map(|e| (e.key().clone(), Waiters(e.value().len())));

        f.debug_map().entries(iter).finish()
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    #[tokio::test(flavor = "current_thread")]
    async fn waits_for_client_response() {
        let pending = Pending::new();

        let id = Id::Number(1);
        let wait_fut = pending.wait(id.clone());

        let response = Response::from_ok(id, json!({}));
        pending.insert(response.clone());

        assert_eq!(wait_fut.await, response);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn routes_responses_in_fifo_order() {
        let pending = Pending::new();

        let id = Id::Number(1);
        let wait_fut1 = pending.wait(id.clone());
        let wait_fut2 = pending.wait(id.clone());

        let foo = Response::from_ok(id.clone(), json!("foo"));
        let bar = Response::from_ok(id, json!("bar"));
        pending.insert(bar.clone());
        pending.insert(foo.clone());

        assert_eq!(wait_fut1.await, bar);
        assert_eq!(wait_fut2.await, foo);
    }
}