Impala Query Profile Explained – Part 5 – Query Metrics

This is Part 5 of my Impala Query Profile Explained series. If you have missed the previous four parts, please follow the links below:

Impala Query Profile Explained – Part 1
Impala Query Profile Explained – Part 2
Impala Query Profile Explained – Part 3
Impala Query Profile Explained – Part 4

Thanks to my colleague Robbie Zhang, who is the SME (Subject Matter Expert) for Impala in my team. He spent a great deal of time reading through the Impala source code, summarized the full list of metrics that you will see in an Impala profile, and explained what each metric means. He has kindly agreed to let me post his work on my blog to share it with a wider audience.

There is not much for me to add, other than to copy and paste Robbie’s work. Please keep in mind that Impala query profile content changes over time across versions. This list is based on the latest CDH 5.16 and CDH 6.x versions of Impala, so depending on the version of Impala you are using, the metric you are looking for might not be on this list.
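Because the list below is long, it is often easier to work against a profile that has been saved to a text file (for example, the output of impala-shell’s PROFILE command, or a profile downloaded from the Impala daemon web UI) rather than scrolling through it by hand. The snippet below is a minimal sketch of my own, not part of Robbie’s list, showing one way to pull named counters out of such a file with plain Python; the file name query_profile.txt and the counter names passed in are only examples.

# Minimal sketch: extract named counters from a saved Impala profile text file.
# "query_profile.txt" and the counter names below are examples for illustration.
import re

def extract_counters(profile_path, counter_names):
    """Return {counter_name: [raw values found]} from a saved profile text."""
    results = {name: [] for name in counter_names}
    with open(profile_path) as f:
        for line in f:
            for name in counter_names:
                # Counters usually appear in the profile as "   - CounterName: value"
                m = re.match(r"\s*-\s*" + re.escape(name) + r":\s*(.+)", line)
                if m:
                    results[name].append(m.group(1).strip())
    return results

print(extract_counters("query_profile.txt", ["BytesRead", "TotalRawHdfsReadTime"]))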

Summary

Admission result:

Here we can see whether the query was admitted immediately, queued, or rejected. If the query was queued or rejected, we can also see the reason.

Start Time:

The time when the query was submitted.

End Time:

The time when the query was unregistered.

ExecSummary:

The summary of each operator (exec node), including time, number of rows, and memory usage. It’s not visible until the query is closed.

Max Per-Host Resource Reservation:

The maximum possible (in the case where all fragments are scheduled on all hosts with max DOP) minimum reservation required per host, in bytes. This is the memory initially reserved for the query.

Per-Host Resource Estimates:

Estimated per-host peak memory consumption in bytes, used by admission control. It’s ignored if the query option MEM_LIMIT is set.

Plan:

Query plan delimited by lines “—————”.

Query Compilation / Planner Timeline:

It was named “Planner Timeline” until CDH 5.15.0. The timeline of important events in the planning process (FE/Java), used for debugging and profiling. The timeline starts from the time when the logging message “Analyzing query: <query string>” appears.

Query Timeline:

The timeline of important events in the execution process (BE/C++). The timeline starts from the time when the query was registered.

ImpalaServer

ClientFetchWaitTimer:

Time spent by the coordinator while idle waiting for a client to fetch rows.

MetastoreUpdateTimer:

Time spent gathering and publishing all required updates to the metastore.

RowMaterializationTimer:

Time spent by the coordinator to fetch rows.

Execution Profile

Metrics in Coordinator:

CatalogOpExecTimer:

Time spent by the coordinator sending the catalog operation execution request and waiting for the response from the catalogd.

ComputeScanRangeAssignmentTimer:

Time spent computing the assignment of scan ranges to hosts for each scan node.

FiltersReceived:

The total number of filter updates received (always 0 if filter mode is not GLOBAL). Excludes repeated broadcast filter updates. 

FinalizationTimer:

Total time spent in finalization (typically 0 except for INSERT into HDFS tables).

Metrics in Fragment Instance:

ExecTime:

Time spent in fragment execution. Roughly, it is ExecTreeExecTime plus the time used by the sink to send rows to the next fragment or to the client.
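To make that relationship concrete, here is a tiny sketch of my own with made-up numbers (not taken from any real profile) that estimates the sink time of a fragment instance from these two counters:

# Made-up values that you would normally read from a fragment instance's
# profile, converted to seconds.
exec_time = 12.4            # ExecTime
exec_tree_exec_time = 9.1   # ExecTreeExecTime

# The difference approximates the time the sink spent sending rows to the
# next fragment or to the client.
approx_sink_time = exec_time - exec_tree_exec_time
print(f"approximate sink time: {approx_sink_time:.1f}s")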

ExecTreeExecTime:

Time spent by the execution node and its descendants in this fragment to retrieve rows and return them via row_batch.

ExecTreeOpenTime:

Time spent by the execution node and its descendants in this fragment to perform any preparatory work prior to retrieving rows.

Filter X arrival:

The amount of time waited since registration for the filter to arrive. A value of 0 means the filter has not yet arrived.

Fragment Instance Lifecycle Event Timeline

An event sequence tracking the completion of various stages of this fragment instance. The timeline starts after the timestamp of the message “descriptor table for query=<query_id>” in the logs.

OpenTime:

Time spent in the fragment’s Open() logic. It includes ExecTreeOpenTime, the time to generate LLVM code, and the time to open the sink in this fragment.

PerHostPeakMemUsage:

A counter for the per-query, per-host peak memory usage. Note that this is not simply the maximum of the peak memory of all fragments running on a host, since it needs to take into account fragments that run concurrently. All fragments for a single query on a single host will have the same value for this counter.

PrepareTime:

Time to prepare for fragment execution.

RowsProduced:

The number of rows returned by this fragment instance.

TotalNetworkReceiveTime:

Total time spent receiving over the network (across all threads).

TotalNetworkSendTime:

Total time spent waiting for RPCs to complete. This time is a combination of:

  • network time of sending the RPC payload to the destination
  • processing and queuing time in the destination
  • network time of sending the RPC response to the originating node

TotalStorageWaitTime:

Total time waiting in storage (across all threads).

TotalThreads:

Total CPU utilization for all threads in this plan fragment.

Common metrics:

InactiveTotalTime:

Total time spent waiting (on non-children) that should not be counted when computing local_time_percent_. This is updated, for example, in the exchange node while waiting on the sender from another fragment.

LocalTime:

Time spent in this node (not including the children). Computed in ComputeTimeInProfile().

Node Lifecycle Event Timeline

ExecNode lifecycle events for this ExecNode, introduced in CDH 6.2.0. As with the Fragment Instance Lifecycle Event Timeline, the timeline here also starts after the timestamp of the message “descriptor table for query=<query_id>” in the logs.

TotalTime:

The total elapsed time.

Metrics in Buffer Pool:

AllocTime:

The total amount of time spent inside BufferAllocator::AllocateBuffer().

CumulativeAllocationBytes:

Bytes of buffers allocated via BufferAllocator::AllocateBuffer().

CumulativeAllocations:

The number of buffers allocated via BufferAllocator::AllocateBuffer().

PeakReservation:

The tracker’s peak reservation in bytes.

PeakUnpinnedBytes:

The peak total size of unpinned pages.

PeakUsedReservation:

The tracker’s peak usage in bytes.

ReadIoBytes:

Total bytes read from disk.

ReadIoOps:

The total number of read I/O operations issued.

ReadIoWaitTime:

Amount of time spent waiting for reads from disk to complete.

ReservationLimit:

The hard limit on the tracker’s reservations.

WriteIoBytes:

Total bytes written to disk.

WriteIoOps:

The total number of write I/O operations issued.

WriteIoWaitTime:

Amount of time spent waiting for writes to disk to complete.

Metrics in Data Sender:

BytesSent:

Time series of the number of bytes sent, samples bytes_sent_counter_.

EosSent:

Total number of EOS sent.  

NetworkThroughput:

Summary of network throughput for sending row batches. Network time also includes queuing time in KRPC transfer queue for transmitting the RPC requests and receiving the responses.

OverallThroughput:

Throughput per total time spent in the sender.

RowsReturned:

The number of row batches enqueued into the row batch queue.

RowsSent:

The total number of rows sent.  

RpcFailure:

The total number of times an RPC failed or the remote responded with a non-retryable error.

RpcRetry:

Number of TransmitData() RPC retries due to remote service being busy.

SerializeBatchTime:

Time for serializing row batches.

TotalBytesSent:

The total number of bytes sent. Updated on RPC completion.

TransmitDataRPCTime:

The concurrent wall time spent sending data over the network.

UncompressedRowBatchSize:

The total number of bytes of row batches before compression.  

Metrics in Data Receiver:

BytesDequeued:

Time series of bytes of deserialized row batches, samples ‘bytes_dequeued_counter_’.

BytesReceived:

The total number of bytes of serialized row batches received.

BytesSkipped:

The number of bytes skipped when advancing to the next sync on error.

DataWaitTime:

Total wall-clock time spent waiting for data to be available in queues.

DeferredQueueSize:

Time series of the number of deferred row batches, samples ‘num_deferred_rpcs_’.

DeserializeRowBatchTimer:

Total wall-clock time spent deserializing row batches.  

DispatchTime:

Summary stats of the time RPCs spent in the KRPC service queue before being dispatched to the RPC handlers.

FirstBatchArrivalWaitTime:

Time spent waiting until the first batch arrives across all queues.

FirstBatchWaitTime:

Wall-clock time spent waiting for the first batch arrival across all queues.

SendersBlockedTimer:

Wall time senders spend waiting for the receive buffer to have capacity.

SendersBlockedTotalTimer:

Total time (summed across all threads) spent waiting for the recv buffer to be drained so that new batches can be added. Remote plan fragments are blocked for the same amount of time.

TotalBatchesEnqueued:

The total number of deserialized row batches enqueued into the row batch queues.

TotalBatchesReceived:

The total number of serialized row batches received.

TotalBytesDequeued:

The number of bytes of deserialized row batches dequeued.

TotalBytesReceived:

The total number of bytes of serialized row batches received.

TotalEarlySenders:

The total number of senders which arrive before the receiver is ready.

TotalEosReceived:

Total number of EOS received.

TotalGetBatchTime:

Total wall-clock time spent in SenderQueue::GetBatch().  

TotalHasDeferredRPCsTime:

Total wall-clock time in which the ‘deferred_rpcs_’ queues are not empty.

TotalRPCsDeferred:

Total number of RPCs whose responses are deferred because of early senders or full row batch queue.

Metrics in Data Sink:

BytesWritten:

The total number of bytes written into files.

CompressTimer:

Time spent compressing data before writing into files.

EncodeTimer:

Time spent converting tuples to the on-disk format.

FilesCreated:

The number of created files.

HdfsWriteTimer:

Time spent writing to HDFS.

KuduApplyTimer:

Time spent applying Kudu operations. In normal circumstances, the Kudu operation should be negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled. Significant KuduApplyTimer may indicate that Kudu cannot buffer and send rows as fast as the sink can write them.

NumRowErrors:

The number of (Kudu) rows with errors.

PartitionsCreated:

The total number of partitions created.

RowsInserted:

The number of inserted rows.

RowsProcessedRate:

The rate at which the sink consumes and processes rows, i.e. writing rows to Kudu or skipping rows that are known to violate nullability constraints.

TotalNumRows:

The total number of rows processed, i.e. rows written to Kudu and also rows with errors.

Metrics in TopN Node:

InsertBatchTime:

Time spent in the InsertBatch() function (or its codegen’d version).

TuplePoolReclamations:

The number of times tuple pool memory was reclaimed.

Metrics in Scan Node:

AverageHdfsReadThreadConcurrency:

The average number of HDFS read threads executing read operations on behalf of this scan. Higher values show that this scan is using a larger proportion of the I/O capacity of the system. Lower values show that either this thread is not I/O bound or that it is getting a small share of the I/O capacity of the system because of other concurrently executing queries.

AverageScannerThreadConcurrency:

The average number of scanner threads executing between Open() and the time when the scan completes. Present only for multithreaded scan nodes.

BytesRead:

Total bytes read from disk by this scan node. Provided as a counter as well as a time series that samples the counter. Only implemented for scan node subclasses that expose the bytes read, e.g. HDFS and HBase.

BytesReadDataNodeCache:

The total number of bytes read from the data node cache.

BytesReadLocal:

The total number of bytes read locally.

BytesReadRemoteUnexpected:

The total number of bytes read remotely that were expected to be local.

BytesReadShortCircuit:

The total number of bytes read via short circuit read.

CachedFileHandlesHitCount:

The total number of file handle opens where the file handle was present in the cache.

CachedFileHandlesMissCount:

The total number of file handle opens where the file handle was not in the cache.

CollectionItemsRead:

The total number of nested collection items read by the scan. Only created for scans (e.g. Parquet) that support nested types.

DecompressionTime:

Time spent decompressing bytes.

DelimiterParseTime:

Time spent parsing the bytes for delimiters in text files.

FooterProcessingTime:

Average and min/max time spent processing the (parquet) footer by each split.

Hdfs Read Thread Concurrency Bucket:

A histogram showing the percentage of time spent at each level of HDFS read thread concurrency.

MaterializeTupleTime:

Wall clock time spent materializing tuples and evaluating predicates. Usually, it’s affected by the load on the CPU and the complexity of expressions.

MaxCompressedTextFileLength:

The size of the largest compressed text file to be scanned. This is used to estimate scanner thread memory usage.

NumColumns:

The number of (parquet) columns that need to be read.

NumDictFilteredRowGroups:

The number of (parquet) row groups skipped due to dictionary filter.

NumDisksAccessed:

The number of distinct disks accessed by HDFS scan. Each local disk is counted as a disk and each remote disk queue (e.g. HDFS remote reads, S3) is counted as a distinct disk.

NumRowGroups:

The number of (parquet) row groups that need to be read.

NumScannerThreadsStarted:

The number of scanner threads started for the duration of the scan node. This is at most the number of scan ranges but should be much less since a single scanner thread will likely process multiple scan ranges. This is *not* the same as peak scanner thread concurrency because the number of scanner threads can fluctuate during the execution of the scan.

NumScannersWithNoReads:

The number of scanners that end up doing no reads because their splits don’t overlap with the midpoint of any row-group in the (parquet) file.

NumStatsFilteredRowGroups:

The number of row groups that are skipped because of Parquet row group statistics.

PeakScannerThreadConcurrency:

The peak number of scanner threads executing at any one time. Present only for multithreaded scan nodes.

PerReadThreadRawHdfsThroughput:

The read throughput in bytes/sec for each HDFS read thread while it is executing I/O operations on behalf of this scan.

RemoteScanRanges:

The total number of remote scan ranges.

RowBatchBytesEnqueued:

The number of row batches and bytes enqueued in the scan node’s output queue.

RowBatchesEnqueued:

The number of row batches enqueued into the row batch queue.

RowBatchQueueCapacity:

The capacity in batches of the scan node’s output queue.

RowBatchQueueGetWaitTime:

Wall clock time that the fragment execution thread spent blocked waiting for row batches to be added to the scan node’s output queue.

RowBatchQueuePeakMemoryUsage:

Peak memory consumption of row batches enqueued in the scan node’s output queue.

RowBatchQueuePutWaitTime:

Wall clock time that the scanner threads spent blocked waiting for space in the scan node’s output queue when it is full.

RowsRead:

The number of top-level rows/tuples read from the storage layer, including those discarded by predicate evaluation. Used for all types of scans.

ScannerIoWaitTime:

The total amount of time scanner threads spent waiting for I/O. This is comparable to ScannerThreadsTotalWallClockTime in the traditional HDFS scan nodes and the scan node total time for the MT_DOP > 1 scan nodes.  Low values show that each I/O completed before or around the time that the scanner thread was ready to process the data. High values show that scanner threads are spending significant time waiting for I/O instead of processing data.  Note that if CPU load is high, this can include the time that the thread is runnable but not scheduled.

ScannerThreadsSysTime / ScannerThreadsUserTime / ScannerThreadsVoluntaryContextSwitches / ScannerThreadsInvoluntaryContextSwitches:

These are aggregated counters across all scanner threads of this scan node. They are taken from getrusage. See RuntimeProfile::ThreadCounters for details.

ScannerThreadsTotalWallClockTime:

Total wall clock time spent in all scanner threads.

ScanRangesComplete:

The number of scan ranges completed. Initialized for scans that have a concept of “scan range”.

TotalRawHbaseReadTime:

The total wall clock time spent in HBase read calls. For example, if we have 3 threads and each spent 1 sec, this counter will report 3 sec.

TotalRawHdfsOpenFileTime:

The total wall clock time spent by Disk I/O threads in HDFS open operations. For example, if we have 3 threads and each spent 1 sec, this counter will report 3 sec.

TotalRawHdfsReadTime:

The total wall clock time spent by Disk I/O threads in HDFS read operations. For example, if we have 3 threads and each spent 1 sec, this counter will report 3 sec.

TotalReadThroughput:

BytesRead divided by the total wall clock time that this scan was executing (from Open() to Close()). This gives the aggregate rate that data is read from disks. If this is the only scan executing, ideally this will approach the maximum bandwidth supported by the disks.
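As a quick sanity check of this definition, here is a small sketch of my own (again with made-up numbers, not from any real profile) that recomputes the throughput from BytesRead and the scan’s wall clock time:

# Made-up values that you would normally read from the scan node's profile.
bytes_read = 12 * 1024**3      # BytesRead: 12 GiB read by this scan node
scan_wall_time_sec = 60.0      # wall clock time from Open() to Close()

total_read_throughput = bytes_read / scan_wall_time_sec
print(f"TotalReadThroughput ~ {total_read_throughput / 1024**2:.0f} MB/sec")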

Metrics in Runtime Filter:

BloomFilterBytes:

The total amount of memory allocated to Bloom Filters.

Rows processed:

Total number of rows to which each filter was applied.

Rows rejected:

Total number of rows that each filter rejected.

Rows total:

The total number of rows that each filter could have been applied to (if it were available from row 0).

Metrics in Join Node:

BuildRows:

The number of build (right child) rows.

BuildRowsPartitioned:

The number of build rows that have been partitioned.

BuildRowsPartitionTime:

Time spent partitioning build rows.

BuildTime:

Time to prepare build side.

HashBuckets:

The total number of hash buckets across all partitions.

HashCollisions:

The number of cases where we had to compare buckets with the same hash value, but the row equality failed.

HashTablesBuildTime:

Time spent building hash tables.

LargestPartitionPercent:

The largest fraction after repartitioning. This is expected to be 1 / PARTITION_FANOUT. A value much larger indicates skew.

MaxPartitionLevel:

Level of the max partition (i.e. number of repartitioning steps).

NullAwareAntiJoinEvalTime:

Time spent evaluating other_join_conjuncts for NAAJ.  

NumHashTableBuildsSkipped:

The number of partitions that had zero probe rows, for which the hash table was therefore not built.

NumRepartitions:

The number of partitions that have been repartitioned.

PartitionsCreated:

The total number of partitions created.

ProbeRows:

The number of probe (left child) rows.

ProbeRowsPartitioned:

The number of probe rows that have been partitioned.  

ProbeTime:

Time to process the probe (left child) batch.

RepartitionTime:

Time spent repartitioning and building hash tables of any resulting partitions that were not spilled.

SpilledPartitions:

The number of partitions that have been spilled.

Metrics in Sort Node:

InitialRunsCreated / RunsCreated:

The number of initial runs created (Sorter).

InMemorySortTime:

Time spent sorting initial runs in memory.

MergeGetNext:

Time to get and return the next batch of sorted rows from this merger.

MergeGetNextBatch:

Time spent in calls to get the next batch of rows from the input run.

NumRowsPerRun:

Min, max, and avg size of runs in number of tuples (Sorter).

TotalMergesPerformed:

Number of merges of sorted runs.

SortDataSize:

The total size of the initial runs in bytes.

SpilledRuns:

The number of runs that were unpinned and may have spilled to disk, including initial and intermediate runs.

Metrics in Aggregation Node:

BuildTime:

Time to prepare build side.

GetResultsTime:

Time spent returning the aggregated rows.

HashBuckets:

The total number of hash buckets across all partitions.

HTResizeTime:

Total time spent resizing hash tables.

LargestPartitionPercent:

The largest fraction after repartitioning. This is expected to be 1 / PARTITION_FANOUT. A value much larger indicates skew.

MaxPartitionLevel:

Level of the max partition (i.e. number of repartitioning steps).

NumRepartitions:

The number of partitions that have been repartitioned.

PartitionsCreated:

The total number of partitions created.

ReductionFactorEstimate:

The estimated reduction of the pre-aggregation.

ReductionFactorThresholdToExpand:

The minimum reduction factor required to continue growing the hash tables.

RowsPassedThrough:

The number of rows passed through without aggregation.

RowsRepartitioned:

The number of rows that have been repartitioned.  

SpilledPartitions:

The number of partitions that have been spilled.

StreamingTime:

Time spent in streaming pre-aggregation algorithm.  

Metrics in Exchange Node:

ConvertRowBatchTime:

Time spent reconstructing received rows.

Metrics in Eval Node:

EvaluationTime:

Time spent processing the child rows (AnalyticEvalNode).

Metrics in CodeGen:

CodegenTime:

Time spent doing codegen (adding IR to the module).

CompileTime:

Time spent compiling the module.

ExecTreePrepareTime:

Time to prepare the sink, set up internal structures in the execution node and its descendants, start the profile-reporting thread, and wait until it is active.

LoadTime:

Time spent reading the .ir file from the file system.

ModuleBitcodeSize:

The total size of bitcode modules loaded in bytes.

NumFunctions:

The number of functions that are optimized and compiled after pruning unused functions from the module.

NumInstructions:

The number of instructions that are optimized and compiled after pruning unused functions from the module.

OptimizationTime:

Time spent optimizing the module.

PrepareTime:

Time spent constructing the in-memory module from the IR.

Metrics in TmpFileMgr:

TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files on the filesystem and I/O to and from them.

ScratchBytesRead:

The number of bytes read from disk (includes reads started but not yet completed).

ScratchBytesWritten:

The number of bytes written to disk (includes writes started but not yet completed).

ScratchFileUsedBytes:

Amount of scratch space allocated in bytes.

ScratchReads:

The number of read operations (includes reads started but not yet completed).

ScratchWrites:

The number of write operations (includes writes started but not yet completed).

TotalEncryptionTime:

Time spent in disk spill encryption, decryption, and integrity checking.

TotalReadBlockTime:

Time spent waiting for disk reads.
