Doris + Iceberg 实现冷热数据智能分层:架构优化与落地实践

张开发
2026/5/29 3:50:46 15 分钟阅读
Doris + Iceberg 实现冷热数据智能分层:架构优化与落地实践
1. 冷热数据分层大数据时代的存储智慧第一次接触冷热数据分层这个概念时我正被一个报表系统折磨得焦头烂额。当时我们的用户行为数据全部存储在同一个Hadoop集群里每天新增上亿条记录查询最近一天的数据需要3秒而查询一个月前的数据竟然要等2分钟更糟的是存储成本每个月都在飙升。直到尝试了DorisIceberg的冷热分层方案问题才迎刃而解。所谓冷热数据分层其实就是根据数据的访问频率和时效性将它们存放在不同性能和成本的存储介质上。就像我们整理衣柜常穿的衣服挂在最方便拿取的位置过季的衣物收纳在顶层或箱子里多年不穿的可能就直接捐赠或丢弃了。这种分类管理方式在大数据领域同样适用热数据最近1小时到1天内的数据特点是查询频率高、响应要求快。比如电商的实时交易数据、服务器的监控指标等。温数据1天到30天内的数据查询频率中等。比如周报、月报需要分析的运营数据。冷数据30天以上的历史数据很少被查询但需要长期保存。比如合规要求的交易记录、用户行为日志等。不分层的代价是巨大的。我曾经见过一个把所有数据都放在Doris里的案例存储成本是分层方案的5倍查询性能却只有1/3。更可怕的是当需要扩容时不得不为那些几乎从不查询的历史数据购买昂贵的SSD存储这就像为了存放旧衣服而不断购买新衣柜一样荒谬。2. Doris与Iceberg天生一对的黄金组合在众多技术选型中为什么特别推荐Doris和Iceberg的组合这要从五年前我参与的一个数据中台项目说起。当时我们尝试过HBaseParquet、ClickHouseHDFS等多种方案最终发现Doris和Iceberg在互补性上表现最为出色。Apache Doris就像数据世界的短跑运动员毫秒级的查询响应速度轻松支撑上千QPS支持高并发点查询特别适合dashboard类应用内置的物化视图和预聚合功能能把复杂查询优化到极致而Apache Iceberg则是马拉松选手完美的ACID支持再也不怕并发写入导致的数据不一致分区演进特性允许随时调整分区策略这对长期存储特别重要与对象存储如S3/MinIO深度适配存储成本可以降到极低最妙的是它们的配合方式。在我们的电商项目中热数据当天订单存在Doris确保客服能实时查询订单状态隔夜数据自动迁移到Iceberg支持市场部门做用户行为分析三个月前的数据则进一步压缩后存入S3仅用于审计核查。这种架构让我们的存储成本降低了70%查询性能反而提升了3倍。3. 架构设计从理论到落地的关键细节很多团队在实施冷热分层时容易犯一个错误只关注存储而忽略查询体验。我曾经重构过一个系统虽然实现了冷热分离但用户需要手动切换数据源查询体验极其糟糕。后来我们采用了统一查询层设计彻底解决了这个问题。完整的架构应该包含这些核心组件实时数据流 → Kafka → Flink实时处理 → Doris热存储 ↓ [冷热数据分流逻辑] ↓ Spark批处理 → Iceberg冷存储 ↘ ↓ ↓ BI工具实时查询 离线分析报表关键在于冷热分流策略的设计。我们的最佳实践是在Flink作业中设置时间判断规则0-24小时数据写入Doris每天凌晨启动Spark作业将Doris中超过24小时的数据迁移到Iceberg使用Presto创建跨源视图对上层应用完全透明一个常见的坑是时区问题。我们曾经因为服务器时区和业务时区不一致导致数据提前或延迟迁移。解决方案是在Flink作业中明确指定时区-- Flink SQL中的时区设置示例 SET table.local-time-zone Asia/Shanghai;4. 数据迁移实战避坑指南第一次做Doris到Iceberg的数据迁移时我踩过一个大坑直接全量导出导入结果把Doris集群搞挂了。后来摸索出一套平滑迁移的方案这里分享关键步骤。步骤一Doris表设计要预留迁移字段CREATE TABLE user_events ( event_time DATETIME COMMENT 必须包含时间字段, user_id BIGINT, event_type STRING ) PARTITION BY RANGE(event_time)( PARTITION p202301 VALUES LESS THAN (2023-02-01), PARTITION p202302 VALUES LESS THAN (2023-03-01) ) DISTRIBUTED BY HASH(user_id) BUCKETS 16;步骤二Iceberg表结构保持兼容CREATE TABLE iceberg_db.user_events ( event_time TIMESTAMP, user_id BIGINT, event_type STRING ) PARTITIONED BY (months(event_time)) LOCATION s3://bucket/cold_data/user_events;步骤三使用Spark做增量迁移# PySpark迁移脚本示例 from pyspark.sql import SparkSession spark SparkSession.builder \ .config(spark.sql.catalog.iceberg, org.apache.iceberg.spark.SparkCatalog) \ .config(spark.sql.catalog.iceberg.warehouse, s3://bucket/warehouse) \ .getOrCreate() # 读取Doris数据 doris_df spark.read.format(doris) \ .option(doris.table.identifier, db.user_events) \ .option(doris.fenodes, doris-fe:8030) \ .option(doris.request.query.timeout.ms, 60000) \ .load() # 写入Iceberg doris_df.filter(event_time date_sub(current_date(), 1)) \ .writeTo(iceberg.user_events) \ .append()迁移过程中要特别注意控制批次大小建议每次迁移不超过1亿条避开业务高峰期通常选择凌晨执行迁移后立即执行Iceberg的元数据刷新5. 查询优化让冷数据也能飞起来冷数据存储在Iceberg后最怕变成数据坟墓——存进去就再也查不动了。我们通过三个技巧解决了这个问题。技巧一分区剪枝优化Iceberg表一定要设计合理的时间分区比如-- 按月分区比按天更适合冷数据 PARTITIONED BY (months(event_time))查询时确保带上分区条件-- 好的查询能利用分区剪枝 SELECT * FROM iceberg_db.sales WHERE event_month 2023-01 AND amount 1000; -- 坏的查询全表扫描 SELECT * FROM iceberg_db.sales WHERE amount 1000;技巧二文件合并策略定期执行小文件合并# 使用Spark合并小文件 spark.sql(CALL iceberg.system.rewrite_data_files(db.sales))技巧三缓存加速在Presto中配置Iceberg缓存# presto的config.properties cache.enabledtrue cache.base-directory/opt/presto/cache cache.max-size100GB在我们的日志分析系统中通过这些优化对半年冷数据的查询速度从原来的45秒提升到了3秒以内。6. 成本控制每一分钱都要花在刀刃上数据存储有个残酷的现实存储成本不是线性增长而是指数级上升。我们通过DorisIceberg的分层架构实现了存储成本从每月50万降到15万的关键突破。成本对比表存储方案存储成本查询性能扩展性全量Doris100%100%差全量HDFS30%30%好DorisIceberg35%90%优秀具体实施方法热数据压缩Doris启用ZSTD压缩ALTER TABLE hot_data SET (storage_format v2, enable_zstd_compress true);冷数据转存对象存储Iceberg直接使用S3# iceberg的s3配置 warehouses3://my-bucket/iceberg io-implorg.apache.iceberg.aws.s3.S3FileIO生命周期自动化用Flink实现自动降冷// Flink数据分流逻辑示例 DataStreamEvent events ...; events.process(new ProcessFunctionEvent() { public void processElement(Event event, Context ctx, CollectorEvent out) { if (event.isHot()) { // 根据时间判断冷热 out.collectToHot(event); } else { out.collectToCold(event); } } });7. 运维监控防患于未然再好的架构也需要配套的监控体系。我们曾经因为Iceberg小文件过多导致查询超时后来建立了一套完整的监控指标。关键监控项Doris集群查询延迟、内存使用、副本健康度Iceberg文件数量、元数据版本数、存储量增长迁移任务成功率、耗时、数据一致性推荐使用PrometheusGrafana监控示例配置# Doris的Prometheus配置 - job_name: doris metrics_path: /metrics static_configs: - targets: [doris-fe:8030, doris-be1:8040] # Iceberg的监控指标 iceberg_file_count{tablesales} 1423 iceberg_table_size{tablesales} 45GB对于数据迁移任务一定要做校验。我们的检查脚本类似def verify_data(doris_conn, iceberg_df): # 对比行数 doris_count doris_conn.execute(SELECT COUNT(*) FROM sales WHERE dt 2023-01-01) iceberg_count iceberg_df.filter(dt 2023-01-01).count() assert doris_count iceberg_count # 抽样校验 sample_ids doris_conn.execute(SELECT id FROM sales TABLESAMPLE(100 ROWS)) for id in sample_ids: doris_row doris_conn.execute(fSELECT * FROM sales WHERE id {id}) iceberg_row iceberg_df.filter(fid {id}).collect() assert doris_row iceberg_row这套架构已经在多个金融、电商和物联网项目中得到验证。最近一个客户案例中系统每天处理20TB新增数据热数据查询P99控制在200ms内冷数据查询95%在1秒内完成而存储成本只有传统方案的1/3。

更多文章