
Spark: preload delete files to avoid deadlocks #15712

Open
kinolaev wants to merge 1 commit into apache:main from kinolaev:preload-delete-files

Conversation

@kinolaev

In Spark, data file loading holds a connection while lazy delete file loading tries to acquire another one. When the number of simultaneous connections is limited (for example by `http-client.apache.max-connections`), this leads to a deadlock as soon as all connections are held by data file loading. To avoid the deadlock, this PR preloads delete files in the `SparkDeleteFilter` constructor.

The problem can be reproduced using spark-sql with S3FileIO and spark.sql.catalog.iceberg.http-client.apache.max-connections=1:

create table sparkdeletefilter(id bigint)
  tblproperties('write.delete.mode'='merge-on-read');
-- create a data file
insert into sparkdeletefilter select id from range(2);
-- create a delete file
delete from sparkdeletefilter where id in (select id from range(0, 2, 2));
-- Reader opens the data file first, keeps it open and
-- fails to load the delete file with ConnectionPoolTimeoutException
select count(id) from sparkdeletefilter;
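The failure mode is ordinary connection-pool starvation and can be sketched without Spark or S3 at all. The toy model below is a plain Python sketch (my own model, not Iceberg code): the pool is a semaphore with `max-connections` permits; the lazy order holds the data-file connection while requesting a second one for the delete file, whereas the preload order fetches deletes first and releases the connection before opening the data file.

```python
import threading

# Toy model of a bounded connection pool (hypothetical, not Iceberg's API).
# With a single permit (max-connections=1), the lazy order can never obtain
# its second connection, while preloading the delete file first succeeds.

def read_task(pool, preload_deletes, timeout=0.2):
    if preload_deletes:
        pool.acquire()                       # load the delete file first
        pool.release()                       # delete rows now cached in memory
        pool.acquire()                       # stream the data file
        pool.release()
        return True
    pool.acquire()                           # data-file connection held open
    got = pool.acquire(timeout=timeout)      # lazy delete load needs a 2nd one
    if got:
        pool.release()
    pool.release()
    return got

lazy_ok = read_task(threading.Semaphore(1), preload_deletes=False)
preload_ok = read_task(threading.Semaphore(1), preload_deletes=True)
```

With one permit, the timed-out second acquire in the lazy branch mirrors the `ConnectionPoolTimeoutException` in the repro above.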

Signed-off-by: Sergei Nikolaev <kinolaev@gmail.com>
@danielcweeks
Contributor

@kinolaev is this an issue you see in practice, or only if you reduce the connection pool size? Typically there would only be one data file processed by a task, so as long as there are at least two connections available, I would think that the delete operation would be able to process the data file (the task being operated on) and at least one delete at a time.

I believe the default connection pool size is 50, so why would this be an issue in practice?

@kinolaev
Author

kinolaev commented Mar 24, 2026

I see the issue in practice. I guess data files are loaded in parallel, and at least in the case of S3FileIO the connection pool is shared. I'm sure that two connections are not enough: I've just added two more inserts and deletes

create table sparkdeletefilter(id bigint)
  tblproperties('write.delete.mode'='merge-on-read');
insert into sparkdeletefilter select id from range(0, 2);
insert into sparkdeletefilter select id from range(2, 4);
insert into sparkdeletefilter select id from range(4, 6);
delete from sparkdeletefilter where id in (select id from range(0, 2, 2));
delete from sparkdeletefilter where id in (select id from range(2, 4, 2));
delete from sparkdeletefilter where id in (select id from range(4, 6, 2));
select count(id) from sparkdeletefilter;

and about half the time I get ConnectionPoolTimeoutException with http-client.apache.max-connections=2. The more operations I add, the more often it fails; it's a race condition, I suppose. With the PR applied there is no exception.
I don't know exactly how many data and delete files are needed to lock 50 connections, but I guess it's fewer than 1000.

Upd: I see what you mean: the parallelism is limited by the available executor cores. In theory, two connections should always be enough for an executor with one core. I will check it later.
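The "two connections are not enough" observation matches a simple concurrency model: once every parallel task holds a data-file connection, the lazy delete-file loads compete for whatever permits remain. The sketch below is my own hedged model, not Iceberg code; barriers make the outcome deterministic instead of the timing race described above.

```python
import threading

def run_tasks(n_tasks, pool_size, timeout=0.3):
    """Each simulated task holds one connection for its data file,
    then lazily requests a second one for its delete file."""
    pool = threading.Semaphore(pool_size)
    start = threading.Barrier(n_tasks)  # all tasks hold a data connection first
    done = threading.Barrier(n_tasks)   # nobody releases until all have tried
    timeouts = []

    def task():
        pool.acquire()                       # connection for the data file
        start.wait()
        got = pool.acquire(timeout=timeout)  # lazy delete-file connection
        done.wait()
        if got:
            pool.release()
        else:
            timeouts.append(1)               # pool-timeout analogue
        pool.release()

    threads = [threading.Thread(target=task) for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(timeouts)
```

With `pool_size` equal to the number of parallel tasks, every task times out on its lazy delete load; doubling the pool leaves headroom for a second connection per task and no task fails.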

@kinolaev
Author

kinolaev commented Mar 24, 2026

Thanks @danielcweeks, you are right; most probably I have a configuration issue: I have more executor cores than connections in the pool. Locally the example runs successfully with --master local[x] and http-client.apache.max-connections=y when x < y. I will check it in production later.
So the problem can affect big executors in production even with the default max-connections value. It would be good to either preload deletes or document that the connection pool size should be greater than the number of data files processed in parallel.
Is there any benefit to lazy loading delete files? As I understand it, they must always be loaded before we can start processing data rows, which is why I don't see any reason to open a connection for a data file before building its filter predicate. What do you think?

@kinolaev
Author

@danielcweeks, I've double-checked the production logs; I was wrong. In production I had no problem with the connection pool being exhausted by data file connections; the timeouts were always caused by ManifestFilterManager (#15713). The problem this PR addresses I only encountered locally, when I tried to reproduce ManifestFilterManager's issue by reducing max-connections.
#15713 was also caused by invalid configuration: thread count * 2 > connection pool size. I run Spark in Kubernetes, and although I set spark.executor.cores, it isn't used for the executor's resources.limits.cpu. That is why I had too many threads for the ManifestFilterManager.filterManifests call: the worker pool size was based on the node CPU count instead of the container CPU count, see https://github.com/apache/iceberg/blob/apache-iceberg-1.10.1/core/src/main/java/org/apache/iceberg/SystemConfigs.java#L33-L54.
Without any patches, 50 connections should be enough for 4 CPU cores: 4 tasks in parallel, 4 threads in the worker pool, and 16 threads in the delete worker pool. In the worst case each thread opens 2 simultaneous connections, 48 connections in total.
I still think that loading delete files before a data file is the right thing to do. But, yes, it doesn't fix anything or significantly reduce resource usage in a properly configured setup.
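The worst-case arithmetic above can be written down explicitly. This is just the back-of-the-envelope model from this comment (one task thread and one worker-pool thread per core, a delete worker pool of 4x the cores, at most two simultaneous connections per thread); the multipliers are assumptions stated in the comment, not values read out of Iceberg's code.

```python
def worst_case_connections(executor_cores,
                           delete_pool_multiplier=4,
                           conns_per_thread=2):
    # Assumed sizing: one task thread and one worker-pool thread per core,
    # plus a delete worker pool of delete_pool_multiplier * cores threads.
    tasks = executor_cores
    workers = executor_cores
    delete_workers = delete_pool_multiplier * executor_cores
    return (tasks + workers + delete_workers) * conns_per_thread
```

Under this model `worst_case_connections(4)` is (4 + 4 + 16) * 2 = 48, which stays under the default pool size of 50, consistent with the conclusion that a properly sized pool avoids the issue.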

