diff --git "a/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" "b/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" index 0ee073b0f49e79334da65a002528dbd1ed3cfb83..a730c350f9c4e1b1d1ac123858ccdce335a1ce32 100644 --- "a/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" +++ "b/config/gala-gopher\346\224\257\346\214\201\345\212\250\346\200\201\351\205\215\347\275\256\346\216\245\345\217\243\350\256\276\350\256\241_v0.3.md" @@ -219,6 +219,7 @@ curl -X PUT http://localhost:9999/tcp -d json=' | drops_thr | 丢包上送门限 | 0, [10~100000] | package | tcp, nic | Y | | res_lower_thr | 资源百分比下限 | 0, [0~100] | percent | ALL | Y | | res_upper_thr | 资源百分比上限 | 0, [0~100] | percent | ALL | Y | +| ringbuf_map_size | 上报map大小 | 1,[1~32] | MB | tcp, socket | Y | | report_event | 上报异常事件 | 0, [0, 1] | | ALL | Y | | metrics_type | 上报telemetry metrics | "raw", ["raw", "telemetry"] | | ALL | N | | env | 工作环境类型 | "node", ["node", "container", "kubenet"] | | ALL | N | diff --git a/src/common/args.h b/src/common/args.h index 1b82c3f2ac0b4089b523dd663616a898c9aa85a3..e6e04f4d586fed1685e9486b8a708f7ea1e32f3e 100644 --- a/src/common/args.h +++ b/src/common/args.h @@ -28,6 +28,8 @@ #define MAX_IP_NUM 8 #define MAX_TGIDS_LEN 64 #define DEFAULT_CADVISOR_PORT 8083 +#define DEFAULT_RB_MAP_SZ 1 +#define MAX_RB_MAP_SZ 32 #define PYSCOPE_SERVER_URL_LEN 64 // compat for [domainName]:4040 for most of domains and xxx.xxx.xxx.xxx:4040 #ifndef PATH_LEN @@ -70,6 +72,7 @@ struct probe_params { unsigned int drops_count_thr; // Threshold of the number of drop packets, default is 0 unsigned int kafka_port; // the port to which kafka server attach. char logs; // Enable the logs function + unsigned char ringbuf_map_size; // size of output ringbuf map(unit: MB), only for tcpprobe and endpointprboe char report_cport; // Enable tcpprobe to report true client port char metrics_flags; // Support for report metrics flags(0x01(raw metrics), 0x02(openTelemetry metrics, eg.. P50/P90/P99) ); char env_flags; // Support for env flags(default 0x01(node), 0x02(container), 0x04(K8S)); diff --git a/src/lib/probe/probe_params_parser.c b/src/lib/probe/probe_params_parser.c index 811897abbab1e8e549c88c941ea491ce4897b1f0..3242d39fe16bf128c9cde93eb00bdc8835caffe7 100644 --- a/src/lib/probe/probe_params_parser.c +++ b/src/lib/probe/probe_params_parser.c @@ -77,6 +77,8 @@ struct param_key_s { int key_type; }; +#define IS_POWER_OF_TWO(n) ((n) != 0 && (((n) & ((n) - 1)) == 0)) + static int parser_sample_peirod(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) { int value = Json_GetValueInt(key_item); @@ -168,6 +170,29 @@ static int parser_res_upper_thr(struct probe_s *probe, const struct param_key_s return 0; } +static int parser_ringbuf_map_size(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) +{ + int value = Json_GetValueInt(key_item); + if (value < param_key->v.min || value > param_key->v.max || value == INVALID_INT_NUM) { + PARSE_ERR("params.%s invalid value %d, must be in [%d, %d]", + param_key->key, value, param_key->v.min, param_key->v.max); + return -1; + } + + if (!IS_POWER_OF_TWO(value)) { + PARSE_ERR("params.%s invalid value %d, must be power of 2", param_key->key, value); + return -1; + } + + if (probe->probe_param.ringbuf_map_size != value && !IS_STOPPED_PROBE(probe)) { + PARSE_ERR("params.%s invalid can only be changed when probe is stopped", param_key->key); + return -1; + } + + probe->probe_param.ringbuf_map_size = (unsigned char)value; + return 0; +} + static int parser_report_event(struct probe_s *probe, const struct param_key_s *param_key, const void *key_item) { int value = Json_GetValueInt(key_item); @@ -584,6 +609,7 @@ SET_DEFAULT_PARAMS_INTER(min_exec_dur); SET_DEFAULT_PARAMS_INTER(min_aggr_dur); SET_DEFAULT_PARAMS_CAHR(logs); +SET_DEFAULT_PARAMS_CAHR(ringbuf_map_size); SET_DEFAULT_PARAMS_CAHR(report_cport); SET_DEFAULT_PARAMS_CAHR(support_ssl); SET_DEFAULT_PARAMS_CAHR(res_percent_upper); @@ -604,6 +630,7 @@ SET_DEFAULT_PARAMS_STR(flame_dir); #define DROPS_THR "drops_thr" #define RES_LOWER_THR "res_lower_thr" #define RES_UPPER_THR "res_upper_thr" +#define RB_MAP_SZ "ringbuf_map_size" #define REPORT_EVENT "report_event" #define REPORT_CPORT "report_cport" #define L7_PROTOCOL "l7_protocol" @@ -638,6 +665,7 @@ struct param_key_s param_keys[] = { {RES_UPPER_THR, {0, 0, 100, ""}, parser_res_upper_thr, set_default_params_char_res_percent_upper, JSON_NUMBER}, {REPORT_EVENT, {0, 0, 1, ""}, parser_report_event, set_default_params_char_logs, JSON_NUMBER}, #endif + {RB_MAP_SZ, {DEFAULT_RB_MAP_SZ, 1, MAX_RB_MAP_SZ, ""}, parser_ringbuf_map_size, set_default_params_char_ringbuf_map_size, JSON_NUMBER}, {REPORT_CPORT, {0, 0, 1, ""}, parser_report_cport, set_default_params_char_report_cport, JSON_NUMBER}, {L7_PROTOCOL, {0, 0, 0, ""}, parser_l7pro, set_default_params_inter_l7_probe_proto_flags, JSON_ARRAY}, {SUPPORT_SSL, {0, 0, 1, ""}, parser_support_ssl, set_default_params_char_support_ssl, JSON_NUMBER}, @@ -784,6 +812,11 @@ void probe_params_to_json(struct probe_s *probe, void *params) if (probe_type == PROBE_TCP) { Json_AddCharItemToObject(params, REPORT_CPORT, probe_param->report_cport); } + + if (probe_type == PROBE_TCP || probe_type == PROBE_SOCKET) { + Json_AddUIntItemToObject(params, RB_MAP_SZ, probe_param->ringbuf_map_size); + } + if (probe_type == PROBE_BASEINFO) { Json_AddStringToObject(params, ELF_PATH, probe_param->elf_path); } diff --git a/src/lib/probe/snooper.c b/src/lib/probe/snooper.c index 1fa67a8293f927b72fe780d00fd72a1ecdee7850..57aec21079c40aff2f424260bc3bba669b871ceb 100644 --- a/src/lib/probe/snooper.c +++ b/src/lib/probe/snooper.c @@ -634,7 +634,7 @@ static void __build_ipc_body(struct probe_s *probe, struct ipc_body_s* ipc_body) ipc_body->probe_flags |= IPC_FLAGS_SNOOPER_CHG; } if (probe->resnd_snooper_for_restart) { - ipc_body->probe_flags = 0; + ipc_body->probe_flags = (IPC_FLAGS_SNOOPER_CHG | IPC_FLAGS_SNOOPER_CHG); } memcpy(&(ipc_body->probe_param), &probe->probe_param, sizeof(struct probe_params)); return; diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c index 4cdaf418f5c368e32180fbe5d91083a70c7bd219..fc27bb265a60888cbe2695c3e74dff7c80a98c2e 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/endpoint.c @@ -1006,7 +1006,8 @@ static void report_tcp_socks(struct endpoint_probe_s * probe) } #endif -static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, char is_load) +static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, + unsigned char ringbuf_map_size, char is_load) { struct bpf_buffer *buffer = NULL; @@ -1016,7 +1017,7 @@ static int endpoint_load_probe_tcp(struct endpoint_probe_s *probe, struct bpf_pr int kernel_version = probe_kernel_version(); PROG_ENABLE_ONLY_IF(tcp, bpf_raw_trace_tcp_retransmit_synack, kernel_version > KERNEL_VERSION(4, 18, 0)); PROG_ENABLE_ONLY_IF(tcp, bpf_trace_tcp_retransmit_synack_func, kernel_version <= KERNEL_VERSION(4, 18, 0)); - + MAP_SET_MAX_ENTRIES(tcp, tcp_evt_map, buffer, ringbuf_map_size); LOAD_ATTACH(endpoint, tcp, err, is_load); prog->skels[prog->num].skel = tcp_skel; @@ -1041,13 +1042,14 @@ err: return -1; } -static int endpoint_load_probe_udp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, char is_load) +static int endpoint_load_probe_udp(struct endpoint_probe_s *probe, struct bpf_prog_s *prog, + unsigned char ringbuf_map_size, char is_load) { struct bpf_buffer *buffer = NULL; OPEN_UDP_PROBE(udp, err, is_load, buffer); if (is_load) { - + MAP_SET_MAX_ENTRIES(udp, udp_evt_map, buffer, ringbuf_map_size); LOAD_ATTACH(endpoint, udp, err, is_load); prog->skels[prog->num].skel = udp_skel; @@ -1087,11 +1089,11 @@ static int endpoint_load_probe(struct endpoint_probe_s *probe, struct ipc_body_s return -1; } - if (endpoint_load_probe_tcp(probe, new_prog, is_load_tcp)) { + if (endpoint_load_probe_tcp(probe, new_prog, ipc_body->probe_param.ringbuf_map_size, is_load_tcp)) { goto err; } - if (endpoint_load_probe_udp(probe, new_prog, is_load_udp)) { + if (endpoint_load_probe_udp(probe, new_prog, ipc_body->probe_param.ringbuf_map_size, is_load_udp)) { goto err; } diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c b/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c index 6b0aa820e591021b64b144a94ad2353876ec0f43..0937e0866f765694b5ba8f436a8ebe6bc7d34fac 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/tcp.bpf.c @@ -102,7 +102,7 @@ struct { struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } tcp_evt_map SEC(".maps"); struct sock_info_s { diff --git a/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c b/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c index d2133d4878f4cd439f998ae559ea775caa99e9f4..aa957bcc5d34cc99e7e35392af757bc745b02d47 100644 --- a/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c +++ b/src/probes/extends/ebpf.probe/src/endpointprobe/udp.bpf.c @@ -39,7 +39,7 @@ char g_license[] SEC("license") = "GPL"; struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } udp_evt_map SEC(".maps"); static __always_inline unsigned char *skb_network_header(const struct sk_buff *skb) diff --git a/src/probes/extends/ebpf.probe/src/include/__compat.h b/src/probes/extends/ebpf.probe/src/include/__compat.h index 22dd01356655dcca20bb0efa6b69eb33ace4e0d5..2f2a669f5e66c6a657a28985eaffbbeda1c1b1ea 100644 --- a/src/probes/extends/ebpf.probe/src/include/__compat.h +++ b/src/probes/extends/ebpf.probe/src/include/__compat.h @@ -120,6 +120,22 @@ static inline int bpf_buffer__reset(struct bpf_map *map, struct bpf_map *heap) return type; } +#define MAX_RB_MAP_SZ 32 +static inline int bpf_buffer__set_max_entries(struct bpf_map *map, struct bpf_buffer *buffer, unsigned char map_size_mb) +{ + + if (buffer == NULL || buffer->type != BPF_MAP_TYPE_RINGBUF) { + return 0; + } + + if (map_size_mb == 0 || map_size_mb > MAX_RB_MAP_SZ) { + return -1; + } + + u32 max_entries = map_size_mb * 1024 * 1024; + return bpf_map__set_max_entries(map, max_entries); +} + static inline struct bpf_buffer *bpf_buffer__new(struct bpf_map *map, struct bpf_map *heap) { struct bpf_buffer *buffer; diff --git a/src/probes/extends/ebpf.probe/src/include/__libbpf.h b/src/probes/extends/ebpf.probe/src/include/__libbpf.h index b4964e314ba5fc6b8da4a43ee30dfa2c3269ee9e..34d0be5bd983827cad6e2ad4995ca10b1407d95b 100644 --- a/src/probes/extends/ebpf.probe/src/include/__libbpf.h +++ b/src/probes/extends/ebpf.probe/src/include/__libbpf.h @@ -190,6 +190,15 @@ static __always_inline int set_memlock_rlimit(unsigned long limit) } \ } while (0) +#define MAP_SET_MAX_ENTRIES(probe_name, map_name, buffer, map_size_mb) \ + do { \ + int err; \ + err = bpf_buffer__set_max_entries(probe_name##_skel->maps.map_name, buffer, map_size_mb); \ + if (err) { \ + WARN("Failed to set map size for " #map_name " in " #probe_name "\n"); \ + } \ + } while (0) + #define MAP_INIT_BPF_BUFFER_SHARED(probe_name, map_name, buffer_ptr, load) \ do { \ if (load) { \ diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h index 65d477c46816bc6b0192d1d73dfc22d1b2e1bea4..ea6a41572b83b7c7ff51484f00ceebdb7dcd8359 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_link.h @@ -66,7 +66,7 @@ struct { #ifndef TCP_FD_BPF struct { __uint(type, BPF_MAP_TYPE_RINGBUF); - __uint(max_entries, 64); + __uint(max_entries, 4096); } tcp_output SEC(".maps"); #endif diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c index f2fd2075cc61c159afc4e54375dd22ea83b09f1e..bfe6e17c9544cbdc9524be6fe2607581f8418e8c 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_probe.c @@ -771,12 +771,12 @@ static int proc_tcp_metrics_evt(void *ctx, void *data, u32 size) #endif -static int tcp_load_probe_stats(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_stats(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_stats, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_stats, err, is_load, buffer, ringbuf_map_size); if (is_load) { __SELECT_RCV_ESTABLISHED_HOOKPOINT(tcp_stats); @@ -807,12 +807,12 @@ err: return -1; } -static int tcp_load_probe_abn(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_abn(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_abn, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_abn, err, is_load, buffer, ringbuf_map_size); if (is_load) { __SELECT_RCV_ESTABLISHED_HOOKPOINT(tcp_abn); @@ -846,12 +846,12 @@ err: return -1; } -static int tcp_load_probe_txrx(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_txrx(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_LOAD_PROBE_WITH_OUTPUT(tcp_tx_rx, err, is_load, buffer); + __OPEN_LOAD_PROBE_WITH_OUTPUT(tcp_tx_rx, err, is_load, buffer, ringbuf_map_size); if (is_load) { prog->skels[prog->num].skel = tcp_tx_rx_skel; prog->skels[prog->num].fn = (skel_destroy_fn)tcp_tx_rx_bpf__destroy; @@ -874,12 +874,12 @@ err: return -1; } -static int tcp_load_probe_delay(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, char is_load) +static int tcp_load_probe_delay(struct tcp_mng_s *tcp_mng, struct bpf_prog_s *prog, unsigned char ringbuf_map_size, char is_load) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_delay, err, is_load, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_delay, err, is_load, buffer, ringbuf_map_size); if (is_load) { bool is_const = probe_kernel_version() > KERNEL_VERSION(5, 12, 0); @@ -913,12 +913,12 @@ err: return -1; } -static int tcp_load_probe_link(struct tcp_mng_s *tcp_mng, struct probe_params *args, struct bpf_prog_s *prog) +static int tcp_load_probe_link(struct tcp_mng_s *tcp_mng, unsigned char ringbuf_map_size, struct bpf_prog_s *prog) { int err; struct bpf_buffer *buffer = NULL; - __OPEN_PROBE_WITH_OUTPUT(tcp_link, err, 1, buffer); + __OPEN_PROBE_WITH_OUTPUT(tcp_link, err, 1, buffer, ringbuf_map_size); __SELECT_DESTROY_SOCK_HOOKPOINT(tcp_link); __LOAD_PROBE(tcp_link, err, 1); @@ -970,32 +970,25 @@ int tcp_load_probe(struct tcp_mng_s *tcp_mng, struct ipc_body_s *ipc_body, struc return -1; } -#if defined(__x86_64__) || defined(__riscv) - if (tcp_load_probe_link(tcp_mng, &(ipc_body->probe_param), prog)) { + if (tcp_load_probe_link(tcp_mng, ipc_body->probe_param.ringbuf_map_size, prog)) { goto err; } -#endif - if (tcp_load_probe_txrx(tcp_mng, prog, is_load_txrx)) { + if (tcp_load_probe_txrx(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_txrx)) { goto err; } - if (tcp_load_probe_abn(tcp_mng, prog, is_load_abn)) { + if (tcp_load_probe_abn(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_abn)) { goto err; } - if (tcp_load_probe_stats(tcp_mng, prog, is_load_stats)) { + if (tcp_load_probe_stats(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_stats)) { goto err; } - if (tcp_load_probe_delay(tcp_mng, prog, is_load_delay)) { - goto err; - } -#ifdef __aarch64__ - if (tcp_load_probe_link(tcp_mng, &(ipc_body->probe_param), prog)) { + if (tcp_load_probe_delay(tcp_mng, prog, ipc_body->probe_param.ringbuf_map_size, is_load_delay)) { goto err; } -#endif INFO("[TCPPROBE]: Successfully load ebpf prog.\n"); *new_prog = prog; diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tx_rx.bpf.c b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tx_rx.bpf.c index 878ece70c35cf5f5eee34cc0a285687abaf15717..c0bc0fd8d28edaa22b4f37693dc75781ef723d3f 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tx_rx.bpf.c +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcp_tx_rx.bpf.c @@ -73,7 +73,7 @@ static void get_tcp_tx_rx_segs(struct sock *sk, struct tcp_tx_rx* stats) stats->segs_out = _(tcp_sk->segs_out); } -KPROBE(tcp_sendmsg, pt_regs) +KPROBE(tcp_sendmsg_locked, pt_regs) { struct tcp_metrics_s *metrics; struct sock *sk = (struct sock *)PT_REGS_PARM1(ctx); diff --git a/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h b/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h index db517911e030cb37875556cf5f39f007c6237d19..1b59600fff314ce91e0bfc5dea641c7bc587573d 100644 --- a/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h +++ b/src/probes/extends/ebpf.probe/src/tcpprobe/tcpprobe.h @@ -230,11 +230,11 @@ int is_tcp_fd_probe_loaded(void); MAP_SET_PIN_PATH(probe_name, sock_map, TCP_LINK_SOCKS_PATH, load); \ MAP_SET_PIN_PATH(probe_name, tcp_fd_map, TCP_LINK_FD_PATH, load) -#define __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer) \ +#define __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size) \ INIT_OPEN_OPTS(probe_name); \ PREPARE_CUSTOM_BTF(probe_name); \ OPEN_OPTS(probe_name, end, load); \ - MAP_INIT_BPF_BUFFER(probe_name, tcp_output, buffer, load); \ + MAP_INIT_BPF_BUFFER(probe_name, tcp_output, buffer, output_map_size); \ MAP_SET_PIN_PATH(probe_name, args_map, TCP_LINK_ARGS_PATH, load); \ MAP_SET_PIN_PATH(probe_name, tcp_link_map, TCP_LINK_TCP_PATH, load); \ MAP_SET_PIN_PATH(probe_name, sock_map, TCP_LINK_SOCKS_PATH, load); \ @@ -248,8 +248,8 @@ int is_tcp_fd_probe_loaded(void); __OPEN_PROBE(probe_name, end, load); \ __LOAD_PROBE(probe_name, end, load) -#define __OPEN_LOAD_PROBE_WITH_OUTPUT(probe_name, end, load, buffer) \ - __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer); \ +#define __OPEN_LOAD_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size) \ + __OPEN_PROBE_WITH_OUTPUT(probe_name, end, load, buffer, output_map_size); \ __LOAD_PROBE(probe_name, end, load) #define __UNLOAD_PROBE(probe_name) \ diff --git a/src/probes/extends/ebpf.probe/src/tprofilingprobe/bpf_prog.c b/src/probes/extends/ebpf.probe/src/tprofilingprobe/bpf_prog.c index b522d56dab21fbca96c135ba979e6663f55dc4fb..b2f988dfe81fbd28f51f9603b5cea48e8ecf8e66 100644 --- a/src/probes/extends/ebpf.probe/src/tprofilingprobe/bpf_prog.c +++ b/src/probes/extends/ebpf.probe/src/tprofilingprobe/bpf_prog.c @@ -646,15 +646,15 @@ int attach_uprobes(struct ipc_body_s *ipc_body) if (mem_pymem_obj != NULL && proc_link->mem_pymem_link == NULL) { ret = attach_mem_pymem_probes_per_proc(mem_pymem_obj, proc_link); if (ret == 0) { - return 0; + continue; } - TP_DEBUG("Failed to attach mem_pymem probes: pid=%d\n", pid); + TP_DEBUG("Failed to attach mem_pymem probes: pid=%d\n", next_key.proc_id); } } if (mem_glibc_obj != NULL && proc_link->mem_glibc_link == NULL) { ret = attach_mem_glibc_probes_per_proc(mem_glibc_obj, proc_link); if (ret) { - TP_DEBUG("Failed to attach mem_glibc probes: pid=%d\n", pid); + TP_DEBUG("Failed to attach mem_glibc probes: pid=%d\n", next_key.proc_id); } } } diff --git a/src/probes/extends/ebpf.probe/src/tprofilingprobe/readme.md b/src/probes/extends/ebpf.probe/src/tprofilingprobe/readme.md index c812a68b28688dbaaa5e498a2d316e44a45fb989..adf82fb856553c21fde1b2988e43d61aadc3f8d4 100644 --- a/src/probes/extends/ebpf.probe/src/tprofilingprobe/readme.md +++ b/src/probes/extends/ebpf.probe/src/tprofilingprobe/readme.md @@ -17,6 +17,7 @@ Profiling 是 gala-gopher 提供的一个主机侧的进程/线程级应用性 - 网络 I/O 耗时、阻塞问题 - 锁竞争问题 - 死锁问题 +- DDR OOM问题 随着更多类型的事件不断地补充和完善,tprofiling 将能够覆盖更多类型的应用性能问题场景。 @@ -45,7 +46,44 @@ tprofiling 当前已观测的系统调用事件参见章节: [支持的系统 此外,根据线程是否在 CPU 上运行可以将线程的运行状态分为两种:oncpu 和 offcpu ,前者表示线程正在 cpu 上运行,后者表示线程不在 cpu 上运行。通过观测线程的 oncpu 事件,可以识别线程是否正在执行耗时的 cpu 操作。通过观测线程的 offcpu 事件,可以观察线程在IO、锁等阻塞事件上等待的时间以及阻塞的原因。 - +**DDR OOM 事件** + +| **函数名** | **作用** | **原型** | **描述** | **使用场景** | +|--------------------|--------------------------------------------------------------------------|------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------|--------------------------------------------------------| +| `malloc` | 分配指定大小的内存,未初始化。 | `void *malloc(size_t size);` | 从堆中分配指定字节数的内存,内容未初始化。分配失败时返回 `NULL`。 | 动态分配内存。 | +| `calloc` | 分配指定大小的内存,并初始化为零。 | `void *calloc(size_t num, size_t size);` | 分配 `num` 个元素,每个元素大小为 `size` 字节,且内存初始化为零。分配失败时返回 `NULL`。 | 需要初始化为零的动态内存。 | +| `realloc` | 重新调整已分配内存的大小,若需要,复制数据并释放原内存。 | `void *realloc(void *ptr, size_t new_size);` | 调整指针 `ptr` 指向的内存块大小到 `new_size` 字节。若无法扩展,尝试分配新内存并复制数据。 | 动态调整数组或数据结构的大小。 | +| `mmap` | 将文件或设备映射到内存,或分配匿名内存。 | `void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);` | 将文件或设备映射到进程的地址空间,或分配匿名内存。返回映射区域的指针。失败时返回 `MAP_FAILED`。 | 内存映射文件或共享内存,提升内存访问效率。 | +| `posix_memalign` | 分配对齐到指定字节边界的内存。 | `int posix_memalign(void **ptr, size_t alignment, size_t size);` | 分配 `size` 字节的内存,且内存起始地址对齐到 `alignment` 字节边界。返回 `0` 表示成功,非零表示错误。 | 需要特定内存对齐的场景,如硬件加速。 | +| `valloc` | 分配页面对齐的内存。 | `void *valloc(size_t size);` | 分配指定大小的内存,并确保起始地址是页面大小的整数倍(通常为 4KB)。 | 需要页面对齐的内存分配,底层内存管理。 | +| `memalign` | 分配对齐到指定字节边界的内存。 | `void *memalign(size_t alignment, size_t size);` | 分配大小为 `size` 字节的内存,且起始地址对齐到 `alignment` 字节边界。`alignment` 必须是 2 的幂。 | 需要特定内存对齐的内存分配。 | +| `pvalloc` | 分配页面对齐的内存,并保证大小是页面大小的整数倍。 | `void *pvalloc(size_t size);` | 分配内存并保证其大小是页面大小的整数倍。类似 `valloc`,但总大小会四舍五入到页面大小的整数倍。 | 需要页面对齐且大小是页面倍数的内存。 | +| `aligned_alloc` | 分配对齐到指定字节边界的内存。 | `void *aligned_alloc(size_t alignment, size_t size);` | 分配大小为 `size` 字节的内存,且起始地址对齐到 `alignment` 字节边界。`alignment` 必须是 2 的幂。 | 需要内存对齐且符合对齐要求的场景。 | +| `free` | 释放通过 `malloc`、`calloc`、`realloc` 等分配的内存。 | `void free(void *ptr);` | 释放通过动态分配函数分配的内存。释放后该指针不再有效。 | 释放动态分配的内存。 | +| `munmap` | 解除通过 `mmap` 映射的内存区域映射。 | `int munmap(void *addr, size_t length);` | 解除通过 `mmap` 映射的内存区域的映射,释放资源。 | 解除映射文件或内存区域。 | + +Python 内存管理函数 + +| 函数名称 | 作用描述 | +| --- | --- | +| `PyMem_RawMalloc` | 分配一块指定大小的原始内存,不进行任何初始化,返回指向分配内存的指针。 | +| `PyMem_RawCalloc` | 分配一块指定大小的原始内存,并将其初始化为零,返回指向分配内存的指针。 | +| `PyMem_RawRealloc` | 重新分配一块指定大小的原始内存,可能移动内存块,返回指向新内存的指针。 | +| `PyMem_Malloc` | 分配一块指定大小的内存,与 PyMem_RawMalloc 类似,但可能进行额外的管理。 | +| `PyMem_Calloc` | 分配一块指定大小的内存,并将其初始化为零,与 PyMem_RawCalloc 类似。 | +| `PyMem_Realloc` | 重新分配一块指定大小的内存,与 PyMem_RawRealloc 类似。 | +| `PyObject_Malloc` | 分配一块指定大小的内存,专门用于 Python 对象,可能进行额外的管理。 | +| `PyObject_Calloc` | 分配一块指定大小的内存,并将其初始化为零,专门用于 Python 对象。 | +| `PyObject_Realloc` | 重新分配一块指定大小的内存,专门用于 Python 对象。 | +| `PyMem_RawFree` | 释放一块之前通过 PyMem_RawMalloc 或 PyMem_RawRealloc 分配的原始内存。 | +| `PyMem_Free` | 释放一块之前通过 PyMem_Malloc 或 PyMem_Realloc 分配的内存。 | +| `PyObject_Free` | 释放一块之前通过 PyObject_Malloc 或 PyObject_Realloc 分配的内存。 | + +主机侧DDR内存OOM的定界定位策略包括如下几个步骤: + +1) 使用内存水线机制对主机OOM进行预警; +2) OOM预警触发时,使用定位工具找到发生OOM的目标进程; +3) 定位到OOM目标进程后,进一步使用定位工具定位到发生OOM的模块或代码堆栈。 ### 事件内容 @@ -104,7 +142,154 @@ tprofiling 当前已观测的系统调用事件参见章节: [支持的系统 不同事件类型支持的扩展事件属性的详细情况参见章节:[支持的系统调用事件](###支持的系统调用事件) 。 ### 事件输出 +### oncpu和syscall事件 + +输出内容存放到本地文件,文件存放 /var/log/tprofiling 目录下,文件名格式: + +`timeline-trace-.json` + +其中``是一个时间戳,表示启动 Profiling 监控任务的时间戳,一个完整的文件路径示例为:`/var/log/tprofiling /timeline-trace-202404261508.json`。 + +文件内容的格式如下: + +```{ +"traceEvents": [ + {}, + { + "cat": "oncpu", + "name": "oncpu", + "ph": "b", + "pid": 1243695, + "tid": 1243707, + "ts": 1735379937905379, + "id": 57, + "cname": "good", + "args": {} + }, + { + "cat": "oncpu", + "name": "oncpu", + "ph": "e", + "pid": 1243695, + "tid": 1243707, + "ts": 1735379937905403, + "id": 57, + "cname": "good", + "args": { + "count": 1, + "event.type": "oncpu", + "thread.name": "node" + } + }, + { + "cat": "syscall", + "name": "poll", + "ph": "X", + "pid": 1243695, + "tid": 1243707, + "ts": 1735379937905397, + "dur": 500530.860, + "args": { + "count": 1, + "thread.name": "node", + "event.type": "sched" + }, + "sf": "1" + } + ], + "stackFrames": { + "1": { + "category": "func", + "name": "Thread#_bootstrap[p]" + }, + "2": { + "category": "func", + "name": "Thread#_bootstrap_inner[p]", + "parent": "1" + } + } +} +``` + +说明:文件内容为json格式,它满足chrome 的trace event格式,输出的内容以事件的形式存放在traceEvents字段中,堆栈存放在stackFrames字段中。 +traceEvents: + +- `cat`:事件分类,oncpu或syscall; +- `name`:事件类型,目前支持 oncpu、file、net、lock、sched 五种; +- `ph`:单个字符,根据输出的事件类型而变化。参考[Trace Event Format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview?pli=1&tab=t.0); +- `pid`:事件所属的进程ID; +- `tid`:事件所属的线程ID; +- `ts`:事件上报的时间戳,单位是纳秒; +- `id`:事件标识符,便于匹配异步事件开始和结束(仅异步事件涉及); +- `cname`:没有特殊含义,一个tricky,指向在UI展示的颜色,当前只有oncpu事件被赋值为绿色; +- `dur`:表示事件的耗时,单位是微秒(oncpu事件不涉及); +- `sf`:表示事件的函数调用栈,内容是以分号(;)分隔的函数名列表,分号左边是调用方的函数名,分号右边是被调用的函数名(oncpu事件不涉及); +- `args`:表示每个事件特有的信息,内容主要包括: + - count字段,表示事件发生的计数; + - thread.name字段,表示事件所在的线程的名称; + - xxx.info:不同类型的事件包含不同的属性值(oncpu事件不涉及); + - event.type字段,事件类型,目前支持 oncpu、file、net、lock、sched 五种 + +stackFrames(oncpu不涉及,为空): + +- `1`:调用栈ID; +- `category`:调用栈类型; +- `name`:调用栈具体名; +- `parent`:父节点ID(非必须)。 + +### DDR OOM事件 + +#### 定位到OOM进程的代码堆栈 + +输出内容存放到本地文件,文件存放在 gala-gopher 容器的 /var/log/gala-gopher/tprofiling 目录下,文件名格式: +timeline-trace-.json +其中是一个时间戳,表示启动 Profiling 监控任务的时间戳,一个完整的文件路径示例为: /var/log/gala-gopher/tprofiling /timeline-trace-202404261508.json 。 +文件内容的格式如下: + +``` +{ + "traceEvents": [ + {"name": "memory::Allocs", "ph": "C", "ts": 0, "pid": 100, "args": {"current_allocs": 0}}, + {"name": "memory::Allocs", "ph": "C", "ts": 1, "pid": 100, "args": {"current_allocs": 20}}, + { + "cat": "memory", "pid": 100, "ts": 1, "ph": "O", "name": "memory::Heap", "id": "0x1", + "args": { + "snapshot": [ + {"trace": "funcA;funcB;funcC", "current_allocs": 10}, + {"trace": "funcA;funcB;funcD", "current_allocs": 5}, + {"trace": "funcA;funcB;funcE", "current_allocs": 1} + ] + } + } + ] +} +``` + +说明:文件内容为json格式,它满足chrome 的trace event格式,输出的内容以事件的形式存放在traceEvents字段中,包括两种类型的事件。 + +* 当前申请的内存使用量 + 在整个Profiling周期内,会每隔一段时间将当前时间申请的内存使用量作为一个计数事件进行上报。计数事件的主要内容如下: +* name:事件名,它的值为 memory::Allocs ,用于标识该计数事件类型 + + * ts:事件上报的时间戳 + * pid:进程号 + * args.current_allocs:当前申请的内存使用量 +* 当前申请的内存堆栈快照 + 每次上报当前的内存使用量事件的同时,会将当前top3申请的热点内存占用的代码堆栈作为一个快照事件上报。快照事件的主要内容如下: + + * name:事件名,它的值为 memory::Heap ,用于标识该快照事件类型 + * ts:事件上报的时间戳 + * pid:进程号 + * rgs.snapshot:当前申请的内存堆栈快照,保存了top3热点内存堆栈的信息。它是一个列表,列表中每一项的内容说明如下: + * trace:表示一个代码堆栈,函数之间使用分号(;)分隔。每个函数的组成如下 + func_name[flag](mod_name:func_offset) + 其中, + * func_name:函数名,若无法获取符号表,则使用 [unknown] 替代。 + * flag:标记函数所处的层次,当前支持的flag值为:k – 内核函数,u – 用户函数,p – python语言函数。 + * mod_name:函数所处的模块名,比如可执行文件或动态库的文件名(不包括文件所在的目录)。 + * func_offset:函数在模块中的偏移量。 + * current_allocs:表示该代码堆栈申请的内存大小 ## 快速开始 @@ -136,7 +321,7 @@ curl -X PUT http://localhost:9999/tprofiling -d json='{"cmd":{"probe": ["oncpu", 启动配置参数说明: - `"probe"` :指定了 Profiling 事件的监控范围。 - 当前支持如下6类事件,用户可根据需要开启一个或多个事件类型。 + 当前支持如下事件,用户可根据需要开启一个或多个事件类型。 - oncpu:线程oncpu事件和offcpu事件,以及offcpu的调用栈 - oncpu_sample:线程在占用cpu期间的调用栈采样事件 @@ -146,6 +331,7 @@ curl -X PUT http://localhost:9999/tprofiling -d json='{"cmd":{"probe": ["oncpu", - syscall_sched:线程在执行调度相关的系统调用事件,比如 sleep/epoll_wait - python_gc:python线程执行的gc事件 - pthread_sync:线程在执行 glibc 库中 pthread 相关的同步、锁事件,比如 pthread_mutex_lock/sem_wait + - mem_glibc:通过glibc相关API进行的内存申请释放事件 - `"snoopers"`:指定了要监控的应用程序范围。 根据应用的部署方式,我们支持通过Pod ID(`pod_id`)、容器ID(`container_id`)、进程ID(`proc_id`)、进程名(`proc_name`)的方式来指定待监控的应用程序。`snoopers` 相关的配置示例如下, diff --git a/src/probes/extends/python.probe/common/ipc.py b/src/probes/extends/python.probe/common/ipc.py index 4889c5a6ece506f94f10cc861ab99a1981f78ea6..b1f25e855b16ca3c28bad495b1f348a93bb27168 100644 --- a/src/probes/extends/python.probe/common/ipc.py +++ b/src/probes/extends/python.probe/common/ipc.py @@ -45,6 +45,7 @@ class ProbeParams(Structure): ("drops_count_thr", c_uint), ("kafka_port", c_uint), ("logs", c_char), + ("ringbuf_map_size", c_char), ("report_cport", c_char), ("metrics_flags", c_char), ("env_flags", c_char),