tower_lsp/service/client/
socket.rs

1//! Loopback connection to the language client.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::channel::mpsc::Receiver;
8use futures::sink::Sink;
9use futures::stream::{FusedStream, Stream, StreamExt};
10
11use super::{ExitedError, Pending, ServerState, State};
12use crate::jsonrpc::{Request, Response};
13
14/// A loopback channel for server-to-client communication.
15#[derive(Debug)]
16pub struct ClientSocket {
17    pub(super) rx: Receiver<Request>,
18    pub(super) pending: Arc<Pending>,
19    pub(super) state: Arc<ServerState>,
20}
21
22impl ClientSocket {
23    /// Splits this `ClientSocket` into two halves capable of operating independently.
24    ///
25    /// The two halves returned implement the [`Stream`] and [`Sink`] traits, respectively.
26    ///
27    /// [`Stream`]: futures::Stream
28    /// [`Sink`]: futures::Sink
29    pub fn split(self) -> (RequestStream, ResponseSink) {
30        let ClientSocket { rx, pending, state } = self;
31        let state_ = state.clone();
32
33        (
34            RequestStream { rx, state: state_ },
35            ResponseSink { pending, state },
36        )
37    }
38}
39
40/// Yields a stream of pending server-to-client requests.
41impl Stream for ClientSocket {
42    type Item = Request;
43
44    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45        if self.state.get() == State::Exited || self.rx.is_terminated() {
46            Poll::Ready(None)
47        } else {
48            self.rx.poll_next_unpin(cx)
49        }
50    }
51
52    #[inline]
53    fn size_hint(&self) -> (usize, Option<usize>) {
54        self.rx.size_hint()
55    }
56}
57
58impl FusedStream for ClientSocket {
59    #[inline]
60    fn is_terminated(&self) -> bool {
61        self.rx.is_terminated()
62    }
63}
64
65/// Routes client-to-server responses back to the server.
66impl Sink<Response> for ClientSocket {
67    type Error = ExitedError;
68
69    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70        if self.state.get() == State::Exited || self.rx.is_terminated() {
71            Poll::Ready(Err(ExitedError(())))
72        } else {
73            Poll::Ready(Ok(()))
74        }
75    }
76
77    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
78        self.pending.insert(item);
79        Ok(())
80    }
81
82    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83        Poll::Ready(Ok(()))
84    }
85
86    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        Poll::Ready(Ok(()))
88    }
89}
90
91/// Yields a stream of pending server-to-client requests.
92#[derive(Debug)]
93#[must_use = "streams do nothing unless polled"]
94pub struct RequestStream {
95    rx: Receiver<Request>,
96    state: Arc<ServerState>,
97}
98
99impl Stream for RequestStream {
100    type Item = Request;
101
102    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103        if self.state.get() == State::Exited || self.rx.is_terminated() {
104            Poll::Ready(None)
105        } else {
106            self.rx.poll_next_unpin(cx)
107        }
108    }
109
110    #[inline]
111    fn size_hint(&self) -> (usize, Option<usize>) {
112        self.rx.size_hint()
113    }
114}
115
116impl FusedStream for RequestStream {
117    #[inline]
118    fn is_terminated(&self) -> bool {
119        self.rx.is_terminated()
120    }
121}
122
123/// Routes client-to-server responses back to the server.
124#[derive(Debug)]
125pub struct ResponseSink {
126    pending: Arc<Pending>,
127    state: Arc<ServerState>,
128}
129
130impl Sink<Response> for ResponseSink {
131    type Error = ExitedError;
132
133    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134        if self.state.get() == State::Exited {
135            Poll::Ready(Err(ExitedError(())))
136        } else {
137            Poll::Ready(Ok(()))
138        }
139    }
140
141    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
142        self.pending.insert(item);
143        Ok(())
144    }
145
146    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        Poll::Ready(Ok(()))
148    }
149
150    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151        Poll::Ready(Ok(()))
152    }
153}