正在加载,请稍候…

Apache Spark 数据工程:DataFrame、优化与生产最佳实践

掌握 Apache Spark 数据工程:DataFrame API、广播变量、分区策略、写入优化以及 PB 级管道的性能调优。

Apache Spark 数据工程:DataFrame、优化与生产最佳实践

Apache Spark 数据工程:DataFrame、优化与生产最佳实践

Apache Spark 已成为大规模数据处理的事实标准。无论您是在构建 ETL 管道、运行分析工作负载还是训练机器学习模型,掌握 Spark 内部机制和优化技术是将高性能管道与代价高昂的故障区分开的关键。本指南涵盖 DataFrame 优化、广播变量、分区策略以及生产规模的写入调优。

Spark 架构回顾

在深入优化之前,理解 Spark 执行模型至关重要。

Driver 和 Executors:Driver 进程运行您的应用程序代码,创建 SparkContext 并协调 executors。Executors 是工作节点上运行任务和缓存数据的 JVM 进程。

DAG 和 Stages:Spark 构建转换的 Directed Acyclic Graph(DAG)。宽转换(shuffle 操作,如 groupByjoinrepartition)会创建阶段边界。最小化 shuffle 是性能提升的最大杠杆。

惰性求值:转换是惰性的——它们构建逻辑计划。只有动作(countshowwrite)才会触发执行。这使得 Catalyst 优化器能够重新排序、剪枝和融合操作。

Apache Spark 数据工程:DataFrame、优化与生产最佳实践 插图

DataFrame API 基础

优先使用 DataFrames 和 Datasets 而非 RDDs。它们受益于 Catalyst 优化和 Tungsten 内存管理。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

spark = SparkSession.builder \
    .appName("DataEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), True),
    StructField("event_type", StringType(), True),
    StructField("occurred_at", TimestampType(), True),
    StructField("properties", StringType(), True),
])

df = spark.read \
    .schema(schema) \
    .parquet("s3://data-lake/events/year=2026/month=05/")

始终显式定义 schema——schema 推断会读取整个数据集,使 I/O 成本翻倍。

Catalyst 优化器和谓词下推

Catalyst 优化器自动应用逻辑和物理计划优化,但您可以引导它。

谓词下推:尽早过滤以最小化扫描的数据量。

# 好:在 join 之前过滤
events_filtered = df.filter(
    (F.col("event_type") == "purchase") &
    (F.col("occurred_at") >= "2026-01-01")
)

users = spark.read.parquet("s3://data-lake/users/")
result = events_filtered.join(users, "user_id", "left")

列剪枝:在昂贵操作之前只选择需要的列。

events_slim = df.select("event_id", "user_id", "event_type", "occurred_at")

使用 df.explain(True) 检查物理计划并验证下推是否已应用。

广播变量和广播连接

当将大表与小查找表(默认小于 10 MB,可配置)连接时,Spark 可以将小表广播到所有 executors,从而消除 shuffle。

# 设置广播阈值
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))  # 50 MB

# 显式广播提示
from pyspark.sql.functions import broadcast

country_codes = spark.read.parquet("s3://data-lake/dims/country_codes/")
result = large_events.join(broadcast(country_codes), "country_code", "left")

对于非 DataFrame 用例,广播变量在所有 executors 上缓存只读数据:

currency_map = {"USD": 1.0, "EUR": 1.08, "GBP": 1.26}
bc_currency = spark.sparkContext.broadcast(currency_map)

def convert_to_usd(amount, currency):
    rate = bc_currency.value.get(currency, 1.0)
    return float(amount) * rate

Apache Spark 数据工程:DataFrame、优化与生产最佳实践 插图

分区策略

分区是影响最大的性能杠杆。糟糕的分区会导致数据倾斜或过多开销。

# 检查当前分区数
print(df.rdd.getNumPartitions())

# repartition:完全 shuffle,用于增加分区数或平衡倾斜数据
df_balanced = df.repartition(200, "user_id")

# coalesce:无 shuffle,仅减少分区数
df_small = df_balanced.coalesce(50)

批处理工作负载的目标是每个分区 128–256 MB。

倾斜键加盐:当少数键占主导时,对键加盐以将负载分散到多个分区。

from pyspark.sql.functions import explode, array

# 为大表添加随机盐值 0-9
df_salted = df.withColumn("salt", (F.rand() * 10).cast("int")) \
              .withColumn("user_id_salted",
                          F.concat(F.col("user_id").cast("string"),
                                   F.lit("_"),
                                   F.col("salt").cast("string")))

# 用相同的盐值范围展开查找表
salts = array([F.lit(i) for i in range(10)])
lookup_exploded = lookup_df \
    .withColumn("salt", explode(salts)) \
    .withColumn("user_id_salted",
                F.concat(F.col("user_id").cast("string"),
                         F.lit("_"),
                         F.col("salt").cast("string")))

result = df_salted.join(lookup_exploded, "user_id_salted").drop("salt", "user_id_salted")

自适应查询执行(AQE)

Spark 3.0+ 引入了 AQE,它在运行时使用实际分区统计信息重新优化计划。

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

AQE 自动合并 shuffle 后的小分区,在运行时大小允许时将 sort-merge join 转换为 broadcast join,并处理倾斜的 join 分区。

缓存与持久化

缓存在同一作业中多次重用的中间 DataFrames。

from pyspark import StorageLevel

expensive_df.persist(StorageLevel.MEMORY_AND_DISK_SER)
expensive_df.count()  # 立即物化

# ... 多次使用 expensive_df ...

expensive_df.unpersist()  # 完成后释放

存储级别指南:

  • MEMORY_ONLY:最快;OOM 时重新计算(无磁盘溢出)
  • MEMORY_AND_DISK:对于大数据集更安全
  • DISK_ONLY:当内存稀缺且重新计算代价高昂时使用

Apache Spark 数据工程:DataFrame、优化与生产最佳实践 插图

窗口函数用于分析

from pyspark.sql.window import Window

w = Window.partitionBy("user_id").orderBy("occurred_at")

df_analytics = df \
    .withColumn("row_num", F.row_number().over(w)) \
    .withColumn("prev_event", F.lag("event_type", 1).over(w)) \
    .withColumn("running_total",
                F.sum("revenue").over(
                    w.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# 基于纪元秒的 7 天滚动平均
rolling_w = Window.partitionBy("user_id") \
    .orderBy(F.col("occurred_at").cast("long")) \
    .rangeBetween(-7 * 86400, 0)

df_rolling = df.withColumn("rolling_7d_avg", F.avg("revenue").over(rolling_w))

写入优化

分区写入——启用下游读取的分区剪枝:

df.write \
    .mode("overwrite") \
    .partitionBy("year", "month", "day") \
    .parquet("s3://data-lake/output/events/")

控制输出文件大小——避免小文件问题:

# 写入前重新分区
df.repartition(100).write.mode("overwrite").parquet("s3://data-lake/output/")

# 或限制每个文件的记录数
df.write.option("maxRecordsPerFile", 1_000_000).parquet("s3://data-lake/output/")

动态分区覆盖——仅更新受影响的分区:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write.mode("overwrite").partitionBy("date").parquet("s3://data-lake/output/events/")

监控与调试

需要调查的关键 Spark UI 指标:

  • GC 时间 > 任务时间的 10% → 内存压力,调整 executor 内存或减小数据集大小
  • Shuffle 溢出到磁盘 → executor 内存不足以容纳 shuffle 缓冲区
  • 任务持续时间差异 > 3 倍 → 数据倾斜,应用加盐或 AQE 倾斜连接

使用累加器进行自定义错误跟踪:

error_count = spark.sparkContext.accumulator(0)

def safe_parse(row):
    try:
        return parse(row)
    except Exception:
        error_count.add(1)
        return None

cleaned = df.rdd.map(safe_parse).filter(lambda x: x is not None).toDF(schema)
cleaned.count()  # 触发执行
print(f"Parse errors: {error_count.value}")

生产检查清单

在部署 Spark 作业之前:

  1. 显式定义 schema(非推断)
  2. 在 join 之前应用过滤
  3. 广播小维度表;join 顺序考虑大小
  4. 分区数适当(每个分区目标 128–256 MB)
  5. 倾斜键加盐或启用 AQE 倾斜连接
  6. 重用的 DataFrames 缓存并在使用后取消持久化
  7. 输出分区且文件大小受控
  8. 为部分更新启用动态分区覆盖

结论

掌握 Apache Spark 数据工程需要理解执行模型、使用带有显式 schema 的 DataFrames、应用广播连接进行维度查找、设计分区策略以避免倾斜、利用 AQE 进行运行时优化、策略性缓存以及控制文件大小的写入。结合 Spark UI 监控和结构化错误处理,这些实践将脆弱的脚本转变为任何规模下可靠、经济高效的数据管道。