feat: expose scan stats to spark sql metrics #556
@yanghua Could you please take a look at it?

Got it! I will review it later.
| } | ||
| } | ||
| if (fragmentReader != null) { | ||
| metricsTracker.addScanStats(fragmentReader.getScanStats()); |
This logic can accumulate the same stats more than once, because the next() method may be called again in some scenarios. It would be better to accumulate the stats in fragmentReader#close(), or to guard the accumulation with a flag, something like this:
private boolean currentReaderStatsAdded = false;

while (...) {
  if (fragmentReader != null) {
    if (!currentReaderStatsAdded) {
      metricsTracker.addScanStats(fragmentReader.getScanStats());
      currentReaderStatsAdded = true;
    }
    fragmentReader.close();
  }
  fragmentReader = LanceFragmentColumnarBatchScanner.create(...);
  currentReaderStatsAdded = false;
  ...
}
if (fragmentReader != null && !currentReaderStatsAdded) {
  metricsTracker.addScanStats(fragmentReader.getScanStats());
  currentReaderStatsAdded = true;
}
return false;
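For reference, the "accumulate at most once" idea can also be sketched as a small self-contained example. This is only an illustration under assumptions: `ScanStats`, `MetricsTracker`, and `TrackedReader` are hypothetical stand-ins written for this sketch, not the classes in this PR, and the real reader may expose its stats differently.

```java
/** Hypothetical stand-in for the stats a fragment reader reports. */
final class ScanStats {
  final long iops;
  final long bytesRead;

  ScanStats(long iops, long bytesRead) {
    this.iops = iops;
    this.bytesRead = bytesRead;
  }
}

/** Hypothetical stand-in for the tracker that sums stats across readers. */
final class MetricsTracker {
  private long iops;
  private long bytesRead;

  void addScanStats(ScanStats stats) {
    iops += stats.iops;
    bytesRead += stats.bytesRead;
  }

  long iops() {
    return iops;
  }

  long bytesRead() {
    return bytesRead;
  }
}

/** Hypothetical reader wrapper that reports its stats at most once. */
final class TrackedReader implements AutoCloseable {
  private final ScanStats stats;
  private final MetricsTracker tracker;
  private boolean statsAdded = false;

  TrackedReader(ScanStats stats, MetricsTracker tracker) {
    this.stats = stats;
    this.tracker = tracker;
  }

  /** Safe to call from next() and from close(); only the first call accumulates. */
  void flushStats() {
    if (!statsAdded) {
      tracker.addScanStats(stats);
      statsAdded = true;
    }
  }

  @Override
  public void close() {
    // Flushing here covers the case where the scan ends before next() reports the stats.
    flushStats();
  }
}
```

Keeping the flag next to the stats it guards means repeated next() calls and a final close() all go through the same guarded path, so neither double counting nor missed stats should occur in this sketch.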
| } | ||
| @Override | ||
| public void close() throws IOException { |
Maybe we also need to accumulate the metrics here, so that stats are not lost when the reader is closed early? We can combine this with the currentReaderStatsAdded flag (from the previous comment) to avoid duplicate accumulation.
| // Test new metrics | ||
| assertTrue( | ||
| metrics.getOrDefault(LanceCustomMetrics.NUM_IOPS, 0L) >= 0, "numIops should be >= 0"); |
Do these assertions actually verify anything? The default value is 0, so `>= 0` passes even when the metric is never reported, right?
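To illustrate the concern, here is a minimal sketch comparing the two kinds of check on a plain `Map<String, Long>`. The helper class `MetricAssertions` and its method names are hypothetical and exist only for this example; the metric key is passed in as a plain string rather than taken from `LanceCustomMetrics`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helpers contrasting a vacuous metric check with a stricter one. */
final class MetricAssertions {
  private MetricAssertions() {}

  /** Mirrors the reviewed assertion: always true when the key is absent. */
  static boolean vacuousCheck(Map<String, Long> metrics, String name) {
    return metrics.getOrDefault(name, 0L) >= 0;
  }

  /** True only if the metric was actually reported and is non-negative. */
  static boolean reportedAndNonNegative(Map<String, Long> metrics, String name) {
    Long value = metrics.get(name);
    return value != null && value >= 0;
  }

  /** Builds a one-entry metrics map, for demonstration purposes only. */
  static Map<String, Long> singleMetric(String name, long value) {
    Map<String, Long> metrics = new HashMap<>();
    metrics.put(name, value);
    return metrics;
  }
}
```

With an empty map, `vacuousCheck` still returns true while `reportedAndNonNegative` returns false, which is the gap the test would ideally close, for example by asserting that the key is present or, where the scan is known to do I/O, that the value is strictly positive.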
| @@ -31,7 +35,7 @@ public abstract class BaseLanceCustomMetricsTest { | |||
| @Test | |||
| void testAllMetricsReturnsSixMetrics() { | |||
Six is not correct now, right?
| | `numFragmentsScanned` | counter | Lance fragments actually opened by this task. Compare against the table fragment count to verify pruning is working. | | ||
| | `numBatchesLoaded` | counter | Arrow batches returned from the JNI scanner. | | ||
| | `numRowsScanned` | counter | Rows read from storage before filter evaluation. Pair with Spark's built-in `numOutputRows` to compute filter selectivity (`numOutputRows / numRowsScanned`). | | ||
| | `numIops` | counter | Number of I/O operations performed during scanning. | |
Number of I/O operations performed by the scanner.
| | `numRowsScanned` | counter | Rows read from storage before filter evaluation. Pair with Spark's built-in `numOutputRows` to compute filter selectivity (`numOutputRows / numRowsScanned`). | | ||
| | `numIops` | counter | Number of I/O operations performed during scanning. | | ||
| | `numRequests` | counter | Number of requests made to the storage layer. | | ||
| | `numBytesRead` | counter | Total bytes read from storage. | |
Total bytes read from storage by the scanner.
@yanghua I have addressed the CR comments. Could you please take another look? Thanks a lot.