** 替换成自己的服务器地址
- 初始化
1. 在项目中创建`src/main/resources/META-INF/app.properties`文件, 并添加如下内容:
```ini
# appkey 只能包含英文字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 和中划线 (-)
app.name = {appkey}
```
### 2.2.2. API介绍
#### 2.2.2.1. Transaction
##### 基本用法
Transaction 适合记录跨越系统边界的程序访问行为, 比如远程调用, 数据库调用, 也适合执行时间较长的业务逻辑监控, Transaction用来记录一段代码的执行时间和次数.
现在为我们的框架还没有与dubbo, mybatis集成, 所以我们通过手动编写一个本地方法, 来测试Transaction的用法, 创建TransactionController用于测试.
```java
package cn.sheep.springbootcat.controller;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* transaction 示例
*
* @author yanglei
*/
@RestController
@RequestMapping("transaction")
public class TransactionController {
/**
* transaction test
*
* @return
*/
@RequestMapping("test")
public String test() {
// 开启第一个 Transaction. 类别为URL, 名称为test
Transaction t = Cat.newTransaction("URL", "test");
try {
String dubbo = this.dubbo();
System.out.println(dubbo);
// 成功执行 Transaction
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
// 失败, 设置异常
t.setStatus(e);
} finally {
// 结束这一 Transaction
t.complete();
}
return "test";
}
/**
* 模拟dubbo方法
*
* @return
*/
private String dubbo() {
// 开启第二个 Transaction. 类别为DUBBO, 名称为dubbo
Transaction t = Cat.newTransaction("DUBBO", "dubbo");
try {
// 成功执行 Transaction
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
// 失败, 设置异常
t.setStatus(e);
} finally {
// 结束这一 Transaction
t.complete();
}
return "dubbo";
}
}
```
##### 扩展API
CAT提供了一系列API来对Transaction进行修改.
- `com.dianping.cat.message.Message#addData`: 添加额外的数据显示
- `com.dianping.cat.message.Message#setStatus`: 设置状态, 成功可以设置SUCCESS, 失败可以设置异常
- `com.dianping.cat.message.Transaction#setDurationInMillis`: 设置执行耗时(毫秒)
- `com.dianping.cat.message.Message#setTimestamp`: 设置执行时间
- `com.dianping.cat.message.Message#complete`: 结束Transaction
编写如下代码进行测试:
```java
@RequestMapping("api")
public String api() {
Transaction t = Cat.newTransaction("TAPI", "api");
try {
// 修改执行耗时1秒
t.setDurationInMillis(1000L);
// 设置开始时间
t.setTimestamp(System.currentTimeMillis());
// 添加额外的数据
t.addData("content");
int i = 1 / 0;
// 成功执行 Transaction
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
// 失败, 设置异常
t.setStatus(e);
} finally {
// 结束这一 Transaction
t.complete();
}
return "api";
}
```
启动项目, 访问接口: http://localhost:9000/springboot-cat/transaction/api
点击左侧菜单Transaction报表, 选中TAPI类型对应的LogView查看调用链关系.
如图所示, 调用耗时已经被手动修改成了1000ms, 并且添加了额外的信息content
> 在使用Transaction API时, 可能需要注意以下几点:
>
> 1. 可以调用 `addData `多次, 添加的数据会被 `&` 连接起来
> 2. 不要忘记完成 transaction! 否则会得到一个毁坏的消息树以及内存泄漏!
#### 2.2.2.1. Event
Event 用来记录一件事发生的次数, 比如记录系统异常, 它和transaction相比缺少了时间的统计, 开销比transaction要小.
##### Cat.logEvent
`com.dianping.cat.Cat#logEvent`: 记录一个事件
```java
Cat.logEvent("URL.Server", "serverIp", Event.SUCCESS, "ip=${serverIp}");
```
##### Cat.logError
`com.dianping.cat.Cat#logError(java.lang.Throwable)`: 记录一个带有错误堆栈信息的 Error.
Error 是一种特殊的事件, 它的 `type` 取决于传入的 `Throwable e`.
1. 如果 `e` 是一个 `Error`, `type` 会被设置为 `Error`.
2. 如果 `e` 是一个 `RuntimeException`, `type` 会被设置为 `RuntimeException`.
3. 其他情况下, `type` 会被设置为 `Exception`.
同时错误堆栈信息会被收集并写入 `data` 属性中.
```java
try {
int i = 1 / 0;
} catch (Exception e) {
Cat.logError(e);
}
```
可以向错误堆栈顶部添加自己的错误消息, 如下代码所示:
```java
Cat.logError("error(X) := exception(X)", e);
```
#### 2.2.2.3. Metric
Metric 用于记录业务指标, 指标可能包含对一个指标记录次数, 记录平均值, 记录总和, 业务指标最低统计粒度为1分钟.
```java
# Counter
Cat.logMetricForCount("metric.key");
Cat.logMetricForCount("metric.key", 3);
# Duration
Cat.logMetricForDuration("metric.key", 5);
```
我们每秒会聚合 metric
举例来说, 如果你在同一秒调用 count 三次(相同的 name), 我们会累加他们的值, 并且一次性上报给服务端.
在 `duration` 的情况下, 我们用平均值来取代累加值.
编写案例测试上述API:
```java
/**
* metric 示例
*
* @author yanglei
*/
@RestController
@RequestMapping("metric")
public class MetricController {
/**
* logMetricForCount test
*
* @return
*/
@RequestMapping("count")
public String count() {
// 累加数据
Cat.logMetricForCount("count");
return "count";
}
/**
* logMetricForCount test
*
* @return
*/
@RequestMapping("duration")
public String duration() {
// 求平均值
Cat.logMetricForDuration("duration", 1000);
return "duration";
}
}
```
点击 Business 报表可以看到, count和duration的具体数值.
### 2.2.3. CAT监控界面介绍
#### 2.2.3.1. DashBoard
**DashBoard仪表盘显示了每分钟出现错误的系统及其错误的次数和时间.**

- 点击右上角的时间按钮可以切换不同的展示时间, -7d代表7天前, -1h代表1小时前, now定位到当前时间
- 上方时间轴按照分钟进行排布, 点击之后可以看到该时间到结束的异常情况
- 下方标识了出错的系统和出错的时间, 次数, 点击系统名称可以跳转到Problem报表
#### 2.2.3.2. Transaction
**Transaction报表用来监控一段代码运行情况**: `运行次数, QPS, 错误次数, 失败率, 响应时间统计(平均影响时间, Tp分位值)等等`.
应用启动后默认会打点的部署:
| 打点 | 来源组件 | 描述 |
| ------ | ------------------- | ---------------------- |
| System | cat-client | 上报监控数据的打点信息 |
| URL | 需要接入 cat-filter | URL访问的打点信息 |
小时报表
Type统计界面展示了一个Transaction的第一层分类的视图, 可以知道这段时间里面一个分类运行的次数, 平均响应时间, 延迟, 以及分位线.

**从上而下分析报表**:
1. **报表的时间跨度**
- CAT默认是以一小时为统计时间跨度, 点击[切到历史模式], 更改查看报表的时间跨度;
- 默认是小时模式;
- 切换历史模式后, 右侧快速导航, 变为month(月报表), week(周报表), day(天报表), 可以点击进行查看, 注意报表的时间跨度会有所不同.
2. **时间选择**
- 通过右上角时间导航栏选择时间: 点击[+1h]/[-1h]切换时间为下一小时/上一小时;
- 点击[+1d]/[-1d]切换时间为后一天的同一小时/前一天的同一小时;
- 点击右上角[+7d]/[-7d]切换时间为后一周的同一小时/前一周的同一小时;
- 点击[now]回到当前小时.
3. **项目选择**
- 输入项目名, 查看项目数据;
- 如果需要切换其他项目, 输入项目名, 回车即可.
4. **机器分组**
- CAT可以将若干个机器, 作为一个分组进行数据统计.
- 默认会有一个ALL分组, 代表所有机器的统计数据, 即集群统计数据.
5. **所有Type汇总表格**
- 第一层分类(Type), 点击查看第二级分类(称为name)的数据
- Transaction的埋点的Type和Name由业务自己定义, 当打点了Cat.newTransaction(type, name)时, 第一层分类是type, 第二级分类是name.
- 第二级分类数据是统计相同type下的所有name的数据, 数据均与第一级(type)一样的展示风格.
6. **单个Type指标图表**
- 点击show, 查看Type所有name分钟级统计, 如下图

7. **指标说明**
- 显示的是小时粒度第一级分类(type)的次数, 错误书, 失败率等数据.
8. **样本Log View**
- L代表logview, 为一个样例的调用链路
9. **分位线说明**
- 小时粒度的时间第一级分类(type)相关统计
- 95Line标识95%的请求的响应时间比参考值要小. 99Line标识99%的响应时间比参考值要小, 95Line以及99Line, 也称之为tp95, tp99.
10. **历史报表**
- Transaction历史报表支持每天, 每周, 每月的数据统计以及趋势图, 点击导航栏的切换历史模式进行查询. Transaction历史报表以响应时间, 访问量, 错误量三个维度进行展示, 以天为例: 选取一个type, 点击show, 即可查看天报表.
#### 2.2.3.3. Event
**Event报表监控一段代码运行次数**: `例如记录程序中一个事件记录了多少次, 错误了多少次.` **Event报表的整体结构和Transaction报表几乎一样, 只缺少响应时间的统计.**
1. **第一级分类(Type)统计界面**
Type统计页面展示了一个Event的第一层分类的视图, Event相对于Transaction少了运行时间统计. 可以知道这段时间里面一个分类运行的次数, 失败次数, 失败率, 采样logview, QPS.

2. **第二级分类(Name)统计页面**

#### 2.2.3.4. Problem
Problem记录整个项目在运行过程中出现的问题, 包括一些异常, 错误, 访问较长的行为. Problem报表是由logview存在的特征整合而成, 方便用户定位问题. 来源:
1. 业务代码显式调用 `Cat.Error(e)` API进行埋点, 具体埋点说明可查看埋点文档.
2. 与LOG框架集成, 会捕获log日志中有异常堆栈的exception日志.
3. Long-url: 表示Transaction打点URL的慢请求
4. Long-sql: 表示Transaction打点SQL的慢请求
5. Long-service: 表示Transaction打点Service或者PigeonService的慢请求
6. Long-call: 表示Transaction打点Call或者PigeonCall的慢请求
7. Long-cache: 表示Transaction打点Cache.开头的慢请求
**所有错误汇总报表**: 第一层分类(Type), 代表错误类型, 比如error, long-url等; 第二级分类(称为Status), 对应具体的错误, 比如一个异常类名等.

**错误数分布**: 点击type和status的show, 分别展示type和status的分钟级错误数分布.
#### 2.2.3.5. HeartBeat
HeartBeat报表是CAT客户端, 以一分钟为周期, 定期向服务端汇报当前运行时候的一些状态.
**JVM相关指标**
以下所有的指标统计都是1分钟内的值, cat最低统计粒度是一分钟

| GC Info / JVM GC 相关指标 | 描述 |
| ------------------------------ | ------------------------ |
| NewGc Count / PS ScavengeCount | 新生代GC次数 |
| NewGc Time / PS ScavengeTime | 新生代GC耗时 |
| OldGcCount | 老年代GC耗时 |
| PS MarkSweepTime | 老年代GC耗时 |
| Heap Usage | Java虚拟机堆的使用情况 |
| None Heap Usage | Java虚拟机Perm的使用情况 |
| FrameworkThread Info / JVM Thread 相关指标 | 描述 |
| ------------------------------------------ | ----------------------- |
| Active Thread | 系统当前活动线程 |
| Daemon Thread | 系统后台线程 |
| Total Started Thread | 系统总共开启线程 |
| Started Thread | 系统每分钟新启动的线程 |
| CAT Started Thread | 系统中CAT客户端启动线程 |
可以参考`java.lang.management.ThreadInfo`的定义
**系统指标**
| System Info / System 相关指标 | 描述 |
| ----------------------------- | ------------------ |
| System LoadAverage | 系统Load详细信息 |
| Memory Free | 系统momoryFree情况 |
| FreePhysicalMemory | 物理内存剩余情况 |
| /Free | /根的使用情况 |
| /data Free | /data盘的使用情况 |
#### 2.2.3.6. Business
**Business报表对应着业务指标, 比如订单指标**. 与Transaction, Event, Problem不同, Business更偏向于宏观上的指标, 另外三者偏向于微观代码的执行情况.

场景示例:
1. 我想监控订单数量
2. 我想监控订单耗时
**基线值**: 基线是对业务指标的预测值
- 基线生成算法
最近一个月的4个每周几的数据加权求和平均计算得出, 秉着更加信任新数据的原则, cat会基于历史数据做异常点的修正, 会把一些明显高于以及低于平均值的点剔除.
举例: 今天是2018-10-25(周四), 今天整天基数数据的算法是最近四个周四(2018-10-18, 2018-10-11, 2018-10-04, 2018-09-27)的每个分钟数据的加权求和或平均, 权重值依次为1, 2, 3, 4.
如: 当前时间为19:56分设为value, 前四周对应的19:56分数据(由远及近)分别为A, B, C, D, 则 `value = (A + 2B + 3C + 4D) / 10`.
对于刚上线的应用, 第一天没有基线, 第二天的基线是前一天的数据, 以此类推.
- 如何开启基线
只有配置了基线告警的指标, 才会自动计算基线. 如需基线功能, 请配置基线告警
- 注意事项
> 1. 打点尽量用纯英文, 不要带一些特殊符号, 例如: 空格( ), 分号(;), 竖线(|), 斜线(/), 逗号(,), 与号(&), 星号(*), 左右尖括号(<>), 以及一些奇奇怪怪的字符
> 2. 如果有分隔需求, 建议用下划线(_), 中划线(-), 英文点号(.)等
> 3. 由于数据库不区分大小写, 请尽量统一大小写, 并且不要对大小写进行改动
> 4. 有可能出现小数, 趋势图的每个点都代表一分钟的值. 假设监控区间是10分钟, 且10分钟内总共上报15次, 趋势图中该点的值为 `5 % 10 = 0.5`
#### 2.2.3.7. State
**State报表显示了与CAT相关的信息**

# 3. Cat高级
## 3.1. 框架集成
### 3.1.1. Dubbo
#### 3.1.1.1. 制作cat-dubbo插件
使用idea打开cat源码, 找到integration目录,
1. 在`dubbo[cat-monitor]`项目修改pom.xml文件, `cat-client`版本号修改为`3.0.0`
2. 把`dubbo[cat-monitor]`添加到maven里, `右键点击pom -> Add as Maven Project`
3. 使用`install`命令将插件安装到本地仓库.
4. 使用如下依赖自动引入dubbo插件
```xml
net.dubboclub
cat-monitor
0.0.6
```
#### 3.1.1.2. 服务提供方 dubbo-provider-cat
**如何发布dubbo服务**
- 添加依赖
```xml
com.alibaba.spring.boot
dubbo-spring-boot-starter
2.0.0
net.dubboclub
cat-monitor
0.0.6
```
> 这里直接使用了 `dubbo-spring-boot-starter` 这亦dubbo与spring-boot集成的组件.
>
> 官方文档地址: https://github.com/alibaba/dubbo-spring-boot-starter/blob/master/README_zh.md
- 在 `application.properties` 添加dubbo的相关配置信息
```properties
server.port = 7072
server.servlet.context-path = /dubbo-provider-cat
spring.application.name = dubbo_provider_cat
spring.dubbo.server = true
spring.dubbo.registry = N/A
```
注: 这个配置只针对服务提供端, 消费端不用指定协议, 它自己会根据服务端的地址信息和`@Reference`注解去解析协议
- 在 `Spring Boot Application` 上添加 `@EnableDubboConfiguration`, 表示要开启dubbo功能
- 编写dubbo服务, 只需要添加要发布的服务实现上添加 `@Service (import com.alibaba.dubbo.config.annotation.Service)` 注解, 其中interfaceClass是要发布服务的接口.
#### 3.1.1.3. 服务消费方 dubbo-consumer-cat
**如何消费dubbo服务**
- 添加依赖
```xml
com.alibaba.spring.boot
dubbo-spring-boot-starter
2.0.0
net.dubboclub
cat-monitor
0.0.6
```
- 在 `application.properties` 添加dubbo的相关配置信息
```properties
server.port = 7074
server.servlet.context-path = /dubbo-consumer-cat
spring.application.name = dubbo_consumer_cat
```
- 开启 `@EnableDubboConfiguration`
- 通过 `@Reference (import com.alibaba.dubbo.config.annotation.Reference)` 注入需要使用的 interface
#### 3.1.1.4. 测试
**按照如下顺序启动相关应用**
1. 启动 dubbo-provider-cat
2. 启动 dubbo-consumer-cat
3. 访问地址: http://localhost:7074/dubbo-consumer-cat/test/hello
4. 查看cat页面, 点击 `PigeonCall`

如图所示dubbo的调用已经被正确显示在transaction报表中. 点击 Log View 查看详细的调用

如图所示, 调用的日志已经被成功打印.
> dubbo插件的日志打印内容显示的并不是十分良好, 如果在企业应用中, 可以基于dubbo插件进行二次开发.
### 3.1.2. Mybatis
#### 3.1.2.1. 创建springboot和mybatis的集成项目
- 表结构
```mysql
USE sheep;
DROP TABLE IF EXISTS sheep.cat_t_user;
CREATE TABLE sheep.cat_t_user (
id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
user_name VARCHAR(32) DEFAULT NULL COMMENT '用户名',
password VARCHAR(32) DEFAULT 0 COMMENT '密码',
create_time DATETIME NOT NULL COMMENT '创建时间',
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (id)
) COMMENT '用户表';
INSERT INTO sheep.cat_t_user
(id, user_name, password, create_time)
VALUES
(1, 'jack', '123456', now());
```
- pom.xml
```xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.1.13.RELEASE
cn.sheep
mybatis-cat
0.0.1-SNAPSHOT
mybatis-cat
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.mybatis.spring.boot
mybatis-spring-boot-starter
2.1.2
mysql
mysql-connector-java
runtime
5.1.49
com.alibaba
druid
1.2.6
cn.hutool
hutool-all
5.7.3
org.projectlombok
lombok
1.16.18
com.dianping.cat
cat-client
3.0.0
org.springframework.boot
spring-boot-maven-plugin
```
- application.yml
```yaml
server:
port: 7079
servlet:
context-path: /mybatis-cat
# datasource
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/sheep?serverTimezone=UTC&useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
# mybatis
mybatis:
config-location: classpath:mybatis/mybatis-config.xml
mapper-locations:
- classpath:mybatis/mapper/**/*.xml
```
- 启动类
```java
package cn.sheep.mybatis.cat;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* SpringBoot 启动类
*
* @author sheep
*/
@MapperScan("cn.sheep.mybatis.cat.dao")
@SpringBootApplication
public class MybatisCatApplication {
public static void main(String[] args) {
SpringApplication.run(MybatisCatApplication.class, args);
}
}
```
#### 3.1.2.2. 集成cat-mybatis插件
- cat-mybatis插件
```java
package cn.sheep.mybatis.cat.catmonitor;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSource;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.ParameterMapping;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.type.TypeHandlerRegistry;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.text.DateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Cat-Mybatis 插件
*
* 1. Cat-Mybatis plugin: Rewrite on the version of Steven;
* 2. Support DruidDataSource,PooledDataSource(mybatis Self-contained data source);
*
*
* @author sheep
*/
@Intercepts({
@Signature(method = "query", type = Executor.class, args = {
MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class
}),
@Signature(method = "update", type = Executor.class, args = {
MappedStatement.class, Object.class
})
})
public class CatMybatisPlugin implements Interceptor {
private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\?");
private static final String MYSQL_DEFAULT_URL = "jdbc:mysql://UUUUUKnown:3306/%s?useUnicode=true";
public static final String SQL = "SQL";
public static final String SQL_METHOD = "SQL.Method";
public static final String SQL_DATABASE = "SQL.Database";
/**
* 拦截. 拦截目标对象的目标方法的执行
*
* @param invocation
* @return
* @throws Throwable
*/
@Override
public Object intercept(Invocation invocation) throws Throwable {
MappedStatement mappedStatement = this.getStatement(invocation);
String methodName = this.getMethodName(mappedStatement);
// 开启一个 Transaction. 类别为SQL, 名称为SQL方法名
Transaction t = Cat.newTransaction(SQL, methodName);
// 记录一个 Event. 事件类型: SQL.Method; 事件名称: SQL方法类型; 状态: 成功; 信息: sql
String sqlCommandName = mappedStatement.getSqlCommandType()
.name()
.toLowerCase();
String sql = this.getSql(invocation, mappedStatement);
Cat.logEvent(SQL_METHOD, sqlCommandName, Message.SUCCESS, sql);
// 记录一个 Event. 事件类型: SQL.Database; 事件名称: jdbcUrl
Cat.logEvent(SQL_DATABASE, this.getSQLDatabaseUrlByStatement(mappedStatement));
return doFinish(invocation, t);
}
/**
* 获取 MappedStatement
*
* @param invocation
* @return
*/
private MappedStatement getStatement(Invocation invocation) {
Object[] args = invocation.getArgs();
return (MappedStatement)args[0];
}
/**
* 获取SQL 方法名. UserMapper.findById
*
* @param mappedStatement
* @return
*/
private String getMethodName(MappedStatement mappedStatement) {
// 获取sqlId. 例: cn.sheep.mybatis.cat.dao.UserMapper.findById
String mappedStatementId = mappedStatement.getId();
// sqlId最后两位. 例: UserMapper.findById
String[] strArr = mappedStatementId.split("\\.");
int strLen = strArr.length;
return StrUtil.format("{}.{}", strArr[strLen - 2], strArr[strLen - 1]);
}
/**
* 获取sql
*
* @param invocation
* @param mappedStatement
* @return
*/
private String getSql(Invocation invocation, MappedStatement mappedStatement) {
Object parameter = null;
Object[] args = invocation.getArgs();
if (ArrayUtil.length(args) > 1) {
parameter = args[1];
}
BoundSql boundSql = mappedStatement.getBoundSql(parameter);
Configuration configuration = mappedStatement.getConfiguration();
return sqlResolve(configuration, boundSql);
}
private Object doFinish(Invocation invocation, Transaction t) throws InvocationTargetException, IllegalAccessException {
Object returnObj;
try {
// 执行目标方法; 返回执行后的返回值
returnObj = invocation.proceed();
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
Cat.logError(e);
throw e;
} finally {
t.complete();
}
return returnObj;
}
/**
* 获取 jdbcUrl
*
* @param mappedStatement
* @return
*/
private String getSQLDatabaseUrlByStatement(MappedStatement mappedStatement) {
String url;
DataSource dataSource = null;
try {
Configuration configuration = mappedStatement.getConfiguration();
Environment environment = configuration.getEnvironment();
dataSource = environment.getDataSource();
url = this.switchDataSource(dataSource);
return url;
} catch (NoSuchFieldException | IllegalAccessException | NullPointerException e) {
Cat.logError(e);
}
Cat.logError(new Exception("UnSupport type of DataSource : " + ClassUtil.getClassName(dataSource, false)));
return MYSQL_DEFAULT_URL;
}
/**
* 根据连接池获取 jdbcUrl
*
* @param dataSource
* @return
* @throws NoSuchFieldException
* @throws IllegalAccessException
*/
private String switchDataSource(DataSource dataSource) throws NoSuchFieldException, IllegalAccessException {
String url = null;
if (dataSource instanceof DruidDataSource) {
url = ((DruidDataSource)dataSource).getUrl();
} else if (dataSource instanceof PooledDataSource) {
Class extends DataSource> dataSourceClass = dataSource.getClass();
Field dataSource1 = dataSourceClass.getDeclaredField("dataSource");
dataSource1.setAccessible(true);
UnpooledDataSource dataSource2 = (UnpooledDataSource)dataSource1.get(dataSource);
url = dataSource2.getUrl();
} else {
// 其他数据源 扩展
}
return url;
}
/**
* SQL 解析. 将?替换为实际值
*
* @param configuration
* @param boundSql
* @return
*/
public String sqlResolve(Configuration configuration, BoundSql boundSql) {
Object parameterObject = boundSql.getParameterObject();
List parameterMappings = boundSql.getParameterMappings();
String sql = boundSql.getSql()
.replaceAll("[\\s]+", " ");
if (parameterMappings.size() > 0 && parameterObject != null) {
TypeHandlerRegistry typeHandlerRegistry = configuration.getTypeHandlerRegistry();
if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
sql = sql.replaceFirst("\\?", Matcher.quoteReplacement(this.resolveParameterValue(parameterObject)));
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
Matcher matcher = PARAMETER_PATTERN.matcher(sql);
StringBuffer sqlBuffer = new StringBuffer();
for (ParameterMapping parameterMapping : parameterMappings) {
String propertyName = parameterMapping.getProperty();
Object obj = null;
if (metaObject.hasGetter(propertyName)) {
obj = metaObject.getValue(propertyName);
} else if (boundSql.hasAdditionalParameter(propertyName)) {
obj = boundSql.getAdditionalParameter(propertyName);
}
if (matcher.find()) {
matcher.appendReplacement(sqlBuffer, Matcher.quoteReplacement(resolveParameterValue(obj)));
}
}
matcher.appendTail(sqlBuffer);
sql = sqlBuffer.toString();
}
}
return sql;
}
private String resolveParameterValue(Object obj) {
String value;
if (obj instanceof String) {
value = StrUtil.format("'{}'", obj.toString());
} else if (obj instanceof Date) {
value = StrUtil.format("'{}'", DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.DEFAULT, Locale.CHINA).format((Date)obj));
} else {
value = obj == null ? "" : obj.toString();
}
return value;
}
/**
* 包装目标对象
*
* @param target
* @return
*/
@Override
public Object plugin(Object target) {
if (target instanceof Executor) {
// 借助Plugin.wrap(target, interceptor) 方法包装目标对象; 返回为当前target创建的动态代理对象
return Plugin.wrap(target, this);
}
return target;
}
/**
* 设置属性. 将插件注册时的property属性设置进来
*
* @param properties
*/
@Override
public void setProperties(Properties properties) {
}
}
```
> 将此文件和所有其他cat插件一同打包放到私有仓库上是一种更好的选择.
- mybatis-config.xml
```xml
```
#### 3.1.2.3. 测试
- 访问接口 http://localhost:7079/mybatis-cat/user/findAll
### 3.1.3. Log
Cat集成日志框架的思路大体上都类似, 采用SpringBoot默认的logback日志框架进行演示, 如果使用了log4j, log4j2处理方式也是类似的.
#### 3.1.3.1. 搭建环境
搭建Spring Boot初始化环境, 只需要添加Web依赖.
- pom.xml
```xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.1.13.RELEASE
cn.sheep
logback-cat
0.0.1-SNAPSHOT
logback-cat
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
com.dianping.cat
cat-client
3.0.0
org.springframework.boot
spring-boot-maven-plugin
```
- application.yml
```yaml
server:
port: 7080
servlet:
context-path: /logback-cat
logging:
path: D:\data\applogs\logback-cat
```
- logback-spring.xml
```xml
cat
%yellow(%d{yyyy-MM-dd HH:mm:ss}) %red([%thread]) %highlight(%-5level) %cyan(%logger{50}) - %magenta(%msg) %n
UTF-8
ERROR
DENY
ACCEPT
%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n
UTF-8
${logging.path}/cat.info.%d{yyyy-MM-dd}.log
90
1GB
ERROR
%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n
${logging.path}/cat.error.%d{yyyy-MM-dd}.log
90
```
#### 3.1.3.2. 集成cat-logback插件
- cat-logback插件
```java
package cn.sheep.logback.cat.catmonitor;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.ThrowableProxy;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.LogbackException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.MessageManager;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* LogbackCat 适配器
*
* @author sheep
*/
public class CatLogbackAppender extends AppenderBase {
public static final String LOGBACK = "Logback";
@Override
protected void append(ILoggingEvent event) {
try {
MessageManager manager = Cat.getManager();
// 检查是否启用或禁用 CAT 跟踪模式
boolean isTraceMode = manager.isTraceMode();
Level level = event.getLevel();
if (level.isGreaterOrEqual(Level.ERROR)) {
this.logError(event);
} else if (isTraceMode) {
this.logTrace(event);
}
} catch (Exception ex) {
throw new LogbackException(event.getFormattedMessage(), ex);
}
}
private void logError(ILoggingEvent event) {
ThrowableProxy info = (ThrowableProxy)event.getThrowableProxy();
if (info != null) {
Throwable exception = info.getThrowable();
Object message = event.getFormattedMessage();
if (message != null) {
Cat.logError(String.valueOf(message), exception);
} else {
Cat.logError(exception);
}
}
}
private void logTrace(ILoggingEvent event) {
String name = event.getLevel().toString();
Object message = event.getFormattedMessage();
String data;
if (message instanceof Throwable) {
data = buildExceptionStack((Throwable)message);
} else {
data = event.getFormattedMessage();
}
ThrowableProxy info = (ThrowableProxy)event.getThrowableProxy();
if (info != null) {
data = data + '\n' + buildExceptionStack(info.getThrowable());
}
Cat.logTrace(LOGBACK, name, Message.SUCCESS, data);
}
private String buildExceptionStack(Throwable exception) {
if (exception == null) {
return "";
}
StringWriter writer = new StringWriter(2048);
exception.printStackTrace(new PrintWriter(writer));
return writer.toString();
}
}
```
- 修改 logback-spring.xml
```xml
```
- LogbackController
```java
package cn.sheep.logback.cat.controller;
import com.dianping.cat.Cat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* TestController
*
* @author yanglei
*/
@RestController
@RequestMapping("logback")
public class LogbackController {
private Logger log = LoggerFactory.getLogger(LogbackController.class);
/**
* test
*
* @return
*/
@GetMapping("test")
public String test() {
// 设置 CAT 跟踪模式。
Cat.getManager().setTraceMode(true);
log.info("cat info");
try {
int i = 1 / 0;
} catch (Exception e) {
log.error("cat error", e);
}
return "logback";
}
/**
* test2
*
* @return
*/
@GetMapping("test2")
public String test2() {
// 设置 CAT 跟踪模式。
Cat.getManager().setTraceMode(true);
log.info("cat info");
log.debug("cat debug");
log.warn("cat warn");
log.error("cat error");
return "logback2";
}
}
```
#### 3.1.3.3. 测试
- 访问接口: http://localhost:7080/logback-cat/logback/test
### 3.1.4. SpringBoot
#### 3.1.4.1. 搭建环境
SpringBoot的集成方式相对比较简单, 使用已经搭建完的MyBatis框架来进行测试
- 添加如下配置到config包中
```java
package cn.sheep.mybatis.cat.catmonitor;
import com.dianping.cat.servlet.CatFilter;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* SpringBoot cat 插件
*
* @author sheep
*/
@Configuration
public class CatFilterConfigure {
/**
* 注册过滤器
*
* @return
*/
@Bean
public FilterRegistrationBean catFilter() {
FilterRegistrationBean registration = new FilterRegistrationBean();
CatFilter filter = new CatFilter();
registration.setFilter(filter);
registration.addUrlPatterns("/*");
// 设置此注册的名称。 如果未指定,将使用 bean 名称
registration.setName("cat-filter");
registration.setOrder(1);
return registration;
}
}
```
#### 3.1.4.2. 测试
- 访问接口 http://localhost:7079/mybatis-cat/user/findAll

- 途中的调用线经过了controller, 所以打印出了相关信息
- /mybatis-cat/user/findAll: 接口地址
- URL.Server: 服务器, 浏览器等相关信息
- URL.Method: 调用方法(GET, POST等)和URL
### 3.1.5. Spring Aop
使用Spring Aop技术可以简化埋点操作, 通过添加统一注解的方式, 使得指定方法能被CAT监控起来.
#### 3.1.5.1. 搭建环境
创建基于SpringBoot的springaop-cat项目
- pom.xml
```xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
2.1.13.RELEASE
cn.sheep
springaop-cat
0.0.1-SNAPSHOT
springaop-cat
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.boot
spring-boot-starter-aop
cn.hutool
hutool-all
5.7.3
com.dianping.cat
cat-client
3.0.0
org.springframework.boot
spring-boot-maven-plugin
```
- 创建AOP接口
```java
package cn.sheep.springaop.cat.catmonitor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
/**
* Cat 注解
*
* @author sheep
*/
@Retention(RUNTIME)
@Target(ElementType.METHOD)
public @interface CatAnnotation {}
```
- 创建AOP处理类
```java
package cn.sheep.springaop.cat.catmonitor;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.StrUtil;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* Aop 切面类. 用于cat埋点
*
* @author sheep
*/
@Component
@Aspect
public class CatAopService {
public static final String AOP_METHOD = "Aop.Method";
@Around(value = "@annotation(CatAnnotation)")
public Object aroundMethod(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature joinPointObject = (MethodSignature)pjp.getSignature();
Method method = joinPointObject.getMethod();
// 开启一个 Transaction. 类别: Aop.Method, 名称: 类名.方法名
String name = StrUtil.format("{}.{}", ClassUtil.getClassName(method.getDeclaringClass(), true), method.getName());
Transaction t = Cat.newTransaction(AOP_METHOD, name);
try {
Object res = pjp.proceed();
t.setSuccessStatus();
return res;
} catch (Throwable e) {
t.setStatus(e);
Cat.logError(e);
throw e;
} finally {
t.complete();
}
}
}
```
> 可自定义AOP实现类集成 `net.sf.cglib.proxy.MethodInterceptor`, 实现该逻辑更加优雅.
- 在方法上添加 `CatAnnotation`, 就可以被cat监控起来.
#### 3.1.5.2. 测试
- 访问接口: http://localhost:7081/aop/test
- 查看cat的transaction报表
### 3.1.6. Spring MVC
Spring MVC的集成方式, 官方提供的是使用AOP来进行集成, 源码如下:
- AOP接口
```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CatTransaction {
String type() default "Handler";//"URL MVC Service SQL" is reserved for Cat Transaction Type
String name() default "";
}
```
- AOP处理代码
```java
@Around("@annotation(catTransaction)")
public Object catTransactionProcess(ProceedingJoinPoint pjp, CatTransaction catTransaction) throws Throwable {
String transName = pjp.getSignature().getDeclaringType().getSimpleName() + "." + pjp.getSignature().getName();
if (StringUtils.isNotBlank(catTransaction.name())) {
transName = catTransaction.name();
}
Transaction t = Cat.newTransaction(catTransaction.type(), transName);
try {
Object result = pjp.proceed();
t.setStatus(Transaction.SUCCESS);
return result;
} catch (Throwable e) {
t.setStatus(e);
throw e;
} finally {
t.complete();
}
}
```
这部分与Spirng Aop处理方式基本一样
## 3.2. 告警配置
cat提供晚上的告警功能. 合理, 灵活的监控规则可以帮助更快, 更精确的发现业务线上故障.
### 3.2.1. 告警通用配置
##### 告警服务器配置
- 只有配置为告警服务器的机器, 才会执行告警逻辑; 只有配置为发送服务器的机器, 才会发送告警.
- 进入`Configs -> 全局系统配置 -> 服务端配置`, 修改服务器类型, 告警配置. 如图所示

##### 告警策略
- 配置某种告警类型, 某个项目, 某个错误级别, 对应的告警发送渠道, 以及暂停事件.

- 举例: 下述配置示例, 说明对于Transaction告警, 当告警项目名为mybatis-cat:
- 当告警级别为error时, 发送渠道为邮件, 短信, 微信, 连续告警之间的间隔为5分钟
- 当告警级别为warning时, 发送渠道为邮件, 微信, 连续告警之间的间隔为10分钟
###### 配置示例
```xml
```
###### 配置说明
- type id属性: 告警的类型, 可选: Transaction, Event, Business, Heartbeat
- group id属性: group可以为default, 代表默认, 即所有项目; 也可以为项目名, 代表某个项目的策略, 此时default策略不会生效
- level id属性: 错误级别, 分为warning代表警告, error代表错误
- level send属性: 告警渠道, 分为mail-邮箱, weixin-微信, sms-短信
- level suspendMinute属性: 连续告警的暂停时间
##### 告警接收人
- 告警接收人, 为告警所属项目的联系人
- 项目组邮件: 项目负责人邮件, 或项目组产品线邮件, 多个邮箱由英文逗号分隔, 不要留有空格; 作为发送告警邮件, 微信的依据
- 项目组号码: 项目负责人手机号, 多个号码由英文逗号分隔, 不要留有空格; 作为发送告警短信的依据

##### 告警服务端
- 告警发送中心的配置. (什么是告警发送中心: 提供发送短信, 邮件, 微信功能, 且提供http api的服务)
- cat在生成告警时, 调用告警发送中心的http接口发送告警. cat资深并不集成告警发送中心, 请自己搭建告警发送中心.

###### 配置示例
```xml
```
###### 配置说明
- sender id属性: 告警的类型, 可选: mail, sms, weixin
- sender url属性: 告警中心的URL
- sender batchSend属性: 是否支持批量发送告警信息
- par: 告警中心所需的http参数. ${argument}代表构建告警对象时, 附带的动态参数; 此处需要根据告警发送中心的需求, 将动态参数假如到代码AlertEntity中的m_paras
### 3.2.2. 告警规则
目前cat的监控规则有5个要素:
1. 告警时间段: 同一项业务指标在每天不同的时段可能有不同的趋势. 设定该项, 可让cat在每天不同的时间段执行不同的监控规则. 注意: 告警时间段, 不是监控数据的时间段, 只是告警从这一刻开始进行检查数据
2. 规则组合. 在一个时间段中, 可能指标触发了多个监控规则中的一个规则就要发出警报, 也有可能指标要同时触发了多个监控规则才需要发出警报.
3. 监控规则类型. 通过以下6种类型对指标进行监控: 最大值, 最小值, 波动上升百分比, 波动下降百分比, 总和最大值, 总和最小值
4. 监控最近分钟数. 设定时间后(单位为分钟), 当指标在设定的最近的时间长度内连续触发了监控规则, 才会发出警报. 比如最近分钟数为3, 表明连续3分钟的数组都满足条件才告警. 如果分钟数为1, 表示最近的一分钟满足条件就告警
5. 规则与被监控指标的匹配. 监控规则可以按照名称, 正则表达式与监控的对象(指标)进行匹配
##### 子条件类型
有6种类型. 子条件的内容为对应的阈值, 请注意阈值只能由数字组成, 当阈值表达百分比时, 不能在最后加上百分号. 8种类型如下:
| 类型 | 说明 |
| ----------------------------------- | ------------------------------------------------------------ |
| MaxVal 最大值(当前值) | 当前实际值 最大值,比如检查最近3分钟数据,3分钟数据会有3个value,是表示(>=N)个值都必须同时>=设定值 |
| MinVal 最小值(当前值) | 当前实际值 最小值,比如检查最近3分钟数据,3分钟数据会有3个value,是表示(>=N)个值都必须同时比<=设定值 |
| FluAscPer 波动上升百分比(当前值) | 波动百分比最大值。即当前最后(N)分钟值比监控周期内其它分钟值(M-N个)的增加百分比都>=设定的百分比时触发警报,比如检查最近10分钟数据,触发个数为3;10分钟内数据会算出7个百分比数据,是表示最后3分钟值分别相比前面7分钟值,3组7次比较的上升波动百分比全部>=配置阈值。比如下降50%,阈值填写50。 |
| FluDescPer 波动下降百分比(当前值) | 波动百分比最小值。当前最后(N)分钟值比监控周期内其它(M-N个)分钟值的减少百分比都大于设定的百分比时触发警报,比如检查最近10分钟数据,触发个数为3;10分钟数据会算出7个百分比数据,是表示最后3分钟值分别相比前面7分钟值,3组7次比较的下降波动百分比全部>=配置阈值。比如下降50%,阈值填写50。 |
| SumMaxVal 总和最大值(当前值) | 当前值总和最大值,比如检查最近3分钟数据,表示3分钟内的总和>=设定值就告警。 |
| SumMinVal 总和最小值(当前值) | 当前值总和最小值,比如检查最近3分钟数据,表示3分钟内的总和<=设定值就告警。 |
##### Transaction告警
对Transaction的告警, 支持的指标有次数, 延时, 失败率; 监控周期; 一分钟
###### 配置图示
如下图所示, 配置了mybatis-cat项目的Transaction监控规则

###### 配置说明
- 项目: 要监控的项目名
- Type: 被监控Transaction的type
- Name: 被监控Transaction的name; 如果为All, 代表全部name
- 监控项: 次数, 延时, 失败率
- 告警规则: 详见**告警规则**部分
##### Event告警
对Event的个数进行告警; 监控周期: 一分钟
##### 心跳告警
心跳告警是对服务器当前状态的监控, 如监控系统负载, GC数量等信息; 监控周期: 一分钟
##### 异常告警
对异常的个数进行告警; 监控周期: 一分钟
### 3.2.3. 告警接口编写
编写controller接口
```java
package cn.sheep.springboot.cat.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 告警
*
* @author yanglei
*/
@Slf4j
@RestController
@RequestMapping("alert")
public class AlertController {
/**
* 发送告警
*
* @return
*/
@RequestMapping("msg")
public String msg(@RequestParam String to, @RequestParam String value) {
log.warn("告警触发了. 接收人: {}; 告警内容: {}", to, value);
/*
告警服务端
*/
return "200";
}
}
```
修改告警服务端的配置, 填写接口地址, 以邮件为例
##### 配置示例
```xml
```
测试结果, 输入内容如下:
```ini
2021-07-17 16:45:00.657 WARN 4180 --- [nio-9000-exec-1] c.s.s.cat.controller.AlertController : 告警触发了. 接收人: testUser1@test.com,testUser2@test.com,121529654@qq.com; 告警内容: [CAT Transaction告警] [项目: mybatis-cat] [监控项: URL-All-count],[CAT Transaction告警: mybatis-cat URL All] : [实际值:100 ] [最大阈值: 5 ][告警时间:2021-07-17 16:45:00]
[时间: 2021-07-17 16:45]
点击此处查看详情
[告警间隔时间]5分钟
```
# 4. Cat原理
- 客户端原理介绍
- 服务端原理介绍
## 4.1. 客户端原理
### 4.1.1. 客户端设计
**客户端设计**
客户端设计是CAT系统设计中最为核心的一个环节, 客户端要求是做到API简单, 高可靠性能, 因为监控只是公司核心业务流程一个旁路环节, 无论在任何场景下都不能影响业务性能.
**架构设计**
CAT客户端在手机端数据方面使用ThreadLocal(线程局部变量), 是线程本地变量, 也可以称之为线程本地存储. 其实ThreadLocal的功用非常简单, 就是为每一个使用该变量的线程都提供一个变量值的副本, 属于Java中一种较为特殊的线程绑定机制, 每一个线程都可以独立地改变自己的副本, 不会和其它线程的副本冲突.
在监控场景下, 为用户提供服务都是Web容器, 比如tomcat或者Jetty, 后端的RPC服务端比如Dubbo或者Pigeon, 也都是基于线程池来实现的. 业务方在处理业务逻辑时基本都是在一个线程内部调用后端服务, 数据库, 缓存等, 将这些数据拿回来再进行业务逻辑封装, 最后将结果展示给用户. 所以将所有的监控请求作为一个监控上下文存入线程变量就非常合适.

如上图所示, 业务执行业务逻辑的时候, 就会把此次请求对应的监控存放于线程上下文中, 存于上下文的其实是一个监控树的结构. 在最后业务线程执行结束时, 将监控对象存入一个异步内存队列中, CAT有个消费线程将队列内的数据异步发送到服务端.
**总结流程如下**:
- 业务线程产生消息, 交给消息Producer, 消息Producer将消息存放在该业务线程**消息栈**中;
- 业务线程通知消息Producer消息结束时, 消息Producer根据其消息栈产生**消息树**放置在同步消息队列中;
- 消息上报线程监听消息队列, 根据消息树产生最终的消息报文上报CAT服务端.
**API设计**
监控API定义往往取决于对监控或者性能分析这个领域的理解, 监控和性能分析所针对的场景有如下几种:
- 一段代码的执行时间, 一段代码可以是URL执行耗时, 也可以是SQL的执行耗时.
- 一段代码的执行次数, 比如Java抛出异常记录次数, 或者一段逻辑的执行次数.
- 定期执行某段代码, 比如定期上报一些核心指标: JVM内存, GC等指标.
- 关键的业务监控指标, 比如监控订单数, 交易额, 支付成功率等.
在上述领域模型的基础上, CAT设计自己核心的几个监控对象: Transaction, Event, Heartbeat, Metric.

**序列化和通信**
序列化和通信是整个客户端包括服务端性能里面很关键的一个环节.
- CAT序列化协议是自定义序列化协议, 自定义序列化协议相比通用序列化协议要高效很多, 这个在大规模数据实时处理场景下还是非常有必要的.
- CAT通信是基于Netty来实现的NIO的数据传输, Netty是一个非常好的NIO开发框架, 在这边就不详细介绍了.
### 4.1.2. 核心类分析


CAT将监控的内容分为了4种: Transaction, Event, Heartbeat, Metric
使用4个接口定义他们的行为, 对应的实现类命名方式均为Default+接口名. 他们都继承自Message接口. 这个接口中主要用来提供通用性的方法, 比如添加数据, 设置状态等
### 4.1.3. 流程分析
#### **启动**
1. 懒加载创建Cat客户端对象
```java
package com.dianping.cat;
public static MessageProducer getProducer() {
try {
// 懒加载
checkAndInitialize();
MessageProducer producer = s_instance.m_producer;
if (producer != null) {
return producer;
} else {
return NullMessageProducer.NULL_MESSAGE_PRODUCER;
}
} catch (Exception e) {
errorHandler(e);
return NullMessageProducer.NULL_MESSAGE_PRODUCER;
}
}
```
2. 读取`client.xml`
```java
package com.dianping.cat;
private static void checkAndInitialize() {
try {
if (!s_init) {
initialize(new File(getCatHome(), "client.xml"));
}
} catch (Exception e) {
errorHandler(e);
}
}
```
```java
package com.dianping.cat;
public static String getCatHome() {
// CAT_HOME_DEFAULT_DIR = "/data/appdatas/cat/"
String catHome = CatPropertyProvider.INST.getProperty("CAT_HOME", CatConstants.CAT_HOME_DEFAULT_DIR);
if (!catHome.endsWith("/")) {
catHome = catHome + "/";
}
return catHome;
}
```
3. 加载模块
```java
package com.dianping.cat;
// this should be called during application initialization time
public static void initialize(File configFile) {
try {
if (!s_init) {
synchronized (s_instance) {
if (!s_init) {
// 使用点评Plexus容器加载对应的模块
PlexusContainer container = ContainerLoader.getDefaultContainer();
ModuleContext ctx = new DefaultModuleContext(container);
Module module = ctx.lookup(Module.class, CatClientModule.ID);
if (!module.isInitialized()) {
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
ctx.setAttribute("cat-client-config-file", configFile);
initializer.execute(ctx, module);
}
log("INFO", "Cat is lazy initialized!");
s_init = true;
}
}
}
} catch (Exception e) {
errorHandler(e);
}
}
```
#### **创建Message**
1. 创建一个新的Transaction
```java
package com.dianping.cat
public static Transaction newTransaction(String type, String name) {
try {
return Cat.getProducer().newTransaction(type, name);
} catch (Exception e) {
errorHandler(e);
return NullMessage.TRANSACTION;
}
}
```
```java
package com.dianping.cat.message.internal;
@Override
public Transaction newTransaction(String type, String name) {
// this enable CAT client logging cat message without explicit setup
// 创建线程上下文
if (!m_manager.hasContext()) {
m_manager.setup();
}
// 创建Transaction
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
// 添加到线程上下文中
m_manager.start(transaction, false);
return transaction;
}
```
2. 创建线程上下文
```java
package com.dianping.cat.message.internal;
@Override
public void setup() {
Context ctx;
if (m_domain != null) {
ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
} else {
ctx = new Context("Unknown", m_hostName, "");
}
double samplingRate = m_configManager.getSampleRatio();
if (samplingRate < 1.0 && hitSample(samplingRate)) {
ctx.m_tree.setHitSample(true);
}
m_context.set(ctx);
}
```
3. 添加Transaction到线程上下文中
```java
package com.dianping.cat.message.internal;
@Override
public void start(Transaction transaction, boolean forked) {
Context ctx = getContext();
if (ctx != null) {
ctx.start(transaction, forked);
if (transaction instanceof TaggedTransaction) {
TaggedTransaction tt = (TaggedTransaction) transaction;
m_taggedTransactions.put(tt.getTag(), tt);
}
} else if (m_firstMessage) {
m_firstMessage = false;
m_logger.warn("CAT client is not enabled because it's not initialized yet");
}
}
```
4. 添加Transaction到DefaultMessageTree中
```java
package com.dianping.cat.message.internal;
public void start(Transaction transaction, boolean forked) {
if (!m_stack.isEmpty()) {
// Do NOT make strong reference from parent transaction to forked transaction.
// Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway()
// By doing so, there is no need for synchronization between parent and child threads.
// Both threads can complete() anytime despite the other thread.
if (!(transaction instanceof ForkedTransaction)) {
Transaction parent = m_stack.peek();
addTransactionChild(transaction, parent);
}
} else {
m_tree.setMessage(transaction);
}
if (!forked) {
m_stack.push(transaction);
}
}
```
5. 关闭Transaction
```java
package com.dianping.cat.message.internal;
@Override
public void complete() {
try {
if (isCompleted()) {
// complete() was called more than once
DefaultEvent event = new DefaultEvent("cat", "BadInstrument");
event.setStatus("TransactionAlreadyCompleted");
event.complete();
addChild(event);
} else {
if (m_durationInMicro == -1) {
m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;
}
setCompleted(true);
if (m_manager != null) {
// 关闭这一次的
m_manager.end(this);
}
}
} catch (Exception e) {
// ignore
}
}
```
```java
package com.dianping.cat.message.internal;
@Override
public void end(Transaction transaction) {
Context ctx = getContext();
if (ctx != null && transaction.isStandalone()) {
if (ctx.end(this, transaction)) {
m_context.remove();
}
}
}
```
这里需要介绍一下, 消息进入到线程上下文后, 是通过栈的方式来存储的
Transaction之间是有应用的, 因此在end方法中只需要将第一个Transaction(封装在MessageTree中), 通过MessageManager来flush, 在拼接消息时可以根据这个引用关系来找到所有的Transaction.
```java
package com.dianping.cat.message.internal;
/**
* return true means the transaction has been flushed.
*
* @param manager
* @param transaction
* @return true if message is flushed, false otherwise
*/
public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.pop();
// 取到栈底的Transaction
if (transaction == current) {
m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
} else {
while (transaction != current && !m_stack.empty()) {
m_validator.validate(m_stack.peek(), current);
current = m_stack.pop();
}
}
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
m_tree.setMessageId(null);
m_tree.setMessage(null);
if (m_totalDurationInMicros > 0) {
adjustForTruncatedTransaction((Transaction) tree.getMessage());
}
// 刷新并发送数据
manager.flush(tree, true);
return true;
}
}
return false;
}
```
#### **发送数据**
1. 首先获取到发送类的对象, 调用其方法进行发送
```java
package com.dianping.cat.message.internal;
public void flush(MessageTree tree, boolean clearContext) {
// 获取发送类对象
MessageSender sender = m_transportManager.getSender();
if (sender != null && isMessageEnabled()) {
// 发送
sender.send(tree);
if (clearContext) {
reset();
}
} else {
m_throttleTimes++;
if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
}
}
}
```
2. 发送时是经典的`生产者-消费者`模型, 生产者只需要向队列中放入数据, 消费者监听队列, 获取数据并发送
```java
package com.dianping.cat.message.internal;
public void flush(MessageTree tree, boolean clearContext) {
// 获取发送类对象
MessageSender sender = m_transportManager.getSender();
if (sender != null && isMessageEnabled()) {
// 发送
sender.send(tree);
if (clearContext) {
reset();
}
} else {
m_throttleTimes++;
if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
}
}
}
```
```java
package com.dianping.cat.message.io;
private void offer(MessageTree tree) {
if (m_configManager.isAtomicMessage(tree)) {
boolean result = m_atomicQueue.offer(tree);
if (!result) {
logQueueFullInfo(tree);
}
} else {
// 生产者-消费者模型
boolean result = m_queue.offer(tree);
if (!result) {
logQueueFullInfo(tree);
}
}
}
```
消费者线程拉取消息:
```java
package com.dianping.cat.message.io;
private void processNormalMessage() {
while (true) {
ChannelFuture channel = m_channelManager.channel();
if (channel != null) {
try {
// 拉取消息
MessageTree tree = m_queue.poll();
if (tree != null) {
sendInternal(channel, tree);
tree.setMessage(null);
} else {
try {
Thread.sleep(5);
} catch (Exception e) {
m_active = false;
}
break;
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
} else {
try {
Thread.sleep(5);
} catch (Exception e) {
m_active = false;
}
}
}
}
```
使用自定义的序列化方式进行序列化, 最后使用Netty发送数据:
```java
package com.dianping.cat.message.io;
public void sendInternal(ChannelFuture channel, MessageTree tree) {
if (tree.getMessageId() == null) {
tree.setMessageId(m_factory.getNextId());
}
// 自定义序列化方式
ByteBuf buf = m_codec.encode(tree);
int size = buf.readableBytes();
// 使用Netty进行数据发送
channel.channel().writeAndFlush(buf);
if (m_statistics != null) {
m_statistics.onBytes(size);
}
}
```
Cat使用了自定义的序列化方式
```java
/*
* Copyright (c) 2011-2018, Meituan Dianping. All Rights Reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dianping.cat.message.spi.codec;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Stack;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Metric;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.message.internal.DefaultHeartbeat;
import com.dianping.cat.message.internal.DefaultMetric;
import com.dianping.cat.message.internal.DefaultTrace;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class NativeMessageCodec implements MessageCodec {
public static final String ID = "NT1"; // native message tree version 1
@Override
public MessageTree decode(ByteBuf buf) {
buf.readInt(); // read the length of the message tree
DefaultMessageTree tree = new DefaultMessageTree();
Context ctx = new Context(tree);
Codec.HEADER.decode(ctx, buf);
Message msg = decodeMessage(ctx, buf);
tree.setMessage(msg);
tree.setBuffer(buf);
return tree;
}
private Message decodeMessage(Context ctx, ByteBuf buf) {
Message msg = null;
while (buf.readableBytes() > 0) {
char ch = ctx.readId(buf);
switch (ch) {
case 't':
Codec.TRANSACTION_START.decode(ctx, buf);
break;
case 'T':
msg = Codec.TRANSACTION_END.decode(ctx, buf);
break;
case 'E':
Message e = Codec.EVENT.decode(ctx, buf);
ctx.addChild(e);
break;
case 'M':
Message m = Codec.METRIC.decode(ctx, buf);
ctx.addChild(m);
break;
case 'H':
Message h = Codec.HEARTBEAT.decode(ctx, buf);
ctx.addChild(h);
break;
case 'L':
Message l = Codec.TRACE.decode(ctx, buf);
ctx.addChild(l);
break;
default:
throw new RuntimeException(String.format("Unsupported message type(%s).", ch));
}
}
if (msg == null) {
msg = ctx.getMessageTree().getMessage();
}
return msg;
}
@Override
public ByteBuf encode(MessageTree tree) {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 * 1024);
try {
Context ctx = new Context(tree);
buf.writeInt(0); // place-holder
Codec.HEADER.encode(ctx, buf, null);
Message msg = tree.getMessage();
if (msg != null) {
encodeMessage(ctx, buf, msg);
}
int readableBytes = buf.readableBytes();
buf.setInt(0, readableBytes - 4); // reset the message size
return buf;
} catch (RuntimeException e) {
buf.release();
throw e;
}
}
private void encodeMessage(Context ctx, ByteBuf buf, Message msg) {
if (msg instanceof Transaction) {
Transaction transaction = (Transaction) msg;
List children = transaction.getChildren();
Codec.TRANSACTION_START.encode(ctx, buf, msg);
for (Message child : children) {
if (child != null) {
encodeMessage(ctx, buf, child);
}
}
Codec.TRANSACTION_END.encode(ctx, buf, msg);
} else if (msg instanceof Event) {
Codec.EVENT.encode(ctx, buf, msg);
} else if (msg instanceof Metric) {
Codec.METRIC.encode(ctx, buf, msg);
} else if (msg instanceof Heartbeat) {
Codec.HEARTBEAT.encode(ctx, buf, msg);
} else if (msg instanceof Trace) {
Codec.TRACE.encode(ctx, buf, msg);
} else {
throw new RuntimeException(String.format("Unsupported message(%s).", msg));
}
}
@Override
public void reset() {
}
enum Codec {
HEADER {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
MessageTree tree = ctx.getMessageTree();
String version = ctx.getVersion(buf);
if (ID.equals(version)) {
tree.setDomain(ctx.readString(buf));
tree.setHostName(ctx.readString(buf));
tree.setIpAddress(ctx.readString(buf));
tree.setThreadGroupName(ctx.readString(buf));
tree.setThreadId(ctx.readString(buf));
tree.setThreadName(ctx.readString(buf));
tree.setMessageId(ctx.readString(buf));
tree.setParentMessageId(ctx.readString(buf));
tree.setRootMessageId(ctx.readString(buf));
tree.setSessionToken(ctx.readString(buf));
} else {
throw new RuntimeException(String.format("Unrecognized version(%s) for binary message codec!", version));
}
return null;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
MessageTree tree = ctx.getMessageTree();
ctx.writeVersion(buf, ID);
ctx.writeString(buf, tree.getDomain());
ctx.writeString(buf, tree.getHostName());
ctx.writeString(buf, tree.getIpAddress());
ctx.writeString(buf, tree.getThreadGroupName());
ctx.writeString(buf, tree.getThreadId());
ctx.writeString(buf, tree.getThreadName());
ctx.writeString(buf, tree.getMessageId());
ctx.writeString(buf, tree.getParentMessageId());
ctx.writeString(buf, tree.getRootMessageId());
ctx.writeString(buf, tree.getSessionToken());
}
},
TRANSACTION_START {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
long timestamp = ctx.readTimestamp(buf);
String type = ctx.readString(buf);
String name = ctx.readString(buf);
if ("System".equals(type) && name.startsWith("UploadMetric")) {
name = "UploadMetric";
}
DefaultTransaction t = new DefaultTransaction(type, name);
t.setTimestamp(timestamp);
ctx.pushTransaction(t);
MessageTree tree = ctx.getMessageTree();
if (tree instanceof DefaultMessageTree) {
tree.getTransactions().add(t);
}
return t;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
ctx.writeId(buf, 't');
ctx.writeTimestamp(buf, msg.getTimestamp());
ctx.writeString(buf, msg.getType());
ctx.writeString(buf, msg.getName());
}
},
TRANSACTION_END {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
String status = ctx.readString(buf);
String data = ctx.readString(buf);
long durationInMicros = ctx.readDuration(buf);
DefaultTransaction t = ctx.popTransaction();
t.setStatus(status);
t.addData(data);
t.setDurationInMicros(durationInMicros);
return t;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
Transaction t = (Transaction) msg;
ctx.writeId(buf, 'T');
ctx.writeString(buf, msg.getStatus());
ctx.writeString(buf, msg.getData().toString());
ctx.writeDuration(buf, t.getDurationInMicros());
}
},
EVENT {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
long timestamp = ctx.readTimestamp(buf);
String type = ctx.readString(buf);
String name = ctx.readString(buf);
String status = ctx.readString(buf);
String data = ctx.readString(buf);
DefaultEvent e = new DefaultEvent(type, name);
e.setTimestamp(timestamp);
e.setStatus(status);
e.addData(data);
MessageTree tree = ctx.getMessageTree();
if (tree instanceof DefaultMessageTree) {
tree.getEvents().add(e);
}
return e;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
ctx.writeId(buf, 'E');
ctx.writeTimestamp(buf, msg.getTimestamp());
ctx.writeString(buf, msg.getType());
ctx.writeString(buf, msg.getName());
ctx.writeString(buf, msg.getStatus());
ctx.writeString(buf, msg.getData().toString());
}
},
METRIC {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
long timestamp = ctx.readTimestamp(buf);
String type = ctx.readString(buf);
String name = ctx.readString(buf);
String status = ctx.readString(buf);
String data = ctx.readString(buf);
DefaultMetric m = new DefaultMetric(type, name);
m.setTimestamp(timestamp);
m.setStatus(status);
m.addData(data);
MessageTree tree = ctx.getMessageTree();
if (tree instanceof DefaultMessageTree) {
tree.getMetrics().add(m);
}
return m;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
ctx.writeId(buf, 'M');
ctx.writeTimestamp(buf, msg.getTimestamp());
ctx.writeString(buf, msg.getType());
ctx.writeString(buf, msg.getName());
ctx.writeString(buf, msg.getStatus());
ctx.writeString(buf, msg.getData().toString());
}
},
HEARTBEAT {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
long timestamp = ctx.readTimestamp(buf);
String type = ctx.readString(buf);
String name = ctx.readString(buf);
String status = ctx.readString(buf);
String data = ctx.readString(buf);
DefaultHeartbeat h = new DefaultHeartbeat(type, name);
h.setTimestamp(timestamp);
h.setStatus(status);
h.addData(data);
MessageTree tree = ctx.getMessageTree();
if (tree instanceof DefaultMessageTree) {
tree.getHeartbeats().add(h);
}
return h;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
ctx.writeId(buf, 'H');
ctx.writeTimestamp(buf, msg.getTimestamp());
ctx.writeString(buf, msg.getType());
ctx.writeString(buf, msg.getName());
ctx.writeString(buf, msg.getStatus());
ctx.writeString(buf, msg.getData().toString());
}
},
TRACE {
@Override
protected Message decode(Context ctx, ByteBuf buf) {
long timestamp = ctx.readTimestamp(buf);
String type = ctx.readString(buf);
String name = ctx.readString(buf);
String status = ctx.readString(buf);
String data = ctx.readString(buf);
DefaultTrace t = new DefaultTrace(type, name);
t.setTimestamp(timestamp);
t.setStatus(status);
t.addData(data);
return t;
}
@Override
protected void encode(Context ctx, ByteBuf buf, Message msg) {
ctx.writeId(buf, 'L');
ctx.writeTimestamp(buf, msg.getTimestamp());
ctx.writeString(buf, msg.getType());
ctx.writeString(buf, msg.getName());
ctx.writeString(buf, msg.getStatus());
ctx.writeString(buf, msg.getData().toString());
}
};
protected abstract Message decode(Context ctx, ByteBuf buf);
protected abstract void encode(Context ctx, ByteBuf buf, Message msg);
}
private static class Context {
private static Charset UTF8 = Charset.forName("UTF-8");
private MessageTree m_tree;
private Stack m_parents = new Stack();
private byte[] m_data = new byte[256];
public Context(MessageTree tree) {
m_tree = tree;
}
public void addChild(Message msg) {
if (!m_parents.isEmpty()) {
m_parents.peek().addChild(msg);
} else {
m_tree.setMessage(msg);
}
}
public MessageTree getMessageTree() {
return m_tree;
}
public String getVersion(ByteBuf buf) {
byte[] data = new byte[3];
buf.readBytes(data);
return new String(data);
}
public DefaultTransaction popTransaction() {
return m_parents.pop();
}
public void pushTransaction(DefaultTransaction t) {
if (!m_parents.isEmpty()) {
m_parents.peek().addChild(t);
}
m_parents.push(t);
}
public long readDuration(ByteBuf buf) {
return readVarint(buf, 64);
}
public char readId(ByteBuf buf) {
return (char) buf.readByte();
}
public String readString(ByteBuf buf) {
int len = (int) readVarint(buf, 32);
if (len == 0) {
return "";
} else if (len > m_data.length) {
m_data = new byte[len];
}
buf.readBytes(m_data, 0, len);
return new String(m_data, 0, len, StandardCharsets.UTF_8);
}
public long readTimestamp(ByteBuf buf) {
return readVarint(buf, 64);
}
protected long readVarint(ByteBuf buf, int length) {
int shift = 0;
long result = 0;
while (shift < length) {
final byte b = buf.readByte();
result |= (long) (b & 0x7F) << shift;
if ((b & 0x80) == 0) {
return result;
}
shift += 7;
}
throw new RuntimeException("Malformed variable int " + length + "!");
}
public void writeDuration(ByteBuf buf, long duration) {
writeVarint(buf, duration);
}
public void writeId(ByteBuf buf, char id) {
buf.writeByte(id);
}
public void writeString(ByteBuf buf, String str) {
if (str == null || str.length() == 0) {
writeVarint(buf, 0);
} else {
byte[] data = str.getBytes(UTF8);
writeVarint(buf, data.length);
buf.writeBytes(data);
}
}
public void writeTimestamp(ByteBuf buf, long timestamp) {
writeVarint(buf, timestamp); // TODO use relative value of root message timestamp
}
private void writeVarint(ByteBuf buf, long value) {
while (true) {
if ((value & ~0x7FL) == 0) {
buf.writeByte((byte) value);
return;
} else {
buf.writeByte(((byte) value & 0x7F) | 0x80);
value >>>= 7;
}
}
}
public void writeVersion(ByteBuf buf, String version) {
buf.writeBytes(version.getBytes());
}
}
}
```
根据不同的数据类型, 进行写入即可.
## 4.2. 服务端原理
### 4.2.1. 架构设计
单体的consumer架构设计如下:

如上图, CAT服务端在整个实时处理中, 基本上实现了全异步化处理.
- 消息接受是基于Netty的NIO实现.
- 消息接受到服务端就存放内存队列, 然后程序开启一个线程回消费整个消息做消息分发.
- 每个消息都会有一批线程并发消费各自队列的数据, 以做到消息处理的隔离.
- 消息存储是先存入本地磁盘, 然后异步上传到HDFS文件, 这也避免了强依赖HDFS.
当某个报表处理器处理来不及的时候, 比如Transaction报表处理比较慢, 可以通过配置支持开启多个Transaction处理线程, 并发消费消息.

### 4.2.2. 消息ID设计
CAT每个消息都有一个唯一的ID,这个ID在客户端生成,后续都通过这个ID在进行消息内容的查找。典型的RPC消息串起来的问题,比如A调用B的时候,在A这端生成一个Message-ID,在A调用B的过程中,将Message-ID作为调用传递到B端,在B执行过程中,B用context传递的Message-ID作为当前监控消息的Message-ID。
CAT消息的Message-ID格式ShopWeb-0a010680-375030-2,CAT消息一共分为四段:
- 第一段是应用名shop-web。
- 第二段是当前这台机器的IP的16进制格式,0a01010680表示10.1.6.108。
- 第三段的375030,是系统当前时间除以小时得到的整点数。
- 第四段的2,是表示当前这个客户端在当前小时的顺序递增号。
### 4.2.3. 存储数据设计
消息存储是CAT最有挑战的部分。关键问题是消息数量多且大,目前美团每天处理消息1000亿左右,大小大约100TB,单物理机高峰期每秒要处理100MB左右的流量。CAT服务端基于此流量做实时计算,还需要将这些数据压缩后写入磁盘。
整体存储结构如下图:

CAT在写数据一份是Index文件,一份是Data文件.
- Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
- 一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
- 48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
- CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内偏移地址读取出真正的消息内容。
### 服务端设计总结
CAT在分布式实时方面,主要归结于以下几点因素:
- 去中心化,数据分区处理。
- 基于日志只读特性,以一个小时为时间窗口,实时报表基于内存建模和分析,历史报表通过聚合完成。
- 基于内存队列,全面异步化、单线程化、无锁设计。
- 全局消息ID,数据本地化生产,集中式存储。
- 组件化、服务化理念。