Development Recipes

Herein are some step-by-step instructions on how to set up new functionality.

Reminders!

There are a few things you need to be careful with. Some of these could be automated away, but not others. It's possible to write tests for some of them, but that's trickier for other issues, e.g. the CLI code. In any event, it's up to the developer to make sure there are no mistakes.

Ensuring Things Stay Up-to-Date with ExtArgs

  • Ensure that get_commandline_args() matches ExtArgs*. If you update ExtArgs, ExtArgsCommandStr, or ExtArgsQueryStr then you must ensure that get_commandline_args() is updated accordingly. The code in get_commandline_args() could be automated to read the fields in ExtArgs* but it wasn't worth the time; see the sketch after this list.

  • Ensure that get_snakemake_args() matches ExtArgs. If you update ExtArgs then you'll need to update get_snakemake_args() too. However, unlike the former case, you do not need to worry about ExtArgsCommandStr or ExtArgsQueryStr because these are handled automatically.

  • Ensure that exec_control/commands.py is updated if you add/remove an instruction (in dispatcher.py). It won't affect much functionality at all but it does provide the help page for the CLI. So, make sure that commands.py is kept up-to-date with any changes in what instructions are available, i.e. anything that dispatcher.py dispatches on.
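
To make the first point concrete, here's a minimal sketch of the kind of change needed in get_commandline_args() when a hypothetical spuds field is added to ExtArgs (the parser variable, flag name, and default here are assumptions, not the real code):

    # Hypothetical: each new ExtArgs* field needs a matching CLI argument here.
    # The parser variable and exact style follow whatever get_commandline_args()
    # already does in your repo.
    parser.add_argument(
        "--spuds",
        type=int,
        default=0,
        help="Number of spuds for dinner (mirrors the assumed ExtArgs.spuds field)",
    )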

When Creating or Modifying Tasks/Instructions/Specs/QuickFns

  • Ensure that any task that creates new files in activeset updates the file registry, offers an auto_archive, and updates transactions. This does not apply to the HTTP downloads and the specialised archive stuff for that, but anything else that creates files needs to do this. See the specs, task, instructions, and quick functions for the aux resources for how this is done.

Create a New Command That Doesn't Use Specifications

For a command that doesn't need much heavy processing or match files with resource definitions, one can create an Instruction class and handle the work directly in that. However, the more typical case is that one needs to take a list of output files, match resource definitions to these, and then process them. In this situation, Task objects are created by Specifications depending on the output file list supplied.

In this section, we deal with the simple case. In the next section, we deal with the more typical, albeit more involved, case.

  1. This recipe assumes that you've already got a working repo set up, i.e. datamanager itself or macrodb. If you're creating a new repo that works in datamanager's structure, follow the design pattern used in macrodb. Make sure to follow the naming conventions! Let's imagine that your new command is "gallus" ("gallus" is Scottish for "confident, bold").

  2. i. Create a new suitably-named file, e.g. gallus.py under instructions/ and define a class that inherits from the correct base class, which will likely be BaseInstruction in this context, e.g. GallusInstruction.

  2. ii. Make sure there's an __init__() with the following signature and call to the super class. Parameters can be passed directly to your instruction via the kwargs; we'll assume GallusInstruction needs to know the number of spuds (potatoes) for dinner. You can also access the fields in command_str (handled by the base class) via self._command_str.attribute_name. Let's imagine we use an attribute from command_str, is_dreich.

    def __init__(self, instruction_info: InstructionInfo, **kwargs):

        super().__init__(instruction_info, **kwargs)

        self._spuds = int(kwargs["spuds"])

        self._is_dreich = bool(
            self._command_str.is_dreich
        )

  2. iii. Put the actual "doing stuff" code into the .perform(self) method. As always, feel free to create some private methods and call them from .perform() if it's more tidy.
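
By way of illustration, a minimal .perform() for GallusInstruction might look like the sketch below (the private helper is made up for the example):

    def perform(self):
        """Perform the instruction"""

        logger.info(f"Being gallus with {self._spuds} spuds (dreich: {self._is_dreich})")

        # Keep .perform() tidy by pushing the real work into private methods
        self._peel_the_spuds()

    def _peel_the_spuds(self):
        """Illustrative private helper; replace with your actual logic"""

        for spud_num in range(1, self._spuds + 1):
            logger.debug(f"Peeling spud {spud_num} of {self._spuds}")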

  3. i. Add the command to dispatcher.py. First it'll have to be added to the imports, then add the instruction in the body.

    # ------------------------------------------------------------
    # Command to do something gallus
    if simple_command == "gallus":
        logger.info(f"Dispatching command: {simple_command}")

        # Create InstructionInfo object
        instruction_info = InstructionInfo(
            instruction_id="gallus",
            snakemake_rule=snakemake_rule,
            simple_command=simple_command,
            exio_version=ext_args.exio_version,
        )

        gallus_instruction = GallusInstruction(
            instruction_info=instruction_info,
            command_str=command_str,
            spuds=5, # just using a literal in this example
        )
        gallus_instruction.perform()

        return None
  3. ii. Add command into exec_control/commands.py. This is really for the CLI but it needs done nonetheless. All commands have to be included in the simple_commands dictionary; the format is to use command_name in the simple_command == 'command_name' conditional above as the key and then have the value as a short textual description of what that command does. If your command needs an active run then it should also be in commands_using_active_runid.
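
For example, the additions for our hypothetical command might look like this (the description text is illustrative, and check the container types against the existing code):

    simple_commands = {
        # ... existing commands ...
        "gallus": "Do something gallus with the configured number of spuds",
    }

    # Only needed if the command requires an active run
    commands_using_active_runid = [
        # ... existing commands ...
        "gallus",
    ]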

  3. iii. Add the command into quick_fns_simple_command.py. This is boilerplate code but it is not all exactly the same! Look at the examples that match what you're doing and follow them. In this case, something like create_run() might be similar. At a minimum, you need to construct ExtArgsCommandStr and ExtArgs then pass ExtArgs to entry_point(). Note that the pattern is to define a dictionary, loc_command_str_dict, for command_str, possibly using helper functions, and then pass it to ExtArgsCommandStr.model_validate().
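
A minimal sketch of such a quick function, under the assumption that ExtArgs and ExtArgsCommandStr take the field names used elsewhere on this page (treat every name here as a placeholder and follow the real examples in the repo):

    def be_gallus(exio_version: str, is_dreich: bool = False):
        """Quick function wrapping the 'gallus' command (illustrative sketch)"""

        # Build the command_str dictionary, then validate it into ExtArgsCommandStr
        loc_command_str_dict = {
            "is_dreich": is_dreich,
        }
        command_str = ExtArgsCommandStr.model_validate(loc_command_str_dict)

        # Assemble ExtArgs and hand over to the entry point
        ext_args = ExtArgs(
            simple_command="gallus",
            exio_version=exio_version,
            command_str=command_str,
        )
        return entry_point(ext_args)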

Create a New Command That Uses Specifications

This section describes how to set up a new command that uses specifications, i.e. a command that is likely to process some provided input files to output files a la a Snakemake rule.

  1. As previously, this recipe assumes that you've already got a working repo set up, i.e. datamanager itself or macrodb. If you're creating a new repo that works in datamanager's structure, follow the design pattern used in macrodb. Make sure to follow the naming conventions! Let's imagine that your new command is "gee-it-laldie" (imperative, meaning to do something with a lot of enthusiasm).

  2. i. Create a new suitably-named file, e.g. gee_it_laldie.py under instructions/ and define a class that inherits from the correct base class, which will likely be BaseFilesInstruction in this context, e.g. GeeItLaldieInstruction.

  2. ii. As before, make sure there's an __init__() with the following signature and call to the super class. Parameters can be passed directly to your instruction via the kwargs; we'll assume GeeItLaldieInstruction needs to know the number of spuds (potatoes) for dinner. You can also access the fields in command_str (handled by the base class) via self._command_str.attribute_name. Let's imagine we use an attribute from command_str, is_dreich.

    def __init__(self, instruction_info: InstructionInfo, **kwargs):

        super().__init__(instruction_info, **kwargs)

        self._spuds = int(kwargs["spuds"])

        self._is_dreich = bool(
            self._command_str.is_dreich
        )

  2. iii. Unlike previously, the code in .perform() is going to call Specifications to get a task and then execute the task. In doing this, you'll need to instantiate and pass a TaskInfo object. In this example, we're also passing spuds and is_dreich. Note that we're passing self._output_filenames to gee_it_laldie_specs.create_task_from_filenames(): this will not only instantiate the task but also populate it with the resources corresponding to the output filenames list we provided.
    def perform(self):
        """Perform the instruction"""

        # Create an appropriate TaskInfo object
        task_info = TaskInfo(
            task_id=self._instruction_info.instruction_id,
            runid_meta=self._instruction_info.runid_meta,
        )

        # Check if there's actually anything to do
        if len(self._output_filenames) == 0:
            logger.warning("Nothing to do as there's no target files!")
            return None

        # Get the task list from the specs
        gee_it_laldie_specs = GeeItLaldieSpecs()
        gee_it_laldie_collection = gee_it_laldie_specs.create_task_from_filenames(
            task_info,
            self._h_transaction,
            self._output_filenames,
            res_versions=self._res_versions,
            auto_archive=self._auto_archive,
            spuds=self._spuds,
            is_dreich=self._is_dreich,
        )

        logger.debug("self._output_filenames={self._output_filenames}")

        # Perform
        gee_it_laldie_collection.perform()
  3. i. Add the command to dispatcher.py. First it'll have to be added to the imports, then add the instruction in the body.
    # ------------------------------------------------------------
    # Command to gee it laldie
    if simple_command == "gee-it-laldie":
        logger.info(f"Dispatching command: {simple_command}")

        # Create InstructionInfo object
        instruction_info = InstructionInfo(
            instruction_id="gee-it-laldie",
            snakemake_rule=snakemake_rule,
            simple_command=simple_command,
            exio_version=ext_args.exio_version,
        )

        gee_it_laldie_instruction = GeeItLaldieInstruction(
            instruction_info=instruction_info,
            command_str=command_str,
            spuds=5, # just using a literal to demonstrate, you almost always want this to come from ExtArgs*
        )
        gee_it_laldie_instruction.perform()

        return None
  3. ii. Add command into exec_control/commands.py. This is really for the CLI but it needs done nonetheless. All commands have to be included in the simple_commands dictionary; the format is to use command_name in the simple_command == 'command_name' conditional above as the key and then have the value as a short textual description of what that command does. If your command needs an active run then it should also be in commands_using_active_runid.

  3. iii. Add the command into quick_fns_simple_command.py. This is boilerplate code but it is not all exactly the same! Look at the examples that match what you're doing and follow them. In this case, something like download_files_http() might be similar: note the difference here, when we're calling a "files" instruction, compared to the previous example. At a minimum, you need to construct ExtArgsCommandStr and ExtArgs then pass ExtArgs to entry_point(). Note that the pattern is to define a dictionary, loc_command_str_dict, for command_str, possibly using helper functions, and then pass it to ExtArgsCommandStr.model_validate(). You may need to include the auto_archive parameter, see convert_ext_raw_to_parquet() in macrodb for an example.
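
Again purely as an illustration, a files-style quick function might look like the following; the field names (output_filenames, auto_archive) are assumptions, so check download_files_http() and convert_ext_raw_to_parquet() for the real pattern:

    def gee_it_laldie(target_filenames: list[str], exio_version: str, auto_archive: bool = True):
        """Quick function wrapping the 'gee-it-laldie' command (illustrative sketch)"""

        # Build the command_str dictionary, including the output files and archive flag
        loc_command_str_dict = {
            "output_filenames": target_filenames,
            "auto_archive": auto_archive,
            "is_dreich": False,
        }
        command_str = ExtArgsCommandStr.model_validate(loc_command_str_dict)

        # Assemble ExtArgs and hand over to the entry point
        ext_args = ExtArgs(
            simple_command="gee-it-laldie",
            exio_version=exio_version,
            command_str=command_str,
        )
        return entry_point(ext_args)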

  4. i. You should now create a suitable resource definition. So, this is likely to be in resource_definitions/gee_it_laldie.py. The class should derive from BaseResource. What fields you have in the resource definition depends on what you're doing but you typically will need at least the output filename or, better, a LocalFileRelaxed to represent the output file. The actual resources will be encoded in a YAML file; the resources that are being used will each be instantiated into one of these resource definition objects. Note the model_config = ... : this is Pydantic syntax, see their docs for more info.

class GeeItLaldieResource(BaseResource):
    """Definition of a single gee-it-laldie resource"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    target_file: LocalFileRelaxed

    mm_rain: float
  4. ii. Within the resource definition class, you may need to define some validators (which help with "deserializing") and some serializers. The general rule is that standard Python types typically don't need this but if you've defined your own classes then it might be needed. There's more info in the Pydantic docs on validators and serialization. So we would likely want to include these methods:
    @model_validator(mode="before")
    @classmethod
    def pre_process(cls, data: Any) -> Any:
        if isinstance(data, dict):
            if "target_file" in data.keys() and isinstance(data["target_file"], dict):
                data["target_file"] = LocalFileRelaxed(**data["target_file"])

        return data

    @field_serializer("target_file", mode="plain")  # , return_type=str
    def serialize_target_file(
        self, value: LocalFileRelaxed, info: FieldSerializationInfo
    ):
        return value.to_dict()
  5. i. Ok, now we take a slight diversion and set up the task. So, create a suitable file. In this case it'd be tasks/gee_it_laldie.py. Let's start defining the relevant task. It needs to derive from BaseTask.
class GeeItLaldieTask(BaseTask):

    def __init__(self, task_info: TaskInfo, h_transaction: TransactionHandle, **kwargs):
        super().__init__(task_info, h_transaction)

        # If we create files, we need to instantiate file registry ctrl to add output files to the registry
        self._fileregistry_info_ctrl = FileregistryInfoCtrl(runid=task_info.runid_meta)

        # Typically we might need to auto archive too
        self._auto_archive = bool(
            kwargs["auto_archive"]
        )

        # And our custom parameters
        self._spuds = int(kwargs["spuds"])
        self._is_dreich = bool(kwargs["is_dreich"])

  5. ii. Now, we want the user of this class to be able to instantiate and then add resources. This is mostly handled by the super class but we should help a little with type checking. So add the add_res() method.
    def add_res(self, res_list: list[GeeItLaldieResource] | GeeItLaldieResource):
        """Add new resources

        Parameters
        ----------
        res_list : A GeeItLaldieResource or a list of GeeItLaldieResource
            The specification(s) of the resources.
        """

        loc_res_list = check_is_list_of_type(res_list, [GeeItLaldieResource])
        super().add_res(loc_res_list)

        return self._res_collection
  5. iii. As always, you need to define .perform() to actually do the task. Typically, you'd define private methods to do this and call them from .perform(). See the other tasks for examples of how this can work.
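
A skeletal .perform() is sketched below, assuming the resource collection is iterable; the registry and archive calls are placeholder method names (not the real API), so follow an existing task for the actual calls:

    def perform(self):
        """Perform the task over all added resources"""

        for res in self._res_collection:
            # The actual "doing stuff" for one resource (illustrative helper)
            self._calc_rainfall_for(res)

            # Any task creating files in activeset must update the file
            # registry and transactions (see the reminders at the top);
            # this call is a placeholder for however your repo does it.
            self._register_output_file(res.target_file)

        if self._auto_archive:
            # Placeholder for the auto-archive step
            self._do_auto_archive()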

  6. i. Now, we need to tie all this together with the specifications. First, create the appropriate specifications Python file. In this case, specifications/gee_it_laldie.py. You need to define the specifications class. The inheritance in this case needs to take account of the type of the resource being used.

class GeeItLaldieSpecs(BaseSpecs[GeeItLaldieResource]):
    """Specifications for geeing it laldie
    """

    def __init__(self):
        super().__init__()

        # Import the YAML
        self._resources = self._import_resources_from_yaml(
            "yaml_gee_it_laldie_res", GeeItLaldieResource
        )

  6. ii. Next, you need to actually define the .create_task_from_filenames() method. But don't worry, the .get_res_by_filenames() method in the base class does most of the work.
    def create_task_from_filenames(
        self,
        task_info: TaskInfo,
        h_transaction: TransactionHandle,
        target_filenames: Tlist_of_str_path,
        res_versions: Tdict_file_to_version | None = None,
        **kwargs,
    ) -> GeeItLaldieTask:

        res_list = self.get_res_by_filenames(
            target_filenames, res_versions=res_versions
        )

        # We could iterate through the resources to do some
        # sort of processing
        for aux_res in res_list:
            assert isinstance(aux_res, GeeItLaldieResource)
            # some sort of sanity check / processing?

        # Create task and add the required target files; the task's __init__()
        # expects auto_archive, spuds, and is_dreich in its kwargs
        task_gee_it_laldie = GeeItLaldieTask(
            task_info,
            h_transaction,
            auto_archive=kwargs["auto_archive"],
            spuds=kwargs["spuds"],
            is_dreich=kwargs["is_dreich"],
        )
        task_gee_it_laldie.add_res(res_list)

        # Debug
        logger.debug(f"GeeItLaldieTask: task_aux={task_gee_it_laldie}")

        return task_gee_it_laldie
  6. iii. Let's create the YAML file that the resources will be read from. Create a subdirectory and then define the file as specifications/gee_it_laldie/gee_it_laldie_resources.yaml. The entries must match the format of the resource definition we created in step 4. Each resource must have a res_id, in the form of alphanumerics and hyphens. Any field that is a LocalFileRelaxed needs to define the mandatory fields (you'll need the local_rel_filename, extension, and version). Remember the version indicates the version of the resource, not the file, and must be in the defined format.
- res_id: gee-it-laldie-oban-rainfall-calcs
  target_file:
    extension: parq
    local_rel_filename: parquetraw/macrodb/rainfall/towns/oban.parq
    comment: Rainfall calculations for Oban
    version: 20240705001
  mm_rain: 12.5

- res_id: gee-it-laldie-paisley-rainfall-calcs
  target_file:
    extension: parq
    local_rel_filename: parquetraw/macrodb/rainfall/towns/paisley.parq
    comment: Rainfall calculations for Paisley
    version: 20240705001
  mm_rain: 155.0
  6. iv. You need to ensure paths.py knows that "yaml_gee_it_laldie_res" should point to the resource YAML file. For anything in datamanager, you can just change g_repo_subdirs_files in local/paths.py; the path itself is relative to ./repo/. If in another repo then you should use add_abs_path_repo_sub(). See the example in macrodb's __init__.py.
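
For the datamanager case, that amounts to one new entry. The sketch below shows it as a plain assignment for brevity; in practice you'll add the key inside the existing g_repo_subdirs_files definition, whose exact layout you should check in local/paths.py:

    # Relative to ./repo/
    g_repo_subdirs_files["yaml_gee_it_laldie_res"] = (
        "specifications/gee_it_laldie/gee_it_laldie_resources.yaml"
    )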

Creating a Query Command

Query commands are quite similar to before but they have their own separate arrangements. This is to ensure output is made consistent and to reduce workload.

  1. As previously, this recipe assumes that you've already got a working repo set up, i.e. datamanager itself or macrodb. If you're creating a new repo that works in datamanager's structure, follow the design pattern used in macrodb. Make sure to follow the naming conventions!

  2. i. All the queries, for a given repo, go into instructions/query.py. So that much is easy enough. The query will inherit from either BaseSpecsQueryInstruction or BaseMetaQueryInstruction; there was a reason for this which unfortunately escapes me right now (TODO, clarify). Here, we illustrate with the GetHttpDownloadMetaArchiveByOidInstruction query from datamanager. You can see how it's set up: .perform() queries Meta and then returns those rows passed through process_query_output().

class GetHttpDownloadMetaArchiveByOidInstruction(BaseMetaQueryInstruction):
    """Get rows from archived HTTP meta by oid, possibly filter by runid"""

    def __init__(self, instruction_info, **kwargs):
        """Initialise

        Parameters
        ----------
        instruction_info : InstructionInfo
        """

        super().__init__(instruction_info, **kwargs)
        assert isinstance(self._query_str, ExtArgsQueryStr)
        self._oid = assign_error_if_none(self._query_str.oid, "self._query_str.oid")

    def perform(self):
        """Perform the instruction"""

        http_meta = HttpMeta()

        query_result = http_meta.get_archive_by_oid(
            self._oid,
        )
        assert isinstance(self._query_str, ExtArgsQueryStr)

        return process_query_output(
            self._query_str.query_output_format,
            query_result,
            auto_process_sqlite3row=True,
        )
  3. i. The query has to be specified in dispatcher.py. This is slightly different from before: you're looking at list_of_std_queries(); just add a new tuple into the list_of_std_queries list. The tuple is of the form (query_name, instruction_name), e.g. using the example from 2. i., it's ("get-http-download-meta-archive-by-oid", GetHttpDownloadMetaArchiveByOidInstruction).
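
So the change in list_of_std_queries() is just one more tuple:

    list_of_std_queries = [
        # ... existing queries ...
        ("get-http-download-meta-archive-by-oid", GetHttpDownloadMetaArchiveByOidInstruction),
    ]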

  3. ii. You need to edit exec_control/commands.py to include the new query in query_commands.

  3. iii. You need to add an appropriate wrapper function for this query in exec_control/quick_fns_query.py. At a minimum, you need to set up ExtArgsQueryStr and ExtArgs, then pass the ext_args object to entry_point(). Follow the design pattern of existing queries in datamanager.
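
A minimal sketch of such a wrapper, assuming ExtArgsQueryStr carries the oid and query_output_format fields shown in the query class above (the remaining ExtArgs field names are placeholders):

    def get_http_download_meta_archive_by_oid(oid: str, query_output_format: str = "plain"):
        """Quick function wrapping the query (illustrative sketch)"""

        # Build and validate the query_str
        query_str = ExtArgsQueryStr.model_validate(
            {
                "oid": oid,
                "query_output_format": query_output_format,
            }
        )

        # Assemble ExtArgs and hand over to the entry point
        ext_args = ExtArgs(
            simple_command="get-http-download-meta-archive-by-oid",
            query_str=query_str,
        )
        return entry_point(ext_args)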

Using Meta

How to Have a Task Call Functions / Methods Noted in Resource Definitions

Whilst it'd be very nice to have some general logic coded into the task, and for specifications to set it up for bulk processing, that isn't always possible. At times you might have to call some imperative logic that is unique to each file.

There is a template way of doing this cleanly and safely. See tasks/ext_file_raw_to_parquet.py and associated resource definitions in macrodb for how to do this.
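
The gist of that template, boiled down to a hypothetical sketch (every name below is made up; the real version lives in tasks/ext_file_raw_to_parquet.py): the resource definition names its processing function as a string, and the task resolves that string through an explicit whitelist rather than calling arbitrary code.

    # Hypothetical sketch of the "function named in the resource" pattern

    def _process_rainfall(res):
        """Per-file imperative logic for rainfall resources"""
        ...

    def _process_gdp(res):
        """Different per-file logic for GDP resources"""
        ...

    # Explicit whitelist: only functions registered here can be named in a resource
    PROCESS_FN_REGISTRY = {
        "process_rainfall": _process_rainfall,
        "process_gdp": _process_gdp,
    }

    def run_named_fn(res):
        """Look up and call the function named in the resource definition"""
        fn = PROCESS_FN_REGISTRY[res.process_fn]  # KeyError if not whitelisted
        return fn(res)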

Adding Attributes into ExtArgs, ExtArgsCommandStr, or ExtArgsQueryStr

The first thing to remember is that ExtArgs* is defined separately for each repo. However, they all derive from base classes in datamanager, in common/definitions.py, i.e. BaseExtArgs, BaseExtArgsCommandStr, and BaseExtArgsQueryStr.

The ExtArgs* for a given repo will be in local/ext_args.py.

  1. It's assumed the repo(s) are already setup, so on and so forth.

  2. i. Decide whether your new attribute applies only to one repo or whether it should be common. If it's the former, then you're editing local/ext_args.py; if it's the latter, you're editing datamanager's common/definitions.py.

  2. ii. If your attribute should apply only to a command then it should go into ExtArgsCommandStr (or the base). If it applies only to queries then it should go in ExtArgsQueryStr. If it applies to both then it goes into ExtArgs proper.

  3. i. Ok, once you're familiar with Pydantic's field types, etc., add your attribute. If your attribute is anything other than a standard Pydantic type then you'll likely have to define serialization and validation. See existing code for examples of how to code this stuff.
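
For a standard type, the addition itself is one line in the relevant class; e.g., reusing the is_dreich example from earlier (the default is an assumption):

    class ExtArgsCommandStr(BaseExtArgsCommandStr):
        # ... existing fields ...
        is_dreich: bool = False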

  3. ii. You'll need to update get_commandline_args() in exec_control/external.py. If you've edited one of the base classes then you'll need to do this change in datamanager, macrodb, etc.

  3. iii. If you've edited ExtArgs then you'll need to update get_snakemake_args() in exec_control/external.py, but not for ExtArgsCommandStr and ExtArgsQueryStr as these are handled automatically. Again, if you've edited one of the base classes then you'll need to do this for datamanager, macrodb, etc.

Snakemake New Rule

xxxxxxxxxxxxxx in progress xxxxxxxxxx

def get_ext_raw_to_parquet_file_list():
    return [
        "activeset/parquetraw/macro/un/sna-mainagg/gdp-usd-current-countries.parq",
        "activeset/parquetraw/macro/un/sna-mainagg/gdp-usd-constant-countries.parq",
        "activeset/parquetraw/macro/un/sna-mainagg/gdp-usd-current-regions.parq",
        "activeset/parquetraw/macro/un/sna-mainagg/gdp-usd-constant-regions.parq",
        "activeset/parquetraw/macro/un/sna-mainagg/implicit-price-deflators-national-and-usd.parq",
        "activeset/parquetraw/macro/un/sna-mainagg/exchange-rate-and-population-countries.parq",
        "activeset/parquetraw/macro/eurostat/exchange-rates/exchange-rate-bilateral-annual.parq",
        "activeset/parquetraw/macro/nstats-taiwan/national-accounts/gdp-by-expenditures-gfcf.parq",
    ]

rule st_update_beans_macrodb:
    """Do a reckoning of the number of beans we have
    """
    input:
        http_downloads_filelist_activeset,
    output:
        ext_raw_to_parquet_filelist,
    log:
        stdout = exio_paths["exio_var_log_snakemake"] / "st_ext_file_raw_to_parquet_macrodb__stdout.log",
        stderr = exio_paths["exio_var_log_snakemake"] / "st_ext_file_raw_to_parquet_macrodb__stderr.log"
    conda:
        TARGET_CONDA_ENV
    params:
        instruction_id="script-ext-file-raw-to-parquet-macro",
        simple_command="convert-ext-raw-to-parquet",
        exio_version=EXIOBASE_VERSION,
    script:
        macrodb_main_path