[SPARK-56251][SQL] Add default fetchSize for postgres to avoid loading all data in memory #55053
ivoson wants to merge 9 commits into apache:master
cc @yaooqinn @cloud-fan, can you please take a look at this PR? Thanks.
    """.stripMargin
  )

  val fetchSize = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
Not really, I think. The behavior is still the same except for PostgresDialect.
I just moved the logic from JDBCOptions to JdbcDialects. See: https://github.com/apache/spark/pull/55053/changes#diff-1533255ad629a18e883f8186b303fdf4fae99043551ce20c5b5ea06d085e0b14R352
The default fetchSize is still 0 except for PostgresDialect.
For PostgresDialect, the default value changed from 0 to 1000.
Yeah, you're right.
However, please restore this field according to 77413d4
Sounds good. Updated
   * Dialects can override this to provide a sensible default when the user does not
   * explicitly set the fetchSize option.
   */
  def effectiveFetchSize(options: JDBCOptions): Int = options.fetchSize
Please add @Since("4.2.0").
I think I understand your intention in using "effective" here, but I would follow the style of the existing method names and call it getFetchSize.
  logWarning(s"No fetchSize option set for PostgreSQL JDBC read. " +
    s"Defaulting to $POSTGRES_DEFAULT_FETCH_SIZE to avoid loading all rows into memory. " +
    s"Set the 'fetchsize' option explicitly to override this behavior.")
With this change, leaving fetchSize unset in the options should be fine, right? If so, this message should be logged at info or debug level, not warning.
Yes, it's fine if the parameter is not set.
I will keep it as info since the default behavior changes.
What changes were proposed in this pull request?
This PR adds a default fetchSize of 1000 for the PostgreSQL JDBC dialect to prevent loading entire tables into memory when no explicit fetchSize is specified by the user.

The changes include:
- JdbcDialect: Added defaultFetchSize (returns 0 by default) and effectiveFetchSize(options) as the single source of truth for resolving the effective fetch size: a user-specified value takes precedence, otherwise it falls back to the dialect's default.
- PostgresDialect: Overrides defaultFetchSize to 1000, and updates beforeFetch to use effectiveFetchSize() for the autoCommit decision.
- AggregatedDialect: Delegates defaultFetchSize, effectiveFetchSize, and beforeFetch to dialects.head, consistent with how other methods (e.g., quoteIdentifier, getTruncateQuery) are delegated.
- JDBCOptions: Removed the unused fetchSize field since the effective fetch size is now fully resolved through JdbcDialect.effectiveFetchSize().
- JDBCRDD: Uses dialect.effectiveFetchSize(options) for stmt.setFetchSize().

Why are the changes needed?
By default, the PostgreSQL JDBC driver loads all rows into memory when fetchSize is 0 (the Spark default). Without partitioning information, a single task may load the entire table into memory, which can easily cause executor OOM.
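
The fallback this PR proposes (an explicit user value wins, otherwise the dialect's default applies) might look roughly like the sketch below. The names `DialectSketch`, `GenericDialectSketch`, and `PostgresDialectSketch` are hypothetical stand-ins made up for illustration, and the user option is modeled as an `Option[Int]` instead of Spark's `JDBCOptions`; this is not the code in the PR.

```scala
// Simplified stand-ins for illustration only; these are NOT Spark's actual classes.
trait DialectSketch {
  // Dialect-level default; 0 leaves the fetch behavior up to the JDBC driver.
  def defaultFetchSize: Int = 0

  // An explicitly set user value (including 0) takes precedence over the default.
  def effectiveFetchSize(userFetchSize: Option[Int]): Int =
    userFetchSize.getOrElse(defaultFetchSize)
}

// A dialect that keeps the generic default of 0.
object GenericDialectSketch extends DialectSketch

// A dialect that overrides the default, as the PR does for PostgreSQL.
object PostgresDialectSketch extends DialectSketch {
  override def defaultFetchSize: Int = 1000
}
```

Under these assumptions, passing `Some(0)` to the PostgreSQL stand-in still yields 0, which mirrors the option to restore the old behavior by setting the fetch size explicitly.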
Unlike most JDBC drivers, PostgreSQL requires both fetchSize > 0 and autoCommit = false to enable cursor-based fetching (see PostgreSQL JDBC documentation). Setting a sensible default fetch size of 1000 enables cursor-based row batching automatically, preventing the driver from buffering the entire result set.
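
For readers unfamiliar with the driver behavior, here is a minimal sketch of the two conditions at the plain JDBC level, using only standard `java.sql` calls. The object `CursorFetchSketch` and its methods `cursorFetchEnabled` and `readInBatches` are hypothetical names chosen for this example; Spark's own read path is structured differently.

```scala
import java.sql.{Connection, ResultSet}

// Illustrative helper, not part of Spark or the PostgreSQL driver.
object CursorFetchSketch {
  // The two conditions named in the PR description; both must hold.
  def cursorFetchEnabled(fetchSize: Int, autoCommit: Boolean): Boolean =
    fetchSize > 0 && !autoCommit

  // Reads a query result in batches of `fetchSize` rows, calling `handleRow` per row.
  def readInBatches(conn: Connection, sql: String, fetchSize: Int)
                   (handleRow: ResultSet => Unit): Unit = {
    conn.setAutoCommit(false) // cursor-based fetching needs autoCommit off
    val stmt = conn.prepareStatement(sql)
    try {
      stmt.setFetchSize(fetchSize) // rows requested from the server per round trip
      val rs = stmt.executeQuery()
      try {
        while (rs.next()) handleRow(rs)
      } finally rs.close()
    } finally stmt.close()
  }
}
```

With `fetchSize` left at 0 or `autoCommit` left on, the same loop would still work, but the driver would buffer the full result set before the first call to `rs.next()` returns.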
Users can still override the default by explicitly setting the fetchsize option (including fetchsize=0 to restore the old behavior).

Does this PR introduce any user-facing change?
NA
How was this patch tested?
UTs added.
Manually verified the behavior for Postgres.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code v2.1.87