Notes on orchestrator - multiply-org/multiply-core GitHub Wiki

A very basic orchestrator

Introduction

Role of the orchestrator

The MULTIPLY platform (or system, or whatever we end up calling it) aims to combine satellite observations with a set of sources of a priori information in order to infer biophysical parameters with uncertainty quantification. Broadly speaking, a number of components make the different bits of information available to the inference engine, which is then run to provide the desired output.

The user is required to define the problem. This includes selecting the EO observations that will be used, as well as the spatial and temporal domains where the inference will take place, the linkage between the observations of (e.g.) backscatter or reflectance and the land surface parameters of interest (through a so-called observation operator), and any relevant prior information.

As far as I see it, the role of the orchestrator is to set up the linkages to the user-required functionality, and feed this in a sensible way to the inference engine. Given the large data volumes and high computational processing needs, it ought to be the role of the orchestrator to distribute the computations over the available infrastructure.

It is important to note that the inference engine currently works sequentially: it requires the output from one time step in order to process the following one. In the future, this might not be the case, but currently it is a limitation. Spatial processing is currently very limited and will be discussed in its own section.

Defining the problem: required bits and bobs

The state grid or mask or whatever

The state mask delineates a region of interest (ROI). The user can define this as s/he pleases, but a raster file will be required. The raster file should carry location information (a so-called geotransform), a projection, etc., but in reality it is just a binary mask selecting which cells get processed and which get ignored. A more sophisticated mask might mark pixels to be processed differently (e.g. croplands and forests), but ultimately the orchestrator can reduce that to a sequence of binary choices (e.g. first do the croplands, then do the forests).

Encoding the state mask in a simple raster file (e.g. a GeoTIFF with a Byte type) is convenient: all the required information is there, and it can be readily used to reproject/resample other components to match the current state mask. Creating such a mask from a user-supplied vector or whatever is quite trivial, and we already have code in the repository to do this.
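As a rough sketch of how the orchestrator might consume such a mask (numpy stands in for the raster band here; the array and names are illustrative, not from the codebase):

```python
import numpy as np

# A toy 4x4 state mask: 1 = process this cell, 0 = ignore it.
# In practice this array would be read from e.g. a Byte GeoTIFF band.
state_mask = np.array([[0, 0, 1, 1],
                       [0, 1, 1, 1],
                       [0, 1, 1, 0],
                       [0, 0, 0, 0]], dtype=np.uint8)

# The orchestrator only needs the locations of the selected cells
rows, cols = np.nonzero(state_mask)
selected = list(zip(rows.tolist(), cols.tolist()))
print(f"{len(selected)} of {state_mask.size} cells will be processed")
```

A two-class mask (croplands/forests) would just run this selection twice, once per class value.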

The temporal grid

Once the user defines the spatial extent of the inference, the temporal extent has to be provided. Basically, the inferences will be provided for the time steps in the temporal grid. This can be e.g. daily, or weekly, or every dekad (an annoying name for a ten-day period). For a time grid given by t=[t0, t1, t2, t3], an inference would be made at time t1, including all the observations between t0 and t1 (and similarly for the following steps). It is assumed that within a time step there are no parameter dynamics (i.e. land surface parameters appear constant), suggesting that for most things the upper bound on the temporal spacing ought to be "a few days". A list of datetime objects is appropriate to define the temporal grid.
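A minimal sketch of how observations could be binned onto such a grid (the dates are made up; only the stdlib is used):

```python
from datetime import datetime, timedelta

# A weekly temporal grid t = [t0, t1, t2, t3]
t0 = datetime(2017, 6, 1)
time_grid = [t0 + timedelta(days=7 * i) for i in range(4)]

# Some (made-up) observation acquisition times
obs_dates = [datetime(2017, 6, 2), datetime(2017, 6, 5),
             datetime(2017, 6, 12), datetime(2017, 6, 20)]

# The inference at t_k uses all observations in (t_{k-1}, t_k]
bins = {}
for tprev, tcurr in zip(time_grid[:-1], time_grid[1:]):
    bins[tcurr] = [d for d in obs_dates if tprev < d <= tcurr]
```

The `bins` mapping is then all the orchestrator needs to hand the right observations to the engine at each time step.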

The observations

A critical part of the problem, as I'm sure you agree. In KaFKA, and unless somebody comes up with a better idea, an observation is a raster file that is acquired at some time and is composed of a number of bands. Typically, these will be spectral bands (optical domain) or different polarisations (SAR). Or both (passive microwave). The aim here is to define some functionality that defines the observations and that is able to provide the inference engine with the relevant data when required. It is assumed that at this stage the observations have been pre-processed (in the optical domain, atmospheric correction and cloud clearing; in the SAR domain, geocoding, speckle reduction, ...).

A more detailed explanation of the interface is given here.
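As a rough illustration only (these names are hypothetical, not the actual KaFKA interface), an observation record might look something like:

```python
from dataclasses import dataclass, field
from datetime import datetime

import numpy as np

@dataclass
class Observation:
    """A toy observation record: an acquisition time plus a stack of
    bands (spectral bands and/or polarisations), already pre-processed."""
    acquired: datetime
    bands: np.ndarray               # shape (n_bands, rows, cols)
    band_names: list = field(default_factory=list)

    @property
    def n_bands(self):
        return self.bands.shape[0]

# A dummy dual-pol SAR observation
obs = Observation(acquired=datetime(2017, 6, 2),
                  bands=np.zeros((2, 4, 4)),
                  band_names=["VV", "VH"])
```

The real interface also needs uncertainties and band-to-state linkage, which is why the detailed write-up lives on its own page.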

The prior

In KaFKA, we use two types of prior: a traditional prior and/or a dynamic model type of prior. The former calculates the prior mean vector and associated inverse covariance matrix itself, whereas the dynamic model updates the output from one time step to the next. In fact, it is possible that both pieces of information may be combined (they are two independent multivariate Gaussian pdfs).

More details on the prior class here. The state propagator returns the same stuff, but has a slightly different interface.
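To illustrate why combining the two is straightforward: for two independent Gaussian pdfs with means mu1, mu2 and inverse covariances A1, A2, the product is Gaussian with inverse covariance A1 + A2 and a precision-weighted mean solving (A1 + A2) mu = A1 mu1 + A2 mu2. A minimal numpy sketch with made-up numbers:

```python
import numpy as np

# Two independent Gaussian priors on a 2-parameter state
mu1 = np.array([0.5, 2.0])
A1 = np.diag([4.0, 1.0])   # inverse covariance, "traditional" prior
mu2 = np.array([0.7, 1.0])
A2 = np.diag([1.0, 4.0])   # inverse covariance, propagated state

# Product of Gaussians: precisions add, means are precision-weighted
A = A1 + A2
mu = np.linalg.solve(A, A1 @ mu1 + A2 @ mu2)
```

Note that working with inverse covariances (precisions) is exactly what makes this combination cheap: no covariance inversion is needed beyond the final solve.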

Options for parallel processing

In principle, no spatial processing has yet been implemented in the KaFKA codebase (although I have tested it outside). This means that each pixel in the state mask is processed independently of the others, so the problem can be easily parallelised by splitting the state into spatial windows. Each window can then be processed independently and the solutions mosaicked back together as a post-processing stage.

Quick'n'dirty implementation

The following is a handy iterator that chunks a GDAL file (or opened dataset).

    from osgeo import gdal

    def chunk_me(fname, tile_size=256):
        """An image chunker iterator. Takes a geodataset, either an already
        opened GDAL dataset or an actual filename. The iterator yields an
        open in-memory dataset with a window on the original file."""
        try:
            width = fname.RasterXSize
            height = fname.RasterYSize
        except AttributeError:
            g = gdal.Open(fname)
            if g is None:
                raise IOError(f"{fname} is neither an open GDAL dataset "
                              "nor a file that I can open")
            fname = g
            width = fname.RasterXSize
            height = fname.RasterYSize

        for i in range(0, width, tile_size):
            for j in range(0, height, tile_size):
                w = min(i + tile_size, width) - i
                h = min(j + tile_size, height) - j
                srcwin = [i, j, w, h]
                yield gdal.Translate("", fname, format="MEM", srcWin=srcwin)

Another function could wrap the functionality that sets up the prior, the observations and LinearKalman to take the output of chunk_me. I've yet to do this, though!
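The shape of that wrapper might look like the following sketch. Everything here is hypothetical: `solve_window` is a dummy stand-in for setting up the prior, observations and LinearKalman on one chunk, and numpy arrays stand in for the GDAL datasets:

```python
import numpy as np

def solve_window(window, data):
    """Stand-in for running the inference engine on one spatial window."""
    i, j, w, h = window
    return data[j:j + h, i:i + w] * 2.0   # dummy "inference"

def mosaic(windows, data):
    """Process each window independently, then mosaic the results back."""
    out = np.zeros_like(data)
    for i, j, w, h in windows:
        out[j:j + h, i:i + w] = solve_window((i, j, w, h), data)
    return out

# Same window arithmetic as chunk_me, on a toy 4x4 "image" with 2x2 tiles
data = np.arange(16, dtype=float).reshape(4, 4)
windows = [(i, j, min(i + 2, 4) - i, min(j + 2, 4) - j)
           for i in range(0, 4, 2) for j in range(0, 4, 2)]
result = mosaic(windows, data)
```

Because the windows are independent, the `for` loop over windows is exactly where a multiprocessing pool or a cluster scheduler would slot in.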