===============================================================================
README: Locust + pytest — Anti-patterns, Risks, and Practical Alternatives
===============================================================================
OVERVIEW
--------
This document describes common anti-patterns when combining Locust (load
testing) with pytest (functional/unit testing), explains why they are harmful,
and recommends pragmatic alternatives and guardrails for maintainable test
engineering.
ANTI-PATTERNS
-------------
1) Test runner confusion and discovery problems
- Mixing pytest test functions and Locust user tasks breaks the mental model:
pytest's collection/assertion model != Locust's long-lived user model.
- Result: fragile runs, misleading failures, and brittle CI integration.
2) Duplication of intent and misused abstractions
- Copying pytest functional tests into Locust tasks brings heavy assertions
and fixtures into load runs.
- Result: performance signals distorted by test-harness work, not by the SUT (system under test).
3) False confidence from functional-to-load reuse
- Passing pytest coverage as "we load-tested this" is misleading.
- Result: concurrency, pacing, and session state under load remain untested.
4) Resource and timing skew
- Synchronous pytest patterns, heavy fixtures, or per-request asserts cause
GC, DB-connection, or test-harness contention that contaminates metrics.
5) Test lifecycle and side-effect mismanagement
- Reusing mutating pytest fixtures across many virtual users pollutes state
and produces non-reproducible, flaky scenarios.
6) CI and scaling anti-patterns
- Embedding Locust inside pytest in CI hinders distributed orchestration,
complicates scaling, and produces noisy failures.
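To make anti-pattern 2 concrete, here is a contrast sketch in plain Python (the endpoint, the 50-item expectation, and the function names are invented for illustration):

```python
# ANTI-PATTERN (illustrative): pytest-style assertions inside a Locust task.
# A failed assertion aborts the task mid-flight, and the deep correctness
# check burns CPU inside the measured window, skewing timings.
def task_with_asserts(client):
    resp = client.get("/api/items")          # hypothetical endpoint
    assert resp.status_code == 200
    assert len(resp.json()["items"]) == 50   # heavy correctness check

# BETTER: record observations passively; verify correctness in pytest instead.
def task_passive(client, log):
    resp = client.get("/api/items")
    log.append({"status": resp.status_code, "elapsed": resp.elapsed})
```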
PRACTICAL ALTERNATIVES
----------------------
- Keep concerns separated:
* pytest => correctness, fast unit and deterministic functional tests.
* Locust => realistic user behaviour, pacing, concurrency, and throughput.
- Extract reusable, framework-agnostic helpers:
* HTTP client wrappers, serializers, login helpers used by both suites.
* Avoid pytest fixtures inside Locust user classes.
- Make Locust tasks minimal:
* No heavy assertions; log responses for offline analysis.
* Add realistic pacing, jitter, and session reuse.
- Use idempotent setup/cleanup for load tests:
* Per-user ephemeral data or isolated test tenants.
- Dedicated pipelines:
* Run load tests in separate CI jobs/environments with their own metrics,
thresholds, and failure criteria.
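As a sketch of the shared-helper idea (the `AuthClient` name and the "/login" endpoint are illustrative assumptions, not a real library):

```python
# auth_helpers.py -- a framework-agnostic helper usable by both suites.
class AuthClient:
    """Wraps any requests-compatible session (requests.Session in pytest,
    self.client inside a Locust user) so both suites share one login flow."""

    def __init__(self, session, base_url=""):
        self.session = session
        self.base_url = base_url

    def login(self, username, password):
        # POST credentials; field names are assumptions to adjust per target.
        resp = self.session.post(
            self.base_url + "/login",
            data={"username": username, "password": password},
        )
        return resp.status_code == 200
```

Because the wrapper only needs a `.post()` method, it stays ignorant of which framework drives it.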
GUIDELINES & GUARDRAILS
-----------------------
- Do not assert on high-concurrency paths inside Locust. Use passive checks and
separate correctness test runs.
- Limit shared mutable fixtures; prefer read-only fixtures or per-user
provisioning that is explicitly torn down.
- Instrument extensively: collect headers, timings, and server-side trace IDs.
- Use conservative caps and stop conditions when probing production:
* max attempts, burst caps, cool-downs, jitter, and explicit stop-on-lockout.
- Treat rate testing as measurement, not verification. Report observed
thresholds and hysteresis rather than binary pass/fail.
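The "measurement, not verification" bullet can be sketched as a passive post-run summary (the `Observation` record and its fields are assumptions for illustration):

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional

@dataclass
class Observation:
    # One record per request: logged during the run, analyzed afterwards.
    status: int
    elapsed_ms: float
    retry_after: Optional[str] = None

def summarize(observations):
    """Report observed behaviour (status mix, first throttle point)
    instead of a binary pass/fail."""
    statuses = Counter(o.status for o in observations)
    first_throttle = next(
        (i for i, o in enumerate(observations) if o.status == 429), None
    )
    return {"statuses": dict(statuses), "first_throttle_at": first_throttle}
```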
SAMPLE WORKFLOW (recommended)
-----------------------------
1) Implement HTTP client helpers (login, token refresh) in a small library.
2) Write pytest tests that exercise correctness using that library.
3) Implement Locust users that call the same library but avoid assertions.
4) Run Locust in dedicated environment; collect metrics and response logs.
5) Analyze artifacts; map load thresholds to SLOs and remediation steps.
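Steps 1-3 might look like this on the pytest side (the `login` helper and the endpoint are hypothetical stand-ins for the small shared library):

```python
# test_login.py -- correctness is checked here, not inside the Locust run.
# `login` mirrors the hypothetical shared helper from step 1.

def login(session, base_url, username, password):
    return session.post(f"{base_url}/login",
                        data={"username": username, "password": password})

def test_login_posts_credentials():
    calls = []

    class FakeSession:  # stand-in for requests.Session / Locust's self.client
        def post(self, url, data=None):
            calls.append((url, data))
            return type("Resp", (), {"status_code": 200})()

    resp = login(FakeSession(), "https://staging.example.test", "user", "pass")
    assert resp.status_code == 200
    assert calls == [("https://staging.example.test/login",
                      {"username": "user", "password": "pass"})]
```

A Locust user would import the same `login` helper and pass `self.client`, keeping the two suites in lockstep.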
CONCLUSION
----------
Combining pytest and Locust directly is an anti-pattern when it mixes
execution models and responsibilities. Keep testing layers separated,
share non-test harness code, and design load scenarios to be lightweight,
idempotent, and observability-rich.
===============================================================================
"""
locustfile.py
Purpose:
- Politely probe rate limits for an auth endpoint (form or API) with configurable ramps.
- Record timestamps, status codes, and relevant headers (Retry-After, X-RateLimit-*) for analysis.
- Stop/slow on defensive signals (429, persistent 403/lock messages, CAPTCHA patterns).
Safety note:
- Run only with explicit authorization from the site owner or against a staging instance.
- Keep total_attempts cap and cool-downs conservative by default.
"""
import time
import random
from datetime import datetime
from locust import HttpUser, task, between, events
# CONFIG
TARGET_LOGIN_PATH = "/login" # POST endpoint (adjust)
TEST_USERNAME = "test-user" # use dedicated test account
TEST_PASSWORD = "test-pass"
MAX_TOTAL_ATTEMPTS = 500 # hard cap for total attempts in a test run
BURST_SIZES = [5, 10, 20] # sequences of attempts per burst
BURST_INTERVAL_SECONDS = 60 # cool-down between bursts
JITTER_PCT = 0.20 # jitter ±20% between requests
STOP_ON_LOCKOUT = True # stop when persistent lockout detected
OBSERVE_HEADERS = ["Retry-After", "X-RateLimit-Limit", "X-RateLimit-Remaining", "X-RateLimit-Reset"]
# Helper: polite sleep with jitter
def polite_sleep(base_seconds):
    jitter = base_seconds * JITTER_PCT
    sleep_for = base_seconds + random.uniform(-jitter, jitter)
    time.sleep(max(0, sleep_for))

# Simple logger to stdout (can be extended to file)
def record_event(name, metadata):
    now = datetime.utcnow().isoformat() + "Z"
    print(f"{now} | {name} | {metadata}")

# Global counters (keeps test from runaway)
global_attempts = {"count": 0}
class RateLimitProber(HttpUser):
    # keep user idle between bursts; actual pacing controlled in task logic
    wait_time = between(0.5, 1.5)

    # optional: set host via command line or environment; Locust supports --host
    # host = "https://example.com"

    def on_start(self):
        # init per-user state
        self.attempts_done = 0
        self.burst_index = 0
        self.last_defensive_ts = None
        self.lockout_detected = False
        record_event("probe_start", {"username": TEST_USERNAME})

    def safe_post_login(self):
        """
        Perform a single login attempt and record useful response metadata.
        Returns tuple: (status_code, parsed_headers, body_snippet)
        """
        # build payload; adjust form keys as required by target
        payload = {"username": TEST_USERNAME, "password": TEST_PASSWORD}
        # add a lightweight marker to help correlate server-side logs (public id)
        payload["_probe_id"] = f"probe-{int(time.time() * 1000)}"
        # send request
        with self.client.post(TARGET_LOGIN_PATH, data=payload, catch_response=True, name="login_attempt") as resp:
            code = resp.status_code
            headers = {h: resp.headers.get(h) for h in OBSERVE_HEADERS if resp.headers.get(h) is not None}
            # record a short body snippet for human analysis (avoid logging secrets)
            body_snippet = (resp.text or "")[:400].replace("\n", " ")
            metadata = {"status": code, "headers": headers, "snippet_len": len(body_snippet)}
            record_event("attempt_result", metadata)
            # detect obvious defensive signals
            if code == 429:
                resp.failure("429 Too Many Requests")
            elif code in (403, 401) and ("locked" in body_snippet.lower() or "suspended" in body_snippet.lower()):
                resp.failure("Account locked or suspended detected")
            else:
                resp.success()
        return code, headers, body_snippet
    @task
    def run_probe_bursts(self):
        # enforce global cap
        if global_attempts["count"] >= MAX_TOTAL_ATTEMPTS:
            record_event("max_attempts_reached", {"global": global_attempts["count"]})
            self.environment.runner.quit()
            return

        # sequence of bursts: pick next burst size cyclically
        burst_size = BURST_SIZES[self.burst_index % len(BURST_SIZES)]
        self.burst_index += 1
        record_event("burst_start", {"size": burst_size})

        for i in range(burst_size):
            # check global cap again
            if global_attempts["count"] >= MAX_TOTAL_ATTEMPTS:
                break
            global_attempts["count"] += 1
            self.attempts_done += 1
            status, headers, snippet = self.safe_post_login()

            # heuristic defensive handling
            if status == 429:
                # if Retry-After provided, respect it
                retry_after = None
                if "Retry-After" in headers:
                    try:
                        retry_after = int(headers["Retry-After"])
                    except Exception:
                        # header can be HTTP-date or seconds; fall back to estimate
                        retry_after = None
                record_event("throttled", {"retry_after": retry_after, "global_attempts": global_attempts["count"]})
                # honor Retry-After if parseable; else modest backoff
                if retry_after:
                    polite_sleep(retry_after)
                else:
                    polite_sleep(5)  # conservative fallback
                # break burst to cool down
                break

            if status in (401, 403):
                # if the body suggests lockout, escalate and optionally stop
                if "locked" in snippet.lower() or "suspended" in snippet.lower():
                    record_event("lockout_signaled", {"snippet": snippet[:200]})
                    self.lockout_detected = True
                    self.last_defensive_ts = time.time()
                    if STOP_ON_LOCKOUT:
                        record_event("stopping_on_lockout", {"global_attempts": global_attempts["count"]})
                        self.environment.runner.quit()
                        return
                    else:
                        # if not stopping, impose a conservative backoff
                        polite_sleep(60)
                        break

            # polite pacing between attempts in a burst
            polite_sleep(1)

        # cool-down after burst
        record_event("burst_end", {"size": burst_size, "attempts_done_by_user": self.attempts_done})
        polite_sleep(BURST_INTERVAL_SECONDS)
        # optional: if lockout was recently observed, extend cooldown
        if self.lockout_detected:
            polite_sleep(300)
        # end of task; Locust schedules this task repeatedly until stopped
# Optional: attach listeners to capture aggregated events or export CSVs
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
    record_event("test_stop", {"total_attempts": global_attempts["count"]})
import os
from pathlib import Path

from locust import HttpUser, task, between
from locust.exception import StopUser

# Credentials from environment (sourced via .bashrc)
USERNAME = os.getenv("LOGIN_USERNAME")
PASSWORD = os.getenv("LOGIN_PASSWORD")

# Target base URL. Locust parses its own command line, so a positional
# argument (e.g. reading sys.argv) would clash with Locust's parser; pass
# the host via `locust --host https://...` or an environment variable.
BASE_URL = os.getenv("BASE_URL", "")

# Local path to save downloaded file
DOWNLOAD_PATH = Path("downloads/bop.csv")
DOWNLOAD_PATH.parent.mkdir(parents=True, exist_ok=True)

class WebsiteUser(HttpUser):
    host = BASE_URL or None  # falls back to `locust --host ...` when unset
    wait_time = between(1, 2)

    def on_start(self):
        self.login()

    def login(self):
        with self.client.post(
            "/login",
            data={"username": USERNAME, "password": PASSWORD},
            catch_response=True
        ) as response:
            if response.status_code != 200:
                response.failure(f"Login failed: {response.status_code}")
                # no session => no point running tasks; stop this user cleanly
                raise StopUser()
            response.success()

    def check_status(self, path):
        # Shared passive check used by the GET tasks below: mark the request
        # result for Locust's stats without raising.
        with self.client.get(path, catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"GET {path} failed: {response.status_code}")
            else:
                response.success()

    @task
    def get_foo(self):
        self.check_status("/foo")

    @task
    def get_bar(self):
        self.check_status("/foo/bar")

    @task
    def get_baz(self):
        self.check_status("/foo/bar/baz")

    @task
    def download_csv(self):
        with self.client.get("/foo/bar/baz/bop.csv", stream=True, catch_response=True) as response:
            if response.status_code != 200:
                response.failure(f"Download failed: {response.status_code}")
                return
            with open(DOWNLOAD_PATH, "wb") as f:
                f.write(response.content)
            response.success()
        # Trigger logout request after successful download
        with self.client.get("/logout", catch_response=True) as logout_response:
            if logout_response.status_code != 200:
                logout_response.failure(f"Logout failed: {logout_response.status_code}")
                return
            logout_response.success()
'''
lr_jwt.py
'''
import jwt
import datetime

def long_running_jwt():
    # Define a secret key
    SECRET_KEY = "your_secret_key"
    # Create a payload with a long expiration time
    payload = {
        "user": "locust_test_user",
        "iat": datetime.datetime.utcnow(),  # Issued at
        "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=6),  # Expires in 6 hours
    }
    # Generate the token
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    print(f"Generated JWT: {token}")
    return token
from locust import HttpUser, task
from lr_jwt import long_running_jwt

# Generate one long-lived JWT at import time and reuse it across all users
JWT_TOKEN = long_running_jwt()

class LoadTestUser(HttpUser):
    @task
    def test_endpoint(self):
        headers = {"Authorization": f"Bearer {JWT_TOKEN}"}
        self.client.get("/your/api/endpoint", headers=headers)