Demo Preprocessing - jejjohnson/ml4eo GitHub Wiki

Welcome to the ml4eo wiki!


def preprocess(
    ds,
    min_lon: float = -66, max_lon: float = -54,
    min_lat: float = 32, max_lat: float = 44,
    min_time: str = '2016-12-01', max_time: str = '2018-02-01',
):
    """Crop a dataset to a lon/lat/time box and derive its ``ssh`` variable.

    The steps run in this order: rename ``longitude``/``latitude`` to
    ``lon``/``lat``, apply the ``ocnval`` lat/lon and time validators, keep
    only the points inside the (inclusive) bounds, compute
    ``ssh = sla_filtered + mdt - lwe``, apply the ``ocnval`` ssh validator,
    sort along ``time`` and keep only the ``ssh`` variable.

    Args:
        ds: Dataset-like object (presumably an ``xarray.Dataset`` -- confirm
            against the caller) with ``longitude``, ``latitude`` and ``time``
            coordinates and ``sla_filtered``, ``mdt`` and ``lwe`` variables.
        min_lon: Lower longitude bound, inclusive.
        max_lon: Upper longitude bound, inclusive.
        min_lat: Lower latitude bound, inclusive.
        max_lat: Upper latitude bound, inclusive.
        min_time: Lower time bound, inclusive; parsed with ``pd.to_datetime``.
        max_time: Upper time bound, inclusive; parsed with ``pd.to_datetime``.

    Returns:
        The cropped, time-sorted selection ``[['ssh']]`` of the processed
        dataset, i.e. a dataset holding only ``ssh``.
    """
    # Parse the time bounds once instead of on every evaluation of the mask.
    start = pd.to_datetime(min_time)
    end = pd.to_datetime(max_time)

    def _crop(d):
        # NOTE(review): the first access of each coordinate calls ``.load()``,
        # presumably so the mask can be evaluated on lazily opened data --
        # confirm before removing.
        inside = (
            (d.lon.load() >= min_lon) & (d.lon <= max_lon)
            & (d.lat.load() >= min_lat) & (d.lat <= max_lat)
            & (d.time.load() >= start) & (d.time <= end)
        )
        # drop=True removes the coordinate labels that fall outside the box.
        return d.where(inside, drop=True)

    processed = (
        ds.rename(longitude='lon', latitude='lat')
        .pipe(ocnval.validate_latlon)
        .pipe(ocnval.validate_time)
        .pipe(_crop)
        .assign(ssh=lambda d: d.sla_filtered + d.mdt - d.lwe)
        .pipe(ocnval.validate_ssh)
        .sortby('time')
    )
    # Double brackets select a dataset containing only ``ssh``; the wiki
    # renderer had turned this ``[[...]]`` into a markdown link.
    return processed[['ssh']]

Demo preprocessing with xarray's `preprocess` hook (the `preprocess` argument of `xr.open_mfdataset`).


def run(
    input_dir: str = 'data/downloads/default',
    output_path: str = 'data/prepared/default.nc',
    min_lon: float = -65, max_lon: float = -55,
    min_lat: float = 33, max_lat: float = 43,
    min_time: str = '2017-01-01', max_time: str = '2018-01-01',
    _skip_val: bool = False,
):
    """Preprocess every NetCDF file under ``input_dir`` into one output file.

    Each ``*.nc`` file found recursively under ``input_dir`` is passed through
    ``preprocess`` with the given bounds, the results are concatenated along
    ``time``, loaded, sorted by ``time`` and written to ``output_path``.

    Args:
        input_dir: Directory searched recursively for ``*.nc`` files.
        output_path: Destination NetCDF file; missing parent directories are
            created.
        min_lon: Lower longitude bound forwarded to ``preprocess``.
        max_lon: Upper longitude bound forwarded to ``preprocess``.
        min_lat: Lower latitude bound forwarded to ``preprocess``.
        max_lat: Upper latitude bound forwarded to ``preprocess``.
        min_time: Lower time bound forwarded to ``preprocess``.
        max_time: Upper time bound forwarded to ``preprocess``.
        _skip_val: When true, neither ``input_validation`` nor
            ``output_validation`` is called.
    """
    log.info("Starting")
    if not _skip_val:
        input_validation(input_dir=input_dir)

    bounds = dict(
        min_lon=min_lon, max_lon=max_lon,
        min_lat=min_lat, max_lat=max_lat,
        min_time=min_time, max_time=max_time,
    )
    partial_prepro = partial(preprocess, **bounds)

    # Curate.
    # ``glob`` yields a one-shot generator in arbitrary filesystem order;
    # a sorted list gives ``combine='nested'`` a real sequence and a
    # reproducible concatenation order.
    nc_files = sorted(Path(input_dir).glob('**/*.nc'))
    ds = xr.open_mfdataset(
        nc_files,
        preprocess=partial_prepro,
        concat_dim='time',
        combine='nested',
        chunks='auto',
    )

    Path(output_path).parent.mkdir(exist_ok=True, parents=True)
    ds.load().sortby('time').to_netcdf(output_path)

    if not _skip_val:
        output_validation(output_path=output_path, **bounds)


Demo preprocessing with the xarray `pipe` method.


# PROCESS: Parameterize and implement how to go from input_files to output_files
def run(
    track_path: str = '???',
    grid_path: str = '???',
    grid_var: str = '???',
    output_path: str = '???',
    _skip_val: bool = False,
):
    """Convert a gridded variable to the coordinates of a track dataset.

    Opens the dataset at ``grid_path``, runs the ``ocnval`` lat/lon, time and
    ssh validators on it, keeps only ``grid_var``, passes it together with the
    dataset opened from ``track_path`` to ``ocngri.grid_to_coord_based`` and
    writes the result to ``output_path``.

    Args:
        track_path: Path of the dataset supplying the target coordinates.
        grid_path: Path of the gridded source dataset.
        grid_var: Name of the variable to validate and select from the grid.
        output_path: Destination NetCDF file; missing parent directories are
            created.
        _skip_val: When true, neither ``input_validation`` nor
            ``output_validation`` is called.

    NOTE(review): the ``'???'`` defaults look like "must be provided"
    placeholders -- confirm with the configuration layer that calls this.
    """
    log.info("Starting")
    if not _skip_val:
        input_validation(grid_path=grid_path, grid_var=grid_var, track_path=track_path)

    validated = (
        xr.open_dataset(grid_path)
        .pipe(ocnval.validate_latlon)
        .pipe(ocnval.validate_time)
        .pipe(partial(ocnval.validate_ssh, variable=grid_var))  # TODO validate rec ssh (add partial)
    )
    # Double brackets select a dataset containing only ``grid_var``; the wiki
    # renderer had turned this ``[[...]]`` into a markdown link. The local is
    # named ``grid_ds`` so it no longer shadows the builtin ``map``.
    grid_ds = validated[[grid_var]]

    Path(output_path).parent.mkdir(parents=True, exist_ok=True)  # Make output directory
    ocngri.grid_to_coord_based(
        src_grid_ds=grid_ds,
        tgt_coord_based_ds=xr.open_dataset(track_path),
    ).to_netcdf(output_path)

    if not _skip_val:
        output_validation(output_path=output_path, track_path=track_path)