Everything You Ever Wanted to Know about Pandas / PyArrow UDFs in Apache Spark
Vectorized UDFs, Zero-Copy Arrows & 100× Speed-Ups
TL;DR
Vectorized (Pandas) UDFs marry Spark’s scale with Pandas & NumPy’s speed by streaming Arrow column batches across the JVM ↔ Python boundary. They are 10-100× faster than classic Python UDFs, but they still have sharp edges—batch sizing, unsupported types, 2 GB limits, executor memory, SafeSpark jail-sandboxes, etc.
In the world of big data processing, Apache Spark stands as the preeminent framework for distributed computation. One of its most powerful features for Python users is the ability to create User-Defined Functions (UDFs). However, traditional Python UDFs often face significant performance limitations. Enter Pandas PyArrow UDFs: a revolutionary approach that combines the analytical capabilities of pandas with the efficiency of Apache Arrow to deliver exceptional performance in distributed environments.
The Evolution of Python UDFs in Spark
Traditional Python UDFs in Spark suffer from three fundamental limitations that impact performance:
Serialization overhead: Data must be serialized between the JVM and Python processes using pickle, which is computationally expensive.
Row-by-row processing: Functions operate on individual rows rather than batches, resulting in millions of function calls for large datasets.
Lack of vectorization: Operations can't leverage the optimized C/Cython implementations in the pandas and NumPy libraries.
Pandas UDFs were introduced in Spark 2.3 to address these limitations, with significant improvements in Spark 3.0 and beyond. These vectorized UDFs use Apache Arrow to efficiently transfer data and pandas to process it in a vectorized manner, delivering performance increases of up to 100x compared to traditional UDFs.
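To make the contrast concrete, here is a minimal sketch (assuming a live SparkSession named spark) that defines the same +1 transformation twice: once as a classic pickled UDF and once as a vectorized pandas UDF. Only the decorator and type hints differ, but the execution model does not.

import pandas as pd
from pyspark.sql.functions import udf, pandas_udf

# Classic Python UDF: rows are pickled and the function is called once per row.
@udf("long")
def plus_one_py(x):
    return x + 1

# Vectorized pandas UDF: Arrow batches arrive as pandas Series, one call per batch.
@pandas_udf("long")
def plus_one_pd(s: pd.Series) -> pd.Series:
    return s + 1

df = spark.range(1_000_000)
df.select(plus_one_py("id").alias("py"), plus_one_pd("id").alias("pd")).show(3)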
Apache Arrow: The Backbone of High-Performance Data Exchange
Apache Arrow is the critical technology that enables the exceptional performance of Pandas UDFs in Spark. As an open-source columnar in-memory data format, Arrow was specifically designed to facilitate efficient data exchange between different programming environments. For Pandas UDFs, Arrow eliminates the costly serialization/deserialization overhead that plagues traditional Python UDFs when transferring data between JVM and Python processes.
Arrow achieves this efficiency through its columnar memory layout, which stores data contiguously by column rather than by row. This approach provides numerous benefits for analytical workloads: better memory compression, improved CPU cache utilization, and support for SIMD (Single Instruction, Multiple Data) vector operations. Just as importantly, the format is identical on both sides of the JVM ↔ Python boundary, so column batches can be handed over essentially as-is ("zero-copy" in spirit), with no pickling or re-encoding, dramatically reducing the cost of data transfer.
When a Pandas UDF executes, Spark converts data to Arrow format, splits it into batches, transfers these batches to Python workers as Arrow structures, processes them using pandas, and then returns the results via the same efficient Arrow pathway. This entire pipeline is optimized for high-throughput, parallel processing across a distributed cluster. The result is performance gains that can transform previously impractical Python processing into viable production workflows.
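As a standalone illustration of the pandas ↔ Arrow hand-off (not Spark's internal code path, just the PyArrow API the workers rely on), converting between the two is a one-liner in each direction:

import pandas as pd
import pyarrow as pa

pdf = pd.DataFrame({"id": range(5), "value": [0.1, 0.2, 0.3, 0.4, 0.5]})
table = pa.Table.from_pandas(pdf)   # columnar, Arrow-formatted buffers
roundtrip = table.to_pandas()       # back to pandas with minimal copying
print(table.schema)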
The Data Flow Process
The process of executing a Pandas UDF involves several steps that highlight how data flows through the Spark execution environment:
1. Spark converts the data into Arrow format.
2. The data is split into batches (sized by spark.sql.execution.arrow.maxRecordsPerBatch).
3. Arrow batches are transferred to the Python workers.
4. Python workers convert the Arrow batches to pandas Series or DataFrames.
5. The UDF function processes these pandas objects.
6. Results are converted back to Arrow format.
7. Arrow data is transferred back to Spark.
8. Spark converts the Arrow data back to its internal format.
This entire process happens in parallel across the Spark cluster, leveraging the distributed nature of Spark while maintaining the efficiency of vectorized operations.
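To see the batching with your own eyes, here is a small probe (a hypothetical example, assuming a live SparkSession named spark): every row reports how many rows arrived in its Arrow batch, so the distinct values show the batch sizes Spark actually produced.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def rows_in_batch(s: pd.Series) -> pd.Series:
    # Each invocation receives one Arrow batch as a pandas Series.
    return pd.Series([len(s)] * len(s))

spark.range(100_000).select(rows_in_batch("id").alias("batch_rows")).distinct().show()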
Pandas UDFs Defined
You define a pandas UDF by decorating a Python function with @pandas_udf and adding type hints for the input and output:
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Each element of `iterator` is an Arrow batch delivered as a pandas Series.
    return map(lambda s: s + 1, iterator)

display(spark.range(10).select(pandas_plus_one("id")))
The type-hinted signature (here Iterator[pd.Series] → Iterator[pd.Series]) tells Spark which UDF flavor to pick. Under the hood, Spark uses Apache Arrow for zero-copy (de)serialization. Vectorized: your code gets whole batches as pd.Series / pd.DataFrame, not single cells.
Different flavours of Pandas UDF
Series to Series (pandas.Series -> pandas.Series): This pattern exists to provide a clear, Pythonic, type-hinted way to define vectorized UDFs that transform one Spark column into another, row-by-row in concept but applied batch-wise in practice. It directly replaces the need to explicitly specify the older SCALAR Pandas UDF type, making the function's intent (operating on a Series and returning a Series of the same size) self-evident from the type hints.
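A minimal sketch of the Series-to-Series flavour (to_celsius and the sample data are made up for illustration; assumes a live SparkSession):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def to_celsius(fahrenheit: pd.Series) -> pd.Series:
    # One pandas Series in, one pandas Series of the same length out.
    return (fahrenheit - 32) * 5.0 / 9.0

df = spark.createDataFrame([(32.0,), (212.0,)], ["temp_f"])
df.select(to_celsius("temp_f").alias("temp_c")).show()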
Iterator of Series to Iterator of Series (Iterator[pandas.Series] -> Iterator[pandas.Series]): This pattern was introduced to offer more flexibility and optimization for Series-to-Series transformations. It processes data as an iterator of batches rather than loading an entire column partition at once. The why is twofold: 1) it helps manage memory usage for very large data partitions, and 2) it lets expensive state initialization (e.g., loading a model) happen once per iterator (roughly once per task) rather than once per batch, improving performance.
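A sketch of the iterator flavour showing the once-per-iterator initialization benefit; the scale value stands in for whatever an expensive, hypothetical setup step would produce:

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    scale = 0.5  # pretend this came from an expensive model load; it runs once per task, not once per batch
    for batch in batches:
        yield batch * scale

spark.range(8).select(predict("id").alias("score")).show()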
Iterator of Multiple Series to Iterator of Series (Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]): This extends the previous pattern because many operations need logic based on multiple input columns simultaneously. The signature lets a UDF take batches of multiple input Series, perform calculations using them together, and return a single output Series per batch, with the same memory and initialization benefits for multi-column logic.
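A sketch of the multi-column iterator flavour (weighted_value and the sample data are invented for illustration):

from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def weighted_value(batches: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each iteration yields one tuple of aligned Series, one per input column.
    for value, weight in batches:
        yield value * weight

df = spark.createDataFrame([(10.0, 0.5), (20.0, 0.25)], ["value", "weight"])
df.select(weighted_value("value", "weight").alias("weighted")).show()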
Series to Scalar (pandas.Series -> Any): This pattern, often used with groupBy().agg() or window functions, provides a type-hinted way to define aggregations. It takes a pandas Series representing a group or partition and returns a single scalar value. The why is to replace the older GROUPED_AGG Pandas UDF type with a more standard Python type-hint signature, making the aggregation intent clear.
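A sketch of the Series-to-Scalar flavour used as a grouped aggregation (mean_udf and the sample data are illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    # Receives all values of one group and returns a single scalar.
    return float(v.mean())

df = spark.createDataFrame([("a", 1.0), ("a", 3.0), ("b", 5.0)], ["key", "value"])
df.groupBy("key").agg(mean_udf("value").alias("mean_value")).show()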
applyInPandas (on GroupedData): This function exists specifically to implement the "split-apply-combine" pattern on grouped data. Why? It allows applying a custom Python function, operating on a full pandas DataFrame for each group, to perform complex, group-specific transformations or aggregations that are difficult or inefficient with standard Spark functions. It expects one pandas DataFrame (representing a group) as input and requires a pandas DataFrame as output, effectively transforming each group. Note: it loads the entire group into memory, which can be demanding for large groups.
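A sketch of applyInPandas: each group arrives as a full pandas DataFrame, and the returned DataFrame must match the declared schema (subtract_group_mean and the sample data are illustrative):

import pandas as pd

def subtract_group_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf contains every row of one group, fully materialized in memory.
    return pdf.assign(value=pdf["value"] - pdf["value"].mean())

df = spark.createDataFrame([("a", 1.0), ("a", 3.0), ("b", 5.0)], ["key", "value"])
df.groupBy("key").applyInPandas(subtract_group_mean, schema="key string, value double").show()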
mapInPandas (on DataFrame): This function exists to apply a Python function to an iterator of pandas DataFrames, where each DataFrame represents a batch of data from a partition. Why? It's designed for transformations on entire partitions or batches where the logic is complex, best expressed in Python/pandas, and, crucially, where the number of output rows per input batch might differ from the input batch size (e.g., filtering rows, or mapping one input row to multiple output rows such as unpacking files). Its iterator-based approach aids memory efficiency for large datasets.
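A sketch of mapInPandas, where the output row count is allowed to differ from the input (keep_even_ids is illustrative):

from typing import Iterator
import pandas as pd

def keep_even_ids(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in batches:
        # Filtering changes the number of rows, which mapInPandas allows.
        yield pdf[pdf["id"] % 2 == 0]

spark.range(10).mapInPandas(keep_even_ids, schema="id long").show()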
Arrow Batch Size Considerations
The batch size for Arrow transfers is controlled by spark.sql.execution.arrow.maxRecordsPerBatch (default: 10,000 records). This setting can be tuned for performance:
Smaller batches: Reduce memory pressure but increase overhead
Larger batches: Better performance but higher memory requirements
For scalar operations, one partition consists of multiple Arrow batches. For grouping operations, one partition (group) is one Arrow batch, which can lead to out-of-memory issues if a group is too large.
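For example, reading and adjusting the setting on a live SparkSession (the 50,000 figure is just an illustration, not a recommendation):

print(spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch"))   # "10000" by default
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "50000")  # bigger batches, more memory per batch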
⚠️ Gotchas / Limitations
Memory pressure: Each Arrow batch (or entire group in grouped UDFs) is copied into the Python worker’s RAM; large groups or oversized batches can OOM an executor. For grouped operations, all data for a group must fit in memory.
Arrow caps: Max rows per record batch = 2,147,483,647. Some complex or deeply nested types (ArrayType(TimestampType), deeply nested Map/Struct) need recent PyArrow versions, or are unsupported.
Setup overhead on tiny data: Python-worker spin-up and Arrow serialisation can outweigh the benefits for small inputs; native Spark or even a pickled UDF may be faster. That said, if your data is small, honestly, don't stress about it; just use Arrow.
🚀 Best Practices & Performance Tips
Prefer native Spark first → Pandas UDF second → pickled Python UDF last.
Vectorise: Use pandas/NumPy column ops, not Python loops.
Tune batch size: Start with the default 10k rows; reduce to ease memory pressure or raise to improve throughput, set via spark.sql.execution.arrow.maxRecordsPerBatch.
Provision RAM: Remember processing is in Python space; size executors for worst-case batch or group.
Resource cleanup: In iterator UDFs, wrap model loads and file handles in try ... finally (see the sketch after this list).
Type hints everywhere: Clearer code, earlier failures, faster Arrow conversion.
Timestamps: Keep data in UTC; rely on pandas time-series APIs for conversions.
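As a sketch of the resource-cleanup tip (FakeModel is a hypothetical stand-in for a real model or file handle), the try ... finally wraps the whole iterator so the resource is released even if a batch fails mid-stream:

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

class FakeModel:
    """Hypothetical stand-in for an expensive resource (a real model, a file handle, ...)."""
    scale = 0.5
    def close(self):
        pass

@pandas_udf("double")
def score(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = FakeModel()            # opened once per task, before any batch is processed
    try:
        for batch in batches:
            yield batch * model.scale
    finally:
        model.close()              # always released, even if a batch raises

spark.range(6).select(score("id").alias("score")).show()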
Performance Hierarchy:
1. Native Spark functions: Generally the fastest, since they operate entirely within the optimized Spark engine.
2. Pandas UDFs (vectorized): Significant speedups over traditional Python UDFs thanks to Arrow transport and vectorized processing; can be 10-100x faster.
3. Arrow-optimized Python UDFs: Faster than pickled UDFs due to efficient Arrow transport, but still process row-by-row logically.
4. Traditional (pickled) Python UDFs: Slowest, due to serialization overhead and row-by-row execution.
If your batch job processes fewer than 1 billion rows, stressing over whether to use a Pandas UDF or a traditional UDF is honestly overkill. Too many people debate UDF types without timing anything. Benchmark it properly: first, run your pipeline without the UDF to get a baseline. Then add the UDF and compare. Don’t speculate—measure.
Pro tip: use this command to time your pipeline (it forces full execution without writing any output):
df.write.format("noop").mode("overwrite").save()
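A minimal sketch of that benchmarking loop, reusing the pandas_plus_one UDF defined earlier and the noop sink (row counts and helper names are illustrative):

import time

def time_noop_write(df):
    # The noop sink forces full execution of the plan without paying for an output format.
    start = time.perf_counter()
    df.write.format("noop").mode("overwrite").save()
    return time.perf_counter() - start

baseline = time_noop_write(spark.range(10_000_000).selectExpr("id + 1 AS plus_one"))
with_udf = time_noop_write(spark.range(10_000_000).select(pandas_plus_one("id").alias("plus_one")))
print(f"baseline: {baseline:.2f}s, with UDF: {with_udf:.2f}s")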
Conclusion: The Future of Python Data Processing in Spark & Arrow
Pandas PyArrow UDFs represent a breakthrough in solving the long-standing performance challenges of Python processing in Apache Spark. By bridging the gap between Spark's distributed computing capabilities and the rich ecosystem of Python data science tools, these UDFs enable data scientists and engineers to write high-performance Python code that can scale to massive datasets.
With the continued evolution of Arrow-optimized UDFs in Spark 3.5 and beyond, the performance advantages will only increase. The integration between pandas and Spark continues to deepen, making it easier for Python users to leverage their existing skills in a distributed environment while maintaining high performance.
As the ecosystem evolves, we can expect further innovations in this space, potentially eliminating the remaining barriers between Python's ease of use and Spark's distributed computing power, ultimately making complex data processing both simple and efficient.
TL;DR – Stop pickling rows. Start vectorizing columns. And may your Arrow batches be ever‑contiguous.