# PostgreSQL数据库WAL监听 **Repository Path**: logicjwell/pg-wal-listen ## Basic Information - **Project Name**: PostgreSQL数据库WAL监听 - **Description**: 利用PostgreSQL数据库WAL日志,来进行表数据变更监听 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-01-11 - **Last Updated**: 2025-01-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # pg-wal-listen #### 介绍 Postgres数据库同步,恢复工具:读取WAL日志,可选择推送到Kafka或自定义处理。 ![pg监听效果](src/main/resources/image.png) #### 软件架构 * Flink 1.13.6 + FlinkCDC 2.1.1 * Postgres ``` #### Postgres vi /var/lib/postgresql/data/postgresql.conf ```roomsql # 更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # max number of replication slots # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样 max_wal_senders = 20 # max number of walsender processes # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s) wal_sender_timeout = 180s # in milliseconds; 0 disable  ``` ##### 普通账户需要开权限-postgres ```roomsql # 开权限 -- pg新建用户 CREATE USER test1 WITH PASSWORD 'test123'; -- 给用户复制流权限 ALTER ROLE test1 replication; -- 给用户登录数据库权限 GRANT CONNECT ON DATABASE test_db to test1; -- 把当前库public下所有表查询权限赋给用户 GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1; # 发布表 -- 设置发布为true update pg_publication set puballtables=true where pubname is not null; -- 把所有表进行发布 CREATE PUBLICATION dbz_publication FOR ALL TABLES; -- 查询哪些表已经发布 select * from pg_publication_tables; # 更改表的复制标识包含更新和删除的值 -- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性) ALTER TABLE t_user REPLICA IDENTITY FULL; -- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置) select relreplident from pg_class where relname='t_user'; ``` #### 配置文件-application.yml ```yaml ot: jdbc: type: postgres url: 127.0.0.1:15432 username: postgres password: password zone: UTC # 时区,解决时间+8问题 tables: - postgres.public.t_user - postgres.public.test cdc: target: type: kafka kafka: hosts: 127.0.0.1:9092 topic: test # 不写按表名为topic推送 # 鉴权 # mechanism: SCRAM-SHA-256 # config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";' # protocol: SASL_PLAINTEXT ```