Deprecated_Parametrized assets via API: consumption - Mobility-Data-Space/mobility-data-space GitHub Wiki
Let us consume the same data offer as described here.
Pre-condition: the data offer exists and was successfully negotiated.
First, you need an access token to call your CaaS-API. Please contact your CaaS provider to get the client ID, client secret and access token URL of your CaaS. Then create the following auth script.
curl \
  -d 'grant_type=client_credentials' \
  -d 'client_id=[Please set your client id]' \
  -d 'client_secret=[Please set your client secret]' \
  '[Please set the access token url]'
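The same token request can also be issued from Python without a separate shell script. The following is a minimal sketch using only the standard library. The function names `build_token_request` and `fetch_access_token` are illustrative, and the sketch assumes that the token endpoint answers with a JSON body containing an `access_token` field.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build a client-credentials token request (form-encoded POST)."""
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("utf-8")
    # Passing data makes urllib send a POST, like `curl -d`.
    return urllib.request.Request(
        token_url,
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


def fetch_access_token(token_url, client_id, client_secret):
    """Send the token request and return the access token string."""
    request = build_token_request(token_url, client_id, client_secret)
    with urllib.request.urlopen(request, timeout=30) as response:
        payload = json.load(response)
    # Assumption: the response carries the token under "access_token".
    return payload["access_token"]
```

Calling `fetch_access_token` with your access token URL, client ID and client secret should return the token string that is later sent as the bearer token.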
Next, create a custom JSON request that initiates the transfer process. Please use the following one as a template.
{
  "type": "PARAMS_ONLY",
  "params": {
    "contractAgreementId": "[Please set your contract agreement id which can be found under 'Contracts']",
    "dataSinkProperties": {
      "type": "HttpData",
      "baseUrl": "[Please set your data sink address, e.g. webhook]",
      "method": "POST"
    },
    "properties": {
      "method": "GET",
      "pathSegments": "[Please set your custom path e.g. /datasets/v1/verkehrslage/collections/verkehrslage/items]",
      "queryParams": "[Please set the parameters you need e.g. f=json&zustandsklasse=dicht]"
    }
  }
}
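If the request is generated by a program rather than edited by hand, building it as a Python dictionary avoids JSON syntax slips such as a trailing comma. The sketch below mirrors the template above. The helper name `build_transfer_request` and its parameters are illustrative and not part of the CaaS-API, and encoding `queryParams` with `urlencode` is an assumption about how the values should be escaped.

```python
import json
from urllib.parse import urlencode


def build_transfer_request(contract_agreement_id, data_sink_url,
                           path_segments, query_params):
    """Return the transfer request body as a dictionary."""
    return {
        "type": "PARAMS_ONLY",
        "params": {
            "contractAgreementId": contract_agreement_id,
            "dataSinkProperties": {
                "type": "HttpData",
                "baseUrl": data_sink_url,
                "method": "POST",
            },
            "properties": {
                "method": "GET",
                "pathSegments": path_segments,
                # Encode the parameters as key=value pairs joined by "&".
                "queryParams": urlencode(query_params),
            },
        },
    }


if __name__ == "__main__":
    body = build_transfer_request(
        contract_agreement_id="[your contract agreement id]",
        data_sink_url="[your data sink address]",
        path_segments="/datasets/v1/verkehrslage/collections/verkehrslage/items",
        query_params={"f": "json", "zustandsklasse": "dicht"},
    )
    # json.dumps always emits syntactically valid JSON.
    print(json.dumps(body, indent=2))
```

The printed output can be saved to a file and used as the JSON request file.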
Finally, send the JSON request to your CaaS-API using the access token returned by the auth script. You can use the following Python script as an example or write your own.
Note: to start the transfer process, use the endpoint POST [CaaS base url]/control/data/wrapper/ui/pages/contract-agreement-page/transfers
import json
import os

import requests


def get_authentication_token(script_path):
    # Run the auth shell script and capture its output
    result = os.popen(script_path).read()

    # Parse the JSON output and extract the access token
    try:
        json_result = json.loads(result)
        authentication_token = json_result['access_token']
        return authentication_token
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON from the script output: {e}")
        return None


def send_authenticated_request(api_url, json_file_path, token):
    # Load parameters from JSON file
    with open(json_file_path, 'r') as file:
        params = json.load(file)

    # Set the token in the request headers
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }

    # Send POST request to the API with authentication headers
    response = requests.post(api_url, json=params, headers=headers)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        print("Request successful. Response:")
        print(response.json())
    else:
        print(f"Request failed. Status code: {response.status_code}")
        print(response.text)


if __name__ == "__main__":
    # Replace these values with your CaaS base URL, JSON file path, and auth script path
    api_url = "[please set your CaaS base url]/control/data/wrapper/ui/pages/contract-agreement-page/transfers"
    json_file_path = "[please set the path to the json request file]"
    script_path = "[please set the path to the auth script]"

    authentication_token = get_authentication_token(script_path)
    # Stop early instead of sending a request without a valid token
    if authentication_token is None:
        raise SystemExit("Could not obtain an access token.")
    send_authenticated_request(api_url, json_file_path, authentication_token)
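When filling in `api_url`, a trailing slash on the CaaS base URL would produce a double slash in the endpoint. A small helper can normalise this. `transfer_endpoint` and `TRANSFER_PATH` are hypothetical names used only for illustration, and the path is the one given in the note above.

```python
# Path of the transfer endpoint, as stated in the note above.
TRANSFER_PATH = "/control/data/wrapper/ui/pages/contract-agreement-page/transfers"


def transfer_endpoint(caas_base_url):
    """Join the CaaS base URL and the transfer path with exactly one slash."""
    return caas_base_url.rstrip("/") + TRANSFER_PATH
```

With this helper, `api_url = transfer_endpoint("[please set your CaaS base url]")` gives the same endpoint whether or not the base URL ends with a slash.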