Ingest

As the name suggests, the ingest module handles getting a dataset from its source. As simple as that sounds, this is one of the most extensive parts of the repo. Below we dive into the details of how ingestion works.

Class Ingestor

A notable feature of the Ingestor class is its use of translator wrapper functions. Each translator function corresponds to a specific input format and returns that format's set of ingestion parameters to the Ingestor instance. The csv input type below shows what one of these wrapped functions returns. The translator wrapper guarantees that a standardized set of parameters is handed back to the Ingestor every time a format method is called.

    @translator
    def csv(
        self, path: str, compress: bool = False, inplace: bool = False, *args, **kwargs
    ):
        """
        https://gdal.org/drivers/vector/csv.html

        path: path of the configuration file
        compress: True if compression is needed
        inplace: True if the compressed file will replace the original output
        """
        return None, "CSV", "csv", compress, inplace

The exact line where the wrapped function is called appears below in the Ingestor class; it unpacks those standardized parameters:

    (dstDS, output_format, output_suffix, compress, inplace) = func(
        self, *args, **kwargs
    )
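
Putting those two pieces together, the decorator itself might look something like the minimal sketch below. This is an assumption for illustration only; the actual translator in the repo also carries out the Config parsing and translation steps described in the rest of this page.

    import functools

    def translator(func):
        """Sketch: standardize what every format method (csv, shapefile, ...)
        hands back to the ingestion flow."""

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # every wrapped format method returns the same five values
            (dstDS, output_format, output_suffix, compress, inplace) = func(
                self, *args, **kwargs
            )
            # ... Config lookup, source reading, and gdal.VectorTranslate
            # happen here in the real implementation ...
            return dstDS, output_format, output_suffix, compress, inplace

        return wrapper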

However, the actual ingestion has not happened yet. The wrapper functions merely set the table for the ingestion process by defining things such as the output format and output suffix. To complete the setup, the Ingestor then calls Config. Config uses the yaml template provided for each dataset to fill in the remaining parameters needed for ingestion, and also handles the edge cases where a Python process is required. The ingest process calls Config in the snippet below:

    c = Config(path, kwargs.get("version", None))
    dataset, source, destination, _ = c.compute_parsed
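
The parsed pieces are plain dictionaries. Purely as an illustration (the authoritative schema is on the Config wiki page), the keys referenced later in this flow look roughly like this; the values shown are hypothetical placeholders:

    # Illustrative shape only -- see the Config wiki page for the real schema.
    source = {
        "url": {"gdalpath": "/vsis3/my-bucket/example/example.csv"},
        "options": [],
        "geometry": {"SRS": "EPSG:4326"},
    }
    destination = {
        "name": "example_dataset",
        "fields": [],
        "options": ["OVERWRITE=YES"],
        "geometry": {"SRS": "EPSG:2263", "type": "MULTIPOLYGON"},
    }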

For a full explanation of Config, please visit the Config wiki page. Once all the appropriate parameters are fetched from Config, the actual ingestion of the source data takes place through one of three paths: 1) the source is a postgres database, 2) the source is a "generic source", which is read through gdal, or 3) the source requires the python process defined in its config.
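
Conceptually, that branching looks something like the sketch below. The condition keys and the run_python_process helper are hypothetical placeholders; only generic_source and postgres_source are names taken from the repo.

    # Simplified, conceptual sketch of the three ingestion paths.
    if "script" in source:
        # 3) a custom python process defined in the dataset's config
        srcDS = run_python_process(source)           # hypothetical helper
    elif "postgres" in source:
        # 1) the source lives in a postgres database
        srcDS = postgres_source(source["postgres"])  # call shape assumed
    else:
        # 2) a "generic source" (s3, remote url, or local file) read via gdal
        srcDS = generic_source(
            path=source["url"]["gdalpath"],
            options=source["options"],
            fields=destination["fields"],
        )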

generic_source

This handles the vast majority of data library use cases. It can be broadly broken down into three categories: flat files stored in the public s3 bucket, flat files that live remotely at an address specified by a url, and flat files that live in a local folder.

            srcDS = generic_source(
                path=source["url"]["gdalpath"],
                options=source["options"],
                fields=destination["fields"],
            )
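
The three categories mostly differ in the gdalpath that Config places in source["url"]; GDAL's virtual file systems let all of them be opened through the same call. The paths below are hypothetical examples:

    # Hypothetical gdalpath values for the three generic-source categories.
    s3_path = "/vsis3/my-bucket/datasets/example/example.csv"        # flat file stored on s3
    url_path = "/vsicurl/https://example.com/data/example.geojson"   # flat file at a remote url
    local_path = "/home/user/data/example.shp"                       # flat file in a local folder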

The definitions for both of these source workflows, generic_source and postgres_source, live in sources.py.

postgres_source

This path handles the case where the source data lives inside a postgres database.
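
For illustration, reading from postgres through GDAL generally comes down to an OGR "PG:" connection string; the snippet below is a minimal sketch under that assumption, not the repo's actual postgres_source implementation.

    from osgeo import gdal

    # Minimal sketch: open a postgres table as a GDAL/OGR vector dataset.
    # The connection details here are hypothetical placeholders.
    srcDS = gdal.OpenEx(
        "PG:host=localhost dbname=mydb user=me password=secret",
        gdal.OF_VECTOR,
    )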

Whichever path produced the source dataset, the final step uses gdal.VectorTranslate to send the output to its destination, applying any final adjustments to the dataset along the way. This includes reprojecting the geometries into the correct SRS if needed, setting the layer name, and creating the appropriate sql query if the output format is pgdump.

    gdal.VectorTranslate(
        dstDS,
        srcDS,
        format=output_format,
        layerCreationOptions=destination["options"],
        dstSRS=destination["geometry"]["SRS"],
        srcSRS=source["geometry"]["SRS"],
        geometryType=destination["geometry"]["type"],
        layerName=destination["name"],
        accessMode="overwrite",
        makeValid=True,
        # optional settings
        SQLStatement=sql,
        SQLDialect="sqlite",
        callback=update_progress,
    )

One notable thing here is that this call is made without capturing a return object. That is because the output is written to dstDS, the first argument passed, as the function runs.
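
In other words, nothing needs to be captured from the call; once it returns, the translated data already exists at the destination. A small hypothetical example of that pattern:

    from osgeo import gdal

    # Hypothetical example: the return value is ignored; the translated
    # output simply lands at the dstDS path once the call completes.
    srcDS = gdal.OpenEx("input/example.geojson", gdal.OF_VECTOR)  # hypothetical source
    dstDS = "output/example_dataset.sql"                          # PGDump output path
    gdal.VectorTranslate(dstDS, srcDS, format="PGDump", layerName="example_dataset")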