User Query Monitoring in Spark

Sev Leonard
The Nuna Blog
Published in
8 min readOct 13, 2020

--

Image by Jesus Chico

The Analytic Reporting Environment for Medicaid And CHIP (AREMAC) provides access to Medicaid and CHIP data for reporting and analysis. To work with data of this size we rely on the Spark ecosystem for ingesting, organizing, and surfacing this data to end users.

In many big data systems analytical workloads are developed alongside the data platform, informing optimizations such as file type choice and partition strategies. With AREMAC we aren’t able to be as tightly coupled between engineering and analysis due to the variety of user groups and workloads we support. Additionally, most AREMAC users prefer business intelligence (BI) tools such as Microstrategy and SAS, which add another layer of obfuscation.

One way we address the challenge of optimizing AREMAC is our Spark real-time query monitoring platform, built to provide visibility into the analytic workloads run on AREMAC. By creating a database of user queries we are able to identify performance improvements and provide insight into query performance and data lineage.

In this article we will do a deep dive into how we collect, analyze, and use Spark query metrics. We will start with the process of query discovery and various techniques we evaluated for capture. Next, we will describe the design of the query monitoring framework and how we deploy it on Databricks. To conclude, we’ll provide a few use cases of how we’ve turned these insights into performance gains.

Monitoring Spark Queries

Existing monitoring efforts typically focus on Spark performance, and much of the SparkUI and Spark History Server is organized to provide this information. In our case, we also want to capture the content and construction of queries, that is, the SQL statements and the query plans generated to run them, as well as their performance metrics. The SQL statements will help us understand query patterns, and the plans will give us insight into how they are being run on Spark. Both will help us identify optimization opportunities. In addition, the user who ran the query is important information, so we can engage appropriate user groups when analyzing query monitoring results.

Most AREMAC users interact with the system through BI tools, which connect to Spark through the Thrift JDBC/ODBC Server. Information about these queries is surfaced in the SparkUI through the JDBC/ODBC Server tab, pictured below. Of particular note is the SQL statistics, which is the data we will be capturing.

The SQL statistics give us several important pieces of information:

  • User. The user who ran the query.
  • Group ID. A unique identifier for a given statement.
  • Session ID. With AREMAC BI tool connections, a session can correspond to running a report or a user connection. This is helpful for linking several statements together as being associated with a particular report or user.
  • Timings. Used to determine performance characteristics.
  • Statement. The statement generated by ThriftServer. A single SQL query will be split into several statements by ThriftServer to run on Spark. This is another place where the session ID is helpful to consolidate statements into a report or analysis from a BI tool.
  • Detail. This can contain the query plan or debug information.

Scraping the SparkUI

Early on we considered scraping the SparkUI to capture this information. This had several shortcomings:

  • The need to create a system to monitor and scrape the SparkUIs for all user clusters.
  • The truncation of large query plans in the UI, resulting in lost information.
  • The lack of diagnostic data for failed queries. If a statement fails, its detail field will include a stack trace while the query is in a FAILED state. This state eventually moves to CLOSED and the stack trace is lost.
  • The roll off of queries after the THRIFTSERVER_UI_STATEMENT_LIMIT is hit. This Spark setting determines how many statements are retained for surfacing in the SparkUI. We could increase this value at the cost of additional memory overhead, but figuring out where to set this limit with unpredictable cluster loads is difficult.

Spark Listeners and State

Spark provides a variety of listener interfaces to monitor activity on the system. One option we explored was the QueryExecutionListener. This returns a QueryExecution object providing the query plans and execution times. Unfortunately the SQL query itself is not available.

We also looked into the SQLListener, which provides execution details throughout a statement lifecycle. This listener would require adding a stateful component for updating query metrics throughout the statement lifecycle.

An additional avenue we pursued was looping through the spark.sharedState.statusStore.executionsList. This sometimes yielded the query, but lacked the detailed plans of the listeners.

Additionally, none of the above methods provide the user name or session information we need to identify groups of queries. Fundamentally, the listeners are tracking statements as they are executed by Spark. This occurs after SQL queries are converted to statements by ThriftServer. The SQL query and the user name are discarded after ThriftServer generates the statements, so we have to find a way to get this information upstream of this conversion.

Ultimately we decided to trace the origins of the ThriftServer data surfaced in the SparkUI JDBC/ODBC Server tab. This had all the information we needed, and was clearly buried somewhere in the codebase. With any luck we could find a way to access it.

Capturing ThriftServer Queries in Spark 2.4.5

The question “Where does the SparkUI get ThriftServer data?” led us to the ThriftServerPage, which begins with a comment “Page for Spark Web UI that shows statistics of the thrift server.” Navigating to its parent, we see that the source of information is the HiveThriftServer listener.

The HiveThriftServer code provided some clues on how we can access the data surfaced in the SparkUI. A private internal class ExecutionInfo contains the data surfaced in the SparkUI. The HiveThriftServer2Listener provides access to the executionList, storing the ExecutionInfo objects for each query. You can see the use of the THRIFTSERVER_UI_STATEMENT_LIMIT in the retainedStatements variable, and the culling of the executionList in the trimStatementIfNecessary method, called in onStatementStart.

At this point we know how to access the queries, but given the statement limit we would need to know when the statements were running to capture their ExecutionInfo before they get trimmed from the executionList. Looking for the calls to onStatementStart led us to the SparkExecuteStatementOperation execute method, wherein we have three important pieces of information:

  • The inception of an ExecutionInfo object via onStatementStart.
  • The creation of a unique ID to identify the statement, referred to as the statementId in SparkExecuteStatementOperation, which is referred to as the groupId in the ExecutionInfo object.
  • Critically, the presence of logging, on the creation of a new statement and throughout its lifecycle.

We now know where to get ThriftServer query data and that we can use log files to follow a statement through its lifecycle, looking for the unique identifier statement ID, aka group ID, to differentiate between queries. An additional avenue we looked into was relating the statement ID to the data returned from the listeners. The listener data is indexed by SQL execution ID, not statement ID, and we were unable to find a link between the two.

Query Monitoring Platform Design and Deployment

AREMAC uses Databricks to enable users to connect and run queries. Clusters are set up for different user groups, utilizing table access controls and high concurrency clusters to enforce access constraints and improve performance. These cluster settings prohibit the use of Scala, which makes accessing the Spark internal listener a challenge.

In addition to accessing the listener, we also want to monitor log4j in real time to determine when a new statement is being executed. As it turns out, the real time monitoring need and constraint on users executing Scala code can both be dealt with by using a log4j appender, which can monitor the log and act on specific logging events. As a system level service the appender does not violate the cluster constraints on executing Scala code. Also, the log4j appender can access the HiveThriftServer listener to look up queries as they are executed.

At a high level, when a query is captured by the appender, the ExecutionInfo is written out to S3. A scheduled job consolidates the S3 data into a Delta table, creating a historical record of ThriftServer queries. This process is outlined in the diagram below.

Diagram of the Query Monitoring System, showing the capture and consolidate mechanisms
Query Monitoring System Diagram

Capture:

  1. Users run a report on a BI tool which sends SQL queries over JDBC/ODBC, where they are picked up by ThriftServer.
  2. Thrift server converts the SQL queries into statements, assigning a unique statement ID, signified by the numerical strings in the diagram.
  3. The statements are passed to the SparkSqlOperationManager where they are run on Spark.
  4. As a statement runs, updates are emitted in the log, carrying the statement ID.
  5. The log4j appender, ThriftAppender, listens for log messages, using the statement ID to lookup the corresponding ExecutionInfo object in the executionList in HiveThriftServer2Listener.
  6. The ExecutionInfo is serialized to JSON and written to S3.

Consolidate: A scheduled process consolidates the JSON files generated by ThriftAppender into a Delta table. This data is then analyzed for performance improvement opportunities.

The query monitoring is deployed as a library installed on a Databricks cluster, with an initialization script to set up the log4j appender and S3 location for saving the query data. This makes setting up query monitoring on new Databricks clusters simple and straightforward.

Improving Performance with Query Monitoring Data

We are just beginning to leverage the data we’ve collected from user clusters to improve the AREMAC platform. On the platform side, we’re consolidating queries into like-structures using a query signature analysis. An example of this analysis is below:

Table displaying a few different query signatures and associated metrics.
Preliminary query signature analysis sample

Using the python sqlparse library, we created a query signature to group similar queries together by their structure. The left-most column above contains some of these signatures. To identify which signatures to further analyze, we look at two metrics:

  • Total time: The execution time x the number of occurrences. This gives us an idea of how costly a signature is.
  • Normalized standard deviation of the execution time: The lower this number, the smaller the deviation is, meaning the runtime of this signature is more consistent. It wont help for us to improve performance of a query signature that has a single long runtime outlier affecting the total system time.

Using query signature analysis we’ve identified common lookup tables, case statements, and temporary table creations that are typical of BI tool query generation. These can be converted to hive tables and surfaced in AREMAC, as opposed to being created on the fly every time a query runs, saving execution time.

Query monitoring data is also being used directly by our user groups, who use the query data to identify optimization opportunities within their BI tools and queries. As a result query monitoring provides insight and optimization opportunities for both the platform and the end user.

In the future we plan to use this dataset to identify additional improvements, such as aggregations and partitioning strategies, as well as leveraging the query information for data lineage of user generated data products.

--

--