diff --git a/src/wp-includes/collaboration/class-wp-sync-post-meta-storage.php b/src/wp-includes/collaboration/class-wp-sync-post-meta-storage.php index c605fa48699b7..c29b81d95560d 100644 --- a/src/wp-includes/collaboration/class-wp-sync-post-meta-storage.php +++ b/src/wp-includes/collaboration/class-wp-sync-post-meta-storage.php @@ -79,65 +79,32 @@ public function add_update( string $room, $update ): bool { return false; } - // Create an envelope and stamp each update to enable cursor-based filtering. - $envelope = array( - 'timestamp' => $this->get_time_marker(), - 'value' => $update, + $meta_id = $this->with_suspended_posts_last_changed_update( + fn() => add_post_meta( $post_id, self::SYNC_UPDATE_META_KEY, $update, false ) ); - return (bool) add_post_meta( $post_id, wp_slash( self::SYNC_UPDATE_META_KEY ), wp_slash( $envelope ), false ); - } - - /** - * Retrieves all sync updates for a given room. - * - * @since 7.0.0 - * - * @param string $room Room identifier. - * @return array Sync updates. - */ - private function get_all_updates( string $room ): array { - $this->room_cursors[ $room ] = $this->get_time_marker() - 100; // Small buffer to ensure consistency. - - $post_id = $this->get_storage_post_id( $room ); - if ( null === $post_id ) { - return array(); + if ( $meta_id ) { + wp_cache_delete( "sync_room_state_{$this->get_room_hash( $room )}", 'sync' ); } - $updates = get_post_meta( $post_id, self::SYNC_UPDATE_META_KEY, false ); - - if ( ! is_array( $updates ) ) { - $updates = array(); - } - - // Filter out any updates that don't have the expected structure. - $updates = array_filter( - $updates, - static function ( $update ): bool { - return is_array( $update ) && isset( $update['timestamp'], $update['value'] ) && is_int( $update['timestamp'] ); - } - ); - - $this->room_update_counts[ $room ] = count( $updates ); - - return $updates; + return (bool) $meta_id; } /** * Gets awareness state for a given room. * + * Awareness data is stored in a transient with a short TTL. Transients may + * be evicted at any time (cache flush, deploy, expiration), in which case + * this method returns an empty array and clients briefly appear offline + * until the next poll repopulates state (typically under one second). + * * @since 7.0.0 * * @param string $room Room identifier. * @return array Awareness state. */ public function get_awareness_state( string $room ): array { - $post_id = $this->get_storage_post_id( $room ); - if ( null === $post_id ) { - return array(); - } - - $awareness = get_post_meta( $post_id, self::AWARENESS_META_KEY, true ); + $awareness = get_transient( "sync_awareness_{$this->get_room_hash( $room )}" ); if ( ! is_array( $awareness ) ) { return array(); @@ -149,6 +116,11 @@ public function get_awareness_state( string $room ): array { /** * Sets awareness state for a given room. * + * Uses a transient rather than post meta to avoid database write churn, + * since awareness data (cursor positions, selections) is ephemeral and + * repopulated on every client poll. A cache eviction only causes a brief + * flicker rather than data loss. + * * @since 7.0.0 * * @param string $room Room identifier. @@ -156,22 +128,14 @@ public function get_awareness_state( string $room ): array { * @return bool True on success, false on failure. */ public function set_awareness_state( string $room, array $awareness ): bool { - $post_id = $this->get_storage_post_id( $room ); - if ( null === $post_id ) { - return false; - } - - // update_post_meta returns false if the value is the same as the existing value. - update_post_meta( $post_id, wp_slash( self::AWARENESS_META_KEY ), wp_slash( $awareness ) ); - return true; + return set_transient( "sync_awareness_{$this->get_room_hash( $room )}", $awareness, MINUTE_IN_SECONDS ); } /** * Gets the current cursor for a given room. * * The cursor is set during get_updates_after_cursor() and represents the - * point in time just before the updates were retrieved, with a small buffer - * to ensure consistency. + * highest meta_id seen for the room's sync updates. * * @since 7.0.0 * @@ -194,7 +158,7 @@ public function get_cursor( string $room ): int { * @return int|null Post ID. */ private function get_storage_post_id( string $room ): ?int { - $room_hash = md5( $room ); + $room_hash = $this->get_room_hash( $room ); if ( isset( self::$storage_post_ids[ $room_hash ] ) ) { return self::$storage_post_ids[ $room_hash ]; @@ -235,17 +199,6 @@ private function get_storage_post_id( string $room ): ?int { return null; } - /** - * Gets the current time in milliseconds as a comparable time marker. - * - * @since 7.0.0 - * - * @return int Current time in milliseconds. - */ - private function get_time_marker(): int { - return (int) floor( microtime( true ) * 1000 ); - } - /** * Gets the number of updates stored for a given room. * @@ -259,32 +212,83 @@ public function get_update_count( string $room ): int { } /** - * Retrieves sync updates from a room for a given client and cursor. Updates - * from the specified client should be excluded. + * Retrieves sync updates from a room after the given cursor. * * @since 7.0.0 * * @param string $room Room identifier. - * @param int $cursor Return updates after this cursor. + * @param int $cursor Return updates after this cursor (meta_id). * @return array Sync updates. */ public function get_updates_after_cursor( string $room, int $cursor ): array { - $all_updates = $this->get_all_updates( $room ); - $updates = array(); + global $wpdb; - foreach ( $all_updates as $update ) { - if ( $update['timestamp'] > $cursor ) { - $updates[] = $update; - } + $room_hash = $this->get_room_hash( $room ); + $state_cache_key = "sync_room_state_{$room_hash}"; + $cached = wp_cache_get( $state_cache_key, 'sync' ); + + if ( is_array( $cached ) && $cached['cursor'] <= $cursor ) { + $this->room_cursors[ $room ] = $cached['cursor']; + $this->room_update_counts[ $room ] = $cached['count']; + return array(); + } + + $post_id = $this->get_storage_post_id( $room ); + if ( null === $post_id ) { + $this->room_cursors[ $room ] = 0; + $this->room_update_counts[ $room ] = 0; + return array(); } - // Sort by timestamp to ensure order. - usort( - $updates, - fn ( $a, $b ) => $a['timestamp'] <=> $b['timestamp'] + // Capture the current room state first so the returned cursor is race-safe. + $stats = $wpdb->get_row( + $wpdb->prepare( + "SELECT COUNT(*) AS total_updates, COALESCE( MAX(meta_id), 0 ) AS max_meta_id FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s", + $post_id, + self::SYNC_UPDATE_META_KEY + ) + ); + + $total_updates = $stats ? (int) $stats->total_updates : 0; + $max_meta_id = $stats ? (int) $stats->max_meta_id : 0; + + $this->room_update_counts[ $room ] = $total_updates; + $this->room_cursors[ $room ] = $max_meta_id; + + wp_cache_set( + $state_cache_key, + array( + 'cursor' => $max_meta_id, + 'count' => $total_updates, + ), + 'sync' ); - return wp_list_pluck( $updates, 'value' ); + if ( $max_meta_id <= $cursor ) { + return array(); + } + + $rows = $wpdb->get_results( + $wpdb->prepare( + "SELECT meta_value FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s AND meta_id > %d AND meta_id <= %d ORDER BY meta_id ASC", + $post_id, + self::SYNC_UPDATE_META_KEY, + $cursor, + $max_meta_id + ) + ); + + if ( ! $rows ) { + return array(); + } + + $updates = array(); + foreach ( $rows as $row ) { + $update = maybe_unserialize( $row->meta_value ); + $updates[] = $update; + } + + return $updates; } /** @@ -293,30 +297,80 @@ public function get_updates_after_cursor( string $room, int $cursor ): array { * @since 7.0.0 * * @param string $room Room identifier. - * @param int $cursor Remove updates with markers < this cursor. + * @param int $cursor Remove updates with meta_id < this cursor. * @return bool True on success, false on failure. */ public function remove_updates_before_cursor( string $room, int $cursor ): bool { + global $wpdb; + $post_id = $this->get_storage_post_id( $room ); if ( null === $post_id ) { return false; } - $all_updates = $this->get_all_updates( $room ); + $deleted_rows = $wpdb->query( + $wpdb->prepare( + "DELETE FROM {$wpdb->postmeta} WHERE post_id = %d AND meta_key = %s AND meta_id < %d", + $post_id, + self::SYNC_UPDATE_META_KEY, + $cursor + ) + ); - // Remove all updates for the room and re-store only those that are newer than the cursor. - if ( ! delete_post_meta( $post_id, wp_slash( self::SYNC_UPDATE_META_KEY ) ) ) { + if ( false === $deleted_rows ) { return false; } - // Re-store envelopes directly to avoid double-wrapping by add_update(). - $add_result = true; - foreach ( $all_updates as $envelope ) { - if ( $add_result && $envelope['timestamp'] >= $cursor ) { - $add_result = (bool) add_post_meta( $post_id, self::SYNC_UPDATE_META_KEY, $envelope, false ); + if ( $deleted_rows > 0 ) { + wp_cache_delete( $post_id, 'post_meta' ); + wp_cache_delete( "sync_room_state_{$this->get_room_hash( $room )}", 'sync' ); + } + + return true; + } + + /** + * Invokes the provided callback while the suspending setting the posts last_changed cache key. + * + * @since 7.0.0 + * @see wp_cache_set_posts_last_changed() + * + * @template T + * @param Closure(): T $callback Callback. + * @return T Return value from the callback. + */ + private function with_suspended_posts_last_changed_update( Closure $callback ) { + $priorities = array( + 'added_post_meta' => has_action( 'added_post_meta', 'wp_cache_set_posts_last_changed' ), + 'updated_post_meta' => has_action( 'updated_post_meta', 'wp_cache_set_posts_last_changed' ), + 'deleted_post_meta' => has_action( 'deleted_post_meta', 'wp_cache_set_posts_last_changed' ), + ); + foreach ( $priorities as $action => $priority ) { + if ( false !== $priority ) { + remove_action( $action, 'wp_cache_set_posts_last_changed', $priority ); + } + } + $return_value = $callback(); + foreach ( $priorities as $action => $priority ) { + if ( false !== $priority ) { + add_action( $action, 'wp_cache_set_posts_last_changed', $priority ); } } + return $return_value; + } - return $add_result; + /** + * Returns a hash of the room identifier. + * + * Used as the post slug for storage posts, as a segment in cache keys, + * and as part of transient names. + * + * @since 7.0.0 + * + * @param string $room Room identifier. + * @return string MD5 hash of the room. + */ + private function get_room_hash( string $room ): string { + return md5( $room ); } } diff --git a/tests/phpunit/tests/rest-api/rest-sync-server.php b/tests/phpunit/tests/rest-api/rest-sync-server.php index a09b256115f48..c1124ba214155 100644 --- a/tests/phpunit/tests/rest-api/rest-sync-server.php +++ b/tests/phpunit/tests/rest-api/rest-sync-server.php @@ -308,7 +308,7 @@ public function test_sync_end_cursor_is_positive_integer() { $data = $response->get_data(); $this->assertIsInt( $data['rooms'][0]['end_cursor'] ); - $this->assertGreaterThan( 0, $data['rooms'][0]['end_cursor'] ); + $this->assertGreaterThanOrEqual( 0, $data['rooms'][0]['end_cursor'] ); } public function test_sync_empty_updates_returns_zero_total() { @@ -539,6 +539,162 @@ public function test_sync_total_updates_increments() { $this->assertSame( 3, $data['rooms'][0]['total_updates'] ); } + public function test_sync_cursor_does_not_skip_update_inserted_during_fetch_window() { + global $wpdb; + + wp_set_current_user( self::$editor_id ); + + $room = $this->get_post_room(); + $storage = new WP_Sync_Post_Meta_Storage(); + + $seed_update = array( + 'client_id' => 1, + 'type' => 'update', + 'data' => 'c2VlZA==', + ); + + $this->assertTrue( $storage->add_update( $room, $seed_update ) ); + + $initial_updates = $storage->get_updates_after_cursor( $room, 0 ); + $baseline_cursor = $storage->get_cursor( $room ); + + $this->assertCount( 1, $initial_updates ); + $this->assertSame( $seed_update, $initial_updates[0] ); + $this->assertGreaterThan( 0, $baseline_cursor ); + + $storage_posts = get_posts( + array( + 'post_type' => WP_Sync_Post_Meta_Storage::POST_TYPE, + 'posts_per_page' => 1, + 'post_status' => 'publish', + 'name' => md5( $room ), + 'fields' => 'ids', + ) + ); + $storage_post_id = array_first( $storage_posts ); + + $this->assertIsInt( $storage_post_id ); + + $injected_update = array( + 'client_id' => 9999, + 'type' => 'update', + 'data' => base64_encode( 'injected-during-fetch' ), + ); + + // Clear the room state cache so the stats query actually executes + // and the proxy can intercept it to simulate the race condition. + wp_cache_delete( 'sync_room_state_' . md5( $room ), 'sync' ); + + $original_wpdb = $wpdb; + $proxy_wpdb = new class( $original_wpdb, $storage_post_id, $injected_update ) { + private $wpdb; + private $storage_post_id; + private $injected_update; + public $postmeta; + public $did_inject = false; + + public function __construct( $wpdb, int $storage_post_id, array $injected_update ) { + $this->wpdb = $wpdb; + $this->storage_post_id = $storage_post_id; + $this->injected_update = $injected_update; + $this->postmeta = $wpdb->postmeta; + } + + // phpcs:disable WordPress.DB.PreparedSQL.NotPrepared -- Proxy forwards fully prepared core queries. + public function prepare( ...$args ) { + return $this->wpdb->prepare( ...$args ); + } + + public function get_row( $query = null, $output = OBJECT, $y = 0 ) { + $result = $this->wpdb->get_row( $query, $output, $y ); + + $this->maybe_inject_after_sync_query( $query ); + + return $result; + } + + public function get_var( $query = null, $x = 0, $y = 0 ) { + $result = $this->wpdb->get_var( $query, $x, $y ); + + $this->maybe_inject_after_sync_query( $query ); + + return $result; + } + + public function get_results( $query = null, $output = OBJECT ) { + return $this->wpdb->get_results( $query, $output ); + } + // phpcs:enable WordPress.DB.PreparedSQL.NotPrepared + + public function __call( $name, $arguments ) { + return $this->wpdb->$name( ...$arguments ); + } + + public function __get( $name ) { + return $this->wpdb->$name; + } + + public function __set( $name, $value ) { + $this->wpdb->$name = $value; + } + + private function inject_update(): void { + if ( $this->did_inject ) { + return; + } + + $this->did_inject = true; + + add_post_meta( + $this->storage_post_id, + WP_Sync_Post_Meta_Storage::SYNC_UPDATE_META_KEY, + $this->injected_update, + false + ); + } + + private function maybe_inject_after_sync_query( $query ): void { + if ( $this->did_inject || ! is_string( $query ) ) { + return; + } + + $targets_postmeta = false !== strpos( $query, $this->postmeta ); + $targets_post_id = 1 === preg_match( '/\bpost_id\s*=\s*' . (int) $this->storage_post_id . '\b/', $query ); + $targets_meta_key = 1 === preg_match( + "/\bmeta_key\s*=\s*'" . preg_quote( WP_Sync_Post_Meta_Storage::SYNC_UPDATE_META_KEY, '/' ) . "'/", + $query + ); + + if ( $targets_postmeta && $targets_post_id && $targets_meta_key ) { + $this->inject_update(); + } + } + }; + + $wpdb = $proxy_wpdb; + try { + $race_updates = $storage->get_updates_after_cursor( $room, $baseline_cursor ); + $race_cursor = $storage->get_cursor( $room ); + } finally { + $wpdb = $original_wpdb; + } + + $this->assertTrue( $proxy_wpdb->did_inject, 'Expected race-window update injection to occur.' ); + $this->assertEmpty( $race_updates ); + $this->assertSame( $baseline_cursor, $race_cursor ); + + // Clear the room state cache since the injected update bypassed + // add_update() and its cache invalidation. + wp_cache_delete( 'sync_room_state_' . md5( $room ), 'sync' ); + + $follow_up_updates = $storage->get_updates_after_cursor( $room, $race_cursor ); + $follow_up_cursor = $storage->get_cursor( $room ); + + $this->assertCount( 1, $follow_up_updates ); + $this->assertSame( $injected_update, $follow_up_updates[0] ); + $this->assertGreaterThan( $race_cursor, $follow_up_cursor ); + } + /* * Compaction tests. */ @@ -623,6 +779,136 @@ public function test_sync_should_compact_is_false_for_non_compactor() { $this->assertFalse( $data['rooms'][0]['should_compact'] ); } + public function test_sync_compaction_does_not_delete_update_inserted_during_delete() { + global $wpdb; + + wp_set_current_user( self::$editor_id ); + + $room = $this->get_post_room(); + $storage = new WP_Sync_Post_Meta_Storage(); + + // Seed three updates so there's something to compact. + for ( $i = 1; $i <= 3; $i++ ) { + $this->assertTrue( + $storage->add_update( + $room, + array( + 'client_id' => $i, + 'type' => 'update', + 'data' => base64_encode( "seed-$i" ), + ) + ) + ); + } + + // Capture the cursor after all seeds are in place. + $storage->get_updates_after_cursor( $room, 0 ); + $compaction_cursor = $storage->get_cursor( $room ); + $this->assertGreaterThan( 0, $compaction_cursor ); + + $storage_posts = get_posts( + array( + 'post_type' => WP_Sync_Post_Meta_Storage::POST_TYPE, + 'posts_per_page' => 1, + 'post_status' => 'publish', + 'name' => md5( $room ), + 'fields' => 'ids', + ) + ); + $storage_post_id = array_first( $storage_posts ); + $this->assertIsInt( $storage_post_id ); + + $concurrent_update = array( + 'client_id' => 9999, + 'type' => 'update', + 'data' => base64_encode( 'arrived-during-compaction' ), + ); + + $original_wpdb = $wpdb; + $proxy_wpdb = new class( $original_wpdb, $storage_post_id, $concurrent_update ) { + private $wpdb; + private $storage_post_id; + private $concurrent_update; + public $did_inject = false; + + public function __construct( $wpdb, int $storage_post_id, array $concurrent_update ) { + $this->wpdb = $wpdb; + $this->storage_post_id = $storage_post_id; + $this->concurrent_update = $concurrent_update; + } + + // phpcs:disable WordPress.DB.PreparedSQL.NotPrepared -- Proxy forwards fully prepared core queries. + public function prepare( ...$args ) { + return $this->wpdb->prepare( ...$args ); + } + + public function query( $query ) { + $result = $this->wpdb->query( $query ); + + // After the DELETE executes, inject a concurrent update via + // raw SQL through the real $wpdb to avoid metadata cache + // interactions while the proxy is active. + if ( ! $this->did_inject + && is_string( $query ) + && 0 === strpos( $query, "DELETE FROM {$this->wpdb->postmeta}" ) + && false !== strpos( $query, "post_id = {$this->storage_post_id}" ) + ) { + $this->did_inject = true; + $this->wpdb->insert( + $this->wpdb->postmeta, + array( + 'post_id' => $this->storage_post_id, + 'meta_key' => WP_Sync_Post_Meta_Storage::SYNC_UPDATE_META_KEY, + 'meta_value' => maybe_serialize( $this->concurrent_update ), + ), + array( '%d', '%s', '%s' ) + ); + } + + return $result; + } + // phpcs:enable WordPress.DB.PreparedSQL.NotPrepared + + public function __call( $name, $arguments ) { + return $this->wpdb->$name( ...$arguments ); + } + + public function __get( $name ) { + return $this->wpdb->$name; + } + + public function __set( $name, $value ) { + $this->wpdb->$name = $value; + } + }; + + // Run compaction through the proxy so the concurrent update + // is injected immediately after the DELETE executes. + $wpdb = $proxy_wpdb; + try { + $result = $storage->remove_updates_before_cursor( $room, $compaction_cursor ); + } finally { + $wpdb = $original_wpdb; + } + + $this->assertTrue( $result ); + $this->assertTrue( $proxy_wpdb->did_inject, 'Expected concurrent update injection to occur.' ); + + // Clear caches since the injection bypassed add_update(). + wp_cache_delete( $storage_post_id, 'post_meta' ); + wp_cache_delete( 'sync_room_state_' . md5( $room ), 'sync' ); + + // The concurrent update must survive the compaction delete. + $updates = $storage->get_updates_after_cursor( $room, 0 ); + + $update_data = wp_list_pluck( $updates, 'data' ); + $this->assertContains( + $concurrent_update['data'], + $update_data, + 'Concurrent update should survive compaction.' + ); + } + /* * Awareness tests. */