PostgreSQL
安装 Agent 后,您需要在 Tapdata Cloud 平台为 Agent 和 PostgreSQL 数据库建立连接,完成操作后即可在数据复制/开发任务中使用该数据源。本文介绍建立连接前的准备工作(如授权账号等)。
支持版本
PostgreSQL 9.4、9.5、9.6、10.x、11.x、12版本
增量数据读取原理
为实现增量数据的读取,Tapdata Cloud 需借助 PostgreSQL 的逻辑解码功能,提取提交到事务日志中的更改,并通过插件以用户友好的方式处理这些更改。支持的变更数据捕获(CDC)如下:
- 逻辑解码(Logical Decoding):用于从 WAL 日志中解析逻辑变更事件
- 复制协议(Replication Protocol):提供了消费者实时订阅(甚至同步订阅)数据库变更的机制
- 快照导出(export snapshot):允许导出数据库的一致性快照(pg_export_snapshot)
- 复制槽(Replication Slot):用于保存消费者偏移量,跟踪订阅者进度。
作为源库
以管理员身份登录 PostgreSQL 数据库。
修改复制标识为 FULL(使用整行作为标识),该属性决定了当数据发生 UPDATE/DELETE 时,日志记录的字段。
ALTER TABLE '[schema]'.'[table name]' REPLICA IDENTITY FULL;
安装解码器插件,根据业务需求和当前版本选择:
Wal2json(9.4 及以上)
如果源表没有主键,则无法实现同步执行删除操作。
Decoderbufs(9.6 及以上)
Pgoutput(10.0 及以上)
以 Wal2json 为例,安装步骤如下:
确保环境变量 PATH 中包含
/bin
。export PATH=$PATH:<PostgreSQL 安装路径>/bin
依次执行下述命令,完成插件的安装。
git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& USE_PGXS=1 make \
&& USE_PGXS=1 make install \
&& cd .. \
&& rm -rf wal2json提示如执行 make 命令时报错:“fatal error: [xxx].h: No such file or directory”,可尝试安装 postgresql-server-dev 来解决。以 debian 系统为例,安装命令参考:
apt-get install -y postgresql-server-dev-<版本号>
。修改配置文件 postgresql.conf,设置在启动时加载插件。
shared_preload_libraries = 'decoderbufs,wal2json'
修改配置文件 postgresql.conf,设置 REPLICATION 属性。
# REPLICATION
wal_level = logical
max_wal_senders = 1 # 大于0即可
max_replication_slots = 1 # 大于0即可
创建用于数据同步/开发任务的账号,具体操作,见 CREATE USER 和 GRANT 语法。
为刚刚创建的数据库账号授予权限,简易示例如下,推荐基于业务需求设置更精细化的权限控制。
--初始化
GRANT SELECT ON ALL TABLES IN SCHEMA <schemaname> TO <username>;
--增量权限,无需增量则可以不授予 REPLICATION LOGIN 权限
CREATE ROLE <rolename> REPLICATION LOGIN;
CREATE USER <username> ROLE <rolename> PASSWORD '<password>';
--或者
CREATE USER <username> WITH REPLICATION LOGIN PASSWORD '<password>';修改配置文件 pg_hba.conf,添加下述内容。
local replication <youruser> trust
host replication <youruser> 0.0.0.0/32 md5
host replication <youruser> ::1/128 trust(可选)测试日志插件。
连接 postgres 数据库,切换至需要同步的数据库并创建一张测试表。
-- 假设需要同步的数据库为 postgres,模型为 public
\c postgres
create table public.test_decode
(
uid integer not null
constraint users_pk
primary key,
name varchar(50),
age integer,
score decimal
)创建 slot 连接,以 wal2json 插件为例。
select * from pg_create_logical_replication_slot('slot_test', 'wal2json')
对测试表插入一条数据,然后监听日志并查看返回结果,是否有刚才插入操作的信息。
select * from pg_logical_slot_peek_changes('slot_test', null, null)
确认无问题后,可销毁 slot 连接并删除测试表。
select * from pg_drop_replication_slot('slot_test')
drop table public.test_decode
(可选)如需使用最后更新时间戳的方式进行增量同步,您需要执行下述步骤。
在源数据库中,执行下述命令创建公共函数,需替换 schema 名称。
CREATE OR REPLACE FUNCTION <schema>.update_lastmodified_column()
RETURNS TRIGGER language plpgsql AS $$
BEGIN
NEW.last_update = now();
RETURN NEW;
END;
$$;创建字段和 trigger,每个表均需执行一次,例如表名为 mytable。
// 创建 last_update 字段
alter table <schema>.mytable add column last_udpate timestamp default now();
// 创建 trigger
create trigger trg_uptime before update on <schema>.mytable for each row execute procedure
update_lastmodified_column();
作为目标库
以管理员身份登录 PostgreSQL 数据库。
创建用于数据同步/开发任务的账号,具体操作,见 CREATE USER 和 GRANT 语法。
执行下述格式的命令,为数据库账号授予权限,简易示例如下,推荐基于业务需求设置更精细化的权限控制。
GRANT INSERT,UPDATE,DELETE,TRUNCATE
ON ALL TABLES IN SCHEMA <schemaname> TO <username>;
异常处理
当 CDC 意外中断后,可能导致 Slot 连接无法从 PostgreSQL 主节点删除,此时需手动登录主节点,删除相关 Slot 避免一直占用。
// 查看是否有 slot_name=tapdata 的信息
TABLE pg_replication_slots;
// 删除 Slot 节点
select * from pg_drop_replication_slot('tapdata');