Monitoring Spark metrics and applications in Azure Monitor - blendax/MonitorDatabricks GitHub Wiki
This wiki is specifically focused on how to enable Azure Databricks/Spark to send metrics and application logs to Azure Monitor.
If you want and overview of Azure Monitor and the architecture and how it's related to Azure Log Analytics Workspaces please see here:https://docs.microsoft.com/en-us/azure/azure-monitor/overview and here:https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-platform to get a better understanding of the bigger picture.
This wiki is focused on spark application logging and metrics. For batch monitoring see Data Factory Monitoring.
Configure Azure Databricks to send log info to Azure Monitor
To use Azure monitor you will need a LogAnalytics Workspace in Azure. If you don't have a Log Analytics workspace you will need to create one. The Log Analytics workspace is the "container" that will store all your log data from different applications that you chose. It also offers an efficient way to query all logs.
Don't head to the portal to create a Log Analytics workspace from the UI. It's easier if you create the Log Analytics Workspace with script/CLI. If you do this we can bootstrap the log analytics workspace with pre-populated queries specifically tailored for Spark in Azure Databricks. This will ease your life a lot. Note: It's still possible to create the Log Analytics Workspace from the UI in the portal, but then you will have to paste in the Spark queries yourself.
Create Log Analytics workspace
Instructions you should follow are found here [1]: https://docs.microsoft.com/en-us/azure/architecture/databricks-monitoring/dashboards (Ignore the Grafana part as that is another topic not covered here) This wiki is ment to complement the instructions in the link [1] above. So read them in parallel. Visit the linked Github library project and create a LogAnalytics WS from the template mentioned: https://docs.microsoft.com/en-us/azure/architecture/databricks-monitoring/dashboards#deploy-the-azure-log-analytics-workspace To run the CLI command you need the az cli. See: https://docs.microsoft.com/en-us/cli/azure/get-started-with-azure-cli?view=azure-cli-latest If you can't or don't want to install the CLI locally you can use Azure Cloud Shell in the portal that already has the bash CLI installed. See: https://docs.microsoft.com/en-us/azure/cloud-shell/quickstart You can upload the file "logAnalyticsDeploy.json" to Azure Cloud Shell. Run the command below and replace:
<resource-group-name>
with your resource group and your location e.g. "Western Europe" and make sure you execute the command in the same folder as the logAnalyticsDeploy.json file.
az group deployment create --resource-group <resource-group-name> --template-file logAnalyticsDeploy.json --parameters location='Western Europe' serviceTier='Standalone'
az group deployment create --resource-group LogAnalyticsDemo --template-file logAnalyticsDeploy.json --parameters location='North Europe' serviceTier='Standalone'
Go to you your new resource by searching in the portal: img/1sparkmonitorWS.png
Select logs in the left menu: img/select_logs.png
Click on Query Explorer and expand the saves queries -> Spark Metrics img/clickOnQueryExplorer.png
Thanks to the template we used we already have pre-populated queries for spark and Databricks.
Now we have a place to store the information and to query the information (the Log Analytics workspace). Next step is to make sure we can send log data and metrics from Databricks to azure Monitor/Log Analytics workspace.
Enable Databricks to send data to Azure Monitor / Log Analytics workspace
Azure Databricks does not natively support sending log data to Azure monitor, but a library for this functionality is available in GitHub.
Genral info about Databricks monitoring: https://docs.microsoft.com/en-us/azure/architecture/databricks-monitoring/
Build the library
To enable logging from e.g. Streaming applications in Databricks you will need to extend the core functionality to send streaming query event information to Azure Log Analytics. There is a Github library that enables this:
https://github.com/mspnp/spark-monitoring
Read the instructions on the Github readme page.
Summary:
- You need java 8 JDK
- You need maven installed and accessible from command line
- or
- Docker
This can be seen in Databricks when you edit the cluster: img/Small62monitor.png
Make sure you have/change the versions as the active profile in the POM file to be the same as the cluster versions. If you don´t see the versions you need create a new profile. (Copy paste from the others) img/pomxml1.png
In the terminal execute the command line:
mvn package
while standing in the root folder of the project where the top most pom.xml file is locatded.
You have now built the two jar files that we need Azure Databricks Cluster to load. You will find the jar files in the target directory:(the names can vary depending on versions used)
/src/target/spark-listeners_2.11_2.4.4-1.0.0.jar
/src/target/spark-listeners-loganalytics_2.11_2.4.4-1.0.0.jar
Script for loading jars and and setting Log Analytics Workspace in Azure Databricks
You need to tell spark to load the jar files, also you need to connect the logging to your Log Analytics Workspace in Azure so Spark knows to which location it will log. Follow the instructions here: https://github.com/mspnp/spark-monitoring#configure-the-databricks-workspace
You find the Log Analytics WS ID and Primary Key in the portal under Advanced settings for your Log Analytics Wosrkspace img/AdvancedSettings.png
Also change the version of spark to the version you are using in the script: spark-monitoring.sh
The databricks CLI will be needed (as in the instructions) to upload the the script and the jar files. It can be installed locally or in the Azure Cloud Shell: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/cli/
Before you copy your files to the databricks filesystem make sure you create the folder first with the command:
dbfs mkdirs dbfs:/databricks/spark-monitoring/
When you follow the instructions here: https://github.com/mspnp/spark-monitoring#create-and-configure-the-azure-databricks-cluster make sure that you have the same version of the spark cluster and scala in your script file as you had in your pom.xml file. (Instructions are outdated mentioning Databricks runtime version 5.0)
Just update the rows:
SPARK_VERSION=${SPARK_VERSION:-2.4.3}
SPARK_SCALA_VERSION=${SPARK_SCALA_VERSION:-2.11}
These versions are used in the script to copy the jar files to spark (so versions must match):
JAR_FILENAME="spark-listeners_${SPARK_SCALA_VERSION}_${SPARK_VERSION}-${SPARK_LISTENERS_VERSION}.jar"
cp -f "$STAGE_DIR/$JAR_FILENAME" /mnt/driver-daemon/jars
At the moment of writing the two latest versions of spark are 2.4.4 or 2.4.5. Now you have a cluster that will use a script to load the two jars and point to the correct Azure Log Analytics Workspace.
Under advanced settings for your cluster it should look something like this for Init Scripts: img/ClusterAdvSet.png
Next step is to start the cluster and run some code (e.g. streaming) and see if we can see the log results in our Log Analytics Workspace. Run some streaming notebook to test the logging. An easy way to test is to use this example notebook: https://docs.databricks.com/getting-started/spark/streaming.html#notebook-stream
We use the Saved Query we got from deploying the template before. We can e.g. see Stream throughput over time for the cluster. img/LogAnWS1.png
There are a lot of metrics and saved queries to use. It is possible to set up alarms connected to queries to get alerts.
One example of alerting is to alert if the cluster stream less than 50 rows per second in 2 consecutive measures: img/ConfigureSignal.png
NOTE:
To make alerts trigger on metrics like above you need to add an AggregatedValue= to your query.
Query Before fix:
SparkListenerEvent_CL
| where Event_s contains "progress"
| extend sname=strcat(progress_name_s,"-ProcRowsPerSecond")
| extend status = todouble(extractjson("$.[0].processedRowsPerSecond", progress_sources_s))
| summarize percentile(status,90) by bin(TimeGenerated, 1m) , sname
| order by TimeGenerated asc nulls last
Query After fix (adding AggregatedValue=) to work with Alerts:
SparkListenerEvent_CL
| where Event_s contains "progress"
| extend sname=strcat(progress_name_s,"-ProcRowsPerSecond")
| extend status = todouble(extractjson("$.[0].processedRowsPerSecond", progress_sources_s))
| summarize AggregatedValue=percentile(status,90) by bin(TimeGenerated, 1m) , sname
| order by TimeGenerated asc nulls last
Then you can work with action groups to send out the alert: img/AddActionGroup.png
After this you have the basis in place for monitoring Azure Databricks in Azure Monitor and Azure Log Analytics. You can play around with many of the pre-built queries or create your own. A list of the pre-built queries with some explanation can be found here: https://docs.microsoft.com/en-us/azure/architecture/databricks-monitoring/dashboards#visualizations-in-the-dashboards
Custom application metrics and application logging
It´s also possible to create custom application metrics like your own counter. There are exmaples in the GitHub repo how to do that. That exmaple also includes application logging using Log4J. Both connected to Azure Log Anaytics Workspace.
That code in a notebook (scala) in Azure Databricks could look like:
package com.microsoft.pnp.samplejob
import com.microsoft.pnp.logging.Log4jConfiguration
import com.microsoft.pnp.util.TryWith
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.UserMetricsSystems
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}
object StreamingQueryListenerSampleJob extends Logging {
private final val METRICS_NAMESPACE = "streamingquerylistenersamplejob"
private final val COUNTER_NAME = "rowcounter"
def main(args: Array[String]): Unit = {
// Configure our logging
TryWith(getClass.getResourceAsStream("/com/microsoft/pnp/samplejob/log4j.properties")) {
stream => {
Log4jConfiguration.configure(stream)
}
}
logTrace("Trace message from StreamingQueryListenerSampleJob")
logDebug("Debug message from StreamingQueryListenerSampleJob")
logInfo("Info message from StreamingQueryListenerSampleJob")
logWarning("Warning message from StreamingQueryListenerSampleJob")
logError("Error message from StreamingQueryListenerSampleJob")
val spark = SparkSession
.builder
.getOrCreate
import spark.implicits._
// this path has sample files provided by databricks for trying out purpose
val inputPath = "/databricks-datasets/structured-streaming/events/"
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)
val driverMetricsSystem = UserMetricsSystems
.getMetricSystem(METRICS_NAMESPACE, builder => {
builder.registerCounter(COUNTER_NAME)
})
driverMetricsSystem.counter(COUNTER_NAME).inc
// Similar to definition of staticInputDF above, just using `readStream` instead of `read`
val streamingInputDF =
spark
.readStream // `readStream` instead of `read` for creating streaming DataFrame
.schema(jsonSchema) // Set the schema of the JSON data
.option("maxFilesPerTrigger", 1) // Treat a sequence of files as a stream by picking one file at a time
.json(inputPath)
driverMetricsSystem.counter(COUNTER_NAME).inc(5)
val streamingCountsDF =
streamingInputDF
.groupBy($"action", window($"time", "1 hour"))
.count()
// Is this DF actually a streaming DF?
streamingCountsDF.isStreaming
driverMetricsSystem.counter(COUNTER_NAME).inc(10)
val query =
streamingCountsDF
.writeStream
.format("memory") // memory = store in-memory table (for testing only in Spark 2.0)
.queryName("counts") // counts = name of the in-memory table
.outputMode("complete") // complete = all the counts should be in the table
.start()
}
}
com.microsoft.pnp.samplejob.StreamingQueryListenerSampleJob.main(Array())
And shows how to use standard logging like logInfo() and a custom metric like driverMetricsSystem.counter(COUNTER_NAME).inc(5)
Both can later be queried in log analytics. Querying Log4J for INFO for an application can look like:
Quering custom metric rowcounter can look as: