Skip to content

Commit 251d82e

Browse files
committed
feat: workspace keepalive task
1 parent f9f6144 commit 251d82e

File tree

3 files changed

+42
-16
lines changed

3 files changed

+42
-16
lines changed

src/client.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,26 +171,49 @@ impl Client {
171171
/// Join and return a [`Workspace`].
172172
#[tracing::instrument(skip(self, workspace), fields(ws = %workspace))]
173173
pub async fn attach_workspace(&self, workspace: uuid::Uuid) -> ConnectionResult<Workspace> {
174-
let token = self
175-
.0
176-
.session
177-
.clone()
178-
.access_workspace(WorkspaceRequest {
174+
let mut session_client = self.0.session.clone();
175+
let token = session_client
176+
.get_workspace_token(WorkspaceRequest {
179177
id: Identifier::from(workspace),
180178
})
181179
.await?
182180
.into_inner();
183181

182+
let workspace_claims = InternallyMutable::new(token);
183+
184184
let ws = Workspace::connect(
185185
workspace,
186186
self.0.user.clone(),
187187
self.0.config.clone(),
188-
token,
188+
workspace_claims.channel(),
189189
self.0.claims.channel(),
190190
)
191191
.await?;
192-
193192
self.0.workspaces.insert(workspace, ws.clone());
193+
let mut workspace_client = ws.services().ws();
194+
195+
let weak = Arc::downgrade(&ws.0);
196+
tokio::spawn(async move {
197+
let fut = async move {
198+
loop {
199+
// TODO either configurable token refresh time or calculate depending on token lifetime
200+
tokio::time::sleep(std::time::Duration::from_secs(240)).await;
201+
if weak.upgrade().is_none() { break };
202+
let new_credentials = session_client.get_workspace_token(
203+
tonic::Request::new(WorkspaceRequest { id: Identifier::from(workspace) })
204+
)
205+
.await?
206+
.into_inner();
207+
workspace_claims.set(new_credentials);
208+
workspace_client.keep_alive(tonic::Request::new(Empty {})).await?;
209+
}
210+
Ok::<(), tonic::Status>(())
211+
};
212+
213+
if let Err(e) = fut.await {
214+
tracing::error!("error in keepalive task for workspace {workspace}: {e}");
215+
}
216+
});
194217

195218
Ok(ws)
196219
}

src/network.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ impl Services {
4040
let channel = Endpoint::from_shared(dest.to_string())?.connect().await?;
4141
let inter = WorkspaceInterceptor { session, workspace };
4242
Ok(Self {
43-
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
4443
workspace: WorkspaceClient::with_interceptor(channel.clone(), inter.clone()),
45-
// TODO technically we could keep buffers on separate servers, and thus manage buffer
46-
// connections separately, but for now it's more convenient to bundle them with workspace
44+
// TODO technically we could keep buffers and cursors on separate servers, and thus manage
45+
// their connections separately, but for now it's more convenient to bundle them with workspace
46+
cursor: CursorClient::with_interceptor(channel.clone(), inter.clone()),
4747
buffer: BufferClient::with_interceptor(channel.clone(), inter.clone()),
4848
})
4949
}

src/workspace.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use napi_derive::napi;
4545
#[derive(Debug, Clone)]
4646
#[cfg_attr(feature = "py", pyo3::pyclass)]
4747
#[cfg_attr(feature = "js", napi)]
48-
pub struct Workspace(Arc<WorkspaceInner>);
48+
pub struct Workspace(pub(crate) Arc<WorkspaceInner>);
4949

5050
#[derive(Debug)]
5151
struct WorkspaceInner {
@@ -87,17 +87,16 @@ impl AsyncReceiver<Event> for Workspace {
8787
}
8888

8989
impl Workspace {
90-
#[tracing::instrument(skip(id, user, token, claims), fields(ws = %id))]
90+
#[tracing::instrument(skip(id, user, workspace_claim, user_claim), fields(ws = %id))]
9191
pub(crate) async fn connect(
9292
id: Uuid,
9393
user: Arc<User>,
9494
config: crate::api::Config,
95-
token: Token,
96-
claims: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
95+
workspace_claim: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
96+
user_claim: tokio::sync::watch::Receiver<codemp_proto::common::Token>,
9797
) -> ConnectionResult<Self> {
98-
let workspace_claim = InternallyMutable::new(token);
9998
let services =
100-
Services::try_new(&config.endpoint(), claims, workspace_claim.channel()).await?;
99+
Services::try_new(&config.endpoint(), user_claim, workspace_claim).await?;
101100
let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
102101

103102
let (tx, rx) = mpsc::channel(128);
@@ -146,6 +145,10 @@ impl Workspace {
146145
Ok(ws)
147146
}
148147

148+
pub(crate) fn services(&self) -> &Services {
149+
&self.0.services
150+
}
151+
149152
/// drop arc, return true if was last
150153
pub(crate) fn consume(self) -> bool {
151154
Arc::into_inner(self.0).is_some()

0 commit comments

Comments
 (0)