tower_lsp/service/client/
socket.rs1use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::channel::mpsc::Receiver;
8use futures::sink::Sink;
9use futures::stream::{FusedStream, Stream, StreamExt};
10
11use super::{ExitedError, Pending, ServerState, State};
12use crate::jsonrpc::{Request, Response};
13
14#[derive(Debug)]
16pub struct ClientSocket {
17 pub(super) rx: Receiver<Request>,
18 pub(super) pending: Arc<Pending>,
19 pub(super) state: Arc<ServerState>,
20}
21
22impl ClientSocket {
23 pub fn split(self) -> (RequestStream, ResponseSink) {
30 let ClientSocket { rx, pending, state } = self;
31 let state_ = state.clone();
32
33 (
34 RequestStream { rx, state: state_ },
35 ResponseSink { pending, state },
36 )
37 }
38}
39
40impl Stream for ClientSocket {
42 type Item = Request;
43
44 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45 if self.state.get() == State::Exited || self.rx.is_terminated() {
46 Poll::Ready(None)
47 } else {
48 self.rx.poll_next_unpin(cx)
49 }
50 }
51
52 #[inline]
53 fn size_hint(&self) -> (usize, Option<usize>) {
54 self.rx.size_hint()
55 }
56}
57
58impl FusedStream for ClientSocket {
59 #[inline]
60 fn is_terminated(&self) -> bool {
61 self.rx.is_terminated()
62 }
63}
64
65impl Sink<Response> for ClientSocket {
67 type Error = ExitedError;
68
69 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70 if self.state.get() == State::Exited || self.rx.is_terminated() {
71 Poll::Ready(Err(ExitedError(())))
72 } else {
73 Poll::Ready(Ok(()))
74 }
75 }
76
77 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
78 self.pending.insert(item);
79 Ok(())
80 }
81
82 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
83 Poll::Ready(Ok(()))
84 }
85
86 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 Poll::Ready(Ok(()))
88 }
89}
90
91#[derive(Debug)]
93#[must_use = "streams do nothing unless polled"]
94pub struct RequestStream {
95 rx: Receiver<Request>,
96 state: Arc<ServerState>,
97}
98
99impl Stream for RequestStream {
100 type Item = Request;
101
102 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103 if self.state.get() == State::Exited || self.rx.is_terminated() {
104 Poll::Ready(None)
105 } else {
106 self.rx.poll_next_unpin(cx)
107 }
108 }
109
110 #[inline]
111 fn size_hint(&self) -> (usize, Option<usize>) {
112 self.rx.size_hint()
113 }
114}
115
116impl FusedStream for RequestStream {
117 #[inline]
118 fn is_terminated(&self) -> bool {
119 self.rx.is_terminated()
120 }
121}
122
123#[derive(Debug)]
125pub struct ResponseSink {
126 pending: Arc<Pending>,
127 state: Arc<ServerState>,
128}
129
130impl Sink<Response> for ResponseSink {
131 type Error = ExitedError;
132
133 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134 if self.state.get() == State::Exited {
135 Poll::Ready(Err(ExitedError(())))
136 } else {
137 Poll::Ready(Ok(()))
138 }
139 }
140
141 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
142 self.pending.insert(item);
143 Ok(())
144 }
145
146 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 Poll::Ready(Ok(()))
148 }
149
150 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
151 Poll::Ready(Ok(()))
152 }
153}