When I was working on debugging a PyFlink job running inside a Docker container, I encountered a significant challenge: accessing and monitoring logs directly from my host machine. Traditional logging methods proved cumbersome, making it difficult to trace issues effectively. To overcome this hurdle, I decided to implement a custom Python logger that sends log messages to a REST endpoint. This approach allowed me to centralize and view logs seamlessly on my host computer, greatly simplifying the debugging process.
In this post, I’ll share how I set up a Python server using FastAPI to receive logs and how I configured an asynchronous REST handler in Python to send logs from my Dockerized PyFlink job. By following these steps, you’ll be able to implement a similar logging system in your projects, enhancing your ability to monitor and debug applications running in containerized environments.
Setting Up the Python Logging Server with FastAPI
The first step was to create a server capable of receiving log messages sent from my Python application. I chose FastAPI for its simplicity, speed, and robust support for building RESTful APIs.
Installing Dependencies
Before writing any code, I ensured that all necessary dependencies were installed. Using a virtual environment helped keep project dependencies isolated and manageable.
python -m venv log_server_env
source log_server_env/bin/activate # On Windows: log_server_env\Scripts\activate
pip install fastapi uvicorn
Creating the FastAPI Server
With the dependencies in place, I created a Python script named log_server.py to set up the FastAPI server. This server includes two endpoints: one for receiving logs (POST /logs) and another for retrieving all stored logs (GET /logs).
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI()

# Configure CORS if necessary
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Adjust this in production for security
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Define the LogEntry model
class LogEntry(BaseModel):
    level: str
    message: str
    logger: str
    timestamp: str
    serverTimestamp: Optional[str] = None  # Will be set server-side

# In-memory list to store logs
logs: List[LogEntry] = []

@app.post("/logs", response_model=dict)
async def receive_log(log_entry: LogEntry):
    # Reject entries with an empty level or message
    if not log_entry.level or not log_entry.message:
        raise HTTPException(status_code=400, detail="Invalid log entry")
    # Add a server-side timestamp
    log_entry.serverTimestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    # Append to the in-memory list
    logs.append(log_entry)
    # Optionally, print to the console
    print(
        f"Received log: [{log_entry.level}] {log_entry.message} at {log_entry.timestamp}"
    )
    return {"status": "Log received"}

@app.get("/logs", response_model=List[LogEntry])
async def get_logs():
    return logs
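One caveat of the in-memory list above: it grows without bound for as long as the server runs. For long debugging sessions, a capped buffer is a cheap safeguard. Here's a minimal sketch using collections.deque; the maxlen of 10,000 is an assumption, so pick whatever suits your session length:

```python
from collections import deque

# Keep only the most recent 10,000 entries; older ones are dropped automatically
logs = deque(maxlen=10_000)

for i in range(10_005):
    logs.append({"level": "DEBUG", "message": f"entry {i}"})

print(len(logs))           # 10000
print(logs[0]["message"])  # "entry 5" - the five oldest entries were evicted
```

Swapping the list for a deque requires no other changes to the endpoints, since FastAPI serializes any iterable of entries.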
Running the Server
To start the FastAPI server, I used Uvicorn, an ASGI server for Python. Running the server with the --reload flag allows for automatic reloads upon code changes, which is particularly useful during development.
uvicorn log_server:app --host 0.0.0.0 --port 8000 --reload
Upon running, the server listens on http://localhost:8000. The /logs endpoint is now ready to receive log entries from my Python application.
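Before wiring up a custom handler, the POST endpoint can be smoke-tested from the host with a few lines of standard-library Python. This is a sketch: build_request is a hypothetical helper, and the actual send is guarded so the snippet only hits the network when run as a script against a live server:

```python
import json
import urllib.request

ENDPOINT = "http://localhost:8000/logs"

def build_request(entry: dict) -> urllib.request.Request:
    """Build a POST request carrying one log entry as JSON."""
    data = json.dumps(entry).encode("utf-8")
    return urllib.request.Request(
        ENDPOINT, data=data, headers={"Content-Type": "application/json"}, method="POST"
    )

req = build_request({
    "level": "INFO",
    "message": "smoke test",
    "logger": "manual",
    "timestamp": "2023-10-05 12:00:00",
})

if __name__ == "__main__":
    # Requires the FastAPI server from above to be running
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode())
```

A successful run prints a 200 status and the {"status": "Log received"} body from the server.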
Implementing the AsyncRESTHandler
With the server in place, the next step was to create a custom logging handler in Python that sends log messages asynchronously to the FastAPI server. Using an asynchronous handler ensures that logging operations do not block the main application, which is crucial for maintaining performance, especially when dealing with high-frequency logs.
Installing Additional Dependencies
For asynchronous HTTP requests, I opted for the httpx library due to its compatibility with asynchronous workflows.
pip install httpx
Creating the AsyncRESTHandler
Here's the implementation of the AsyncRESTHandler:
import asyncio
import logging

import httpx

class AsyncRESTHandler(logging.Handler):
    def __init__(self, endpoint, level=logging.NOTSET):
        super().__init__(level)
        self.endpoint = endpoint

    async def emit_async(self, record):
        try:
            if self.formatter:
                log_entry = self.format(record)
                timestamp = self.formatter.formatTime(record, self.formatter.datefmt)
            else:
                log_entry = record.getMessage()
                # Handlers have no formatTime; borrow a bare Formatter for the fallback
                timestamp = logging.Formatter().formatTime(record, self.default_datefmt)
            payload = {
                'level': record.levelname,
                'message': log_entry,
                'logger': record.name,
                'timestamp': timestamp
            }
            # A short-lived client per send keeps the handler safe to use
            # from more than one event loop over the process lifetime
            async with httpx.AsyncClient() as client:
                response = await client.post(self.endpoint, json=payload)
                response.raise_for_status()
        except Exception as e:
            print(f"Failed to send log to {self.endpoint}: {e}")

    def emit(self, record):
        try:
            # Inside a running event loop: fire and forget
            asyncio.get_running_loop().create_task(self.emit_async(record))
        except RuntimeError:
            # No running loop (plain synchronous code): send in a fresh one
            asyncio.run(self.emit_async(record))

    @property
    def default_datefmt(self):
        return "%Y-%m-%d %H:%M:%S"
Explanation of the Handler:
- Initialization: The handler is initialized with the REST endpoint URL and inherits the standard logging.Handler machinery.
- emit_async Method: This asynchronous method formats the log record and sends it as a JSON payload to the specified endpoint via httpx. If the handler has a formatter, it uses it to format the log message and timestamp; otherwise, it falls back to a default date format.
- emit Method: The standard emit method is overridden to schedule emit_async as an asynchronous task, so the logging operation does not block the main application.
- Default Date Format: Provides a fallback date format in case no formatter is set.
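To see exactly what the handler will POST, the payload construction can be exercised offline with logging.makeLogRecord, with no server or httpx involved. This sketch fabricates a record by hand and applies a typical formatter:

```python
import logging

formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Fabricate a record like the one logger.info("hello") would produce
record = logging.makeLogRecord({
    "name": "DebugLogger",
    "levelname": "INFO",
    "levelno": logging.INFO,
    "msg": "hello",
})

# The same payload shape emit_async builds
payload = {
    "level": record.levelname,
    "message": formatter.format(record),
    "logger": record.name,
    "timestamp": formatter.formatTime(record, formatter.datefmt),
}

print(payload["message"])  # e.g. 2023-10-05 12:00:00 - DebugLogger - INFO - hello
```

This makes it easy to tweak the format string and confirm the JSON fields line up with the LogEntry model on the server side.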
Configuring the Logger
With both the server and the custom handler ready, the final step was to configure the Python logger to use the AsyncRESTHandler
. This setup ensures that all log messages are sent to the FastAPI server and stored for later retrieval.
Setting Up the Logger
import logging
# Configure the logger
logger = logging.getLogger('DebugLogger')
logger.setLevel(logging.DEBUG)
# Create an instance of AsyncRESTHandler
rest_endpoint = 'http://localhost:8000/logs' # FastAPI server endpoint
rest_handler = AsyncRESTHandler(endpoint=rest_endpoint)
rest_handler.setLevel(logging.DEBUG)
# Define a formatter and set it for the handler
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
rest_handler.setFormatter(formatter)
# Add the handler to the logger
logger.addHandler(rest_handler)
# Example log messages
logger.debug('This is a DEBUG message')
logger.info('This is an INFO message')
logger.warning('This is a WARNING message')
logger.error('This is an ERROR message')
logger.critical('This is a CRITICAL message')
Key Points:
- Logger Configuration: I created a logger named 'DebugLogger' and set its level to DEBUG to capture all levels of log messages.
- Handler Attachment: The AsyncRESTHandler is attached to the logger, so any log message emitted by logger is sent to the FastAPI server.
- Formatter: A formatter is defined to structure the log messages, including the timestamp, logger name, log level, and the actual message.
- Example Logs: To verify the setup, I included example log messages at all five levels (DEBUG, INFO, WARNING, ERROR, CRITICAL).
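Before involving the network at all, the logger wiring itself can be sanity-checked with an in-memory stand-in for the REST handler. ListHandler below is a throwaway test double, not part of the final setup; it collects the same payload fields instead of POSTing them:

```python
import logging

class ListHandler(logging.Handler):
    """Stand-in for AsyncRESTHandler: collects payloads instead of sending them."""
    def __init__(self):
        super().__init__()
        self.store = []

    def emit(self, record):
        self.store.append({
            "level": record.levelname,
            "message": self.format(record) if self.formatter else record.getMessage(),
            "logger": record.name,
        })

logger = logging.getLogger("DebugLogger.test")
logger.setLevel(logging.DEBUG)
handler = ListHandler()
logger.addHandler(handler)

logger.debug("This is a DEBUG message")
logger.error("This is an ERROR message")

print(len(handler.store))         # 2
print(handler.store[0]["level"])  # DEBUG
```

If the two entries show up in the store, the logger, levels, and handler attachment are correct, and any remaining issues are on the HTTP side.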
Testing the Setup
With both the server and the logger configured, testing the setup was straightforward. Running the Python script containing the logger configuration should send log messages to the FastAPI server, which in turn stores them in the in-memory list.
Running the Logger Script
python send_logs.py
Expected Server Output:
As the script runs, the FastAPI server’s console should display the received log messages:
Received log: [DEBUG] 2023-10-05 12:00:00 - DebugLogger - DEBUG - This is a DEBUG message at 2023-10-05 12:00:00
Received log: [INFO] 2023-10-05 12:00:01 - DebugLogger - INFO - This is an INFO message at 2023-10-05 12:00:01
Received log: [WARNING] 2023-10-05 12:00:02 - DebugLogger - WARNING - This is a WARNING message at 2023-10-05 12:00:02
Received log: [ERROR] 2023-10-05 12:00:03 - DebugLogger - ERROR - This is an ERROR message at 2023-10-05 12:00:03
Received log: [CRITICAL] 2023-10-05 12:00:04 - DebugLogger - CRITICAL - This is a CRITICAL message at 2023-10-05 12:00:04
Retrieving Stored Logs
To view all stored logs, I accessed the /logs endpoint using a web browser or curl.
Using a Browser:
Navigating to http://localhost:8000/logs displays a JSON array of all received log entries:
[
{
"level": "DEBUG",
"message": "2023-10-05 12:00:00 - DebugLogger - DEBUG - This is a DEBUG message",
"logger": "DebugLogger",
"timestamp": "2023-10-05 12:00:00",
"serverTimestamp": "2023-10-05T12:00:00.123456Z"
},
{
"level": "INFO",
"message": "2023-10-05 12:00:01 - DebugLogger - INFO - This is an INFO message",
"logger": "DebugLogger",
"timestamp": "2023-10-05 12:00:01",
"serverTimestamp": "2023-10-05T12:00:01.123456Z"
},
// ... more logs
]
Using curl:
Alternatively, running the following command in the terminal retrieves the logs:
curl http://localhost:8000/logs
Response:
[
{
"level": "DEBUG",
"message": "2023-10-05 12:00:00 - DebugLogger - DEBUG - This is a DEBUG message",
"logger": "DebugLogger",
"timestamp": "2023-10-05 12:00:00",
"serverTimestamp": "2023-10-05T12:00:00.123456Z"
},
{
"level": "INFO",
"message": "2023-10-05 12:00:01 - DebugLogger - INFO - This is an INFO message",
"logger": "DebugLogger",
"timestamp": "2023-10-05 12:00:01",
"serverTimestamp": "2023-10-05T12:00:01.123456Z"
},
// ... more logs
]
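Once retrieved, the JSON array is easy to post-process on the host. As a sketch (the LEVELS map and filter_logs helper are hypothetical conveniences, not part of the server), here's how to keep only entries at or above a given severity:

```python
import json

# Numeric severities mirroring the standard logging levels
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}

def filter_logs(raw: str, min_level: str = "WARNING"):
    """Keep only entries at or above min_level from a GET /logs response body."""
    entries = json.loads(raw)
    threshold = LEVELS[min_level]
    return [e for e in entries if LEVELS.get(e["level"], 0) >= threshold]

# A tiny stand-in for the body returned by GET /logs
raw = json.dumps([
    {"level": "DEBUG", "message": "noise", "logger": "DebugLogger"},
    {"level": "ERROR", "message": "boom", "logger": "DebugLogger"},
])

print(filter_logs(raw))  # keeps only the ERROR entry
```

Piping curl output through a small script like this makes it quick to isolate warnings and errors in a noisy debug session.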
This setup provided a clear and organized way to monitor log messages from my Dockerized PyFlink job. By centralizing the logs on the host machine, I could efficiently debug and monitor the application’s behavior without digging into container logs, which can often be scattered and hard to manage.
Conclusion
Debugging applications running inside Docker containers can be challenging, especially when it comes to accessing and monitoring logs effectively. My experience with a PyFlink job highlighted the need for a more streamlined logging solution. By implementing a custom Python logger that sends log messages to a FastAPI server, I established a centralized and efficient logging system. The asynchronous nature of the AsyncRESTHandler ensured that logging operations remained non-blocking, preserving the performance of my application.
This approach not only simplified the debugging process but also provided a scalable way to handle logs from multiple sources. Whether you’re dealing with PyFlink or other Python applications running in containerized environments, setting up a centralized logging system using a REST endpoint can significantly enhance your ability to monitor and troubleshoot effectively.