DataX批量导入多张表的自动化实践:从JSON模板到Shell脚本

张开发
2026/4/17 4:01:42 15 分钟阅读

分享文章

DataX批量导入多张表的自动化实践:从JSON模板到Shell脚本
1. 为什么需要批量导入多张表在实际的数据迁移或ETL项目中经常会遇到需要同时处理多张表的情况。比如最近我接手的一个项目需要将客户的老系统数据迁移到新平台涉及的表多达50多张。如果按照传统方式为每张表单独编写DataX的JSON配置文件再逐个执行不仅效率低下还容易出错。想象一下每次迁移都要手动修改JSON文件中的表名、数据库连接信息等参数50张表就意味着要重复操作50次。更可怕的是如果中途某个参数需要调整比如密码变更或者增加过滤条件那就得把所有文件都修改一遍。这种重复劳动不仅浪费时间还容易因为人为疏忽导致错误。批量导入的核心思路很简单模板化自动化。通过一个JSON模板定义通用的数据结构用Shell脚本动态生成具体的配置文件并执行。这样只需要维护一个模板文件所有表的配置都能自动生成大大提高了工作效率。2. 设计JSON模板的要点2.1 基础模板结构DataX的JSON配置文件主要包含三部分job设置、reader配置和writer配置。我们的模板需要保留这些基本结构同时将可能变化的参数用占位符替代。下面是我常用的模板结构{ job: { setting: { speed: { channel: 5 } }, content: [ { reader: { name: mysqlreader, parameter: { username: {{src_username}}, password: {{src_password}}, column: [*], connection: [ { jdbcUrl: [jdbc:mysql://{{src_host}}:{{src_port}}/{{src_db}}], table: [{{src_table}}] } ], where: {{where}} } }, writer: { name: mysqlwriter, parameter: { batchSize: 2048, batchByteSize: 33554432, username: {{dest_username}}, password: {{dest_password}}, column: [*], writeMode: insert, connection: [ { jdbcUrl: jdbc:mysql://{{dest_host}}:{{dest_port}}/{{dest_db}}, table: [{{dest_table}}] } ] } } } ] } }这个模板中所有双大括号包裹的部分如{{src_username}}都是占位符后续会被Shell脚本替换为实际值。2.2 模板设计技巧参数分组将相关参数放在一起比如源数据库参数都以src_开头目标数据库参数以dest_开头这样在脚本替换时更清晰。灵活的条件过滤where参数特别有用可以实现增量同步。比如只同步某个月的数据wherecreate_time 2023-01-01。性能调优参数channel控制并发数batchSize和batchByteSize影响批量写入性能这些可以根据服务器配置调整。列映射如果需要指定列而不是用*可以在column数组中列出具体列名。3. Shell脚本编写详解3.1 基础脚本结构Shell脚本的主要任务是读取表名列表、替换模板中的占位符、生成配置文件、执行DataX任务。下面是一个完整的示例#!/bin/bash set -e # 数据库连接配置 src_host172.18.45.28 src_port53306 src_dbsource_db src_usernameuser src_passwordpassword dest_host172.18.45.28 dest_port53306 dest_dbtarget_db dest_usernameuser dest_passwordpassword # 数据过滤条件 where11 # 默认不过滤 # 表名映射数组源表名:目标表名 tables( users:users_2023 orders:orders_backup products:products ) # 遍历所有表 for table_pair in ${tables[]}; do IFS: read -ra TABLE $table_pair src_table${TABLE[0]} dest_table${TABLE[1]} # 生成配置文件 sed -e s/{{src_table}}/$src_table/g \ -e s/{{src_username}}/$src_username/g \ -e s/{{src_password}}/$src_password/g \ -e s/{{src_host}}/$src_host/g \ -e s/{{src_port}}/$src_port/g \ -e s/{{src_db}}/$src_db/g \ -e s/{{dest_table}}/$dest_table/g \ -e s/{{dest_username}}/$dest_username/g \ -e s/{{dest_password}}/$dest_password/g \ -e s/{{dest_host}}/$dest_host/g \ -e s/{{dest_port}}/$dest_port/g \ -e s/{{dest_db}}/$dest_db/g \ -e s/{{where}}/$where/g \ template.json job_${dest_table}.json # 执行DataX任务 echo 开始迁移: $src_table $dest_table python /path/to/datax/bin/datax.py job_${dest_table}.json ${dest_table}.log 21 # 检查执行状态 if [ $? -eq 0 ]; then echo 迁移成功日志见 ${dest_table}.log else echo 迁移失败请检查 ${dest_table}.log exit 1 fi done echo 所有表迁移完成3.2 脚本优化技巧错误处理set -e确保脚本在遇到错误时立即退出避免继续执行可能造成的问题。日志记录每个任务的输出都重定向到单独的日志文件方便排查问题。表名映射使用源表:目标表的格式可以灵活处理表名不同的情况。参数化所有配置都在脚本开头集中定义修改起来很方便。进度反馈每个任务开始和结束都有提示信息让执行过程更透明。4. 高级应用与优化4.1 并发执行控制默认情况下脚本是串行执行的即一个表迁移完成后才开始下一个。对于大量表迁移这会很耗时。我们可以使用后台执行实现并发# 在原执行命令后添加 符号 python /path/to/datax/bin/datax.py job_${dest_table}.json ${dest_table}.log 21 但要注意两点并发数不能太高避免压垮数据库需要修改DataX的channel参数控制单个任务的并发度4.2 增量同步实现对于定期执行的迁移任务我们通常只需要同步新增或修改的数据。可以通过以下方式实现时间戳字段在where条件中使用update_time 上次同步时间自增IDwhereid 上次最大ID版本号如果有版本号字段可以基于版本号过滤# 在脚本中添加 last_sync_time2023-01-01 00:00:00 whereupdate_time $last_sync_time4.3 表结构自动同步DataX只迁移数据不处理表结构。如果目标表不存在迁移会失败。可以在脚本中添加表结构同步逻辑# 使用mysqldump导出结构 mysqldump -h$src_host -P$src_port -u$src_username -p$src_password \ --no-data $src_db $src_table temp.sql # 修改表名后导入目标库 sed -i s/$src_table/$dest_table/g temp.sql mysql -h$dest_host -P$dest_port -u$dest_username -p$dest_password \ $dest_db temp.sql4.4 性能监控与调优对于大数据量迁移性能是关键。可以通过以下方式优化调整DataX参数channel增加并发通道数batchSize增大批量提交的记录数batchByteSize增大批量提交的数据量数据库层面迁移期间关闭索引和外键约束增大数据库连接池优化服务器配置网络层面确保源库和目标库之间的网络带宽充足如果跨机房考虑压缩传输5. 实际案例分享最近我用这套方案完成了一个电商系统的数据迁移涉及87张表总数据量约2TB。下面分享一些实战经验5.1 超大表处理对于单表超过100GB的大表直接全量迁移风险很高。我的做法是按时间范围分批迁移如按月使用where条件限制每次迁移的数据量在脚本中添加检查点机制记录已迁移的范围# 分批迁移示例 for month in {1..12}; do wherecreate_time BETWEEN 2023-$month-01 AND 2023-$month-31 # 替换where条件并执行迁移 done5.2 特殊数据类型处理遇到BLOB、TEXT等大字段时DataX默认配置可能不够用。需要在reader和writer中增加配置reader: { name: mysqlreader, parameter: { ... blobTruncate: false, truncateThreshold: 1048576 } }, writer: { name: mysqlwriter, parameter: { ... blobTruncate: false } }5.3 数据一致性验证迁移完成后我通常会运行验证脚本检查数据一致性# 检查行数是否一致 src_count$(mysql -h$src_host -u$src_username -p$src_password $src_db -N -e SELECT COUNT(*) FROM $src_table) dest_count$(mysql -h$dest_host -u$dest_username -p$dest_password $dest_db -N -e SELECT COUNT(*) FROM $dest_table) if [ $src_count -eq $dest_count ]; then echo 数据量一致: $src_count 条 else echo 数据量不一致: 源表$src_count条, 目标表$dest_count条 fi5.4 遇到的坑与解决方案特殊字符问题密码中包含特殊字符时sed替换会出错。解决方案是使用sed -e s~{{src_password}}~$src_password~g用~代替/作为分隔符。内存不足大数据量迁移时DataX可能OOM。需要调整JVM参数export JAVA_OPTS-Xms4g -Xmx8g python /path/to/datax/bin/datax.py ...网络中断长时间迁移可能遇到网络问题。可以添加重试机制max_retries3 retry_count0 until python /path/to/datax/bin/datax.py ... || [ $retry_count -eq $max_retries ]; do retry_count$((retry_count1)) sleep 60 done这套自动化方案已经在我们团队内部推广累计完成了超过200次数据迁移任务平均节省了80%以上的时间。最关键的是它消除了人为操作带来的错误风险让数据迁移变得可靠且可重复。

更多文章