From 352d3a858ad618d5be6c0f2e8e091c0a0f534e5e Mon Sep 17 00:00:00 2001 From: Vchanger Date: Fri, 16 May 2025 20:55:04 +0800 Subject: [PATCH] support set ringbuf map size of tcpprobe and endpointprobe --- ...5\217\243\350\256\276\350\256\241_v0.3.md" | 1 + src/common/args.h | 3 ++ src/lib/probe/probe_params_parser.c | 33 +++++++++++++++++++ .../ebpf.probe/src/endpointprobe/endpoint.c | 14 ++++---- .../ebpf.probe/src/endpointprobe/tcp.bpf.c | 2 +- .../ebpf.probe/src/endpointprobe/udp.bpf.c | 2 +- .../extends/ebpf.probe/src/include/__compat.h | 16 +++++++++ .../extends/ebpf.probe/src/include/__libbpf.h | 9 +++++ .../ebpf.probe/src/tcpprobe/tcp_link.h | 2 +- .../ebpf.probe/src/tcpprobe/tcp_probe.c | 32 +++++++++--------- .../ebpf.probe/src/tcpprobe/tcpprobe.h | 8 ++--- src/probes/extends/python.probe/common/ipc.py | 1 + 12 files changed, 94 insertions(+), 29 deletions(-) diff --git "a/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" "b/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" index 0ee073b0..a730c350 100644 --- "a/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" +++ "b/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" @@ -219,6 +219,7 @@ curl -X PUT http://localhost:9999/tcp -d json=' | drops_thr | 丢包上送门限 | 0, [10~100000] | package | tcp, nic | Y | | res_lower_thr | 资源百分比下限 | 0, [0~100] | percent | ALL | Y | | res_upper_thr | 资源百分比上限 | 0, [0~100] | percent | ALL | Y | +| ringbuf_map_size | 上报map大小 | 1,[1~32] | MB | tcp, socket | Y | | report_event | 上报异常事件 | 0, [0, 1] | | ALL | Y | | metrics_type | 上报telemetry metrics | "raw", ["raw", "telemetry"] | | ALL | N | | env | 工作环境类型 | "node", ["node", "container", "kubenet"] | | ALL | N | diff --git a/src/common/args.h b/src/common/args.h index 1b82c3f2..e6e04f4d 100644 --- a/src/common/args.h +++ b/src/common/args.h @@ -28,6 +28,8 @@ #define MAX_IP_NUM 8 #define MAX_TGIDS_LEN 64 #define DEFAULT_CADVISOR_PORT 8083 +#define DEFAULT_RB_MAP_SZ 1 +#define MAX_RB_MAP_SZ 32 #define PYSCOPE_SERVER_URL_LEN 64 // compat for [domainName]:4040 for most of domains and xxx.xxx.xxx.xxx:4040 #ifndef PATH_LEN @@ -70,6 +72,7 @@ struct probe_params { unsigned int drops_count_thr; // Threshold of the number of drop packets, default is 0 unsigned int kafka_port; // the port to which kafka server attach. char logs; // Enable the logs function + unsigned char ringbuf_map_size; // size of output ringbuf map(unit: MB), only for tcpprobe and endpointprboe char report_cport; // Enable tcpprobe to report true client port char metrics_flags; // Support for report metrics flags(0x01(raw metrics), 0x02(openTelemetry metrics, eg.. P50/P90/P99) ); char env_flags; // Support for env flags(default 0x01(node), 0x02(container), 0x04(K8S)); diff --git a/src/lib/probe/probe_params_parser.c b/src/lib/probe/probe_params_parser.c index 811897ab..3242d39f 100644 --- a/src/lib/probe/probe_params_parser.c +++ b/src/lib/probe/probe_params_parser.c @@ -77,6 +77,8 @@ struct param_key_s { int key_type; }; +#define IS_POWER_OF_TWO(n) ((n) != 0 && (((n) & ((n) - 1)) == 0)) + static int parser_sample_peirod(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) { int value = Json_GetValueInt(key_item); @@ -168,6 +170,29 @@ static int parser_res_upper_thr(struct probe_s *probe, const struct param_key_s return 0; } +static int parser_ringbuf_map_size(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) +{ + int value = Json_GetValueInt(key_item); + if (value < param_key->v.min || value > param_key->v.max || value == INVALID_INT_NUM) { + PARSE_ERR("params.%s invalid value %d, must be in [%d, %d]", + param_key->key, value, param_key->v.min, param_key->v.max); + return -1; + } + + if (!IS_POWER_OF_TWO(value)) { + PARSE_ERR("params.%s invalid value %d, must be power of 2", param_key->key, value); + return -1; + } + + if (probe->probe_param.ringbuf_map_size != value && !IS_STOPPED_PROBE(probe)) { + PARSE_ERR("params.%s invalid can only be changed when probe is stopped", param_key->key); + return -1; + } + + probe->probe_param.ringbuf_map_size = (unsigned char)value; + return 0; +} + static int parser_report_event(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) { int value = Json_GetValueInt(key_item); @@ -584,6 +609,7 @@ SET_DEFAULT_PARAMS_INTER(min_exec_dur); SET_DEFAULT_PARAMS_INTER(min_aggr_dur); SET_DEFAULT_PARAMS_CAHR(logs); +SET_DEFAULT_PARAMS_CAHR(ringbuf_map_size); SET_DEFAULT_PARAMS_CAHR(report_cport); SET_DEFAULT_PARAMS_CAHR(support_ssl); SET_DEFAULT_PARAMS_CAHR(res_percent_upper); @@ -604,6 +630,7 @@ SET_DEFAULT_PARAMS_STR(flame_dir); #define DROPS_THR "drops_thr" #define RES_LOWER_THR "res_lower_thr" #define RES_UPPER_THR "res_upper_thr" +#define RB_MAP_SZ "ringbuf_map_size" #define REPORT_EVENT "report_event" #define REPORT_CPORT "report_cport" #define L7_PROTOCOL "l7_protocol" @@ -638,6 +665,7 @@ struct param_key_s param_keys[] = { {RES_UPPER_THR, {0, 0, 100, ""}, parser_res_upper_thr, set_default_params_char_res_percent_upper, JSON_NUMBER}, {REPORT_EVENT, {0, 0, 1, ""}, parser_report_event, set_default_params_char_logs, JSON_NUMBER}, #endif + {RB_MAP_SZ, {DEFAULT_RB_MAP_SZ, 1, MAX_RB_MAP_SZ, ""}, parser_ringbuf_map_size, set_default_params_char_ringbuf_map_size, JSON_NUMBER}, {REPORT_CPORT, {0, 0, 1, ""}, parser_report_cport, set_default_params_char_report_cport, JSON_NUMBER}, {L7_PROTOCOL, {0, 0, 0, ""}, parser_l7pro, set_default_params_inter_l7_probe_proto_flags, JSON_ARRAY}, {SUPPORT_SSL, {0, 0, 1, ""}, parser_support_ssl, set_default_params_char_support_ssl, JSON_NUMBER}, @@ -784,6 +812,11 @@ void probe_params_to_json(struct probe_s *probe, void *params) if (probe_type == PROBE_TCP) { Json_AddCharItemToObject(params, REPORT_CPORT, probe_param->report_cport); } + + if (probe_type == PROBE_TCP || probe_type == PROBE_SOCKET) { + Json_AddUIntItemToObject(params, RB_MAP_SZ, probe_param->ringbuf_map_size); + } + if (probe_type == PROBE_BASEINFO) { Json_AddStringToObject(params, ELF_PATH, probe_param->elf_path); } diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 4cdaf418..fc27bb26 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -1006,7 +1006,8 @@ static void report_tcp_socks(struct endpoint_probe_s * probe) } #endif -static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, char is_load) +static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, + unsigned char ringbuf_map_size, char is_load) { struct bpf_buffer *buffer = NULL; @@ -1016,7 +1017,7 @@ static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_pr int kernel_version = probe_kernel_version(); PROG_ENABLE_ONLY_IF(tcp, bpf_raw_trace_tcp_retransmit_synack, kernel_version > KERNEL_VERSION(4, 18, 0)); PROG_ENABLE_ONLY_IF(tcp, bpf_trace_tcp_retransmit_synack_func, kernel_version <= KERNEL_VERSION(4, 18, 0)); - + MAP_SET_MAX_ENTRIES(tcp, tcp_evt_map, buffer, ringbuf_map_size); LOAD_ATTACH(endpoint, tcp, err, is_load); prog->skels[prog->num].skel = tcp_skel; @@ -1041,13 +1042,14 @@ err: return -1; } -static int endpoint_load_probe_udp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, char is_load) +static int endpoint_load_probe_udp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, + unsigned char ringbuf_map_size, char is_load) { struct bpf_buffer *buffer = NULL; OPEN_UDP_PROBE(udp, err, is_load, buffer); if (is_load) { - + MAP_SET_MAX_ENTRIES(udp, udp_evt_map, buffer, ringbuf_map_size); LOAD_ATTACH(endpoint, udp, err, is_load); prog->skels[prog->num].skel = udp_skel; @@ -1087,11 +1089,11 @@ static int endpoint_load_probe(struct endpoint_probe_s *probe, struct ipc_body_s return -1; } - if (endpoint_load_probe_tcp(probe, new_prog, is_load_tcp)) { + if (endpoint_load_probe_tcp(probe, new_prog, ipc_body->probe_param.ringbuf_map_size, is_load_tcp)) { goto err; } - if (endpoint_load_probe_udp(probe, new_prog, is_load_udp)) { + if (endpoint_load_probe_udp(probe, new_prog, ipc_body->probe_param.ringbuf_map_size, is_load_udp)) { goto err; } diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c b/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c index 6b0aa820..0937e086 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c @@ -102,7 +102,7 @@ struct { struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } tcp_evt_map SEC(".maps"); struct sock_info_s { diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c b/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c index d2133d48..aa957bcc 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c @@ -39,7 +39,7 @@ char g_license[] SEC("license") = "GPL"; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } udp_evt_map SEC(".maps"); static __always_inline unsigned char *skb_network_header(const struct sk_buff *skb) diff --git a/src/probes/extends/ebpf.probe/src/include/__compat.h b/src/probes/extends/ebpf.probe/src/include/__compat.h index 22dd0135..2f2a669f 100644 --- a/src/probes/extends/ebpf.probe/src/include/__compat.h +++ b/src/probes/extends/ebpf.probe/src/include/__compat.h @@ -120,6 +120,22 @@ static inline int bpf_buffer__reset(struct bpf_map *map, struct bpf_map *heap) return type; } +#define MAX_RB_MAP_SZ 32 +static inline int bpf_buffer__set_max_entries(struct bpf_map *map, struct bpf_buffer *buffer, unsigned char map_size_mb) +{ + + if (buffer == NULL || buffer->type != BPF_MAP_TYPE_RINGBUF) { + return 0; + } + + if (map_size_mb == 0 || map_size_mb > MAX_RB_MAP_SZ) { + return -1; + } + + u32 max_entries = map_size_mb * 1024 * 1024; + return bpf_map__set_max_entries(map, max_entries); +} + static inline struct bpf_buffer *bpf_buffer__new(struct bpf_map *map, struct bpf_map *heap) { struct bpf_buffer *buffer; diff --git a/src/probes/extends/ebpf.probe/src/include/__libbpf.h b/src/probes/extends/ebpf.probe/src/include/__libbpf.h index b4964e31..34d0be5b 100644 --- a/src/probes/extends/ebpf.probe/src/include/__libbpf.h +++ b/src/probes/extends/ebpf.probe/src/include/__libbpf.h @@ -190,6 +190,15 @@ static __always_inline int set_memlock_rlimit(unsigned long limit) } \ } while (0) +#define MAP_SET_MAX_ENTRIES(probe_name, map_name, buffer, map_size_mb) \ + do { \ + int err; \ + err = bpf_buffer__set_max_entries(probe_name##_skel->maps.map_name, buffer, map_size_mb); \ + if (err) { \ + WARN("Failed to set map size for " #map_name " in " #probe_name "\n"); \ + } \ + } while (0) + #define MAP_INIT_BPF_BUFFER_SHARED(probe_name, map_name, buffer_ptr, load) \ do { \ if (load) { \ diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h index 65d477c4..ea6a4157 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h @@ -66,7 +66,7 @@ struct { #ifndef TCP_FD_BPF struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } tcp_output SEC(".maps"); #endif diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c index f2fd2075..c91e46a8 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c @@ -771,12 +771,12 @@ static int proc_tcp_metrics_evt(void *ctx, void *data, u32 size) #endif -static int tcp_load_probe_stats(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_stats(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_stats, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_stats, err, is_load, buffer, ringbuf_map_size); if (is_load) { __SELECT_RCV_ESTABLISHED_HOOKPOINT(tcp_stats); @@ -807,12 +807,12 @@ err: return -1; } -static int tcp_load_probe_abn(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_abn(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_abn, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_abn, err, is_load, buffer, ringbuf_map_size); if (is_load) { __SELECT_RCV_ESTABLISHED_HOOKPOINT(tcp_abn); @@ -846,12 +846,12 @@ err: return -1; } -static int tcp_load_probe_txrx(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_txrx(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_LOAD_PROBE_WITH_OUTPUT(tcp_tx_rx, err, is_load, buffer); + __OPEN_LOAD_PROBE_WITH_OUTPUT(tcp_tx_rx, err, is_load, buffer, ringbuf_map_size); if (is_load) { prog->skels[prog->num].skel = tcp_tx_rx_skel; prog->skels[prog->num].fn = (skel_destroy_fn)tcp_tx_rx_bpf__destroy; @@ -874,12 +874,12 @@ err: return -1; } -static int tcp_load_probe_delay(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_delay(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_delay, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_delay, err, is_load, buffer, ringbuf_map_size); if (is_load) { bool is_const = probe_kernel_version() > KERNEL_VERSION(5, 12, 0); @@ -913,12 +913,12 @@ err: return -1; } -static int tcp_load_probe_link(struct tcp_mng_s *tcp_mng, struct probe_params *args, struct bpf_prog_s *prog) +static int tcp_load_probe_link(struct tcp_mng_s *tcp_mng, unsigned char ringbuf_map_size, struct bpf_prog_s *prog) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_link, err, 1, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_link, err, 1, buffer, ringbuf_map_size); __SELECT_DESTROY_SOCK_HOOKPOINT(tcp_link); __LOAD_PROBE(tcp_link, err, 1); @@ -971,28 +971,28 @@ int tcp_load_probe(struct tcp_mng_s *tcp_mng, struct ipc_body_s *ipc_body, struc } #if defined(__x86_64__) || defined(__riscv) - if (tcp_load_probe_link(tcp_mng, &(ipc_body->probe_param), prog)) { + if (tcp_load_probe_link(tcp_mng, ipc_body->probe_param.ringbuf_map_size, prog)) { goto err; } #endif - if (tcp_load_probe_txrx(tcp_mng, prog, is_load_txrx)) { + if (tcp_load_probe_txrx(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_txrx)) { goto err; } - if (tcp_load_probe_abn(tcp_mng, prog, is_load_abn)) { + if (tcp_load_probe_abn(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_abn)) { goto err; } - if (tcp_load_probe_stats(tcp_mng, prog, is_load_stats)) { + if (tcp_load_probe_stats(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_stats)) { goto err; } - if (tcp_load_probe_delay(tcp_mng, prog, is_load_delay)) { + if (tcp_load_probe_delay(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_delay)) { goto err; } #ifdef __aarch64__ - if (tcp_load_probe_link(tcp_mng, &(ipc_body->probe_param), prog)) { + if (tcp_load_probe_link(tcp_mng, ipc_body->probe_param.ringbuf_map_size, prog)) { goto err; } #endif diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h b/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h index db517911..1b59600f 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h @@ -230,11 +230,11 @@ int is_tcp_fd_probe_loaded(void); MAP_SET_PIN_PATH(probe_name, sock_map, TCP_LINK_SOCKS_PATH, load); \ MAP_SET_PIN_PATH(probe_name, tcp_fd_map, TCP_LINK_FD_PATH, load) -#define __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer) \ +#define __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size) \ INIT_OPEN_OPTS(probe_name); \ PREPARE_CUSTOM_BTF(probe_name); \ OPEN_OPTS(probe_name, end, load); \ - MAP_INIT_BPF_BUFFER(probe_name, tcp_output, buffer, load); \ + MAP_INIT_BPF_BUFFER(probe_name, tcp_output, buffer, output_map_size); \ MAP_SET_PIN_PATH(probe_name, args_map, TCP_LINK_ARGS_PATH, load); \ MAP_SET_PIN_PATH(probe_name, tcp_link_map, TCP_LINK_TCP_PATH, load); \ MAP_SET_PIN_PATH(probe_name, sock_map, TCP_LINK_SOCKS_PATH, load); \ @@ -248,8 +248,8 @@ int is_tcp_fd_probe_loaded(void); __OPEN_PROBE(probe_name, end, load); \ __LOAD_PROBE(probe_name, end, load) -#define __OPEN_LOAD_PROBE_WITH_OUTPUT(probe_name, end, load, buffer) \ - __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer); \ +#define __OPEN_LOAD_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size) \ + __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size); \ __LOAD_PROBE(probe_name, end, load) #define __UNLOAD_PROBE(probe_name) \ diff --git a/src/probes/extends/python.probe/common/ipc.py b/src/probes/extends/python.probe/common/ipc.py index 4889c5a6..b1f25e85 100644 --- a/src/probes/extends/python.probe/common/ipc.py +++ b/src/probes/extends/python.probe/common/ipc.py @@ -45,6 +45,7 @@ class ProbeParams(Structure): ("drops_count_thr", c_uint), ("kafka_port", c_uint), ("logs", c_char), + ("ringbuf_map_size", c_char), ("report_cport", c_char), ("metrics_flags", c_char), ("env_flags", c_char), -- Gitee