log_processor
Author: Yunis G
Introduction
This is an application that reads log files, processes them, and writes the extracted information to a database.
Description
The application has a modular structure and consists of Python scripts. The components relevant for processing log files are the services/ directory, models.py, serializers.py, and the views/ directory.
Directory Structure
- backend/
  - log_processor/
    - models.py
    - serializers.py
    - views.py
    - services/
      - log_parser.py
      - log_uploader.py
      - utils.py
    - urls.py
    - models.py
    - views/
      - upload.py
      - analytics.py
      - config.py
      - unified_log.py
      - utils.py
      - validation.py
log_processor/
models.py
This script defines the ORM models, which contain all relevant attributes to be extracted from the log file (e.g. timestamp, IP address, username, etc.). This enables structured storage and later analysis of the log data.
The script consists of two parts: definition of a class with the relevant attributes and then a function to instantiate this class.
The definition of the attributes follows this structure:
`<attribute name> = models.<attribute type>()`, where `models` comes from the `django.db` library.
To see all types, please check the official documentation.
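For illustration, a minimal sketch of what such a model could look like (the field names below are hypothetical examples, not necessarily the fields defined in models.py):

```python
from django.db import models


class UserLogin(models.Model):
    # Hypothetical example fields; the real model may define more or different attributes.
    timestamp = models.DateTimeField()              # when the login occurred
    ip_address = models.GenericIPAddressField()     # source IP (IPv4 or IPv6)
    username = models.CharField(max_length=150)     # account name taken from the log line
    result = models.CharField(max_length=20)        # e.g. "success" or "failed"

    def __str__(self):
        return f"{self.timestamp} {self.username} {self.result}"
```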
> [!NOTE]
> If the log files have new information or a new structure, this script must be supplemented or changed.
Log Processing Services
The services/ directory contains the core logic for processing and analyzing log files in our application. The modules are clearly divided into task areas that together enable reading, parsing and storing log data.
Module Overview
- **log_parser.py**

  This module reads log files line by line and filters relevant log entries.
  - Classification of log entries by type (`USER_LOGIN`, `USER_LOGOUT`, `USYS_CONFIG`, `NETFILTER_PKT`, etc.)
  - Extraction of important fields using regular expressions (regex; see the sketch below)
  - Avoidance of duplicates through a prior database query
  - Aggregation of network packets at 30-second intervals
  - Storage of the extracted data in the corresponding database models
  - Use of helper functions from `utils.py` (e.g. IP validation, protocol names)
  - Call of an incident detector to detect security incidents after parsing
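  As a rough illustration, the classification and field extraction could look like the following sketch (the regex patterns and function names are simplified assumptions, not the actual implementation):

  ```python
  import re

  # Hypothetical, simplified patterns; the real parser uses more precise regexes.
  TYPE_PATTERN = re.compile(r"type=(\w+)")
  ADDR_PATTERN = re.compile(r"addr=([0-9a-fA-F.:]+)")
  ACCT_PATTERN = re.compile(r'acct="?([\w\-]+)"?')


  def classify_line(line: str):
      """Return the log type (e.g. USER_LOGIN, NETFILTER_PKT) or None for irrelevant lines."""
      match = TYPE_PATTERN.search(line)
      return match.group(1) if match else None


  def extract_login_fields(line: str) -> dict:
      """Extract the fields a USER_LOGIN database entry would need."""
      addr = ADDR_PATTERN.search(line)
      acct = ACCT_PATTERN.search(line)
      return {
          "ip_address": addr.group(1) if addr else None,
          "username": acct.group(1) if acct else None,
      }
  ```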
- **log_uploader.py**

  Responsible for handling the upload of log files by users:
  - Temporary storage of the uploaded file
  - Calculation of a SHA256 hash for duplicate detection
  - Checking whether a file with the same hash already exists (prevents double processing)

  ```python
  hasher = hashlib.sha256()
  for chunk in uploaded_file.chunks():
      hasher.update(chunk)
      temp_file.write(chunk)
  file_hash = hasher.hexdigest()
  ```

  Purpose of the hash function:
  - Duplicate prevention: before saving, it is checked whether a file with identical content (same hash) already exists
  - Streaming processing: large files are processed in chunks to avoid memory overflow
  - Integrity: the hash serves as a fingerprint for file integrity

  Further tasks of the module:
  - Call of `process_log_file()` from `log_parser.py` for the actual log processing
  - Storage of upload metadata and status (e.g. number of created entries, detected incidents) in the database
  - Safe deletion of the temporary file after processing
  - Error handling for upload and parsing, returning the corresponding status codes
- **utils.py**

  Collection of helper functions to support log processing:
  - `extract_timestamp(line)`: extracts a Unix timestamp from the log and converts it to a timezone-aware Python `datetime` object
  - `extract_match(pattern, line, default)`: performs a regex search on the log line and returns the first match found
  - `is_valid_ip(ip_address)`: checks whether an IP address (IPv4 or IPv6) is valid
  - `get_protocol_name(protocol_number)`: translates protocol numbers (e.g. "6" → "TCP") into understandable names
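  The helpers could plausibly be implemented along these lines (a sketch based on the standard library modules `re`, `ipaddress` and `datetime`; the actual code may differ):

  ```python
  import ipaddress
  import re
  from datetime import datetime, timezone

  # Minimal mapping for illustration; the real helper may cover more protocols.
  PROTOCOL_NAMES = {"1": "ICMP", "6": "TCP", "17": "UDP"}


  def extract_timestamp(line):
      """Pull a Unix timestamp such as 'audit(1718445600.123:42)' out of a log line."""
      match = re.search(r"\((\d+)\.\d+:\d+\)", line)
      if not match:
          return None
      return datetime.fromtimestamp(int(match.group(1)), tz=timezone.utc)


  def extract_match(pattern, line, default=None):
      """Return the first regex group found in the line, or the default."""
      match = re.search(pattern, line)
      return match.group(1) if match else default


  def is_valid_ip(ip_address):
      """True if the string is a valid IPv4 or IPv6 address."""
      try:
          ipaddress.ip_address(ip_address)
          return True
      except ValueError:
          return False


  def get_protocol_name(protocol_number):
      """Translate a protocol number (e.g. "6") into a readable name (e.g. "TCP")."""
      return PROTOCOL_NAMES.get(str(protocol_number), str(protocol_number))
  ```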
Working Method in Detail
Upload and Duplicate Detection
- During upload, the file is read in chunks to keep memory usage low.
- A SHA256 hash of the file is calculated.
- If a file with identical hash has already been processed, processing is aborted and a corresponding status is returned to prevent duplicate uploads.
Log File Processing
- The log file is read line by line and examined for different log types.
- For each recognized log type, the necessary fields are extracted and checked whether an identical database entry already exists.
- Only new entries are saved to avoid duplicates in the database.
- Network packets (`NETFILTER_PKT`) are rounded to 30-second time buckets and aggregated before being saved, which enables more efficient storage (see the sketch below).
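A minimal sketch of how the 30-second bucketing and aggregation could work (the function and field names here are illustrative assumptions):

```python
from collections import defaultdict


def bucket_timestamp(unix_ts, window=30):
    """Round a Unix timestamp down to the start of its 30-second window."""
    return int(unix_ts) - (int(unix_ts) % window)


def aggregate_packets(packet_events, window=30):
    """Group NETFILTER_PKT events by (window start, src, dst, protocol) and sum the packet counts."""
    buckets = defaultdict(int)
    for event in packet_events:
        key = (
            bucket_timestamp(event["timestamp"], window),
            event["src_ip_address"],
            event["dst_ip_address"],
            event["protocol"],
        )
        buckets[key] += event.get("packets", 1)
    return buckets
```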
Security Incidents
- After successful parsing, `process_log_file()` calls the incident detector (`detect_incidents()`), which detects security incidents based on the stored data.
- The number of newly detected incidents is returned and stored in the upload record.
Error Handling
- Invalid or faulty lines in the log file are skipped so that the overall process does not abort.
- Errors during file upload or processing are caught and lead to an error status that is communicated to the user.
- Temporary files are always safely deleted at the end of processing, even in case of errors.
Example Workflow
- A user uploads a log file (`log_uploader.handle_uploaded_log_file()`).
- The file is temporarily stored and a SHA256 hash is calculated.
- The hash is checked against already processed files to avoid duplicates.
- The file is read and evaluated line by line by `log_parser.process_log_file()`.
- The extracted data is stored in the database; only new entries are added.
- Network packets are aggregated and also stored.
- An incident detector is executed that determines security incidents.
- The temporary file is deleted.
- A summary (status, number of entries, detected incidents) is returned.
UML workflow overview:
views.py / views/ – API Endpoint Management
This modular directory defines the API endpoints through which log files are uploaded and processed and through which the analyzed data is retrieved from the database. The endpoints use Django REST Framework (DRF) and access the models of log processing (log_processor) and incident detection (incident_detector).
Directory Structure:
views/
├── upload.py # Log file upload and processing
├── analytics.py # Data analysis endpoints
├── config.py # Configuration management
├── unified_log.py # Unified event logs
├── utils.py # Helper functions
└── validation.py # Keycloak authentication
The views system consists of six main areas:
- Upload logic via LogFileUploadView
- Analytics endpoints for querying processed data
- Configuration management for incident detection
- Unified event system for combined logs
- Utility functions for data processing
- Security validation through Keycloak integration
upload.py – Log File Upload and Processing
LogFileUploadView – Intelligent File Upload with Security Features
This view allows uploading a .log file via a POST request. The file is temporarily stored and then processed by the service handle_uploaded_log_file(). Relevant information is extracted (e.g. IP address, username, timestamp).
uploaded_file = request.FILES.get('file')
source = request.data.get('source', 'unknown')
# Authenticated user from Keycloak
keycloak_user = request.keycloak_user
uploaded_by_user = keycloak_user.get('preferred_username')
Security Validation:
- The file is only accepted if it ends with `.log`
- Keycloak authentication is mandatory for all uploads
- User tracking – every upload is assigned to the authenticated user
- Comprehensive logging of all upload activities
File Integrity through SHA256 Hash:
The hash value created in `log_uploader.py` is compared against existing uploads; if an identical hash already exists, an error message is returned:
if UploadedLogFile.objects.filter(file_hash=file_hash).exists():
return Response({"status": "error", "message": "This file has already been uploaded."},
status=status.HTTP_400_BAD_REQUEST)
Extended Response Structure:
Upon successful processing, a detailed entry is saved in UploadedLogFile and the following information is returned:
{
"id": 123,
"status": "success",
"filename": "audit_2025-06-15.log",
"entries_created": 1547,
"incidents_created_total": 23,
"incident_counts": {
"dos": 5,
"ddos": 3,
"bruteforce": 15,
"config": 0,
"concurrent_login": 0
}
}
Error Handling:
- Parsing errors are logged in detail and reported to the client with a 500 status
- Invalid file types lead to a 400 status
- Duplicates are rejected with a meaningful error message
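For orientation, a hypothetical client-side upload using the `requests` library (the base URL and the way the token is obtained are placeholders that depend on the deployment):

```python
import requests

UPLOAD_URL = "http://localhost:8000/api/logfiles/"   # placeholder base URL
TOKEN = "<keycloak-access-token>"                    # obtained from Keycloak beforehand

with open("audit_2025-06-15.log", "rb") as log_file:
    response = requests.post(
        UPLOAD_URL,
        headers={"Authorization": f"Bearer {TOKEN}"},
        files={"file": log_file},           # form field 'file', as read by the view
        data={"source": "firewall-01"},     # optional 'source' label
    )

print(response.status_code)   # 400 for duplicates or invalid file types
print(response.json())        # on success: summary with entries_created, incident counts, ...
```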
analytics.py – Data Analysis Endpoints
This file contains specialized GET APIs for querying the analyzed database entries. All endpoints support flexible time filtering and use the optimized get_filtered_queryset function from utils.py.
Available Endpoints:
| Endpoint | Purpose | Special Features |
|---|---|---|
| `processed_logins` | User login analysis | Time filtering, success/failure |
| `processed_config_changes` | Configuration changes | Complete change history |
| `ddos_packets` | DDoS attack data | Field optimization for performance |
| `dos_packets` | DoS attack data | Field optimization for performance |
Time-based Filtering:
All analytics endpoints support optional time parameters:
GET /api/logins/?start=2025-01-01&end=2025-03-01
GET /api/config-changes/?start=2025-01-15
GET /api/ddos-incidents/?end=2025-02-28
Performance Optimization through Field Filtering:
For memory-intensive endpoints like ddos_packets and dos_packets, only relevant fields are transmitted:
fields_to_keep = ['timestamp', 'dst_ip_address', 'protocol', 'packets', 'timeDelta', 'sources']
data = get_filtered_queryset(
model=DDosIncident,
serializer_class=DDosIncidentSerializer,
fields_to_keep=fields_to_keep,
start=start,
end=end
)
Automatic Sorting:
- All endpoints deliver data chronologically sorted (newest first)
- Optimized database queries through `order_by('-timestamp')`
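Assuming the endpoints are implemented as DRF function-based views, one of them might look roughly like this sketch (import paths and decorator usage are illustrative, not the exact implementation):

```python
from rest_framework.decorators import api_view
from rest_framework.response import Response

from ..models import UserLogin                    # import paths are assumptions
from ..serializers import UserLoginSerializer
from .utils import get_filtered_queryset


@api_view(['GET'])
def processed_logins(request):
    """Return processed login events, optionally limited to a time range."""
    # In the project, Keycloak authentication additionally protects this endpoint.
    start = request.query_params.get('start')     # e.g. 2025-01-01
    end = request.query_params.get('end')
    data = get_filtered_queryset(
        model=UserLogin,
        serializer_class=UserLoginSerializer,
        start=start,
        end=end,
    )
    return Response(data)
```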
config.py – Configuration Management
IncidentConfigAPIView – Live Configuration of Incident Detection
This specialized view enables real-time configuration of incident detection parameters without service restart.
Extended Validation Logic:
def post(self, request):
dos_config = request.data.get('dos', {})
ddos_config = request.data.get('ddos', {})
dos_time_delta = dos_config.get('time_delta')
ddos_time_delta = ddos_config.get('time_delta')
Intelligent Minimum Time Validation:
The system enforces a 30-second minimum time for time_delta parameters, based on the 30-second packet window:
if int(dos_time_delta) < 30:
return Response({
"status": "error",
"message": "DoS time_delta must be at least 30 seconds due to 30s packet window."
}, status=status.HTTP_400_BAD_REQUEST)
Smart Configuration Management:
- Real-time comparison: New configuration is compared with current one
- Avoid unnecessary updates: Identical configurations are not saved
- Automatic incident recalculation: When changes occur, all incidents are re-evaluated
current_config, _ = get_current_config()
if current_config == new_config:
return Response({"message": "Config unchanged"}, status=status.HTTP_200_OK)
result = update_config(new_config)
last_updated = save_new_config(new_config)
Extended Response Structure:
{
"message": "Configuration updated successfully",
"last_updated": "2025-06-15T14:30:00Z",
"changed": true,
"total_incidents": 157,
"result": {
"recalculated": 45,
"new_incidents": 3
},
"config": {
"dos": {"time_delta": 60, "threshold": 100},
"ddos": {"time_delta": 120, "threshold": 1000}
}
}
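A hypothetical client call updating the detection thresholds could look like this (URL and token are placeholders; the payload keys follow the dos/ddos structure shown above):

```python
import requests

CONFIG_URL = "http://localhost:8000/api/incidents-config/"   # placeholder base URL
TOKEN = "<keycloak-access-token>"

new_config = {
    "dos": {"time_delta": 60, "threshold": 100},
    "ddos": {"time_delta": 120, "threshold": 1000},
}

response = requests.post(
    CONFIG_URL,
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=new_config,
)
print(response.json())   # e.g. "Config unchanged" or the updated configuration with recalculation results
```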
unified_log.py – Unified Event Aggregation System
unified_event_log – Single Endpoint for All Security Events
This endpoint (`/api/logfiles/unified-event-log/`) combines all events (logins, logouts, incidents, config changes) into a single, chronologically sorted list.
Multi-Model Aggregation:
models_and_serializers = [
(UserLogin, UserLoginSerializer),
(UserLogout, UserLogoutSerializer),
(UsysConfig, UsysConfigSerializer),
(NetfilterPackets, NetfilterPacketsSerializer),
(DDosIncident, DDosIncidentSerializer),
(DosIncident, DosIncidentSerializer),
(ConfigIncident, ConfigIncidentSerializer),
(ConcurrentLoginIncident, ConcurrentLoginIncidentSerializer),
(BruteforceIncident, BruteforceIncidentSerializer),
]
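The aggregation over these model/serializer pairs could then look roughly like the following continuation of the snippet above (the exact loop in unified_log.py may differ):

```python
# Continuation of the snippet above: collect serialized entries from every model.
events = []
for model, serializer_class in models_and_serializers:
    serialized = serializer_class(model.objects.all(), many=True).data
    events.extend(serialized)
# Each entry is then enriched with event_type/severity, field-filtered and sorted (see below).
```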
Unified Event Structure:
Each event is enriched with standardized fields:
- `event_type`: classification ("login", "incident", "config", etc.)
- `severity`: risk assessment ("normal", "warning", "critical")
Automatic Severity Classification:
# Implicit in serializer:
# - Successful logins → "normal"
# - Failed logins → "warning"
# - All incidents → "critical"
# - Config changes → "normal"
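One way to realize such a classification is a `SerializerMethodField`; the following is a sketch with hypothetical rules, not the project's exact serializer code:

```python
from rest_framework import serializers

from log_processor.models import User_Login   # import path is an assumption


class UserLoginSerializer(serializers.ModelSerializer):
    event_type = serializers.SerializerMethodField()
    severity = serializers.SerializerMethodField()

    class Meta:
        model = User_Login
        fields = '__all__'

    def get_event_type(self, obj):
        return "login"

    def get_severity(self, obj):
        # Hypothetical rule: failed logins are flagged as warnings, everything else is normal.
        return "warning" if getattr(obj, "result", "") == "failed" else "normal"
```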
Intelligent Field Filtering:
Only relevant fields are transmitted to optimize performance:
fields_to_keep = [
'timestamp', 'event_type', 'reason', 'src_ip_address', 'dst_ip_address',
'action', 'result', 'severity', 'packet_input', 'incident_type',
'protocol', 'count', 'table'
]
Chronological Sorting:
The final list is sorted by timestamp (newest first):
sorted_events = sorted(
filtered_events,
key=lambda x: x.get('timestamp') or '0000-00-00T00:00:00',
reverse=True
)
Advantage: Clients receive a complete overview of all security-relevant events with a single API call, chronologically ordered and with uniform structure.
utils.py – Performance-optimized Helper Functions
get_filtered_queryset – Universal Data Query Engine
This central function standardizes data querying for all analytics endpoints:
def get_filtered_queryset(model, serializer_class, start=None, end=None, fields_to_keep=None):
queryset = model.objects.all()
# Time filtering
if start:
queryset = queryset.filter(timestamp__gte=start)
if end:
queryset = queryset.filter(timestamp__lte=end)
# Automatic sorting
queryset = queryset.order_by('-timestamp')
# Serialization
serializer = serializer_class(queryset, many=True)
data = serializer.data
# Optional field filtering
if fields_to_keep:
return filter_fields(data, fields_to_keep)
return data
filter_fields – Memory-optimized Field Filtering
def filter_fields(data, fields_to_keep):
return [{k: item[k] for k in fields_to_keep if k in item} for item in data]
Advantages:
- Consistent API structure across all endpoints
- Performance optimization through selective field transmission
- Automatic sorting for better UX
- Flexible time filtering without code duplication
validation.py – Enterprise Security through Keycloak
Keycloak JWT Token Validation – Modern Authentication
All API endpoints are protected by Keycloak JWT token validation:
@method_decorator(keycloak_required, name='dispatch')
class LogFileUploadView(APIView):
# Only authenticated users can access
validate_keycloak_token – Robust Token Validation
def validate_keycloak_token(auth_header):
if not auth_header or not auth_header.startswith('Bearer '):
return None
token = auth_header.split(' ')[1]
Extended Security Features:
- Dynamic Public Key Lookup: Automatic retrieval of Keycloak certificates
- RSA256 Signature Validation: Cryptographic token verification
- Token Expiry Check: Automatic expiration control
- Key ID Matching: Use of correct public keys
# Get Keycloak public keys
response = requests.get(f'{KEYCLOAK_URL}/protocol/openid-connect/certs')
keys_data = response.json().get('keys', [])
# Decode token header to get the kid (key ID)
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get('kid')
# Find the matching key and validate
key_data = None
for key in keys_data:
if key.get('kid') == kid:
key_data = key
break
# Convert JWK to PEM and validate token
public_key = RSAAlgorithm.from_jwk(key_data)
decoded = jwt.decode(token, public_key, algorithms=['RS256'], audience='account')
keycloak_required Decorator – Seamless Integration
def keycloak_required(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
auth_header = request.META.get('HTTP_AUTHORIZATION')
user_data = validate_keycloak_token(auth_header)
if not user_data:
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# Attach user data to request
request.keycloak_user = user_data
return view_func(request, *args, **kwargs)
return wrapper
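For function-based views the decorator can be applied directly; a minimal sketch (the endpoint itself is hypothetical):

```python
from django.http import JsonResponse

# keycloak_required as defined above (import path depends on the project layout).


@keycloak_required
def whoami(request):
    # request.keycloak_user was attached by the decorator.
    return JsonResponse({"username": request.keycloak_user.get("preferred_username")})
```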
Security Advantages:
- Stateless Authentication: No server session required
- Automatic Public Key Rotation: Supports Keycloak key updates
- User Context Injection: Authenticated user data automatically available
- Enterprise-ready: Scalable for multi-tenant environments
serializers.py
Data structures for API communication
This script defines the serializers. They control how database objects are translated to JSON and converted back – especially when accessing log data via API endpoints.
The script contains the classes, one for each data model:
| Serializer Class | Purpose |
|---|---|
| `UserLoginSerializer` | For login events |
| `UserLogoutSerializer` | For logout events |
| `UsysConfigSerializer` | For configuration changes |
| `LogFileSerializer` | For log file upload process |
class UserLoginSerializer(serializers.ModelSerializer):
    class Meta:
        model = User_Login
        fields = '__all__'
- Returns all fields of the User_Login model as JSON.
- Used e.g. in the processed_logins view.
For each detected incident type, there is also a separate serializer for structured output of the respective data:
- `BruteforceIncidentSerializer` for brute force attacks
- `DosIncidentSerializer` for DoS attacks
- `DDosIncidentSerializer` for DDoS attacks
- `ConcurrentLoginIncidentSerializer` for concurrent logins
- `ConfigIncidentSerializer` for suspicious configuration changes
These serializers are each based on their associated database models and output all fields (fields = '__all__'). They enable clear API representation and facilitate further processing by frontend or other systems.
Default values are also defined in `incident_detector.serializers.IncidentDetectorConfigSerializer`, which configures incident detection automatically with the following defaults:
- `brute_force`: `attempt_threshold=10`, `time_delta=120`, `repeat_threshold=600`
- `dos`: `packet_threshold=100`, `time_delta=30`, `repeat_threshold=120`
- `ddos`: `packet_threshold=30`, `time_delta=30`, `repeat_threshold=60`, `min_sources=2`
Automatic configuration through sensible default values improves user-friendliness and reduces the effort required for manual settings.
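Such defaults could be declared roughly as follows (a sketch; the real `IncidentDetectorConfigSerializer` in incident_detector may structure its fields differently):

```python
from rest_framework import serializers


class BruteForceConfigSerializer(serializers.Serializer):
    attempt_threshold = serializers.IntegerField(default=10)
    time_delta = serializers.IntegerField(default=120)
    repeat_threshold = serializers.IntegerField(default=600)


class DosConfigSerializer(serializers.Serializer):
    packet_threshold = serializers.IntegerField(default=100)
    time_delta = serializers.IntegerField(default=30)
    repeat_threshold = serializers.IntegerField(default=120)


class DDosConfigSerializer(serializers.Serializer):
    packet_threshold = serializers.IntegerField(default=30)
    time_delta = serializers.IntegerField(default=30)
    repeat_threshold = serializers.IntegerField(default=60)
    min_sources = serializers.IntegerField(default=2)


class IncidentDetectorConfigSerializer(serializers.Serializer):
    brute_force = BruteForceConfigSerializer(required=False)
    dos = DosConfigSerializer(required=False)
    ddos = DDosConfigSerializer(required=False)
```

Validating an empty sub-object, e.g. `DosConfigSerializer(data={}).is_valid()`, then yields exactly these defaults in `validated_data`.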
urls.py – Routing
Project URLs (backend/urls.py)
This file forwards all requests that start with /api/ to the log_processor app:
from django.contrib import admin
from django.urls import path, include
urlpatterns = [
path('api/', include('log_processor.urls')),
]
App URLs (log_processor/urls.py)
Here the specific API endpoints of the log_processor application are defined:
from django.urls import path
from .views import (
    LogFileUploadView, IncidentConfigAPIView, processed_logins,
    processed_config_changes, unified_event_log, dos_packets, ddos_packets,
)
urlpatterns = [
path('logfiles/', LogFileUploadView.as_view(), name='upload-log-file'),
path('incidents-config/', IncidentConfigAPIView.as_view(), name='incidents-config'),
path('logfiles/processed-logins/', processed_logins, name='processed-logins'),
path('logfiles/config-changes/', processed_config_changes, name='processed-config-changes'),
path('logfiles/unified-event-log/', unified_event_log, name='unified-event-log'),
path('logfiles/dos-packets/', dos_packets, name='dos-packets'),
path('logfiles/ddos-packets/', ddos_packets, name='ddos-packets'),
]
| Endpoint | Method | Description |
|---|---|---|
| `/api/logfiles/` | POST | Upload a log file |
| `/api/incidents-config/` | POST | Update the thresholds for incident detection |
| `/api/logfiles/processed-logins/` | GET | List of processed login events |
| `/api/logfiles/config-changes/` | GET | List of detected configuration changes |
| `/api/logfiles/unified-event-log/` | GET | Combined event log (login, logout, ...) |
| `/api/logfiles/dos-packets/` | GET | Overview of detected DoS packets |
| `/api/logfiles/ddos-packets/` | GET | Overview of detected DDoS packets |
Performance Features & Optimizations
System-wide Performance Improvements:
- Lazy Loading: Data is only loaded when needed
- Selective Serialization: Only required fields are transmitted
- Duplicate Detection: the SHA256 hash system prevents reprocessing of identical files
- Streaming Processing: Large files are processed in chunks
- Optimized Queries: Automatic timestamp sorting at database level
Memory Management:
- Chunk-wise file processing prevents memory overflow
- Field filtering reduces network traffic
- Optimized serializers minimize CPU load
Comprehensive Error Handling & Logging
Detailed Logging:
Each endpoint logs important activities:
logger.info(f"Audit log uploaded by {uploaded_by_user}: {uploaded_file.name}")
logger.warning("Upload attempt with invalid file type.")
logger.exception("Error while processing log file.")
Graceful Degradation:
- Meaningful error messages without system exposure
- Structured HTTP status codes for all scenarios
- Automatic cleanup on errors (temporary files)
Risks & Security Considerations
Potential Security Risks:
- DoS through large files: An attacker could exhaust server resources (CPU, disk space) by repeatedly uploading large files.
  - Mitigation: upload size limitation and file type validation implemented
  - Recommendation: add rate limiting per user
- JWT token compromise: Compromised tokens could enable unauthorized access.
  - Mitigation: short token lifespan and automatic rotation
  - Recommendation: additional IP-based validation
- Memory exhaustion: Very large log files could cause memory problems.
  - Mitigation: chunk-based processing implemented
  - Recommendation: define a maximum file size per upload
Security Recommendations:
- Regular security audits of Keycloak configuration
- Monitoring of upload patterns for anomaly detection
- Implementation of request rate limiting
- Automatic cleanup of old temporary files