# sheep-java-cat **Repository Path**: sheep-cloud/sheep-java-cat ## Basic Information - **Project Name**: sheep-java-cat - **Description**: CAt 实时应用监控平台 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2021-07-29 - **Last Updated**: 2024-07-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 0. 学习目标 - 能够知道什么是CAT - 能够知道什么是CAT - 能够搭建CAT服务端环境 - 能够进行CAT客户端的集成 - 能够使用CAT监控界面进行服务监控 - 能够完成CAT和常用框架集成 - 了解CAT告警配置 - 了解CAT客户端和服务端原理 # 1. CAT入门 ## 1.1. 什么是调用链监控 ### 1.1.1. 架构的演进历史 **单体应用** ![](http://assets.processon.com/chart_image/60ae45c37d9c0878763b20df.png) 结构说明: ​ 全部功能集中在一个项目内(All in one) 在单体应用的年代, 分析线上问题主要靠日志以及系统级别的指标. ![](http://assets.processon.com/chart_image/60ae48cae0b34d384189df95.png) **微服务架构** ![](http://assets.processon.com/chart_image/60ae4a231e08531e9c86cdbd.png) 架构说明: ​ 将系统服务层完全独立出来, 抽取为一个一个的微服务. 当我们开始微服务架构之后, 服务变成分布式的了, 并且对服务进行了拆分. 当用户的一个请求进来, 会一次经过不同的服务节点进行处理, 处理完成后再返回结果给用户. 那么在整个处理的链条中, 如果有任何一个节点出现了延迟或者问题, 都有可能导致最终的结果出现异常, 有的时候不同的服务节点甚至是不同的团队开发的, 部署在不同的服务器上, 那么在这错综复杂的环境下, 我们想要排查出是链条中的具体哪个服务节点除了问题, 其实并不容易. ### 1.1.2. 调用链监控的需求 调用链监控是在微服务架构中非常重要的一环. 它除了能帮助我们定位问题以外, 还能帮助项目成员清晰的去了解项目部署结构, 毕竟一个几十上百的微服务, 相信在运行时间久了之后, 项目的结构会出现上述非常复杂的调用链, 在这种情况下, 团队开发者甚至是架构师都不一定能对项目的网络结构有很清晰的了解, 那就更别谈系统优化了. 这里我们会使用到**调用链监控工具**, 那么首先我们先对调用链监控工具提出我们的需求: 1. 线上的服务是否运行正常. 是不是有一些服务已经宕机了, 当时我们没有发现呢? 如何快速发现已经宕机的服务? 2. 来自用户的一笔调用失败了, 到底是哪个服务导致的错误, 我们需要能够快速定位到才能做到修复. 3. 用户反映, 我们的系统很"慢". 如何知道究竟慢在何处? 从上述问题可以看出, 微服务架构下, 如果没有一款强大的调用链监控工具, 势必会产生如下问题: - 处理问题不及时, 影响用户的体验 - 不同应用的负责人不承认是自己的问题导致失败, 容易出现"扯皮" - 服务之间的调用关系难以梳理, 可能会存在很多错误的调用关系 - 由于没有具体的数据, 团队成员对自己的应用性能不在意 ### 1.1.3. 调用链监控的原理 在2010年, google发表了一篇名为"Dapper, a Large-Scale Distributed Systems Tracing Infrastructure"(大型分布式系统跟踪基础结构)的论文, 在文中介绍了google生产环境大规模分布式系统下的跟踪系统Dapper的设计和使用经验. 而如今很多的调用链系统入zipkin/pinpoint等系统都是基于这篇文章而实现的. 接下来我们就简单的介绍一些Dapper中调用链监控的原理: ![](http://assets.processon.com/chart_image/60ae4efe1e08531e9c86dda6.png) 如上图所示, 这是一个查询订单的简单业务, 它有如下的步骤: 1. 前端浏览器发起请求到订单服务, 订单服务会从数据库中查询出对应的订单数据. 订单数据中包含了商品的ID, 所以还需要查询商品信息. 2. 订单服务发起一笔调用, 通过RPC的方式, 远程调用商品服务的查询商品信息接口. 3. 订单服务组装数据, 返回给前端. 这几个步骤中, 有几个核心概念需要了解: - **Trace**: - Trace是指一次请求链路的链路过程, trace id 是指这次请求调用的ID. - 在一次请求中, 会在网络的最开始生成一个全局唯一的用于标识此次请求的trace id, 这个trace id在这次请求调用过程中无论经过多少个节点都会保持不变, 并且在随着每一层的调用不停的传递. 最终, 可以通过trace id将这一次用户请求在系统中的路径全部串起来. - **Span**: - Span是指一个模块的调用过程, 一般用span id来标识. - 在一次请求的过程中会调用不同的节点/模块/服务, 每一次调用都会生成一个新的span id来记录. 这样, 就可以通过span id来定位当前请求在整个系统调用链中所处的位置, 以及它的上下游节点分别是什么. 那么回到上面的案例中, 查询订单数据和查询商品数据这两个过程, 就分别是两个span, 我们记为spanA和B. B的parent也就是父span就是A. 这两个span的拥有同一个Trace Id: 1. 并且在信息收集过程中, 会记录调用的开始时间, 结束时间, 从中计算出调用的耗时. 这样, 就可以清楚的知道, 每次调用: - 经过了哪几个服务以及服务的调用顺序 - 每个服务过程的耗时 ## 1.2. 什么是CAT CAT是由大众点评开源的一款调用链监控系统, 基于Java开发的. 有很多互联网企业在使用, 热度非常高. 它有一个非常强大和丰富的可视化报表界面, 这一点其实对于一款调用链监控系统来说非常的重要. 在CAT提供的报表界面中有非常多的功能, 几乎能看到你想要的任何维度的报表数据. 特点: 聚合报表丰富, 中文支持好, 国内案例多 国内案例: 携程, 点评, 陆金所等 - PinPoint - Pinpoint是由一个韩国团队实现并开源, 针对Java编写的大规模分布式系统涉及, 通过JavaAgent的机制做字节代码植入, 实现加入traceid和获取性能数据的目的, 对应用代码零侵入. - 优点: 支持多种插件, UI功能强大, 接入端无代码侵入 - 缺点: 收集数据较多, 导致整体性能较差. - 官网: https://github.com/pinpoint-apm/pinpoint - SkyWalking - SkyWalking是apache基金会下面的一个开源APM项目, 为微服务架构和云原生架构系统设计. 它通过探针自动收集所需的指标, 并进行分布式追踪. 通过这些调用链路以及指标, Skywalking APM会感知应用间关系和服务间关系, 并进行相应的指标统计. Skywalking支持链路追踪和监控应用组件基本涵盖主流框架和容器, 如国产RPC Dubbo和motan等, 国际化的spring boot, spring cloud. - 优点: 支持多种插件, UI功能较强, 接入端无代码侵入 - 官网: http://skywalking.apache.org/ - Zipkin - Zipkin是由Twitter开源, 是分布式链路调用监控系统, 聚合各业务系统调用延迟数据, 达到链路调用监控跟踪. Zipkin基于Google的Dapper论文实现, 主要完成数据的收集, 存储, 搜索与页面展示. - 优点: 轻量, 使用部署简单. 与spring cloud有良好集成. - 官网: https://zipkin.io/ ## 1.3. CAT报表介绍 CAT支持如下报表: | 报表名称 | 报表内容 | | --------------- | ------------------------------------------------------------ | | Transaction报表 | 一段代码的运行时间, 次数. 比如URL/cache/sql执行次数相应时间 | | Event报表 | 一段代码运行次数. 比如出现一次异常, 穿插一些日志 | | Problem报表 | 根据Transaction/Event数据分析出系统可能出现的异常, 包括访问较慢的程序等 | | Heartbeat报表 | JVM内部一些状态信息, Memory, Thread等 | | Business报表 | 业务指标等, 用户可以自己定制 | Transaction报表: ![](http://assets.processon.com/chart_image/60afa13bf346fb715d560488.png?_=1625145975514) Event报表: ![](http://assets.processon.com/chart_image/60afa22d7d9c0878763f6325.png?_=1623331728514) Problem报表: ![](http://assets.processon.com/chart_image/60afa288079129624592617e.png?_=1623332380900) Heartbeat报表: ![](http://assets.processon.com/chart_image/60afa2eaf346fb715d560969.png?_=1623374166101) Business报表: ![](http://assets.processon.com/chart_image/60ae55336376893238defe6c.png?_=1623374368330) # 2. CAT基础 ## 2.1. 下载与安装 ### 2.1.1. github源码下载 要安装CAT, 首先需要从github上下载最新版本的源码. 官方给出的建议如下 > - 注意cat的3.0代码分支更新都发布在master上, 包括最新文档也都是这个分支 > - 注意文档请用最新master里面的代码文档作为标准, 一些开源网站上面一些老版本的一些配置包括数据库等可能遇到不兼容情况, 请以master代码为准, 这份文档都是美团点评内部同学为这个版本统一整理汇总. 内部同学已经核对, 包括也验证过, 如果遇到一些看不懂, 或者模糊的地方, 欢迎提交PR. 所以本次学习将会使用master分支的3.0版本. CAT官方github地址: https://github.com/dianping/cat/tree/master ```sh cd D:\Develop\Workspaces\IdeaProjects\OpenSource\dianping # Fork git clone git@github.com:sheep-cloud/cat.git ``` ### 2.1.2. 模块介绍 > - cat-alarm: 实时告警, 提供报表指标的监控报警 > - cat-client: 客户端, 上报监控数据. 提供给业务以及中间层埋点的底层SDK > - cat-consumer: 服务端, 收集监控数据进行统计分析, 构建丰富的统计报表. 用于实时分析从客户端提供的数据 > - cat-core: 核心源码, 通用依赖 > - cat-hadoop: 数据存储, logview存储至Hdfs > - cat-home: 管理端, 报表展示, 配置管理等 ### 2.1.3. 服务端安装 #### 2.3.3.1. 环境要求 CAT服务端的环境要求如下: - Linux 2.6以及以上(2.6内核才可以支持epoll), 线上服务端部署请使用Linux环境, Mac以及Windows环境可以作为开发环境, 美团点评内部CentOS 6.5 - Java6, 7, 8, 服务端推荐使用jdk7版本, 客户端jdk6, 7, 8都支持 - Maven 3及以上 - MySQL 5.6 , 5.7, 更高版本MySQL都不建议使用, 不清楚兼容性 - J2EE容器建议使用tomcat, 建议使用推荐版本7.x或8.x - Hadoop环境可选, 一般建议规模较小的公司直接使用磁盘模式, 可以申请CAT服务端, 500GB磁盘或者更大磁盘, 这个磁盘挂载在/data/目录上 数据库安装 - 数据库的脚本文件 `script/CatApplication.sql` ```mysql # 1. 创建库 cat CREATE DATABASE IF NOT EXISTS cat; # 2. 修改库的字符集. 数据库编码使用 utf8mb4, 否则可能造成中文乱码等问题 ALTER DATABASE cat CHARACTER SET utf8mb4; ``` 应用打包 - 源码构建 1. 在cat的源码目录, 执行`mvn clean install -DskipTests` 2. 如果发现cat的war打包不通过, CAT所需依赖jar都部署在 http://unidal.org/nexus/ 3. 可以配置这个公有云的仓库地址到本地Maven配置(一般为`~/.m2/settings.xml`), 理论上不需要配置即可, 可以参考cat的pom.xml配置 ```xml central Maven2 Central Repository default https://repo1.maven.org/maven2 unidal.releases http://unidal.org/nexus/content/repositories/releases/ ``` - 官方下载 1. 如果自行打包有问题, 请使用下面链接进行下载: http://unidal.org/nexus/service/local/repositories/releases/content/com/dianping/cat/cat-home/3.0.0/cat-home-3.0.0.war 2. 官方的cat为master版本, 重命名为`cat.war`进行部署, 注意此war是用jdk8, 服务端请使用jdk8版本 #### 2.1.3.2. linux源码安装 - 程序对于`/data/`目录具体读写权限 1. 要求`/data/`目录能进行读写操作, 如果`/data/`目录不能写, 建议使用linux的软链接链接到一个固定可写的目录. 所有的客户端集成程序的机器以及CAT服务端机器都需要进行这个权限初始化. (可以通过公司运维工具统一处理) 2. 此目录会存一些CAT必要的配置文件以及运行时候的数据存储目录 3. CAT支持CAT_HOME环境变量, 可以通过JVM参数修改默认的路径 ```sh mkdir /data chmod -R 777 /data/ ``` - 配置`/data/appdatas/cat/client.xml($CAT_HOME/client.xml)` ```sh mkdir -p /data/appdatas/cat cd /data/appdatas/cat vim client.xml ``` ```xml ``` - 配置`/data/appdatas/cat/datasources.xml($CAT_HOME/datasources.xml)` ```sh vim datasources.xml ``` ```xml 3 1s 10m 1000 com.mysql.jdbc.Driver root 123456 ``` - **安装mysql** 虚拟机上以及使用docker安装了mysql, 直接启动即可. ```sh docker start mysql ``` - **安装tomcat** #### 2.1.3.3. windows源码安装 - 禁用虚拟网卡, 防止cat使用虚拟网卡的IP地址. - CAT支持配置环境变量: `CAT_HOME`, 内容是: `CAT的tomcat启动的盘符:\data\appdatas\cat` - 拷贝`client.xml`, `datasources.xml`文件到`/data/appdatas/cat/`目录 - `client.xml` ```xml ``` - `datasources.xml` ```xml 3 1s 10m 1000 com.mysql.jdbc.Driver root 123456 ``` - 在cat的源码目录, 执行`mvn clean install -DskipTests` - 修改中文乱码, `tomcat/conf/logging.properties`, 将`UTF-8`替换为`GBK` - 将`cat.war`放置到tomcat的`webapps`目录下, 然后执行`.startup.bat`启动tomcat ```sh cp D:\Develop\Repository\Maven\com\dianping\cat\cat-home\3.0.0\cat-home-3.0.0.war D:\Develop\Config\tomcat\apache-tomcat-8.5.65\webapps\cat.war cd D:\Develop\Config\tomcat\apache-tomcat-8.5.65\bin\ # 启动 .\startup.bat # 停止 .\shutdown.bat ``` - 访问页面: http://localhost:8080/cat/r - 点击 Configs, 账号密码admin/admin进行登录 ## 2.2. 客户端安装 ### 2.2.1. 简单案例 springboot-cat - 编写一个简单的springboot与Cat整合的案例, 首先创建一个Spring Boot的初始化工程. 只需要勾选web依赖即可. - 添加maven依赖 ```xml com.dianping.cat cat-client 3.0.0 ``` - **启动cat客户端前的准备工作** 以下所有文件, 如果在windows下, 需要创建在启动项目的盘符下. 1. 创建`/data/appdatas/cat`目录. 确保具有这个目录的读写权限 2. 创建`/data/applogs/cat`目录(可选). 这个目录用于存放运行时日志, 将会对调试提供很大帮助, 同样需要读写权限. 3. 创建`/data/appdatas/cat/client.xml`. 不要忘记把 **** 替换成自己的服务器地址 - 初始化 1. 在项目中创建`src/main/resources/META-INF/app.properties`文件, 并添加如下内容: ```ini # appkey 只能包含英文字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 和中划线 (-) app.name = {appkey} ``` ### 2.2.2. API介绍 #### 2.2.2.1. Transaction ##### 基本用法 Transaction 适合记录跨越系统边界的程序访问行为, 比如远程调用, 数据库调用, 也适合执行时间较长的业务逻辑监控, Transaction用来记录一段代码的执行时间和次数. 现在为我们的框架还没有与dubbo, mybatis集成, 所以我们通过手动编写一个本地方法, 来测试Transaction的用法, 创建TransactionController用于测试. ```java package cn.sheep.springbootcat.controller; import com.dianping.cat.Cat; import com.dianping.cat.message.Transaction; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * transaction 示例 * * @author yanglei */ @RestController @RequestMapping("transaction") public class TransactionController { /** * transaction test * * @return */ @RequestMapping("test") public String test() { // 开启第一个 Transaction. 类别为URL, 名称为test Transaction t = Cat.newTransaction("URL", "test"); try { String dubbo = this.dubbo(); System.out.println(dubbo); // 成功执行 Transaction t.setStatus(Transaction.SUCCESS); } catch (Exception e) { // 失败, 设置异常 t.setStatus(e); } finally { // 结束这一 Transaction t.complete(); } return "test"; } /** * 模拟dubbo方法 * * @return */ private String dubbo() { // 开启第二个 Transaction. 类别为DUBBO, 名称为dubbo Transaction t = Cat.newTransaction("DUBBO", "dubbo"); try { // 成功执行 Transaction t.setStatus(Transaction.SUCCESS); } catch (Exception e) { // 失败, 设置异常 t.setStatus(e); } finally { // 结束这一 Transaction t.complete(); } return "dubbo"; } } ``` ##### 扩展API CAT提供了一系列API来对Transaction进行修改. - `com.dianping.cat.message.Message#addData`: 添加额外的数据显示 - `com.dianping.cat.message.Message#setStatus`: 设置状态, 成功可以设置SUCCESS, 失败可以设置异常 - `com.dianping.cat.message.Transaction#setDurationInMillis`: 设置执行耗时(毫秒) - `com.dianping.cat.message.Message#setTimestamp`: 设置执行时间 - `com.dianping.cat.message.Message#complete`: 结束Transaction 编写如下代码进行测试: ```java @RequestMapping("api") public String api() { Transaction t = Cat.newTransaction("TAPI", "api"); try { // 修改执行耗时1秒 t.setDurationInMillis(1000L); // 设置开始时间 t.setTimestamp(System.currentTimeMillis()); // 添加额外的数据 t.addData("content"); int i = 1 / 0; // 成功执行 Transaction t.setStatus(Transaction.SUCCESS); } catch (Exception e) { // 失败, 设置异常 t.setStatus(e); } finally { // 结束这一 Transaction t.complete(); } return "api"; } ``` 启动项目, 访问接口: http://localhost:9000/springboot-cat/transaction/api 点击左侧菜单Transaction报表, 选中TAPI类型对应的LogView查看调用链关系. 如图所示, 调用耗时已经被手动修改成了1000ms, 并且添加了额外的信息content > 在使用Transaction API时, 可能需要注意以下几点: > > 1. 可以调用 `addData `多次, 添加的数据会被 `&` 连接起来 > 2. 不要忘记完成 transaction! 否则会得到一个毁坏的消息树以及内存泄漏! #### 2.2.2.1. Event Event 用来记录一件事发生的次数, 比如记录系统异常, 它和transaction相比缺少了时间的统计, 开销比transaction要小. ##### Cat.logEvent `com.dianping.cat.Cat#logEvent`: 记录一个事件 ```java Cat.logEvent("URL.Server", "serverIp", Event.SUCCESS, "ip=${serverIp}"); ``` ##### Cat.logError `com.dianping.cat.Cat#logError(java.lang.Throwable)`: 记录一个带有错误堆栈信息的 Error. Error 是一种特殊的事件, 它的 `type` 取决于传入的 `Throwable e`. 1. 如果 `e` 是一个 `Error`, `type` 会被设置为 `Error`. 2. 如果 `e` 是一个 `RuntimeException`, `type` 会被设置为 `RuntimeException`. 3. 其他情况下, `type` 会被设置为 `Exception`. 同时错误堆栈信息会被收集并写入 `data` 属性中. ```java try { int i = 1 / 0; } catch (Exception e) { Cat.logError(e); } ``` 可以向错误堆栈顶部添加自己的错误消息, 如下代码所示: ```java Cat.logError("error(X) := exception(X)", e); ``` #### 2.2.2.3. Metric Metric 用于记录业务指标, 指标可能包含对一个指标记录次数, 记录平均值, 记录总和, 业务指标最低统计粒度为1分钟. ```java # Counter Cat.logMetricForCount("metric.key"); Cat.logMetricForCount("metric.key", 3); # Duration Cat.logMetricForDuration("metric.key", 5); ``` 我们每秒会聚合 metric 举例来说, 如果你在同一秒调用 count 三次(相同的 name), 我们会累加他们的值, 并且一次性上报给服务端. 在 `duration` 的情况下, 我们用平均值来取代累加值. 编写案例测试上述API: ```java /** * metric 示例 * * @author yanglei */ @RestController @RequestMapping("metric") public class MetricController { /** * logMetricForCount test * * @return */ @RequestMapping("count") public String count() { // 累加数据 Cat.logMetricForCount("count"); return "count"; } /** * logMetricForCount test * * @return */ @RequestMapping("duration") public String duration() { // 求平均值 Cat.logMetricForDuration("duration", 1000); return "duration"; } } ``` 点击 Business 报表可以看到, count和duration的具体数值. ### 2.2.3. CAT监控界面介绍 #### 2.2.3.1. DashBoard **DashBoard仪表盘显示了每分钟出现错误的系统及其错误的次数和时间.** ![2021-06-10_202823.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdg15jp1lj61gt0mq0u902.jpg) - 点击右上角的时间按钮可以切换不同的展示时间, -7d代表7天前, -1h代表1小时前, now定位到当前时间 - 上方时间轴按照分钟进行排布, 点击之后可以看到该时间到结束的异常情况 - 下方标识了出错的系统和出错的时间, 次数, 点击系统名称可以跳转到Problem报表 #### 2.2.3.2. Transaction **Transaction报表用来监控一段代码运行情况**: `运行次数, QPS, 错误次数, 失败率, 响应时间统计(平均影响时间, Tp分位值)等等`. 应用启动后默认会打点的部署: | 打点 | 来源组件 | 描述 | | ------ | ------------------- | ---------------------- | | System | cat-client | 上报监控数据的打点信息 | | URL | 需要接入 cat-filter | URL访问的打点信息 | 小时报表 Type统计界面展示了一个Transaction的第一层分类的视图, 可以知道这段时间里面一个分类运行的次数, 平均响应时间, 延迟, 以及分位线. ![2021-06-10_204835.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdgm69u7uj61gz0pg76c02.jpg) **从上而下分析报表**: 1. **报表的时间跨度** - CAT默认是以一小时为统计时间跨度, 点击[切到历史模式], 更改查看报表的时间跨度; - 默认是小时模式; - 切换历史模式后, 右侧快速导航, 变为month(月报表), week(周报表), day(天报表), 可以点击进行查看, 注意报表的时间跨度会有所不同. 2. **时间选择** - 通过右上角时间导航栏选择时间: 点击[+1h]/[-1h]切换时间为下一小时/上一小时; - 点击[+1d]/[-1d]切换时间为后一天的同一小时/前一天的同一小时; - 点击右上角[+7d]/[-7d]切换时间为后一周的同一小时/前一周的同一小时; - 点击[now]回到当前小时. 3. **项目选择** - 输入项目名, 查看项目数据; - 如果需要切换其他项目, 输入项目名, 回车即可. 4. **机器分组** - CAT可以将若干个机器, 作为一个分组进行数据统计. - 默认会有一个ALL分组, 代表所有机器的统计数据, 即集群统计数据. 5. **所有Type汇总表格** - 第一层分类(Type), 点击查看第二级分类(称为name)的数据 - Transaction的埋点的Type和Name由业务自己定义, 当打点了Cat.newTransaction(type, name)时, 第一层分类是type, 第二级分类是name. - 第二级分类数据是统计相同type下的所有name的数据, 数据均与第一级(type)一样的展示风格. 6. **单个Type指标图表** - 点击show, 查看Type所有name分钟级统计, 如下图 ![2021-06-10_205004.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdgnk6i64j61gw0pqq6002.jpg) 7. **指标说明** - 显示的是小时粒度第一级分类(type)的次数, 错误书, 失败率等数据. 8. **样本Log View** - L代表logview, 为一个样例的调用链路 9. **分位线说明** - 小时粒度的时间第一级分类(type)相关统计 - 95Line标识95%的请求的响应时间比参考值要小. 99Line标识99%的响应时间比参考值要小, 95Line以及99Line, 也称之为tp95, tp99. 10. **历史报表** - Transaction历史报表支持每天, 每周, 每月的数据统计以及趋势图, 点击导航栏的切换历史模式进行查询. Transaction历史报表以响应时间, 访问量, 错误量三个维度进行展示, 以天为例: 选取一个type, 点击show, 即可查看天报表. #### 2.2.3.3. Event **Event报表监控一段代码运行次数**: `例如记录程序中一个事件记录了多少次, 错误了多少次.` **Event报表的整体结构和Transaction报表几乎一样, 只缺少响应时间的统计.** 1. **第一级分类(Type)统计界面** Type统计页面展示了一个Event的第一层分类的视图, Event相对于Transaction少了运行时间统计. 可以知道这段时间里面一个分类运行的次数, 失败次数, 失败率, 采样logview, QPS. ![2021-06-10_212531.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdhoqcrz8j61gu0on40a02.jpg) 2. **第二级分类(Name)统计页面** ![2021-06-10_212737.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdhqmgpofj61gq0p776k02.jpg) #### 2.2.3.4. Problem Problem记录整个项目在运行过程中出现的问题, 包括一些异常, 错误, 访问较长的行为. Problem报表是由logview存在的特征整合而成, 方便用户定位问题. 来源: 1. 业务代码显式调用 `Cat.Error(e)` API进行埋点, 具体埋点说明可查看埋点文档. 2. 与LOG框架集成, 会捕获log日志中有异常堆栈的exception日志. 3. Long-url: 表示Transaction打点URL的慢请求 4. Long-sql: 表示Transaction打点SQL的慢请求 5. Long-service: 表示Transaction打点Service或者PigeonService的慢请求 6. Long-call: 表示Transaction打点Call或者PigeonCall的慢请求 7. Long-cache: 表示Transaction打点Cache.开头的慢请求 **所有错误汇总报表**: 第一层分类(Type), 代表错误类型, 比如error, long-url等; 第二级分类(称为Status), 对应具体的错误, 比如一个异常类名等. ![2021-06-10_213911.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grdi2n0sufj61gt0nxq5402.jpg) **错误数分布**: 点击type和status的show, 分别展示type和status的分钟级错误数分布. #### 2.2.3.5. HeartBeat HeartBeat报表是CAT客户端, 以一分钟为周期, 定期向服务端汇报当前运行时候的一些状态. **JVM相关指标** 以下所有的指标统计都是1分钟内的值, cat最低统计粒度是一分钟 ![2021-06-11_091524.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gre271nxkbj61gq0lbwh502.jpg) | GC Info / JVM GC 相关指标 | 描述 | | ------------------------------ | ------------------------ | | NewGc Count / PS ScavengeCount | 新生代GC次数 | | NewGc Time / PS ScavengeTime | 新生代GC耗时 | | OldGcCount | 老年代GC耗时 | | PS MarkSweepTime | 老年代GC耗时 | | Heap Usage | Java虚拟机堆的使用情况 | | None Heap Usage | Java虚拟机Perm的使用情况 | | FrameworkThread Info / JVM Thread 相关指标 | 描述 | | ------------------------------------------ | ----------------------- | | Active Thread | 系统当前活动线程 | | Daemon Thread | 系统后台线程 | | Total Started Thread | 系统总共开启线程 | | Started Thread | 系统每分钟新启动的线程 | | CAT Started Thread | 系统中CAT客户端启动线程 | 可以参考`java.lang.management.ThreadInfo`的定义 **系统指标** | System Info / System 相关指标 | 描述 | | ----------------------------- | ------------------ | | System LoadAverage | 系统Load详细信息 | | Memory Free | 系统momoryFree情况 | | FreePhysicalMemory | 物理内存剩余情况 | | /Free | /根的使用情况 | | /data Free | /data盘的使用情况 | #### 2.2.3.6. Business **Business报表对应着业务指标, 比如订单指标**. 与Transaction, Event, Problem不同, Business更偏向于宏观上的指标, 另外三者偏向于微观代码的执行情况. ![2021-06-11_091802.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gre2ah3n72j61gv0ow0ua02.jpg) 场景示例: 1. 我想监控订单数量 2. 我想监控订单耗时 **基线值**: 基线是对业务指标的预测值 - 基线生成算法 最近一个月的4个每周几的数据加权求和平均计算得出, 秉着更加信任新数据的原则, cat会基于历史数据做异常点的修正, 会把一些明显高于以及低于平均值的点剔除. 举例: 今天是2018-10-25(周四), 今天整天基数数据的算法是最近四个周四(2018-10-18, 2018-10-11, 2018-10-04, 2018-09-27)的每个分钟数据的加权求和或平均, 权重值依次为1, 2, 3, 4. 如: 当前时间为19:56分设为value, 前四周对应的19:56分数据(由远及近)分别为A, B, C, D, 则 `value = (A + 2B + 3C + 4D) / 10`. 对于刚上线的应用, 第一天没有基线, 第二天的基线是前一天的数据, 以此类推. - 如何开启基线 只有配置了基线告警的指标, 才会自动计算基线. 如需基线功能, 请配置基线告警 - 注意事项 > 1. 打点尽量用纯英文, 不要带一些特殊符号, 例如: 空格( ), 分号(;), 竖线(|), 斜线(/), 逗号(,), 与号(&), 星号(*), 左右尖括号(<>), 以及一些奇奇怪怪的字符 > 2. 如果有分隔需求, 建议用下划线(_), 中划线(-), 英文点号(.)等 > 3. 由于数据库不区分大小写, 请尽量统一大小写, 并且不要对大小写进行改动 > 4. 有可能出现小数, 趋势图的每个点都代表一分钟的值. 假设监控区间是10分钟, 且10分钟内总共上报15次, 趋势图中该点的值为 `5 % 10 = 0.5` #### 2.2.3.7. State **State报表显示了与CAT相关的信息** ![2021-06-12_103826.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grfa8hbu7uj61gw0mxq6t02.jpg) # 3. Cat高级 ## 3.1. 框架集成 ### 3.1.1. Dubbo #### 3.1.1.1. 制作cat-dubbo插件 使用idea打开cat源码, 找到integration目录, 1. 在`dubbo[cat-monitor]`项目修改pom.xml文件, `cat-client`版本号修改为`3.0.0` 2. 把`dubbo[cat-monitor]`添加到maven里, `右键点击pom -> Add as Maven Project` 3. 使用`install`命令将插件安装到本地仓库. 4. 使用如下依赖自动引入dubbo插件 ```xml net.dubboclub cat-monitor 0.0.6 ``` #### 3.1.1.2. 服务提供方 dubbo-provider-cat **如何发布dubbo服务** - 添加依赖 ```xml com.alibaba.spring.boot dubbo-spring-boot-starter 2.0.0 net.dubboclub cat-monitor 0.0.6 ``` > 这里直接使用了 `dubbo-spring-boot-starter` 这亦dubbo与spring-boot集成的组件. > > 官方文档地址: https://github.com/alibaba/dubbo-spring-boot-starter/blob/master/README_zh.md - 在 `application.properties` 添加dubbo的相关配置信息 ```properties server.port = 7072 server.servlet.context-path = /dubbo-provider-cat spring.application.name = dubbo_provider_cat spring.dubbo.server = true spring.dubbo.registry = N/A ``` 注: 这个配置只针对服务提供端, 消费端不用指定协议, 它自己会根据服务端的地址信息和`@Reference`注解去解析协议 - 在 `Spring Boot Application` 上添加 `@EnableDubboConfiguration`, 表示要开启dubbo功能 - 编写dubbo服务, 只需要添加要发布的服务实现上添加 `@Service (import com.alibaba.dubbo.config.annotation.Service)` 注解, 其中interfaceClass是要发布服务的接口. #### 3.1.1.3. 服务消费方 dubbo-consumer-cat **如何消费dubbo服务** - 添加依赖 ```xml com.alibaba.spring.boot dubbo-spring-boot-starter 2.0.0 net.dubboclub cat-monitor 0.0.6 ``` - 在 `application.properties` 添加dubbo的相关配置信息 ```properties server.port = 7074 server.servlet.context-path = /dubbo-consumer-cat spring.application.name = dubbo_consumer_cat ``` - 开启 `@EnableDubboConfiguration` - 通过 `@Reference (import com.alibaba.dubbo.config.annotation.Reference)` 注入需要使用的 interface #### 3.1.1.4. 测试 **按照如下顺序启动相关应用** 1. 启动 dubbo-provider-cat 2. 启动 dubbo-consumer-cat 3. 访问地址: http://localhost:7074/dubbo-consumer-cat/test/hello 4. 查看cat页面, 点击 `PigeonCall` ![2021-06-17_215656.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grllx87l4qj61gs0kpdic02.jpg) 如图所示dubbo的调用已经被正确显示在transaction报表中. 点击 Log View 查看详细的调用 ![2021-06-17_215753.png](http://ww1.sinaimg.cn/large/0069yeMZgy1grlly956znj61gv0dagmn02.jpg) 如图所示, 调用的日志已经被成功打印. > dubbo插件的日志打印内容显示的并不是十分良好, 如果在企业应用中, 可以基于dubbo插件进行二次开发. ### 3.1.2. Mybatis #### 3.1.2.1. 创建springboot和mybatis的集成项目 - 表结构 ```mysql USE sheep; DROP TABLE IF EXISTS sheep.cat_t_user; CREATE TABLE sheep.cat_t_user ( id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键', user_name VARCHAR(32) DEFAULT NULL COMMENT '用户名', password VARCHAR(32) DEFAULT 0 COMMENT '密码', create_time DATETIME NOT NULL COMMENT '创建时间', update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (id) ) COMMENT '用户表'; INSERT INTO sheep.cat_t_user (id, user_name, password, create_time) VALUES (1, 'jack', '123456', now()); ``` - pom.xml ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.13.RELEASE cn.sheep mybatis-cat 0.0.1-SNAPSHOT mybatis-cat Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.mybatis.spring.boot mybatis-spring-boot-starter 2.1.2 mysql mysql-connector-java runtime 5.1.49 com.alibaba druid 1.2.6 cn.hutool hutool-all 5.7.3 org.projectlombok lombok 1.16.18 com.dianping.cat cat-client 3.0.0 org.springframework.boot spring-boot-maven-plugin ``` - application.yml ```yaml server: port: 7079 servlet: context-path: /mybatis-cat # datasource spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/sheep?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false username: root password: 123456 type: com.alibaba.druid.pool.DruidDataSource # mybatis mybatis: config-location: classpath:mybatis/mybatis-config.xml mapper-locations: - classpath:mybatis/mapper/**/*.xml ``` - 启动类 ```java package cn.sheep.mybatis.cat; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * SpringBoot 启动类 * * @author sheep */ @MapperScan("cn.sheep.mybatis.cat.dao") @SpringBootApplication public class MybatisCatApplication { public static void main(String[] args) { SpringApplication.run(MybatisCatApplication.class, args); } } ``` #### 3.1.2.2. 集成cat-mybatis插件 - cat-mybatis插件 ```java package cn.sheep.mybatis.cat.catmonitor; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ClassUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.druid.pool.DruidDataSource; import com.dianping.cat.Cat; import com.dianping.cat.message.Message; import com.dianping.cat.message.Transaction; import org.apache.ibatis.datasource.pooled.PooledDataSource; import org.apache.ibatis.datasource.unpooled.UnpooledDataSource; import org.apache.ibatis.executor.Executor; import org.apache.ibatis.mapping.BoundSql; import org.apache.ibatis.mapping.Environment; import org.apache.ibatis.mapping.MappedStatement; import org.apache.ibatis.mapping.ParameterMapping; import org.apache.ibatis.plugin.*; import org.apache.ibatis.reflection.MetaObject; import org.apache.ibatis.session.Configuration; import org.apache.ibatis.session.ResultHandler; import org.apache.ibatis.session.RowBounds; import org.apache.ibatis.type.TypeHandlerRegistry; import javax.sql.DataSource; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.text.DateFormat; import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * Cat-Mybatis 插件 *
   *     1. Cat-Mybatis plugin:  Rewrite on the version of Steven;
   *     2. Support DruidDataSource,PooledDataSource(mybatis Self-contained data source);
   * 
* * @author sheep */ @Intercepts({ @Signature(method = "query", type = Executor.class, args = { MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class }), @Signature(method = "update", type = Executor.class, args = { MappedStatement.class, Object.class }) }) public class CatMybatisPlugin implements Interceptor { private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\?"); private static final String MYSQL_DEFAULT_URL = "jdbc:mysql://UUUUUKnown:3306/%s?useUnicode=true"; public static final String SQL = "SQL"; public static final String SQL_METHOD = "SQL.Method"; public static final String SQL_DATABASE = "SQL.Database"; /** * 拦截. 拦截目标对象的目标方法的执行 * * @param invocation * @return * @throws Throwable */ @Override public Object intercept(Invocation invocation) throws Throwable { MappedStatement mappedStatement = this.getStatement(invocation); String methodName = this.getMethodName(mappedStatement); // 开启一个 Transaction. 类别为SQL, 名称为SQL方法名 Transaction t = Cat.newTransaction(SQL, methodName); // 记录一个 Event. 事件类型: SQL.Method; 事件名称: SQL方法类型; 状态: 成功; 信息: sql String sqlCommandName = mappedStatement.getSqlCommandType() .name() .toLowerCase(); String sql = this.getSql(invocation, mappedStatement); Cat.logEvent(SQL_METHOD, sqlCommandName, Message.SUCCESS, sql); // 记录一个 Event. 事件类型: SQL.Database; 事件名称: jdbcUrl Cat.logEvent(SQL_DATABASE, this.getSQLDatabaseUrlByStatement(mappedStatement)); return doFinish(invocation, t); } /** * 获取 MappedStatement * * @param invocation * @return */ private MappedStatement getStatement(Invocation invocation) { Object[] args = invocation.getArgs(); return (MappedStatement)args[0]; } /** * 获取SQL 方法名. UserMapper.findById * * @param mappedStatement * @return */ private String getMethodName(MappedStatement mappedStatement) { // 获取sqlId. 例: cn.sheep.mybatis.cat.dao.UserMapper.findById String mappedStatementId = mappedStatement.getId(); // sqlId最后两位. 例: UserMapper.findById String[] strArr = mappedStatementId.split("\\."); int strLen = strArr.length; return StrUtil.format("{}.{}", strArr[strLen - 2], strArr[strLen - 1]); } /** * 获取sql * * @param invocation * @param mappedStatement * @return */ private String getSql(Invocation invocation, MappedStatement mappedStatement) { Object parameter = null; Object[] args = invocation.getArgs(); if (ArrayUtil.length(args) > 1) { parameter = args[1]; } BoundSql boundSql = mappedStatement.getBoundSql(parameter); Configuration configuration = mappedStatement.getConfiguration(); return sqlResolve(configuration, boundSql); } private Object doFinish(Invocation invocation, Transaction t) throws InvocationTargetException, IllegalAccessException { Object returnObj; try { // 执行目标方法; 返回执行后的返回值 returnObj = invocation.proceed(); t.setStatus(Transaction.SUCCESS); } catch (Exception e) { Cat.logError(e); throw e; } finally { t.complete(); } return returnObj; } /** * 获取 jdbcUrl * * @param mappedStatement * @return */ private String getSQLDatabaseUrlByStatement(MappedStatement mappedStatement) { String url; DataSource dataSource = null; try { Configuration configuration = mappedStatement.getConfiguration(); Environment environment = configuration.getEnvironment(); dataSource = environment.getDataSource(); url = this.switchDataSource(dataSource); return url; } catch (NoSuchFieldException | IllegalAccessException | NullPointerException e) { Cat.logError(e); } Cat.logError(new Exception("UnSupport type of DataSource : " + ClassUtil.getClassName(dataSource, false))); return MYSQL_DEFAULT_URL; } /** * 根据连接池获取 jdbcUrl * * @param dataSource * @return * @throws NoSuchFieldException * @throws IllegalAccessException */ private String switchDataSource(DataSource dataSource) throws NoSuchFieldException, IllegalAccessException { String url = null; if (dataSource instanceof DruidDataSource) { url = ((DruidDataSource)dataSource).getUrl(); } else if (dataSource instanceof PooledDataSource) { Class dataSourceClass = dataSource.getClass(); Field dataSource1 = dataSourceClass.getDeclaredField("dataSource"); dataSource1.setAccessible(true); UnpooledDataSource dataSource2 = (UnpooledDataSource)dataSource1.get(dataSource); url = dataSource2.getUrl(); } else { // 其他数据源 扩展 } return url; } /** * SQL 解析. 将?替换为实际值 * * @param configuration * @param boundSql * @return */ public String sqlResolve(Configuration configuration, BoundSql boundSql) { Object parameterObject = boundSql.getParameterObject(); List parameterMappings = boundSql.getParameterMappings(); String sql = boundSql.getSql() .replaceAll("[\\s]+", " "); if (parameterMappings.size() > 0 && parameterObject != null) { TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry(); if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) { sql = sql.replaceFirst("\\?", Matcher.quoteReplacement(this.resolveParameterValue(parameterObject))); } else { MetaObject metaObject = configuration.newMetaObject(parameterObject); Matcher matcher = PARAMETER_PATTERN.matcher(sql); StringBuffer sqlBuffer = new StringBuffer(); for (ParameterMapping parameterMapping : parameterMappings) { String propertyName = parameterMapping.getProperty(); Object obj = null; if (metaObject.hasGetter(propertyName)) { obj = metaObject.getValue(propertyName); } else if (boundSql.hasAdditionalParameter(propertyName)) { obj = boundSql.getAdditionalParameter(propertyName); } if (matcher.find()) { matcher.appendReplacement(sqlBuffer, Matcher.quoteReplacement(resolveParameterValue(obj))); } } matcher.appendTail(sqlBuffer); sql = sqlBuffer.toString(); } } return sql; } private String resolveParameterValue(Object obj) { String value; if (obj instanceof String) { value = StrUtil.format("'{}'", obj.toString()); } else if (obj instanceof Date) { value = StrUtil.format("'{}'", DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.DEFAULT, Locale.CHINA).format((Date)obj)); } else { value = obj == null ? "" : obj.toString(); } return value; } /** * 包装目标对象 * * @param target * @return */ @Override public Object plugin(Object target) { if (target instanceof Executor) { // 借助Plugin.wrap(target, interceptor) 方法包装目标对象; 返回为当前target创建的动态代理对象 return Plugin.wrap(target, this); } return target; } /** * 设置属性. 将插件注册时的property属性设置进来 * * @param properties */ @Override public void setProperties(Properties properties) { } } ``` > 将此文件和所有其他cat插件一同打包放到私有仓库上是一种更好的选择. - mybatis-config.xml ```xml ``` #### 3.1.2.3. 测试 - 访问接口 http://localhost:7079/mybatis-cat/user/findAll ### 3.1.3. Log Cat集成日志框架的思路大体上都类似, 采用SpringBoot默认的logback日志框架进行演示, 如果使用了log4j, log4j2处理方式也是类似的. #### 3.1.3.1. 搭建环境 搭建Spring Boot初始化环境, 只需要添加Web依赖. - pom.xml ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.13.RELEASE cn.sheep logback-cat 0.0.1-SNAPSHOT logback-cat Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test com.dianping.cat cat-client 3.0.0 org.springframework.boot spring-boot-maven-plugin ``` - application.yml ```yaml server: port: 7080 servlet: context-path: /logback-cat logging: path: D:\data\applogs\logback-cat ``` - logback-spring.xml ```xml cat %yellow(%d{yyyy-MM-dd HH:mm:ss}) %red([%thread]) %highlight(%-5level) %cyan(%logger{50}) - %magenta(%msg) %n UTF-8 ERROR DENY ACCEPT %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n UTF-8 ${logging.path}/cat.info.%d{yyyy-MM-dd}.log 90 1GB ERROR %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n ${logging.path}/cat.error.%d{yyyy-MM-dd}.log 90 ``` #### 3.1.3.2. 集成cat-logback插件 - cat-logback插件 ```java package cn.sheep.logback.cat.catmonitor; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ThrowableProxy; import ch.qos.logback.core.AppenderBase; import ch.qos.logback.core.LogbackException; import com.dianping.cat.Cat; import com.dianping.cat.message.Message; import com.dianping.cat.message.spi.MessageManager; import java.io.PrintWriter; import java.io.StringWriter; /** * LogbackCat 适配器 * * @author sheep */ public class CatLogbackAppender extends AppenderBase { public static final String LOGBACK = "Logback"; @Override protected void append(ILoggingEvent event) { try { MessageManager manager = Cat.getManager(); // 检查是否启用或禁用 CAT 跟踪模式 boolean isTraceMode = manager.isTraceMode(); Level level = event.getLevel(); if (level.isGreaterOrEqual(Level.ERROR)) { this.logError(event); } else if (isTraceMode) { this.logTrace(event); } } catch (Exception ex) { throw new LogbackException(event.getFormattedMessage(), ex); } } private void logError(ILoggingEvent event) { ThrowableProxy info = (ThrowableProxy)event.getThrowableProxy(); if (info != null) { Throwable exception = info.getThrowable(); Object message = event.getFormattedMessage(); if (message != null) { Cat.logError(String.valueOf(message), exception); } else { Cat.logError(exception); } } } private void logTrace(ILoggingEvent event) { String name = event.getLevel().toString(); Object message = event.getFormattedMessage(); String data; if (message instanceof Throwable) { data = buildExceptionStack((Throwable)message); } else { data = event.getFormattedMessage(); } ThrowableProxy info = (ThrowableProxy)event.getThrowableProxy(); if (info != null) { data = data + '\n' + buildExceptionStack(info.getThrowable()); } Cat.logTrace(LOGBACK, name, Message.SUCCESS, data); } private String buildExceptionStack(Throwable exception) { if (exception == null) { return ""; } StringWriter writer = new StringWriter(2048); exception.printStackTrace(new PrintWriter(writer)); return writer.toString(); } } ``` - 修改 logback-spring.xml ```xml ``` - LogbackController ```java package cn.sheep.logback.cat.controller; import com.dianping.cat.Cat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * TestController * * @author yanglei */ @RestController @RequestMapping("logback") public class LogbackController { private Logger log = LoggerFactory.getLogger(LogbackController.class); /** * test * * @return */ @GetMapping("test") public String test() { // 设置 CAT 跟踪模式。 Cat.getManager().setTraceMode(true); log.info("cat info"); try { int i = 1 / 0; } catch (Exception e) { log.error("cat error", e); } return "logback"; } /** * test2 * * @return */ @GetMapping("test2") public String test2() { // 设置 CAT 跟踪模式。 Cat.getManager().setTraceMode(true); log.info("cat info"); log.debug("cat debug"); log.warn("cat warn"); log.error("cat error"); return "logback2"; } } ``` #### 3.1.3.3. 测试 - 访问接口: http://localhost:7080/logback-cat/logback/test ### 3.1.4. SpringBoot #### 3.1.4.1. 搭建环境 SpringBoot的集成方式相对比较简单, 使用已经搭建完的MyBatis框架来进行测试 - 添加如下配置到config包中 ```java package cn.sheep.mybatis.cat.catmonitor; import com.dianping.cat.servlet.CatFilter; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * SpringBoot cat 插件 * * @author sheep */ @Configuration public class CatFilterConfigure { /** * 注册过滤器 * * @return */ @Bean public FilterRegistrationBean catFilter() { FilterRegistrationBean registration = new FilterRegistrationBean(); CatFilter filter = new CatFilter(); registration.setFilter(filter); registration.addUrlPatterns("/*"); // 设置此注册的名称。 如果未指定,将使用 bean 名称 registration.setName("cat-filter"); registration.setOrder(1); return registration; } } ``` #### 3.1.4.2. 测试 - 访问接口 http://localhost:7079/mybatis-cat/user/findAll ![2021-07-02_213854.png](http://ww1.sinaimg.cn/large/0069yeMZly1gs2xq96tlzj31cw06twf2.jpg) - 途中的调用线经过了controller, 所以打印出了相关信息 - /mybatis-cat/user/findAll: 接口地址 - URL.Server: 服务器, 浏览器等相关信息 - URL.Method: 调用方法(GET, POST等)和URL ### 3.1.5. Spring Aop 使用Spring Aop技术可以简化埋点操作, 通过添加统一注解的方式, 使得指定方法能被CAT监控起来. #### 3.1.5.1. 搭建环境 创建基于SpringBoot的springaop-cat项目 - pom.xml ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.13.RELEASE cn.sheep springaop-cat 0.0.1-SNAPSHOT springaop-cat Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-aop cn.hutool hutool-all 5.7.3 com.dianping.cat cat-client 3.0.0 org.springframework.boot spring-boot-maven-plugin ``` - 创建AOP接口 ```java package cn.sheep.springaop.cat.catmonitor; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.Target; import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * Cat 注解 * * @author sheep */ @Retention(RUNTIME) @Target(ElementType.METHOD) public @interface CatAnnotation {} ``` - 创建AOP处理类 ```java package cn.sheep.springaop.cat.catmonitor; import cn.hutool.core.util.ClassUtil; import cn.hutool.core.util.StrUtil; import com.dianping.cat.Cat; import com.dianping.cat.message.Transaction; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; /** * Aop 切面类. 用于cat埋点 * * @author sheep */ @Component @Aspect public class CatAopService { public static final String AOP_METHOD = "Aop.Method"; @Around(value = "@annotation(CatAnnotation)") public Object aroundMethod(ProceedingJoinPoint pjp) throws Throwable { MethodSignature joinPointObject = (MethodSignature)pjp.getSignature(); Method method = joinPointObject.getMethod(); // 开启一个 Transaction. 类别: Aop.Method, 名称: 类名.方法名 String name = StrUtil.format("{}.{}", ClassUtil.getClassName(method.getDeclaringClass(), true), method.getName()); Transaction t = Cat.newTransaction(AOP_METHOD, name); try { Object res = pjp.proceed(); t.setSuccessStatus(); return res; } catch (Throwable e) { t.setStatus(e); Cat.logError(e); throw e; } finally { t.complete(); } } } ``` > 可自定义AOP实现类集成 `net.sf.cglib.proxy.MethodInterceptor`, 实现该逻辑更加优雅. - 在方法上添加 `CatAnnotation`, 就可以被cat监控起来. #### 3.1.5.2. 测试 - 访问接口: http://localhost:7081/aop/test - 查看cat的transaction报表 ### 3.1.6. Spring MVC Spring MVC的集成方式, 官方提供的是使用AOP来进行集成, 源码如下: - AOP接口 ```java import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface CatTransaction { String type() default "Handler";//"URL MVC Service SQL" is reserved for Cat Transaction Type String name() default ""; } ``` - AOP处理代码 ```java @Around("@annotation(catTransaction)") public Object catTransactionProcess(ProceedingJoinPoint pjp, CatTransaction catTransaction) throws Throwable { String transName = pjp.getSignature().getDeclaringType().getSimpleName() + "." + pjp.getSignature().getName(); if (StringUtils.isNotBlank(catTransaction.name())) { transName = catTransaction.name(); } Transaction t = Cat.newTransaction(catTransaction.type(), transName); try { Object result = pjp.proceed(); t.setStatus(Transaction.SUCCESS); return result; } catch (Throwable e) { t.setStatus(e); throw e; } finally { t.complete(); } } ``` 这部分与Spirng Aop处理方式基本一样 ## 3.2. 告警配置 cat提供晚上的告警功能. 合理, 灵活的监控规则可以帮助更快, 更精确的发现业务线上故障. ### 3.2.1. 告警通用配置 ##### 告警服务器配置 - 只有配置为告警服务器的机器, 才会执行告警逻辑; 只有配置为发送服务器的机器, 才会发送告警. - 进入`Configs -> 全局系统配置 -> 服务端配置`, 修改服务器类型, 告警配置. 如图所示 ![2021-07-03_115545.png](http://ww1.sinaimg.cn/large/0069yeMZly1gs3mgspexdj31hc0swgqx.jpg) ##### 告警策略 - 配置某种告警类型, 某个项目, 某个错误级别, 对应的告警发送渠道, 以及暂停事件. ![2021-07-03_120049.png](http://ww1.sinaimg.cn/large/0069yeMZly1gs3mlzp6qyj30nn0f1gnl.jpg) - 举例: 下述配置示例, 说明对于Transaction告警, 当告警项目名为mybatis-cat: - 当告警级别为error时, 发送渠道为邮件, 短信, 微信, 连续告警之间的间隔为5分钟 - 当告警级别为warning时, 发送渠道为邮件, 微信, 连续告警之间的间隔为10分钟 ###### 配置示例 ```xml ``` ###### 配置说明 - type id属性: 告警的类型, 可选: Transaction, Event, Business, Heartbeat - group id属性: group可以为default, 代表默认, 即所有项目; 也可以为项目名, 代表某个项目的策略, 此时default策略不会生效 - level id属性: 错误级别, 分为warning代表警告, error代表错误 - level send属性: 告警渠道, 分为mail-邮箱, weixin-微信, sms-短信 - level suspendMinute属性: 连续告警的暂停时间 ##### 告警接收人 - 告警接收人, 为告警所属项目的联系人 - 项目组邮件: 项目负责人邮件, 或项目组产品线邮件, 多个邮箱由英文逗号分隔, 不要留有空格; 作为发送告警邮件, 微信的依据 - 项目组号码: 项目负责人手机号, 多个号码由英文逗号分隔, 不要留有空格; 作为发送告警短信的依据 ![2021-07-17_150142.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsjyj9tsc2j60mc0ghgv702.jpg) ##### 告警服务端 - 告警发送中心的配置. (什么是告警发送中心: 提供发送短信, 邮件, 微信功能, 且提供http api的服务) - cat在生成告警时, 调用告警发送中心的http接口发送告警. cat资深并不集成告警发送中心, 请自己搭建告警发送中心. ![2021-07-17_150657.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsjynx7wd9j60vz0dgwmp02.jpg) ###### 配置示例 ```xml ``` ###### 配置说明 - sender id属性: 告警的类型, 可选: mail, sms, weixin - sender url属性: 告警中心的URL - sender batchSend属性: 是否支持批量发送告警信息 - par: 告警中心所需的http参数. ${argument}代表构建告警对象时, 附带的动态参数; 此处需要根据告警发送中心的需求, 将动态参数假如到代码AlertEntity中的m_paras ### 3.2.2. 告警规则 目前cat的监控规则有5个要素: 1. 告警时间段: 同一项业务指标在每天不同的时段可能有不同的趋势. 设定该项, 可让cat在每天不同的时间段执行不同的监控规则. 注意: 告警时间段, 不是监控数据的时间段, 只是告警从这一刻开始进行检查数据 2. 规则组合. 在一个时间段中, 可能指标触发了多个监控规则中的一个规则就要发出警报, 也有可能指标要同时触发了多个监控规则才需要发出警报. 3. 监控规则类型. 通过以下6种类型对指标进行监控: 最大值, 最小值, 波动上升百分比, 波动下降百分比, 总和最大值, 总和最小值 4. 监控最近分钟数. 设定时间后(单位为分钟), 当指标在设定的最近的时间长度内连续触发了监控规则, 才会发出警报. 比如最近分钟数为3, 表明连续3分钟的数组都满足条件才告警. 如果分钟数为1, 表示最近的一分钟满足条件就告警 5. 规则与被监控指标的匹配. 监控规则可以按照名称, 正则表达式与监控的对象(指标)进行匹配 ##### 子条件类型 有6种类型. 子条件的内容为对应的阈值, 请注意阈值只能由数字组成, 当阈值表达百分比时, 不能在最后加上百分号. 8种类型如下: | 类型 | 说明 | | ----------------------------------- | ------------------------------------------------------------ | | MaxVal 最大值(当前值) | 当前实际值 最大值,比如检查最近3分钟数据,3分钟数据会有3个value,是表示(>=N)个值都必须同时>=设定值 | | MinVal 最小值(当前值) | 当前实际值 最小值,比如检查最近3分钟数据,3分钟数据会有3个value,是表示(>=N)个值都必须同时比<=设定值 | | FluAscPer 波动上升百分比(当前值) | 波动百分比最大值。即当前最后(N)分钟值比监控周期内其它分钟值(M-N个)的增加百分比都>=设定的百分比时触发警报,比如检查最近10分钟数据,触发个数为3;10分钟内数据会算出7个百分比数据,是表示最后3分钟值分别相比前面7分钟值,3组7次比较的上升波动百分比全部>=配置阈值。比如下降50%,阈值填写50。 | | FluDescPer 波动下降百分比(当前值) | 波动百分比最小值。当前最后(N)分钟值比监控周期内其它(M-N个)分钟值的减少百分比都大于设定的百分比时触发警报,比如检查最近10分钟数据,触发个数为3;10分钟数据会算出7个百分比数据,是表示最后3分钟值分别相比前面7分钟值,3组7次比较的下降波动百分比全部>=配置阈值。比如下降50%,阈值填写50。 | | SumMaxVal 总和最大值(当前值) | 当前值总和最大值,比如检查最近3分钟数据,表示3分钟内的总和>=设定值就告警。 | | SumMinVal 总和最小值(当前值) | 当前值总和最小值,比如检查最近3分钟数据,表示3分钟内的总和<=设定值就告警。 | ##### Transaction告警 对Transaction的告警, 支持的指标有次数, 延时, 失败率; 监控周期; 一分钟 ###### 配置图示 如下图所示, 配置了mybatis-cat项目的Transaction监控规则 ![2021-07-17_161457.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsk0mo5mutj611f0ef45002.jpg) ###### 配置说明 - 项目: 要监控的项目名 - Type: 被监控Transaction的type - Name: 被监控Transaction的name; 如果为All, 代表全部name - 监控项: 次数, 延时, 失败率 - 告警规则: 详见**告警规则**部分 ##### Event告警 对Event的个数进行告警; 监控周期: 一分钟 ##### 心跳告警 心跳告警是对服务器当前状态的监控, 如监控系统负载, GC数量等信息; 监控周期: 一分钟 ##### 异常告警 对异常的个数进行告警; 监控周期: 一分钟 ### 3.2.3. 告警接口编写 编写controller接口 ```java package cn.sheep.springboot.cat.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 告警 * * @author yanglei */ @Slf4j @RestController @RequestMapping("alert") public class AlertController { /** * 发送告警 * * @return */ @RequestMapping("msg") public String msg(@RequestParam String to, @RequestParam String value) { log.warn("告警触发了. 接收人: {}; 告警内容: {}", to, value); /* 告警服务端 */ return "200"; } } ``` 修改告警服务端的配置, 填写接口地址, 以邮件为例 ##### 配置示例 ```xml ``` 测试结果, 输入内容如下: ```ini 2021-07-17 16:45:00.657 WARN 4180 --- [nio-9000-exec-1] c.s.s.cat.controller.AlertController : 告警触发了. 接收人: testUser1@test.com,testUser2@test.com,121529654@qq.com; 告警内容: [CAT Transaction告警] [项目: mybatis-cat] [监控项: URL-All-count],[CAT Transaction告警: mybatis-cat URL All] : [实际值:100 ] [最大阈值: 5 ][告警时间:2021-07-17 16:45:00]
[时间: 2021-07-17 16:45] 点击此处查看详情

[告警间隔时间]5分钟 ``` # 4. Cat原理 - 客户端原理介绍 - 服务端原理介绍 ## 4.1. 客户端原理 ### 4.1.1. 客户端设计 **客户端设计** 客户端设计是CAT系统设计中最为核心的一个环节, 客户端要求是做到API简单, 高可靠性能, 因为监控只是公司核心业务流程一个旁路环节, 无论在任何场景下都不能影响业务性能. **架构设计** CAT客户端在手机端数据方面使用ThreadLocal(线程局部变量), 是线程本地变量, 也可以称之为线程本地存储. 其实ThreadLocal的功用非常简单, 就是为每一个使用该变量的线程都提供一个变量值的副本, 属于Java中一种较为特殊的线程绑定机制, 每一个线程都可以独立地改变自己的副本, 不会和其它线程的副本冲突. 在监控场景下, 为用户提供服务都是Web容器, 比如tomcat或者Jetty, 后端的RPC服务端比如Dubbo或者Pigeon, 也都是基于线程池来实现的. 业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务, 数据库, 缓存等, 将这些数据拿回来再进行业务逻辑封装, 最后将结果展示给用户. 所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适. ![0d0caf3b.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsont831gbj613o0lstc802.jpg) 如上图所示, 业务执行业务逻辑的时候, 就会把此次请求对应的监控存放于线程上下文中, 存于上下文的其实是一个监控树的结构. 在最后业务线程执行结束时, 将监控对象存入一个异步内存队列中, CAT有个消费线程将队列内的数据异步发送到服务端. **总结流程如下**: - 业务线程产生消息, 交给消息Producer, 消息Producer将消息存放在该业务线程**消息栈**中; - 业务线程通知消息Producer消息结束时, 消息Producer根据其消息栈产生**消息树**放置在同步消息队列中; - 消息上报线程监听消息队列, 根据消息树产生最终的消息报文上报CAT服务端. **API设计** 监控API定义往往取决于对监控或者性能分析这个领域的理解, 监控和性能分析所针对的场景有如下几种: - 一段代码的执行时间, 一段代码可以是URL执行耗时, 也可以是SQL的执行耗时. - 一段代码的执行次数, 比如Java抛出异常记录次数, 或者一段逻辑的执行次数. - 定期执行某段代码, 比如定期上报一些核心指标: JVM内存, GC等指标. - 关键的业务监控指标, 比如监控订单数, 交易额, 支付成功率等. 在上述领域模型的基础上, CAT设计自己核心的几个监控对象: Transaction, Event, Heartbeat, Metric. ![9f36af1e.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsoo2y184cj60y40kyn0p02.jpg) **序列化和通信** 序列化和通信是整个客户端包括服务端性能里面很关键的一个环节. - CAT序列化协议是自定义序列化协议, 自定义序列化协议相比通用序列化协议要高效很多, 这个在大规模数据实时处理场景下还是非常有必要的. - CAT通信是基于Netty来实现的NIO的数据传输, Netty是一个非常好的NIO开发框架, 在这边就不详细介绍了. ### 4.1.2. 核心类分析 ![DefaultMessageTree.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsopm37eosj609q16qk1402.jpg) ![Message.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsopkreogej60p00qujzz02.jpg) CAT将监控的内容分为了4种: Transaction, Event, Heartbeat, Metric 使用4个接口定义他们的行为, 对应的实现类命名方式均为Default+接口名. 他们都继承自Message接口. 这个接口中主要用来提供通用性的方法, 比如添加数据, 设置状态等 ### 4.1.3. 流程分析 #### **启动** 1. 懒加载创建Cat客户端对象 ```java package com.dianping.cat; public static MessageProducer getProducer() { try { // 懒加载 checkAndInitialize(); MessageProducer producer = s_instance.m_producer; if (producer != null) { return producer; } else { return NullMessageProducer.NULL_MESSAGE_PRODUCER; } } catch (Exception e) { errorHandler(e); return NullMessageProducer.NULL_MESSAGE_PRODUCER; } } ``` 2. 读取`client.xml` ```java package com.dianping.cat; private static void checkAndInitialize() { try { if (!s_init) { initialize(new File(getCatHome(), "client.xml")); } } catch (Exception e) { errorHandler(e); } } ``` ```java package com.dianping.cat; public static String getCatHome() { // CAT_HOME_DEFAULT_DIR = "/data/appdatas/cat/" String catHome = CatPropertyProvider.INST.getProperty("CAT_HOME", CatConstants.CAT_HOME_DEFAULT_DIR); if (!catHome.endsWith("/")) { catHome = catHome + "/"; } return catHome; } ``` 3. 加载模块 ```java package com.dianping.cat; // this should be called during application initialization time public static void initialize(File configFile) { try { if (!s_init) { synchronized (s_instance) { if (!s_init) { // 使用点评Plexus容器加载对应的模块 PlexusContainer container = ContainerLoader.getDefaultContainer(); ModuleContext ctx = new DefaultModuleContext(container); Module module = ctx.lookup(Module.class, CatClientModule.ID); if (!module.isInitialized()) { ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class); ctx.setAttribute("cat-client-config-file", configFile); initializer.execute(ctx, module); } log("INFO", "Cat is lazy initialized!"); s_init = true; } } } } catch (Exception e) { errorHandler(e); } } ``` #### **创建Message** 1. 创建一个新的Transaction ```java package com.dianping.cat public static Transaction newTransaction(String type, String name) { try { return Cat.getProducer().newTransaction(type, name); } catch (Exception e) { errorHandler(e); return NullMessage.TRANSACTION; } } ``` ```java package com.dianping.cat.message.internal; @Override public Transaction newTransaction(String type, String name) { // this enable CAT client logging cat message without explicit setup // 创建线程上下文 if (!m_manager.hasContext()) { m_manager.setup(); } // 创建Transaction DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager); // 添加到线程上下文中 m_manager.start(transaction, false); return transaction; } ``` 2. 创建线程上下文 ```java package com.dianping.cat.message.internal; @Override public void setup() { Context ctx; if (m_domain != null) { ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); } else { ctx = new Context("Unknown", m_hostName, ""); } double samplingRate = m_configManager.getSampleRatio(); if (samplingRate < 1.0 && hitSample(samplingRate)) { ctx.m_tree.setHitSample(true); } m_context.set(ctx); } ``` 3. 添加Transaction到线程上下文中 ```java package com.dianping.cat.message.internal; @Override public void start(Transaction transaction, boolean forked) { Context ctx = getContext(); if (ctx != null) { ctx.start(transaction, forked); if (transaction instanceof TaggedTransaction) { TaggedTransaction tt = (TaggedTransaction) transaction; m_taggedTransactions.put(tt.getTag(), tt); } } else if (m_firstMessage) { m_firstMessage = false; m_logger.warn("CAT client is not enabled because it's not initialized yet"); } } ``` 4. 添加Transaction到DefaultMessageTree中 ```java package com.dianping.cat.message.internal; public void start(Transaction transaction, boolean forked) { if (!m_stack.isEmpty()) { // Do NOT make strong reference from parent transaction to forked transaction. // Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway() // By doing so, there is no need for synchronization between parent and child threads. // Both threads can complete() anytime despite the other thread. if (!(transaction instanceof ForkedTransaction)) { Transaction parent = m_stack.peek(); addTransactionChild(transaction, parent); } } else { m_tree.setMessage(transaction); } if (!forked) { m_stack.push(transaction); } } ``` 5. 关闭Transaction ```java package com.dianping.cat.message.internal; @Override public void complete() { try { if (isCompleted()) { // complete() was called more than once DefaultEvent event = new DefaultEvent("cat", "BadInstrument"); event.setStatus("TransactionAlreadyCompleted"); event.complete(); addChild(event); } else { if (m_durationInMicro == -1) { m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L; } setCompleted(true); if (m_manager != null) { // 关闭这一次的 m_manager.end(this); } } } catch (Exception e) { // ignore } } ``` ```java package com.dianping.cat.message.internal; @Override public void end(Transaction transaction) { Context ctx = getContext(); if (ctx != null && transaction.isStandalone()) { if (ctx.end(this, transaction)) { m_context.remove(); } } } ``` 这里需要介绍一下, 消息进入到线程上下文后, 是通过栈的方式来存储的 Transaction之间是有应用的, 因此在end方法中只需要将第一个Transaction(封装在MessageTree中), 通过MessageManager来flush, 在拼接消息时可以根据这个引用关系来找到所有的Transaction. ```java package com.dianping.cat.message.internal; /** * return true means the transaction has been flushed. * * @param manager * @param transaction * @return true if message is flushed, false otherwise */ public boolean end(DefaultMessageManager manager, Transaction transaction) { if (!m_stack.isEmpty()) { Transaction current = m_stack.pop(); // 取到栈底的Transaction if (transaction == current) { m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current); } else { while (transaction != current && !m_stack.empty()) { m_validator.validate(m_stack.peek(), current); current = m_stack.pop(); } } if (m_stack.isEmpty()) { MessageTree tree = m_tree.copy(); m_tree.setMessageId(null); m_tree.setMessage(null); if (m_totalDurationInMicros > 0) { adjustForTruncatedTransaction((Transaction) tree.getMessage()); } // 刷新并发送数据 manager.flush(tree, true); return true; } } return false; } ``` #### **发送数据** 1. 首先获取到发送类的对象, 调用其方法进行发送 ```java package com.dianping.cat.message.internal; public void flush(MessageTree tree, boolean clearContext) { // 获取发送类对象 MessageSender sender = m_transportManager.getSender(); if (sender != null && isMessageEnabled()) { // 发送 sender.send(tree); if (clearContext) { reset(); } } else { m_throttleTimes++; if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) { m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes); } } } ``` 2. 发送时是经典的`生产者-消费者`模型, 生产者只需要向队列中放入数据, 消费者监听队列, 获取数据并发送 ```java package com.dianping.cat.message.internal; public void flush(MessageTree tree, boolean clearContext) { // 获取发送类对象 MessageSender sender = m_transportManager.getSender(); if (sender != null && isMessageEnabled()) { // 发送 sender.send(tree); if (clearContext) { reset(); } } else { m_throttleTimes++; if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) { m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes); } } } ``` ```java package com.dianping.cat.message.io; private void offer(MessageTree tree) { if (m_configManager.isAtomicMessage(tree)) { boolean result = m_atomicQueue.offer(tree); if (!result) { logQueueFullInfo(tree); } } else { // 生产者-消费者模型 boolean result = m_queue.offer(tree); if (!result) { logQueueFullInfo(tree); } } } ``` 消费者线程拉取消息: ```java package com.dianping.cat.message.io; private void processNormalMessage() { while (true) { ChannelFuture channel = m_channelManager.channel(); if (channel != null) { try { // 拉取消息 MessageTree tree = m_queue.poll(); if (tree != null) { sendInternal(channel, tree); tree.setMessage(null); } else { try { Thread.sleep(5); } catch (Exception e) { m_active = false; } break; } } catch (Throwable t) { m_logger.error("Error when sending message over TCP socket!", t); } } else { try { Thread.sleep(5); } catch (Exception e) { m_active = false; } } } } ``` 使用自定义的序列化方式进行序列化, 最后使用Netty发送数据: ```java package com.dianping.cat.message.io; public void sendInternal(ChannelFuture channel, MessageTree tree) { if (tree.getMessageId() == null) { tree.setMessageId(m_factory.getNextId()); } // 自定义序列化方式 ByteBuf buf = m_codec.encode(tree); int size = buf.readableBytes(); // 使用Netty进行数据发送 channel.channel().writeAndFlush(buf); if (m_statistics != null) { m_statistics.onBytes(size); } } ``` Cat使用了自定义的序列化方式 ```java /* * Copyright (c) 2011-2018, Meituan Dianping. All Rights Reserved. * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.dianping.cat.message.spi.codec; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Stack; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import com.dianping.cat.message.Event; import com.dianping.cat.message.Heartbeat; import com.dianping.cat.message.Message; import com.dianping.cat.message.Metric; import com.dianping.cat.message.Trace; import com.dianping.cat.message.Transaction; import com.dianping.cat.message.internal.DefaultEvent; import com.dianping.cat.message.internal.DefaultHeartbeat; import com.dianping.cat.message.internal.DefaultMetric; import com.dianping.cat.message.internal.DefaultTrace; import com.dianping.cat.message.internal.DefaultTransaction; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.message.spi.internal.DefaultMessageTree; public class NativeMessageCodec implements MessageCodec { public static final String ID = "NT1"; // native message tree version 1 @Override public MessageTree decode(ByteBuf buf) { buf.readInt(); // read the length of the message tree DefaultMessageTree tree = new DefaultMessageTree(); Context ctx = new Context(tree); Codec.HEADER.decode(ctx, buf); Message msg = decodeMessage(ctx, buf); tree.setMessage(msg); tree.setBuffer(buf); return tree; } private Message decodeMessage(Context ctx, ByteBuf buf) { Message msg = null; while (buf.readableBytes() > 0) { char ch = ctx.readId(buf); switch (ch) { case 't': Codec.TRANSACTION_START.decode(ctx, buf); break; case 'T': msg = Codec.TRANSACTION_END.decode(ctx, buf); break; case 'E': Message e = Codec.EVENT.decode(ctx, buf); ctx.addChild(e); break; case 'M': Message m = Codec.METRIC.decode(ctx, buf); ctx.addChild(m); break; case 'H': Message h = Codec.HEARTBEAT.decode(ctx, buf); ctx.addChild(h); break; case 'L': Message l = Codec.TRACE.decode(ctx, buf); ctx.addChild(l); break; default: throw new RuntimeException(String.format("Unsupported message type(%s).", ch)); } } if (msg == null) { msg = ctx.getMessageTree().getMessage(); } return msg; } @Override public ByteBuf encode(MessageTree tree) { ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 * 1024); try { Context ctx = new Context(tree); buf.writeInt(0); // place-holder Codec.HEADER.encode(ctx, buf, null); Message msg = tree.getMessage(); if (msg != null) { encodeMessage(ctx, buf, msg); } int readableBytes = buf.readableBytes(); buf.setInt(0, readableBytes - 4); // reset the message size return buf; } catch (RuntimeException e) { buf.release(); throw e; } } private void encodeMessage(Context ctx, ByteBuf buf, Message msg) { if (msg instanceof Transaction) { Transaction transaction = (Transaction) msg; List children = transaction.getChildren(); Codec.TRANSACTION_START.encode(ctx, buf, msg); for (Message child : children) { if (child != null) { encodeMessage(ctx, buf, child); } } Codec.TRANSACTION_END.encode(ctx, buf, msg); } else if (msg instanceof Event) { Codec.EVENT.encode(ctx, buf, msg); } else if (msg instanceof Metric) { Codec.METRIC.encode(ctx, buf, msg); } else if (msg instanceof Heartbeat) { Codec.HEARTBEAT.encode(ctx, buf, msg); } else if (msg instanceof Trace) { Codec.TRACE.encode(ctx, buf, msg); } else { throw new RuntimeException(String.format("Unsupported message(%s).", msg)); } } @Override public void reset() { } enum Codec { HEADER { @Override protected Message decode(Context ctx, ByteBuf buf) { MessageTree tree = ctx.getMessageTree(); String version = ctx.getVersion(buf); if (ID.equals(version)) { tree.setDomain(ctx.readString(buf)); tree.setHostName(ctx.readString(buf)); tree.setIpAddress(ctx.readString(buf)); tree.setThreadGroupName(ctx.readString(buf)); tree.setThreadId(ctx.readString(buf)); tree.setThreadName(ctx.readString(buf)); tree.setMessageId(ctx.readString(buf)); tree.setParentMessageId(ctx.readString(buf)); tree.setRootMessageId(ctx.readString(buf)); tree.setSessionToken(ctx.readString(buf)); } else { throw new RuntimeException(String.format("Unrecognized version(%s) for binary message codec!", version)); } return null; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { MessageTree tree = ctx.getMessageTree(); ctx.writeVersion(buf, ID); ctx.writeString(buf, tree.getDomain()); ctx.writeString(buf, tree.getHostName()); ctx.writeString(buf, tree.getIpAddress()); ctx.writeString(buf, tree.getThreadGroupName()); ctx.writeString(buf, tree.getThreadId()); ctx.writeString(buf, tree.getThreadName()); ctx.writeString(buf, tree.getMessageId()); ctx.writeString(buf, tree.getParentMessageId()); ctx.writeString(buf, tree.getRootMessageId()); ctx.writeString(buf, tree.getSessionToken()); } }, TRANSACTION_START { @Override protected Message decode(Context ctx, ByteBuf buf) { long timestamp = ctx.readTimestamp(buf); String type = ctx.readString(buf); String name = ctx.readString(buf); if ("System".equals(type) && name.startsWith("UploadMetric")) { name = "UploadMetric"; } DefaultTransaction t = new DefaultTransaction(type, name); t.setTimestamp(timestamp); ctx.pushTransaction(t); MessageTree tree = ctx.getMessageTree(); if (tree instanceof DefaultMessageTree) { tree.getTransactions().add(t); } return t; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { ctx.writeId(buf, 't'); ctx.writeTimestamp(buf, msg.getTimestamp()); ctx.writeString(buf, msg.getType()); ctx.writeString(buf, msg.getName()); } }, TRANSACTION_END { @Override protected Message decode(Context ctx, ByteBuf buf) { String status = ctx.readString(buf); String data = ctx.readString(buf); long durationInMicros = ctx.readDuration(buf); DefaultTransaction t = ctx.popTransaction(); t.setStatus(status); t.addData(data); t.setDurationInMicros(durationInMicros); return t; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { Transaction t = (Transaction) msg; ctx.writeId(buf, 'T'); ctx.writeString(buf, msg.getStatus()); ctx.writeString(buf, msg.getData().toString()); ctx.writeDuration(buf, t.getDurationInMicros()); } }, EVENT { @Override protected Message decode(Context ctx, ByteBuf buf) { long timestamp = ctx.readTimestamp(buf); String type = ctx.readString(buf); String name = ctx.readString(buf); String status = ctx.readString(buf); String data = ctx.readString(buf); DefaultEvent e = new DefaultEvent(type, name); e.setTimestamp(timestamp); e.setStatus(status); e.addData(data); MessageTree tree = ctx.getMessageTree(); if (tree instanceof DefaultMessageTree) { tree.getEvents().add(e); } return e; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { ctx.writeId(buf, 'E'); ctx.writeTimestamp(buf, msg.getTimestamp()); ctx.writeString(buf, msg.getType()); ctx.writeString(buf, msg.getName()); ctx.writeString(buf, msg.getStatus()); ctx.writeString(buf, msg.getData().toString()); } }, METRIC { @Override protected Message decode(Context ctx, ByteBuf buf) { long timestamp = ctx.readTimestamp(buf); String type = ctx.readString(buf); String name = ctx.readString(buf); String status = ctx.readString(buf); String data = ctx.readString(buf); DefaultMetric m = new DefaultMetric(type, name); m.setTimestamp(timestamp); m.setStatus(status); m.addData(data); MessageTree tree = ctx.getMessageTree(); if (tree instanceof DefaultMessageTree) { tree.getMetrics().add(m); } return m; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { ctx.writeId(buf, 'M'); ctx.writeTimestamp(buf, msg.getTimestamp()); ctx.writeString(buf, msg.getType()); ctx.writeString(buf, msg.getName()); ctx.writeString(buf, msg.getStatus()); ctx.writeString(buf, msg.getData().toString()); } }, HEARTBEAT { @Override protected Message decode(Context ctx, ByteBuf buf) { long timestamp = ctx.readTimestamp(buf); String type = ctx.readString(buf); String name = ctx.readString(buf); String status = ctx.readString(buf); String data = ctx.readString(buf); DefaultHeartbeat h = new DefaultHeartbeat(type, name); h.setTimestamp(timestamp); h.setStatus(status); h.addData(data); MessageTree tree = ctx.getMessageTree(); if (tree instanceof DefaultMessageTree) { tree.getHeartbeats().add(h); } return h; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { ctx.writeId(buf, 'H'); ctx.writeTimestamp(buf, msg.getTimestamp()); ctx.writeString(buf, msg.getType()); ctx.writeString(buf, msg.getName()); ctx.writeString(buf, msg.getStatus()); ctx.writeString(buf, msg.getData().toString()); } }, TRACE { @Override protected Message decode(Context ctx, ByteBuf buf) { long timestamp = ctx.readTimestamp(buf); String type = ctx.readString(buf); String name = ctx.readString(buf); String status = ctx.readString(buf); String data = ctx.readString(buf); DefaultTrace t = new DefaultTrace(type, name); t.setTimestamp(timestamp); t.setStatus(status); t.addData(data); return t; } @Override protected void encode(Context ctx, ByteBuf buf, Message msg) { ctx.writeId(buf, 'L'); ctx.writeTimestamp(buf, msg.getTimestamp()); ctx.writeString(buf, msg.getType()); ctx.writeString(buf, msg.getName()); ctx.writeString(buf, msg.getStatus()); ctx.writeString(buf, msg.getData().toString()); } }; protected abstract Message decode(Context ctx, ByteBuf buf); protected abstract void encode(Context ctx, ByteBuf buf, Message msg); } private static class Context { private static Charset UTF8 = Charset.forName("UTF-8"); private MessageTree m_tree; private Stack m_parents = new Stack(); private byte[] m_data = new byte[256]; public Context(MessageTree tree) { m_tree = tree; } public void addChild(Message msg) { if (!m_parents.isEmpty()) { m_parents.peek().addChild(msg); } else { m_tree.setMessage(msg); } } public MessageTree getMessageTree() { return m_tree; } public String getVersion(ByteBuf buf) { byte[] data = new byte[3]; buf.readBytes(data); return new String(data); } public DefaultTransaction popTransaction() { return m_parents.pop(); } public void pushTransaction(DefaultTransaction t) { if (!m_parents.isEmpty()) { m_parents.peek().addChild(t); } m_parents.push(t); } public long readDuration(ByteBuf buf) { return readVarint(buf, 64); } public char readId(ByteBuf buf) { return (char) buf.readByte(); } public String readString(ByteBuf buf) { int len = (int) readVarint(buf, 32); if (len == 0) { return ""; } else if (len > m_data.length) { m_data = new byte[len]; } buf.readBytes(m_data, 0, len); return new String(m_data, 0, len, StandardCharsets.UTF_8); } public long readTimestamp(ByteBuf buf) { return readVarint(buf, 64); } protected long readVarint(ByteBuf buf, int length) { int shift = 0; long result = 0; while (shift < length) { final byte b = buf.readByte(); result |= (long) (b & 0x7F) << shift; if ((b & 0x80) == 0) { return result; } shift += 7; } throw new RuntimeException("Malformed variable int " + length + "!"); } public void writeDuration(ByteBuf buf, long duration) { writeVarint(buf, duration); } public void writeId(ByteBuf buf, char id) { buf.writeByte(id); } public void writeString(ByteBuf buf, String str) { if (str == null || str.length() == 0) { writeVarint(buf, 0); } else { byte[] data = str.getBytes(UTF8); writeVarint(buf, data.length); buf.writeBytes(data); } } public void writeTimestamp(ByteBuf buf, long timestamp) { writeVarint(buf, timestamp); // TODO use relative value of root message timestamp } private void writeVarint(ByteBuf buf, long value) { while (true) { if ((value & ~0x7FL) == 0) { buf.writeByte((byte) value); return; } else { buf.writeByte(((byte) value & 0x7F) | 0x80); value >>>= 7; } } } public void writeVersion(ByteBuf buf, String version) { buf.writeBytes(version.getBytes()); } } } ``` 根据不同的数据类型, 进行写入即可. ## 4.2. 服务端原理 ### 4.2.1. 架构设计 单体的consumer架构设计如下: ![image.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsy2b11bdgj613u0gotin02.jpg) 如上图, CAT服务端在整个实时处理中, 基本上实现了全异步化处理. - 消息接受是基于Netty的NIO实现. - 消息接受到服务端就存放内存队列, 然后程序开启一个线程回消费整个消息做消息分发. - 每个消息都会有一批线程并发消费各自队列的数据, 以做到消息处理的隔离. - 消息存储是先存入本地磁盘, 然后异步上传到HDFS文件, 这也避免了强依赖HDFS. 当某个报表处理器处理来不及的时候, 比如Transaction报表处理比较慢, 可以通过配置支持开启多个Transaction处理线程, 并发消费消息. ![image.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsy2ezpsedj60us0iojyu02.jpg) ### 4.2.2. 消息ID设计 CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。 CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段: - 第一段是应用名shop-web。 - 第二段是当前这台机器的IP的16进制格式,0a01010680表示10.1.6.108。 - 第三段的375030,是系统当前时间除以小时得到的整点数。 - 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。 ### 4.2.3. 存储数据设计 消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。 整体存储结构如下图: ![image.png](http://ww1.sinaimg.cn/large/0069yeMZgy1gsy2jujpftj60kx0cddgv02.jpg) CAT在写数据一份是Index文件,一份是Data文件. - Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。 - 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。 - 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。 - CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内偏移地址读取出真正的消息内容。 ### 服务端设计总结 CAT在分布式实时方面,主要归结于以下几点因素: - 去中心化,数据分区处理。 - 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。 - 基于内存队列,全面异步化、单线程化、无锁设计。 - 全局消息ID,数据本地化生产,集中式存储。 - 组件化、服务化理念。