> 用元数据的丝线编织数据生态——让异构数据源无缝对话,让数据消费像打开 App Store 一样简单。
                               ┌─────────────────────────────────┐
                               │       Data Fabric 编织层        │
                               └────────────────┬────────────────┘
                                                │
        ┌───────────────┬───────────────┬───────┴───────┬───────────────┬───────────────┐
        ▼               ▼               ▼               ▼               ▼               ▼
   主动元数据       知识图谱       数据虚拟化      自动化编排       自助服务        治理嵌入
     (Active       (Knowledge         (Data          (Auto-          (Self-         (Governed
    Metadata)        Graph)      Virtualization) Orchestration)     Service)       by Design)
        │               │               │               │               │               │
        ▼               ▼               ▼               ▼               ▼               ▼
      Atlas           Neo4j          Trino/         Airflow/          Data            RBAC+
  OpenMetadata     NebulaGraph       Presto          Dagster         Portal          Policy
| 维度 | 传统ETL/数据湖 | Data Fabric |
| :--- | :--- | :--- |
| 集成方式 | 手动编写 ETL 管道,逐源接入 | 元数据驱动,一次注册,全局可用 |
| 数据发现 | 翻阅文档/问开发,T+1 级 | 实时目录检索,秒级 |
| 查询方式 | 每个源写专属查询 | 虚拟化统一 SQL,跨源联邦查询 |
| 新增数据源 | 2-4 周(ETL开发+测试) | 1-2 天(注册元数据+策略) |
| 治理粒度 | 表级/库级粗粒度 | 列级/行级细粒度(主动元数据) |
| 数据消费 | 提需求 → 排期 → 取数 | 自助数据市场,即查即用 |
# Deploy the Data Fabric lab environment in one step
docker-compose up -d
# Register a new data source
python scripts/register_datasource.py --type postgres --host localhost --db sales
# Search data assets
python scripts/search_assets.py --keyword "客户交易"
# Query lineage
python scripts/trace_lineage.py --table dwd_sales_order
# Federated query demo
python scripts/federated_query.py
主动元数据不仅记录"数据在哪里",更自动感知变化并推送建议。
# scripts/datasource_registry.yaml
# Data source registry - automatic metadata collection.
# Each list item is one data source; the keys below `type` are specific to
# that source type (host/port, brokers/topic_pattern, endpoint/bucket, ...).
datasources:
  - name: "交易数据库"
    type: mysql
    host: "10.0.1.100"
    port: 3306
    tags: ["P0", "金融", "OLTP"]
  - name: "用户行为日志"
    type: kafka
    brokers: ["10.0.2.10:9092", "10.0.2.11:9092"]
    topic_pattern: "user_behavior_*"
    tags: ["P1", "实时", "用户"]
  - name: "数据湖存储"
    type: s3
    endpoint: "minio:9000"
    bucket: "data-lake"
    format: "iceberg"
    tags: ["P2", "湖仓一体"]
  - name: "Elasticsearch 搜索日志"
    type: elasticsearch
    host: "10.0.3.20"
    port: 9200
    index_pattern: "logs-*"
    tags: ["P3", "日志", "全文检索"]

# Automatic lineage inference
lineage_inference:
  enabled: true
  sql_parser: "sqlglot"        # parse all SQL to derive lineage automatically
  automatic_discovery: true    # discover upstream/downstream dependencies automatically
# scripts/active_metadata.py
"""主动元数据采集引擎"""
from atlasclient.client import Atlas
import sqlglot
import json
class ActiveMetadataCollector:
    """Active metadata collector.

    Dispatches crawling of registered data sources, derives table-level
    lineage from SQL statements and assigns rule-based tags to a table from
    its column names.
    """

    # Data source types accepted by crawl_datasource(); each type is served by
    # a method named ``_crawl_<type>``.
    _SUPPORTED_TYPES = ("mysql", "kafka", "s3", "elasticsearch")

    # Name tokens that mark a column as an identifier column.
    _ID_TOKENS = frozenset(("id", "uuid", "guid"))

    # (keywords, tag) rules. Keywords are matched as substrings of the
    # lower-cased column name, so a rule favours recall over precision.
    # NOTE(review): "金额字段-PII" labels monetary amounts as PII — confirm this
    # is intended; the tag text is kept unchanged for consumers.
    _KEYWORD_TAGS = (
        (("amount", "price", "fee", "money"), "金额字段-PII"),
        (("phone", "mobile", "tel"), "手机号-PII-L3"),
        (("email", "mail"), "邮箱-PII-L3"),
        (("name", "姓名"), "姓名-PII-L3"),
    )

    def __init__(self, atlas_url: str = "http://localhost:21000",
                 username: str = "admin", password: str = "admin"):
        """Create the Atlas client used by the collector.

        Args:
            atlas_url: Base URL of the Atlas server.
            username: Atlas user name.
            password: Atlas password.
        """
        # Credentials go in as keyword arguments: per the pyatlasclient docs
        # the second positional parameter of Atlas() is the port, so passing a
        # (user, password) tuple positionally does not authenticate.
        # NOTE(review): confirm against the installed atlasclient version.
        self.client = Atlas(atlas_url, username=username, password=password)

    def crawl_datasource(self, ds_config: dict) -> dict:
        """Crawl the metadata of one data source.

        Args:
            ds_config: Data source description; ``ds_config["type"]`` selects
                the crawler.

        Returns:
            Whatever the type-specific ``_crawl_<type>`` method returns.

        Raises:
            KeyError: ``ds_config`` has no ``"type"`` entry.
            ValueError: The type is not one of the supported types.
            NotImplementedError: The type is supported but this class (or a
                subclass) does not provide the matching ``_crawl_<type>``
                method.
        """
        db_type = ds_config["type"]
        if db_type not in self._SUPPORTED_TYPES:
            raise ValueError(f"不支持的数据源类型: {db_type}")
        crawler = getattr(self, f"_crawl_{db_type}", None)
        if crawler is None:
            # Fail with an explicit message instead of a bare AttributeError.
            raise NotImplementedError(
                f"数据源类型 {db_type} 的采集器尚未实现: _crawl_{db_type}"
            )
        return crawler(ds_config)

    def infer_lineage_from_sql(self, sql: str, dialect: str = "mysql") -> list:
        """Derive table-level lineage from a single SQL statement.

        Args:
            sql: One SQL statement.
            dialect: sqlglot dialect used for parsing.

        Returns:
            One dict per distinct source table with the keys ``source_table``,
            ``source_db`` (``"default"`` when unqualified), ``target_table``
            and ``transform_type`` (lower-cased key of the top-level
            statement). Empty when the statement has no CREATE/INSERT target
            or reads from no table.
        """
        exp = sqlglot.exp
        tree = sqlglot.parse_one(sql, dialect=dialect)
        if tree is None:
            return []

        # The write target is the table of the first CREATE / INSERT node.
        # ``.this`` may be a Schema wrapping the Table, hence the nested find.
        writer = tree.find(exp.Create, exp.Insert)
        target = None
        if writer is not None and writer.this is not None:
            target = writer.this.find(exp.Table)
        if target is None:
            return []

        # CTE aliases show up as Table nodes but are not physical sources.
        cte_names = {cte.alias for cte in tree.find_all(exp.CTE)}
        target_key = (target.db, target.name)
        transform_type = tree.key.lower()

        lineage = []
        seen = set()
        for tbl in tree.find_all(exp.Table):
            if (tbl.db, tbl.name) == target_key:
                continue
            if not tbl.db and tbl.name in cte_names:
                continue
            source_key = (tbl.db or "default", tbl.name)
            if source_key in seen:
                # e.g. a self-join references the same table twice
                continue
            seen.add(source_key)
            lineage.append({
                "source_table": tbl.name,
                "source_db": source_key[0],
                "target_table": target.name,
                "transform_type": transform_type,
            })
        return lineage

    @staticmethod
    def _name_tokens(name: str) -> set:
        """Split a column name into lower-cased snake_case / camelCase tokens."""
        import re

        # Insert a separator at lower->upper boundaries: orderId -> order_Id.
        snake = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
        return {tok for tok in re.split(r"[^0-9a-z]+", snake.lower()) if tok}

    def generate_smart_tags(self, table_meta: dict) -> list:
        """Generate rule-based tags from the column names of a table.

        Args:
            table_meta: Table metadata; ``table_meta["columns"]`` is a list of
                dicts carrying a ``"name"`` entry. Columns without a usable
                name are skipped.

        Returns:
            De-duplicated tags in first-seen order, so the result is
            reproducible between runs.
        """
        tags = {}  # dict used as an insertion-ordered set
        for col in table_meta.get("columns") or []:
            raw_name = col.get("name") if isinstance(col, dict) else None
            if not raw_name:
                continue
            raw_name = str(raw_name)
            # Token match instead of a substring test, so names such as
            # "paid_at" or "video_url" are not tagged as identifiers.
            # All-lowercase concatenations ("userid") are not recognised.
            if self._ID_TOKENS & self._name_tokens(raw_name):
                tags["标识字段"] = None
            lowered = raw_name.lower()
            for keywords, tag in self._KEYWORD_TAGS:
                if any(kw in lowered for kw in keywords):
                    tags[tag] = None
        return list(tags)
# Demo
collector = ActiveMetadataCollector()

# Simulated crawl target (only echoed below; nothing is crawled here)
ds_config = dict(
    type="mysql",
    host="10.0.1.100",
    database="sales",
    tables=["orders", "customers"],
)
print(
    "=== 主动元数据采集演示 ===",
    f" 数据源: {ds_config['type']} @ {ds_config['host']}",
    f" 数据库: {ds_config['database']}",
    sep="\n",
)

# Lineage inference from SQL
sql = """
CREATE TABLE dwd_sales_order AS
SELECT
o.order_id,
c.customer_name,
o.order_amount,
o.created_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.status = 'paid'
"""
lineage = collector.infer_lineage_from_sql(sql)
source_tables = [edge["source_table"] for edge in lineage]
print("\n 血缘推导: orders + customers → dwd_sales_order")
print(" 源表: " + ", ".join(source_tables))
将元数据构建为知识图谱,让 AI 也能理解数据之间的关系。
// scripts/knowledge_graph.cypher
// Build the data-asset knowledge graph in Neo4j.
// Cypher comments start with `//`; every statement ends with `;` so the
// script can be run as a sequence of independent statements.

// 1. Create the data source nodes and
// 2. the dataset nodes they contain.
// Both steps form ONE statement because the ds1/ds2/ds3 variables are only
// visible inside the statement that binds them.
CREATE (ds1:Datasource {name: '交易库MySQL', type: 'mysql', owner: '数据平台组'})
CREATE (ds2:Datasource {name: '日志集群Kafka', type: 'kafka', owner: '基础架构组'})
CREATE (ds3:Datasource {name: '湖仓MinIO', type: 's3', owner: '数据平台组'})
CREATE
  (ds1)-[:CONTAINS]->(:Dataset {name: 'orders', domain: '交易', freshness: '实时'}),
  (ds1)-[:CONTAINS]->(:Dataset {name: 'customers', domain: '客户', freshness: 'T+1'}),
  (ds1)-[:CONTAINS]->(:Dataset {name: 'products', domain: '商品', freshness: 'T+1'}),
  (ds2)-[:CONTAINS]->(:Dataset {name: 'user_clicks', domain: '用户行为', freshness: '实时'}),
  (ds3)-[:CONTAINS]->(:Dataset {name: 'dwd_sales_order', domain: '交易', freshness: 'T+0'}),
  (ds3)-[:CONTAINS]->(:Dataset {name: 'dws_user_profile', domain: '用户画像', freshness: 'T+1'});

// 3. Lineage edges (where data comes from and where it goes).
// One statement per edge: a MATCH may not directly follow a CREATE
// without an intervening WITH.
MATCH (a:Dataset {name: 'orders'}), (b:Dataset {name: 'dwd_sales_order'})
CREATE (a)-[:FEEDS_INTO]->(b);

MATCH (a:Dataset {name: 'customers'}), (b:Dataset {name: 'dwd_sales_order'})
CREATE (a)-[:FEEDS_INTO]->(b);

MATCH (a:Dataset {name: 'user_clicks'}), (b:Dataset {name: 'dws_user_profile'})
CREATE (a)-[:FEEDS_INTO]->(b);

MATCH (a:Dataset {name: 'dwd_sales_order'}), (b:Dataset {name: 'dws_user_profile'})
CREATE (a)-[:FEEDS_INTO]->(b);

// 4. Example knowledge-graph query
// Q: which upstream tables does dwd_sales_order depend on?
// MATCH (upstream)-[:FEEDS_INTO]->(d:Dataset {name:'dwd_sales_order'})
// RETURN upstream.name, upstream.domain
用一个 SQL 查询多个异构数据源,无需搬迁数据。
# scripts/federated_query.py
"""Federated query demo: cross-source querying with Trino."""

# Example Trino federated query. This script only prints the statement;
# it does not connect to Trino or execute anything.
SQL_FEDERATED_QUERY = """
-- 同时查询 MySQL 交易表 + Kafka 日志 + Iceberg 湖表
WITH
-- 源1: MySQL 实时订单
recent_orders AS (
SELECT customer_id, order_amount, created_at
FROM mysql.sales.orders
WHERE created_at >= DATE('2026-05-01')
),
-- 源2: Iceberg 湖仓中的客户画像
customer_profiles AS (
SELECT customer_id, age_group, city, credit_level
FROM iceberg.data_lake.dws_user_profile
),
-- 源3: Kafka 实时行为
user_behavior AS (
SELECT
JSON_EXTRACT_SCALAR(event, '$.customer_id') AS customer_id,
JSON_EXTRACT_SCALAR(event, '$.action') AS action,
count(*) AS action_count
FROM kafka.user_behavior.user_clicks
GROUP BY 1, 2
)
-- 联邦 JOIN:三源合一,数据不搬迁
SELECT
c.age_group,
c.city,
c.credit_level,
SUM(o.order_amount) AS total_amount,
ub.action,
ub.action_count
FROM recent_orders o
JOIN customer_profiles c ON o.customer_id = c.customer_id
LEFT JOIN (
SELECT customer_id, action, action_count,
ROW_NUMBER() OVER(PARTITION BY customer_id ORDER BY action_count DESC) AS rn
FROM user_behavior
) ub ON o.customer_id = ub.customer_id AND ub.rn = 1
GROUP BY 1, 2, 3, 5, 6
ORDER BY total_amount DESC
LIMIT 100
"""

# Banner lines followed by the query text, emitted one item per line.
_BANNER_LINES = (
    "=== 联邦查询演示 ===",
    "Trino 同时查询 MySQL + Kafka + Iceberg(三个异构源)",
    "数据不搬迁,实时 JOIN,秒级返回\n",
)
print(*_BANNER_LINES, SQL_FEDERATED_QUERY, sep="\n")
# scripts/data_portal.py
"""数据资产地图核心——让业务人员自助发现和使用数据"""
class DataPortal:
    """Data asset map for self-service data discovery.

    Implemented here:
      - keyword search over the asset catalog (``search``)
      - ranking of assets by 7-day usage (``get_popular``)

    The catalog is an in-memory dict keyed by asset id; see ``_load_catalog``.
    """

    def __init__(self):
        self.catalog = self._load_catalog()

    def _load_catalog(self):
        """Return the demo catalog (stands in for loading from Atlas/OpenMetadata)."""
        return {
            "dwd_sales_order": {
                "name": "交易订单明细宽表",
                "domain": "交易域",
                "freshness": "T+0 日",
                "quality_score": 98,
                "usage_count_7d": 156,
                "columns": [
                    {"name": "order_id", "description": "订单ID", "pii": False},
                    {"name": "customer_name", "description": "客户姓名", "pii": True, "level": "L3"},
                    {"name": "order_amount", "description": "订单金额(元)", "pii": False},
                ],
                "tags": ["P0", "交易", "宽表", "自助取数"],
                "owner": "张三(数据平台组)",
            },
            "dws_user_profile": {
                "name": "用户画像汇总表",
                "domain": "用户域",
                "freshness": "T+1 日",
                "quality_score": 95,
                "usage_count_7d": 203,
                "tags": ["P0", "用户画像", "营销"],
                "owner": "李四(推荐算法组)",
            },
            "ads_report_weekly": {
                "name": "每周经营分析报表",
                "domain": "报表域",
                "freshness": "每周一更新",
                "quality_score": 100,
                "usage_count_7d": 42,
                "tags": ["P1", "报表", "管理"],
                "owner": "王五(BI组)",
            },
        }

    def search(self, keyword: str) -> list:
        """Search the catalog by keyword.

        The keyword is stripped and matched case-insensitively as a substring
        of the asset id, display name, domain and tags.

        Args:
            keyword: Search text. A blank keyword matches every asset.

        Returns:
            One dict per matching asset, in catalog order: the catalog entry
            plus an ``"id"`` key holding the asset id.
        """
        needle = keyword.strip().lower()
        matches = []
        for asset_id, meta in self.catalog.items():
            # .get() keeps the search usable for entries lacking optional fields.
            haystack = " ".join(
                [asset_id, meta.get("name", ""), meta.get("domain", ""), *meta.get("tags", [])]
            ).lower()
            if needle in haystack:
                matches.append({"id": asset_id, **meta})
        return matches

    def get_popular(self, top_n: int = 5) -> list:
        """Return the most used assets of the last 7 days.

        Args:
            top_n: Maximum number of assets to return; zero or a negative
                value yields an empty list.

        Returns:
            Catalog entries (plus an ``"id"`` key) ordered by
            ``usage_count_7d`` descending. Entries without a usage count are
            ranked as 0.
        """
        if top_n <= 0:
            # A negative slice bound would otherwise return "all but the last n".
            return []
        ranked = sorted(
            self.catalog.items(),
            key=lambda item: item[1].get("usage_count_7d", 0),
            reverse=True,
        )
        return [{"id": asset_id, **meta} for asset_id, meta in ranked[:top_n]]
# Demo
portal = DataPortal()
print("=== Data Fabric 数据资产地图 ===\n")

# Keyword search
print("搜索「交易」相关数据:")
for asset in portal.search("交易"):
    print(
        f" {asset['id']:30s} | {asset['name']} | "
        f"质量: {asset['quality_score']}% | 周用量: {asset['usage_count_7d']}"
    )

# Popularity ranking
print("\n📊 本周热门数据 TOP 3:")
for rank, asset in enumerate(portal.get_popular(3), start=1):
    print(
        f" #{rank} {asset['id']:30s} | {asset['name']} | "
        f"周访问 {asset['usage_count_7d']}次"
    )
| 组件 | 推荐 | 备选 | 说明 |
| :--- | :--- | :--- | :--- |
| 元数据管理 | Apache Atlas | OpenMetadata, DataHub | Atlas 生态最强(Hive/Kafka血缘) |
| 数据目录 | OpenMetadata | Amundsen, DataHub | UI友好,协作标注 |
| 数据虚拟化 | Trino | Denodo, Starburst | 开源+高性能联邦查询 |
| 知识图谱 | Neo4j | NebulaGraph, JanusGraph | 元数据关系建模 |
| 自动化编排 | Airflow + dbt | Dagster, Prefect | 按元数据自动生成 DAG |
| 数据质量 | Great Expectations | Soda, Deequ | 与元数据联动 |
阶段1(0-1月):元数据基础
✓ 部署 Atlas + OpenMetadata
✓ 接入 3 个核心数据源
✓ 建立数据目录基础
阶段2(1-2月):血缘与图谱
✓ 自动血缘采集上线
✓ 构建数据资产知识图谱
✓ 智能标签自动打标
阶段3(2-4月):虚拟化与自助
✓ Trino 联邦查询上线
✓ 数据资产地图门户
✓ 自助取数工作流
阶段4(4-6月):智能编织
✓ AI 推荐相关数据集
✓ 自动治理建议
✓ Fabric → Mesh 过渡评估
Data Fabric(编织) Data Mesh(网格)
───────────────────── ─────────────────
中心化元数据驱动 去中心化领域自治
统一虚拟化层 各域独立数据产品
全局治理 联邦治理
↓
过渡策略:
1. Fabric 的元数据层 → Mesh 的联邦目录
2. Fabric 的虚拟化层 → Mesh 各域的 Data API
3. Fabric 的治理 → Mesh 的联邦治理委员会
| 级别 | 特征 | 评估 |
| :--- | :--- | :---: |
| L1 初始 | 手动 ETL,无元数据管理 | 原始 |
| L2 可管理 | 有数据目录,但被动元数据 | 基础 |
| L3 主动 | 主动元数据,自动血缘 | ✅ 当前技能 |
| L4 智能 | AI 推荐,主动治理建议 | 目标 |
| L5 自适应 | 自动编排,自优化 | 愿景 |
共 1 个版本