20250212 ‐ logs from python to elastic - cywongg/2025 GitHub Wiki

Below is a more detailed explanation of how heartbeat monitoring typically works, who pings whom in each method, and how you might integrate warnings to mark downtime. We’ll also cover some options for exporting logs from Python to Elasticsearch.


1. Who Pings Whom?

Option A: Elastic Heartbeat (Elastic → App)

In this approach, a separate Beat called “Heartbeat” runs on your server (or elsewhere) and periodically pings the Python app, e.g. with an HTTP GET against a health endpoint or a TCP connection check. Heartbeat then sends the check results to Elasticsearch, either directly or through Logstash. Essentially: Heartbeat → Python app (the check), then Heartbeat → Elasticsearch (the result).

• If the HTTP (or TCP) check fails, Heartbeat records a “down” event.
• If it succeeds, Heartbeat records an “up” event.

Option B: Python Writes Directly to Elasticsearch (App → Elastic)

Here, your Python script (the app) pings Elasticsearch by sending a “still alive” or “heartbeat” document at regular intervals. In other words, Python app → Elasticsearch.
• Python determines how/when it is “healthy” and sends that status up.
• If you stop seeing this “heartbeat doc” in Elasticsearch for some time, you know the app is down or unhealthy.
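
A minimal sketch of this pattern, assuming the official `elasticsearch` Python client and a hypothetical `app-heartbeat` index (the service name and interval are illustrative choices, not fixed conventions):

```python
import time
from datetime import datetime, timezone

def build_heartbeat_doc(service="my-python-app", status="up"):
    """Build the periodic 'still alive' document to index into Elasticsearch."""
    return {
        "@timestamp": datetime.now(timezone.utc).isoformat(),
        "service": service,
        "status": status,
    }

def run_heartbeat(es, interval_seconds=10):
    """Index one heartbeat document per interval, forever.

    `es` is an Elasticsearch client instance, e.g.
    Elasticsearch(["http://localhost:9200"]) from the official client.
    """
    while True:
        es.index(index="app-heartbeat", document=build_heartbeat_doc())
        time.sleep(interval_seconds)
```

On the Kibana side you would then alert when no fresh document with `status: up` has arrived within your chosen window.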

Option C: Logs-Driven Heartbeat with Filebeat (App Generates Logs → Filebeat → Elastic)

Your Python app logs heartbeat or status messages (“I’m OK”, “Warning…”, “Down…”, etc.). Then a Beat (like Filebeat or Logstash) reads these logs and pushes them into Elasticsearch. In other words, Python app → Logs → Filebeat → Elasticsearch.
• You can parse the logs in Logstash or ingest pipelines and interpret “Warning” or “Critical” messages as a sign of downtime.


2. Handling Warnings in the Logs and Marking Uptime as Down

If you want to interpret certain logs (“Warning” or “Exception” or “Error”) as a signal that the service is essentially “down,” you have a few options:

A. Use a Watcher/Alert (In Kibana)

  1. Parse your logs so warnings or errors are stored in Elasticsearch with a field that indicates severity.
  2. Create an Elasticsearch Watcher or a Kibana alerting rule that fires when X warnings occur within a given timeframe.
  3. When the alert triggers, you can mark or store a record in Elasticsearch that sets “uptime_status = down” or sends a notification to your team.
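
If you manage the check outside Kibana, the same condition can be expressed as an Elasticsearch query. A hedged sketch of the query body (the field names `log.severity` and `@timestamp` are assumptions about your mapping; adjust to yours):

```python
def warnings_in_window_query(minutes=5, severity_field="log.severity"):
    """Query DSL counting WARNING/ERROR documents in the last N minutes."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"terms": {severity_field: ["WARNING", "ERROR"]}},
                    {"range": {"@timestamp": {"gte": f"now-{minutes}m"}}},
                ]
            }
        }
    }

# Usage with the official client (assumed):
#   count = es.count(index="python-logs", body=warnings_in_window_query())["count"]
#   if count >= X: treat the service as down.
```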

B. Convert Warnings to “Heartbeat Down” in Real-Time

If your Python app has logic to detect a critical warning, it can push a special log or direct JSON to Elasticsearch with “status = down.” Then, in Kibana, you track that down event as “service is not healthy.”

Concretely:
• Python logs: “WARNING: Something broke: status=down.”
• Filebeat sees that line, sends it to Elasticsearch.
• In your index, it’s stored as a document with a field “status: down.”
• Kibana can visualize or alert on that status.


3. How to Export Python Logs to Elasticsearch

You have multiple ways to ship logs from a Python script to Elasticsearch:

A. Writing Logs to Disk and Using Filebeat

  1. Python logs (with the standard logging library) to a file.
  2. Filebeat monitors that file.
  3. Filebeat outputs logs to Elasticsearch.

Example Python logging config (simplified):

import logging

logging.basicConfig(
    filename='my_app.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def do_something():
    logging.info("App has started...")

if __name__ == "__main__":
    do_something()

Then in filebeat.yml:

filebeat.inputs:
  - type: filestream      # the older "log" input type is deprecated since Filebeat 7.16
    id: my-app-log
    enabled: true
    paths:
      - /path/to/my_app.log

output.elasticsearch:
  hosts: ["localhost:9200"]

B. Send Logs Directly from Python to Elasticsearch via API

You can use the official Elasticsearch Python client to index documents directly:

from datetime import datetime, timezone

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])

def log_message_to_elastic(level, message):
    doc = {
        # datetime.utcnow() is deprecated; use a timezone-aware timestamp
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'level': level,
        'message': message
    }
    es.index(index='python-logs', document=doc)

log_message_to_elastic("INFO", "Application started")
log_message_to_elastic("WARNING", "Something is not right")

This gives you full control. However, you lose some of the conveniences Beats provide, such as handling log rotation, retrying on failure, and buffering events when Elasticsearch is unreachable.
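
If you want this to plug into the standard logging module instead of calling a helper directly, a custom Handler is one way. A sketch (the client is injected so it can be stubbed for the demo; in real use you would pass an actual Elasticsearch client instance):

```python
import logging
from datetime import datetime, timezone

class ElasticsearchHandler(logging.Handler):
    """logging.Handler that indexes each log record as a document.

    `client` is any object with an .index(index=..., document=...) method,
    e.g. the official Elasticsearch Python client.
    """

    def __init__(self, client, index="python-logs"):
        super().__init__()
        self.client = client
        self.index_name = index

    def emit(self, record):
        doc = {
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        }
        try:
            self.client.index(index=self.index_name, document=doc)
        except Exception:
            self.handleError(record)  # never let logging crash the app

# Demo with a stub client that just collects documents:
class _StubClient:
    def __init__(self):
        self.docs = []
    def index(self, index, document):
        self.docs.append((index, document))

stub = _StubClient()
logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.addHandler(ElasticsearchHandler(stub))
logger.warning("Something is not right")
```

The try/except in emit matters in production: a network hiccup to Elasticsearch should degrade logging, not take the app down with it.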

C. Use Logstash

You can configure your Python app to write logs, then run Logstash with an input plugin (like a file input or TCP input) and an output plugin to Elasticsearch.
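
For the file or TCP input route, emitting one JSON object per line makes Logstash parsing trivial (a `json` codec on the input, no Grok needed). A minimal sketch of a JSON-lines formatter using only the standard library:

```python
import json
import logging
from datetime import datetime, timezone

class JsonLineFormatter(logging.Formatter):
    """Format each log record as one JSON object per line."""

    def format(self, record):
        return json.dumps({
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()  # real use: logging.FileHandler('my_app.json.log')
handler.setFormatter(JsonLineFormatter())
logger = logging.getLogger("json-demo")
logger.addHandler(handler)
logger.warning("Disk utilization 90%")
```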


4. Summary of Approaches

  1. Elastic Heartbeat
    • Elastic → App
    • Great for simple up/down checks (like ping or HTTP).
    • Doesn’t automatically parse logs or warnings.

  2. Python → Elasticsearch (Custom Heartbeat)
    • App → Elastic
    • Python decides if it’s healthy or not and sends events.
    • Good for custom logic.

  3. Logs + Filebeat
    • App logs to disk → Filebeat → Elasticsearch
    • Warnings can be parsed in Elasticsearch or Logstash.
    • Use Kibana alerting for “down” if you see warnings.

  4. Add Alerting
    • If a warning is logged, interpret that as “down” by an alert.
    • Or you can store a specific status field set to “down” whenever a warning is triggered.


Practical Example of “Warning → Down” with Filebeat

Imagine your Python script logs lines like:
“2025-02-12 12:00:01, INFO: Everything is OK.”
“2025-02-12 12:05:20, WARNING: Disk utilization 90%.”

1. Create a Pipeline or Logstash Config to Parse Severity

Use a Grok pattern in Logstash or an Ingest Pipeline in Elasticsearch to parse out the datetime, severity, message.
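
If you'd rather prototype the parse in Python before committing to a Grok pattern, an equivalent regex looks like this (the layout matches the example lines above; adjust it to your real log format):

```python
import re

# Matches lines like: "2025-02-12 12:05:20, WARNING: Disk utilization 90%."
LOG_PATTERN = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\s*"
    r"(?P<severity>[A-Z]+):\s*"
    r"(?P<message>.*?)\.?$"
)

def parse_log_line(line):
    """Split a log line into @timestamp / log.severity / log.message fields."""
    m = LOG_PATTERN.match(line)
    if m is None:
        return None  # line did not match the expected layout
    return {
        "@timestamp": m.group("timestamp"),
        "log.severity": m.group("severity"),
        "log.message": m.group("message"),
    }
```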

2. Save Severity in Elasticsearch

Store the log under fields:
@timestamp: 2025-02-12 12:05:20
log.severity: WARNING
log.message: "Disk utilization 90%"

3. Kibana Alert

Set an alert that says: “If I get a WARNING or ERROR in the last X minutes, mark the service as down.” The mechanism is up to you:
• You could write a Watcher that indexes a document with serviceStatus = DOWN.
• Or you could email/Slack your team.

4. Visualize

In a Kibana dashboard, you can show:
• A gauge for “Service is UP vs DOWN.”
• A timeseries of error counts.
• A table of last 5 warnings.


Takeaways

  • “Heartbeat” can be implemented either by letting Elastic ping your app (Elastic Heartbeat) or by having your app push signals to Elasticsearch.
  • Warnings in logs can be treated as a downtime signal if you designate them appropriately and have an alert or downstream process interpret them as “DOWN.”
  • To export Python logs to Elasticsearch, you can write logs to disk and ship them with Filebeat (common practice) or index them directly from Python using the Elasticsearch API (more control).

Which approach is right for you depends on:
• Whether your app can expose an HTTP endpoint.
• How the rest of your logging pipeline is set up.
• Whether you want minimal overhead on your Python code vs. less external tooling.

All these strategies are valid in production, depending on scale, performance, and your operational requirements.

Below is a concise guide to the simplest setup, depending on whether you can expose an HTTP endpoint (e.g., a small web server) or not.


If You Can Expose an HTTP/Health Check Endpoint

  1. Spin up a small Python web server with a “/health” endpoint that returns a 200 status when the app is healthy. For example, using Flask:

    from flask import Flask, jsonify
    
    app = Flask(__name__)
    
    @app.route("/health", methods=["GET"])
    def health():
        return jsonify({"status": "OK"}), 200
    
    if __name__ == "__main__":
        app.run(host="0.0.0.0", port=8080)
    
  2. Install Elastic Heartbeat on the same machine (or wherever you’d like Heartbeat to run):

    • Download Heartbeat: https://www.elastic.co/downloads/beats/heartbeat

    • In the Heartbeat configuration file (heartbeat.yml), add a simple HTTP monitor:

      heartbeat.monitors:
        - type: http
          name: "Python Health Check"
          schedule: '@every 10s'
          urls: ["http://localhost:8080/health"]
          check.request:
            method: GET
          check.response:
            status: 200
      
      output.elasticsearch:
        hosts: ["localhost:9200"]
      
  3. Start Heartbeat and verify it’s sending data to Elasticsearch.

  4. In Kibana, go to the “Uptime” app to see whether the app is up or down.

That’s it. Heartbeat automatically indexes uptime data into Elasticsearch. The biggest advantage is it requires minimal custom code and is quick to set up.


If You Cannot Expose an HTTP/Health Check Endpoint

  1. Write a tiny “heartbeat” log line every few seconds from your Python script:

    import time
    import logging

    logging.basicConfig(
        filename='heartbeat.log',
        level=logging.INFO,
        format='%(asctime)s %(message)s'
    )

    if __name__ == "__main__":
        while True:
            # asctime in the format above already records the timestamp
            logging.info("HEARTBEAT: OK")
            time.sleep(10)
    
  2. Install Filebeat (download: https://www.elastic.co/downloads/beats/filebeat).

  3. Configure Filebeat (filebeat.yml) to watch the file you just created:

    filebeat.inputs:
      - type: filestream      # the older "log" input type is deprecated since Filebeat 7.16
        id: heartbeat-log
        enabled: true
        paths:
          - /path/to/heartbeat.log

    output.elasticsearch:
      hosts: ["localhost:9200"]
    
  4. Start Filebeat. It will ship log lines to Elasticsearch.

  5. In Kibana, you can create a simple visualization or alert that checks if fresh “HEARTBEAT: OK” log lines have arrived recently. If no new lines show up in the last few seconds/minutes, consider the app “down.”
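
The freshness check itself is simple. A sketch of the decision logic, assuming you fetch the newest heartbeat document's timestamp first (e.g. a query sorted on `@timestamp` with `size: 1`):

```python
from datetime import datetime, timedelta, timezone

def is_down(last_heartbeat_iso, max_age_seconds=60, now=None):
    """True if the newest heartbeat is older than the allowed gap."""
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(last_heartbeat_iso)
    return (now - last) > timedelta(seconds=max_age_seconds)
```

Pick `max_age_seconds` as a small multiple of the heartbeat interval (e.g. 3x), so one delayed shipment doesn't trigger a false "down".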


Handling Warnings as Downtime (Quick Method)

• If the log contains “WARNING,” you can consider that an immediate “down” event.
• In Kibana, create an alert that detects the presence of “WARNING” in the last X minutes. If found, mark or alert that your service is down.

Or, you can do something like the following in your Python code:

logging.warning("CRITICAL_WARNING: Something is wrong, status=down")

And then parse that in Elasticsearch (via Filebeat) to transition your status to “down.”


TL;DR

• If the Python app can expose a little HTTP health endpoint, the fastest route is Elastic Heartbeat → Health Endpoint → Elasticsearch. No extra Python code needed except a trivial health check.
• If not, just log a “heartbeat” message in Python to a file, and let Filebeat ship it to Elasticsearch. Then create a Kibana alert if logs stop or contain specific warnings.

These approaches are quick to set up and widely used in production.