Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SystemConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ private SystemConfigs() {}
Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);

/** Sets the core size of the thread pool used for refreshing authentication data. */
public static final ConfigEntry<Integer> AUTH_REFRESH_THREAD_POOL_SIZE =
new ConfigEntry<>(
"iceberg.rest.auth.refresh.num-threads",
"ICEBERG_AUTH_REFRESH_NUM_THREADS",
1,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we eventually want to share this thread pool, I wonder whether the default should be higher

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 1 is fine. Most of the time, we'll have only 1 auth manager per JVM, and sometimes 2 (e.g. request signing + credential refreshing) – but realistically, I think we won't see more than 2 per JVM. Given that the typical usage of this executor is to schedule token refreshes every hour or so, 1 is probably fine (also: this is the core pool size, not the maximum pool size). Wdyt?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right, yeah ok I think 1 should be fine then

Integer::parseUnsignedInt);

/** Whether to use the shared worker pool when planning table scans. */
public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
new ConfigEntry<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
/**
* An {@link AuthManager} that provides machinery for refreshing authentication data asynchronously,
* using a background thread pool.
*
* @deprecated since 1.10.0, will be removed in 1.11.0; use {@link ThreadPools#authRefreshPool()}.
*/
@Deprecated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're deprecating this class, then I don't think we should be removing functionality here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact I believe we should only be deprecating this here without updating the functionality (as that would be a breaking change). Once RefreshingAuthManager is removed, we can switch the OAuth2Manager to use the new auth refresh pool, but I don't think we can deprecate + switch within the same release

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nastra I don't mind doing the change in 2 steps, but in which case do you think this would be a breaking change? The API is not affected (and revapi didn't complain).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if somebody else from the community is extending RefreshingAuthManager and relying on its functionality? We're effectively removing the "refresh" part, which is a breaking change in behavior, even though RevAPI wasn't complaining

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we can do though, is revert the changes to RefreshingAuthManager just in case someone is extending that class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I commented before I saw your comment 😅 – I agree that's an issue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we can do though, is revert the changes to RefreshingAuthManager just in case someone is extending that class.

Yes that's what I was proposing in my earlier comment

public abstract class RefreshingAuthManager implements AuthManager {

private static final Logger LOG = LoggerFactory.getLogger(RefreshingAuthManager.class);
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ThreadPools.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.iceberg.util;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -51,6 +53,9 @@ private ThreadPools() {}
private static final ExecutorService DELETE_WORKER_POOL =
newExitingWorkerPool("iceberg-delete-worker-pool", DELETE_WORKER_THREAD_POOL_SIZE);

public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();

/**
* Return an {@link ExecutorService} that uses the "worker" thread-pool.
*
Expand Down Expand Up @@ -82,6 +87,20 @@ public static ExecutorService getDeleteWorkerPool() {
return DELETE_WORKER_POOL;
}

/**
* A shared {@link ScheduledExecutorService} that REST catalogs can use for refreshing their
* authentication data.
*/
public static ScheduledExecutorService authRefreshPool() {
return AuthRefreshPoolHolder.INSTANCE;
}

private static class AuthRefreshPoolHolder {
private static final ScheduledExecutorService INSTANCE =
ThreadPools.newExitingScheduledPool(
"auth-session-refresh", AUTH_REFRESH_THREAD_POOL_SIZE, Duration.ZERO);
}

/**
* Creates a fixed-size thread pool that uses daemon threads. The pool is wrapped with {@link
* MoreExecutors#getExitingExecutorService(ThreadPoolExecutor)}, which registers a shutdown hook
Expand Down Expand Up @@ -152,6 +171,23 @@ public static ScheduledExecutorService newScheduledPool(String namePrefix, int p
return new ScheduledThreadPoolExecutor(poolSize, newDaemonThreadFactory(namePrefix));
}

/**
* Create a new {@link ScheduledExecutorService} with the given name and pool size.
*
* <p>Threads used by this service will be daemon threads.
*
* <p>The service registers a shutdown hook to ensure that it terminates when the JVM exits. This
* is suitable for long-lived thread pools that should be automatically cleaned up on JVM
* shutdown.
*/
public static ScheduledExecutorService newExitingScheduledPool(
String namePrefix, int poolSize, Duration terminationTimeout) {
return MoreExecutors.getExitingScheduledExecutorService(
(ScheduledThreadPoolExecutor) newScheduledPool(namePrefix, poolSize),
terminationTimeout.toMillis(),
TimeUnit.MILLISECONDS);
}

private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build();
}
Expand Down