tower_lsp/service/client/
pending.rs1use std::fmt::{self, Debug, Formatter};
4use std::future::Future;
5
6use dashmap::{mapref::entry::Entry, DashMap};
7use futures::channel::oneshot;
8use tracing::warn;
9
10use crate::jsonrpc::{Id, Response};
11
12pub struct Pending(DashMap<Id, Vec<oneshot::Sender<Response>>>);
14
15impl Pending {
16 #[inline]
18 pub fn new() -> Self {
19 Pending(DashMap::new())
20 }
21
22 pub fn insert(&self, r: Response) {
26 match r.id() {
27 Id::Null => warn!("received response with request ID of `null`, ignoring"),
28 id => match self.0.entry(id.clone()) {
29 Entry::Vacant(_) => warn!("received response with unknown request ID: {}", id),
30 Entry::Occupied(mut entry) => {
31 let tx = match entry.get().len() {
32 1 => entry.remove().remove(0),
33 _ => entry.get_mut().remove(0),
34 };
35
36 tx.send(r).expect("receiver already dropped");
37 }
38 },
39 }
40 }
41
42 pub fn wait(&self, id: Id) -> impl Future<Output = Response> + Send + 'static {
48 let (tx, rx) = oneshot::channel();
49
50 match self.0.entry(id) {
51 Entry::Vacant(entry) => {
52 entry.insert(vec![tx]);
53 }
54 Entry::Occupied(mut entry) => {
55 let txs = entry.get_mut();
56 txs.reserve(1); txs.push(tx);
58 }
59 }
60
61 async { rx.await.expect("sender already dropped") }
62 }
63}
64
65impl Debug for Pending {
66 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
67 #[derive(Debug)]
68 struct Waiters(usize);
69
70 let iter = self
71 .0
72 .iter()
73 .map(|e| (e.key().clone(), Waiters(e.value().len())));
74
75 f.debug_map().entries(iter).finish()
76 }
77}
78
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// A single waiter resolves to the response inserted under its request ID.
    #[tokio::test(flavor = "current_thread")]
    async fn waits_for_client_response() {
        let pending = Pending::new();
        let request_id = Id::Number(1);

        let waiter = pending.wait(request_id.clone());

        let expected = Response::from_ok(request_id, json!({}));
        pending.insert(expected.clone());

        assert_eq!(waiter.await, expected);
    }

    /// Two waiters sharing one request ID receive responses in the order they registered.
    #[tokio::test(flavor = "current_thread")]
    async fn routes_responses_in_fifo_order() {
        let pending = Pending::new();
        let request_id = Id::Number(1);

        let first_waiter = pending.wait(request_id.clone());
        let second_waiter = pending.wait(request_id.clone());

        let foo = Response::from_ok(request_id.clone(), json!("foo"));
        let bar = Response::from_ok(request_id, json!("bar"));

        // `bar` is inserted first, so it must reach the waiter that registered first.
        pending.insert(bar.clone());
        pending.insert(foo.clone());

        assert_eq!(first_waiter.await, bar);
        assert_eq!(second_waiter.await, foo);
    }
}