@@ -14,79 +14,189 @@ using namespace facade;
1414
1515namespace dfly ::cluster {
1616
17- class Coordinator ::CrossShardClient : private ProtocolClient {
17+ class Coordinator ::CrossShardRequest {
18+ public:
19+ CrossShardRequest (std::string cmd, Coordinator::RespCB cb, uint32_t total_shards)
20+ : command_(std::move(cmd)), cb_(std::move(cb)), shard_processed_(total_shards) {
21+ }
22+
23+ const std::string& GetCommand () const {
24+ return command_;
25+ }
26+
27+ template <class ... Args> void Exec (Args&&... args) {
28+ cb_ (std::forward<Args>(args)...);
29+ if (shard_processed_.fetch_sub (1 , std::memory_order_relaxed) == 1 ) {
30+ future_.Resolve (GenericError{});
31+ }
32+ }
33+
34+ util::fb2::Future<GenericError>& GetFuture () {
35+ return future_;
36+ }
37+
38+ private:
39+ std::string command_;
40+ Coordinator::RespCB cb_;
41+ util::fb2::Future<GenericError> future_;
42+ std::atomic_uint32_t shard_processed_;
43+ };
44+
45+ class Coordinator ::CrossShardClient : public ProtocolClient {
1846 public:
1947 CrossShardClient (std::string host, uint16_t port) : ProtocolClient(std::move(host), port) {
2048 }
2149
2250 using ProtocolClient::CloseSocket;
2351 ~CrossShardClient () {
52+ exec_st_.Cancel ();
53+ waker_.notifyAll ();
2454 CloseSocket ();
55+ send_fb_.Join ();
56+ resp_fb_.Join ();
2557 }
2658
27- void Init () {
59+ [[nodiscard]] bool Init () {
2860 VLOG (1 ) << " Resolving host DNS to " << server ().Description ();
2961 if (error_code ec = ResolveHostDns (); ec) {
3062 LOG (WARNING) << " Could not resolve host DNS to " << server ().Description () << " : "
3163 << ec.message ();
3264 exec_st_.ReportError (GenericError (ec, " Could not resolve host dns." ));
33- return ;
65+ return false ;
3466 }
3567 VLOG (1 ) << " Start coordinator connection to " << server ().Description ();
3668 auto timeout = 3000ms; // TODO add flag;
3769 if (auto ec = ConnectAndAuth (timeout, &exec_st_); ec) {
3870 LOG (WARNING) << " Couldn't connect to " << server ().Description () << " : " << ec.message ()
3971 << " , socket state: " << GetSocketInfo (Sock ()->native_handle ());
4072 exec_st_.ReportError (GenericError (ec, " Couldn't connect to source." ));
41- return ;
73+ return false ;
4274 }
4375
4476 ResetParser (RedisParser::Mode::CLIENT);
77+ send_fb_ = util::fb2::Fiber (" CSS_SendFb" , &CrossShardClient::SendFb, this );
78+ resp_fb_ = util::fb2::Fiber (" CSS_RespFb" , &CrossShardClient::RespFb, this );
79+ return true ;
4580 }
4681
4782 void Cancel () {
83+ exec_st_.Cancel ();
4884 ShutdownSocket ();
4985 }
5086
51- void SendCommand (std::string_view cmd, const RespCB& cb) {
52- if (auto ec = ProtocolClient::SendCommand (cmd); ec) {
53- LOG (WARNING) << " Coordinator could not send command to : " << server ().Description () << " : "
54- << ec.message () << " , socket state: " << GetSocketInfo (Sock ()->native_handle ());
55- exec_st_.ReportError (GenericError (ec, " Could not send command." ));
87+ void EnqueueCommand (CrossShardRequestPtr req) {
88+ std::lock_guard lk (mu_);
89+ send_queue_.push (req);
90+ resp_queue_.push (req);
91+ ready_to_send_ = true ;
92+ waker_.notifyAll ();
93+ }
94+
95+ void SendFb () {
96+ while (!exec_st_.IsCancelled ()) {
97+ waker_.await ([this ] { return exec_st_.IsCancelled () || ready_to_send_; });
98+ if (exec_st_.IsCancelled ())
99+ return ;
100+ std::lock_guard lk (mu_);
101+ while (!send_queue_.empty ()) {
102+ if (auto ec = ProtocolClient::SendCommand (send_queue_.front ()->GetCommand ()); ec) {
103+ exec_st_.ReportError (GenericError (
104+ ec, absl::StrCat (" Coordinator could not send command to : " , server ().Description (),
105+ " socket state: " , GetSocketInfo (Sock ()->native_handle ()))));
106+ // TODO reinit connection.
107+ break ;
108+ }
109+ send_queue_.pop ();
110+ }
111+ ready_to_send_ = false ;
56112 }
57- auto timeout = 30000 ; // TODO add flag;
58- if (auto resp = ReadRespReply (timeout); !resp) {
59- LOG (WARNING) << " Error reading response from " << server ().Description () << " : "
60- << resp.error () << " , socket state: " + GetSocketInfo (Sock ()->native_handle ());
113+ }
114+
115+ void RespFb () {
116+ while (!exec_st_.IsCancelled ()) {
117+ waker_.await ([this ] { return exec_st_.IsCancelled () || ready_to_send_; });
118+ if (exec_st_.IsCancelled ())
119+ return ;
120+ std::lock_guard lk (mu_);
121+ constexpr auto timeout = 3000 ; // TODO add flag and add usage in ReadRespReply.
122+ while (!resp_queue_.empty ()) {
123+ auto resp = TakeRespReply (timeout);
124+ if (!resp) {
125+ LOG (WARNING) << " Error reading response from " << server ().Description () << " : "
126+ << resp.error ()
127+ << " , socket state: " + GetSocketInfo (Sock ()->native_handle ());
128+
129+ // TODO make all requests fail in this case.
130+ // TODO reinit connection.
131+ LOG (FATAL) << " Coordinator RespFb read error, not implemented recovery yet." ;
132+ break ;
133+ }
134+ resp_queue_.front ()->Exec (*resp);
135+ resp_queue_.pop ();
136+ }
61137 }
62- cb (LastResponseArgs ());
63- // add response processing
64138 }
139+
140+ private:
141+ std::queue<std::shared_ptr<CrossShardRequest>> send_queue_;
142+ std::queue<std::shared_ptr<CrossShardRequest>> resp_queue_;
143+
144+ util::fb2::Fiber send_fb_;
145+ util::fb2::Fiber resp_fb_;
146+ util::fb2::EventCount waker_;
147+
148+ mutable util::fb2::Mutex mu_;
149+ std::atomic_bool ready_to_send_ = false ;
65150};
66151
67152Coordinator& Coordinator::Current () {
68153 static Coordinator instance;
69154 return instance;
70155}
71156
72- void Coordinator::DispatchAll (std::string_view command, RespCB cb) {
157+ std::shared_ptr<Coordinator::CrossShardClient> Coordinator::GetClient (const std::string& host,
158+ uint16_t port) {
159+ for (const auto & client : clients_) {
160+ if (client->GetHost () == host && client->GetPort () == port) {
161+ return client;
162+ }
163+ }
164+ auto new_client = std::make_shared<CrossShardClient>(host, port);
165+ if (new_client->Init ()) {
166+ clients_.emplace_back (new_client);
167+ return new_client;
168+ }
169+ return nullptr ;
170+ }
171+
172+ util::fb2::Future<GenericError> Coordinator::DispatchAll (std::string command, RespCB cb) {
73173 auto cluster_config = ClusterConfig::Current ();
74174 if (!cluster_config) {
75175 VLOG (2 ) << " No cluster config found for coordinator plan creation." ;
76- return ;
176+ LOG (FATAL) << " No cluster config, not implemented logic yet." ;
177+ return {};
77178 }
78179 VLOG (2 ) << " Dispatching command to all shards: " << command;
79180 auto shards_config = cluster_config->GetConfig ();
80181
81- std::vector<std::unique_ptr<CrossShardClient>> clients;
182+ auto shard_request = std::make_shared<CrossShardRequest>(std::move (command), std::move (cb),
183+ shards_config.size () - 1 );
184+
82185 for (const auto & shard : shards_config) {
83186 if (shard.master .id == cluster_config->MyId ()) {
84187 continue ;
85188 }
86- clients.emplace_back (std::make_unique<CrossShardClient>(shard.master .ip , shard.master .port ));
87- clients.back ()->Init ();
88- clients.back ()->SendCommand (std::string (command), cb);
189+ const auto & client = GetClient (shard.master .ip , shard.master .port );
190+ if (!client) {
191+ VLOG (1 ) << " Could not get coordinator client for " << shard.master .ip << " :"
192+ << shard.master .port ;
193+ cb ({}); // TODO add error propagation.
194+ LOG (FATAL) << " No error processing, not implemented logic yet." ;
195+ return {};
196+ }
197+ client->EnqueueCommand (shard_request);
89198 }
199+ return shard_request->GetFuture ();
90200}
91201
92202} // namespace dfly::cluster
0 commit comments