Skip to main content

Command Palette

Search for a command to run...

Building a Modular Log Analysis Pipeline in Python and Crew AI

Download the source code provided at the bottom.

Published
3 min read
A

A DevOps practitioner dedicated to sharing practical knowledge. Expect in-depth tutorials and clear explanations of DevOps concepts, from fundamentals to advanced techniques. Join me on this journey of continuous learning and improvement!

Log analysis is critical for monitoring, troubleshooting, and understanding the health of applications yet, wrangling hundreds of log files can quickly become a daunting, error-prone task. As applications scale and systems become distributed, a streamlined, automated approach to log analysis is not just helpful, but essential.

In this blog post, I’ll walk you through the design and implementation of a modular log analysis pipeline in Python. The goal: automatically read logs from a directory, correlate entries by timestamp, and detect errors and exceptions—all in a clean, extensible manner.


Why Build a Log Analysis Pipeline?

  • Manual log review is tedious and error-prone.

  • Distributed systems produce massive, fragmented logs.

  • Quickly surfacing root causes is vital for uptime and user trust.

A good log analysis tool should be:

  • Automated: No manual searching or copying.

  • Extensible: Easy to add new analysis, formats, or integrations.

  • Clear: Output actionable, human-readable insights.


Architecture Overview

The pipeline is composed of three main agents:

  1. LogReaderAgent – Reads all .log files from a directory.

  2. CorrelationAgent – Groups log entries by timestamp for contextual analysis.

  3. FailureDetectionAgent – Detects and summarizes error or exception events.

The main script orchestrates these agents, making the tool easy to run and maintain.

Diagram


Implementation

1. Reading Log Files

The LogReaderAgent scans a directory and reads every .log file:

import glob
import os

class LogReaderAgent:
    def read_logs(self, logs_dir):
        logs = []
        log_files = glob.glob(os.path.join(logs_dir, "*.log"))
        for path in log_files:
            with open(path, encoding="utf-8") as f:
                logs.extend([line.strip() for line in f if line.strip()])
        return logs

2. Correlating Log Entries

The CorrelationAgent groups logs by their timestamp (assuming a standard format at the start of each line):

from collections import defaultdict
import re

class CorrelationAgent:
    def correlate(self, logs):
        events = defaultdict(list)
        for log in logs:
            match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', log)
            if match:
                ts = match.group(1)
                events[ts].append(log)
            else:
                events["NO_TIMESTAMP"].append(log)
        return dict(events)

3. Detecting Failures

The FailureDetectionAgent hunts for lines containing "ERROR" or "Exception":

class FailureDetectionAgent:
    def detect_failures(self, correlated_events):
        failure_report = []
        for ts, logs in correlated_events.items():
            for log in logs:
                if "ERROR" in log or "Exception" in log:
                    failure_report.append(f"[{ts}] {log}")
        if not failure_report:
            return "No failures detected."
        summary = "Failures detected:\n" + "\n".join(failure_report)
        return summary

4. Orchestrating the Pipeline

The main.py script ties everything together:

if __name__ == "__main__":
    logs_dir = "logs"  # Path to your log directory

    reader = LogReaderAgent()
    logs = reader.read_logs(logs_dir)
    if not logs:
        print("No logs found.")
        exit()

    correlator = CorrelationAgent()
    correlated = correlator.correlate(logs)

    detector = FailureDetectionAgent()
    report = detector.detect_failures(correlated)

    print(report)

Running the Tool

  1. Install prerequisites (Python 3.7+, crewai if using agent base classes):

     pip install crewai
    
  2. Place your log files in a directory named logs/.

  3. Run the analysis:

     python main.py
    

Output Example

Failures detected:
[2025-05-20 10:27:00] ERROR: Database connection failed
[2025-05-20 10:27:01] Exception: Timeout occurred in module X
...

Extending the Pipeline

  • Custom error patterns: Tweak or expand the detection logic.

  • Visualization: Pipe output to a dashboard or Slack.

  • Support more formats: Add JSON log parsing or multi-line events.


Conclusion

This log analysis pipeline automates and accelerates one of the most repetitive parts of debugging and monitoring: finding the signal in the noise of log files. The modular design means you can add features and scale the tool as your needs grow.

Feel free to fork, adapt, and contribute!
🔗 [https://github.com/aditya-khadanga/ai-agent-crewai]


Happy debugging! 🐍🛠️

#Python #LogAnalysis #DevOps #Automation #OpenSource #Observability

DevOps Decoded

Part 9 of 12

We'll demystify CI/CD, automation, collaboration, and more. Understand DevOps, step by step.

Up next

Terrafrom vs OpenTofu

OpenTofu: How Open Source is Shaping the Future of Terraform