diff --git a/scripts/telego/dist_waverless/deployment.yml b/scripts/telego/dist_waverless/deployment.yml
index 0668ad2..3a19f32 100644
--- a/scripts/telego/dist_waverless/deployment.yml
+++ b/scripts/telego/dist_waverless/deployment.yml
@@ -3,7 +3,12 @@
 comment: serverless computing platform with storage-compute convergence
 
 local_values: {}
 
-prepare: []
+prepare:
+  # Auto-generate the prometheus config file before deployment
+  - pyscript: gen_prometheus_config.py
+    trans:
+      - copy:
+          - /etc/prometheus/prometheus.yml: prometheus-config/prometheus.yml
 dist:
   waverless-test:
@@ -111,3 +116,52 @@ dist:
         cp /usr/bin/waverless_entry ./
         ./waverless_entry $DIST_UNIQUE_ID
 
+# Deploy prometheus and grafana
+k8s:
+  prometheus:
+    type: k8s
+    image: prom/prometheus:latest
+    conf:
+      - name: prometheus-config
+        mountPath: /etc/prometheus/prometheus.yml
+        subPath: prometheus.yml
+        configMap: prometheus-config
+    ports:
+      - 9090:9090
+    env: {}
+    resources:
+      limits:
+        cpu: "500m"
+        memory: "512Mi"
+      requests:
+        cpu: "100m"
+        memory: "128Mi"
+    command: ["/bin/prometheus", "--config.file=/etc/prometheus/prometheus.yml"]
+
+  grafana:
+    type: k8s
+    image: grafana/grafana:latest
+    ports:
+      - 3000:3000
+    env:
+      GF_SECURITY_ADMIN_PASSWORD: "admin"
+    resources:
+      limits:
+        cpu: "500m"
+        memory: "512Mi"
+      requests:
+        cpu: "100m"
+        memory: "128Mi"
+# Overwritten at deploy time by the file generated in prepare
+configmap:
+  prometheus-config:
+    prometheus.yml: |
+      global:
+        scrape_interval: 15s
+      scrape_configs:
+        - job_name: 'waverless'
+          static_configs:
+            - targets:
+                - '1.2.3.4:2500'
+                - '5.6.7.8:2500'
+                - '9.10.11.12:2500'
diff --git a/scripts/telego/dist_waverless/gen_prometheus_config.py b/scripts/telego/dist_waverless/gen_prometheus_config.py
new file mode 100644
index 0000000..09e0cfc
--- /dev/null
+++ b/scripts/telego/dist_waverless/gen_prometheus_config.py
@@ -0,0 +1,20 @@
+import os
+
+node_ids = ["1", "2", "3"]
+
+targets = []
+for node_id in node_ids:
+    ip = os.environ.get(f"DIST_CONF_{node_id}_NODE_IP")
+    port = os.environ.get(f"DIST_CONF_{node_id}_port", "2500")
+    if ip:
+        targets.append(f"'{ip}:{port}'")
+
+with open("/etc/prometheus/prometheus.yml", "w") as f:
+    f.write("global:\n  scrape_interval: 15s\n\n")
+    f.write("scrape_configs:\n  - job_name: 'waverless'\n    static_configs:\n      - targets: [\n")
+    for i, t in enumerate(targets):
+        if i < len(targets) - 1:
+            f.write(f"        {t},\n")
+        else:
+            f.write(f"        {t}\n")
+    f.write("        ]\n")
\ No newline at end of file
diff --git a/scripts/telego/dist_waverless/prometheus.yml b/scripts/telego/dist_waverless/prometheus.yml
new file mode 100644
index 0000000..e1255d9
--- /dev/null
+++ b/scripts/telego/dist_waverless/prometheus.yml
@@ -0,0 +1,8 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'waverless'
+    static_configs:
+      - targets: ['192.168.31.109:2500'
+        ]
diff --git a/src/main/src/apis.rs b/src/main/src/apis.rs
index d7640de..ed0ffa2 100644
--- a/src/main/src/apis.rs
+++ b/src/main/src/apis.rs
@@ -3,6 +3,8 @@ use async_trait::async_trait;
 use axum::{http::StatusCode, routing::post, Json, Router};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
+use crate::metrics::metrics_handler;
+use axum::routing::get;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NodeBasic {
@@ -185,6 +187,5 @@ pub fn add_routers(mut router: Router) -> Router {
         )
     }
     router = router.route("/run_service_action", post(run_service_action));
-
    router
 }
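
The generator above derives scrape targets from telego's per-node environment variables. A minimal Rust sketch of that contract, assuming the `DIST_CONF_<id>_NODE_IP` / `DIST_CONF_<id>_port` variables shown in gen_prometheus_config.py (port defaulting to 2500); the function name is illustrative:

    use std::env;

    // Mirrors gen_prometheus_config.py: nodes without a NODE_IP variable are
    // skipped, and the port falls back to 2500.
    fn scrape_targets(node_ids: &[&str]) -> Vec<String> {
        node_ids
            .iter()
            .filter_map(|id| {
                let ip = env::var(format!("DIST_CONF_{id}_NODE_IP")).ok()?;
                let port =
                    env::var(format!("DIST_CONF_{id}_port")).unwrap_or_else(|_| "2500".into());
                Some(format!("'{ip}:{port}'"))
            })
            .collect()
    }

    fn main() {
        env::set_var("DIST_CONF_1_NODE_IP", "192.168.31.109");
        // Only node 1 resolves, so this prints ["'192.168.31.109:2500'"].
        println!("{:?}", scrape_targets(&["1", "2", "3"]));
    }
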
diff --git a/src/main/src/general/data/m_data_general/http.rs b/src/main/src/general/data/m_data_general/http.rs
index 6114cad..a422acc 100644
--- a/src/main/src/general/data/m_data_general/http.rs
+++ b/src/main/src/general/data/m_data_general/http.rs
@@ -11,7 +11,8 @@ use async_raft::State as RaftState;
 use axum::extract::{Multipart, State};
 use axum::http::StatusCode;
 use axum::response::{IntoResponse, Response};
-use axum::routing::post;
+use axum::routing::{get, post};
+use crate::metrics::metrics_handler; // import the metrics handler
 use axum::Router;
 use serde::Serialize;
 use std::io;
@@ -24,7 +25,9 @@ impl DataGeneral {
         with_option!(router_holder.option_mut(), router => {
             // router.route("/upload_data", post(handle_write_data))
-            router.merge(Router::new().route("/upload_data", post(handle_upload_data).with_state(self.view.clone())))
+            router.merge(Router::new()
+                .route("/upload_data", post(handle_upload_data).with_state(self.view.clone()))
+                .route("/metrics", get(metrics_handler)))
         });
 
         Ok(())
     }
diff --git a/src/main/src/general/metrics/collector.rs b/src/main/src/general/metrics/collector.rs
new file mode 100644
index 0000000..be91165
--- /dev/null
+++ b/src/main/src/general/metrics/collector.rs
@@ -0,0 +1,167 @@
+// waverless/src/main/src/general/metrics/collector.rs
+use crate::general::{LogicalModule, LogicalModuleNewArgs};
+use crate::result::WSResult;
+use crate::util::JoinHandleWrapper;
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::registry::Registry;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::time;
+use sysinfo::{System, SystemExt, ProcessExt};
+use std::process;
+use super::types::NodeMetrics;
+use async_trait::async_trait;
+
+/// Metrics collector responsible for gathering this node's performance metrics.
+/// When running on a Worker node it automatically reports the metrics to the
+/// Master node via RPC.
+pub struct MetricsCollector {
+    // Locally collected metrics
+    metrics: Arc<Mutex<Registry>>,
+
+    // Process metrics
+    process_cpu: Arc<Gauge>,
+    process_memory: Arc<Gauge>,
+
+    // Module references
+    args: LogicalModuleNewArgs,
+
+    // Whether this is the Master node
+    is_master: bool,
+
+    // Current process ID
+    pid: i32,
+}
+
+#[async_trait]
+impl LogicalModule for MetricsCollector {
+    fn inner_new(args: LogicalModuleNewArgs) -> Self {
+        let process_cpu = Arc::new(Gauge::default());
+        let process_memory = Arc::new(Gauge::default());
+
+        let mut registry = Registry::default();
+
+        // Register process metrics
+        registry.register(
+            "waverless_process_cpu_percent",
+            "CPU usage of the current process (percent)",
+            process_cpu.clone(),
+        );
+        registry.register(
+            "waverless_process_memory_bytes",
+            "Memory usage of the current process (bytes)",
+            process_memory.clone(),
+        );
+
+        let is_master = args.nodes_config.this.1.is_master();
+        let pid = process::id() as i32;
+
+        Self {
+            metrics: Arc::new(Mutex::new(registry)),
+            process_cpu,
+            process_memory,
+            args: args.clone(),
+            is_master,
+            pid,
+        }
+    }
+
+    async fn init(&self) -> WSResult<()> {
+        // Collect metrics once immediately on init
+        self.collect_process_metrics();
+        tracing::info!("Metrics collector initialized (node type: {})",
+            if self.is_master { "Master" } else { "Worker" });
+        Ok(())
+    }
+
+    async fn start(&self) -> WSResult<Vec<JoinHandleWrapper>> {
+        let process_cpu = self.process_cpu.clone();
+        let process_memory = self.process_memory.clone();
+        let is_master = self.is_master;
+        let args = self.args.clone();
+        let pid = self.pid;
+
+        // Start the periodic collection task
+        let handle = tokio::spawn(async move {
+            let mut interval = time::interval(Duration::from_secs(15));
+            let mut sys = System::new_all();
+
+            loop {
+                interval.tick().await;
+
+                // Collect process metrics
+                sys.refresh_all();
+
+                // Look up the current process
+                if let Some(process) = sys.process(pid) {
+                    // Process CPU usage, stored as hundredths of a percent (fixed point)
+                    let cpu_usage = process.cpu_usage();
+                    process_cpu.set((cpu_usage * 100.0) as i64);
+
+                    // Process memory usage
+                    let mem_usage = process.memory();
+                    process_memory.set(mem_usage as i64);
+
+                    tracing::debug!("Collected local metrics: CPU={}%, memory={} bytes",
+                        cpu_usage, mem_usage);
+                }
+
+                // On Worker nodes, report the metrics to the Master
+                if !is_master {
+                    let node_id = args.nodes_config.this.0;
+                    let node_addr = args.nodes_config.this.1.addr.to_string();
+
+                    let metrics = NodeMetrics {
+                        node_id,
+                        node_addr,
+                        process_cpu_percent: (process_cpu.get() as f64) / 100.0,
+                        process_memory_bytes: process_memory.get(),
+                        timestamp: SystemTime::now()
+                            .duration_since(UNIX_EPOCH)
+                            .unwrap()
+                            .as_secs(),
+                    };
+
+                    // Send an RPC request via the P2P module
+                    if let Ok(modules) = args.inner.upgrade() {
+                        if let Some(modules) = modules.as_ref() {
+                            let p2p = &modules.p2p;
+                            let master_node = args.nodes_config.get_master_node();
+
+                            // Send the RPC request to the Master
+                            match p2p.send_rpc_request(
+                                master_node,
+                                "report_metrics",
+                                serde_json::to_string(&metrics).unwrap(),
+                            ).await {
+                                Ok(_) => tracing::debug!("Metrics sent to Master node"),
+                                Err(e) => tracing::warn!("Failed to send metrics to Master node: {:?}", e),
+                            }
+                        }
+                    }
+                }
+            }
+        });
+
+        tracing::info!("Metrics collector started");
+        Ok(vec![JoinHandleWrapper::new(handle)])
+    }
+}
+
+/// Public methods
+impl MetricsCollector {
+    /// Collect process metrics
+    pub fn collect_process_metrics(&self) {
+        let mut sys = System::new_all();
+        sys.refresh_all();
+
+        if let Some(process) = sys.process(self.pid) {
+            self.process_cpu.set((process.cpu_usage() * 100.0) as i64);
+            self.process_memory.set(process.memory() as i64);
+        }
+    }
+
+    /// Get the metrics Registry
+    pub fn registry(&self) -> Arc<Mutex<Registry>> {
+        self.metrics.clone()
+    }
+}
\ No newline at end of file
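
Worth noting for reviewers: the collector stores CPU usage in the i64 gauge as hundredths of a percent, and the report path divides by 100.0 to recover the float. A self-contained sketch of that round trip (function names here are illustrative, not from the codebase):

    // The i64 gauge cannot hold fractional percentages, so the collector uses
    // a fixed-point convention: gauge value = percent * 100.
    fn to_fixed(cpu_percent: f32) -> i64 {
        (cpu_percent * 100.0) as i64
    }

    fn from_fixed(raw: i64) -> f64 {
        raw as f64 / 100.0
    }

    fn main() {
        let raw = to_fixed(12.5);
        assert_eq!(raw, 1250);
        assert!((from_fixed(raw) - 12.5).abs() < 0.01);
    }
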
diff --git a/src/main/src/general/metrics/mod.rs b/src/main/src/general/metrics/mod.rs
new file mode 100644
index 0000000..39df6e7
--- /dev/null
+++ b/src/main/src/general/metrics/mod.rs
@@ -0,0 +1,6 @@
+// waverless/src/main/src/general/metrics/mod.rs
+mod types;
+mod collector;
+
+pub use types::{NodeMetrics, AggregatedMetrics};
+pub use collector::MetricsCollector;
\ No newline at end of file
diff --git a/src/main/src/general/metrics/types.rs b/src/main/src/general/metrics/types.rs
new file mode 100644
index 0000000..ad67afb
--- /dev/null
+++ b/src/main/src/general/metrics/types.rs
@@ -0,0 +1,22 @@
+use serde::{Serialize, Deserialize};
+use std::collections::HashMap;
+
+// Per-node metrics data
+#[derive(Clone, Serialize, Deserialize)]
+pub struct NodeMetrics {
+    pub node_id: u32,
+    pub node_addr: String,
+
+    // Process metrics
+    pub process_cpu_percent: f64,
+    pub process_memory_bytes: i64,
+
+    pub timestamp: u64,
+}
+
+// Aggregated metrics data
+#[derive(Default)]
+pub struct AggregatedMetrics {
+    // Map from node ID to its metrics
+    pub node_metrics: HashMap<u32, NodeMetrics>,
+}
\ No newline at end of file
diff --git a/src/main/src/general/mod.rs b/src/main/src/general/mod.rs
index 60f1427..c83f455 100644
--- a/src/main/src/general/mod.rs
+++ b/src/main/src/general/mod.rs
@@ -3,6 +3,6 @@ pub mod data;
 pub mod m_metric_publisher;
 pub mod m_os;
 pub mod network;
-
+pub mod metrics;
 #[cfg(test)]
 pub mod test_utils;
diff --git a/src/main/src/main.rs b/src/main/src/main.rs
index fefd8ca..1571c93 100644
--- a/src/main/src/main.rs
+++ b/src/main/src/main.rs
@@ -33,6 +33,7 @@ pub mod modules_global_bridge;
 pub mod result;
 pub mod sys;
 pub mod util;
+pub mod metrics;
 
 #[tokio::main]
 async fn main() {
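
Since the Worker-to-Master report is just the serde_json string of NodeMetrics, the wire payload can be sanity-checked in isolation. A minimal sketch, using a local mirror of the struct (field values are illustrative):

    use serde::{Deserialize, Serialize};

    // Local mirror of general::metrics::types::NodeMetrics for illustration.
    #[derive(Clone, Serialize, Deserialize)]
    pub struct NodeMetrics {
        pub node_id: u32,
        pub node_addr: String,
        pub process_cpu_percent: f64,
        pub process_memory_bytes: i64,
        pub timestamp: u64,
    }

    fn main() {
        let report = NodeMetrics {
            node_id: 2,
            node_addr: "10.0.0.2:2500".to_string(),
            process_cpu_percent: 12.5,
            process_memory_bytes: 128 * 1024 * 1024,
            timestamp: 1_700_000_000,
        };
        // This string is what the collector passes to
        // send_rpc_request("report_metrics", ...).
        let payload = serde_json::to_string(&report).unwrap();
        let parsed: NodeMetrics = serde_json::from_str(&payload).unwrap();
        assert_eq!(parsed.node_id, 2);
    }
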
diff --git a/src/main/src/master/metrics/aggregator.rs b/src/main/src/master/metrics/aggregator.rs
new file mode 100644
index 0000000..473d061
--- /dev/null
+++ b/src/main/src/master/metrics/aggregator.rs
@@ -0,0 +1,229 @@
+// waverless/src/main/src/master/metrics/aggregator.rs
+use crate::general::{LogicalModule, LogicalModuleNewArgs};
+use crate::general::metrics::types::{NodeMetrics, AggregatedMetrics};
+use crate::result::WSResult;
+use crate::util::JoinHandleWrapper;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::time;
+use tokio::net::TcpListener;
+use tokio::io::AsyncWriteExt;
+use std::collections::HashMap;
+use async_trait::async_trait;
+use prometheus_client::encoding::text::encode;
+use prometheus_client::registry::Registry;
+use prometheus_client::metrics::gauge::Gauge;
+
+/// Metrics aggregator on the Master node.
+/// 1. Aggregates the metrics of all nodes.
+/// 2. Exposes a Prometheus HTTP scrape endpoint.
+pub struct MetricsAggregator {
+    // Aggregated metrics data
+    metrics: Arc<Mutex<AggregatedMetrics>>,
+
+    // Prometheus metrics registry
+    registry: Arc<Mutex<Registry>>,
+
+    // Per-node CPU usage gauges
+    node_cpu_gauges: Arc<Mutex<HashMap<u32, Gauge>>>,
+
+    // Per-node memory usage gauges
+    node_memory_gauges: Arc<Mutex<HashMap<u32, Gauge>>>,
+
+    // Module references
+    args: LogicalModuleNewArgs,
+
+    // HTTP listen port
+    http_port: u16,
+}
+
+#[async_trait]
+impl LogicalModule for MetricsAggregator {
+    fn inner_new(args: LogicalModuleNewArgs) -> Self {
+        // Listen on port 9100 by default
+        let http_port = 9100;
+
+        Self {
+            metrics: Arc::new(Mutex::new(AggregatedMetrics::default())),
+            registry: Arc::new(Mutex::new(Registry::default())),
+            node_cpu_gauges: Arc::new(Mutex::new(HashMap::new())),
+            node_memory_gauges: Arc::new(Mutex::new(HashMap::new())),
+            args,
+            http_port,
+        }
+    }
+
+    async fn init(&self) -> WSResult<()> {
+        // Register the RPC handler for metrics reports from Worker nodes
+        if let Ok(modules) = self.args.inner.upgrade() {
+            if let Some(modules) = modules.as_ref() {
+                // Get the P2P module
+                let p2p = &modules.p2p;
+
+                // Register the RPC handler
+                let metrics = self.metrics.clone();
+                p2p.register_rpc_handler("report_metrics", Box::new(move |payload| {
+                    let metrics_clone = metrics.clone();
+                    Box::pin(async move {
+                        if let Ok(node_metrics) = serde_json::from_str::<NodeMetrics>(payload) {
+                            MetricsAggregator::handle_metrics_report(metrics_clone, node_metrics);
+                            Ok("success".to_string())
+                        } else {
+                            Err("Invalid metrics format".to_string())
+                        }
+                    })
+                }));
+
+                tracing::info!("RPC handler registered: report_metrics");
+            }
+        }
+
+        tracing::info!("Metrics aggregator initialized; Prometheus metrics will be served on port {}", self.http_port);
+        Ok(())
+    }
+
+    async fn start(&self) -> WSResult<Vec<JoinHandleWrapper>> {
+        let mut handles = Vec::new();
+
+        // Task 1: periodically clean up stale metrics
+        let metrics = self.metrics.clone();
+        let clean_handle = tokio::spawn(async move {
+            let mut interval = time::interval(Duration::from_secs(60));
+
+            loop {
+                interval.tick().await;
+
+                // Drop node metrics that have not been updated for over 5 minutes
+                let now = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs();
+
+                let mut metrics_lock = metrics.lock().unwrap();
+                let before_count = metrics_lock.node_metrics.len();
+                metrics_lock.node_metrics.retain(|_, node_metric| {
+                    // Keep metrics updated within the last 5 minutes
+                    now - node_metric.timestamp < 300
+                });
+                let after_count = metrics_lock.node_metrics.len();
+
+                if before_count != after_count {
+                    tracing::info!("Pruned stale node metrics: {} -> {}", before_count, after_count);
+                }
+            }
+        });
+        handles.push(JoinHandleWrapper::new(clean_handle));
+
+        // Task 2: start the HTTP server exposing the Prometheus scrape endpoint
+        let metrics = self.metrics.clone();
+        let registry = self.registry.clone();
+        let node_cpu_gauges = self.node_cpu_gauges.clone();
+        let node_memory_gauges = self.node_memory_gauges.clone();
+        let http_port = self.http_port;
+
+        let http_handle = tokio::spawn(async move {
+            // Bind the TCP listener
+            let addr = format!("0.0.0.0:{}", http_port);
+            let listener = match TcpListener::bind(&addr).await {
+                Ok(l) => l,
+                Err(e) => {
+                    tracing::error!("Failed to bind HTTP port {}: {}", http_port, e);
+                    return;
+                }
+            };
+
+            tracing::info!("Prometheus metrics HTTP server listening on: {}", addr);
+
+            loop {
+                match listener.accept().await {
+                    Ok((mut socket, addr)) => {
+                        let metrics = metrics.clone();
+                        let registry = registry.clone();
+                        let node_cpu_gauges = node_cpu_gauges.clone();
+                        let node_memory_gauges = node_memory_gauges.clone();
+
+                        tokio::spawn(async move {
+                            tracing::debug!("Received HTTP request from {}", addr);
+
+                            // Rebuild the registry from the latest reports
+                            {
+                                let metrics_lock = metrics.lock().unwrap();
+                                let mut registry_lock = registry.lock().unwrap();
+                                let mut cpu_gauges_lock = node_cpu_gauges.lock().unwrap();
+                                let mut memory_gauges_lock = node_memory_gauges.lock().unwrap();
+
+                                // Reset the registry
+                                *registry_lock = Registry::default();
+
+                                // Add the metrics of every node
+                                for (node_id, node_metric) in &metrics_lock.node_metrics {
+                                    // Create or fetch this node's CPU gauge
+                                    let cpu_gauge = cpu_gauges_lock.entry(*node_id)
+                                        .or_insert_with(Gauge::default);
+                                    cpu_gauge.set((node_metric.process_cpu_percent * 100.0) as i64);
+
+                                    // Create or fetch this node's memory gauge
+                                    let memory_gauge = memory_gauges_lock.entry(*node_id)
+                                        .or_insert_with(Gauge::default);
+                                    memory_gauge.set(node_metric.process_memory_bytes);
+
+                                    // Register with the registry
+                                    registry_lock.register(
+                                        format!("waverless_node_{}_cpu_percent", node_id),
+                                        format!("CPU usage of node {} ({})", node_id, node_metric.node_addr),
+                                        cpu_gauge.clone(),
+                                    );
+
+                                    registry_lock.register(
+                                        format!("waverless_node_{}_memory_bytes", node_id),
+                                        format!("Memory usage of node {} ({})", node_id, node_metric.node_addr),
+                                        memory_gauge.clone(),
+                                    );
+                                }
+                            }
+
+                            // Encode the response in Prometheus text format
+                            let mut buffer = String::new();
+                            encode(&mut buffer, &registry.lock().unwrap())
+                                .expect("Failed to encode metrics");
+
+                            // Send the HTTP response
+                            let response = format!(
+                                "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
+                                buffer.len(),
+                                buffer
+                            );
+
+                            if let Err(e) = socket.write_all(response.as_bytes()).await {
+                                tracing::warn!("Failed to send HTTP response: {}", e);
+                            }
+                        });
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to accept HTTP connection: {}", e);
+                    }
+                }
+            }
+        });
+        handles.push(JoinHandleWrapper::new(http_handle));
+
+        tracing::info!("Metrics aggregator started");
+        Ok(handles)
+    }
+}
+
+// Public methods and helpers
+impl MetricsAggregator {
+    /// Handle a metrics report from a Worker node
+    fn handle_metrics_report(metrics: Arc<Mutex<AggregatedMetrics>>, node_metrics: NodeMetrics) {
+        let mut aggregated = metrics.lock().unwrap();
+        let node_id = node_metrics.node_id;
+        aggregated.node_metrics.insert(node_id, node_metrics);
+        tracing::debug!("Received metrics report from node {}", node_id);
+    }
+
+    /// Get the aggregated metrics of all nodes
+    pub fn get_aggregated_metrics(&self) -> Arc<Mutex<AggregatedMetrics>> {
+        self.metrics.clone()
+    }
+}
\ No newline at end of file
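
The cleanup task's retention rule is simple enough to check in isolation. A minimal sketch, using a bare node-to-timestamp map rather than the real AggregatedMetrics, and saturating_sub so a report carrying a future timestamp cannot underflow:

    use std::collections::HashMap;
    use std::time::{SystemTime, UNIX_EPOCH};

    // Keep only nodes whose last report is younger than five minutes,
    // mirroring the retain() call in the aggregator's cleanup task.
    fn prune_stale(last_report: &mut HashMap<u32, u64>) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        last_report.retain(|_, ts| now.saturating_sub(*ts) < 300);
    }

    fn main() {
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        let mut m = HashMap::from([(1u32, now), (2u32, now - 600)]);
        prune_stale(&mut m);
        assert!(m.contains_key(&1) && !m.contains_key(&2)); // node 2 aged out
    }
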
diff --git a/src/main/src/master/metrics/mod.rs b/src/main/src/master/metrics/mod.rs
new file mode 100644
index 0000000..71f6a1b
--- /dev/null
+++ b/src/main/src/master/metrics/mod.rs
@@ -0,0 +1,4 @@
+// waverless/src/main/src/master/metrics/mod.rs
+mod aggregator;
+
+pub use aggregator::MetricsAggregator;
\ No newline at end of file
diff --git a/src/main/src/master/mod.rs b/src/main/src/master/mod.rs
index 0a7e261..be52873 100644
--- a/src/main/src/master/mod.rs
+++ b/src/main/src/master/mod.rs
@@ -3,3 +3,4 @@ pub mod data;
 pub mod m_http_handler;
 pub mod m_master;
 pub mod m_metric_observor;
+pub mod metrics;
\ No newline at end of file
diff --git a/src/main/src/metrics.rs b/src/main/src/metrics.rs
index b73ec41..8086518 100644
--- a/src/main/src/metrics.rs
+++ b/src/main/src/metrics.rs
@@ -1,3 +1,9 @@
-use prometheus::{register_counter, register_gauge, Counter, Gauge, Encoder, TextEncoder};
+use prometheus_client::encoding::text::encode;
+use prometheus_client::metrics::counter::Counter;
+use prometheus_client::metrics::gauge::Gauge;
+use prometheus_client::metrics::family::Family;
+use prometheus_client::registry::Registry;
+use std::sync::Mutex;
 use lazy_static::lazy_static;
-use axum::response::IntoResponse;
+use axum::response::{IntoResponse, Response};
+use sysinfo::{System, SystemExt};
@@ -27,6 +33,40 @@ lazy_static! {
-        "waverless_node_memory_usage_bytes",
-        "Node memory usage"
-    ).unwrap();
+    // Global metrics registry
+    pub static ref METRICS_REGISTRY: Mutex<Registry> = {
+        let mut registry = Registry::default();
+
+        // Total HTTP requests
+        registry.register(
+            "waverless_http_requests_total",
+            "Total number of HTTP requests",
+            HTTP_REQUESTS_TOTAL.clone()
+        );
+        // Total function calls
+        registry.register(
+            "waverless_function_calls_total",
+            "Total number of function calls",
+            FUNCTION_CALLS_TOTAL.clone()
+        );
+        // Batch task count
+        registry.register(
+            "waverless_batch_tasks_total",
+            "Number of batch tasks",
+            BATCH_TASKS_TOTAL.clone()
+        );
+        // Node memory usage
+        registry.register(
+            "waverless_node_memory_usage_bytes",
+            "Node memory usage in bytes",
+            NODE_MEMORY_USAGE.clone()
+        );
+
+        Mutex::new(registry)
+    };
+
+    // Metric definitions
+    pub static ref HTTP_REQUESTS_TOTAL: Counter = Counter::default();
+    pub static ref FUNCTION_CALLS_TOTAL: Counter = Counter::default();
+    pub static ref BATCH_TASKS_TOTAL: Counter = Counter::default();
+    pub static ref NODE_MEMORY_USAGE: Gauge = Gauge::default();
 }
 
 // Handler for the /metrics route
@@ -34,6 +74,4 @@ pub async fn metrics_handler() -> impl IntoResponse {
     // Refresh the memory gauge on every request
     let mut sys = System::new_all();
     sys.refresh_memory();
-    NODE_MEMORY_USAGE.set(sys.used_memory() as f64 * 1024.0); // convert to bytes
-
-    let encoder = TextEncoder::new();
+    let _ = NODE_MEMORY_USAGE.set((sys.used_memory() * 1024) as i64); // convert to bytes
@@ -41,4 +79,6 @@ pub async fn metrics_handler() -> impl IntoResponse {
-    let mut buffer = Vec::new();
-    encoder.encode(&metric_families, &mut buffer).unwrap();
-    String::from_utf8(buffer).unwrap()
+
+    let mut buffer = String::new();
+    let registry = METRICS_REGISTRY.lock().unwrap();
+    encode(&mut buffer, &registry).unwrap();
+    buffer
 }
\ No newline at end of file
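
A minimal, self-contained sketch of the prometheus_client pattern metrics_handler relies on, assuming a prometheus_client version (~0.19+) where text::encode writes OpenMetrics text into any fmt::Write:

    use prometheus_client::encoding::text::encode;
    use prometheus_client::metrics::gauge::Gauge;
    use prometheus_client::registry::Registry;

    fn main() {
        let mut registry = Registry::default();
        let memory: Gauge = Gauge::default();
        registry.register(
            "waverless_node_memory_usage_bytes",
            "Node memory usage in bytes",
            memory.clone(),
        );
        memory.set(512 * 1024 * 1024);

        // Same encoding step as metrics_handler: OpenMetrics text into a String.
        let mut buffer = String::new();
        encode(&mut buffer, &registry).unwrap();
        assert!(buffer.contains("waverless_node_memory_usage_bytes 536870912"));
        print!("{buffer}");
    }
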
{ "waverless_node_memory_usage_bytes", "节点内存使用量" ).unwrap(); +======= +use prometheus_client::encoding::text::encode; +use prometheus_client::metrics::counter::Counter; +use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::metrics::family::Family; +use prometheus_client::registry::Registry; +use std::sync::Mutex; +use lazy_static::lazy_static; +use axum::response::{IntoResponse, Response}; +use sysinfo::{System, SystemExt}; + +lazy_static! { + // 全局指标注册表 + pub static ref METRICS_REGISTRY: Mutex = { + let mut registry = Registry::default(); + + // HTTP请求总数 + registry.register( + "waverless_http_requests_total", + "HTTP请求总数", + HTTP_REQUESTS_TOTAL.clone() + ); + // 函数调用总数 + registry.register( + "waverless_function_calls_total", + "函数调用总数", + FUNCTION_CALLS_TOTAL.clone() + ); + // 批处理任务数 + registry.register( + "waverless_batch_tasks_total", + "批处理任务数", + BATCH_TASKS_TOTAL.clone() + ); + // 节点内存使用量 + registry.register( + "waverless_node_memory_usage_bytes", + "节点内存使用量", + NODE_MEMORY_USAGE.clone() + ); + + Mutex::new(registry) + }; + + // 指标定义 + pub static ref HTTP_REQUESTS_TOTAL: Counter = Counter::default(); + pub static ref FUNCTION_CALLS_TOTAL: Counter = Counter::default(); + pub static ref BATCH_TASKS_TOTAL: Counter = Counter::default(); + pub static ref NODE_MEMORY_USAGE: Gauge = Gauge::default(); +>>>>>>> prometheus } // /metrics 路由的 handler @@ -34,6 +85,7 @@ pub async fn metrics_handler() -> impl IntoResponse { // 每次请求时更新内存指标 let mut sys = System::new_all(); sys.refresh_memory(); +<<<<<<< HEAD NODE_MEMORY_USAGE.set(sys.used_memory() as f64 * 1024.0); // 转为字节 let encoder = TextEncoder::new(); @@ -41,4 +93,12 @@ pub async fn metrics_handler() -> impl IntoResponse { let mut buffer = Vec::new(); encoder.encode(&metric_families, &mut buffer).unwrap(); String::from_utf8(buffer).unwrap() +======= + let _= NODE_MEMORY_USAGE.set((sys.used_memory() * 1024) as i64); // 转为字节 + + let mut buffer = String::new(); + let registry = METRICS_REGISTRY.lock().unwrap(); + encode(&mut buffer, ®istry).unwrap(); + buffer +>>>>>>> prometheus } \ No newline at end of file diff --git a/src/main/src/sys.rs b/src/main/src/sys.rs index b3e1b72..92bd3d8 100644 --- a/src/main/src/sys.rs +++ b/src/main/src/sys.rs @@ -2,6 +2,8 @@ use crate::general::app::app_owned::wasm_host_funcs; use crate::general::app::instance::m_instance_manager::InstanceManager; use crate::general::app::m_executor::Executor; use crate::general::data::m_kv_user_client::KvUserClient; +use crate::general::metrics::MetricsCollector; +use crate::master::metrics::MetricsAggregator; use crate::{ config::NodesConfig, general::{ @@ -358,7 +360,9 @@ start_modules!( executor, Executor, kv_user_client, - KvUserClient + KvUserClient, + metrics_collector, + MetricsCollector ], [ metric_observor, @@ -372,7 +376,9 @@ start_modules!( data_master, DataMaster, app_master, - MasterAppMgmt + MasterAppMgmt, + metrics_aggregator, + MetricsAggregator ], [worker, WorkerCore] );