Conversation

@dengzhhu653
Member

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?


Comment on lines +4653 to +4655
part_vals = getPartValsFromName(t, dropPartitionReq.getPartName());
}
partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
Contributor

Suggested change
- part_vals = getPartValsFromName(t, dropPartitionReq.getPartName());
- }
- partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
+ partNames.add(dropPartitionReq.getPartName());
+ } else {
+ partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
+ }

private A result;
private boolean async;
private Future<A> future;
private ExecutorService executor;
Contributor

Would it be better to use a shared thread pool for the operation handlers? In the current implementation, the number of threads is not bounded, which could lead to resource exhaustion or even crashes.

Member Author

The number of handler threads is limited by the maximum number of threads the Thrift server can spawn, which is set by hive.metastore.server.max.threads.

Member Author
@dengzhhu653 dengzhhu653 Jan 5, 2026

In production, I don't think we would see such a high rate of drop database/table operations happening at nearly the same time; usually the database is the bottleneck before the Metastore hits the limit. If that is the case, we can tune down hive.metastore.server.max.threads.

Contributor

In async mode, a thread may trigger multiple operation handlers, so hive.metastore.server.max.threads cannot limit the total number of threads here. If we configure a fixed-size pool for the async operations, it can help limit service traffic to some extent.
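For illustration, a rough sketch of the kind of shared, bounded pool being suggested here (the field names and pool size are hypothetical, not part of this patch):

// Hypothetical shared pool for async operation handlers; a fixed size caps
// the number of background threads no matter how many async requests arrive.
private static final int ASYNC_POOL_SIZE = 16; // assumed value, would be made configurable
private static final ExecutorService ASYNC_HANDLER_POOL =
    Executors.newFixedThreadPool(ASYNC_POOL_SIZE, r -> {
      Thread t = new Thread(r, "async-operation-handler");
      t.setDaemon(true);
      return t;
    });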

Member Author
@dengzhhu653 dengzhhu653 Jan 6, 2026

Usually the new handler runs inside the same thread as the parent; for example, a DropDatabaseHandler involves multiple DropTableHandlers, and these DropTableHandlers run inside the same thread as the DropDatabaseHandler.

Contributor

Oh, I mean that for an async request, the HMSHandler thread could create an operation handler whose executor starts a new thread. The HMSHandler thread then finishes this request immediately and can handle another request, which may produce yet another new thread.

If such async requests are frequent, this may lead to an explosion in the number of threads.

Member Author

Though the Metastore handles the async request in the background, the client doesn't; it polls the server for the status until the end:

while (!resp.isFinished() && !Thread.currentThread().isInterrupted()) {
  resp = client.drop_database_req(req);
  if (resp.getMessage() != null) {
    LOG.info(resp.getMessage());
  }
}

The client will know at the end whether the request succeeded, as usual, and the Metastore needs a handler thread to answer each poll.

> the HMSHandler thread finishes this request immediately

result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();

Now it will wait up to 5 seconds before answering the API for a long-running drop; if the request is satisfied within this timeout, we can return the result right away.
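As a rough sketch of the pattern described above (the helper names buildFinishedResponse, buildInProgressResponse, buildFailedResponse and the surrounding types are placeholders, not the actual patch code):

// Submit the drop work, wait up to a short timeout, and fall back to an
// "in progress" answer so the client keeps polling for the final status.
Future<Result> future = executor.submit(this::execute);
try {
  Result result = future.get(5, TimeUnit.SECONDS);  // assumed 5s wait, mirroring the comment above
  return buildFinishedResponse(result);
} catch (TimeoutException e) {
  return buildInProgressResponse(operationId);      // not done yet; the client polls again
} catch (InterruptedException | ExecutionException e) {
  return buildFailedResponse(e);                     // error handling simplified for the sketch
}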

Contributor

There are custom clients based on ThriftHiveMetastore.Iface, which may not guarantee such behavior.

Member Author

asyncDrop defaults to false; unless it is explicitly set to true, nothing changes for a custom client, and if it is set to true, the custom client should take care of this case itself.

@saihemanth-cloudera
Contributor

A couple of test failures seem to be related to this patch.

wh.addToChangeManagement(funcCmPath);
}
if (req.isDeleteData()) {
// Moving the data deletion out of the async handler.
Contributor

I think we should move this into the operation handler, because if a Thrift client only calls this API once in async mode, such cleanup code would never run.

Member Author
@dengzhhu653 dengzhhu653 Jan 6, 2026

In async mode, the client still needs to poll the server for the operation status until the end, since it needs to know whether the request failed or not.
The main reason is that the TUGIBasedProcessor/TUGIAssumingProcessor might close the shared FileSystem behind the scenes, causing "java.io.IOException: Filesystem closed" for the handler running in the background.

Still, we need to address this "Filesystem closed" issue, as we don't know whether there are FileSystem operations in the Metastore listeners.
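For context, a rough illustration of the failure mode being described (variable names like clientUgi, conf, and tablePath are placeholders): FileSystem.get() returns an instance cached per UGI, so closing all instances for that UGI invalidates the object a background handler is still holding.

// The handler obtains a cached FileSystem under the client's UGI...
FileSystem fs = clientUgi.doAs(
    (PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
// ...and keeps using it in the background after the thrift call returns.
FileSystem.closeAllForUGI(clientUgi); // done by the processor when the request ends
fs.exists(tablePath);                 // -> java.io.IOException: Filesystem closed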

Contributor

FileSystem.closeAllForUGI(clientUgi); in TUGIAssumingProcessor seems like a bug: if there are two requests with the same UGI handling the same path URI concurrently, it may also hit the "Filesystem closed" issue.

This is indeed a tricky problem; I'm not sure whether only removing the cache for inactive UGIs would solve it. And for this thread, there is still an issue: if the client crashes between two polls before the operation handler finishes, the cleanup code will not take effect either.

Member Author

Nice catch, we should take a client crash into account.

if (ugiTransport.getClientUGI() == null) {
  ugiTransport.setClientUGI(clientUgi);
}
clientUgi = ugiTransport.getClientUGI();
Contributor

Is this line unnecessary? clientUgi is already initialized.

Member Author

The ugi is identical: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java#L483-L491,
so we reuse the UGI cached in ugiTransport if possible, and the connection will get the same FileSystem instance from the cache for its whole lifetime.
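To spell out why reusing the exact UGI object matters (a sketch with illustrative names; conf is a placeholder): the FileSystem cache key includes the UGI, and UserGroupInformation.equals() compares the underlying Subject by reference, so only the very same UGI object maps back to the same cached FileSystem instance.

UserGroupInformation ugi = ugiTransport.getClientUGI();
// Both lookups hit the same FileSystem cache entry because they use the same UGI object.
FileSystem fs1 = ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
FileSystem fs2 = ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(conf));
// fs1 == fs2: the connection sees one FileSystem instance for its whole lifetime.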

if (request.isNeedResult()) {
AddPartitionsHandler addPartsOp = AbstractOperationHandler.offer(this, request);
if (addPartsOp.success() && request.isNeedResult()) {
AddPartitionsHandler.AddPartitionsResult addPartsResult = addPartsOp.getResult();
Contributor

Can we store the partition list in the AddPartitionsResult and return it directly here?

Member Author

Not enough; addPartsOp.success() needs to check the state (success or not) of addPartsOp.getResult().

if (async) {
  OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, TimeUnit.HOURS);
}
afterExecute(resultV);
Contributor

If afterExecute() is needed only when execute() succeeds, we can check the result here:

Suggested change
- afterExecute(resultV);
+ if (resultV != null && resultV.success()) {
+   afterExecute(resultV);
+ }

Member Author

afterExecute is also called in case of failure, to free up resources the handler might hold.
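A minimal sketch of that contract (simplified; the result type and execute() signature are assumptions, not the patch code): afterExecute runs on both the success and the failure path so the handler can always release what it holds.

OperationResult resultV = null;
try {
  resultV = execute();    // may fail or return a non-successful result
} finally {
  afterExecute(resultV);  // runs either way; resultV may be null or a failed result here
}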
