|
77 | 77 | shared: ProxyHttpSharedState<C, P>, |
78 | 78 |
|
79 | 79 | backend: ProxyHttpBackendEndpoint<C, P>, |
80 | | - requests: HashMap<Uuid, ProxiedHttpRequest>, |
| 80 | + requests: Arc<HashMap<Uuid, ProxiedHttpRequest>>, |
81 | 81 | postprocessor: actix::Addr<ProxyHttpPostprocessor<C, P>>, |
82 | 82 | } |
83 | 83 |
|
@@ -130,7 +130,7 @@ where |
130 | 130 | } |
131 | 131 | .start(); |
132 | 132 |
|
133 | | - Self { id, shared, backend, requests: HashMap::default(), postprocessor } |
| 133 | + Self { id, shared, backend, requests: Arc::new(HashMap::default()), postprocessor } |
134 | 134 | } |
135 | 135 |
|
136 | 136 | pub(crate) async fn run( |
@@ -641,35 +641,49 @@ where |
641 | 641 | } |
642 | 642 |
|
643 | 643 | fn postprocess_client_request(&self, req: ProxiedHttpRequest) { |
644 | | - let id = req.info.req_id; |
| 644 | + let req_id = req.info.req_id; |
645 | 645 | let conn_id = req.info.conn_id; |
| 646 | + let worker_id = self.id; |
646 | 647 |
|
647 | | - if self.requests.insert_sync(id, req).is_err() { |
648 | | - error!( |
649 | | - proxy = P::name(), |
650 | | - request_id = %id, |
651 | | - connection_id = %conn_id, |
652 | | - worker_id = %self.id, |
653 | | - "Duplicate request id", |
654 | | - ); |
655 | | - }; |
| 648 | + let requests = self.requests.clone(); |
| 649 | + |
| 650 | + actix_web::rt::task::spawn_blocking(move || { |
| 651 | + if requests.insert_sync(req_id, req).is_err() { |
| 652 | + error!( |
| 653 | + proxy = P::name(), |
| 654 | + request_id = %req_id, |
| 655 | + connection_id = %conn_id, |
| 656 | + worker_id = %worker_id, |
| 657 | + "Duplicate request id", |
| 658 | + ); |
| 659 | + }; |
| 660 | + }); |
656 | 661 | } |
657 | 662 |
|
658 | 663 | fn postprocess_backend_response(&self, bknd_res: ProxiedHttpResponse) { |
659 | | - let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.req_id) else { |
660 | | - error!( |
661 | | - proxy = P::name(), |
662 | | - request_id = %bknd_res.info.req_id, |
663 | | - connection_id = %bknd_res.info.conn_id, |
664 | | - worker_id = %self.id, |
665 | | - "Proxied http response for unmatching request", |
666 | | - ); |
667 | | - return; |
668 | | - }; |
| 664 | + let req_id = bknd_res.info.req_id; |
| 665 | + let conn_id = bknd_res.info.conn_id; |
| 666 | + let worker_id = self.id; |
| 667 | + |
| 668 | + let requests = self.requests.clone(); |
| 669 | + let postprocessor = self.postprocessor.clone(); |
| 670 | + |
| 671 | + actix_web::rt::task::spawn_blocking(move || { |
| 672 | + let Some((_, clnt_req)) = requests.remove_sync(&req_id) else { |
| 673 | + error!( |
| 674 | + proxy = P::name(), |
| 675 | + request_id = %req_id, |
| 676 | + connection_id = %conn_id, |
| 677 | + worker_id = %worker_id, |
| 678 | + "Proxied http response for unmatching request", |
| 679 | + ); |
| 680 | + return; |
| 681 | + }; |
669 | 682 |
|
670 | | - // hand over to postprocessor asynchronously so that we can return the |
671 | | - // response to the client as early as possible |
672 | | - self.postprocessor.do_send(ProxiedHttpCombo { req: clnt_req, res: bknd_res }); |
| 683 | + // hand over to postprocessor asynchronously so that we can return the |
| 684 | + // response to the client as early as possible |
| 685 | + postprocessor.do_send(ProxiedHttpCombo { req: clnt_req, res: bknd_res }); |
| 686 | + }); |
673 | 687 | } |
674 | 688 |
|
675 | 689 | fn finalise_proxying( |
|
0 commit comments