Skip to content

Commit 4defdcb

Browse files
committed
UPD | quic
1 parent 47e7d5a commit 4defdcb

File tree

4 files changed

+80
-55
lines changed

4 files changed

+80
-55
lines changed

include/worker/ManapiBaseUtils.hpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,9 @@ namespace manapi::net::worker {
5858

5959
manapi::bytebuffer recv_first_buffer (const shared_conn &conn, connection_prepared_t *data) MANAPIHTTP_NOEXCEPT;
6060

61-
inline ssize_t sync_write(interface_worker *w, const shared_conn &conn, ev::buff_t *buff, uint32_t nbuff, bool finish) MANAPIHTTP_NOEXCEPT {
62-
auto const connection = conn->as<connection_prepared_base_t>();
63-
auto const config = w->config();
64-
ssize_t const limit_size = config->speed_limit_rate - connection->transfered;
61+
ssize_t sync_write(interface_worker *w, const shared_conn &conn, ev::buff_t *buff, uint32_t nbuff, bool finish) MANAPIHTTP_NOEXCEPT;
6562

66-
auto const size = base::buffs_cut_by_size (buff, nbuff, limit_size, finish);
67-
68-
if (!size)
69-
return 0;
70-
71-
return w->sync_write_ex(conn, buff, nbuff, size, finish, config->max_buffer_stack);
72-
}
63+
ssize_t sync_write(interface_worker *w, const shared_conn &conn, connection_prepared_base_t *connection, ev::buff_t *buff, uint32_t nbuff, std::size_t limit_rate, bool finish) MANAPIHTTP_NOEXCEPT;
7364

7465
void waiting(const shared_conn &conn, connection_base2_t *data, bool state) MANAPIHTTP_NOEXCEPT;
7566

src/http/ManapiBaseHttp.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "../include/ManapiSiteInternal.hpp"
2020
#include "../include/ManapiDefaultErrors.hpp"
2121
#include "../include/ManapiHttpStructs.hpp"
22+
#include "std/ManapiAsyncTimer.hpp"
2223

2324
static const std::set<std::string> methods = {"POST", "GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE", "PATCH", "CONNECT"};
2425

@@ -1098,7 +1099,7 @@ namespace manapi::net::http::internal {
10981099
co_return;
10991100
}
11001101

1101-
manapi_log_trace(manapi::debug::LOG_TRACE_LOW, "conn:%p is %s:%u",
1102+
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "conn:%p is %s:%u",
11021103
cdata->conn.get(), client->ip.data(), static_cast<uint32_t>(client->port));
11031104

11041105
auto const handler = (cdata->router->statics->layer
@@ -1153,7 +1154,7 @@ namespace manapi::net::http::internal {
11531154
return;
11541155
}
11551156

1156-
manapi_log_trace(manapi::debug::LOG_TRACE_LOW, "conn:%p is %s:%u",
1157+
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "conn:%p is %s:%u",
11571158
cdata->conn.get(), client->ip.data(), static_cast<uint32_t>(client->port));
11581159

11591160
auto handler = &cdata->router->handler->handler;
@@ -1186,7 +1187,7 @@ namespace manapi::net::http::internal {
11861187

11871188

11881189
void manapi::net::http::internal::handle_income_request(uq_handle_data_t cdata, int status) {
1189-
manapi_log_trace(manapi::debug::LOG_TRACE_LOW, "Handle HTTP request on %.*s conn:%p",
1190+
manapi_log_trace(manapi::debug::LOG_TRACE_MEDIUM, "Handle HTTP request on %.*s conn:%p",
11901191
cdata->req_data->uri.size(), cdata->req_data->uri.data(), cdata->conn.get());
11911192
if (status >= 200 && status < 300)
11921193
return handle_income_request_(std::move(cdata), status);
@@ -1209,6 +1210,8 @@ manapi::future<void> manapi::net::http::internal::send_file(std::unique_ptr<resp
12091210
ssize_t const block_size = 4096 * 16;
12101211
auto const cdata = res->connection_data();
12111212

1213+
//bool check_conn = false;
1214+
12121215
auto write_block = cdata->worker->bufferpool().slice(block_size).unwrap();
12131216
auto read_block = cdata->worker->bufferpool().slice(block_size).unwrap();
12141217

@@ -1253,6 +1256,14 @@ manapi::future<void> manapi::net::http::internal::send_file(std::unique_ptr<resp
12531256
/* failed to send */
12541257
goto err;
12551258
}
1259+
//
1260+
// if (!check_conn && current >= 10240) {
1261+
// check_conn = true;
1262+
//
1263+
// if (size >= 20971520) {
1264+
// co_await async::delay (100, res->req()->cancellation().sub());
1265+
// }
1266+
// }
12561267

12571268
if ((rhs = co_await parallel.get_or(0)) <= 0) {
12581269
break;

src/worker/ManapiBaseUtils.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,31 @@ manapi::bytebuffer manapi::net::worker::prepared::recv_first_buffer(const shared
3232
return std::move(object);
3333
}
3434

35+
ssize_t manapi::net::worker::prepared::sync_write(interface_worker *w, const shared_conn &conn, ev::buff_t *buff,uint32_t nbuff, bool finish) MANAPIHTTP_NOEXCEPT {
36+
auto const connection = conn->as<connection_prepared_base_t>();
37+
auto const config = w->config();
38+
ssize_t const limit_size = config->speed_limit_rate - connection->transfered;
39+
40+
auto const size = base::buffs_cut_by_size (buff, nbuff, limit_size, finish);
41+
42+
if (!size)
43+
return 0;
44+
45+
return w->sync_write_ex(conn, buff, nbuff, size, finish, config->max_buffer_stack);
46+
}
47+
48+
ssize_t manapi::net::worker::prepared::sync_write(interface_worker *w, const shared_conn &conn, connection_prepared_base_t *connection, ev::buff_t *buff, uint32_t nbuff, std::size_t limit_rate, bool finish) MANAPIHTTP_NOEXCEPT {
49+
auto const config = w->config();
50+
ssize_t const limit_size = std::min<ssize_t>(limit_rate, config->speed_limit_rate) - connection->transfered;
51+
52+
auto const size = base::buffs_cut_by_size (buff, nbuff, limit_size, finish);
53+
54+
if (!size)
55+
return 0;
56+
57+
return w->sync_write_ex(conn, buff, nbuff, size, finish, config->max_buffer_stack);
58+
}
59+
3560
void manapi::net::worker::prepared::waiting(const shared_conn &conn, connection_base2_t *data, bool state) MANAPIHTTP_NOEXCEPT {
3661

3762
if (state)

src/worker/ManapiQuicOpenSsl.cpp

Lines changed: 39 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
# include <openssl/quic.h>
1919
# include <openssl/err.h>
2020

21+
# define SPEED_LIMIT_DEFAULT 1024000
22+
# define SPEED_LIMIT_INIT 2048000
23+
# define SPEED_LIMIT_STEP 1024000
2124

2225
# define MANAPI_AS_BIO(n) static_cast<BIO*>(n)
2326
# define MANAPI_AS_SSL(n) static_cast<SSL*>(n)
@@ -78,8 +81,7 @@ struct manapi::net::worker::openssl_quic::quic_stream_t : connection_prepared_t
7881
SSL *stream;
7982
worker::connection *parent;
8083
std::size_t poll_id;
81-
uint32_t sent_an_tick;
82-
uint32_t send_an_tick_state;
84+
std::size_t cur_speed_lim;
8385
};
8486

8587
manapi::net::worker::openssl_quic::openssl_quic(net::http::site site, std::shared_ptr<multithread_storage::worker_t> wdata, manapi::net::http::config *config)
@@ -638,6 +640,10 @@ int manapi::net::worker::openssl_quic::event_flags(const shared_conn &conn, int
638640
}
639641
}
640642

643+
if (status & ev::WRITE) {
644+
this->feed_event(conn, ev::WRITE, nullptr, 0, nullptr);
645+
}
646+
641647
MANAPIHTTP_WORKER_EVENT_BREAK(data)
642648
}
643649

@@ -668,7 +674,8 @@ manapi::bytebuffer manapi::net::worker::openssl_quic::recv_first_buffer(const sh
668674
}
669675

670676
ssize_t manapi::net::worker::openssl_quic::sync_write(const shared_conn &conn, ev::buff_t *buff, uint32_t nbuff, bool finish) MANAPIHTTP_NOEXCEPT {
671-
return prepared::sync_write(this, conn, buff, nbuff, finish);
677+
auto p = conn->as<quic_stream_t>();
678+
return prepared::sync_write(this, conn, p, buff, nbuff, p->cur_speed_lim, finish);
672679
}
673680

674681
ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn, ev::buff_t *buff, uint32_t nbuff, ssize_t size, bool finish, std::size_t maxcnt) MANAPIHTTP_NOEXCEPT {
@@ -678,11 +685,14 @@ ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn
678685
if (s->flags & ev::DISCONNECT)
679686
return -1;
680687

688+
681689
assert(!(s->flags & CONN_SEND_END));
682690

683691
if (prepared::write_buffs_is_full(s->top.get(), maxcnt))
684692
return 0;
685693

694+
this->bio_flush_write();
695+
686696
while (nbuff) {
687697
int flags = 0;
688698

@@ -692,39 +702,15 @@ ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn
692702
std::size_t written = 0;
693703
int rhs;
694704

695-
if (s->sent_an_tick > s->send_an_tick_state) {
696-
rhs = 1;
697-
written = 0;
705+
if (!s->top->send_size) {
706+
ERR_clear_error();
707+
rhs = SSL_write_ex2(s->stream, buff->base, buff->len, flags, &written);
708+
709+
this->bio_flush_write();
698710
}
699711
else {
700-
if (!s->top->send_size) {
701-
ERR_clear_error();
702-
rhs = SSL_write_ex2(s->stream, buff->base, buff->len, flags, &written);
703-
s->sent_an_tick += static_cast<decltype(s->sent_an_tick)>(written);
704-
if (s->sent_an_tick > s->send_an_tick_state) {
705-
try {
706-
manapi::async::current()->etaskpool()->append_task(
707-
[this, conn, s] () -> void {
708-
if (s->flags & ev::DISCONNECT)
709-
return;
710-
711-
s->sent_an_tick = 0;
712-
if (s->send_an_tick_state < DATA_SIZE_TOPBYTE)
713-
s->send_an_tick_state += DATA_SIZE_PARTBYTE;
714-
715-
this->flush_write_(conn, s);
716-
this->feed_event(conn, ev::WRITE, nullptr, 0, nullptr);
717-
});
718-
}
719-
catch (...) {
720-
return -1;
721-
}
722-
}
723-
}
724-
else {
725-
rhs = 1;
726-
written = 0;
727-
}
712+
rhs = 1;
713+
written = 0;
728714
}
729715

730716
if (rhs!=1) {
@@ -733,7 +719,6 @@ ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn
733719
switch (err) {
734720
case SSL_ERROR_WANT_READ:
735721
case SSL_ERROR_WANT_WRITE:
736-
this->bio_flush_write ();
737722
break;
738723
default: {
739724
auto bioerr = ERR_peek_error();
@@ -750,8 +735,6 @@ ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn
750735

751736
s->transfered += written;
752737

753-
this->bio_flush_write();
754-
755738
if (!written) {
756739
auto sent = interface_worker::connection_io_send(&s->top->send, buff->base, static_cast<ssize_t>(buff->len),
757740
&this->bufferpool(), this->config_->buffer_size, &s->top->send_size, maxcnt);
@@ -783,7 +766,7 @@ ssize_t manapi::net::worker::openssl_quic::sync_write_ex(const shared_conn &conn
783766
}
784767

785768
buff->base += written;
786-
buff->len -= static_cast<decltype(buff->len)>(written);
769+
buff->len -= written;
787770
}
788771

789772
return res;
@@ -958,7 +941,7 @@ std::size_t manapi::net::worker::openssl_quic::streams_size(const shared_conn &c
958941
void manapi::net::worker::openssl_quic::bio_flush_write() MANAPIHTTP_NOEXCEPT {
959942
ERR_clear_error();
960943
try {
961-
char buffer[2000];
944+
char buffer[4100];
962945
sockaddr_storage storage_peer{};
963946
sockaddr_storage storage_local{};
964947

@@ -1167,7 +1150,7 @@ void manapi::net::worker::openssl_quic::flush_write_(const shared_conn &conn, qu
11671150
break;
11681151
}
11691152

1170-
if (!data->top->send.last_deque)
1153+
if (!prepared::write_buffs_is_full(data->top.get(), this->config_->max_buffer_stack))
11711154
this->feed_event(conn, ev::WRITE, nullptr, 0, nullptr);
11721155
}
11731156

@@ -1200,8 +1183,20 @@ void manapi::net::worker::openssl_quic::update_limit_rate_connection(const share
12001183
auto data = it->second->as<quic_stream_t>();
12011184

12021185
conn_data->transfered += data->transfered;
1186+
bool force_recall = false;
12031187

1204-
if (data->transfered >= config->speed_limit_rate
1188+
if (data->transfered >= data->cur_speed_lim) {
1189+
data->cur_speed_lim += std::max<std::size_t>(SPEED_LIMIT_STEP, data->cur_speed_lim / 2);
1190+
force_recall = true;
1191+
}
1192+
else {
1193+
auto const s = std::max<std::size_t>(data->transfered, SPEED_LIMIT_DEFAULT);
1194+
data->cur_speed_lim = std::max<std::size_t>(s, config->speed_stream_check_bytes / config->speed_stream_check_delay);
1195+
}
1196+
1197+
std::cout << data->cur_speed_lim << " - SPEED\n";
1198+
1199+
if ((force_recall || data->transfered >= config->speed_limit_rate)
12051200
&& data->ev_callback) {
12061201
data->transfered = 0;
12071202

@@ -1641,6 +1636,9 @@ manapi::error::status_or<manapi::net::worker::shared_conn> manapi::net::worker::
16411636
auto p = std::make_unique<quic_stream_t>();
16421637
auto top = std::make_unique<connection_io>();
16431638

1639+
p->cur_speed_lim = std::max<std::size_t>(SPEED_LIMIT_INIT, this->config_->speed_stream_check_bytes
1640+
/ this->config_->speed_stream_check_delay);
1641+
16441642
auto stream_conn = std::shared_ptr<worker::connection> (
16451643
new worker::connection{p.release()}, stream_interface_eraser);
16461644

0 commit comments

Comments
 (0)