tidb实时同步到mysql

news/2025/2/22 18:08:59

客户要求实时同步表的数据到mysql,但这个表在tidb
测试直接通过tidb cdc写入到mysql,有些字段是null,所以中间加了一个kafka实现

客户库中创建表
CREATE TABLE tb_1 (
    id bigint primary key,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    created_at timestamp,
    type smallint,
    remark string ,
    key i_cid(cid)
);
tidbcdckafka_18">tidb配置cdc,写入到kafka

1、tidb添加cdc组件
2、配置cdc任务
cat your.toml

case-sensitive = true
enable-old-value = true
[filter]
rules = ['db.tb_1']

3、启动任务

tiup ctl:v5.3.0 cdc changefeed create --pd=http://pd-ip:2379 \
--sink-uri="kafka://kafka-ip:9092/your-topic?kafka-version=1.1.1&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json" --changefeed-id="my" --sort-engine="unified" \
--start-ts=453870757254529193 --config your.toml

protocol=canal-json 使用这个格式
–start-ts这个通过一次导出查看,cat db/metadata

dumpling -u root -p pwd -h tidb-ip -P 3306  -F 1GiB --compress gzip -t 2 -o db -B db -T db.tb --where "cid=123456"

4、更新一条数据,看看kafka是不是有了

mysql_43">创建SQL作业,从kafka消费后入mysql
CREATE TABLE tb_1 (
    id bigint,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    create_time timestamp,
    type smallint,
    remark string,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  -- 定义 Kafka 参数
  'connector' = 'kafka',
  'topic' = 'your-topic',
  'scan.startup.mode' = 'latest-offset', 
  'properties.bootstrap.servers' = 'ip:9092',
  'properties.group.id' = 'your-group',
  'format' = 'canal-json', -- tidb 支持该方式
  'canal-json.ignore-parse-errors' = 'false'
);



CREATE TABLE kh_tb_1 (
    id bigint,
    cid bigint,
    gid bigint,
    fee DECIMAL(10,2),
    create_time timestamp,
    type smallint,
    remark string,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxxxx:3306/db?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
  'table-name' = 'tb_1',
  'username' = 'user',
  'password' = 'pwd',
  'sink.buffer-flush.max-rows' = '5000',
  'sink.buffer-flush.interval' = '2s',
  'sink.max-retries' = '10'
);

insert into kh_tb_1 select * from tb_1 where cid=xxxxxxx;

最后启动作业任务即可。


http://www.niftyadmin.cn/n/5862612.html

相关文章

若依框架实现动态失效时间JWT Token的实践指南

一、功能需求背景 在前后端分离架构中,JWT(JSON Web Token)作为无状态认证方案被广泛使用。若依(RuoYi)框架的TokenService默认采用固定失效时间策略,但在实际开发中常需要根据业务场景动态调整Token有效期…

GUI编程(window系统→Linux系统)

最近有个项目需要将windows系统的程序往Linux系统上面移植,由于之前程序没有考虑过多平台兼容的问题,导致部分功能不可用以下是对近期遇到的问题的总结,以及相应的解决方案和经验分享。 1. Python 模块安装与管理 在 Linux 系统中&#xff0…

C#设计模式 学习笔记

概述 设计模式是一套被反复使用的、多数人知晓的、经过分类编目的代码设计经验的总结 本文是《C#设计模式》书籍的学习笔记 面向对象设计原则 单一职责原则 一个对象应该只包含单一的职责,并且该职责被完整地封装在一个类中 开闭原则 对扩展开放,对…

java(spring boot)实现向deepseek/GPT等模型的api发送请求/多轮对话(附源码)

我们再启动应用并获取api密钥后就可以对它发送请求了,但是官方文档对于如何进行多轮对话以及怎么自定义参数并没有说的很清楚,给的模板也没有java的,因此我们需要自己实现。 import org.json.JSONArray; import org.json.JSONObject;import j…

设计模式教程:迭代器模式(Iterator Pattern)

迭代器模式(Iterator Pattern)是设计模式中的一种行为型模式,它允许顺序访问一个集合对象中的元素,而无需暴露集合对象的内部结构。换句话说,迭代器模式提供了一个方法,能让你遍历集合中的元素,…

给老系统做个安全检查——Burp SqlMap扫描注入漏洞

背景 在AI技术突飞猛进的今天,类似Cursor之类的工具已经能写出堪比大部分程序员水平的代码了。然而,在我们的代码世界里,仍然有不少"老骥伏枥"的系统在兢兢业业地发光发热。这些祖传系统的代码可能早已过时,架构可能岌…

【精调】LLaMA-Factory 快速开始4 自定义个一个sharegpt数据集并训练

数据格式说明 LLaMA Factory:微调LLaMA3模型实现角色扮演 数据集 参考 开源模型应用落地-DeepSeek-R1-Distill-Qwen-7B-LoRA微调-LLaMA-Factory-单机单卡-V100(一) 大神给出的数据集的讲解:注册 如

计算机毕业设计Python+DeepSeek-R1高考推荐系统 高考分数线预测 大数据毕设(源码+LW文档+PPT+讲解)

温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…