Response Handling - CrowdStrike/falconpy GitHub Wiki
The responses received from the CrowdStrike Falcon API will be in either binary or JSON format, with the bulk of these responses being JSON. All requests to the API resulting in an error will produce a JSON formatted response.
The default FalconPy behavior returns API results in the format they are received.
Starting in v1.3.0, developers can request that API responses be returned as Python objects by passing the `pythonic` keyword when they construct an instance of the class. For more detail regarding pythonic responses and how to use them, please review the Python Responses documentation.
WARNING
`client_id` and `client_secret` are keyword arguments that contain your CrowdStrike API credentials. Please note that the examples below do not hard code these values. (These values are ingested as strings.)
CrowdStrike does NOT recommend hard coding API credentials or customer identifiers within source code.
Binary responses are files that have been requested by the operation performed, and can have varying file formats. These return payloads are intended to be saved locally as part of your handling of the result.
from falconpy import SampleUploads
falcon = SampleUploads(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# File unique identifier
file_sha = "50d858e0985ecc7f60418aaf0cc5ab587f42c2570a884095a9e8ccacd0f6545c"
# Name of the file we are saving our result to. The extension should
# be aligned to match the requested file format. (This particular
# API operation example returns the result in ZIP format.)
save_file = "api_response.zip"
# We use the 'password_protected' keyword to inform the API we want
# the result bundled in a zip archive with a password applied.
response = falcon.get_sample(password_protected=True, ids=file_sha)
# We can then write the response we've received from the API to a file.
with open(save_file, 'wb') as save_to:
    save_to.write(response)
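Because a failed request produces a JSON response rather than a file, it can be worth checking what came back before writing it to disk. The following is a minimal sketch, not part of FalconPy: `save_binary_response` is a hypothetical helper, and it assumes a successful download arrives as `bytes` while a failure arrives as a dictionary shaped like the JSON structure described on this page.

```python
from pathlib import Path


def save_binary_response(response, destination):
    """Write a binary API result to disk, or return the errors found instead.

    Hypothetical helper (not part of FalconPy). Assumes a successful download
    arrives as bytes and a failure arrives as a JSON-style dictionary.
    """
    if isinstance(response, (bytes, bytearray)):
        # Binary payload: save it exactly as received.
        Path(destination).write_bytes(response)
        return []
    # Anything else is treated as a JSON error response; nothing is written.
    return response.get("body", {}).get("errors") or []
```

An empty list returned from the helper means the file was written; a non-empty list holds the error dictionaries to report.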
Beginning in v1.1.0, FalconPy supports result object expansion. Result object expansion provides support for scenarios where a binary response is expected as the result of the operation, but HTTP status code or headers retrieval is also necessary. Result expansion is supported for all operations, but provides little advantage for operations that do not produce binary responses.
Expanded results can be requested using the `expand_result` keyword when calling the operation. Results are returned as a tuple, (`status_code`, `headers`, `content`).
# ... continued from the example above
# Pass a boolean True to the `expand_result` keyword to request expanded results.
download_result = falcon.get_sample(ids=file_sha, expand_result=True)
# We're returned a tuple (status_code, headers, content)
# Status will be at index 0
print(f"Status returned: {download_result[0]}")
# Headers will be at index 1
print(f"Headers returned: {download_result[1]}")
# File content will be at index 2
with open(save_file, "wb") as download_file:
    download_file.write(download_result[2])
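The tuple can also be unpacked into named parts, which tends to read more clearly than numeric indexes. This is a small sketch using a stand-in tuple rather than a live API call; `unpack_expanded_result` is a hypothetical helper, and treating 200 as the success status is an assumption made for illustration.

```python
def unpack_expanded_result(expanded):
    """Split an expanded result tuple into named parts.

    Hypothetical helper. Assumes the (status_code, headers, content) order
    described above.
    """
    status_code, headers, content = expanded
    return {"status_code": status_code, "headers": dict(headers), "content": content}


# Stand-in tuple used purely for illustration; no API call is made here.
example = (200, {"Content-Type": "application/zip"}, b"example-bytes")
parts = unpack_expanded_result(example)
if parts["status_code"] == 200:
    print(f"Received {len(parts['content'])} bytes of {parts['headers']['Content-Type']}")
```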
The bulk of the responses from the CrowdStrike Falcon API are in JSON format and have a standardized structure across all API service collections.
{
    "status_code": integer,
    "headers": {
        "Content-Encoding": "string",
        "Content-Length": "integer string",
        "Content-Type": "application/json",
        "Date": "GMT timestamp",
        "X-Cs-Region": "string",
        "X-Ratelimit-Limit": "integer string",
        "X-Ratelimit-Remaining": "integer string"
    },
    "body": {
        "meta": {
            "query_time": float,
            "pagination": {
                "offset": integer,
                "limit": integer,
                "total": integer
            },
            "powered_by": "string",
            "trace_id": "1a234b56-cd7e-8f90-1234-56789012a3b4"
        },
        "errors": [],
        "resources": [
            Results will be returned here either as a list of strings,
            a list of integers, or a list of JSON dictionaries
        ]
    }
}
API responses are divided into three main branches.
Branch name | Description |
---|---|
`status_code` | The HTTP status code received from the API, as an integer. These values follow standard HTTP status code conventions. |
`headers` | The headers branch contains details regarding the returned payload. This will include content details, the time of the request, the CrowdStrike cloud returning the response, and the overall remaining requests available within your current rate limit. |
`body` | This branch contains the primary result for the response. The body branch has three sub-branches: `meta`, `errors`, and `resources`. |
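As a rough illustration of how the three branches fit together, the sketch below pulls the status, resources, and errors out of a response dictionary. `split_response` is a hypothetical helper, not a FalconPy function, and treating any 2xx status code as success is an assumption rather than documented behavior. The sample dictionary is a stand-in shaped like the structure shown above.

```python
def split_response(response):
    """Return (ok, resources, errors) from a JSON-style response dictionary.

    Hypothetical helper. Treats any 2xx status code as success, which is an
    assumption made for this sketch.
    """
    body = response.get("body", {})
    ok = 200 <= response.get("status_code", 0) < 300
    # `or []` also covers a branch that is present but set to None.
    resources = body.get("resources") or []
    errors = body.get("errors") or []
    return ok, resources, errors


# Stand-in response shaped like the structure shown above.
sample = {
    "status_code": 200,
    "headers": {"X-Ratelimit-Remaining": "5999"},
    "body": {"meta": {"query_time": 0.01}, "errors": [], "resources": ["abc123"]},
}
ok, resources, errors = split_response(sample)
print(ok, resources, errors)
```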
In this example we will interact with the Hosts API to create a list of IDs. For example purposes, we're going to drop the limit down and build our list using multiple requests to the API.
# Import the Hosts Service Class
from falconpy import Hosts
# Instantiate the Service Class
falcon = Hosts(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Maximum number of rows to return. Different
# operations support different limit maximums.
max_rows = 100
# Set our total to one so our loop begins
total = 1
# Start with the first record
offset = 0
# List to hold all of the IDs returned by our example
all_ids = []
# Start our loop
while offset < total:
    # We use the same integer we use to control our loop for our offset.
    response = falcon.query_devices_by_filter(
        sort="hostname|asc",
        limit=max_rows,
        offset=offset
    )
    if response["status_code"] == 200:
        # Retrieve our body branch
        result = response["body"]
        # Retrieve the value of the offset.
        offset = result["meta"]["pagination"]["offset"]
        # This will be the same every time, overrides our initial value of 1.
        total = result["meta"]["pagination"]["total"]
        # Retrieve the list of IDs returned.
        id_list = result["resources"]
        # Append this list to our running list of all IDs.
        # In a normal program, processing would occur here.
        all_ids.extend(id_list)
    else:
        # API error has occurred
        for error_result in response["body"]["errors"]:
            print(error_result["message"])
        # Stop the loop so a failed request is not repeated forever
        break
# We're done, show our entire list.
print(all_ids)
Depending on the API operation performed, there may be more results available than can be returned in a single response. When this occurs, metadata is present within the result to allow developers to request the next page of results. As demonstrated in the example above, results pagination is typically handled by leveraging the values of `offset` and `total` to determine your position within the recordset (as determined by the value specified by `limit` or by the API maximum) and to retrieve the next batch of results. For larger data sets, CrowdStrike APIs support one of two variations of deep pagination, tokenized and timestamp.
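The offset pattern described above can be wrapped in a generator so calling code only sees a stream of results. This is a sketch under stated assumptions: `paginate_by_offset` and `fake_query` are hypothetical names, `fake_query` stands in for a real operation so the block runs without credentials, and the `offset` value in the pagination metadata is assumed to be the position of the next page, as the Hosts example relies on.

```python
def paginate_by_offset(query, limit):
    """Yield every resource from an offset-paginated operation.

    Hypothetical helper. `query` is any callable accepting `limit` and
    `offset` keywords and returning a JSON-style response dictionary.
    """
    offset, total = 0, 1
    while offset < total:
        response = query(limit=limit, offset=offset)
        if response["status_code"] != 200:
            break  # Stop on any error instead of retrying forever
        body = response["body"]
        resources = body.get("resources") or []
        if not resources:
            break  # Guard against looping forever on an empty page
        yield from resources
        pagination = body["meta"]["pagination"]
        total = pagination["total"]
        offset = pagination["offset"]


def fake_query(limit, offset):
    """Stand-in for an API operation, serving slices of a fixed list."""
    data = [f"id-{n}" for n in range(5)]
    page = data[offset:offset + limit]
    return {
        "status_code": 200,
        "body": {
            "meta": {"pagination": {"offset": offset + len(page),
                                    "limit": limit,
                                    "total": len(data)}},
            "resources": page,
        },
    }


all_ids = list(paginate_by_offset(fake_query, limit=2))
print(all_ids)
```

With a real Service Class, the operation method would be passed in place of `fake_query`, provided it accepts the same `limit` and `offset` keywords.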
For scenarios where the amount of data requested exceeds limits set by the API, developers should leverage deep pagination to achieve the desired result. For most APIs, this is handled by using a string token to indicate the position within the overall recordset, and this token will be named `after`. There are a couple of API operations where this token will be stored in `offset` instead. Please refer to the documentation for that specific Service Collection and Operation for more detail as to which value to use. (See QueryDevicesByFilterScroll for an example of this. A sample has also been developed discussing the two variations of offset found within the Hosts Service Collection.)
In this example we will interact with the QueryVulnerabilities operation within the Spotlight Vulnerabilities API. This operation provides an example of deep pagination that leverages tokenized offsets.
# Import the Spotlight Vulnerabilities Service Class
from falconpy import SpotlightVulnerabilities
# Instantiate the Service Class.
spotlight = SpotlightVulnerabilities(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Total number of records to retrieve per request to the QueryVulnerabilities operation. (1-400)
LIMIT = 400
# A simple filter for the query operation
FILTER = "last_seen_within:'45'"
# List to hold our retrieved IDs
id_list = []
# We set our total to one (1) so our initial loop step executes,
# this value is reset by the results of our API call.
total = 1
# We will store the returned pagination attribute 'after' here, and then
# pass it to the method on subsequent executions of the loop. We do
# not pass a value for the after keyword on the first iteration.
position = None
# For this example our loop runs as long as our list holds fewer than the total results available
while len(id_list) < total:
    # Query the Spotlight Vulnerabilities API using the FILTER constant defined above.
    # Set the limit to be the value of the LIMIT constant, and our positional after token to be
    # the value of the 'position' variable.
    returned = spotlight.query_vulnerabilities(limit=LIMIT, filter=FILTER, after=position)
    if returned["status_code"] == 200:
        # Retrieve pagination detail
        page = returned["body"]["meta"]["pagination"]
        # Total records returned for the filter used
        total = page["total"]
        # The 'position' variable holds our next positional token based
        # upon the values of limit and the current `after` keywords.
        position = page["after"]
        # Extend our list by adding in the returned IDs for this iteration.
        id_list.extend(returned["body"]["resources"])
        # Display running progress
        print(f"Total: {total}\nPosition: {position}\nRetrieved so far: {len(id_list)}\n")
    else:
        # Set total to zero (0) to end the loop
        total = 0
        # Retrieve the errors branch
        errors = returned["body"]["errors"]
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")
# Print our grand total (the number of IDs actually retrieved)
print(f"Total IDs retrieved: {len(id_list)}")
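The token-driven loop above can also be expressed as a generator. The sketch below is illustrative only: `paginate_by_token` and `fake_token_query` are hypothetical names, and the stand-in query invents its own token format so the block runs without API access. It also assumes, for illustration, that the final page carries an empty or missing `after` value; the loop above instead compares the number of IDs retrieved against `total`.

```python
def paginate_by_token(query, limit):
    """Yield resources from an operation that paginates with an `after` token.

    Hypothetical helper. Stops on an error, an empty page, or a missing token.
    """
    token = None
    while True:
        response = query(limit=limit, after=token)
        if response["status_code"] != 200:
            return
        resources = response["body"].get("resources") or []
        yield from resources
        token = response["body"]["meta"]["pagination"].get("after")
        if not resources or not token:
            return


def fake_token_query(limit, after=None):
    """Stand-in for an API operation; the token here is just a list position."""
    data = ["a", "b", "c", "d", "e"]
    start = int(after) if after else 0
    page = data[start:start + limit]
    next_position = start + len(page)
    token = str(next_position) if next_position < len(data) else ""
    return {
        "status_code": 200,
        "body": {
            "meta": {"pagination": {"after": token, "total": len(data)}},
            "resources": page,
        },
    }


retrieved = list(paginate_by_token(fake_token_query, limit=2))
print(retrieved)
```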
Some CrowdStrike APIs support leveraging a timestamp to indicate your position within the resultset (for example, QueryIntelIndicatorEntities). In this scenario, the marker value is specified as part of the `filter` keyword, is provided as a timestamp (seconds from epoch), and should be named `_marker`.
This basic example demonstrates pagination interaction without addressing potential performance optimizations.
# Import the datetime module so we can calculate a _marker
import datetime
# Import the Intel Service Class
from falconpy import Intel
# Instantiate the Service Class
intel = Intel(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Calculate our current timestamp in seconds (%s),
# we will use this value for our _marker timestamp.
current_page = datetime.datetime.now().timestamp()
# List to hold the indicators retrieved
indicators_list = []
# The maximum number of records to return from the QueryIndicatorEntities operation. (1-5000)
LIMIT = 5000
# Sort for our results. We will sort ascending using our _marker timestamp.
SORT = "_marker.asc"
# Set total to one (1) so our initial loop starts. This will get reset by the API result.
total = 1
# Start retrieving indicators until our total is zero (0).
while total > 0:
    # Retrieve a batch of indicators passing in our marker timestamp and limit
    returned = intel.query_indicator_entities(
        filter=f"_marker:>='{current_page}'",
        limit=LIMIT,
        sort=SORT
    )
    if returned["status_code"] == 200:
        # Retrieve the pagination detail for this result
        page = returned["body"]["meta"]["pagination"]
        # Based upon the timestamp within our _marker (first 10 characters),
        # a total number of available indicators is shown in the 'total' key.
        # This value will be reduced by our position from this timestamp as
        # indicated by the unique string appended to the timestamp, so as our
        # loop progresses, the total remaining will decrement. Due to the
        # large number of indicators created per minute, this number will also
        # grow slightly while the loop progresses as these new indicators are
        # appended to the end of the resultset we are working with.
        total = page["total"]
        # Extend our indicators list by adding in the new records retrieved
        indicators_list.extend(returned["body"]["resources"])
        # Set our _marker to be the last one returned in our list,
        # we will use this to grab the next page of results
        current_page = indicators_list[-1].get("_marker", "")
        # Display our running progress
        print(f"Retrieved: {len(indicators_list)}, Remaining: {total}, Marker: {current_page}")
    else:
        # Retrieve all errors returned from the API
        errors = returned["body"]["errors"]
        # Tell the loop to stop processing
        total = 0
        # Display each error returned
        for err in errors:
            # Error code
            ecode = err["code"]
            # Error message
            emsg = err["message"]
            print(f"[{ecode}] {emsg}")
# Display the grand total of indicators retrieved
print(f"Total indicators retrieved: {len(indicators_list)}")
API responses from GraphQL will follow the same general JSON format detailed above, with differences starting to appear within the `body` branch of the response.
{
    "status_code": 200,
    "headers": {
        "Server": "nginx",
        "Date": "Sat, 04 Feb 2023 17:52:51 GMT",
        "Content-Type": "application/json; charset=utf-8",
        "Content-Length": "348",
        "Connection": "keep-alive",
        "Cache-Control": "no-cache",
        "Content-Encoding": "gzip",
        "Etag": "REDACTED",
        "Expires": "Sat, 04 Feb 2023 17:52:50 GMT",
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=15724800; includeSubDomains, max-age=31536000; includeSubDomains",
        "X-Appliance-Date": "2023-02-04T17:52:51+00:00",
        "X-Appliance-Id": "REDACTED",
        "X-Content-Type-Options": "nosniff",
        "X-Cs-Region": "us-1",
        "X-Cs-Traceid": "REDACTED",
        "X-Dns-Prefetch-Control": "off",
        "X-Download-Options": "noopen",
        "X-Frame-Options": "SAMEORIGIN",
        "X-Powered-By": "Express",
        "X-Preempt-Version": "5.40.43656",
        "X-Ratelimit-Limit": "6000",
        "X-Ratelimit-Remaining": "5980, 5995",
        "X-Xss-Protection": "1; mode=block"
    },
    "body": {
        "data": {
            "entities": {
                "nodes": [
                    {
                        "primaryDisplayName": "example-host1",
                        "secondaryDisplayName": "example.com\\example-host1",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    },
                    {
                        "primaryDisplayName": "example-host2",
                        "secondaryDisplayName": "example.com\\example-host2",
                        "accounts": [
                            {
                                "domain": "example.com"
                            }
                        ]
                    }
                ],
                "pageInfo": {
                    "hasNextPage": true,
                    "endCursor": "PAGINATION_TOKEN_STRING"
                }
            }
        },
        "extensions": {
            "runTime": 57,
            "remainingPoints": 499995,
            "reset": 9997,
            "consumedPoints": 2
        }
    }
}
For this example, pagination details will be found in the `pageInfo` branch (nested within the `entities` branch of the main `data` branch).
# Import the IdentityProtection Service Class
from falconpy import IdentityProtection
# Create a constant that contains our base query
IDP_QUERY = """
query ($after: Cursor) {
  entities(types: [USER], archived: false, learned: false, first: 5, after: $after) {
    nodes {
      primaryDisplayName
      secondaryDisplayName
      accounts {
        ... on ActiveDirectoryAccountDescriptor {
          domain
        }
      }
    }
    pageInfo {
      hasNextPage
      endCursor
    }
  }
}
"""
# Create an instance of the IdentityProtection Service Class, providing our
# credentials in the process. Do NOT hard code credentials.
idp = IdentityProtection(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
# Boolean to handle the processing of our loop
running = True
# List containing our returned results
returned_nodes = []
# Our variables dictionary that will store our pagination token
variables = None
# Start our loop
while running:
    # Provide a status update to the user
    print("Requesting a page of results")
    # Query for a batch of results, providing the variables dictionary.
    # For the first call, this value will be None.
    result = idp.graphql(query=IDP_QUERY, variables=variables)
    # Check for successful API response
    if result["status_code"] != 200:
        # Bad response, quit the process
        raise SystemExit("API error, check query contents.")
    # If results were returned
    if result["body"]["data"].get("entities"):
        # Nodes were returned for this iteration
        if "nodes" in result["body"]["data"]["entities"]:
            # Extend our list of returned results with the contents of this batch
            returned_nodes.extend(result["body"]["data"]["entities"]["nodes"])
            # Grab the page info branch so we can retrieve our pagination detail
            page_info = result["body"]["data"]["entities"]["pageInfo"]
            # If another page of results is available
            if page_info["hasNextPage"]:
                # Add the pagination token to our variables dictionary for the next iteration
                variables = {
                    "after": page_info["endCursor"]
                }
            else:
                # Break the loop
                running = False
        else:
            # Break the loop
            running = False
    else:
        # No results were found
        raise SystemExit("No results returned.")
# Inform the user of our returned results
for node in returned_nodes:
    print(node["primaryDisplayName"])
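The pagination step in the loop above can be isolated into a small function, which makes the stopping condition easier to test without calling the API. `next_page_variables` is a hypothetical helper, and it assumes the `data.entities.pageInfo` layout shown in the sample response; other GraphQL queries may nest this branch differently.

```python
def next_page_variables(body):
    """Return the variables dictionary for the next GraphQL page, or None.

    Hypothetical helper. Assumes pagination detail lives under
    body["data"]["entities"]["pageInfo"], as in the sample response.
    """
    entities = (body.get("data") or {}).get("entities") or {}
    page_info = entities.get("pageInfo") or {}
    if page_info.get("hasNextPage") and page_info.get("endCursor"):
        return {"after": page_info["endCursor"]}
    # No further pages, or no pagination detail at all.
    return None


# Stand-in body trimmed down from the sample response above.
sample_body = {
    "data": {
        "entities": {
            "nodes": [{"primaryDisplayName": "example-host1"}],
            "pageInfo": {"hasNextPage": True, "endCursor": "PAGINATION_TOKEN_STRING"},
        }
    }
}
print(next_page_variables(sample_body))
```

A return value of `None` signals that the loop should stop; anything else can be passed as the `variables` keyword on the next request.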
FalconPy supports handling responses from the CrowdStrike API using Python objects as opposed to JSON dictionaries. Binary responses are still processed as normal. Whenever the SDK is unable to properly create an object modeled after the API response received, the result falls back to a raw JSON formatted dictionary.
When enabled, pythonic response handling will generate Python error conditions as opposed to handling errors and returning the error message. This allows developers to catch and handle error conditions generated while interacting with API endpoints using their own logic.
API response handling is performed by the `Result` object. For more information regarding the architecture of the `Result` object, including the attributes, methods, and properties available, please review the result object documentation.
Pythonic response support was released in FalconPy v1.3.0.
from falconpy import Hosts
# Enable pythonic responses when you construct an instance of the class
hosts = Hosts(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, pythonic=True)
result = hosts.query_devices_by_filter_scroll()
# You can confirm this is not a raw response by
# checking that the raw attribute is not populated.
if not result.raw:
    # You can then loop the returned results
    # using the result object as an iterator.
    for device_id in result:
        print(device_id)
123abcde45f67890a1234b5c6de789f0
a1b2cde3fa4567bcd8e9f0a1b2c34d5d
a1b23c45d67e8901fa2a34b56cd78ef9
1a2b3cd4ef5a67b8c9012dfe34567890
1a2bcde34f5a6b789012cd3ef456a7b8
# ... continued from the example above
details = hosts.get_device_details(ids=result.data)
if not details.raw:
    for device_info in details:
        print(device_info["hostname"])
example-host1
example-host3
example-host4
example-host5
example-host2
Results can be pruned using the `prune` method.
# ... continued from the examples above
pruned = result.prune("1a2b")
print(pruned)
["1a2b3cd4ef5a67b8c9012dfe34567890", "1a2bcde34f5a6b789012cd3ef456a7b8"]
Errors received from the API can be handled using standard Python error handling.
from falconpy import HostGroup, APIError
falcon = HostGroup(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, pythonic=True)
try:
    result = falcon.get_host_groups("12345678901234567890123456789012")
except APIError as api_error:
    print("This record was not found.")