From 3e3a3c8de91c07759d3561bed8bd4b708ad74604 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Thu, 31 Jul 2025 09:57:58 +0100 Subject: [PATCH 1/6] TELECOM-11880: rest_client refactor to use curl_multi_socket_action --- modules/rest_client/rest_cb.c | 20 + modules/rest_client/rest_cb.h | 8 + modules/rest_client/rest_client.c | 345 ++++++++++++++++ modules/rest_client/rest_client.h | 6 + modules/rest_client/rest_methods.c | 608 ++++++++++++++++++++++++++--- modules/rest_client/rest_methods.h | 15 +- modules/rest_client/rest_sockets.c | 178 +++++++++ modules/rest_client/rest_sockets.h | 35 ++ 8 files changed, 1164 insertions(+), 51 deletions(-) create mode 100644 modules/rest_client/rest_sockets.c create mode 100644 modules/rest_client/rest_sockets.h diff --git a/modules/rest_client/rest_cb.c b/modules/rest_client/rest_cb.c index fb390ffdecc..2f16ebafb39 100644 --- a/modules/rest_client/rest_cb.c +++ b/modules/rest_client/rest_cb.c @@ -109,3 +109,23 @@ size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata) return len; } +int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp) +{ + LM_DBG("multi_handle timer called %ld\n", timeout_ms); + long *p = (long*) cbp; + + *p = timeout_ms; + + return 0; +} + +int prereq_callback(void *cbp, + char *conn_primary_ip, + char *conn_local_ip, + int conn_primary_port, + int conn_local_port) +{ + enum curl_status *p = (enum curl_status*) cbp; + *p = CURL_REQUEST_SENT; + return 0; +} \ No newline at end of file diff --git a/modules/rest_client/rest_cb.h b/modules/rest_client/rest_cb.h index 2f40349351f..870017a303f 100644 --- a/modules/rest_client/rest_cb.h +++ b/modules/rest_client/rest_cb.h @@ -25,6 +25,8 @@ #ifndef _REST_CB_H_ #define _REST_CB_H_ +#include + #include "rest_client.h" #include "../../str.h" @@ -39,8 +41,14 @@ #define MAX_CONTENT_TYPE_LEN 64 #define MAX_HEADER_FIELD_LEN 1024 /* arbitrary */ +enum curl_status { + CURL_NONE=0, CURL_CONNECTED=1, CURL_REQUEST_SENDING=2, CURL_REQUEST_SENT=4, CURL_FINISHED=8, CURL_TIMEOUT=16, CURL_ERROR=32 +}; + size_t write_func(char *ptr, size_t size, size_t nmemb, void *userdata); size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata); +int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp); +int prereq_callback(void *cbp, char *conn_primary_ip, char *conn_local_ip, int conn_primary_port, int conn_local_port); #endif /* _REST_CB_H_ */ diff --git a/modules/rest_client/rest_client.c b/modules/rest_client/rest_client.c index 1bb37a2a31c..390a2388afb 100644 --- a/modules/rest_client/rest_client.c +++ b/modules/rest_client/rest_client.c @@ -28,6 +28,8 @@ #include #include +#include + #include "../../async.h" #include "../../sr_module.h" #include "../../dprint.h" @@ -39,9 +41,11 @@ #include "../tls_mgm/api.h" #include "rest_client.h" #include "rest_methods.h" +#include "rest_sockets.h" #include "../../ssl_init_tweaks.h" #include "../../pt.h" #include "../../redact_pii.h" +#include "../../globals.h" /* * Module parameters @@ -50,9 +54,11 @@ long connection_timeout = 20; /* s */ long connect_poll_interval = 20; /* ms */ long connection_timeout_ms; int max_async_transfers = 100; +int max_connections = 100; long curl_timeout = 20; char *ssl_capath; unsigned int max_transfer_size = 10240; /* KB (10MB) */ +int share_connections = 0; /* * curl_multi_perform() may indicate a "try again" response even @@ -72,11 +78,17 @@ int enable_expect_100; struct tls_mgm_binds tls_api; +static preconnect_urls *precon_urls = 0; +static int total_cons = 0; + /* trace parameters for this module */ int rest_proto_id; trace_proto_t tprot; char* rest_id_s = "rest"; +/* file descriptor limits */ +struct rlimit lim; + /* * Module initialization and cleanup */ @@ -104,6 +116,16 @@ static int w_async_rest_put(struct sip_msg *msg, async_ctx *ctx, str *url, str *body, str *_ctype, pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv); +// Temporary to expose in script +static int w_async_rest_get_v2(struct sip_msg *msg, async_ctx *ctx, str *url, + pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv); +static int w_async_rest_post_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv); +static int w_async_rest_put_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv); + static int w_rest_append_hf(struct sip_msg *msg, str *hfv); static int w_rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom); int validate_curl_http_version(const int *http_version); @@ -139,6 +161,25 @@ static const acmd_export_t acmds[] = { {CMD_PARAM_VAR,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_get_v2",(acmd_function)w_async_rest_get_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_post_v2",(acmd_function)w_async_rest_post_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, + {"rest_put_v2",(acmd_function)w_async_rest_put_v2, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}}, {0,0,{{0,0,0}}} }; @@ -169,6 +210,28 @@ static const cmd_export_t cmds[] = { {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, ALL_ROUTES}, + {"rest_get_v2",(cmd_function)w_rest_get, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"rest_post_v2",(cmd_function)w_rest_post, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, + {"rest_put_v2",(cmd_function)w_rest_put, { + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR,0,0}, + {CMD_PARAM_STR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, + {CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}}, + ALL_ROUTES}, {"rest_append_hf",(cmd_function)w_rest_append_hf, { {CMD_PARAM_STR,0,0}, {0,0,0}}, ALL_ROUTES}, @@ -190,6 +253,76 @@ static const trans_export_t trans[] = { {{0,0},0,0} }; +static int warm_pool_urls(modparam_t type, void *val) { + unsigned int num_conns; + char *mod_param, *delim, *host; + size_t delim_index, string_end; + preconnect_urls *tmp; + str num_conns_s; + + if (!share_connections) { + goto done; + } + + mod_param = (char*) val; + + if ((delim = strchr(mod_param, ',')) == NULL) { + goto error; + } + + delim_index = (size_t)(delim - mod_param); + if (delim_index == 0) { + goto error; + } + + host = (char*) pkg_malloc(delim_index + 1); + if (host == NULL) { + goto error; + } + + strncpy(host, mod_param, delim_index); + host[delim_index + 1] = '\0'; + + string_end = strlen(mod_param + delim_index + 1); + if (string_end == 0) { + goto error; + } + + num_conns_s.s = mod_param + delim_index + 1; + num_conns_s.len = string_end; + if (str2int(&num_conns_s, &num_conns) != 0) { + goto error; + } + + tmp = (preconnect_urls*) pkg_malloc(sizeof(preconnect_urls)); + if (tmp == NULL) { + goto error; + } + + tmp->url = host; + tmp->connections = (long) num_conns; + tmp->next = 0; + + if (precon_urls != NULL) { + tmp->next = precon_urls; + } + + precon_urls = tmp; + total_cons += num_conns; +done: + return 0; +error: + if (host != NULL) { + pkg_free(host); + } + + if (tmp != NULL) { + pkg_free(tmp); + } + + return -1; +} + /* * Exported parameters */ @@ -206,6 +339,11 @@ static const param_export_t params[] = { { "enable_expect_100", INT_PARAM, &enable_expect_100 }, { "no_concurrent_connects", INT_PARAM, &no_concurrent_connects }, { "curl_conn_lifetime", INT_PARAM, &curl_conn_lifetime }, + { "use_multi_socket_api", INT_PARAM, &use_multi_socket_api }, + { "share_connections", INT_PARAM, &share_connections }, + { "max_connections", INT_PARAM, &max_connections }, + { "warm_pool_urls", STR_PARAM|USE_FUNC_PARAM, + (void*)&warm_pool_urls }, { 0, 0, 0 } }; @@ -298,6 +436,16 @@ static int cfg_validate(void) return 1; } +static int get_fd_limit(void) { + if (getrlimit(RLIMIT_NOFILE, &lim) < 0) { + LM_ERR("cannot get the maximum number of file descriptors: %s\n", + strerror(errno)); + return -1; + } + + return 0; +} + static int child_init(int rank) { @@ -306,6 +454,28 @@ static int child_init(int rank) return -1; } + if (get_fd_limit() != 0) { + LM_WARN("Could not get file descriptor limits\n"); + return 0; + } + + if (init_process_limits(lim.rlim_cur) != 0) { + LM_WARN("Could not set file descriptor limits\n"); + return 0; + } + + if (pt[process_no].type != TYPE_UDP && pt[process_no].type != TYPE_TCP) { + return 0; + } + + if (precon_urls == NULL) { + return 0; + } + + if (connect_only(precon_urls, total_cons) != 0) { + LM_WARN("Could not create warm pool\n"); + } + return 0; } @@ -774,3 +944,178 @@ static int w_rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom) { return rest_init_client_tls(msg, tls_client_dom); } + +// Temporary duplication for feature toggle in script +int async_rest_method_v2(enum rest_client_method method, struct sip_msg *msg, + char *url, str *body, str *ctype, async_ctx *ctx, + pv_spec_p body_pv, pv_spec_p ctype_pv, pv_spec_p code_pv) +{ + rest_async_param *param; + pv_value_t val; + long http_rc; + char *host; + int read_fd, rc, lrc = RCL_OK; + + param = pkg_malloc(sizeof *param); + if (!param) { + LM_ERR("no more shm\n"); + return RCL_INTERNAL_ERR; + } + memset(param, '\0', sizeof *param); + + if (no_concurrent_connects && (lrc=rcl_acquire_url(url, &host)) < RCL_OK) + return lrc; + + rc = start_async_http_req_v2(msg, method, url, body, ctype, + param, ¶m->body, ctype_pv ? ¶m->ctype : NULL, &read_fd); + + /* error occurred; no transfer done */ + if (read_fd == ASYNC_NO_IO) { + ctx->resume_param = NULL; + ctx->resume_f = NULL; + if (code_pv) { + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = 0; + if (pv_set_value(msg, (pv_spec_p)code_pv, 0, &val) != 0) + LM_ERR("failed to set output code pv\n"); + } + + /* keep default async status of NO_IO */ + pkg_free(param); + return rc; + + /* no need for async - transfer already completed! */ + } else if (read_fd == ASYNC_SYNC) { + if (code_pv) { + curl_easy_getinfo(param->handle, CURLINFO_RESPONSE_CODE, &http_rc); + LM_DBG("HTTP response code: %ld\n", http_rc); + + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = (int)http_rc; + if (pv_set_value(msg, (pv_spec_p)code_pv, 0, &val) != 0) { + LM_ERR("failed to set output code pv\n"); + return RCL_INTERNAL_ERR; + } + } + + val.flags = PV_VAL_STR; + val.rs = param->body; + if (pv_set_value(msg, (pv_spec_p)body_pv, 0, &val) != 0) { + LM_ERR("failed to set output body pv\n"); + return RCL_INTERNAL_ERR; + } + + if (ctype_pv) { + val.rs = param->ctype; + if (pv_set_value(msg, (pv_spec_p)ctype_pv, 0, &val) != 0) { + LM_ERR("failed to set output ctype pv\n"); + return RCL_INTERNAL_ERR; + } + } + + pkg_free(param->body.s); + if (ctype_pv && param->ctype.s) + pkg_free(param->ctype.s); + curl_easy_cleanup(param->handle); + pkg_free(param); + + async_status = ASYNC_SYNC; + return rc; + } + + /* the TCP connection is established, async started with success */ + + if (lrc == RCL_OK_LOCKED) + rcl_release_url(host, rc == RCL_OK); + + ctx->resume_f = resume_async_http_req_v2; + ctx->timeout_s = curl_timeout; + ctx->timeout_f = time_out_async_http_req_v2; + + param->method = method; + param->body_pv = (pv_spec_p)body_pv; + param->ctype_pv = (pv_spec_p)ctype_pv; + param->code_pv = (pv_spec_p)code_pv; + ctx->resume_param = param; + + async_status = read_fd; + return 1; + +done: + if (lrc == RCL_OK_LOCKED) + rcl_release_url(host, rc == RCL_OK); + return rc; +} + +static int w_async_rest_get_v2(struct sip_msg *msg, async_ctx *ctx, str *url, + pv_spec_t *body_pv, pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + LM_DBG("async rest get %.*s %p %p %p\n", url->len, url->s, + body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_GET, msg, url_nt.s, NULL, NULL, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} + +static int w_async_rest_post_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str ctype = { NULL, 0 }; + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + if (_ctype) + ctype = *_ctype; + + LM_DBG("async rest post '%.*s' %p %p %p\n", url->len, url->s, + body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_POST, msg, url_nt.s, body, &ctype, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} + +static int w_async_rest_put_v2(struct sip_msg *msg, async_ctx *ctx, + str *url, str *body, str *_ctype, pv_spec_t *body_pv, + pv_spec_t *ctype_pv, pv_spec_t *code_pv) +{ + str ctype = { NULL, 0 }; + str url_nt; + int rc; + + if (pkg_nt_str_dup(&url_nt, url) < 0) { + LM_ERR("No more pkg memory\n"); + return RCL_INTERNAL_ERR; + } + + if (_ctype) + ctype = *_ctype; + + LM_DBG("async rest put '%.*s' %p %p %p\n", + url->len, url->s, body_pv, ctype_pv, code_pv); + + rc = async_rest_method_v2(REST_CLIENT_PUT, msg, url_nt.s, body, &ctype, ctx, + body_pv, ctype_pv, code_pv); + + pkg_free(url_nt.s); + return rc; +} diff --git a/modules/rest_client/rest_client.h b/modules/rest_client/rest_client.h index 6ae9e2e79d0..63cfb39f89e 100644 --- a/modules/rest_client/rest_client.h +++ b/modules/rest_client/rest_client.h @@ -30,6 +30,12 @@ enum tr_rest_subtype { TR_REST_ESCAPE, TR_REST_UNESCAPE }; +typedef struct _preconnect_urls { + char *url; + unsigned int connections; + struct _preconnect_urls *next; +} preconnect_urls; + extern int enable_expect_100; extern unsigned int max_transfer_size; diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index 9bdf800c35e..bda153f83f1 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -26,6 +26,7 @@ #include #include #include +#include #include "../../mem/shm_mem.h" #include "../../async.h" @@ -40,6 +41,20 @@ #include "rest_client.h" #include "rest_methods.h" #include "rest_cb.h" +#include "rest_sockets.h" + +/* + * So the connections never timeout + * version 8.15 allows 0 to disable the check + * Need to use limits.h to get LONG_MAX otherwise + */ + +#if (LIBCURL_VERSION_NUM >= 0x080f00) +long socket_keep_alive = 0; +#else +#include +long socket_keep_alive = LONG_MAX; +#endif #define REST_CORRELATION_COOKIE "RESTCORR" @@ -73,6 +88,8 @@ extern int rest_proto_id; extern trace_proto_t tprot; extern char *rest_id_s; +static CURLSH *curl_share = NULL; + /** * We cannot use the "parallel transfers" feature of libcurl's multi interface * because that would consume read events from some of its file descriptors that @@ -92,7 +109,8 @@ static int multi_pool_sz; static map_t rcl_connections; static gen_hash_t *rcl_parallel_connects; int no_concurrent_connects; -int curl_conn_lifetime; +int use_multi_socket_api; +unsigned int curl_conn_lifetime; static inline int rest_trace_enabled(void); static int trace_rest_message( rest_trace_param_t* tparam ); @@ -159,6 +177,15 @@ int rcl_init_internals(void) } \ } while (0) +#define w_curl_multi_setopt(mh, opt, value) \ + do { \ + rc = curl_multi_setopt(mh, opt, value); \ + if (rc != CURLE_OK) { \ + LM_ERR("curl_multi_setopt(%d): (%s)\n", opt, curl_easy_strerror(rc)); \ + goto cleanup; \ + } \ + } while (0) + int trace_rest_request_cb(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr) { int is_req; @@ -399,7 +426,24 @@ static inline int get_easy_status(CURL *handle, CURLM *multi, CURLcode *code) return -1; } -static int init_transfer(CURL *handle, char *url) +static CURLSH *get_curl_share(void) { + CURLSHcode src; + + if (!curl_share) { + curl_share = curl_share_init(); + + src = curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); + + if (src != CURLSHE_OK) { + LM_WARN("curl_share_setopt: %s\n", curl_share_strerror(src)); + return NULL; + } + } + + return curl_share; +} + +static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) { CURLcode rc; @@ -414,13 +458,17 @@ static int init_transfer(CURL *handle, char *url) tls_dom = NULL; } - w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, connection_timeout); - w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, curl_timeout); + w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, + timeout_s && timeout_s > connection_timeout ? timeout_s : connection_timeout); + w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, + timeout_s && timeout_s > curl_timeout ? timeout_s : curl_timeout); w_curl_easy_setopt(handle, CURLOPT_VERBOSE, 1); - w_curl_easy_setopt(handle, CURLOPT_STDERR, stdout); w_curl_easy_setopt(handle, CURLOPT_FAILONERROR, 0); + if (is_printable(L_DBG)) + w_curl_easy_setopt(handle, CURLOPT_STDERR, stderr); + if (ssl_capath) w_curl_easy_setopt(handle, CURLOPT_CAPATH, ssl_capath); @@ -436,6 +484,23 @@ static int init_transfer(CURL *handle, char *url) return -1; } +static int init_socket_keepalive(CURL *handle) { + CURLcode rc; + + curl_share = get_curl_share(); + w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); + + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, 5L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, 5L); + w_curl_easy_setopt(handle, CURLOPT_MAXAGE_CONN, socket_keep_alive); + + return 0; +cleanup: + LM_WARN("Error creating keep alive sockets\n"); + return -1; +} + #define init_rest_trace(handle, msg, trace_data) \ do { \ memset(trace_data, 0, sizeof *(trace_data)); \ @@ -579,7 +644,7 @@ int rcl_acquire_url(const char *url, char **url_host) } if (*connected_ts != 0 && (get_ticks() - - (unsigned int)*(unsigned long *)(*connected_ts) < curl_conn_lifetime)) { + (unsigned int)*(unsigned long *)connected_ts < curl_conn_lifetime)) { new_connection = 0; } else { new_connection = 1; @@ -702,7 +767,7 @@ int rest_sync_transfer(enum rest_client_method method, struct sip_msg *msg, str st = STR_NULL, res_body = STR_NULL, tbody, ttype; curl_easy_reset(sync_handle); - if (init_transfer(sync_handle, url) != 0) { + if (init_transfer(sync_handle, url, 0) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -792,34 +857,17 @@ int rest_sync_transfer(enum rest_client_method method, struct sip_msg *msg, return RCL_INTERNAL_ERR; } -/** - * start_async_http_req - launch an async HTTP request - * - TCP connect phase is synchronous, due to libcurl limitations - * - TCP read phase is asynchronous, thanks to the libcurl multi interface - * - * @msg: sip message struct - * @method: HTTP verb - * @url: HTTP URL to be queried - * @req_body: Body of the request (NULL if not needed) - * @req_ctype: Value for the "Content-Type: " header of the request (same as ^) - * @async_parm: output param, will contain async handles - * @body: reply body; gradually reallocated as data arrives - * @ctype: will eventually hold the last "Content-Type" header of the reply - * @out_fd: the fd to poll on, or a negative error code - * - * @return: 1 on success, negative on failure - */ -int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, - char *url, str *req_body, str *req_ctype, - rest_async_param *async_parm, str *body, str *ctype, - enum async_ret_code *out_fd) +static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) { CURL *handle; CURLcode rc; CURLMcode mrc; fd_set rset, wset, eset; int max_fd, fd, http_rc, ret = RCL_INTERNAL_ERR; - long busy_wait, timeout; + long busy_wait, timeout, connect_timeout; long retry_time; OSS_CURLM *multi_list; CURLM *multi_handle; @@ -835,7 +883,7 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, goto cleanup; } - if (init_transfer(handle, url) != 0) { + if (init_transfer(handle, url, async_parm->timeout_s) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -889,18 +937,27 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, multi_handle = multi_list->multi_handle; curl_multi_add_handle(multi_handle, handle); - timeout = connection_timeout_ms; + connect_timeout = (async_parm->timeout_s*1000) > connection_timeout_ms ? + (async_parm->timeout_s*1000) : connection_timeout_ms; + timeout = connect_timeout; busy_wait = connect_poll_interval; /* obtain a read fd in "connection_timeout" seconds at worst */ - for (timeout = connection_timeout_ms; timeout > 0; timeout -= busy_wait) { + for (timeout = connect_timeout; timeout > 0; timeout -= busy_wait) { + double connect = -1; + long req_sz = -1; + mrc = curl_multi_perform(multi_handle, &running_handles); if (mrc != CURLM_OK && mrc != CURLM_CALL_MULTI_PERFORM) { LM_ERR("curl_multi_perform: %s\n", curl_multi_strerror(mrc)); goto error; } - LM_DBG("perform code: %d, handles: %d\n", mrc, running_handles); + curl_easy_getinfo(handle, CURLINFO_CONNECT_TIME, &connect); + curl_easy_getinfo(handle, CURLINFO_REQUEST_SIZE, &req_sz); + + LM_DBG("perform code: %d, handles: %d, connect: %.3lfs, reqsz: %ldB\n", + mrc, running_handles, connect, req_sz); /* transfer completed! But how well? */ if (running_handles == 0) { @@ -923,8 +980,8 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, case CURLE_OPERATION_TIMEDOUT: if (http_rc == 0) { - LM_ERR("connect timeout on %s (%lds)\n", url, - connection_timeout); + LM_ERR("connect timeout on %s (%ldms)\n", url, + connect_timeout); ret = RCL_CONNECT_TIMEOUT; goto error; } @@ -962,9 +1019,8 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, if (max_fd != -1) { for (fd = 0; fd <= max_fd; fd++) { if (FD_ISSET(fd, &rset)) { - LM_DBG("ongoing transfer on fd %d\n", fd); - if (is_new_transfer(fd)) { + if (req_sz > 0 && is_new_transfer(fd)) { LM_DBG(">>> add fd %d to ongoing transfers\n", fd); add_transfer(fd); goto success; @@ -981,7 +1037,7 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, LM_DBG("libcurl TCP connect: we should wait up to %ldms " "(timeout=%ldms, poll=%ldms)!\n", retry_time, - connection_timeout_ms, connect_poll_interval); + connect_timeout, connect_poll_interval); /* from curl_multi_timeout() docs: @@ -1055,8 +1111,8 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, if (timed_out) { char *url = NULL; curl_easy_getinfo(param->handle, CURLINFO_EFFECTIVE_URL, &url); - LM_ERR("async %s timed out, URL: %s\n", - rest_client_method_str(param->method), url); + LM_ERR("async %s timed out, URL: %s (timeout: %lds)\n", + rest_client_method_str(param->method), url, param->timeout_s); goto cleanup; } @@ -1069,10 +1125,12 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, LM_DBG("perform result: %d, running: %d (break: %d)\n", mrc, running, mrc != CURLM_CALL_MULTI_PERFORM && (mrc != CURLM_OK || !running)); - if (mrc == CURLM_OK && running) { + if (mrc == CURLM_OK) { + if (!running) + break; + async_status = ASYNC_CONTINUE; return 1; - /* this rc has been removed since cURL 7.20.0 (Feb 2010), but it's not * yet marked as deprecated, so let's keep the do/while loop */ } else if (mrc != CURLM_CALL_MULTI_PERFORM) { @@ -1209,18 +1267,154 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, return ret; } - -enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param) +static enum async_ret_code _resume_async_http_req_v2(int fd, struct sip_msg *msg, + rest_async_param *param, int timed_out) { - return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 0); -} + CURLcode rc; + CURLMcode mrc; + int running = 0; + long http_rc = 0; + pv_value_t val; + int ret = RCL_INTERNAL_ERR, retr; + CURLM *multi_handle; + LM_DBG("resume async processing...\n"); -enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param) -{ - return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 1); -} + multi_handle = param->multi_list->multi_handle; + + if (timed_out) { + char *url = NULL; + curl_easy_getinfo(param->handle, CURLINFO_EFFECTIVE_URL, &url); + LM_ERR("async %s timed out, URL: %s (timeout: %lds)\n", + rest_client_method_str(param->method), url, param->timeout_s); + goto cleanup; + } + + retr = 0; + do { + /* When @enable_expect_100 is on, both the client body upload and the + * server body download will be performed within this loop, blocking */ + + mrc = curl_multi_socket_action(multi_handle, fd, 0, &running); + LM_DBG("perform result: %d, running: %d (break: %d)\n", mrc, running, + mrc != CURLM_CALL_MULTI_PERFORM && (mrc != CURLM_OK || !running)); + + if (mrc == CURLM_OK) { + if (!running) + break; + + async_status = ASYNC_CONTINUE; + return 1; + /* this rc has been removed since cURL 7.20.0 (Feb 2010), but it's not + * yet marked as deprecated, so let's keep the do/while loop */ + } else if (mrc != CURLM_CALL_MULTI_PERFORM) { + break; + } + usleep(_async_resume_retr_itv); + retr += _async_resume_retr_itv; + } while (retr < _async_resume_retr_timeout); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_perform: %s\n", curl_multi_strerror(mrc)); + goto out; + } + + if (!timed_out) { + if (running == 1) { + LM_DBG("transfer in progress...\n"); + async_status = ASYNC_CONTINUE; + return 1; + } + + if (running != 0) { + LM_BUG("non-zero running handles!! (%d)", running); + goto out; + } + } + +cleanup: + curl_slist_free_all(param->header_list); + rc = curl_easy_getinfo(param->handle, CURLINFO_RESPONSE_CODE, &http_rc); + if (rc != CURLE_OK) { + LM_ERR("curl_easy_getinfo: %d, %s\n", rc, curl_easy_strerror(rc)); + http_rc = 0; + } + + if (get_easy_status(param->handle, multi_handle, &rc) < 0) + LM_DBG("download finished, but an HTTP status is not available " + "(timed_out: %d)\n", timed_out); + + if (param->code_pv) { + val.flags = PV_VAL_INT|PV_TYPE_INT; + val.ri = (int)http_rc; + if (pv_set_value(msg, param->code_pv, 0, &val) != 0) { + LM_ERR("failed to set output code pv\n"); + goto out; + } + } + + switch (rc) { + case CURLE_OK: + ret = RCL_OK; + break; + + case CURLE_COULDNT_CONNECT: + LM_ERR("connect refused\n"); + ret = RCL_CONNECT_REFUSED; + goto out; + + case CURLE_OPERATION_TIMEDOUT: + LM_ERR("connected, but transfer timed out (%lds)\n", curl_timeout); + ret = RCL_TRANSFER_TIMEOUT; + break; + + default: + LM_ERR("curl_easy_perform error %d, %s\n", + rc, curl_easy_strerror(rc)); + goto out; + } + + val.flags = PV_VAL_STR; + val.rs = param->body; + if (pv_set_value(msg, param->body_pv, 0, &val) != 0) { + LM_ERR("failed to set output body pv\n"); + goto out; + } + + if (param->ctype_pv) { + val.rs = param->ctype; + if (pv_set_value(msg, param->ctype_pv, 0, &val) != 0) { + LM_ERR("failed to set output ctype pv\n"); + goto out; + } + } + + LM_DBG("HTTP response code: %ld\n", http_rc); + +out: + mrc = curl_multi_remove_handle(multi_handle, param->handle); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + ret = RCL_INTERNAL_ERR; + } + put_multi(param->multi_list); + + pkg_free(param->body.s); + if (param->ctype_pv && param->ctype.s) + pkg_free(param->ctype.s); + curl_easy_cleanup(param->handle); + if ( param->tparam ) { + pkg_free( param->tparam ); + } + pkg_free(param); + + if (timed_out) + ret = RCL_TRANSFER_TIMEOUT; + + /* default async status is ASYNC_DONE */ + return ret; +} /** * rest_append_hf - add a custom HTTP header before a rest call @@ -1368,3 +1562,317 @@ static int trace_rest_message( rest_trace_param_t* tparam ) return 0; } + +int connect_only(preconnect_urls *precon_urls, int total_cons) { + CURLcode rc; + CURLMcode mrc; + CURL *handle; + CURLM *multi_handle; + OSS_CURLM *multi_list; + CURL **list; + struct CURLMsg *m; + preconnect_urls *start, *next; + char *url; + long busy_wait, timer; + int msgq, num_of_connections, exit_code = 0; + + curl_share = get_curl_share(); + + if (!curl_share) { + goto cleanup; + } + + multi_list = get_multi(); + if (!multi_list) { + goto cleanup; + } + + multi_handle = multi_list->multi_handle; + + start = precon_urls; + + while (start != NULL) { + num_of_connections = start->connections; + url = start->url; + + LM_DBG("connect to %s, num of connects %d\n", url, num_of_connections); + + for (int i = 0; i < num_of_connections; i++) { + handle = curl_easy_init(); + curl_multi_add_handle(multi_handle, handle); + + if (init_transfer(handle, url, 0) != 0) { + exit_code = -1; + goto cleanup; + } + + if (init_socket_keepalive(handle) != 0) { + exit_code = -1; + goto cleanup; + } + + w_curl_easy_setopt(handle, CURLOPT_NOBODY, 1L); + w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); + } + + start = start->next; + } + + busy_wait = connect_poll_interval; + + if (setsocket_callback(multi_handle) != 0) { + goto cleanup; + } + + w_curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, (long) num_of_connections); + w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, (long) num_of_connections); + + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + + if ((running_handles = start_multi_socket(multi_handle)) < 0) { + exit_code = -1; + goto cleanup; + } + + LM_DBG("Creating warm pool connection, running handles %d\n", running_handles); + + do { + running_handles = run_multi_socket(multi_handle); + + if (timer < 0) { + break; + } + + usleep(1000UL * busy_wait); + LM_DBG("Creating warm pool connection, running handles %d\n", running_handles); + } while (running_handles != 0); + +cleanup: + do { + m = curl_multi_info_read(multi_handle, &msgq); + if (m && m->msg == CURLMSG_DONE) { + mrc = curl_multi_remove_handle(multi_handle, m->easy_handle); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + } + curl_easy_cleanup(m->easy_handle); + } + } while (m); + + start = precon_urls; + + while (start != NULL) { + next = start->next; + + pkg_free(start->url); + pkg_free(start); + + start = next; + } + + put_multi(multi_list); + + return exit_code; +} + +int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) +{ + CURL *handle; + CURLcode rc; + CURLMcode mrc; + OSS_CURLM *multi_list; + CURLM *multi_handle; + long busy_wait, timeout, connect_timeout, retry_time, timer; + enum curl_status status = CURL_NONE; + + handle = curl_easy_init(); + + if (!handle) { + LM_ERR("Init curl handle failed!\n"); + goto cleanup; + } + + if (init_transfer(handle, url, async_parm->timeout_s) != 0) { + LM_ERR("failed to init transfer to %s\n", url); + goto cleanup; + } + + switch (method) { + case REST_CLIENT_POST: + set_post_opts(handle, req_ctype, req_body); + break; + + case REST_CLIENT_GET: + if (header_list) + w_curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header_list); + break; + + case REST_CLIENT_PUT: + set_put_opts(handle, req_ctype, req_body); + break; + + default: + LM_ERR("unsupported method: %d, defaulting to GET\n", method); + } + + w_curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_func); + w_curl_easy_setopt(handle, CURLOPT_WRITEDATA, body); + + if (share_connections) { + // If the share cannot be created then log warn but keep going + init_socket_keepalive(handle); + } + + if (ctype) { + w_curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, header_func); + w_curl_easy_setopt(handle, CURLOPT_HEADERDATA, ctype); + } + + if (rest_trace_enabled()) { + async_parm->tparam = pkg_malloc(sizeof(rest_trace_param_t)); + if (!async_parm->tparam) { + LM_ERR("oom\n"); + goto cleanup; + } + + init_rest_trace(handle, msg, async_parm->tparam); + } + + multi_list = get_multi(); + if (!multi_list) { + LM_WARN("failed to get a multi handle, doing a blocking transfer\n"); + rc = rest_easy_perform(handle, url, NULL); + clean_header_list; + async_parm->handle = handle; + *out_fd = ASYNC_SYNC; + return rc; + } + + multi_handle = multi_list->multi_handle; + curl_multi_add_handle(multi_handle, handle); + + if (setsocket_callback(multi_handle) != 0) { + goto cleanup; + } + + w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, (long) max_connections); + + w_curl_easy_setopt(handle, CURLOPT_PREREQFUNCTION, prereq_callback); + w_curl_easy_setopt(handle, CURLOPT_PREREQDATA, &status); + + connect_timeout = (async_parm->timeout_s*1000) > connection_timeout_ms ? + (async_parm->timeout_s*1000) : connection_timeout_ms; + timeout = connect_timeout; + busy_wait = connect_poll_interval; + + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + + if ((running_handles = start_multi_socket(multi_handle)) < 0) { + goto cleanup; + } + + do { + running_handles = run_multi_socket(multi_handle); + + if (status == CURL_REQUEST_SENT) { + goto success; + } + + if (timer < 0) { + break; + } + + usleep(1000UL * busy_wait); + LM_DBG("Connecting or sending, running handles %d\n", running_handles); + } while (running_handles != 0); + + LM_ERR("connect timeout on %s (%lds)\n", url, connection_timeout); + goto error; + +success: + async_parm->header_list = header_list; + async_parm->handle = handle; + async_parm->multi_list = multi_list; + header_list = NULL; + *out_fd = get_max_fd(ASYNC_SYNC); // Running only one socket at a time so it's always the max + return RCL_OK; + +error: + mrc = curl_multi_remove_handle(multi_handle, handle); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + } + put_multi(multi_list); + + curl_easy_cleanup(handle); + +cleanup: + clean_header_list; + if (tls_dom) { + tls_api.release_domain(tls_dom); + tls_dom = NULL; + } + if (rest_trace_enabled() && async_parm->tparam) + pkg_free(async_parm->tparam); + + *out_fd = ASYNC_NO_IO; + return RCL_INTERNAL_ERR; +} + +/** + * start_async_http_req - launch an async HTTP request + * - TCP connect phase is synchronous, due to libcurl limitations + * - TCP read phase is asynchronous, thanks to the libcurl multi interface + * + * @msg: sip message struct + * @method: HTTP verb + * @url: HTTP URL to be queried + * @req_body: Body of the request (NULL if not needed) + * @req_ctype: Value for the "Content-Type: " header of the request (same as ^) + * @async_parm: in/out param, will contain async handles + * @body: reply body; gradually reallocated as data arrives + * @ctype: will eventually hold the last "Content-Type" header of the reply + * @out_fd: the fd to poll on, or a negative error code + * + * @return: 1 on success, negative on failure + */ +int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd) +{ + if (!use_multi_socket_api) { + return start_async_http_req_v1(msg, method, url, req_body, req_ctype, async_parm, body, ctype, out_fd); + } else { + return start_async_http_req_v2(msg, method, url, req_body, req_ctype, async_parm, body, ctype, out_fd); + } +} + +enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param) { + if (!use_multi_socket_api) { + return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 0); + } else { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 0); + } +} + +enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param) { + if (!use_multi_socket_api) { + return _resume_async_http_req(fd, msg, (rest_async_param *)_param, 1); + } else { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 1); + } +} + +enum async_ret_code resume_async_http_req_v2(int fd, struct sip_msg *msg, void *_param) { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 0); +} + +enum async_ret_code time_out_async_http_req_v2(int fd, struct sip_msg *msg, void *_param) { + return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 1); +} \ No newline at end of file diff --git a/modules/rest_client/rest_methods.h b/modules/rest_client/rest_methods.h index 3e2774d1ff3..d85341a1aec 100644 --- a/modules/rest_client/rest_methods.h +++ b/modules/rest_client/rest_methods.h @@ -39,6 +39,7 @@ extern long connection_timeout; extern long connect_poll_interval; extern long connection_timeout_ms; extern int max_async_transfers; +extern int max_connections; extern long curl_timeout; extern char *ssl_capath; @@ -47,7 +48,9 @@ extern int ssl_verifyhost; extern int curl_http_version; extern int no_concurrent_connects; -extern int curl_conn_lifetime; +extern int use_multi_socket_api; +extern int share_connections; +extern unsigned int curl_conn_lifetime; /* handle for use with synchronous reqs */ extern CURL *sync_handle; @@ -121,6 +124,7 @@ typedef struct rest_async_param_ { pv_spec_p body_pv; pv_spec_p ctype_pv; pv_spec_p code_pv; + unsigned int timeout_s; } rest_async_param; int init_sync_handle(void); @@ -139,9 +143,18 @@ int start_async_http_req(struct sip_msg *msg, enum rest_client_method method, enum async_ret_code resume_async_http_req(int fd, struct sip_msg *msg, void *_param); enum async_ret_code time_out_async_http_req(int fd, struct sip_msg *msg, void *_param); +// Temporary expose these +int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, + char *url, str *req_body, str *req_ctype, + rest_async_param *async_parm, str *body, str *ctype, + enum async_ret_code *out_fd); + +enum async_ret_code resume_async_http_req_v2(int fd, struct sip_msg *msg, void *_param); +enum async_ret_code time_out_async_http_req_v2(int fd, struct sip_msg *msg, void *_param); int rest_append_hf_method(struct sip_msg *msg, str *hfv); int rest_init_client_tls(struct sip_msg *msg, str *tls_client_dom); +int connect_only(preconnect_urls *precon_urls, int total_cons); #endif /* _REST_METHODS_ */ diff --git a/modules/rest_client/rest_sockets.c b/modules/rest_client/rest_sockets.c new file mode 100644 index 00000000000..8482d92452e --- /dev/null +++ b/modules/rest_client/rest_sockets.c @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2025 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include +#include +#include + +#include "../../dprint.h" +#include "../../mem/mem.h" +#include "rest_sockets.h" + +#if defined(__CPU_aarch64) || defined(__CPU_x86_64) || defined(__CPU_mips64) +#define WORD_SIZE_BITS 64 +#define WORD_SIZE_BYTES 8 +#define COUNT_LEADING_ZEROS(s) __builtin_clzll(s) +#define COUNT_TRAILING_ZEROS(s) __builtin_ctzll(s) +typedef uint64_t cpuword_t; +#else +#define WORD_SIZE_BITS 32 +#define WORD_SIZE_BYTES 4 +#define COUNT_LEADING_ZEROS(s) __builtin_clz(s) +#define COUNT_TRAILING_ZEROS(s) __builtin_ctz(s) +typedef uint32_t cpuword_t; +#endif + +#define BYTE_LEN 8 + +typedef struct _file_descriptors { + unsigned char *tracked_socks; + int max_fd_index; +} file_descriptors; + +static file_descriptors fds; +size_t aligned_bitset_len; + +int init_process_limits(rlim_t rlim_cur) { + aligned_bitset_len = (((rlim_cur + BYTE_LEN - 1) / BYTE_LEN) * BYTE_LEN) / BYTE_LEN; + + fds.tracked_socks = (unsigned char*) pkg_malloc(aligned_bitset_len); + fds.max_fd_index = 0; + + if (fds.tracked_socks == NULL) { + return -1; + } + + return 0; +} + +int get_max_fd(int no_max_default) { + cpuword_t sockets; + + if (fds.max_fd_index < 0) { + return -2; + } + + memcpy(&sockets, fds.tracked_socks + fds.max_fd_index, sizeof(cpuword_t)); + + if (!sockets) { + fds.max_fd_index -= WORD_SIZE_BYTES; + return no_max_default; + } + + return ((fds.max_fd_index << 3) + WORD_SIZE_BITS - 1) - COUNT_LEADING_ZEROS(sockets); +} + +static void add_sock(int s) { + int sock_index = s >> 3; + + if (sock_index > fds.max_fd_index) { + fds.max_fd_index = sock_index; + } + + fds.tracked_socks[s / BYTE_LEN] |= (1 << (s % BYTE_LEN)); +} + +static void remove_sock(int s) { + cpuword_t sockets; + + fds.tracked_socks[s / BYTE_LEN] &= ~(1 << (s % BYTE_LEN)); + + if (fds.max_fd_index >= 0) { + memcpy(&sockets, fds.tracked_socks + fds.max_fd_index, sizeof(cpuword_t)); + + if (!sockets) { + fds.max_fd_index -= WORD_SIZE_BYTES; + } + } +} + +static int socket_action_cb(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) +{ + LM_DBG("called for socket %d status %d\n", s, event); + + if (event != CURL_POLL_REMOVE) { + add_sock(s); + } else if (event == CURL_POLL_REMOVE) { + remove_sock(s); + } + + return 0; +} + +int start_multi_socket(CURLM *multi_handle) { + CURLMcode mrc; + int running; + + memset(fds.tracked_socks, 0, aligned_bitset_len); + fds.max_fd_index = 0; + mrc = curl_multi_socket_action(multi_handle, CURL_SOCKET_TIMEOUT, 0, &running); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_socket_action: %s\n", curl_multi_strerror(mrc)); + return -1; + } + + return running; +} + +int run_multi_socket(CURLM *multi_handle) { + CURLMcode mrc; + cpuword_t sockets; + int running, curl_fd; + + for (int i = 0; i <= fds.max_fd_index; i += WORD_SIZE_BYTES) { + memcpy(&sockets, fds.tracked_socks + i, sizeof(cpuword_t)); + + while (sockets) { + curl_fd = (i * BYTE_LEN) + COUNT_TRAILING_ZEROS(sockets); + LM_DBG("Action on socket %d\n", curl_fd); + + mrc = curl_multi_socket_action(multi_handle, curl_fd, 0, &running); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_socket_action: %s\n", curl_multi_strerror(mrc)); + return -1; + } + + sockets &= sockets - 1; + } + } + + return running; +} + +int setsocket_callback(CURLM *multi_handle) { + CURLcode rc; + + rc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_cb); + if (rc != CURLE_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_easy_strerror(rc)); + return -1; + } + + rc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, &fds); + if (rc != CURLE_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_easy_strerror(rc)); + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/modules/rest_client/rest_sockets.h b/modules/rest_client/rest_sockets.h new file mode 100644 index 00000000000..e8e30c98854 --- /dev/null +++ b/modules/rest_client/rest_sockets.h @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2025 OpenSIPS Solutions + * + * This file is part of opensips, a free SIP server. + * + * opensips is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * opensips is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _REST_SOCKET_H_ +#define _REST_SOCKET_H_ + +#include +#include + +#include + +int init_process_limits(rlim_t rlim_cur); +int get_max_fd(int no_max_default); +int start_multi_socket(CURLM *multi_handle); +int run_multi_socket(CURLM *multi_handle); +int setsocket_callback(CURLM *multi_handle); + +#endif /* _REST_SOCKET_H_ */ \ No newline at end of file From 3483af72347bc762e2c7ebf11d9115fe120b9bc8 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Fri, 29 Aug 2025 15:21:47 +0100 Subject: [PATCH 2/6] TELECOM-11880: Implementing curl_share locking --- modules/rest_client/rest_client.c | 5 -- modules/rest_client/rest_methods.c | 90 ++++++++++++++++++++++-------- 2 files changed, 67 insertions(+), 28 deletions(-) diff --git a/modules/rest_client/rest_client.c b/modules/rest_client/rest_client.c index 390a2388afb..99ec82a437b 100644 --- a/modules/rest_client/rest_client.c +++ b/modules/rest_client/rest_client.c @@ -1040,11 +1040,6 @@ int async_rest_method_v2(enum rest_client_method method, struct sip_msg *msg, async_status = read_fd; return 1; - -done: - if (lrc == RCL_OK_LOCKED) - rcl_release_url(host, rc == RCL_OK); - return rc; } static int w_async_rest_get_v2(struct sip_msg *msg, async_ctx *ctx, str *url, diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index bda153f83f1..f5bcb259d46 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -35,6 +35,7 @@ #include "../../trace_api.h" #include "../../resolve.h" #include "../../timer.h" +#include "../../lock_ops.h" #include "../tls_mgm/api.h" @@ -89,6 +90,10 @@ extern trace_proto_t tprot; extern char *rest_id_s; static CURLSH *curl_share = NULL; +static gen_lock_t curl_share_locks[CURL_LOCK_DATA_LAST]; + +static enum curl_status status; +static long timer; /** * We cannot use the "parallel transfers" feature of libcurl's multi interface @@ -179,9 +184,18 @@ int rcl_init_internals(void) #define w_curl_multi_setopt(mh, opt, value) \ do { \ - rc = curl_multi_setopt(mh, opt, value); \ - if (rc != CURLE_OK) { \ - LM_ERR("curl_multi_setopt(%d): (%s)\n", opt, curl_easy_strerror(rc)); \ + mrc = curl_multi_setopt(mh, opt, value); \ + if (mrc != CURLE_OK) { \ + LM_ERR("curl_multi_setopt(%d): (%s)\n", opt, curl_multi_strerror(mrc)); \ + goto cleanup; \ + } \ + } while (0) + +#define w_curl_share_setopt(cs, opt, value) \ + do { \ + src = curl_share_setopt(cs, opt, value); \ + if (src != CURLE_OK) { \ + LM_ERR("curl_share_setopt: %s\n", curl_share_strerror(src)); \ goto cleanup; \ } \ } while (0) @@ -426,21 +440,40 @@ static inline int get_easy_status(CURL *handle, CURLM *multi, CURLcode *code) return -1; } +static void libcurl_share_lock(CURL *handle, curl_lock_data data, curl_lock_access access, void *clientp) { + LM_DBG("Locking libcurl share %d\n", data); + lock_get(&curl_share_locks[data]); +} + +static void libcurl_share_unlock(CURL *handle, curl_lock_data data, void *clientp) { + LM_DBG("Unlocking libcurl share %d\n", data); + lock_release(&curl_share_locks[data]); +} + static CURLSH *get_curl_share(void) { CURLSHcode src; if (!curl_share) { - curl_share = curl_share_init(); + for (int i =0; i < CURL_LOCK_DATA_LAST; ++i) { + lock_init(&curl_share_locks[i]); + } - src = curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); + curl_share = curl_share_init(); - if (src != CURLSHE_OK) { - LM_WARN("curl_share_setopt: %s\n", curl_share_strerror(src)); - return NULL; - } + w_curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); + w_curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, libcurl_share_lock); + w_curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, libcurl_share_unlock); } return curl_share; +cleanup: + for (int i =0; i < CURL_LOCK_DATA_LAST; ++i) { + lock_destroy(&curl_share_locks[i]); + } + + curl_share_cleanup(curl_share); + curl_share = NULL; + return NULL; } static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) @@ -466,8 +499,7 @@ static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) w_curl_easy_setopt(handle, CURLOPT_VERBOSE, 1); w_curl_easy_setopt(handle, CURLOPT_FAILONERROR, 0); - if (is_printable(L_DBG)) - w_curl_easy_setopt(handle, CURLOPT_STDERR, stderr); + w_curl_easy_setopt(handle, CURLOPT_STDERR, stdout); if (ssl_capath) w_curl_easy_setopt(handle, CURLOPT_CAPATH, ssl_capath); @@ -487,12 +519,15 @@ static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) static int init_socket_keepalive(CURL *handle) { CURLcode rc; - curl_share = get_curl_share(); - w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); + if (share_connections) { + // If the share cannot be created then log warn but keep going + curl_share = get_curl_share(); + w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); + } w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, 1L); - w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, 5L); - w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, 5L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, 180L); + w_curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, 180L); w_curl_easy_setopt(handle, CURLOPT_MAXAGE_CONN, socket_keep_alive); return 0; @@ -1290,6 +1325,9 @@ static enum async_ret_code _resume_async_http_req_v2(int fd, struct sip_msg *msg goto cleanup; } + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); + w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); + retr = 0; do { /* When @enable_expect_100 is on, both the client body upload and the @@ -1686,8 +1724,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, CURLMcode mrc; OSS_CURLM *multi_list; CURLM *multi_handle; - long busy_wait, timeout, connect_timeout, retry_time, timer; - enum curl_status status = CURL_NONE; + long busy_wait, timeout, connect_timeout, retry_time; handle = curl_easy_init(); @@ -1722,10 +1759,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, w_curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, write_func); w_curl_easy_setopt(handle, CURLOPT_WRITEDATA, body); - if (share_connections) { - // If the share cannot be created then log warn but keep going - init_socket_keepalive(handle); - } + init_socket_keepalive(handle); if (ctype) { w_curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, header_func); @@ -1761,6 +1795,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, (long) max_connections); + status = CURL_NONE; w_curl_easy_setopt(handle, CURLOPT_PREREQFUNCTION, prereq_callback); w_curl_easy_setopt(handle, CURLOPT_PREREQDATA, &status); @@ -1779,6 +1814,10 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, do { running_handles = run_multi_socket(multi_handle); + if (running_handles < 0) { + goto error; + } + if (status == CURL_REQUEST_SENT) { goto success; } @@ -1797,9 +1836,14 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, success: async_parm->header_list = header_list; async_parm->handle = handle; - async_parm->multi_list = multi_list; header_list = NULL; *out_fd = get_max_fd(ASYNC_SYNC); // Running only one socket at a time so it's always the max + + if (*out_fd >= 0) { + async_parm->multi_list = multi_list; + } else { + put_multi(multi_list); + } return RCL_OK; error: @@ -1875,4 +1919,4 @@ enum async_ret_code resume_async_http_req_v2(int fd, struct sip_msg *msg, void * enum async_ret_code time_out_async_http_req_v2(int fd, struct sip_msg *msg, void *_param) { return _resume_async_http_req_v2(fd, msg, (rest_async_param *)_param, 1); -} \ No newline at end of file +} From d11086070c45852cb358ef920994c4e93ab3d643 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Wed, 3 Sep 2025 15:41:11 +0100 Subject: [PATCH 3/6] TELECOM-11880: Build compilation updates --- modules/rest_client/rest_client.c | 4 ++-- modules/rest_client/rest_methods.c | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/modules/rest_client/rest_client.c b/modules/rest_client/rest_client.c index 99ec82a437b..9aa1030a665 100644 --- a/modules/rest_client/rest_client.c +++ b/modules/rest_client/rest_client.c @@ -255,9 +255,9 @@ static const trans_export_t trans[] = { static int warm_pool_urls(modparam_t type, void *val) { unsigned int num_conns; - char *mod_param, *delim, *host; + char *mod_param, *delim, *host = NULL; size_t delim_index, string_end; - preconnect_urls *tmp; + preconnect_urls *tmp = NULL; str num_conns_s; if (!share_connections) { diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index f5bcb259d46..45f72763f46 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -185,7 +185,7 @@ int rcl_init_internals(void) #define w_curl_multi_setopt(mh, opt, value) \ do { \ mrc = curl_multi_setopt(mh, opt, value); \ - if (mrc != CURLE_OK) { \ + if (mrc != CURLM_OK) { \ LM_ERR("curl_multi_setopt(%d): (%s)\n", opt, curl_multi_strerror(mrc)); \ goto cleanup; \ } \ @@ -194,7 +194,7 @@ int rcl_init_internals(void) #define w_curl_share_setopt(cs, opt, value) \ do { \ src = curl_share_setopt(cs, opt, value); \ - if (src != CURLE_OK) { \ + if (src != CURLSHE_OK) { \ LM_ERR("curl_share_setopt: %s\n", curl_share_strerror(src)); \ goto cleanup; \ } \ @@ -1607,7 +1607,6 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { CURL *handle; CURLM *multi_handle; OSS_CURLM *multi_list; - CURL **list; struct CURLMsg *m; preconnect_urls *start, *next; char *url; @@ -1711,6 +1710,7 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { put_multi(multi_list); +done: return exit_code; } @@ -1724,7 +1724,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, CURLMcode mrc; OSS_CURLM *multi_list; CURLM *multi_handle; - long busy_wait, timeout, connect_timeout, retry_time; + long busy_wait, connect_timeout; handle = curl_easy_init(); @@ -1801,7 +1801,6 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, connect_timeout = (async_parm->timeout_s*1000) > connection_timeout_ms ? (async_parm->timeout_s*1000) : connection_timeout_ms; - timeout = connect_timeout; busy_wait = connect_poll_interval; w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); From c973924f16e8556ea371f6db432bd77e61fd0857 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 8 Sep 2025 11:10:05 +0100 Subject: [PATCH 4/6] TELECOM-11880: Remove prereq callback --- modules/rest_client/rest_cb.c | 11 --- modules/rest_client/rest_cb.h | 5 -- modules/rest_client/rest_client.c | 16 ++++- modules/rest_client/rest_client.h | 2 +- modules/rest_client/rest_methods.c | 109 +++++++++++++---------------- modules/rest_client/rest_methods.h | 3 +- modules/rest_client/rest_sockets.c | 14 ++-- 7 files changed, 75 insertions(+), 85 deletions(-) diff --git a/modules/rest_client/rest_cb.c b/modules/rest_client/rest_cb.c index 2f16ebafb39..1a8c0d97c01 100644 --- a/modules/rest_client/rest_cb.c +++ b/modules/rest_client/rest_cb.c @@ -117,15 +117,4 @@ int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp) *p = timeout_ms; return 0; -} - -int prereq_callback(void *cbp, - char *conn_primary_ip, - char *conn_local_ip, - int conn_primary_port, - int conn_local_port) -{ - enum curl_status *p = (enum curl_status*) cbp; - *p = CURL_REQUEST_SENT; - return 0; } \ No newline at end of file diff --git a/modules/rest_client/rest_cb.h b/modules/rest_client/rest_cb.h index 870017a303f..da2126bf51e 100644 --- a/modules/rest_client/rest_cb.h +++ b/modules/rest_client/rest_cb.h @@ -41,14 +41,9 @@ #define MAX_CONTENT_TYPE_LEN 64 #define MAX_HEADER_FIELD_LEN 1024 /* arbitrary */ -enum curl_status { - CURL_NONE=0, CURL_CONNECTED=1, CURL_REQUEST_SENDING=2, CURL_REQUEST_SENT=4, CURL_FINISHED=8, CURL_TIMEOUT=16, CURL_ERROR=32 -}; - size_t write_func(char *ptr, size_t size, size_t nmemb, void *userdata); size_t header_func(char *ptr, size_t size, size_t nmemb, void *userdata); int timer_cb(CURLM *multi_handle, long timeout_ms, void *cbp); -int prereq_callback(void *cbp, char *conn_primary_ip, char *conn_local_ip, int conn_primary_port, int conn_local_port); #endif /* _REST_CB_H_ */ diff --git a/modules/rest_client/rest_client.c b/modules/rest_client/rest_client.c index 9aa1030a665..17a6fa3bb4f 100644 --- a/modules/rest_client/rest_client.c +++ b/modules/rest_client/rest_client.c @@ -54,7 +54,8 @@ long connection_timeout = 20; /* s */ long connect_poll_interval = 20; /* ms */ long connection_timeout_ms; int max_async_transfers = 100; -int max_connections = 100; +long max_connections = 100; +long max_host_connections = 0; long curl_timeout = 20; char *ssl_capath; unsigned int max_transfer_size = 10240; /* KB (10MB) */ @@ -342,6 +343,7 @@ static const param_export_t params[] = { { "use_multi_socket_api", INT_PARAM, &use_multi_socket_api }, { "share_connections", INT_PARAM, &share_connections }, { "max_connections", INT_PARAM, &max_connections }, + { "max_host_connections", INT_PARAM, &max_host_connections }, { "warm_pool_urls", STR_PARAM|USE_FUNC_PARAM, (void*)&warm_pool_urls }, { 0, 0, 0 } @@ -415,6 +417,18 @@ static int mod_init(void) return -1; } + if (max_connections <= 0) { + LM_WARN("Bad max_connections value (%ld), setting to default of 100\n", max_connections); + max_connections = 100; + } + + if (max_host_connections < 0) { + LM_WARN("Bad max_host_connections value (%ld), setting to max_connections value (%ld)\n", + max_host_connections, max_connections); + + max_host_connections = max_connections; + } + LM_INFO("Module initialized!\n"); return 0; diff --git a/modules/rest_client/rest_client.h b/modules/rest_client/rest_client.h index 63cfb39f89e..8a6b7079b42 100644 --- a/modules/rest_client/rest_client.h +++ b/modules/rest_client/rest_client.h @@ -32,7 +32,7 @@ enum tr_rest_subtype { typedef struct _preconnect_urls { char *url; - unsigned int connections; + long connections; struct _preconnect_urls *next; } preconnect_urls; diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index 45f72763f46..4ece97fbbc0 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -90,9 +90,8 @@ extern trace_proto_t tprot; extern char *rest_id_s; static CURLSH *curl_share = NULL; -static gen_lock_t curl_share_locks[CURL_LOCK_DATA_LAST]; +static gen_lock_t *curl_socket_lock = NULL; -static enum curl_status status; static long timer; /** @@ -441,23 +440,31 @@ static inline int get_easy_status(CURL *handle, CURLM *multi, CURLcode *code) } static void libcurl_share_lock(CURL *handle, curl_lock_data data, curl_lock_access access, void *clientp) { - LM_DBG("Locking libcurl share %d\n", data); - lock_get(&curl_share_locks[data]); + if (data == CURL_LOCK_DATA_CONNECT) { + LM_DBG("Locking libcurl share %d\n", data); + lock_get(curl_socket_lock); + } } static void libcurl_share_unlock(CURL *handle, curl_lock_data data, void *clientp) { - LM_DBG("Unlocking libcurl share %d\n", data); - lock_release(&curl_share_locks[data]); + if (data == CURL_LOCK_DATA_CONNECT) { + LM_DBG("Unlocking libcurl share %d\n", data); + lock_release(curl_socket_lock); + } } static CURLSH *get_curl_share(void) { CURLSHcode src; if (!curl_share) { - for (int i =0; i < CURL_LOCK_DATA_LAST; ++i) { - lock_init(&curl_share_locks[i]); + curl_socket_lock = lock_alloc(); + + if (!curl_socket_lock) { + goto done; } + lock_init(curl_socket_lock); + curl_share = curl_share_init(); w_curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT); @@ -467,16 +474,19 @@ static CURLSH *get_curl_share(void) { return curl_share; cleanup: - for (int i =0; i < CURL_LOCK_DATA_LAST; ++i) { - lock_destroy(&curl_share_locks[i]); + if (curl_socket_lock) { + lock_destroy(curl_socket_lock); + lock_dealloc(curl_socket_lock); + curl_socket_lock = NULL; } - curl_share_cleanup(curl_share); + curl_share_cleanup(curl_share); // Passing NULL returns early if init failed curl_share = NULL; +done: return NULL; } -static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) +static int init_transfer(CURL *handle, char *url, unsigned long connect_timeout_s, unsigned long timeout_s) { CURLcode rc; @@ -491,10 +501,8 @@ static int init_transfer(CURL *handle, char *url, unsigned long timeout_s) tls_dom = NULL; } - w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, - timeout_s && timeout_s > connection_timeout ? timeout_s : connection_timeout); - w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, - timeout_s && timeout_s > curl_timeout ? timeout_s : curl_timeout); + w_curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, connect_timeout_s); + w_curl_easy_setopt(handle, CURLOPT_TIMEOUT, timeout_s); w_curl_easy_setopt(handle, CURLOPT_VERBOSE, 1); w_curl_easy_setopt(handle, CURLOPT_FAILONERROR, 0); @@ -802,7 +810,7 @@ int rest_sync_transfer(enum rest_client_method method, struct sip_msg *msg, str st = STR_NULL, res_body = STR_NULL, tbody, ttype; curl_easy_reset(sync_handle); - if (init_transfer(sync_handle, url, 0) != 0) { + if (init_transfer(sync_handle, url, connection_timeout, curl_timeout) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -902,7 +910,7 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method CURLMcode mrc; fd_set rset, wset, eset; int max_fd, fd, http_rc, ret = RCL_INTERNAL_ERR; - long busy_wait, timeout, connect_timeout; + long busy_wait, timeout; long retry_time; OSS_CURLM *multi_list; CURLM *multi_handle; @@ -918,7 +926,7 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method goto cleanup; } - if (init_transfer(handle, url, async_parm->timeout_s) != 0) { + if (init_transfer(handle, url, connection_timeout, curl_timeout) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -972,27 +980,18 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method multi_handle = multi_list->multi_handle; curl_multi_add_handle(multi_handle, handle); - connect_timeout = (async_parm->timeout_s*1000) > connection_timeout_ms ? - (async_parm->timeout_s*1000) : connection_timeout_ms; - timeout = connect_timeout; + timeout = connection_timeout_ms; busy_wait = connect_poll_interval; /* obtain a read fd in "connection_timeout" seconds at worst */ - for (timeout = connect_timeout; timeout > 0; timeout -= busy_wait) { - double connect = -1; - long req_sz = -1; - + for (timeout = connection_timeout_ms; timeout > 0; timeout -= busy_wait) { mrc = curl_multi_perform(multi_handle, &running_handles); if (mrc != CURLM_OK && mrc != CURLM_CALL_MULTI_PERFORM) { LM_ERR("curl_multi_perform: %s\n", curl_multi_strerror(mrc)); goto error; } - curl_easy_getinfo(handle, CURLINFO_CONNECT_TIME, &connect); - curl_easy_getinfo(handle, CURLINFO_REQUEST_SIZE, &req_sz); - - LM_DBG("perform code: %d, handles: %d, connect: %.3lfs, reqsz: %ldB\n", - mrc, running_handles, connect, req_sz); + LM_DBG("perform code: %d, handles: %d\n", mrc, running_handles); /* transfer completed! But how well? */ if (running_handles == 0) { @@ -1015,8 +1014,8 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method case CURLE_OPERATION_TIMEDOUT: if (http_rc == 0) { - LM_ERR("connect timeout on %s (%ldms)\n", url, - connect_timeout); + LM_ERR("connect timeout on %s (%lds)\n", url, + connection_timeout); ret = RCL_CONNECT_TIMEOUT; goto error; } @@ -1055,7 +1054,7 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method for (fd = 0; fd <= max_fd; fd++) { if (FD_ISSET(fd, &rset)) { LM_DBG("ongoing transfer on fd %d\n", fd); - if (req_sz > 0 && is_new_transfer(fd)) { + if (is_new_transfer(fd)) { LM_DBG(">>> add fd %d to ongoing transfers\n", fd); add_transfer(fd); goto success; @@ -1072,7 +1071,7 @@ static int start_async_http_req_v1(struct sip_msg *msg, enum rest_client_method LM_DBG("libcurl TCP connect: we should wait up to %ldms " "(timeout=%ldms, poll=%ldms)!\n", retry_time, - connect_timeout, connect_poll_interval); + connection_timeout_ms, connect_poll_interval); /* from curl_multi_timeout() docs: @@ -1146,8 +1145,8 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, if (timed_out) { char *url = NULL; curl_easy_getinfo(param->handle, CURLINFO_EFFECTIVE_URL, &url); - LM_ERR("async %s timed out, URL: %s (timeout: %lds)\n", - rest_client_method_str(param->method), url, param->timeout_s); + LM_ERR("async %s timed out, URL: %s\n", + rest_client_method_str(param->method), url); goto cleanup; } @@ -1160,10 +1159,7 @@ static enum async_ret_code _resume_async_http_req(int fd, struct sip_msg *msg, LM_DBG("perform result: %d, running: %d (break: %d)\n", mrc, running, mrc != CURLM_CALL_MULTI_PERFORM && (mrc != CURLM_OK || !running)); - if (mrc == CURLM_OK) { - if (!running) - break; - + if (mrc == CURLM_OK && running) { async_status = ASYNC_CONTINUE; return 1; /* this rc has been removed since cURL 7.20.0 (Feb 2010), but it's not @@ -1613,10 +1609,9 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { long busy_wait, timer; int msgq, num_of_connections, exit_code = 0; - curl_share = get_curl_share(); - - if (!curl_share) { - goto cleanup; + if (!share_connections) { + exit_code = -1; + goto done; } multi_list = get_multi(); @@ -1638,7 +1633,7 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { handle = curl_easy_init(); curl_multi_add_handle(multi_handle, handle); - if (init_transfer(handle, url, 0) != 0) { + if (init_transfer(handle, url, curl_timeout, curl_timeout) != 0) { exit_code = -1; goto cleanup; } @@ -1649,7 +1644,6 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { } w_curl_easy_setopt(handle, CURLOPT_NOBODY, 1L); - w_curl_easy_setopt(handle, CURLOPT_SHARE, curl_share); } start = start->next; @@ -1724,7 +1718,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, CURLMcode mrc; OSS_CURLM *multi_list; CURLM *multi_handle; - long busy_wait, connect_timeout; + long busy_wait, req_sz; handle = curl_easy_init(); @@ -1733,7 +1727,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, goto cleanup; } - if (init_transfer(handle, url, async_parm->timeout_s) != 0) { + if (init_transfer(handle, url, connection_timeout, async_parm->timeout_s) != 0) { LM_ERR("failed to init transfer to %s\n", url); goto cleanup; } @@ -1793,15 +1787,8 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, goto cleanup; } - w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, (long) max_connections); - - status = CURL_NONE; - w_curl_easy_setopt(handle, CURLOPT_PREREQFUNCTION, prereq_callback); - w_curl_easy_setopt(handle, CURLOPT_PREREQDATA, &status); - - connect_timeout = (async_parm->timeout_s*1000) > connection_timeout_ms ? - (async_parm->timeout_s*1000) : connection_timeout_ms; - busy_wait = connect_poll_interval; + w_curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, max_host_connections); + w_curl_multi_setopt(multi_handle, CURLMOPT_MAXCONNECTS, max_connections); w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERFUNCTION, timer_cb); w_curl_multi_setopt(multi_handle, CURLMOPT_TIMERDATA, &timer); @@ -1810,6 +1797,8 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, goto cleanup; } + busy_wait = connect_poll_interval; + do { running_handles = run_multi_socket(multi_handle); @@ -1817,7 +1806,9 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, goto error; } - if (status == CURL_REQUEST_SENT) { + curl_easy_getinfo(handle, CURLINFO_REQUEST_SIZE, &req_sz); + + if (req_sz > 0) { goto success; } diff --git a/modules/rest_client/rest_methods.h b/modules/rest_client/rest_methods.h index d85341a1aec..c1ee2d7b17b 100644 --- a/modules/rest_client/rest_methods.h +++ b/modules/rest_client/rest_methods.h @@ -39,7 +39,8 @@ extern long connection_timeout; extern long connect_poll_interval; extern long connection_timeout_ms; extern int max_async_transfers; -extern int max_connections; +extern long max_connections; +extern long max_host_connections; extern long curl_timeout; extern char *ssl_capath; diff --git a/modules/rest_client/rest_sockets.c b/modules/rest_client/rest_sockets.c index 8482d92452e..2c58962fa6d 100644 --- a/modules/rest_client/rest_sockets.c +++ b/modules/rest_client/rest_sockets.c @@ -160,17 +160,17 @@ int run_multi_socket(CURLM *multi_handle) { } int setsocket_callback(CURLM *multi_handle) { - CURLcode rc; + CURLMcode mrc; - rc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_cb); - if (rc != CURLE_OK) { - LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_easy_strerror(rc)); + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_cb); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); return -1; } - rc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, &fds); - if (rc != CURLE_OK) { - LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_easy_strerror(rc)); + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, &fds); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); return -1; } From 259b99a635abf59022a25f376a37d2b35ce693d1 Mon Sep 17 00:00:00 2001 From: David Trihy Date: Mon, 22 Sep 2025 11:50:18 +0100 Subject: [PATCH 5/6] TELECOM-11880: rest socket change --- modules/rest_client/rest_methods.c | 6 +++--- modules/rest_client/rest_sockets.c | 15 +++++++++++++++ modules/rest_client/rest_sockets.h | 1 + 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index 4ece97fbbc0..c6c1abd0482 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -1649,7 +1649,7 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { start = start->next; } - busy_wait = connect_poll_interval; + busy_wait = 100; if (setsocket_callback(multi_handle) != 0) { goto cleanup; @@ -1673,11 +1673,11 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { if (timer < 0) { break; - } + } usleep(1000UL * busy_wait); LM_DBG("Creating warm pool connection, running handles %d\n", running_handles); - } while (running_handles != 0); + } while (running_sockets()); cleanup: do { diff --git a/modules/rest_client/rest_sockets.c b/modules/rest_client/rest_sockets.c index 2c58962fa6d..e7b41e3e466 100644 --- a/modules/rest_client/rest_sockets.c +++ b/modules/rest_client/rest_sockets.c @@ -81,6 +81,21 @@ int get_max_fd(int no_max_default) { return ((fds.max_fd_index << 3) + WORD_SIZE_BITS - 1) - COUNT_LEADING_ZEROS(sockets); } +int running_sockets(void) { + cpuword_t sockets; + int running, curl_fd; + + for (int i = 0; i <= fds.max_fd_index; i += WORD_SIZE_BYTES) { + memcpy(&sockets, fds.tracked_socks + i, sizeof(cpuword_t)); + + if (sockets) { + return 1; + } + } + + return 0; +} + static void add_sock(int s) { int sock_index = s >> 3; diff --git a/modules/rest_client/rest_sockets.h b/modules/rest_client/rest_sockets.h index e8e30c98854..281e290b06a 100644 --- a/modules/rest_client/rest_sockets.h +++ b/modules/rest_client/rest_sockets.h @@ -28,6 +28,7 @@ int init_process_limits(rlim_t rlim_cur); int get_max_fd(int no_max_default); +int running_sockets(void); int start_multi_socket(CURLM *multi_handle); int run_multi_socket(CURLM *multi_handle); int setsocket_callback(CURLM *multi_handle); From 96fc6e08368b1c03bdaa7f7616c35f91b4801d2c Mon Sep 17 00:00:00 2001 From: David Trihy Date: Thu, 25 Sep 2025 17:07:06 +0100 Subject: [PATCH 6/6] TELECOM-11880: Updating connect_only mechanism --- modules/rest_client/rest_methods.c | 68 ++++++++++++++++++-------- modules/rest_client/rest_sockets.c | 78 ++++++++++++++++++++++-------- modules/rest_client/rest_sockets.h | 9 +++- 3 files changed, 114 insertions(+), 41 deletions(-) diff --git a/modules/rest_client/rest_methods.c b/modules/rest_client/rest_methods.c index c6c1abd0482..67ebc0a1465 100644 --- a/modules/rest_client/rest_methods.c +++ b/modules/rest_client/rest_methods.c @@ -1602,12 +1602,13 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { CURLMcode mrc; CURL *handle; CURLM *multi_handle; + curl_socket_t sockfd; OSS_CURLM *multi_list; - struct CURLMsg *m; + CURLEasyHandles easy_handles = { 0, 0 }; preconnect_urls *start, *next; char *url; - long busy_wait, timer; - int msgq, num_of_connections, exit_code = 0; + long busy_wait, timer, local_timer; + int num_of_connections = 0, exit_code = 0, open_sockets = 0; if (!share_connections) { exit_code = -1; @@ -1649,10 +1650,21 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { start = start->next; } + if (num_of_connections == 0) { + goto error; + } + + easy_handles.size = 0; + easy_handles.handles = pkg_malloc(sizeof(CURL*) * num_of_connections); + + if (easy_handles.handles == NULL) { + goto error; + } + busy_wait = 100; - if (setsocket_callback(multi_handle) != 0) { - goto cleanup; + if (setsocket_callback_connect(multi_handle, &easy_handles) != 0) { + goto error; } w_curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, (long) num_of_connections); @@ -1666,31 +1678,45 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { goto cleanup; } - LM_DBG("Creating warm pool connection, running handles %d\n", running_handles); + LM_INFO("Creating warm pool connection, running handles %d\n", running_handles); + + local_timer = 0; do { running_handles = run_multi_socket(multi_handle); - if (timer < 0) { + if (timer < 0 || local_timer >= curl_timeout) { break; } + local_timer += busy_wait; usleep(1000UL * busy_wait); - LM_DBG("Creating warm pool connection, running handles %d\n", running_handles); } while (running_sockets()); - cleanup: - do { - m = curl_multi_info_read(multi_handle, &msgq); - if (m && m->msg == CURLMSG_DONE) { - mrc = curl_multi_remove_handle(multi_handle, m->easy_handle); - if (mrc != CURLM_OK) { - LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); - } - curl_easy_cleanup(m->easy_handle); + LM_INFO("Finishing warm pool connection, open sockets %d\n", easy_handles.size); + + if (running_sockets()) { + running_handles = end_multi_socket(multi_handle); + } + + for (int i = 0; i < easy_handles.size; i++) { + curl_easy_getinfo(easy_handles.handles[i], CURLINFO_ACTIVESOCKET, &sockfd); + + if (sockfd != CURL_SOCKET_BAD) { + open_sockets += 1; } - } while (m); + curl_multi_remove_handle(multi_handle, easy_handles.handles[i]); + + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_remove_handle: %s\n", curl_multi_strerror(mrc)); + } + + curl_easy_cleanup(easy_handles.handles[i]); + } + + LM_INFO("Finishing warm pool connection, open sockets %d\n", open_sockets); +error: start = precon_urls; while (start != NULL) { @@ -1702,6 +1728,10 @@ int connect_only(preconnect_urls *precon_urls, int total_cons) { start = next; } + if (easy_handles.handles != NULL) { + pkg_free(easy_handles.handles); + } + put_multi(multi_list); done: @@ -1783,7 +1813,7 @@ int start_async_http_req_v2(struct sip_msg *msg, enum rest_client_method method, multi_handle = multi_list->multi_handle; curl_multi_add_handle(multi_handle, handle); - if (setsocket_callback(multi_handle) != 0) { + if (setsocket_callback_request(multi_handle) != 0) { goto cleanup; } diff --git a/modules/rest_client/rest_sockets.c b/modules/rest_client/rest_sockets.c index e7b41e3e466..e5be50662ea 100644 --- a/modules/rest_client/rest_sockets.c +++ b/modules/rest_client/rest_sockets.c @@ -120,20 +120,66 @@ static void remove_sock(int s) { } } -static int socket_action_cb(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) -{ +static int socket_action_connect(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) { LM_DBG("called for socket %d status %d\n", s, event); + CURLEasyHandles *easy_handles = (CURLEasyHandles*) cbp; if (event != CURL_POLL_REMOVE) { add_sock(s); } else if (event == CURL_POLL_REMOVE) { remove_sock(s); + + LM_DBG("Adding handle for socket %d current size %d\n", s, easy_handles->size); + easy_handles->handles[easy_handles->size] = e; + easy_handles->size += 1; } return 0; } -int start_multi_socket(CURLM *multi_handle) { +static int socket_action_http(CURL *e, curl_socket_t s, int event, void *cbp, void *sockp) { + LM_DBG("called for socket %d status %d\n", s, event); + + if (event != CURL_POLL_REMOVE) { + add_sock(s); + } else if (event == CURL_POLL_REMOVE) { + remove_sock(s); + } + + return 0; +} + +int setsocket_callback_connect(CURLM *multi_handle, CURLEasyHandles *easy_handles) { + CURLMcode mrc; + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_connect); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, easy_handles); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + return 0; +} + +int setsocket_callback_request(CURLM *multi_handle) { + CURLMcode mrc; + + mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_http); + if (mrc != CURLM_OK) { + LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); + return -1; + } + + return 0; +} + +static int run_all_multi_socket(CURLM *multi_handle, int ev_bitmask) { CURLMcode mrc; int running; @@ -149,6 +195,14 @@ int start_multi_socket(CURLM *multi_handle) { return running; } +int start_multi_socket(CURLM *multi_handle) { + return run_all_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT); +} + +int end_multi_socket(CURLM *multi_handle) { + return run_all_multi_socket(multi_handle, CURL_POLL_REMOVE); +} + int run_multi_socket(CURLM *multi_handle) { CURLMcode mrc; cpuword_t sockets; @@ -173,21 +227,3 @@ int run_multi_socket(CURLM *multi_handle) { return running; } - -int setsocket_callback(CURLM *multi_handle) { - CURLMcode mrc; - - mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_action_cb); - if (mrc != CURLM_OK) { - LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); - return -1; - } - - mrc = curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, &fds); - if (mrc != CURLM_OK) { - LM_ERR("curl_multi_setopt(%d): (%s)\n", CURLMOPT_SOCKETFUNCTION, curl_multi_strerror(mrc)); - return -1; - } - - return 0; -} \ No newline at end of file diff --git a/modules/rest_client/rest_sockets.h b/modules/rest_client/rest_sockets.h index 281e290b06a..0af32663652 100644 --- a/modules/rest_client/rest_sockets.h +++ b/modules/rest_client/rest_sockets.h @@ -26,11 +26,18 @@ #include +typedef struct _curl_easy_handles { + int size; + CURL **handles; +} CURLEasyHandles; + int init_process_limits(rlim_t rlim_cur); int get_max_fd(int no_max_default); int running_sockets(void); int start_multi_socket(CURLM *multi_handle); +int end_multi_socket(CURLM *multi_handle); int run_multi_socket(CURLM *multi_handle); -int setsocket_callback(CURLM *multi_handle); +int setsocket_callback_request(CURLM *multi_handle); +int setsocket_callback_connect(CURLM *multi_handle, CURLEasyHandles *easy_handles); #endif /* _REST_SOCKET_H_ */ \ No newline at end of file