Examples - RobertoPrevato/rolog GitHub Wiki
Defining a LogTarget
A log target must implement a single method, async def log(self, record: LogRecord), which is responsible for delivering a log record to the desired destination. This example shows the definition of the test target used in the unit tests in the rolog source code:
from rolog import LogTarget


class InMemoryTarget(LogTarget):

    def __init__(self):
        self.records = []

    async def log(self, record):
        self.records.append(record)
Defining a FlushLogTarget
A flush log target is a kind of LogTarget with methods for logging groups of records, with automatic retries and progressive delays in case of failure. For example, this class should be used when log records need to be sent to an external web API (such as Azure Application Insights or DataDog) or to a database of choice. In such cases, it wouldn't make sense to make a separate request for every single log record.
from typing import List
from rolog import FlushLogTarget, LogRecord


class SomeLogApiFlushLogTarget(FlushLogTarget):

    def __init__(self, http_client):
        super().__init__()
        self.http_client = http_client

    async def log_records(self, records: List[LogRecord]):
        # NB: implement here your own logic to make web requests to send log records
        # to a web api, such as Azure Application Insights
        # (see for example https://pypi.org/project/asynapplicationinsights/)
        pass
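As a rough illustration of what the body of log_records might prepare before making a request, the sketch below turns a batch of records into one JSON document. The payload shape and the records_to_payload helper are hypothetical; the only record attributes used (message, args, data) are the ones exercised by the tests on this page, and the records here are plain stand-in objects rather than real LogRecord instances.

```python
import json
from types import SimpleNamespace
from typing import Any, List


def records_to_payload(records: List[Any]) -> str:
    """Serialize a batch of records into a single JSON document.

    Hypothetical helper: the payload layout is an assumption, not a format
    required by rolog or by any specific log API.
    """
    items = [
        {
            "message": record.message,
            "args": list(record.args),
            "data": dict(record.data),
        }
        for record in records
    ]
    return json.dumps({"records": items})


# Stand-in objects exposing the same three attributes (not real LogRecord instances)
batch = [
    SimpleNamespace(message="Hello, World", args=("One",), data={"id": 2016}),
    SimpleNamespace(message="Oh, no!", args=(), data={}),
]
payload = records_to_payload(batch)
```

Sending one payload per batch, rather than one request per record, is the reason to use a flush target in the first place.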
The behavior of a FlushLogTarget can be controlled using the constructor parameters described in the table below.
Parameter | Type | Description |
---|---|---|
max_size | int, default 500 | The number of log records to store in memory before flushing and calling the log_records method. |
max_retries | int, default 3 | The maximum number of retries when a call to log_records fails. |
retry_delay | float, default 0.6 | The number of seconds to wait before a retry when an attempt to send records to the desired destination fails. |
progressive_delay | bool, default True | Whether to increase the retry_delay, multiplying it by the attempt number. |
fallback_target | LogTarget, default None | A fallback target to use when calls to log_records fail more than max_retries times. |
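To make the interplay of these parameters concrete, here is a simplified, self-contained sketch of buffering with retries. It is not rolog's implementation: BufferSketch, retry_delays and the plain callables used as sender and fallback are hypothetical stand-ins, and the exact attempt counting (one initial attempt plus max_retries retries) is an assumption based on the descriptions in the table.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional

Sender = Callable[[List[Any]], Awaitable[None]]


def retry_delays(max_retries: int = 3, retry_delay: float = 0.6,
                 progressive_delay: bool = True) -> List[float]:
    """Seconds to wait before each retry; attempts are numbered from 1."""
    return [
        retry_delay * attempt if progressive_delay else retry_delay
        for attempt in range(1, max_retries + 1)
    ]


class BufferSketch:
    """Simplified stand-in for the buffering idea, NOT rolog's implementation."""

    def __init__(self, send: Sender, max_size: int = 500, max_retries: int = 3,
                 retry_delay: float = 0.6, progressive_delay: bool = True,
                 fallback: Optional[Sender] = None) -> None:
        self.send = send
        self.max_size = max_size
        self.delays = retry_delays(max_retries, retry_delay, progressive_delay)
        self.fallback = fallback
        self.records: List[Any] = []

    async def log(self, record: Any) -> None:
        # Keep records in memory until max_size is reached, then flush
        self.records.append(record)
        if len(self.records) >= self.max_size:
            await self.flush()

    async def flush(self) -> None:
        batch, self.records = self.records, []
        if not batch:
            return
        # One initial attempt, then one retry after each configured delay
        for wait in self.delays + [None]:
            try:
                await self.send(batch)
                return
            except Exception:
                if wait is not None:
                    await asyncio.sleep(wait)
        # Every attempt failed: hand the batch to the fallback, if any
        if self.fallback is not None:
            await self.fallback(batch)


async def demo():
    attempts: List[int] = []
    rescued: List[Any] = []

    async def flaky_send(batch):
        attempts.append(len(batch))
        if len(attempts) < 3:  # fail on the first two attempts only
            raise ConnectionError("simulated outage")

    async def broken_send(batch):
        raise ConnectionError("simulated outage")

    async def fallback(batch):
        rescued.extend(batch)

    ok = BufferSketch(flaky_send, max_size=2, retry_delay=0)
    await ok.log("a")
    await ok.log("b")  # reaching max_size triggers a flush

    failing = BufferSketch(broken_send, max_size=1, retry_delay=0, fallback=fallback)
    await failing.log("c")
    return attempts, rescued


attempts, rescued = asyncio.run(demo())
```

With the defaults from the table, retry_delays() yields waits of roughly 0.6, 1.2 and 1.8 seconds; with progressive_delay=False, every wait is 0.6 seconds.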
Registering multiple targets
This example shows how to register multiple targets in a LoggerFactory, each with a different minimum log level.
import pytest
from rolog import LogLevel, LoggerFactory, LogRecord
from rolog.targets import BuiltInLoggingTarget, DynamicBuiltInLoggingTarget
from rolog.tests import InMemoryTarget


@pytest.mark.asyncio
async def test_logger_factory_minimum_level():
    factory = LoggerFactory()
    test_target_1 = InMemoryTarget()
    test_target_2 = InMemoryTarget()

    factory\
        .add_target(test_target_1, LogLevel.INFORMATION)\
        .add_target(test_target_2, LogLevel.ERROR)

    logger = factory.get_logger(__name__)
    assert logger is not None

    await logger.info('Hello, World')
    assert 'Hello, World' == test_target_1.records[0].message
    assert not test_target_2.records

    await logger.error('Oh, no!')
    assert 'Oh, no!' == test_target_1.records[1].message
    assert 'Oh, no!' == test_target_2.records[0].message
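The assertions above follow from a simple rule: a target receives a record only when the record's level is at least the target's minimum level. The sketch below restates that rule in isolation. Level and receivers are hypothetical stand-ins (the numeric values are assumptions, not rolog's LogLevel values), and targets are represented by their names only.

```python
from enum import IntEnum
from typing import List, Tuple


class Level(IntEnum):
    # Hypothetical stand-in for rolog's LogLevel; only the ordering matters here
    INFORMATION = 1
    ERROR = 2


def receivers(level: Level, targets: List[Tuple[str, Level]]) -> List[str]:
    """Names of the targets whose minimum level admits a record of `level`."""
    return [name for name, minimum in targets if level >= minimum]


targets = [
    ("test_target_1", Level.INFORMATION),
    ("test_target_2", Level.ERROR),
]

info_receivers = receivers(Level.INFORMATION, targets)   # only the first target
error_receivers = receivers(Level.ERROR, targets)        # both targets
```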
Including extra arguments in log records
@pytest.mark.asyncio
async def test_records_support_extra_arguments():
    factory = LoggerFactory()
    test_target = InMemoryTarget()
    factory.add_target(test_target)

    logger = factory.get_logger(__name__)
    assert logger is not None

    await logger.info('Hello, World', 'One', 'Two', 'Three')

    record = test_target.records[0]  # type: LogRecord
    assert ('One', 'Two', 'Three') == record.args
Including extra data in log records
@pytest.mark.asyncio
async def test_records_support_extra_data():
    factory = LoggerFactory()
    test_target = InMemoryTarget()
    factory.add_target(test_target)

    logger = factory.get_logger(__name__)
    assert logger is not None

    await logger.info('Hello, World', id=2016, name='Tyberiusz')

    record = test_target.records[0]  # type: LogRecord
    assert {
        'id': 2016,
        'name': 'Tyberiusz'
    } == record.data