Data Provider Setup Guide

This guide covers how to set up and configure a data provider in the MDS ecosystem to share Kafka streams through the Eclipse Dataspace Connector (EDC).

Prerequisites

  • Java 21+
  • MDS Connector or MDS CaaS
  • Docker and Docker Compose
  • Access to a Kafka cluster
  • OIDC provider

Asset Creation

Create a Kafka topic asset via the Management API. This defines the data resource that will be shared:

  curl -X POST http://localhost:8182/management/v3/assets \
    -H "Content-Type: application/json" \
    -H "X-Api-Key: your-api-key" \
    -d '{
      "@context": {
        "@vocab": "https://w3id.org/edc/v0.0.1/ns/"
      },
      "@id": "mobility-events-stream",
      "dataAddress": {
        "type": "Kafka",
        "topic": "mobility-events",
        "kafka.bootstrap.servers": "kafka.example.com:9092",
        "kafka.sasl.mechanism": "OAUTHBEARER",
        "kafka.security.protocol": "SASL_SSL"
      }
    }'

Key Asset Properties

  • @id: Unique identifier for the asset
  • type: Must be "Kafka" for Kafka streaming assets
  • topic: The Kafka topic name
  • kafka.bootstrap.servers: Kafka cluster connection string
  • kafka.sasl.mechanism: Authentication mechanism (OAUTHBEARER, PLAIN, etc.)
  • kafka.security.protocol: Security protocol (SASL_SSL, SASL_PLAINTEXT, etc.)
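
If you prefer to create the asset from application code rather than curl, the same request can be assembled with the JDK's built-in HTTP client. The following is a minimal sketch, not part of the MDS tooling: the class and method names are hypothetical, the base URL and API key are the placeholder values from the curl call above, and the values are inserted into the JSON verbatim, so they must not contain quotes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: builds (but does not send) the asset-creation request shown above. */
final class AssetRequestSketch {

    /** Renders the asset JSON with the same properties as the curl example. */
    static String assetJson(String assetId, String topic, String bootstrapServers) {
        return """
            {
              "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
              "@id": "%s",
              "dataAddress": {
                "type": "Kafka",
                "topic": "%s",
                "kafka.bootstrap.servers": "%s",
                "kafka.sasl.mechanism": "OAUTHBEARER",
                "kafka.security.protocol": "SASL_SSL"
              }
            }
            """.formatted(assetId, topic, bootstrapServers);
    }

    /** Wraps the JSON in a POST to <managementBaseUrl>/v3/assets with the API key header. */
    static HttpRequest createAssetRequest(String managementBaseUrl, String apiKey, String json) {
        return HttpRequest.newBuilder(URI.create(managementBaseUrl + "/v3/assets"))
                .header("Content-Type", "application/json")
                .header("X-Api-Key", apiKey)
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String json = assetJson("mobility-events-stream", "mobility-events", "kafka.example.com:9092");
        HttpRequest request = createAssetRequest("http://localhost:8182/management", "your-api-key", json);
        // Print the request line and body instead of sending, so the sketch runs offline.
        System.out.println(request.method() + " " + request.uri());
        System.out.println(json);
    }
}
```

To actually submit the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and check the response status.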

Policy Definition

Define usage policies for your stream. Policies control who can access the data and under what conditions:

  curl -X POST http://localhost:8182/management/v2/policydefinitions \
    -H "Content-Type: application/json" \
    -H "X-Api-Key: your-api-key" \
    -d '{
      "@context": {
        "@vocab": "https://w3id.org/edc/v0.0.1/ns/"
      },
      "@id": "streaming-policy",
      "policy": {
        "permissions": [{
          "action": "USE",
          "constraint": {
            "leftExpression": "REFERRING_CONNECTOR",
            "operator": "EQ",
            "rightExpression": "MDSXXXX.YYYYY"
          }
        }]
      }
    }'
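
To make the effect of the constraint concrete, the sketch below mimics how an EQ constraint narrows access to a single connector. It is illustrative only and is not the connector's policy engine: the `Constraint` record, the `isSatisfied` method, and the idea of passing the consumer's attributes as a map are all assumptions made for this example, as is the reading of REFERRING_CONNECTOR as the consumer's connector ID.

```java
import java.util.Map;

/** Illustrative only: mimics the EQ constraint above; not the EDC policy engine. */
final class ConstraintSketch {

    /** Mirrors the three fields of the "constraint" object in the policy JSON. */
    record Constraint(String leftExpression, String operator, String rightExpression) {}

    /** Returns true when the attribute named by leftExpression equals rightExpression. */
    static boolean isSatisfied(Constraint constraint, Map<String, String> consumerAttributes) {
        if (!"EQ".equals(constraint.operator())) {
            throw new IllegalArgumentException("Only EQ is handled in this sketch: " + constraint.operator());
        }
        // A missing attribute yields null, which never equals the expected value.
        return constraint.rightExpression().equals(consumerAttributes.get(constraint.leftExpression()));
    }

    public static void main(String[] args) {
        Constraint constraint = new Constraint("REFERRING_CONNECTOR", "EQ", "MDSXXXX.YYYYY");
        System.out.println(isSatisfied(constraint, Map.of("REFERRING_CONNECTOR", "MDSXXXX.YYYYY")));
        System.out.println(isSatisfied(constraint, Map.of("REFERRING_CONNECTOR", "MDSAAAA.BBBBB")));
    }
}
```

Under these assumptions, only a consumer whose connector ID matches the rightExpression value exactly would be allowed to use the stream.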

Contract Definition

Link your asset with the policy to create an offer that consumers can negotiate:

  curl -X POST http://localhost:8182/management/v2/contractdefinitions \
    -H "Content-Type: application/json" \
    -H "X-Api-Key: your-api-key" \
    -d '{
      "@context": {
        "@vocab": "https://w3id.org/edc/v0.0.1/ns/"
      },
      "@id": "mobility-events-contract",
      "accessPolicyId": "streaming-policy",
      "contractPolicyId": "streaming-policy",
      "assetsSelector": {
        "operandLeft": "@id",
        "operator": "=",
        "operandRight": "mobility-events-stream"
      }
    }'
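
Because the contract definition refers to the asset and the policy by ID only, a typo in any of the three IDs can leave the offer pointing at nothing. The following sketch shows one way to check that wiring locally before calling the Management API. The `ContractDefinition` record and `missingReferences` method are hypothetical helpers written for this example, not part of any connector library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Sketch: local pre-flight check that a contract definition references known IDs. */
final class OfferWiringSketch {

    /** The IDs a contract definition carries; assetId is the assetsSelector's operandRight. */
    record ContractDefinition(String id, String accessPolicyId, String contractPolicyId, String assetId) {}

    /** Lists every referenced ID that is absent from the sets of IDs created so far. */
    static List<String> missingReferences(ContractDefinition definition,
                                          Set<String> assetIds,
                                          Set<String> policyIds) {
        List<String> missing = new ArrayList<>();
        if (!policyIds.contains(definition.accessPolicyId())) {
            missing.add("accessPolicyId: " + definition.accessPolicyId());
        }
        if (!policyIds.contains(definition.contractPolicyId())) {
            missing.add("contractPolicyId: " + definition.contractPolicyId());
        }
        if (!assetIds.contains(definition.assetId())) {
            missing.add("asset: " + definition.assetId());
        }
        return missing;
    }

    public static void main(String[] args) {
        ContractDefinition definition = new ContractDefinition(
                "mobility-events-contract", "streaming-policy", "streaming-policy", "mobility-events-stream");
        // IDs taken from the asset and policy created in the earlier steps.
        System.out.println(missingReferences(
                definition, Set.of("mobility-events-stream"), Set.of("streaming-policy")));
    }
}
```

An empty result means every ID in the contract definition matches an asset or policy created in the earlier steps.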

Kafka Producer Implementation

Create a producer to publish data to your topic:

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import java.util.Properties;

  public class MobilityEventProducer implements AutoCloseable {
    private static final String TOPIC = "mobility-events";

    private final KafkaProducer<String, String> producer;

    public MobilityEventProducer() {
      Properties props = new Properties();
      // No security settings are configured here, so the client connects over PLAINTEXT.
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      this.producer = new KafkaProducer<>(props);
    }

    // Sends the event asynchronously, keyed by eventId; the callback reports the outcome.
    public void publishEvent(String eventId, String eventData) {
      ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, eventId, eventData);
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.printf("Event sent to topic %s, partition %d, offset %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
        } else {
          exception.printStackTrace();
        }
      });
    }

    // Waits for pending sends to complete and releases the client's resources.
    @Override
    public void close() {
      producer.close();
    }
  }
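
The producer above connects to a local broker without authentication, while the asset declares SASL_SSL with OAUTHBEARER. The sketch below shows producer properties that would match that declaration. It uses only `java.util.Properties`, so it compiles without the Kafka client on the classpath. The property keys are standard Kafka client settings; the token endpoint URL, client ID, and client secret are hypothetical values you would obtain from your OIDC provider. Depending on the Kafka client version, `sasl.login.callback.handler.class` may also need to point at the client's OAuth login callback handler; its package name differs between releases, so it is left out here.

```java
import java.util.Properties;

/** Sketch: producer settings matching the asset's SASL_SSL / OAUTHBEARER declaration. */
final class SecureProducerConfigSketch {

    static Properties secureProducerProperties(String bootstrapServers,
                                               String tokenEndpointUrl,
                                               String clientId,
                                               String clientSecret) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Same protocol and mechanism as declared in the asset's dataAddress.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        // Assumption: the OIDC provider exposes a token endpoint for client credentials.
        props.put("sasl.oauthbearer.token.endpoint.url", tokenEndpointUrl);
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                        + "clientId=\"" + clientId + "\" clientSecret=\"" + clientSecret + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint and credentials, for illustration only.
        Properties props = secureProducerProperties(
                "kafka.example.com:9092", "https://auth.example.com/token", "my-client", "my-secret");
        System.out.println(props.getProperty("security.protocol") + " / " + props.getProperty("sasl.mechanism"));
    }
}
```

The resulting `Properties` object can be passed to `new KafkaProducer<>(props)` in place of the unauthenticated configuration. In real deployments, load the client secret from a secret store rather than hard-coding it.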

Security Configuration

OIDC Authentication

To secure Kafka access with OIDC, add the OIDC properties to the asset's dataAddress when you create the asset:

  {
    "dataAddress": {
      "type": "Kafka",
      "topic": "mobility-events",
      "kafka.bootstrap.servers": "kafka.example.com:9092",
      "kafka.sasl.mechanism": "OAUTHBEARER",
      "kafka.security.protocol": "SASL_SSL",
      "oidc.discovery.url": "https://auth.example.com/.well-known/openid-configuration",
      "oidc.client.registration.endpoint": "https://auth.example.com/clients"
    }
  }
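
OpenID Connect Discovery publishes a provider's metadata at the path `/.well-known/openid-configuration` under the issuer URL. Before registering the asset, it can be worth confirming that the discovery URL is well formed and reachable. The sketch below derives the URL from an issuer and prepares a GET request for it. The class and method names are hypothetical, and the issuer is the example host used above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: derives the OIDC discovery URL from an issuer and prepares a request for it. */
final class OidcDiscoverySketch {

    /** Appends the well-known discovery path, tolerating a trailing slash on the issuer. */
    static URI discoveryUri(String issuer) {
        String base = issuer.endsWith("/") ? issuer.substring(0, issuer.length() - 1) : issuer;
        return URI.create(base + "/.well-known/openid-configuration");
    }

    /** Builds (but does not send) a GET request for the discovery document. */
    static HttpRequest discoveryRequest(String issuer) {
        return HttpRequest.newBuilder(discoveryUri(issuer))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = discoveryRequest("https://auth.example.com");
        // Printed rather than sent, so the sketch runs without network access.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(...)` should return a JSON document that includes fields such as `issuer` and `token_endpoint`.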