Building a Modular Log Analysis Pipeline in Python and Crew AI
The full source code is linked at the bottom of this post.
Log analysis is critical for monitoring, troubleshooting, and understanding the health of applications, yet wrangling hundreds of log files can quickly become a daunting, error-prone task. As applications scale and systems become distributed, a streamlined, automated approach to log analysis is not just helpful but essential.
In this blog post, I’ll walk you through the design and implementation of a modular log analysis pipeline in Python. The goal: automatically read logs from a directory, correlate entries by timestamp, and detect errors and exceptions—all in a clean, extensible manner.
Why Build a Log Analysis Pipeline?
Manual log review is tedious and error-prone.
Distributed systems produce massive, fragmented logs.
Quickly surfacing root causes is vital for uptime and user trust.
A good log analysis tool should be:
Automated: No manual searching or copying.
Extensible: Easy to add new analysis, formats, or integrations.
Clear: Output actionable, human-readable insights.
Architecture Overview
The pipeline is composed of three main agents:
LogReaderAgent – Reads all .log files from a directory.
CorrelationAgent – Groups log entries by timestamp for contextual analysis.
FailureDetectionAgent – Detects and summarizes error or exception events.
The main script orchestrates these agents, making the tool easy to run and maintain.
Implementation
1. Reading Log Files
The LogReaderAgent scans a directory and reads every .log file:
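import glob
import os

class LogReaderAgent:
    def read_logs(self, logs_dir):
        """Return every non-empty line from the .log files in logs_dir."""
        logs = []
        # sorted() keeps the file order deterministic; glob order is arbitrary
        log_files = sorted(glob.glob(os.path.join(logs_dir, "*.log")))
        for path in log_files:
            with open(path, encoding="utf-8") as f:
                logs.extend([line.strip() for line in f if line.strip()])
        return logs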
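One limitation of the reader above is that lines lose their origin once they are merged into a single list, and a file with invalid UTF-8 bytes would raise an error. Below is a minimal sketch of a variant that keeps the file name next to each line and replaces undecodable bytes instead of failing. The function name read_logs_with_source is hypothetical and not part of the original project.

```python
import glob
import os


def read_logs_with_source(logs_dir):
    """Return (file_name, line) pairs for every non-empty line in *.log files.

    Hypothetical helper for illustration; not part of the original project.
    """
    entries = []
    # sorted() makes the file order deterministic across platforms
    for path in sorted(glob.glob(os.path.join(logs_dir, "*.log"))):
        name = os.path.basename(path)
        # errors="replace" substitutes U+FFFD for bytes that are not valid UTF-8
        with open(path, encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if line:
                    entries.append((name, line))
    return entries
```

Keeping the file name makes it possible to say which service or host produced a failing line, which matters once logs from several components land in the same directory.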
2. Correlating Log Entries
The CorrelationAgent groups logs by their timestamp (assuming a standard format at the start of each line):
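from collections import defaultdict
import re

class CorrelationAgent:
    def correlate(self, logs):
        """Group log lines by the timestamp at the start of each line."""
        events = defaultdict(list)
        for log in logs:
            # Expects a leading "YYYY-MM-DD HH:MM:SS" timestamp
            match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', log)
            if match:
                ts = match.group(1)
                events[ts].append(log)
            else:
                # Lines without a timestamp, e.g. stack trace continuations
                events["NO_TIMESTAMP"].append(log)
        return dict(events)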
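Grouping by the exact second can split related events that happen a moment apart. As a possible extension, here is a minimal sketch that groups lines into fixed-size time buckets instead. The function name correlate_by_window and the window_seconds parameter are assumptions made for illustration; the timestamp format is the same one the CorrelationAgent expects.

```python
from collections import defaultdict
from datetime import datetime
import re

TS_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def correlate_by_window(logs, window_seconds=60):
    """Group log lines into time buckets keyed by the bucket start time.

    Hypothetical helper. Buckets are counted from midnight of each day, so
    window_seconds should divide 86400 evenly for uniform bucket sizes.
    """
    events = defaultdict(list)
    for log in logs:
        match = TS_PATTERN.match(log)
        if not match:
            events["NO_TIMESTAMP"].append(log)
            continue
        try:
            ts = datetime.strptime(match.group(1), TS_FORMAT)
        except ValueError:
            # Matches the digit pattern but is not a real date, e.g. month 13
            events["NO_TIMESTAMP"].append(log)
            continue
        # Floor the time of day to the start of its bucket
        seconds = ts.hour * 3600 + ts.minute * 60 + ts.second
        floored = seconds - seconds % window_seconds
        bucket = ts.replace(hour=floored // 3600,
                            minute=floored % 3600 // 60,
                            second=floored % 60)
        events[bucket.strftime(TS_FORMAT)].append(log)
    return dict(events)
```

The return value has the same shape as the output of correlate, a dict mapping a timestamp string to a list of lines, so it could be passed to the failure detection step unchanged.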
3. Detecting Failures
The FailureDetectionAgent hunts for lines containing "ERROR" or "Exception":
class FailureDetectionAgent:
    def detect_failures(self, correlated_events):
        """Return a text summary of lines containing "ERROR" or "Exception"."""
        failure_report = []
        for ts, logs in correlated_events.items():
            for log in logs:
                # Case-sensitive substring match
                if "ERROR" in log or "Exception" in log:
                    failure_report.append(f"[{ts}] {log}")
        if not failure_report:
            return "No failures detected."
        summary = "Failures detected:\n" + "\n".join(failure_report)
        return summary
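The substring check is simple, but it also flags lines such as a metric named ERRORS_TOTAL and misses levels like FATAL. A minimal sketch of a pattern-based alternative follows. The function name find_failures and the default patterns are assumptions to adapt to your own log conventions, not the project's actual detection logic.

```python
import re

# Assumed default patterns; adjust to the conventions of your own logs
DEFAULT_PATTERNS = (
    re.compile(r"\bERROR\b"),
    re.compile(r"\bFATAL\b"),
    re.compile(r"\w*Exception\b"),
)


def find_failures(correlated_events, patterns=DEFAULT_PATTERNS):
    """Return (timestamp, line) pairs whose line matches any pattern.

    Hypothetical helper that takes the same dict the CorrelationAgent returns.
    """
    failures = []
    for ts, logs in correlated_events.items():
        for log in logs:
            # search() looks for the pattern anywhere in the line
            if any(p.search(log) for p in patterns):
                failures.append((ts, log))
    return failures
```

Returning structured pairs rather than a formatted string leaves the presentation to the caller, which helps if the results are later sent somewhere other than the console.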
4. Orchestrating the Pipeline
The main.py script ties everything together:
import sys

# Assumes LogReaderAgent, CorrelationAgent and FailureDetectionAgent are
# defined above in this file or imported from your own modules.

if __name__ == "__main__":
    logs_dir = "logs"  # Path to your log directory
    reader = LogReaderAgent()
    logs = reader.read_logs(logs_dir)
    if not logs:
        print("No logs found.")
        sys.exit()
    correlator = CorrelationAgent()
    correlated = correlator.correlate(logs)
    detector = FailureDetectionAgent()
    report = detector.detect_failures(correlated)
    print(report)
Running the Tool
Install prerequisites (Python 3.7+, crewai if using agent base classes):
pip install crewai
Place your log files in a directory named logs/.
Run the analysis:
python main.py
Output Example
Failures detected:
[2025-05-20 10:27:00] ERROR: Database connection failed
[2025-05-20 10:27:01] Exception: Timeout occurred in module X
...
Extending the Pipeline
Custom error patterns: Tweak or expand the detection logic.
Visualization: Pipe output to a dashboard or Slack.
Support more formats: Add JSON log parsing or multi-line events.
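As one way to approach the JSON format idea above, the sketch below converts a JSON log line into the plain-text form the rest of the pipeline already understands. The function name json_line_to_text and the field names timestamp, level and message are assumptions; real JSON logs often use different keys.

```python
import json


def json_line_to_text(line):
    """Convert one JSON log line to 'timestamp LEVEL: message' text.

    Hypothetical helper. The field names timestamp, level and message are
    assumptions; lines that are not JSON objects are returned unchanged.
    """
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return line
    if not isinstance(record, dict):
        return line
    ts = record.get("timestamp", "")
    # Upper-case the level so the existing "ERROR" check still matches
    level = str(record.get("level", "")).upper()
    message = record.get("message", "")
    return f"{ts} {level}: {message}".strip()
```

Applying this to each line right after reading would let the correlation and detection steps stay as they are, provided the timestamp field already uses the YYYY-MM-DD HH:MM:SS format.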
Conclusion
This log analysis pipeline automates and accelerates one of the most repetitive parts of debugging and monitoring: finding the signal in the noise of log files. The modular design means you can add features and scale the tool as your needs grow.
Feel free to fork, adapt, and contribute!
🔗 [https://github.com/aditya-khadanga/ai-agent-crewai]
Happy debugging! 🐍🛠️
#Python #LogAnalysis #DevOps #Automation #OpenSource #Observability





