Log Record Counts in PySpark (With a Timer)

When I’m working with large datasets in PySpark, I often need to know how many records are flowing through my transformations. It’s a simple thing, but being able to log that information at the right time can help me catch issues early—like unexpected filters wiping out rows or joins ballooning in size. What makes it even more useful is timing the count operation, since .count() is an action that can be expensive on large datasets.

Here’s how I handle logging record counts in PySpark, with a timer included, using plain Python and Spark’s DataFrame API.

1. Setting Up Logging

I like to keep things simple, so I use Python’s built-in logging module. This lets me control log levels and output formatting without needing anything extra. I start by configuring logging at the beginning of my script or notebook.

Python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

With this in place, I can use logger.info() to write messages, including how many records a DataFrame contains and how long it took to compute.
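
If I want timestamps in the output, which helps when lining my messages up against Spark's own logs, I swap the basicConfig call above for one with a format string. This is just an optional variation on the same setup:

Python
import logging

# Optional: include a timestamp, level, and logger name in every message
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)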

2. Creating or Receiving a DataFrame

Sometimes I load a DataFrame from a file, sometimes it comes from another transformation. Here, I’m just creating one for demonstration:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RecordCountLoggerWithTimer").getOrCreate()

df = spark.createDataFrame([
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
], ["id", "name"])

3. Counting and Timing

The key part is using time.time() to track the duration of the .count() call. Since this operation triggers a full pass over the data, I want to know how long it takes. This helps me detect performance issues early on.

Python
import time

start_time = time.time()
record_count = df.count()
elapsed_time = time.time() - start_time

logger.info(f"Number of records: {record_count}")
logger.info(f"Time taken to count records: {elapsed_time:.2f} seconds")

The logs now tell me both how many records the DataFrame contains and how long it took Spark to compute that count. This is especially useful when working with large pipelines or debugging joins, filters, and aggregations.
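
When I need this at several points in a pipeline, I wrap the pattern in a small helper. The log_count function below is just my own convenience wrapper around the snippet above, not anything built into Spark:

Python
import time

def log_count(df, label):
    """Count a DataFrame, log the count and elapsed time, and return the count."""
    start_time = time.time()
    record_count = df.count()
    elapsed_time = time.time() - start_time
    logger.info(f"{label}: {record_count} records ({elapsed_time:.2f}s)")
    return record_count

# Example: sanity-check the row count after a filter
filtered_df = df.filter(df.id > 1)
log_count(filtered_df, "after id > 1 filter")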

4. Avoiding Redundant Actions

One thing I always keep in mind: .count() triggers a job. If I’ve already triggered an action elsewhere (like .write() or .collect()), I try to piggyback off that result rather than call .count() again. For example, if I’m writing out a file, I might wrap that write in a timed block instead and log how many records I wrote, based on what I already know.
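
As a rough sketch of what that looks like, assuming I still have record_count from the earlier count and using a placeholder output path:

Python
import time

# Reuse the count I already have instead of triggering another job,
# and time the write itself. The output path is just a placeholder.
start_time = time.time()
df.write.mode("overwrite").parquet("/path/to/output.parquet")
elapsed_time = time.time() - start_time

logger.info(f"Wrote {record_count} records in {elapsed_time:.2f} seconds")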

Wrapping Up

This simple approach—adding logging and a timer around .count()—has helped me stay in control of my PySpark jobs. I don’t overuse it, but when something feels off in the data, logging record counts is one of my go-to sanity checks. It gives me confidence that each step in the pipeline is doing what I expect, and it doesn’t require any extra tooling.
