net: Save H2 client in ChatConnection if available

...so it can be used later to make gRPC requests even while the
websocket is also being used.
This commit is contained in:
Jordan Rose
2025-11-21 15:49:31 -08:00
committed by GitHub
parent 8f30203f3d
commit f5a10a78f8
13 changed files with 510 additions and 52 deletions

1
Cargo.lock generated
View File

@@ -2676,6 +2676,7 @@ dependencies = [
"tokio-boring",
"tokio-stream",
"tokio-tungstenite",
"tonic",
"tungstenite",
"url",
"uuid",

View File

@@ -3071,6 +3071,37 @@ SOFTWARE.
```
## percent-encoding 2.3.2
```
Copyright (c) 2013-2025 The rust-url developers
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## backtrace 0.3.75, cc 1.2.37, cfg-if 1.0.3, cmake 0.1.48, find-msvc-tools 0.1.1, pkg-config 0.3.32, rustc-demangle 0.1.26, socket2 0.6.0
```
@@ -5096,7 +5127,7 @@ DEALINGS IN THE SOFTWARE.
```
## tracing-core 0.1.34, tracing 0.1.41
## tracing-attributes 0.1.30, tracing-core 0.1.34, tracing 0.1.41
```
Copyright (c) 2019 Tokio Contributors
@@ -5127,6 +5158,37 @@ DEALINGS IN THE SOFTWARE.
```
## tower-layer 0.3.3, tower-service 0.3.3
```
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## universal-hash 0.5.1
```
@@ -5638,6 +5700,31 @@ THE SOFTWARE.
```
## tonic 0.13.1
```
Copyright (c) 2025 Lucio Franco
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
```
## toml_datetime 0.7.1, toml_edit 0.23.5, toml_parser 1.0.2
```

View File

@@ -3071,6 +3071,37 @@ SOFTWARE.
```
## percent-encoding 2.3.2
```
Copyright (c) 2013-2025 The rust-url developers
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## backtrace 0.3.75, cc 1.2.37, cfg-if 1.0.3, cmake 0.1.48, find-msvc-tools 0.1.1, pkg-config 0.3.32, rustc-demangle 0.1.26, socket2 0.6.0
```
@@ -5096,7 +5127,7 @@ DEALINGS IN THE SOFTWARE.
```
## tracing-core 0.1.34, tracing 0.1.41
## tracing-attributes 0.1.30, tracing-core 0.1.34, tracing 0.1.41
```
Copyright (c) 2019 Tokio Contributors
@@ -5127,6 +5158,37 @@ DEALINGS IN THE SOFTWARE.
```
## tower-layer 0.3.3, tower-service 0.3.3
```
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## universal-hash 0.5.1
```
@@ -5638,6 +5700,31 @@ THE SOFTWARE.
```
## tonic 0.13.1
```
Copyright (c) 2025 Lucio Franco
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
```
## toml_datetime 0.7.1, toml_edit 0.23.5, toml_parser 1.0.2
```

View File

@@ -3161,6 +3161,37 @@ SOFTWARE.
```
## percent-encoding 2.3.2
```
Copyright (c) 2013-2025 The rust-url developers
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## backtrace 0.3.75, cc 1.2.37, cfg-if 1.0.3, cmake 0.1.48, find-msvc-tools 0.1.1, openssl-probe 0.1.6, rustc-demangle 0.1.26, socket2 0.6.0
```
@@ -5242,6 +5273,37 @@ DEALINGS IN THE SOFTWARE.
```
## tower-layer 0.3.3, tower-service 0.3.3
```
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
```
## universal-hash 0.5.1
```
@@ -5860,6 +5922,31 @@ SOFTWARE.
```
## tonic 0.13.1
```
Copyright (c) 2025 Lucio Franco
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
```
## toml_datetime 0.7.1, toml_edit 0.23.5, toml_parser 1.0.2
```

View File

@@ -3224,6 +3224,41 @@ SOFTWARE.
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
<dict>
<key>FooterText</key>
<string>Copyright (c) 2013-2025 The rust-url developers
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the &quot;Software&quot;), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
</string>
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>percent-encoding 2.3.2</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
<dict>
<key>FooterText</key>
<string>Copyright (c) 2014 Alex Crichton
@@ -5447,7 +5482,42 @@ DEALINGS IN THE SOFTWARE.
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>tracing-core 0.1.34, tracing 0.1.41</string>
<string>tracing-attributes 0.1.30, tracing-core 0.1.34, tracing 0.1.41</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
<dict>
<key>FooterText</key>
<string>Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the &quot;Software&quot;), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
</string>
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>tower-layer 0.3.3, tower-service 0.3.3</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
@@ -6030,6 +6100,35 @@ THE SOFTWARE.
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
<dict>
<key>FooterText</key>
<string>Copyright (c) 2025 Lucio Franco
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the &quot;Software&quot;), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
</string>
<key>License</key>
<string>MIT License</string>
<key>Title</key>
<string>tonic 0.13.1</string>
<key>Type</key>
<string>PSGroupSpecifier</string>
</dict>
<dict>
<key>FooterText</key>
<string>Copyright (c) Individual contributors

View File

@@ -62,6 +62,7 @@ tokio = { workspace = true, features = ["rt", "time", "macros"] }
tokio-boring-signal = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
tokio-tungstenite = { workspace = true }
tonic = { workspace = true, default-features = false }
tungstenite = { workspace = true }
uuid = { workspace = true, features = ["serde"] }
visibility = { workspace = true }

View File

@@ -7,7 +7,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use clap::{Parser, ValueEnum};
use either::Either;
use libsignal_net::certs::SIGNAL_ROOT_CERTIFICATES;
use libsignal_net::chat::test_support::simple_chat_connection;
use libsignal_net::connect_state::{ConnectState, ConnectionResources, SUGGESTED_CONNECT_CONFIG};
@@ -66,13 +65,17 @@ async fn main() -> anyhow::Result<()> {
Environment::Production => libsignal_net::env::PROD,
};
let chat_connection = if use_grpc {
let grpc_connection;
let ws_connection;
if use_grpc {
let host = if host.is_empty() {
env.chat_domain_config.connect.hostname
} else {
&host
};
Either::Left(Unauth(grpc_connection(host).await?))
grpc_connection = Some(Unauth(make_grpc_connection(host).await?));
ws_connection = None;
} else {
if !host.is_empty() {
// This is cheating, but we're just using it for testing anyway.
@@ -83,22 +86,48 @@ async fn main() -> anyhow::Result<()> {
if h2 {
env.chat_domain_config.connect.http_version = Some(HttpVersion::Http2);
}
Either::Right(Unauth(
simple_chat_connection(
&env,
EnableDomainFronting::No,
DirectOrProxyMode::DirectOnly,
|_route| true,
)
.await?,
))
};
let chat_connection = simple_chat_connection(
&env,
EnableDomainFronting::No,
DirectOrProxyMode::DirectOnly,
|_route| true,
)
.await?;
grpc_connection = chat_connection.shared_h2_connection().await.map(Unauth);
ws_connection = Some(Unauth(chat_connection));
}
let username = usernames::Username::new(&username)?;
if let Some(aci) = chat_connection
.look_up_username_hash(&username.hash())
.await?
{
let username_hash = username.hash();
let result = match (grpc_connection, ws_connection) {
(Some(grpc_connection), Some(ws_connection)) => {
log::info!("trying both gRPC and websocket...");
let (grpc_result, ws_result) = futures_util::future::try_join(
grpc_connection.look_up_username_hash(&username_hash),
ws_connection.look_up_username_hash(&username_hash),
)
.await?;
assert_eq!(
grpc_result, ws_result,
"connections disagreed on the answer?"
);
ws_result
}
(Some(grpc_connection), None) => {
log::info!("sending request over gRPC...");
grpc_connection
.look_up_username_hash(&username_hash)
.await?
}
(None, Some(ws_connection)) => {
log::info!("sending request over websocket...");
ws_connection.look_up_username_hash(&username_hash).await?
}
(None, None) => unreachable!("we established at least one connection"),
};
if let Some(aci) = result {
log::info!("found {}", aci.service_id_string());
} else {
log::info!("no user found");
@@ -113,7 +142,7 @@ assert_impl_all!(Http2Client<tonic::body::Body>: tonic::client::GrpcService<toni
///
/// Eventually this should be covered by a libsignal-net-level API like `simple_chat_connection`.
/// We're not just making an *arbitrary* H2 connection; we're specifically talking to chat-server.
async fn grpc_connection(host: &str) -> anyhow::Result<Http2Client<tonic::body::Body>> {
async fn make_grpc_connection(host: &str) -> anyhow::Result<Http2Client<tonic::body::Body>> {
let host: Arc<str> = Arc::from(host);
let connect_state = Arc::new(ConnectState::new(SUGGESTED_CONNECT_CONFIG));
let resolver = DnsResolver::new(&no_network_change_events());

View File

@@ -223,13 +223,23 @@ pub enum HttpConnectError {
assert_impl_all!(TransportConnectError: LogSafeDisplay);
impl LogSafeDisplay for HttpConnectError {}
/// A refinement of [`hyper::body::Body`] that supports our use of hyper's H2 connections.
pub trait H2Body:
hyper::body::Body<Data: Send, Error: Into<Box<dyn Error + Send + Sync>>> + Send + Unpin + 'static
{
}
impl<T> H2Body for T where
T: hyper::body::Body<Data: Send, Error: Into<Box<dyn Error + Send + Sync>>>
+ Send
+ Unpin
+ 'static
{
}
impl<B, Inner> Connector<HttpRouteFragment, Inner> for Http2Connector<B>
where
Inner: Connection + AsyncDuplexStream + Send + 'static,
B: hyper::body::Body<Data: Send, Error: Into<Box<dyn Error + Send + Sync>>>
+ Send
+ Unpin
+ 'static,
B: H2Body,
{
type Connection = Http2Client<B>;
type Error = HttpConnectError;

View File

@@ -4,8 +4,10 @@
//
use std::fmt::{Debug, Display};
use std::marker::PhantomData;
use std::time::Duration;
use derive_where::derive_where;
use futures_util::future::Either;
use futures_util::{Sink, Stream, TryFutureExt};
use http::uri::PathAndQuery;
@@ -15,7 +17,7 @@ use tungstenite::{Message, Utf8Bytes, http};
use crate::AsyncDuplexStream;
use crate::errors::LogSafeDisplay;
use crate::http_client::Http2Connector;
use crate::http_client::{H2Body, Http2Client, Http2Connector};
use crate::route::{Connector, HttpRouteFragment, HttpVersion, WebSocketRouteFragment};
use crate::stream::StreamWithFixedTransportInfo;
use crate::ws::error::{HttpFormatError, ProtocolError, SpaceError};
@@ -109,8 +111,17 @@ impl crate::Connection for Box<dyn WebSocketTransportStream> {
}
/// Stateless [`Connector`] implementation for websocket-over-HTTPS routes.
#[derive(Default)]
pub struct Stateless;
///
/// The `B` parameter controls what body type to use to allow sharing an underlying H2 connection
/// (it will be ignored for an H1 connection). If you don't care, use the default, which you can get
/// in expression contexts by using angle brackets: `<ws::Stateless>`.
#[derive_where(Default)]
pub struct Stateless<B = http_body_util::Empty<bytes::Bytes>> {
// The connector does not itself carry a 'B', but it produces something that itself consumes
// 'B's, which as far as variance goes is the same as consuming 'B's now. (This is probably
// overthinking; the important part is that it's not just a plain B.)
body: PhantomData<fn(B)>,
}
/// [`Connector`] for websocket-over-HTTPS routes that discards the response headers.
#[derive(Default)]
@@ -126,21 +137,24 @@ impl WithoutResponseHeaders {
}
#[derive(Debug)]
pub struct StreamWithResponseHeaders<Inner> {
pub struct StreamWithResponseHeaders<Inner, B> {
pub stream: Inner,
pub response_headers: http::HeaderMap,
pub connection: Option<Http2Client<B>>,
}
/// Connects a websocket on top of an existing connection.
///
/// This can't just take as the route type a [`WebSocketRouteFragment`] because
/// there are HTTP-level fields that also affect connection establishment.
impl<Inner> Connector<(WebSocketRouteFragment, HttpRouteFragment), Inner> for Stateless
impl<Inner, B> Connector<(WebSocketRouteFragment, HttpRouteFragment), Inner> for Stateless<B>
where
Inner: WebSocketTransportStream,
B: H2Body + Default,
{
type Connection = StreamWithResponseHeaders<
tokio_tungstenite::WebSocketStream<Box<dyn WebSocketTransportStream>>,
B,
>;
type Error = WebSocketConnectError;
@@ -158,7 +172,7 @@ where
}
}
async fn connect_http1<Inner: WebSocketTransportStream>(
async fn connect_http1<Inner: WebSocketTransportStream, B>(
inner: Inner,
ws: WebSocketRouteFragment,
http: HttpRouteFragment,
@@ -166,6 +180,7 @@ async fn connect_http1<Inner: WebSocketTransportStream>(
) -> Result<
StreamWithResponseHeaders<
tokio_tungstenite::WebSocketStream<Box<dyn WebSocketTransportStream>>,
B,
>,
WebSocketConnectError,
> {
@@ -230,10 +245,11 @@ async fn connect_http1<Inner: WebSocketTransportStream>(
Ok(StreamWithResponseHeaders {
stream,
response_headers: response.into_parts().0.headers,
connection: None,
})
}
async fn connect_http2<Inner: WebSocketTransportStream>(
async fn connect_http2<Inner: WebSocketTransportStream, B: H2Body + Default>(
inner: Inner,
ws: WebSocketRouteFragment,
http: HttpRouteFragment,
@@ -241,6 +257,7 @@ async fn connect_http2<Inner: WebSocketTransportStream>(
) -> Result<
StreamWithResponseHeaders<
tokio_tungstenite::WebSocketStream<Box<dyn WebSocketTransportStream>>,
B,
>,
WebSocketConnectError,
> {
@@ -272,9 +289,7 @@ async fn connect_http2<Inner: WebSocketTransportStream>(
));
}
// For now, hardcode the body type to Empty; in the future, we'll want to make this generic so
// that it's compatible with tonic::body::Body.
let h2_connector = Http2Connector::<http_body_util::Empty<bytes::Bytes>>::new();
let h2_connector = Http2Connector::<B>::new();
let transport_info = inner.transport_info();
let mut client = h2_connector
@@ -307,7 +322,7 @@ async fn connect_http2<Inner: WebSocketTransportStream>(
.body(Default::default())
.map_err(tungstenite::Error::from)?;
let response = async move {
let response = async {
client.ready().await?;
client.send_request(request).await
}
@@ -399,16 +414,17 @@ async fn connect_http2<Inner: WebSocketTransportStream>(
Ok(StreamWithResponseHeaders {
stream: ws,
response_headers,
connection: Some(client),
})
}
impl<T, Inner, C> Connector<(WebSocketRouteFragment, HttpRouteFragment), Inner>
impl<T, Inner, C, B> Connector<(WebSocketRouteFragment, HttpRouteFragment), Inner>
for WithoutResponseHeaders<T>
where
T: Connector<
(WebSocketRouteFragment, HttpRouteFragment),
Inner,
Connection = StreamWithResponseHeaders<C>,
Connection = StreamWithResponseHeaders<C, B>,
>,
{
type Connection = C;
@@ -424,6 +440,7 @@ where
|StreamWithResponseHeaders {
stream,
response_headers: _,
connection: _,
}| stream,
)
}
@@ -535,7 +552,8 @@ pub mod testutil {
WebSocketStream<Box<dyn WebSocketTransportStream>>,
) {
let (client, server) = tokio::io::duplex(1024);
let client_future = Stateless.connect_over(
let connector = WithoutResponseHeaders::new();
let client_future = connector.connect_over(
client,
(
WebSocketRouteFragment {
@@ -554,10 +572,7 @@ pub mod testutil {
);
let server_future = tokio_tungstenite::accept_async(server);
let (client_res, server_res) = tokio::join!(client_future, server_future);
let StreamWithResponseHeaders {
stream: client_stream,
response_headers: _,
} = client_res.unwrap();
let client_stream = client_res.unwrap();
let server_stream = server_res.unwrap();
(server_stream, client_stream)
}
@@ -780,7 +795,7 @@ mod test {
.await;
let server_task = tokio::spawn(server);
let mut ws = Stateless
let mut ws = <Stateless>::default()
.connect_over(
stream,
(
@@ -820,7 +835,7 @@ mod test {
let (stream, server) = localhost_h2_ws(upgrade_failure_tx, Default::default()).await;
tokio::spawn(server);
let err = Stateless
let err = <Stateless>::default()
.connect_over(
stream,
(
@@ -860,7 +875,7 @@ mod test {
// "Oops, the server dropped the connection."
drop(server);
let err = Stateless
let err = <Stateless>::default()
.connect_over(
stream,
(
@@ -903,7 +918,7 @@ mod test {
.await;
tokio::spawn(server);
let err = Stateless
let err = <Stateless>::default()
.connect_over(
stream,
(
@@ -950,7 +965,7 @@ mod test {
.await;
let server_task = tokio::spawn(server);
let mut ws = Stateless
let mut ws = <Stateless>::default()
.connect_over(
stream,
(

View File

@@ -12,6 +12,7 @@ use ::http::uri::PathAndQuery;
use ::http::{HeaderMap, HeaderName, HeaderValue, StatusCode};
use bytes::Bytes;
use either::Either;
use libsignal_net_infra::http_client::Http2Client;
use libsignal_net_infra::route::{
DefaultGetCurrentInterface, HttpsTlsRoute, RouteProvider, RouteProviderExt,
ThrottlingConnector, TransportRoute, UnresolvedHttpsServiceRoute,
@@ -168,10 +169,13 @@ pub struct ChatConnection {
connection_info: ConnectionInfo,
}
pub type GrpcBody = tonic::body::Body;
/// A connection to the chat service that isn't yet active.
#[derive(Debug)]
pub struct PendingChatConnection {
connection: WebSocketStream<Box<dyn WebSocketTransportStream>>,
shared_h2_connection: Option<Http2Client<GrpcBody>>,
connect_response_headers: http::HeaderMap,
ws_config: ws::Config,
route_info: RouteInfo,
@@ -286,7 +290,7 @@ impl ChatConnection {
// lets us get connection parallelism at the transport level (which
// is useful) while limiting us to one fully established connection
// at a time.
ThrottlingConnector::new(crate::infra::ws::Stateless, 1),
ThrottlingConnector::new(crate::infra::ws::Stateless::default(), 1),
&log_tag,
)
.await?;
@@ -296,10 +300,12 @@ impl ChatConnection {
let StreamWithResponseHeaders {
stream,
response_headers,
connection: shared_h2_connection,
} = connection.into_inner();
Ok(PendingChatConnection {
connection: stream,
shared_h2_connection,
connect_response_headers: response_headers,
route_info,
ws_config,
@@ -315,6 +321,7 @@ impl ChatConnection {
) -> Self {
let PendingChatConnection {
connection,
shared_h2_connection,
connect_response_headers,
ws_config,
route_info,
@@ -339,6 +346,7 @@ impl ChatConnection {
transport_info,
get_current_interface: DefaultGetCurrentInterface,
},
shared_h2_connection,
network_change_event,
listener,
),
@@ -359,6 +367,10 @@ impl ChatConnection {
pub fn connection_info(&self) -> &ConnectionInfo {
&self.connection_info
}
pub async fn shared_h2_connection(&self) -> Option<Http2Client<GrpcBody>> {
self.inner.shared_h2_connection().await
}
}
impl PendingChatConnection {
@@ -370,6 +382,7 @@ impl PendingChatConnection {
}
pub async fn disconnect(&mut self) {
_ = self.shared_h2_connection.take();
if let Err(error) = self.connection.close(None).await {
log::warn!(
"[{}] pending chat connection disconnect failed with {error}",

View File

@@ -97,6 +97,7 @@ impl ChatConnection {
transport_info: connection_info.transport_info.clone(),
get_current_interface: FakeCurrentInterface,
},
None,
no_network_change_events(),
listener,
),

View File

@@ -19,6 +19,7 @@ use http::uri::PathAndQuery;
use http::{Method, StatusCode};
use itertools::Itertools as _;
use libsignal_net_infra::TransportInfo;
use libsignal_net_infra::http_client::Http2Client;
use libsignal_net_infra::route::GetCurrentInterface;
use libsignal_net_infra::utils::NetworkChangeEvent;
use libsignal_net_infra::utils::future::SomeOrPending;
@@ -34,7 +35,9 @@ use tokio::time::{Duration, Instant};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tungstenite::protocol::frame::coding::CloseCode;
use crate::chat::{ChatMessageType, MessageProto, Request, RequestProto, Response, ResponseProto};
use crate::chat::{
ChatMessageType, GrpcBody, MessageProto, Request, RequestProto, Response, ResponseProto,
};
use crate::env::{
ALERT_HEADER_NAME, CONNECTED_ELSEWHERE_CLOSE_CODE, CONNECTION_INVALIDATED_CLOSE_CODE,
};
@@ -188,6 +191,7 @@ impl Chat {
connection_config: ConnectionConfig<
impl GetCurrentInterface<Representation = IpAddr> + Send + Sync + 'static,
>,
shared_h2_connection: Option<Http2Client<GrpcBody>>,
network_change_event: NetworkChangeEvent,
mut listener: EventListener,
) -> Self
@@ -219,6 +223,7 @@ impl Chat {
},
),
connection_config,
shared_h2_connection,
network_change_event,
initial_request_id,
listener,
@@ -241,6 +246,20 @@ impl Chat {
listener(ListenerEvent::ReceivedAlerts(alerts))
}
pub async fn shared_h2_connection(&self) -> Option<Http2Client<GrpcBody>> {
let state = self.state.lock().await;
match &*state {
TaskState::MaybeStillRunning {
request_tx: _,
response_tx: _,
task: _,
shared_h2_connection,
} => shared_h2_connection.clone(),
TaskState::SignaledToEnd(_) => None,
TaskState::Finished(_) => None,
}
}
/// Sends a request to the server and waits for the response.
///
/// If the request can't be sent or the response isn't received, this
@@ -289,11 +308,13 @@ impl Chat {
request_tx,
response_tx,
task,
shared_h2_connection,
} => {
// Signal to the task, if it's still running, that it should
// quit. Do this by hanging up on it, at which point it will
// exit.
drop((request_tx, response_tx));
drop(shared_h2_connection);
TaskState::SignaledToEnd(task)
}
state @ (TaskState::SignaledToEnd(_) | TaskState::Finished(_)) => state,
@@ -315,6 +336,7 @@ impl Chat {
request_tx: _,
response_tx: _,
task,
shared_h2_connection: _,
} => {
if !task.is_finished() {
return true;
@@ -339,6 +361,7 @@ impl Chat {
connection_config: ConnectionConfig<
impl GetCurrentInterface<Representation = IpAddr> + Send + Sync + 'static,
>,
shared_h2_connection: Option<Http2Client<GrpcBody>>,
network_change_event: NetworkChangeEvent,
initial_request_id: u64,
listener: EventListener,
@@ -400,6 +423,7 @@ impl Chat {
request_tx,
response_tx,
task,
shared_h2_connection,
};
Self {
@@ -437,6 +461,7 @@ enum TaskState {
request_tx: mpsc::Sender<OutgoingRequest>,
response_tx: mpsc::UnboundedSender<OutgoingResponse>,
task: JoinHandle<Result<FinishReason, TaskErrorState>>,
shared_h2_connection: Option<Http2Client<GrpcBody>>,
},
/// The task has been signalled to end and should be terminating soon, but
/// not necessarily immediately.
@@ -772,6 +797,7 @@ async fn send_request(
request_tx,
response_tx: _,
task: _,
shared_h2_connection: _,
} => request_tx.clone(),
TaskState::SignaledToEnd(_) => {
return Err(SendError::Disconnected(DisconnectedReason::SocketClosed {
@@ -828,6 +854,7 @@ async fn wait_for_task_to_finish(state: &mut TaskState) -> &Result<FinishReason,
task,
request_tx: _,
response_tx: _,
shared_h2_connection: _,
} => {
// The send can only fail if the task has ended since it owns the
// other end of the channel.
@@ -1582,6 +1609,7 @@ mod test {
},
get_current_interface,
},
None,
network_change_event,
initial_request_id,
listener,

View File

@@ -921,7 +921,7 @@ mod test {
#[tokio::test(start_paused = true)]
async fn connect_ws_timeout() {
let ws_connector = crate::infra::ws::Stateless;
let ws_connector = <crate::infra::ws::Stateless>::default();
let resolver = DnsResolver::new_from_static_map(HashMap::from([(
FAKE_HOST_NAME,
LookupResult::new(vec![ip_addr!(v4, "192.0.2.1")], vec![]),
@@ -978,7 +978,7 @@ mod test {
// can't actually change the local IP detection logic. But we can test a ClientAbort
// produced by the underlying connector.
let ws_connector = crate::infra::ws::Stateless;
let ws_connector = <crate::infra::ws::Stateless>::default();
let resolver = DnsResolver::new_from_static_map(HashMap::from([(
FAKE_HOST_NAME,
LookupResult::new(vec![ip_addr!(v4, "192.0.2.1")], vec![]),