From 7705a69c8409d114c9573f86effa3f5084f68244 Mon Sep 17 00:00:00 2001 From: Arvind Kandpal Date: Mon, 8 Jun 2026 16:27:17 +0530 Subject: [PATCH 1/2] [FLINK-37502][docs] Translate documentation for Disaggregated State Management into Chinese --- .../dev/datastream/fault-tolerance/state.md | 2 +- .../datastream/fault-tolerance/state_v2.md | 375 +++++++----------- .../docs/ops/state/disaggregated_state.md | 212 ++++------ .../dev/datastream/fault-tolerance/state.md | 2 +- .../datastream/fault-tolerance/state_v2.md | 2 +- 5 files changed, 219 insertions(+), 374 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md index 431277352c38c..59812ebb8e0fb 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state.md @@ -504,7 +504,7 @@ void initializeState(FunctionInitializationContext context) throws Exception; 比如说,算子 A 的并发读为 1,包含两个元素 `element1` 和 `element2`,当并发读增加为 2 时,`element1` 会被分到并发 0 上,`element2` 则会被分到并发 1 上。 - **Union redistribution:** 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 - Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors. + 在列表基数很大时不要使用此功能,因为 Checkpoint metadata 会为每个列表条目存储一个偏移量, 可能会超过 RPC framesize 限制或导致 out-of-memory 错误。 下面的例子中的 `SinkFunction` 在 `CheckpointedFunction` 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。 diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md index d3cde17b75d6e..b39376644813e 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md @@ -25,32 +25,22 @@ specific language governing permissions and limitations under the License. --> -# Working with State V2 (New APIs) +# 使用 State V2 (新的 API) -In this section you will learn about the new APIs that Flink provides for writing -stateful programs. Please take a look at [Stateful Stream -Processing]({{< ref "docs/concepts/stateful-stream-processing" >}}) -to learn about the concepts behind stateful stream processing. +本章节您将了解 Flink 用于编写有状态程序的新 API,要了解有状态流处理背后的概念,请参阅[Stateful Stream +Processing]({{< ref "docs/concepts/stateful-stream-processing" >}})。 -The new state API is designed to be more flexible than the previous API. User can perform -asynchronous state operations, thus making it more powerful and more efficient. -The asynchronous state access is essential for the state backend to be able to handle -large state sizes and to be able to spill to remote file systems when necessary. -This is called the 'disaggregated state management'. For more information about this, -please see [Disaggregated State Management]({{< ref "docs/ops/state/disaggregated_state" >}}). +新的状态 API 比以前的 API 更灵活,用户可以通过新的状态 API 异步访问状态,是一套更强大有效的 API。 +异步状态访问对存算分离来说是必要的,即令状态后端支持大状态时将文件溢出到远程文件系统的能力。更多关于 +存算分离的具体信息,请参阅[Disaggregated State Management]({{< ref "docs/ops/state/disaggregated_state" >}})。 ## Keyed DataStream -If you want to use keyed state, you first need to specify a key on a -`DataStream` that should be used to partition the state (and also the records -in the stream themselves). You can specify a key using `keyBy(KeySelector)` -in Java API on a `DataStream`. This will yield a `KeyedStream`, which then allows operations -that use keyed state. You should perform `enableAsyncState()` on the `KeyedStream` to enable -asynchronous state operations. +如果你希望使用 keyed state,首先需要为`DataStream`指定 key(主键)。这个主键用于状态分区(也会给数据流中的记录本身分区)。 +你可以使用 `DataStream` 中 Java API 的 `keyBy(KeySelector)` 。 它将生成 `KeyedStream`,接下来允许使用 keyed state 操作。 +您可以通过 `KeyedStream` 的 `enableAsyncState()` 方法来启动异步状态操作。 -A key selector function takes a single record as input and returns the key for -that record. The key can be of any type and **must** be derived from -deterministic computations. +Key selector 函数接收单条记录作为输入,返回这条记录的 key。该 key 可以为任何类型,但是它的计算产生方式**必须**是具备确定性的。 The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data set types into keys and values. Keys are @@ -79,135 +69,96 @@ KeyedStream keyed = words {{< top >}} -## Using Keyed State V2 - -Unlike the previous state API, the new state API is designed for asynchronous state access. -Each type of state gives two versions of the API: synchronous and asynchronous. The synchronous -API is a blocking API that waits for the state access to complete. The asynchronous API is -non-blocking and returns a `StateFuture` that will be completed when the state access -is done. After that a callback or following logic will be invoked (if any). -The asynchronous API is more efficient and should be used whenever possible. -It is highly **not** recommended to mixed synchronous and asynchronous state access in the same -user function. - -The keyed state interfaces provide access to different types of state that are all scoped to -the key of the current input element. This means that this type of state can only be used -on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java. And then most importantly, -the keyed stream needs to be enabled for asynchronous state access by calling `enableAsyncState()`. -The new API set are only available on `KeyedStream` with `enableAsyncState()` invoked. - -Now, we will look at the different types of state available, and then we will see -how they can be used in a program. Since the synchronous APIs are identical with the original APIs, -We only focus on the asynchronous ones here. - -### The Return Values - -First of all, we should get familiar with the return value of those asynchronous state access methods. - -`StateFuture` is a future that will be completed with the result of the state access. -The return bype is T. It provides multiple methods to handle the result, listed as: -* `StateFuture thenAccept(Consumer)`: This method takes a `Consumer` that will be called with the result - when the state access is done. It returns a `StateFuture`, which will be finished when the - `Consumer` is done. -* `StateFuture thenApply(Function)`: This method takes a `Function` that will be called with the result - when the state access is done. The return value of the function will be the result of the following - `StateFuture`, which will be finished when the `Function` is done. -* `StateFuture thenCompose(Function>)`: This method takes a `Function` that will - be called with the result when the state access is done. The return value of the function should be - a `StateFuture`, which is exposed to the invoker of `thenCompose` as a return value. - The `StateFuture` will be finished when the inner `StateFuture` of `Function` is finished. -* `StateFuture thenCombine(StateFuture, BiFunction)`: This method takes another `StateFuture` and a - `BiFunction` that will be called with the results of both `StateFuture`s when they are done. The return - value of the `BiFunction` will be the result of the following `StateFuture`, which will be finished when - the `BiFunction` is done. - -Those methods are similar to the corresponding ones of the `CompletableFuture`. Besides these methods, -keep in mind that `StateFuture` does not provide a `get()` method to block the current thread until the -state access is done. This is because blocking the current thread may cause recursive blocking. The -`StateFuture` also provides conditional version of `thenAccept`, `thenApply`, `thenCompose` and `thenCombine`, -which is for the case that the state access is done and the following logic will be split into two branches -based on the result of the state access. The conditional version of those methods are `thenConditionallyAccept`, -`thenConditionallyApply`, `thenConditionallyCompose` and `thenConditionallyCombine`. - -`StateIterator` is an iterator that can be used to iterate over the elements of a state. It provides -the following methods: -* `boolean isEmpty()` : A synchronous method returns a `true` if the iterator has no elements, and `false` otherwise. -* `StateFuture onNext(Consumer)` : This method takes a `Consumer` that will be called with the next element - when the state access is done. It returns a `StateFuture`, which will be finished when the `Consumer` is done. - Also, a function version of `onNext` is provided, which is `StateFuture> onNext(Function)`. This method - takes a `Function` that will be called with the next element when the state access is done. The return value of the function - will be collected and returned as a collection of the following `StateFuture`, which will be - finished when the `Function` is done. - -We also provide a `StateFutureUtils` class that contains some utility methods to handle `StateFuture`s. -These methods are: -* `StateFuture completedFuture(T)`: This method returns a completed `StateFuture` with the given value. This is -useful when you want to return a constant value in a `thenCompose` method for further processing. -* `StateFuture completedVoidFuture()`: This method returns a completed `StateFuture` with `null` value. -A void value version of `completedFuture`. -* `StateFuture> combineAll(Collection>)` : This method takes a collection of `StateFuture`s -and returns a `StateFuture` that will be completed when all the input `StateFuture`s are completed. The result of the -returned `StateFuture` is a collection of the results of the input `StateFuture`s. This is useful when you want to -combine the results of multiple `StateFuture`s. -* `StateFuture> toIterable(StateFuture>)` : This method takes a `StateFuture` of `StateIterator` -and returns a `StateFuture` of `Iterable`. The result of the returned `StateFuture` is an `Iterable` that contains all -the elements of the `StateIterator`. This is useful when you want to convert a `StateIterator` to an `Iterable`. -There is no good reason to do so, since this may disable the capability of lazy loading. Only useful when the further -calculation depends on the whole data from the iterator. - - -### State Primitives - -The available state primitives are: - -* `ValueState`: This keeps a value that can be updated and -retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value -for each key that the operation sees). The value can be set using `asyncUpdate(T)` and retrieved using -`StateFuture asyncValue()`. - -* `ListState`: This keeps a list of elements. You can append elements and retrieve an `StateIterator` -over all currently stored elements. Elements are added using `asyncAdd(T)` or `asyncAddAll(List)`, -the Iterable can be retrieved using `StateFuture> asyncGet()`. -You can also override the existing list with `asyncUpdate(List)` - -* `ReducingState`: This keeps a single value that represents the aggregation of all values -added to the state. The interface is similar to `ListState` but elements added using -`asyncAdd(T)` are reduced to an aggregate using a specified `ReduceFunction`. - -* `AggregatingState`: This keeps a single value that represents the aggregation of all values -added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type -of elements that are added to the state. The interface is the same as for `ListState` but elements -added using `asyncAdd(IN)` are aggregated using a specified `AggregateFunction`. - -* `MapState`: This keeps a list of mappings. You can put key-value pairs into the state and -retrieve an `StateIterator` over all currently stored mappings. Mappings are added using `asyncPut(UK, UV)` or -`asyncPutAll(Map)`. The value associated with a user key can be retrieved using `asyncGet(UK)`. -The iterable views for mappings, keys and values can be retrieved using `asyncEntries()`, -`asyncKeys()` and `asyncValues()` respectively. You can also use `asyncIsEmpty()` to check whether -this map contains any key-value mappings. - -All types of state also have a method `asyncClear()` that clears the state for the currently -active key, i.e. the key of the input element. - -It is important to keep in mind that these state objects are only used for interfacing -with state. The state is not necessarily stored inside but might reside on disk or somewhere else. -The second thing to keep in mind is that the value you get from the state -depends on the key of the input element. So the value you get in one invocation of your -user function can differ from the value in another invocation if the keys involved are different. - -To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state -(as we will see later, you can create several states, and they have to have unique names so -that you can reference them), the type of the values that the state holds, and possibly -a user-specified function, such as a `ReduceFunction`. Depending on what type of state you -want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, -an `AggregatingStateDescriptor`, a `ReducingStateDescriptor`, or a `MapStateDescriptor`. -To differentiate between the previous state APIs, you should use the `StateDescriptor`s under the -`org.apache.flink.api.common.state.v2` package (note the **v2**). - -State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. -Please see [here]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) for -information about that, but we will also see an example shortly. The `RuntimeContext` that -is available in a `RichFunction` has these methods for accessing state: +## 使用 Keyed State V2 + +和之前的 state API 不同,新的 state API 允许用户异步访问状态。每种类型的状态都提供了两种版本的 API:同步和异步。 +同步 API 是一个阻塞等待状态访问完成的 API。异步 API 是非阻塞的,它会返回一个 `StateFuture`,这个 `StateFuture` 会在状态访问完成时完成。 +在 `StateFuture` 完成时,用户的回调会被执行。异步 API 比同步 API 更加高效,我们更推荐使用异步 API。 +需要注意,**不建议**在同一个用户函数中混合使用同步和异步状态访问。 + +Key state 接口提供了对不同类型状态的访问,它们都是以 key 为作用域。这意味着这个类型的状态只能在 `KeyedStream` 上使用,`keyedStream` +可以通过 `stream.keyBy(…)` 在 Java 中创建。最重要的是,`keyedStream` 需要通过调用 `enableAsyncState()` 来开启异步状态访问。 +新的 API 集合只有在 `KeyedStream` 上开启异步状态访问时才可用。 + +接下来,我们将看到不同类型的状态,并看到它们如何在程序中使用。由于同步 API 与之前的 API 是等价的,这里只关注异步 API。 + +### 返回值 + +首先,我们应该熟悉异步状态访问方法的返回值。 + +`StateFuture` 完成时会返回状态访问的结果。返回类型是 T。它提供了多个方法来处理结果,包括: +* `StateFuture thenAccept(Consumer)`: 接受一个在状态访问完成时会被调用的 `Consumer` 作为参数。 + 它返回一个 `StateFuture`,在 `Consumer` 执行结束时完成。 +* `StateFuture thenApply(Function)`: 接受一个在状态访问完成时会被调用的 `Function` 作为参数。 + 返回值 `StateFuture` 会使用 `Function` 的返回值会作为内部结果,在 `Function` 执行结束时完成。 +* `StateFuture thenCompose(Function>)`: 接受一个在状态访问完成时会被调用的 + `Function` 作为参数。 函数的返回值是 `StateFuture`, 被传给 `thenCompose` 的调用者作为最终的返回值。返回值 + `StateFuture` 会在 `Function` 内部的 `StateFuture` 完成时完成。 +* `StateFuture thenCombine(StateFuture, BiFunction)`: 接受另一个 `StateFuture` + 和一个 `BiFunction` 作为参数,`BiFunction` 在两个 `StateFuture` 都完成时被调用。 返回值 `StateFuture`使用 + `BiFunction` 的返回值作为内部结果, 返回值的 `StateFuture` 会在两个 `StateFuture` 都完成时完成。 + +以上这些方法类似于 `CompletableFuture` 的相关方法。除此之外,注意,`StateFuture` 不提供 `get()` 方法来阻塞当前线程, +直到状态访问完成。 这是因为阻塞当前线程可能会导致递归阻塞。 `StateFuture` 还提供了条件版本的 `thenAccept`, +`thenApply`, `thenCompose` 和 `thenCombine`,用于后续逻辑依赖状态访问返回值来选择不同分支的情景。这些方法的条件版本是: +`thenConditionallyAccept`, `thenConditionallyApply`, `thenConditionallyCompose` 和 `thenConditionallyCombine`。 + +`StateIterator` 是一个迭代器,可用来迭代状态中的元素。它提供了以下方法: +* `boolean isEmpty()` : 当迭代器没有元素时返回 `true`,否则返回 `false`,是一个同步方法。 +* `StateFuture onNext(Consumer)` : 该方法接受一个 `Consumer`作为参数,`Consumer` 会在状态访问完成时与下一个 + 元素一起被调用。它返回一个 `StateFuture`,在 `Consumer` 执行完时完成。同样,`onNext` 也提供了 `Function` + 作为参数的版本:`StateFuture> onNext(Function)`。该方法接受一个 `Function` 作为参数, + 会在状态访问完成时与下一个元素一起被调用。`Function` 的返回值会被收集到一个集合中作为返回值 `StateFuture` 的内部 + 结果,返回值 `StateFuture` 在 `Function` 执行结束时完成。 + +我们也提供了 `StateFutureUtils` 类,其中包含一些用于处理 `StateFuture` 的实用方法: +* `StateFuture completedFuture(T)`: 该方法返回一个已经完成的 `StateFuture`,该 `StateFuture` 的返回值为给定的值。 +* `StateFuture completedVoidFuture()`: 该方法返回一个已经完成的 `StateFuture`,该 `StateFuture` 的返回值为 `null`, + 是 `completedFuture` 的 void 返回值版本。 +* `StateFuture> combineAll(Collection>)` : 该方法接受一个 `StateFuture` 集合 + 作为参数,返回一个 `StateFuture`,返回值 `StateFuture` 在所有的 `StateFuture` 都完成后完成。返回值 `StateFuture` + 的内部结果是输入的 `StateFuture` 的结果的集合。该方法在想组合多个 `StateFuture` 时的结果时非常有用。 +* `StateFuture> toIterable(StateFuture>)` : 该方法接受一个结果为 `StateIterator` + 的 `StateFuture` 作为参数,返回一个 `Iterable` 作为结果的 `StateFuture`。 返回值 `StateFuture` 的内部结果 + `Iterable` 包含了 `StateIterator` 的所有元素。该函数在想把 `StateIterator` 转换为 `Iterable` 时非常有用。 + 但这可能会禁用延迟加载的功能,如果没有充分理由这样做,建议不这样使用。本方法只有当下一步的计算依赖于迭代器的全部数据时才有用。 + +### 状态原语 + +所有支持的状态类型如下所示: + +* `ValueState`: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key + 都可能对应一个值)。 这个值可以通过 `asyncUpdate(T)` 进行更新,通过 `StateFuture asyncValue()` 进行检索。 + +* `ListState`: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 + `asyncAdd(T)` 或者 `asyncAddAll(List)` 进行添加元素,通过 `StateFuture> asyncGet()` + 获得整个列表。还可以通过 `asyncUpdate(List)` 覆盖当前的列表。 + +* `ReducingState`: 保存一个单值,表示添加到状态的所有值的聚合。接口与 `ListState` 类似,但使用 `asyncAdd(T)` + 增加元素,会使用提供的 `ReduceFunction` 进行聚合。 + +* `AggregatingState`: 保留一个单值,表示添加到状态的所有值的聚合。和 `ReducingState` 相反的是, + 聚合类型可能与添加到状态的元素的类型不同。 接口与 `ListState` 类似,但使用 `asyncAdd(IN)` 添加的元素会用指定的 + `AggregateFunction` 进行聚合。 + +* `MapState`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `asyncPut(UK,UV)` + 或者 `asyncPutAll(Map)` 添加映射。 使用 `asyncGet(UK)` 检索特定 key。 使用 `asyncEntries()`,`asyncKeys()` 和 `asyncValues()` + 分别检索映射、键和值的可迭代视图。你还可以通过 `asyncIsEmpty()` 来判断是否包含任何键值对。 + +所有类型的状态还有一个`asyncClear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。 + +请注意,这些状态对象仅用于与状态交互。状态本身不一定存储在内存中,还可能在磁盘或其他位置。 +另外需要注意的是从状态中获取的值取决于输入元素所代表的 key。 因此,在不同 key 上调用同一个接口,可能得到不同的值。 + +你必须创建一个 `StateDescriptor`,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态, +并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如`ReduceFunction`。 +根据不同的状态类型,可以创建`ValueStateDescriptor`,`ListStateDescriptor`,`AggregatingStateDescriptor`, +`ReducingStateDescriptor` 或 `MapStateDescriptor`。为了和之前的 state APIs 区分,应该使用 +`org.apache.flink.api.common.state.v2` (注意是 **v2**)包下的 `StateDescriptor`。 + +状态通过 `RuntimeContext` 进行访问,因此只能在 *rich functions* 中使用。请参阅 +[这里]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions)获取相关信息, +但是我们很快也会看到一个例子。`RichFunction` 中 `RuntimeContext` 提供如下方法: * `ValueState getState(ValueStateDescriptor)` * `ReducingState getReducingState(ReducingStateDescriptor)` @@ -281,43 +232,29 @@ a `ValueState`. Once the count reaches 2 it will emit the average and clear the we start over from `0`. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field. -### Execution Order - -The state access methods are executed asynchronously. This means that the state access methods -will not block the current thread. With synchronous APIs, the state access methods will be executed in -the order they are called. However, with asynchronous APIs, the state access methods will be executed -out of order, especially for those invokes for different incoming elements. For the above example, -if the `flatMap` function is invoked for two different incoming elements A and B, the state access -methods for A and B will be executed. Firstly, `asyncGet` is executed for A, then `asyncGet` is -allowed to execute for B. The finish order of the two `asyncGet` is not guaranteed. Thus the order -of continuation of the two `StateFuture`s is not guaranteed. Thus invokes of `asyncClear` or -`asyncUpdate`for A and B are not determined. - -Although the state access methods are executed out of order, this not mean that all the user code -are run in parallel. The user code in the `processElement`, `flatMap` or `thenXXxx` methods -following the state access methods will be executed in a single thread (the task thread). So there -is no concurrency issue for the user code. - -Typically, you don't need to worry about the execution order of the state access methods, but there -is still some rules the Flink will ensure: -* The execution order of user code entry `flatMap` for same-key elements are invoked strictly in -order of element arrival. -* The consumers or functions passed to the `thenXXxx` methods are executed in the order they are -chained. If they are not chained, or there are multiple chains, the order is not guaranteed. - -### Best practice of asynchronous APIs - -The asynchronous APIs are designed to be more efficient and more powerful than the synchronous APIs. -There are some best practices that you should follow when using the asynchronous APIs: -* **Avoid mixing synchronous and asynchronous state access** -* Use chaining of `thenXXxx` methods to handle the result of the state access and then gives another - state access or result. Divide the logic into multiple steps split by `thenXXxx` methods. -* Avoid accessing mutable members of the user function (`RichFlatMapFunction`). Since the state access - methods are executed out of order, the mutable members may be accessed in unpredictable order. - Instead, use the result of the state access to pass the data between different steps. The - `StateFutureUtils.completedFuture` or `thenApply` method can be used to pass the data. Or use - a captured container (`AtomicReference`) which is initialized for each invoke of the `flatMap` - to share between lambdas. +### 执行顺序 + +状态访问是异步的,这意味着状态访问方法不会阻塞当前线程。在同步 API 下,状态访问方法会按照它们被调用的顺序执行,但是异步 API 下, +尤其是对于不同输入元素的状态访问方法,状态访问方法可能会乱序执行。在上面的例子中,如果 `flatMap` 函数接收两个不同的 +输入元素 A 和 B,元素 A 和 B 的状态访问方法会被执行。首先,元素 A 的 `asyncGet` 方法被执行,然后元素 B 的 `asyncGet` 方法接着被执行, +但两个 `asyncGet` 的完成顺序不能保证。因此我们无法保证两个 `StateFuture` 后续步骤的执行顺序,也就无法确定元素 A 和 元素 B 的 +`asyncClear` 和 `asyncUpdate` 的调用顺序。 + +尽管状态访问方法可能会乱序执行,这并不意味着所有用户代码都会并行执行。在 `processElement`, `flatMap` 或者 `thenXXxx` +方法中的用户代码都会在单个线程(即主任务线程)中执行。因此不会出现用户代码的并发问题。 + +通常,您无需担心状态访问方法的执行顺序问题,但 Flink 仍确保一些规则: +* 同 key 的元素的执行顺序严格按照元素到达 `flatMap` 的顺序执行的。 +* 传递给 `thenXXxx` 方法的函数会按照调用链的顺序执行。如果同时有多个调用链,则无法保证执行顺序。 + +### 异步 APIs 的最佳实践 + +异步 APIs 的设计比同步 APIs 更加高效和强大。当使用异步 APIs 时,有一些最佳实践应该遵循: +* **不要混合使用同步和异步状态访问** +* 将逻辑分割成多个步骤,通过连续的 `thenXXxx` 来进行链式调用,前一个 `thenXXxx` 的结果传递给后一个 `thenXXxx`。 +* 避免在用户函数(例如 `RichFlatMapFunction`) 中使用可变(mutable)成员。因为状态访问方法是乱序执行的, + 可变成员可能会以无法预测的顺序执行。相反,建议使用 `thenXXxx` 的结果在不同步骤之间传递数据。`StateFutureUtils.completedFuture` + 和 `thenApply` 方法可以被用来传递数据。或者使用一个每次在 `flatMap` 调用时初始化的容器 (`AtomicReference`),以便在不同回调之间共享数据。 ### State Time-To-Live (TTL) @@ -481,37 +418,26 @@ by activating debug level for `FlinkCompactionFilter`: - For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`, e.g. after restart from savepoint. - Periodic compaction could only work when TTL is enabled. -## Operator State - -*Operator State* (or *non-keyed state*) is state that is bound to one -parallel operator instance. The [Kafka Connector]({{< ref "docs/connectors/datastream/kafka" >}}) is a good motivating example for the use of -Operator State in Flink. Each parallel instance of the Kafka consumer maintains -a map of topic partitions and offsets as its Operator State. - -The Operator State interfaces support redistributing state among parallel -operator instances when the parallelism is changed. There are different schemes -for doing this redistribution. - -In a typical stateful Flink Application you don't need operators state. It is -mostly a special type of state that is used in source/sink implementations and -scenarios where you don't have a key by which state can be partitioned. - -## Broadcast State - -*Broadcast State* is a special type of *Operator State*. It was introduced to -support use cases where records of one stream need to be broadcasted to all -downstream tasks, where they are used to maintain the same state among all -subtasks. This state can then be accessed while processing records of a second -stream. As an example where broadcast state can emerge as a natural fit, one -can imagine a low-throughput stream containing a set of rules which we want to -evaluate against all elements coming from another stream. Having the above type -of use cases in mind, broadcast state differs from the rest of operator states -in that: - - 1. it has a map format, - 2. it is only available to specific operators that have as inputs a - *broadcasted* stream and a *non-broadcasted* one, and - 3. such an operator can have *multiple broadcast states* with different names. +## 算子状态 (Operator State) + +*算子状态*(或者*非 keyed 状态*)是绑定到一个并行算子实例的状态。[Kafka Connector]({{< ref "docs/connectors/datastream/kafka" >}}) 是 Flink 中使用算子状态一个很具有启发性的例子。 +Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。 + +当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。 + +在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。 + +**注意:** Python DataStream API 仍无法支持算子状态。 + +## 广播状态 (Broadcast State) + +*广播状态*是一种特殊的*算子状态*。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 +该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。 +考虑到上述这类使用情形,广播状态和其他算子状态的不同之处在于: + + 1. 它具有 map 格式, + 2. 它仅在一些特殊的算子中可用。这些算子的输入为一个*广播*数据流和*非广播*数据流, + 3. 这类算子可以拥有不同命名的*多个广播状态* 。 {{< top >}} @@ -653,14 +579,11 @@ using the provided `FunctionInitializationContext`. {{< top >}} -## Migrate from the Old State API +## 从老的状态 API 迁移 -It is very easy to migrate from the old state API to the new one. Please take the following steps: +从老的状态 API 迁移到新的状态 API 非常方便,请采用以下步骤: -1. Invoke `enableAsyncState()` on the `KeyedStream` to enable the new state API. -2. Replace the `StateDescriptor`s with the new ones under `v2` package. Also, replace the old state -handles with the new ones under `v2` package. -3. Rewrite the old state access methods with the new asynchronous ones. -4. It is recommended to use ForSt State Backend for the new state API, which could perform -async state access. Other state backends only support synchronous execution of state access, -although it can be used with the new state API. +1. 在 `KeyedStream` 上调用 `enableAsyncState()` 来启动新的状态 API. +2. 替换 `StateDescriptor` 为 `v2` 包下的 `StateDescriptor`。 同时, 替换旧的 state handle 为新的 state handle。 +3. 用新的异步 API 重写旧的状态访问方法。 +4. 使用新的状态 API 时,推荐使用 ForSt 状态后端,它支持异步状态访问。其他状态只支持同步的状态访问,尽管它们也可以和新的状态 API 一起使用。 diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md b/docs/content.zh/docs/ops/state/disaggregated_state.md index 83f58c64f1ae8..7ed7a42ea1e01 100644 --- a/docs/content.zh/docs/ops/state/disaggregated_state.md +++ b/docs/content.zh/docs/ops/state/disaggregated_state.md @@ -1,5 +1,5 @@ --- -title: "Disaggregated State Management" +title: "存算分离(Disaggregated State Management)" weight: 20 type: docs aliases: @@ -25,87 +25,62 @@ specific language governing permissions and limitations under the License. --> -# Disaggregated State Management - -## Overview - -For the first ten years of Flink, the state management is based on memory or local disk of the TaskManager. -This approach works well for most use cases, but it has some limitations: - * **Local Disk Constraints**: The state size is limited by the memory or disk size of the TaskManager. - * **Spiky Resource Usage**: The local state model triggers periodic CPU and network I/O bursts during checkpointing or SST files compaction. - * **Heavy Recovery**: State needs to be downloaded during recovery. The recovery time is -proportional to the state size, which can be slow for large state sizes. - -In Flink 2.0, we introduced the disaggregated state management. This feature allows users to store -the state in external storage systems like S3, HDFS, etc. This is useful when the state size -is extremely large. It could be used to store the state in a more cost-effective way, or to -persist or recovery the state in a more lightweight way. The benefits of disaggregated state management are: - * **Unlimited State Size**: The state size is only limited by the external storage system. - * **Stable Resource Usage**: The state is stored in external storage, thus the checkpoint could be very lightweight. -And the SST files compaction could be done remotely (TODO). - * **Fast Recovery**: No need to download the state during recovery. The recovery time is -independent of the state size. - * **Flexible**: Users can easily choose different external storage systems or I/O performance levels, -or scale the storage based on their requirements without change their hardware. - * **Cost-effective**: External storage are usually cheaper than local disk. Users can flexibly -adjust computing resources and storage resources independently if there is any bottleneck. - -The disaggregated state management contains three parts: - * **ForSt State Backend**: A state backend that stores the state in external storage systems. It -can also leverage the local disk for caching and buffering. The asynchronous I/O model is used to -read and write the state. For more details, see [ForSt State Backend]({{< ref "docs/ops/state/state_backends#the-forststatebackend" >}}). - * **New State APIs**: The new state APIs (State V2) are introduced to perform asynchronous state -reads and writes, which is essential for overcoming the high network latency when accessing -the disaggregated state. For more details, see [New State APIs]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). - * **SQL Support**: Many SQL operators are rewritten to support the disaggregated state management -and asynchronous state access. User can easily enable these by setting the configuration. +# 存算分离(Disaggregated State Management) + +## 概览 + +在 Flink 的前 10 年,状态管理是基于 TaskManager 的内存或本地磁盘。该方法适用于大多数用户场景,但存在一些限制: +* **本地磁盘限制**: 状态大小受限于 TaskManager 的内存或者磁盘大小。 +* **资源使用尖刺**:本地状态模型会在 checkpoint 时触发 SST 文件的 compaction,引起 CPU 和网络 I/O 的尖刺。 +* **恢复很慢**:在恢复期间,状态后端需要下载状态。恢复时间与状态大小成比例,大状态的恢复非常慢。 + +在 Flink 2.0,我们引入了存算分离(disaggregated state management)来解决上述问题。 存算分离允许状态后端将状态直接存储在远端存储系统,如 S3,HDFS 等。 +存算分离以一种更有效的方式存储状态,能够轻量地持久化或恢复状态。它有如下好处: +* **状态大小无限制**:状态大小仅取决于外部存储系统大小。 +* **资源使用稳定**:状态被存储在外部存储上,因此 checkpoint 可以非常轻量。SST 文件的 compaction 也可以被放在远程(TODO)。 +* **恢复快速**:在恢复期间无需下载状态到本地,恢复时间与状态大小无关。 +* **灵活**:用户可以轻松地选择不同的外部存储系统或I/O性能级别,或者根据需求独立地扩展存储资源,而不需要改变他们的硬件。 +* **经济实惠**:外部存储通常比本地磁盘便宜。如果有瓶颈,用户可以灵活、独立地调整计算资源以及存储资源。 + +存算分离包括三个部分: +* **ForSt 状态后端**: 一个以外部存储系统作为主存的状态后端,它可以利用本地磁盘进行缓存。 + 它使用异步 I/O 模型用于读取和写入状态。详细信息请参阅[ForSt State Backend]({{< ref "docs/ops/state/state_backends#the-forststatebackend" >}}). +* **新的状态 API**: 引入了新的状态 API (State V2) 用于异步状态读取和写入,异步状态访问是存算分离克服高网络延迟的必要组件。 + 详细信息请参阅[New State APIs]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). +* **SQL 支持**: 许多 SQL 算子使用异步状态 API 来重新实现,用户可以轻松地通过配置项启用这些算子。 {{< hint info >}} -Disaggregated state and asynchronous state access are encouraged for large state. However, when -the state size is small, the local state management with synchronous state access is a better -choice. +对于大状态作业,我们推荐使用存算分离和异步状态访问。如果作业状态很小,本地状态管理和同步状态访问会是更好的选择。 {{< /hint >}} {{< hint warning >}} -The disaggregated state management is still in experimental state. We are working on improving -the performance and stability of this feature. The APIs and configurations may change in future -release. +存算分离(disaggregated state management)仍然是实验性功能。我们正在改进此功能的性能和稳定性。新的状态 API 和配置可能会在将来的版本中更改。 {{< /hint >}} ## Quick Start ### For SQL Jobs -To enable the disaggregated state management in SQL jobs, you can set the following configurations: +在 SQL 作业中可以通过以下配置启用存算分离: ```yaml state.backend.type: forst table.exec.async-state.enabled: true - -# enable checkpoints, checkpoint directory is required -execution.checkpointing.incremental: true -execution.checkpointing.dir: s3://your-bucket/flink-checkpoints - -# We don't support the mini-batch and two-phase aggregation in asynchronous state access yet. table.exec.mini-batch.enabled: false table.optimizer.agg-phase-strategy: ONE_PHASE ``` -Thus, you could leverage the disaggregated state management and asynchronous state access in -your SQL jobs. We haven't implemented the full support for the asynchronous state access -in SQL yet. If the SQL operators you are using are not supported, the operator will fall back -to the synchronous state implementation automatically. The performance may not be optimal in -this case. The supported stateful operators are: - - Rank (Top1, Append TopN) - - Row Time Deduplicate - - Aggregate (without distinct) - - Join - - Window Join - - Tumble / Hop / Cumulative Window Aggregate - +如此一来,您可以在 SQL 作业中使用存算分离和异步状态访问。目前我们并未实现所有异步状态访问版本的 SQL 算子。 +对于还未支持异步状态访问的算子,算子会自动回退到同步状态实现,这种情况下性能可能不是最佳的。 +目前支持异步状态访问的有状态算子有: +- Rank (Top1, Append TopN) +- Row Time Deduplicate +- Aggregate (without distinct) +- Join +- Window Join +- Tumble / Hop / Cumulative Window Aggregate ### For DataStream Jobs -To enable the disaggregated state management in DataStream jobs, firstly you should use -the `ForStStateBackend`. Configure via code in per-job mode: +在 DataStream 作业启用存算分离,首先需要使用 `ForStStateBackend`。通过以下代码在作业中配置: ```java Configuration config = new Configuration(); config.set(StateBackendOptions.STATE_BACKEND, "forst"); @@ -113,116 +88,63 @@ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://your-bucket/flink-c config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); env.configure(config); ``` -Or configure via `config.yaml`: +或者通过 `config.yaml` 进行配置: ```yaml state.backend.type: forst - # enable checkpoints, checkpoint directory is required execution.checkpointing.incremental: true execution.checkpointing.dir: s3://your-bucket/flink-checkpoints ``` +然后,您可以使用新的状态 API 来编写您的 datastream 作业。有关新的状态 API,请参阅[状态 V2]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). -Then, you should write your datastream jobs with the new state APIs. For more -details, see [State V2]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). - -## Advanced Tuning Options - -### Tuning ForSt State Backend +## 高级调优选项 -The `ForStStateBackend` has many configurations to tune the performance. -The design of ForSt is very similar to RocksDB, and the configurable options are almost the same, -so you can refer to [large state tuning]({{< ref "docs/ops/state/large_state_tuning#tuning-rocksdb-or-forst" >}}) -to tune the ForSt state backend. +### ForSt 状态后端调优 -Besides that, the following sections introduce some unique configurations for ForSt. +`ForStStateBackend` 有许多配置项用于性能调优。ForSt 的设计和 RocksDB 非常类似,因此您可以参考[大状态调优]({{< ref "docs/ops/state/large_state_tuning#tuning-rocksdb-or-forst" >}}) +来为 ForSt state backend 调优。 +除此之外,以下章节介绍了一些 ForSt 独有的配置项。 -#### ForSt Primary Storage Location +#### ForSt 主要存储位置 -By default, ForSt stores the state in the checkpoint directory. In this case, -ForSt could perform lightweight checkpoints and fast recovery. However, users may -want to store the state in a different location, e.g., a different bucket in S3. -You can set the following configuration to specify the primary storage location: +默认情况下,ForSt 会将状态存储在 checkpoint 目录中。在这种情况下,ForSt 可以执行轻量级的 checkpoint 和快速恢复。 +用户也可能想将状态存在一个不同的位置,例如,一个不同的 S3 桶。您可以通过以下配置来指定主要存储位置: ```yaml state.backend.forst.primary-dir: s3://your-bucket/forst-state ``` +**注意**:如果设置了此配置,则您可能无法利用轻量级检查点和快速恢复,因为 ForSt 会在 checkpoint 和恢复过程中将文件 +从主要存储位置复制到 checkpoint 目录。 -**Note**: If you set this configuration, you may not be able to leverage the lightweight -checkpoint and fast recovery, since the ForSt will perform file copy between the primary -storage location and the checkpoint directory during checkpointing and recovery. +#### ForSt 文件缓存 -#### ForSt Local Storage Location +ForSt 使用本地磁盘进行缓存和缓冲。缓存粒度是整个文件。默认启用缓存功能,除非将主要存储位置设置为本地。缓存有两种容量限制策略: +- 基于固定大小的 (Size-based): 在已使用缓存大小超过限制时淘汰最旧的文件。 +- 基于预留空间的 (Reserved-based): 在磁盘上(缓存目录所在的磁盘)剩余空间不足时淘汰最旧的文件。 -By default, ForSt will **ONLY** disaggregate state when asynchronous APIs (State V2) are used. When -using synchronous state APIs in DataStream and SQL jobs, ForSt will only serve as **local state store**. -Since a job may contain multiple ForSt instances with mixed API usage, synchronous local state access -along with asynchronous remote state access could help achieve better overall throughput. -If you want the operators with synchronous state APIs to store state in remote, the following configuration will help: -```yaml -state.backend.forst.sync.enforce-local: false -``` -And you can specify the local storage location via: -```yaml -state.backend.forst.local-dir: path-to-local-dir -``` - -#### ForSt File Cache - -ForSt uses the local disk for caching and buffering. The granularity of the cache is whole file. -This is enabled by default, except when the primary storage location is set to local. -There are two capacity limit policies for the cache: - - Size-based: The cache will evict the oldest files when the cache size exceeds the limit. - - Reserved-based: The cache will evict the oldest files when the reserved space on disk -(the disk where cache directory is) is not enough. -Corresponding configurations are: +相关配置项如下: ```yaml state.backend.forst.cache.size-based-limit: 1GB state.backend.forst.cache.reserve-size: 10GB ``` -Those can take effect together. If so, the cache will evict the oldest files when the cache -size exceeds either the size-based limit or the reserved size limit. +以上两个配置项可以同时生效。如果同时开启两个选项,在缓存使用量触发以上两个限制之一时,缓存会淘汰最老的文件。 -One can also specify the cache directory via: +您可以通过以下配置项来指定缓存目录: ```yaml state.backend.forst.cache.dir: /tmp/forst-cache ``` -#### ForSt Asynchronous Threads - -ForSt uses asynchronous I/O to read and write the state. There are three types of threads: - - Coordinator thread: The thread that coordinates the asynchronous read and write. - - Read thread: The thread that reads the state asynchronously. - - Write thread: The thread that writes the state asynchronously. +#### ForSt 异步线程 -The number of asynchronous threads is configurable. Typically, you don't need to adjust these -values since the default values are good enough for most cases. -In case for special needs, you can set the following configuration to specify the number of -asynchronous threads: - - `state.backend.forst.executor.read-io-parallelism`: The number of asynchronous threads for read. Default is 3. - - `state.backend.forst.executor.write-io-parallelism`: The number of asynchronous threads for write. Default is 1. - - `state.backend.forst.executor.inline-write`: Whether to inline the write operation in the coordinator thread. -Default is true. Setting this to false will raise the CPU usage. - - `state.backend.forst.executor.inline-coordinator`: Whether to let task thread be the coordinator thread. -Default is true. Setting this to false will raise the CPU usage. +ForSt 使用异步 I/O 来读取和写入状态。有 3 种类型的线程: +- 分发线程: 负责分发状态读取和写入请求。 +- 读线程: 负责异步读取状态。 +- 写线程: 负责异步写入状态。 -{{< hint info >}} -`ForStStateBackend` utilizes [ForSt](https://github.com/ververica/ForSt/) as its underlying database core. -The current version of ForSt is forked from [frocksdb](https://github.com/ververica/frocksdb), -architected as an embedded database core specifically for Flink local state management. - -While transitioning the db toward a disaggregated architecture, we encountered significant -architectural and engineering constraints within the existing framework. -To address these challenges, community members are now working on a next-generation, -cloud-native ForSt DB written in Rust. - -#### Key Advantages of the New ForSt DB: -- Architectural Simplicity: A streamlined codebase designed for high extensibility. -- Stream-Native Design: Optimized specifically for the unique demands of large-scale stream processing. -- Cloud-Native: Built from the ground up to support disaggregation. - -#### Roadmap & Maintenance: -- Release Schedule: The first stable open-source version is projected for later 2026 (August, optimistically) -- Deprecation Notice: As we shift our focus to the new Rust-based implementation, the frocksdb-based version of ForSt is no longer under active development and will be phased out (deprecated) following the new release. - -{{< /hint >}} +异步线程的数量可以配置。通常,您不需要调整这些值,因为默认值已经适合大多数情况。 +如果有特殊需要,您可以设置以下配置来指定异步线程的数量: +- `state.backend.forst.executor.read-io-parallelism`: 读线程的数量,默认值是 3。 +- `state.backend.forst.executor.write-io-parallelism`: 写线程的数量,默认值是 1。 +- `state.backend.forst.executor.inline-write`: 写操作是否在分发线程内执行。默认值为 true。设置为 false 会提高 CPU 使用率。 +- `state.backend.forst.executor.inline-coordinator`: 是否让任务线程(即主线程)作为分发线程。默认值为 true。设置为 false 会提高 CPU 使用率。 {{< top >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state.md b/docs/content/docs/dev/datastream/fault-tolerance/state.md index d9dd75df81bdc..db1b501da937e 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/state.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/state.md @@ -563,7 +563,7 @@ In a typical stateful Flink Application you don't need operators state. It is mostly a special type of state that is used in source/sink implementations and scenarios where you don't have a key by which state can be partitioned. -**Notes:** Operator state is still not supported in Python DataStream API. +**Notes:** Operator state is still not supported in the Python DataStream API. ## Broadcast State diff --git a/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md b/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md index cc3da7ee2318c..fc39d43f488b0 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/state_v2.md @@ -105,7 +105,7 @@ We only focus on the asynchronous ones here. First of all, we should get familiar with the return value of those asynchronous state access methods. `StateFuture` is a future that will be completed with the result of the state access. -The return bype is T. It provides multiple methods to handle the result, listed as: +The return type is T. It provides multiple methods to handle the result, listed as: * `StateFuture thenAccept(Consumer)`: This method takes a `Consumer` that will be called with the result when the state access is done. It returns a `StateFuture`, which will be finished when the `Consumer` is done. From d1c7222d9cf2d3a146ec26951c3a4f89b829e9e7 Mon Sep 17 00:00:00 2001 From: Arvind Kandpal Date: Tue, 9 Jun 2026 13:51:44 +0530 Subject: [PATCH 2/2] [FLINK-37502][docs] Fix missing sections and verify all reviewer comments are addressed Co-Authored-By: Claude Sonnet 4.6 --- .../docs/ops/state/disaggregated_state.md | 32 +++++++++ .../docs/ops/state/state_backends.md | 69 +++++++++---------- 2 files changed, 64 insertions(+), 37 deletions(-) diff --git a/docs/content.zh/docs/ops/state/disaggregated_state.md b/docs/content.zh/docs/ops/state/disaggregated_state.md index 7ed7a42ea1e01..995f0b6784892 100644 --- a/docs/content.zh/docs/ops/state/disaggregated_state.md +++ b/docs/content.zh/docs/ops/state/disaggregated_state.md @@ -115,6 +115,19 @@ state.backend.forst.primary-dir: s3://your-bucket/forst-state **注意**:如果设置了此配置,则您可能无法利用轻量级检查点和快速恢复,因为 ForSt 会在 checkpoint 和恢复过程中将文件 从主要存储位置复制到 checkpoint 目录。 +#### ForSt 本地存储位置 + +默认情况下,ForSt 仅在使用异步 API (State V2) 时才会进行存算分离。在 DataStream 和 SQL 作业中使用同步状态 API 时, +ForSt 仅作为**本地状态存储**。由于一个作业可能包含混合使用 API 的多个 ForSt 实例,同步本地状态访问与异步远程状态访问相结合, +有助于实现更好的整体吞吐量。如果您希望使用同步状态 API 的算子将状态存储在远端,可以使用以下配置: +```yaml +state.backend.forst.sync.enforce-local: false +``` +您可以通过以下配置项来指定本地存储位置: +```yaml +state.backend.forst.local-dir: path-to-local-dir +``` + #### ForSt 文件缓存 ForSt 使用本地磁盘进行缓存和缓冲。缓存粒度是整个文件。默认启用缓存功能,除非将主要存储位置设置为本地。缓存有两种容量限制策略: @@ -147,4 +160,23 @@ ForSt 使用异步 I/O 来读取和写入状态。有 3 种类型的线程: - `state.backend.forst.executor.inline-write`: 写操作是否在分发线程内执行。默认值为 true。设置为 false 会提高 CPU 使用率。 - `state.backend.forst.executor.inline-coordinator`: 是否让任务线程(即主线程)作为分发线程。默认值为 true。设置为 false 会提高 CPU 使用率。 +{{< hint info >}} +`ForStStateBackend` 以 [ForSt](https://github.com/ververica/ForSt/) 作为其底层数据库核心。 +当前版本的 ForSt 是从 [frocksdb](https://github.com/ververica/frocksdb) fork 而来, +专为 Flink 本地状态管理设计的嵌入式数据库核心。 + +在将数据库向存算分离架构演进的过程中,我们在现有框架内遇到了重大的架构和工程挑战。 +为了解决这些挑战,社区成员正在开发基于 Rust 编写的下一代云原生 ForSt DB。 + +#### 新 ForSt DB 的关键优势: +- 架构简洁:为高扩展性而设计的精简代码库。 +- 流原生设计:专门针对大规模流处理的独特需求进行优化。 +- 云原生:从头设计以支持存算分离。 + +#### 路线图与维护: +- 发布计划:第一个稳定的开源版本预计于 2026 年下半年发布(乐观估计为 8 月)。 +- 弃用声明:随着我们将重心转移到新的基于 Rust 的实现,基于 frocksdb 的 ForSt 版本已不再活跃开发,并将在新版本发布后逐步退出(弃用)。 + +{{< /hint >}} + {{< top >}} diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md index 7b8a50f896add..af5f1ce57fee0 100644 --- a/docs/content.zh/docs/ops/state/state_backends.md +++ b/docs/content.zh/docs/ops/state/state_backends.md @@ -105,49 +105,44 @@ EmbeddedRocksDBStateBackend 是目前唯一支持增量 CheckPoint 的 State Bac -### The ForStStateBackend - -The *ForStStateBackend* is a state backend that is based on [ForSt project](https://github.com/ververica/ForSt), -which is also a LSM-tree structured key-value store and built on top of the RocksDB. -It is designed for disaggregated state management, for more details, see [here]({{< ref "docs/ops/state/disaggregated_state" >}}). -Most importantly, it can hold its sst files on remote file systems that Flink supports, such as HDFS, S3, etc. -This allows Flink to scale the state size beyond the local disk capacity of the TaskManager. -Moreover, by putting the sst files on remote file systems, it can also provide a more lightweight -way to perform checkpoint and recovery. - -The ForStStateBackend is still in the experimental stage and is not fully available for production. -It always performs asynchronous incremental snapshots. - -The ForStStateBackend is encouraged for: - -- Jobs with very large state, long windows, large key/value states. Local disk may not be enough to -store the state. -- All high-availability setups. -- Asynchronous state access is preferred. Since the ForStStateBackend is the only one supporting -asynchronous state access. -- Jobs that require lightweight checkpoint and recovery, such as cloud-native applications. - -Limitations of the ForStStateBackend (for now): - -- Same as EmbeddedRocksDBStateBackend, the maximum supported size per key and per value is 2^31 bytes each. -- Does not support canonical savepoint, full snapshot, changelog and file-merging checkpoints. -Always perform incremental snapshots. - -Compared with EmbeddedRocksDBStateBackend, ForStStateBackend stores data on remote file system, thus -the amount of state that you can keep is unlimited. The local disk of TaskManager is only used to -store cache of file, to provide better performance. Note that when most of the active state is on -remote file system, the performance of state access may be affected by the network latency. Flink -introduces asynchronous state access to mitigate this issue. If you are using the asynchronous state -methods in State API V2, you can benefit from the asynchronous state access. To get familiar with the -State API V2, please refer to the [State API V2 documentation]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}). +### ForStStateBackend + +*ForStStateBackend* 是基于 [ForSt 项目](https://github.com/ververica/ForSt) 的状态后端, +ForSt 同样是 LSM-tree 结构的 key-value 存储,构建于 RocksDB 之上。 +它专为存算分离状态管理而设计,更多详情请参阅[这里]({{< ref "docs/ops/state/disaggregated_state" >}})。 +最重要的是,它可以将 sst 文件存储在 Flink 支持的远端文件系统上,如 HDFS、S3 等。 +这使得 Flink 能够突破 TaskManager 本地磁盘空间的限制来扩展状态大小。 +此外,通过将 sst 文件存储在远端文件系统上,还可以提供更轻量级的 checkpoint 和恢复方式。 + +ForStStateBackend 目前仍处于实验阶段,目前不推荐用于生产环境。 +它始终以异步增量快照方式执行。 + +ForStStateBackend 的适用场景: + +- 状态非常大、窗口非常长、key/value 状态非常大的 Job。本地磁盘空间可能不足以存储这些状态。 +- 所有高可用的场景。 +- 优先选择异步状态访问的场景。因为 ForStStateBackend 是唯一支持异步状态访问的状态后端。 +- 需要轻量级 checkpoint 和恢复的 Job,例如云原生应用程序。 + +ForStStateBackend 目前的局限: + +- 与 EmbeddedRocksDBStateBackend 相同,每个 key 和 value 的最大支持大小均为 2^31 字节。 +- 不支持标准 savepoint、全量快照、changelog 和文件合并 checkpoint,始终执行增量快照。 + +与 EmbeddedRocksDBStateBackend 相比,ForStStateBackend 将数据存储在远端文件系统, +因此可以保留的状态大小是无限制的。TaskManager 的本地磁盘空间仅用于存储文件缓存,以提供更好的性能。 +请注意,当大部分活跃状态存储在远端文件系统时,状态访问性能可能受到网络延迟的影响。Flink 引入了 +异步状态访问来缓解这一问题。如果您使用 State API V2 中的异步状态方法,则可以受益于异步状态访问。 +要了解 State API V2,请参阅 [State API V2 文档]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}})。 ## 选择合适的 State Backend 在选择 `HashMapStateBackend` 和 `RocksDB` 的时候,其实就是在性能与可扩展性之间权衡。`HashMapStateBackend` 是非常快的,因为每个状态的读取和算子对于 objects 的更新都是在 Java 的 heap 上;但是状态的大小受限于集群中可用的内存。 -另一方面,`RocksDB` 可以根据可用的 disk 空间扩展,并且只有它支持增量 snapshot。 -然而,每个状态的读取和更新都需要(反)序列化,而且在 disk 上进行读操作的性能可能要比基于内存的 state backend 慢一个数量级。 +另一方面,`RocksDB` 可以根据可用的本地磁盘空间扩展,并且只有它支持增量 snapshot。 +然而,每个状态的读取和更新都需要(反)序列化,而且在磁盘上进行读操作的性能可能要比基于内存的状态后端慢一个数量级。 +如果您需要处理非常大的状态,甚至超过可用的本地磁盘空间,或者希望在云原生环境中快速伸缩,则应考虑使用 `ForStStateBackend`。 {{< hint info >}} 在 Flink 1.13 版本中我们统一了 savepoints 的二进制格式。这意味着你可以生成 savepoint 并且之后使用另一种 state backend 读取它。