# Writing and Integrating Data Processors
This page introduces aspects to consider when creating a data processor intended for use with prosEO and when integrating existing data processors into prosEO.
The application examples presented below are based on the Sample Processor and Sample Wrapper provided in the prosEO source code in the directories `<project root>/samples/sample-processor` and `<project root>/samples/sample-wrapper`, respectively.
The main link between the prosEO Production Planner and any data processor being run by prosEO is the Job Order File (JOF). Any processor to be run within the prosEO framework is required to adhere to ESA's Generic IPF Interface Specification (MMFI-GSEG-EOPG-TN-07-0003 or its variant GMES-GSEG-EOPG-TN-09-0016; see Configure Processor Classes, Versions and Configurations). The two reference documents present slightly different versions of the XML structure of a Job Order File, which will be detailed below.
First, let's have a look at a simple Job Order File, as it would be fed into the Sample Processor:
```xml
<?xml version="1.0" encoding="utf-8" standalone="no"?>
<Ipf_Job_Order>
    <Ipf_Conf>
        <Processor_Name>PTML2</Processor_Name>
        <Version>2.0.0</Version>
        <Stdout_Log_Level>INFO</Stdout_Log_Level>
        <Stderr_Log_Level>INFO</Stderr_Log_Level>
        <Test>false</Test>
        <Breakpoint_Enable>false</Breakpoint_Enable>
        <Processing_Station>prosEO Test Mission localhost</Processing_Station>
        <Config_Files>
            <Conf_File_Name>/usr/share/sample-processor/conf/ptm_l2_config.xml</Conf_File_Name>
        </Config_Files>
        <Sensing_Time>
            <Start>20191104_090000200000</Start>
            <Stop>20191104_104110300000</Stop>
        </Sensing_Time>
        <Dynamic_Processing_Parameters>
            <Processing_Parameter>
                <Name>Threads</Name>
                <Value>10</Value>
            </Processing_Parameter>
            <Processing_Parameter>
                <Name>logging.root</Name>
                <Value>notice</Value>
            </Processing_Parameter>
            <Processing_Parameter>
                <Name>Processing_Mode</Name>
                <Value>OPER</Value>
            </Processing_Parameter>
        </Dynamic_Processing_Parameters>
    </Ipf_Conf>
    <List_of_Ipf_Procs count="1">
        <Ipf_Proc>
            <Task_Name>ptm_l2</Task_Name>
            <Task_Version>2.0.0</Task_Version>
            <List_of_Inputs count="1">
                <Input>
                    <File_Type>PTM_L1B_P1</File_Type>
                    <File_Name_Type>Physical</File_Name_Type>
                    <List_of_File_Names count="1">
                        <File_Name>/proseo/data/cache/2025/8/28/16/54/PTM_TEST_PTM_L1B_P1_20191104T090000_20191104T104110_03000_2_2.0.0_20250828T165438.nc</File_Name>
                    </List_of_File_Names>
                </Input>
            </List_of_Inputs>
            <List_of_Outputs count="1">
                <Output>
                    <File_Type>PTM_L2_B</File_Type>
                    <File_Name_Type>Physical</File_Name_Type>
                    <File_Name>/proseo/data/processor/364176106/PTM_TEST_PTM_L2_B_20191104T090000_20191104T104110_03000_99_2.0.0_20250828T165452.nc</File_Name>
                </Output>
            </List_of_Outputs>
        </Ipf_Proc>
    </List_of_Ipf_Procs>
</Ipf_Job_Order>
```
Basically, a Job Order File consists of a configuration section and a list of one or more processor declarations (depending on the number of tasks defined in the selected processor). It is important to understand where the elements in the Job Order File come from in prosEO:
| Job Order Element | prosEO Entity/Attribute | Explanation |
|---|---|---|
| Processor_Name | ProcessorClass/processorName | The name of the processor algorithm to run |
| Version | Processor/processorVersion | Version of the processor executable; a "newer" version is denoted by a lexicographically higher version string |
| Stdout_Log_Level<br>Stderr_Log_Level | JobStep/stdoutLogLevel<br>JobStep/stderrLogLevel | Log level settings (in prosEO currently fixed to the default value `INFO`) |
| Test | Processor/isTest | Flag indicating test processing mode |
| Breakpoint_Enable | N/A | Use of breakpoints (in prosEO currently fixed to the default value `false`) |
| Processing_Station | Mission/name + " " + ProcessingFacility/name | Data processing entity |
| Config_Files | Configuration/configurationFiles/fileName | List of file paths for configuration files |
| Sensing_Time/Start<br>Sensing_Time/Stop | ProcessingOrder/startTime<br>ProcessingOrder/stopTime | Start and stop time for the output product(s) |
| Dynamic_Processing_Parameters | Configuration/dynProcParameters and/or ProcessingOrder/dynamicProcessingParameters | A list of key/value pairs with processing parameters |
| Task_Name | Task/taskName | The name of the processing task |
| Task_Version | Task/taskVersion | The task version string |
| Input | N/A | Description of one or more input files of the same `File_Type` (derived from Configuration/staticInputFiles and/or determined dynamically through [selection rules](https://github.com/dlr-eoc/prosEO/wiki/Creating-and-Configuring-a-Mission#configure-product-classes-and-selection-rules)) |
| Input/File_Type | ProductClass/productType | The product class name of the input product ("product type" in ESA terminology) |
| Input/File_Name_Type | Configuration/staticInputFiles/fileNameType or fixed value `Physical` | Type of the file path given: typically either a physical file (`Physical`) or a directory (`Directory`) |
| Input/List_of_File_Names | Configuration/staticInputFiles/fileNames or determined dynamically | A list of file paths to input files (either defined in the configuration or determined through selection rules) |
| Output | N/A | Description of the expected output product (the prosEO Production Planner always generates exactly one `Output` element per Job Order File) |
| Output/File_Type<br>Output/File_Name_Type | ProcessingOrder/requestedProductClasses | Same as for `Input` |
| Output/File_Name | Mission/productFileTemplate or ProductClass/productFileTemplate | File path to the expected output product; the file name is determined using the [product file template](https://github.com/dlr-eoc/prosEO/wiki/Creating-and-Configuring-a-Mission#configuring-the-product-file-template) defined in either the mission or the product class |
As mentioned above, ESA has two documents specifying the format of Job Order Files, with the following differences in tag names:
| MMFI-GSEG-EOPG-TN-07-0003, issue 1.8 | GMES-GSEG-EOPG-TN-09-0016, issue 1.1 |
|---|---|
| Config_Files | List_of_Config_Files |
| Conf_File_Name | Config_File |
| Dynamic_Processing_Parameters | List_of_Dyn_Processing_Parameters |
| Processing_Parameter | Dyn_Processing_Parameter |
Apart from these tag names, the JOF structure is the same for both variants. The applicable variant can be selected by setting the `jobOrderVersion` attribute of the Processor object to either `MMFI_1_8` or `GMES_1_1`.
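For illustration, with `GMES_1_1` selected, the configuration file and dynamic parameter sections of the example JOF above would be rendered with the alternative tag names (a fragment derived from the mapping table above, not actual Production Planner output):

```xml
<List_of_Config_Files>
    <Config_File>/usr/share/sample-processor/conf/ptm_l2_config.xml</Config_File>
</List_of_Config_Files>
<!-- ... other Ipf_Conf elements unchanged ... -->
<List_of_Dyn_Processing_Parameters>
    <Dyn_Processing_Parameter>
        <Name>Processing_Mode</Name>
        <Value>OPER</Value>
    </Dyn_Processing_Parameter>
</List_of_Dyn_Processing_Parameters>
```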
There is a newer "Generic Processor ICD" by ESA (ESA-EOPG-EEGS-ID-0083, issue 1.3), but its specifications have not yet been implemented in prosEO.
In order to minimize the integration effort for a data processor, the following recommendations should be kept in mind when designing a processor:
1. Use the `Output/File_Type` JOF element rather than the `Processor_Name` element to select the applicable processing algorithm: the `Processor_Name` element uniquely identifies the processor executable, not individual algorithms within it (see the sketch after this list).
2. The values for `Stdout_Log_Level`, `Stderr_Log_Level` and `Breakpoint_Enable` currently cannot be configured in prosEO, so the default values (`INFO` and `false`, respectively) must be acceptable to the processor.
3. Static data comes in two flavours: configuration files and static input files. Configuration files are assumed to be comparatively small and may be included in the processor container image; they go into the `Config_Files` element. Static input files may be much larger (e.g. a global Digital Elevation Model), so they should be kept outside of the container image (they can be linked into the container via bind mounts on a common NFS share) and should be defined in the corresponding section of the Configuration object.
4. Dynamically selected input files are provided as `Physical` file paths, never as directories.
5. If multiple input files are selected for any given product type (`File_Type`), they will be combined in a single `Input` element with several file paths in the `List_of_File_Names`.
6. A processing order requesting multiple output product classes will be split into several job steps, each with exactly one output product class. Therefore the JOF contains exactly one `Output` element per task (but if several tasks are generated, they all get the same output product specification).
7. The processor may choose between the JOF standards given in either ESA specification (MMFI-GSEG-EOPG-TN-07-0003 or GMES-GSEG-EOPG-TN-09-0016), but the standards must not be mixed, nor element tags changed.
Not adhering to these recommendations will complicate the processor integration. Except for recommendation (1), which can be handled by the processor configuration, all deviations have to be addressed in the processor wrapper.
For any data processor to be run in prosEO, a wrapper component is required, which provides the link between the (processor-agnostic) prosEO framework and the (prosEO-agnostic) externally developed data processor. (It would of course be possible to develop a processor in such a way that it integrates seamlessly with prosEO without the need for a separate wrapper, but then the processor could no longer be run in other environments.)
The diagram below shows how the wrapper links the data processor (termed the "Instrument Processing Facility" or IPF by ESA) to the Production Planner on one side and to the Storage Manager on the other:
The IPF Wrapper fulfils the role of the "Management Layer" in ESA's "Generic IPF Specifications" (MMFI-GSEG-EOPG-TN-07-0003). In contrast to the other prosEO components, it is highly mission- and processor-specific. Its main task is to communicate with the Storage Manager component to fetch the required input files (and the Job Order File itself) from the backend storage (which typically is not a POSIX file system, but some S3 object storage) and to push the output files to that backend storage. It also communicates with the Ingestor component to update the output product metadata and with the Production Planner to notify it of completion (successful or otherwise).
One mandatory task of the wrapper is to translate the JOF as received from the Production Planner into a JOF recognizable by the data processor, because the Production Planner sends a somewhat enhanced version of the JOF to the wrapper. For the example described above, the input and output file lists as received from the Production Planner might look like this:
```xml
<List_of_Inputs count="1">
    <Input>
        <File_Type>PTM_L1B_P1</File_Type>
        <File_Name_Type>Physical</File_Name_Type>
        <List_of_File_Names count="1">
            <File_Name FS_Type="S3">s3://backend/2025/8/28/16/54/PTM_TEST_PTM_L1B_P1_20191104T090000_20191104T104110_03000_2_2.0.0-SNAPSHOT_20250828T165438.nc</File_Name>
        </List_of_File_Names>
    </Input>
</List_of_Inputs>
<List_of_Outputs count="1">
    <Output Product_ID="56">
        <File_Type>PTM_L2_B</File_Type>
        <File_Name_Type>Physical</File_Name_Type>
        <File_Name FS_Type="POSIX">PTM_TEST_PTM_L2_B_20191104T090000_20191104T104110_03000_99_2.0.0-SNAPSHOT_20250828T165452.nc</File_Name>
    </Output>
</List_of_Outputs>
```
You will notice that:
- The `File_Name` element of the input file has an additional `FS_Type` attribute with the value `S3`, and the file path is not a valid POSIX file path, but an S3 URL. This URL is sent to the Storage Manager to retrieve the input product, and in the final JOF the string is replaced by the actual POSIX file path to which the product has been downloaded. The `FS_Type` attribute (which is invalid per the JOF schema) is removed (see the fragment after this list).
- The `Output` element has a `Product_ID` attribute, which is stored locally by the wrapper and used to find the correct product metadata record to update when processing is completed. In the final JOF, the attribute is removed.
- The `Output/File_Name` element contains only a file name, not a directory path. The absolute path is determined by the wrapper and will be added when the final JOF is generated.
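For comparison, after the BaseWrapper has fetched the input product, the same `Input` element in the final JOF looks just like in the very first example above: an ordinary POSIX path into the local cache, with the `FS_Type` attribute removed (the actual cache path is determined at runtime):

```xml
<List_of_Inputs count="1">
    <Input>
        <File_Type>PTM_L1B_P1</File_Type>
        <File_Name_Type>Physical</File_Name_Type>
        <List_of_File_Names count="1">
            <File_Name>/proseo/data/cache/2025/8/28/16/54/PTM_TEST_PTM_L1B_P1_20191104T090000_20191104T104110_03000_2_2.0.0-SNAPSHOT_20250828T165438.nc</File_Name>
        </List_of_File_Names>
    </Input>
</List_of_Inputs>
```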
Since the communication with the Storage Manager, Ingestor and Production Planner and the modifications for the final JOF are standardized, a convenience class is provided which handles all of this: the `BaseWrapper` class. Any concrete wrapper for a specific data processor extends this base class and fills in the three extension points provided by it as required.
The flow of control of the BaseWrapper (and hence any wrapper) is as follows (see BaseWrapper.java):
1. Check environment variables (acting as invocation parameters),
2. Fetch the Job Order File from the Storage Manager,
3. Create a Job Order document from the Job Order File,
4. Extension hook (optional): Modify the Job Order document for processor operation,
5. Fetch input files and remap file names in the Job Order document,
6. Extension hook (optional): Modify fetched data and the Job Order document for the processor,
7. Create the Job Order File in the file system for the container context,
8. Execute the data processor,
9. Extension hook (optional): Perform processor-specific updates to the Job Order document,
10. Push processing results to the prosEO storage,
11. Register the pushed products with the prosEO Ingestor,
12. Clean up temporary files/directories,
13. Report success or failure to the Production Planner.
In the case of a "well-behaved" data processor (i.e. one conforming to all the rules set out above), no further implementation in the processor-specific wrapper is required. Alas, this is very rarely the case; therefore a typical wrapper will need to make some adaptations in one or more of the extension hooks (steps 4, 6 and 9 above). The basic structure of a processor-specific IPF Wrapper is this:
```java
package de.dlr.proseo.samplewrap;

import java.io.ByteArrayOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import de.dlr.proseo.basewrap.BaseWrapper;
import de.dlr.proseo.model.enums.JobOrderVersion;
import de.dlr.proseo.model.joborder.JobOrder;

public class SampleWrapper extends BaseWrapper {

    private static Logger logger = LoggerFactory.getLogger(SampleWrapper.class);

    @Override
    protected void preFetchInputHook(JobOrder jobOrderDoc) throws WrapperException {
        ...
    }

    @Override
    protected void postFetchInputHook(JobOrder jobOrderDoc) throws WrapperException {
        ...

        // Save the final JOF in the log for post-mortem analysis
        ByteArrayOutputStream jof = new ByteArrayOutputStream();
        jobOrderDoc.writeXMLToStream(jof, false, JobOrderVersion.valueOf(ENV_JOBORDER_VERSION));
        logger.info("Updated Job Order file:\n" + jof.toString());
    }

    @Override
    protected void postProcessingHook(JobOrder joWork) throws WrapperException {
        ...
    }
}
```
While not strictly necessary, it is recommended practice to log the final Job Order File before invoking the processor, as an aid to error diagnosis.
Things to do in these methods (and actually observed in the wild) include:

- For `preFetchInputHook`:
  - Check the availability of all required input,
  - Remove unnecessary input (e.g. a selection rule may return several input products of a specific product type, or several selection rules may provide products of alternative product types, and only one of them must be fed to the processor);
- For `postFetchInputHook`:
  - Change the `File_Type` values in the Job Order document, because some processors have their own ideas about how the product types are named (see the sketch after this list),
  - Add more output products (remember that prosEO only knows about one, the "primary" output product; everything else is considered "windfall output"),
  - Replace `Sensing_Time/Start` and `Sensing_Time/Stop` by `0000000_000000000000000` and `999999_999999999999999`, respectively (yes, there are processors which need that),
  - Any other modification of the contents of the Job Order document the processor might require;
- For `postProcessingHook`:
  - Retrieve the actual output files (because they are not named as prescribed in the JOF),
  - Ingest any "windfall output" (it is not handled by the BaseWrapper code),
  - Extract additional metadata from the output product file(s) and update the product metadata accordingly,
  - If a single output file is required: create a zipped archive of all the output files,
  - Transfer some output files directly to an external pickup point, where they need to be provided as soon as practicable,
  - Generate processing orders for further processing of the output files (this should become obsolete once proper functionality for automatic order generation is added to prosEO).
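To give an idea of the first `postFetchInputHook` item, the following standalone sketch renames `File_Type` values in a JOF using plain JDK XML processing. Note that a real hook would operate on the prosEO `JobOrder` model object passed to it rather than on an XML file; the class and the type mapping here are purely hypothetical:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;

import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

public class FileTypeRenamer {

    // Hypothetical mapping from prosEO product class names to the processor's own type names
    private static final Map<String, String> TYPE_MAP = Map.of("PTM_L1B_P1", "L1B_PART1");

    /** Rewrite all File_Type elements of a Job Order File according to TYPE_MAP */
    public static void renameFileTypes(Path jofIn, Path jofOut) throws Exception {
        Document jof = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder().parse(Files.newInputStream(jofIn));

        NodeList fileTypes = jof.getElementsByTagName("File_Type");
        for (int i = 0; i < fileTypes.getLength(); i++) {
            String proseoType = fileTypes.item(i).getTextContent();
            // Keep any type not covered by the mapping unchanged
            fileTypes.item(i).setTextContent(TYPE_MAP.getOrDefault(proseoType, proseoType));
        }

        TransformerFactory.newInstance().newTransformer()
                .transform(new DOMSource(jof), new StreamResult(Files.newOutputStream(jofOut)));
    }
}
```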
Of course this list is by no means complete, and much more complicated integrations have been required over time. But you get the gist ... And if anything goes wrong, log an error message and throw a `WrapperException`.
To support some of the more common tasks listed above, the BaseWrapper class provides some convenience methods for its subclasses (see BaseWrapper Javadocs):
```java
protected RestProduct retrieveProductMetadata(InputOutput output) throws WrapperException
protected void updateProductMetadata(InputOutput output, RestProduct productMetadata) throws WrapperException
protected IngestorProduct createIngestorProduct(Path outputFilePath)
protected void ingestProduct(IngestorProduct product)
protected String extractProseoMessage(HttpResponseInfo responseInfo)
```
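A `postProcessingHook` implementation might, for instance, use `createIngestorProduct` and `ingestProduct` to ingest a windfall output file the processor created in addition to the primary product. This is a sketch under assumptions: the file name is hypothetical, `wrapperDataDirectory` (see the table below) is assumed to hold a plain directory path, and the metadata adjustments are only hinted at in a comment, since they depend on the REST model:

```java
@Override
protected void postProcessingHook(JobOrder joWork) throws WrapperException {
    // Hypothetical windfall file created by the processor in the wrapper's data directory
    Path windfallFile = Paths.get(wrapperDataDirectory, "PTM_L2_AUX_20191104T090000.nc");

    if (Files.exists(windfallFile)) {
        // Build an ingestion request from the output file and register it with the Ingestor
        IngestorProduct windfallProduct = createIngestorProduct(windfallFile);
        // ... adjust product class, sensing times etc. as required (setters depend on the REST model) ...
        ingestProduct(windfallProduct);
    }
}
```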
In addition, some of the BaseWrapper's environment variables are accessible to the processor-specific wrappers:

| Environment variable | Variable meaning |
|---|---|
| ENV_JOBORDER_VERSION | The Job Order File format version to be used |
| ENV_LOCAL_FS_MOUNT | Mount point of the shared local file system |
| wrapperDataDirectory | Directory for temporary/output files created by the wrapper |
| ENV_PROSEO_USER | User name for the prosEO Control Instance |
| ENV_PROSEO_PW | Password for the prosEO Control Instance |
| ENV_PROCESSING_FACILITY_NAME | Name of the Processing Facility the wrapper is running in |
| ENV_INGESTOR_ENDPOINT | HTTP endpoint for the Ingestor callback |
| ENV_PROCESSOR_SHELL_COMMAND | Shell command to run the processor (with the path to the Job Order File as its sole parameter) |
The IPF Wrapper container image is built on top of the container image of the data processor (IPF), like so (note that in our sample environment the data processor is also a Java program, which is unusual):
```dockerfile
FROM localhost:5000/proseo-sample-processor:1.2.0

WORKDIR /usr/share/proseo-sample-wrapper
ADD target/proseo-sample-wrapper-jar-with-dependencies.jar proseo-sample-wrapper.jar

# Timeout if other prosEO services do not respond within 10 min
ENV HTTP_TIMEOUT=600
ENV PROCESSOR_SHELL_COMMAND="java -jar /usr/share/sample-processor/proseo-sample-processor.jar"

ENTRYPOINT ["java", "-jar", "/usr/share/proseo-sample-wrapper/proseo-sample-wrapper.jar", "de.dlr.proseo.samplewrap.SampleWrapper"]
```
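The wrapper image is then built and pushed with the standard `docker build` and `docker push` commands (e.g. tagged as `localhost:5000/proseo-sample-wrapper:<version>`, where the registry host `localhost:5000` is specific to the sample environment), so that it can be referenced when configuring the processor in prosEO (see Configure Processor Classes, Versions and Configurations).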