Accessing Data on the Platform
The following code snippets illustrate how to access data on the platform using one of the supported kernels. You can access data from JupyterLab or from custom recipe code.
You can auto-generate the code for a basic read operation by clicking 'Explore Data in Notebook' in the Jupyter Notebooks environment, in the kernel of your choice.
For Python and R, authentication to Query Service is done by building the client context.
# Python
from platform_sdk.client_context import ClientContext
client_context = ClientContext(api_key=<api key>,
                               org_id=<ims org id>,
                               user_token=<user token>,
                               service_token=<service token>)
# R
library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
client_context <- psdk$client_context$ClientContext(api_key=<api key>,
                                                    org_id=<ims org id>,
                                                    user_token=<user token>,
                                                    service_token=<service token>)
If you are using a DSW notebook, the client context is already set up for you:
# Python
client_context = PLATFORM_SDK_CLIENT_CONTEXT
# R
library(reticulate)
use_python("/usr/local/bin/ipython")
psdk <- import("platform_sdk")
py_run_file("../.ipython/profile_default/startup/platform_sdk_context.py")
client_context <- py$PLATFORM_SDK_CLIENT_CONTEXT
# Python
from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(client_context, "<dataset id>")
df = dataset_reader.read()
df.head()
# R
DatasetReader <- psdk$dataset_reader$DatasetReader
dataset_reader <- DatasetReader(client_context, "<dataset id>")
df <- dataset_reader$read()
df
For Spark-based jobs there are two modes for reading data: interactive and batch. In notebooks the default is interactive mode.
To change the mode, set PLATFORM_SDK_READ_MODE = "batch" (a batch-mode sketch follows the interactive example below).
# PySpark
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
PLATFORM_SDK_READ_MODE = "interactive"
datasetId = "<dataset id>"
pd = spark.read.format("com.adobe.platform.query") \
    .option("user-token", os.environ["PYDASDK_IMS_USER_TOKEN"]) \
    .option("ims-org", os.environ["IMS_ORG_ID"]) \
    .option("api-key", os.environ["PYDASDK_IMS_CLIENT_ID"]) \
    .option("service-token", os.environ["PYDASDK_IMS_SERVICE_TOKEN"]) \
    .option("mode", PLATFORM_SDK_READ_MODE) \
    .option("dataset-id", datasetId) \
    .load()
pd.describe()
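As noted above, the same read can run in batch mode by changing the mode option. Here is a minimal sketch, assuming the same environment variables and datasetId placeholder as the interactive example; the variable name pd_batch is illustrative, not part of the SDK:
# PySpark
# Minimal batch-mode sketch: only the "mode" option differs from the
# interactive example above. pd_batch is an illustrative name.
PLATFORM_SDK_READ_MODE = "batch"
pd_batch = spark.read.format("com.adobe.platform.query") \
    .option("user-token", os.environ["PYDASDK_IMS_USER_TOKEN"]) \
    .option("ims-org", os.environ["IMS_ORG_ID"]) \
    .option("api-key", os.environ["PYDASDK_IMS_CLIENT_ID"]) \
    .option("service-token", os.environ["PYDASDK_IMS_SERVICE_TOKEN"]) \
    .option("mode", PLATFORM_SDK_READ_MODE) \
    .option("dataset-id", datasetId) \
    .load()
pd_batch.describe()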
In notebooks, the magic cell command achieves the same as above:
# PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
%dataset read --datasetId 5e45311dd407c41872fffa87 --dataFrame pd0
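The --dataFrame argument names the variable that receives the result, so after the magic command runs the loaded data should be available as pd0. A small follow-up sketch, assuming the read succeeded:
# PySpark
# The magic command above is expected to bind the result to the name
# passed via --dataFrame, so pd0 now references the loaded data.
pd0.describe()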
// Spark
import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().master("local").getOrCreate()
val PLATFORM_SDK_READ_MODE = "interactive"
val datasetId = "<dataset id>"
val df = spark.read.format("com.adobe.platform.query")
  .option("user-token", sys.env("PYDASDK_IMS_USER_TOKEN"))
  .option("ims-org", sys.env("IMS_ORG_ID"))
  .option("api-key", sys.env("PYDASDK_IMS_CLIENT_ID"))
  .option("service-token", sys.env("PYDASDK_IMS_SERVICE_TOKEN"))
  .option("mode", PLATFORM_SDK_READ_MODE)
  .option("dataset-id", datasetId)
  .load()
df.show()
See also: the Query Service in Jupyter Notebook tutorial.
For Experience Event datasets (e.g. datasets from Adobe Analytics), users are encouraged to filter by time range. If time range options are specified, all batches are considered and the limit and start options are ignored. All timestamps are in UTC.
# Python
from platform_sdk.dataset_reader import DatasetReader
dataset_reader = DatasetReader(get_platform_sdk_client_context(), dataset_id="<dataset id>")
df = dataset_reader.where(
    dataset_reader['timestamp'].gt('2020-06-05 15:00:00').
    And(dataset_reader['timestamp'].lt('2020-06-05 17:00:00'))
).read()
df.head()
# R
df2 <- dataset_reader$where(
  dataset_reader['timestamp']$gt('2020-06-05 15:00:00')$
    And(dataset_reader['timestamp']$lt('2020-06-05 17:00:00'))
)$read()
df2
To filter Experience Event datasets in PySpark or Scala, use spark.sql:
# PySpark
pd.createOrReplaceTempView("event")
timepd = spark.sql("SELECT * FROM event WHERE timestamp > CAST('2019-01-14 00:00:00.0' AS TIMESTAMP) AND timestamp < CAST('2019-01-15 00:00:00.0' AS TIMESTAMP)")
timepd.show()
// Spark
df.createOrReplaceTempView("event")
val timedf = spark.sql("SELECT * FROM event WHERE timestamp > CAST('2019-01-14 00:00:00.0' AS TIMESTAMP) AND timestamp < CAST('2019-01-15 00:00:00.0' AS TIMESTAMP)")
timedf.show()
Use the code below to merge different slices of the same dataset. Note that different slices may contain different numbers of columns; in the merged data frame, columns missing from a slice are filled with missing values (NaN in Python, NA in R), as illustrated in the short sketch after the snippets below.
# Python
import pandas as pd
df = pd.concat([df0,df1], ignore_index=True)
# R
df = merge(df0,df1,all = TRUE)
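As a quick illustration of the fill behavior, here is a minimal, self-contained sketch with two toy slices (the df0 and df1 below are made-up frames, not platform data); a column present in only one slice comes back as NaN in the merged result:
# Python
import pandas as pd
# Two toy "slices" with partially overlapping columns (illustrative only).
df0 = pd.DataFrame({"id": [1, 2], "country": ["US", "DE"]})
df1 = pd.DataFrame({"id": [3], "country": ["FR"], "city": ["Paris"]})
# Rows from df0 have no "city" value, so the merged frame fills it with NaN.
merged = pd.concat([df0, df1], ignore_index=True)
print(merged)
# Approximate output:
#    id country   city
# 0   1      US    NaN
# 1   2      DE    NaN
# 2   3      FR  Paris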