Writing Data on the Platform - adobe/experience-platform-dsw-reference GitHub Wiki

The following set of snippets illustrates how to write back data on the platform using one of the supported kernels. You may write back data from JupyterLab or custom recipe code.

Python3

# Python Data Access SDK 0.5.7
from data_access_sdk_python.writer import DataSetWriter

writer = DataSetWriter()
writer.write(data_set_id="<dataset id>", dataframe=<dataframe>, ims_org="<ims org>", file_format="json")

R

# R Data Access SDK 0.5.7
library(reticulate)
use_python("/usr/local/bin/python3")

writer <- psdk$writer$DataSetWriter()
writer$write(data_set_id="<dataset id>", dataframe=<dataframe>, ims_org="<ims org>", file_format="json")

PySpark3

From Recipe Code

# Data Access SDK 1.1.12
serviceToken = "<service token>"
userToken = "<user token>"

df.write.format("com.adobe.platform.dataset") 
        .option('orgId', org_id) 
        .option('serviceToken', service_token) 
        .option('userToken', user_token) 
        .option('serviceApiKey', api_key) 
        .save(scored_dataset_id)

From a Jupyter Lab Notebook

To write data back to the platform follow the following steps.

  1. Navigate to the Data Management - Schema tab and define a schema for the dataframe of interest.

  2. Navigate to the Data Management - Datasets tab and create a dataset using the defined schema from (1), but without uploading any files. Note the dataset id and set it as a string variable in the following cell.

dataset_id = "<dataset_id>"
  1. Navigate to the Data Management - Queries tab and go to Credentials. Copy the Username and set it as the orgId in the following cell.
orgId = "<Username>"
  1. Finally, run the following cell to write the data back to the platform (_tenantID placeholders in alias should be replaced with original value e.g. '_acpccedemo').
from pyspark.sql.functions import lit, monotonically_increasing_id, struct

dataframe_to_write_decorated = dataframe_to_write.withColumnRenamed('timestamp_date', 'timestamp')
                                             .withColumn('_id', monotonically_increasing_id())          
                                             .withColumn('eventType', lit(''))
                                             .select('_id', 'eventType', 'timestamp', struct('count', 'day_of_week', 'week', 'total', 'weekly_prop', 'is_weekday').alias(<_tenantID>))

dataframe_to_write_decorated.printSchema()

userToken = spark.sparkContext.getConf().get("spark.yarn.appMasterEnv.USER_TOKEN")
serviceToken = spark.sparkContext.getConf().get("spark.yarn.appMasterEnv.SERVICE_TOKEN")
serviceApiKey = spark.sparkContext.getConf().get("spark.yarn.appMasterEnv.SERVICE_API_KEY")

dataframe_to_write_decorated.write.format("com.adobe.platform.dataset")
     .option("orgId", orgId)
     .option("serviceToken", serviceToken)
     .option("userToken", userToken)
     .option("serviceApiKey", serviceApiKey)
     .save(dataset_id)

Scala

# Data Access SDK 1.1.12
serviceToken = "<service token>"
userToken = "<user token>"

df.write.format("com.adobe.platform.dataset") 
      .option(DataSetOptions.orgId, orgId) 
      .option(DataSetOptions.serviceToken, serviceToken) 
      .option(DataSetOptions.userToken, userToken) 
      .option(DataSetOptions.serviceApiKey, apiKey) 
      .save(scoredDataSetId)
⚠️ **GitHub.com Fallback** ⚠️