diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index 63b055388fdbe..185dfb6b08006 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::sync::{Arc, Weak}; use crate::object_storage::{AwsOptions, GcpOptions, get_object_store}; @@ -50,10 +49,6 @@ impl DynamicObjectStoreCatalog { } impl CatalogProviderList for DynamicObjectStoreCatalog { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, @@ -91,10 +86,6 @@ impl DynamicObjectStoreCatalogProvider { } impl CatalogProvider for DynamicObjectStoreCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { self.inner.schema_names() } @@ -134,10 +125,6 @@ impl DynamicObjectStoreSchemaProvider { #[async_trait] impl SchemaProvider for DynamicObjectStoreSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.inner.table_names() } diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index ae890f7c95172..26f007cdd3193 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -229,10 +229,6 @@ struct ParquetMetadataTable { #[async_trait] impl TableProvider for ParquetMetadataTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -479,10 +475,6 @@ struct MetadataCacheTable { #[async_trait] impl TableProvider for MetadataCacheTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -598,10 +590,6 @@ struct StatisticsCacheTable { #[async_trait] impl TableProvider for StatisticsCacheTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -734,10 +722,6 @@ struct ListFilesCacheTable { #[async_trait] impl TableProvider for ListFilesCacheTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 0b441b9d7c8d2..701a886d2a140 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -17,7 +17,6 @@ //! See `main.rs` for how to run it. -use std::any::Any; use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Debug, Formatter}; use std::sync::{Arc, Mutex}; @@ -162,10 +161,6 @@ impl Default for CustomDataSource { #[async_trait] impl TableProvider for CustomDataSource { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { SchemaRef::new(Schema::new(vec![ Field::new("id", DataType::UInt8, false), diff --git a/datafusion-examples/examples/custom_data_source/default_column_values.rs b/datafusion-examples/examples/custom_data_source/default_column_values.rs index 40c8836c1f822..633b98244367e 100644 --- a/datafusion-examples/examples/custom_data_source/default_column_values.rs +++ b/datafusion-examples/examples/custom_data_source/default_column_values.rs @@ -17,7 +17,6 @@ //! See `main.rs` for how to run it. -use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -201,10 +200,6 @@ impl DefaultValueTableProvider { #[async_trait] impl TableProvider for DefaultValueTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion-examples/examples/data_io/catalog.rs b/datafusion-examples/examples/data_io/catalog.rs index 9781a93374ea6..7e5cc5a4cfc05 100644 --- a/datafusion-examples/examples/data_io/catalog.rs +++ b/datafusion-examples/examples/data_io/catalog.rs @@ -32,7 +32,7 @@ use datafusion::{ prelude::SessionContext, }; use std::sync::RwLock; -use std::{any::Any, collections::HashMap, path::Path, sync::Arc}; +use std::{collections::HashMap, path::Path, sync::Arc}; use std::{fs::File, io::Write}; use tempfile::TempDir; @@ -178,10 +178,6 @@ impl DirSchema { #[async_trait] impl SchemaProvider for DirSchema { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { let tables = self.tables.read().unwrap(); tables.keys().cloned().collect::>() @@ -231,10 +227,6 @@ impl DirCatalog { } impl CatalogProvider for DirCatalog { - fn as_any(&self) -> &dyn Any { - self - } - fn register_schema( &self, name: &str, @@ -277,10 +269,6 @@ impl CustomCatalogProviderList { } impl CatalogProviderList for CustomCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, diff --git a/datafusion-examples/examples/data_io/parquet_advanced_index.rs b/datafusion-examples/examples/data_io/parquet_advanced_index.rs index f02b01354b784..9e69c7f15a841 100644 --- a/datafusion-examples/examples/data_io/parquet_advanced_index.rs +++ b/datafusion-examples/examples/data_io/parquet_advanced_index.rs @@ -17,7 +17,6 @@ //! See `main.rs` for how to run it. -use std::any::Any; use std::collections::{HashMap, HashSet}; use std::fs::File; use std::ops::Range; @@ -451,10 +450,6 @@ impl IndexedFile { /// so that we can query it as a table. #[async_trait] impl TableProvider for IndexTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.indexed_file.schema) } diff --git a/datafusion-examples/examples/data_io/parquet_embedded_index.rs b/datafusion-examples/examples/data_io/parquet_embedded_index.rs index bcaca2ed5c85b..40b5b468ff5bf 100644 --- a/datafusion-examples/examples/data_io/parquet_embedded_index.rs +++ b/datafusion-examples/examples/data_io/parquet_embedded_index.rs @@ -393,9 +393,6 @@ fn get_key_value<'a>(file_meta_data: &'a FileMetaData, key: &'_ str) -> Option<& /// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files #[async_trait] impl TableProvider for DistinctIndexTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion-examples/examples/data_io/parquet_index.rs b/datafusion-examples/examples/data_io/parquet_index.rs index 515dad7a51e17..9be84d8249342 100644 --- a/datafusion-examples/examples/data_io/parquet_index.rs +++ b/datafusion-examples/examples/data_io/parquet_index.rs @@ -45,7 +45,6 @@ use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; -use std::any::Any; use std::collections::HashSet; use std::fmt::Display; use std::fs; @@ -208,10 +207,6 @@ impl IndexTableProvider { #[async_trait] impl TableProvider for IndexTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.index.schema().clone() } diff --git a/datafusion-examples/examples/data_io/remote_catalog.rs b/datafusion-examples/examples/data_io/remote_catalog.rs index 10ec26b1d5c05..16814752b3ec2 100644 --- a/datafusion-examples/examples/data_io/remote_catalog.rs +++ b/datafusion-examples/examples/data_io/remote_catalog.rs @@ -45,7 +45,6 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::prelude::{DataFrame, SessionContext}; use futures::TryStreamExt; -use std::any::Any; use std::sync::Arc; /// Interfacing with a remote catalog (e.g. over a network) @@ -224,10 +223,6 @@ impl RemoteTable { /// Implement the DataFusion Catalog API for [`RemoteTable`] #[async_trait] impl TableProvider for RemoteTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion-examples/examples/udf/simple_udtf.rs b/datafusion-examples/examples/udf/simple_udtf.rs index b00a06d301939..af123ab7e5d4a 100644 --- a/datafusion-examples/examples/udf/simple_udtf.rs +++ b/datafusion-examples/examples/udf/simple_udtf.rs @@ -85,10 +85,6 @@ struct LocalCsvTable { #[async_trait] impl TableProvider for LocalCsvTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index cc526bd80d44b..06ba8c8113fac 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -47,7 +47,6 @@ use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; use futures::{Stream, StreamExt, TryStreamExt, future, stream}; use object_store::ObjectStore; -use std::any::Any; use std::collections::HashMap; use std::sync::Arc; @@ -449,10 +448,6 @@ fn can_be_evaluated_for_partition_pruning( #[async_trait] impl TableProvider for ListingTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.table_schema) } diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index 1b8039d828fdb..87b7b7c3431a1 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -37,10 +37,6 @@ impl SchemaProvider for ResolvedSchemaProvider { self.owner_name.as_deref() } - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn table_names(&self) -> Vec { self.cached_tables.keys().cloned().collect() } @@ -115,10 +111,6 @@ struct ResolvedCatalogProvider { cached_schemas: HashMap>, } impl CatalogProvider for ResolvedCatalogProvider { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema_names(&self) -> Vec { self.cached_schemas.keys().cloned().collect() } @@ -160,10 +152,6 @@ struct ResolvedCatalogProviderList { cached_catalogs: HashMap>, } impl CatalogProviderList for ResolvedCatalogProviderList { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn register_catalog( &self, _name: String, @@ -424,12 +412,9 @@ pub trait AsyncCatalogProviderList: Send + Sync { #[cfg(test)] mod tests { - use std::{ - any::Any, - sync::{ - Arc, - atomic::{AtomicU32, Ordering}, - }, + use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, }; use arrow::datatypes::SchemaRef; @@ -447,10 +432,6 @@ mod tests { struct MockTableProvider {} #[async_trait] impl TableProvider for MockTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef { unimplemented!() diff --git a/datafusion/catalog/src/catalog.rs b/datafusion/catalog/src/catalog.rs index bb9e89eba2fef..34cdf74440cb3 100644 --- a/datafusion/catalog/src/catalog.rs +++ b/datafusion/catalog/src/catalog.rs @@ -105,11 +105,7 @@ use datafusion_common::not_impl_err; /// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123 /// /// [`TableProvider`]: crate::TableProvider -pub trait CatalogProvider: Debug + Sync + Send { - /// Returns the catalog provider as [`Any`] - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - +pub trait CatalogProvider: Any + Debug + Sync + Send { /// Retrieves the list of available schema names in this catalog. fn schema_names(&self) -> Vec; @@ -152,15 +148,31 @@ pub trait CatalogProvider: Debug + Sync + Send { } } +impl dyn CatalogProvider { + /// Returns `true` if the catalog provider is of type `T`. + /// + /// Prefer this over `downcast_ref::().is_some()`. Works correctly when + /// called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this catalog provider to a concrete type `T`, + /// returning `None` if the provider is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// Represent a list of named [`CatalogProvider`]s. /// /// Please see the documentation on [`CatalogProvider`] for details of /// implementing a custom catalog. -pub trait CatalogProviderList: Debug + Sync + Send { - /// Returns the catalog list as [`Any`] - /// so that it can be downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - +pub trait CatalogProviderList: Any + Debug + Sync + Send { /// Adds a new catalog to this catalog list /// If a catalog of the same name existed before, it is replaced in the list and returned. fn register_catalog( @@ -175,3 +187,23 @@ pub trait CatalogProviderList: Debug + Sync + Send { /// Retrieves a specific catalog by name, provided it exists. fn catalog(&self, name: &str) -> Option>; } + +impl dyn CatalogProviderList { + /// Returns `true` if the catalog provider list is of type `T`. + /// + /// Prefer this over `downcast_ref::().is_some()`. Works correctly when + /// called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this catalog provider list to a concrete type `T`, + /// returning `None` if the provider list is not of that type. + /// + /// Works correctly when called on `Arc` via + /// auto-deref, unlike `(&arc as &dyn Any).downcast_ref::()` which would + /// attempt to downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} diff --git a/datafusion/catalog/src/cte_worktable.rs b/datafusion/catalog/src/cte_worktable.rs index 9565dcc60141e..dd313ebb4cbff 100644 --- a/datafusion/catalog/src/cte_worktable.rs +++ b/datafusion/catalog/src/cte_worktable.rs @@ -17,7 +17,6 @@ //! CteWorkTable implementation used for recursive queries -use std::any::Any; use std::borrow::Cow; use std::sync::Arc; @@ -65,10 +64,6 @@ impl CteWorkTable { #[async_trait] impl TableProvider for CteWorkTable { - fn as_any(&self) -> &dyn Any { - self - } - fn get_logical_plan(&'_ self) -> Option> { None } diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs index fb6531ba0b2ee..a06fcdbe0bcc4 100644 --- a/datafusion/catalog/src/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -117,10 +117,6 @@ fn preserves_table_type() { #[async_trait] impl TableProvider for TestTempTable { - fn as_any(&self) -> &dyn Any { - self - } - fn table_type(&self) -> TableType { TableType::Temporary } diff --git a/datafusion/catalog/src/dynamic_file/catalog.rs b/datafusion/catalog/src/dynamic_file/catalog.rs index ccccb9762eb4c..f93bd35cd7f0a 100644 --- a/datafusion/catalog/src/dynamic_file/catalog.rs +++ b/datafusion/catalog/src/dynamic_file/catalog.rs @@ -19,7 +19,6 @@ use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; use async_trait::async_trait; -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -42,10 +41,6 @@ impl DynamicFileCatalog { } impl CatalogProviderList for DynamicFileCatalog { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, @@ -87,10 +82,6 @@ impl DynamicFileCatalogProvider { } impl CatalogProvider for DynamicFileCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { self.inner.schema_names() } @@ -137,10 +128,6 @@ impl DynamicFileSchemaProvider { #[async_trait] impl SchemaProvider for DynamicFileSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.inner.table_names() } diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index ea93dc21a3f5b..34c677c3dd43e 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -44,7 +44,7 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::streaming::PartitionStream; use std::collections::{BTreeSet, HashMap, HashSet}; use std::fmt::Debug; -use std::{any::Any, sync::Arc}; +use std::sync::Arc; pub const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; @@ -532,10 +532,6 @@ fn remove_native_type_prefix(native_type: &NativeType) -> String { #[async_trait] impl SchemaProvider for InformationSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { INFORMATION_SCHEMA_TABLES .iter() @@ -1454,10 +1450,6 @@ mod tests { ) } - fn as_any(&self) -> &dyn Any { - unimplemented!("not required for these tests") - } - fn table_names(&self) -> Vec { vec!["atable".to_string()] } @@ -1468,10 +1460,6 @@ mod tests { } impl CatalogProviderList for Fixture { - fn as_any(&self) -> &dyn Any { - unimplemented!("not required for these tests") - } - fn register_catalog( &self, _: String, @@ -1490,10 +1478,6 @@ mod tests { } impl CatalogProvider for Fixture { - fn as_any(&self) -> &dyn Any { - unimplemented!("not required for these tests") - } - fn schema_names(&self) -> Vec { vec!["aschema".to_string()] } diff --git a/datafusion/catalog/src/listing_schema.rs b/datafusion/catalog/src/listing_schema.rs index 77fbea8577089..d38fe659aaa97 100644 --- a/datafusion/catalog/src/listing_schema.rs +++ b/datafusion/catalog/src/listing_schema.rs @@ -17,7 +17,6 @@ //! [`ListingSchemaProvider`]: [`SchemaProvider`] that scans ObjectStores for tables automatically -use std::any::Any; use std::collections::HashSet; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -146,10 +145,6 @@ impl ListingSchemaProvider { #[async_trait] impl SchemaProvider for ListingSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.tables .lock() diff --git a/datafusion/catalog/src/memory/catalog.rs b/datafusion/catalog/src/memory/catalog.rs index b71888c54e9d6..ebe6b9dfa0ebc 100644 --- a/datafusion/catalog/src/memory/catalog.rs +++ b/datafusion/catalog/src/memory/catalog.rs @@ -21,7 +21,6 @@ use crate::{CatalogProvider, CatalogProviderList, SchemaProvider}; use dashmap::DashMap; use datafusion_common::exec_err; -use std::any::Any; use std::sync::Arc; /// Simple in-memory list of catalogs @@ -47,10 +46,6 @@ impl Default for MemoryCatalogProviderList { } impl CatalogProviderList for MemoryCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, @@ -90,10 +85,6 @@ impl Default for MemoryCatalogProvider { } impl CatalogProvider for MemoryCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { self.schemas.iter().map(|s| s.key().clone()).collect() } diff --git a/datafusion/catalog/src/memory/schema.rs b/datafusion/catalog/src/memory/schema.rs index 97a579b021617..46b0beb440613 100644 --- a/datafusion/catalog/src/memory/schema.rs +++ b/datafusion/catalog/src/memory/schema.rs @@ -21,7 +21,6 @@ use crate::{SchemaProvider, TableProvider}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_common::{DataFusionError, exec_err}; -use std::any::Any; use std::sync::Arc; /// Simple in-memory implementation of a schema. @@ -47,10 +46,6 @@ impl Default for MemorySchemaProvider { #[async_trait] impl SchemaProvider for MemorySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.tables .iter() diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index a0a34a8f6d6ad..8102c15079658 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -17,7 +17,6 @@ //! [`MemTable`] for querying `Vec` by DataFusion. -use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -214,10 +213,6 @@ impl MemTable { #[async_trait] impl TableProvider for MemTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/catalog/src/schema.rs b/datafusion/catalog/src/schema.rs index c6299582813b4..d99027593ccce 100644 --- a/datafusion/catalog/src/schema.rs +++ b/datafusion/catalog/src/schema.rs @@ -34,17 +34,13 @@ use datafusion_expr::TableType; /// /// [`CatalogProvider`]: super::CatalogProvider #[async_trait] -pub trait SchemaProvider: Debug + Sync + Send { +pub trait SchemaProvider: Any + Debug + Sync + Send { /// Returns the owner of the Schema, default is None. This value is reported /// as part of `information_tables.schemata fn owner_name(&self) -> Option<&str> { None } - /// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a - /// specific implementation. - fn as_any(&self) -> &dyn Any; - /// Retrieves the list of available table names in this schema. fn table_names(&self) -> Vec; @@ -89,3 +85,23 @@ pub trait SchemaProvider: Debug + Sync + Send { /// Returns true if table exist in the schema provider, false otherwise. fn table_exist(&self, name: &str) -> bool; } + +impl dyn SchemaProvider { + /// Returns `true` if the schema provider is of type `T`. + /// + /// Prefer this over `downcast_ref::().is_some()`. Works correctly when + /// called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this schema provider to a concrete type `T`, + /// returning `None` if the provider is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs index bdd72a1b1d70b..c58ff939eeab6 100644 --- a/datafusion/catalog/src/stream.rs +++ b/datafusion/catalog/src/stream.rs @@ -303,10 +303,6 @@ impl StreamTable { #[async_trait] impl TableProvider for StreamTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(self.0.source.schema()) } diff --git a/datafusion/catalog/src/streaming.rs b/datafusion/catalog/src/streaming.rs index db9596b420b7b..e609877c2b778 100644 --- a/datafusion/catalog/src/streaming.rs +++ b/datafusion/catalog/src/streaming.rs @@ -17,7 +17,6 @@ //! A simplified [`TableProvider`] for streaming partitioned datasets -use std::any::Any; use std::sync::Arc; use arrow::datatypes::SchemaRef; @@ -81,10 +80,6 @@ impl StreamingTable { #[async_trait] impl TableProvider for StreamingTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 4d939b883f28e..5d1391bed1172 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -48,11 +48,7 @@ use datafusion_physical_plan::ExecutionPlan; /// [`RecordBatch`]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html /// [`CatalogProvider`]: super::CatalogProvider #[async_trait] -pub trait TableProvider: Debug + Sync + Send { - /// Returns the table provider as [`Any`] so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - +pub trait TableProvider: Any + Debug + Sync + Send { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef; @@ -268,7 +264,6 @@ pub trait TableProvider: Debug + Sync + Send { /// /// #[async_trait] /// impl TableProvider for TestDataSource { - /// # fn as_any(&self) -> &dyn Any { todo!() } /// # fn schema(&self) -> SchemaRef { todo!() } /// # fn table_type(&self) -> TableType { todo!() } /// # async fn scan(&self, s: &dyn Session, p: Option<&Vec>, f: &[Expr], l: Option) -> Result> { @@ -385,6 +380,26 @@ pub trait TableProvider: Debug + Sync + Send { } } +impl dyn TableProvider { + /// Returns `true` if the table provider is of type `T`. + /// + /// Prefer this over `downcast_ref::().is_some()`. Works correctly when + /// called on `Arc` via auto-deref. + pub fn is(&self) -> bool { + (self as &dyn Any).is::() + } + + /// Attempts to downcast this table provider to a concrete type `T`, + /// returning `None` if the provider is not of that type. + /// + /// Works correctly when called on `Arc` via auto-deref, + /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to + /// downcast the `Arc` itself. + pub fn downcast_ref(&self) -> Option<&T> { + (self as &dyn Any).downcast_ref() + } +} + /// Arguments for scanning a table with [`TableProvider::scan_with_args`]. #[derive(Debug, Clone, Default)] pub struct ScanArgs<'a> { diff --git a/datafusion/catalog/src/view.rs b/datafusion/catalog/src/view.rs index 54c54431a5913..45084e65f23f2 100644 --- a/datafusion/catalog/src/view.rs +++ b/datafusion/catalog/src/view.rs @@ -17,7 +17,7 @@ //! View data source which uses a LogicalPlan as it's input. -use std::{any::Any, borrow::Cow, sync::Arc}; +use std::{borrow::Cow, sync::Arc}; use crate::Session; use crate::TableProvider; @@ -83,10 +83,6 @@ impl ViewTable { #[async_trait] impl TableProvider for ViewTable { - fn as_any(&self) -> &dyn Any { - self - } - fn get_logical_plan(&'_ self) -> Option> { Some(Cow::Borrowed(&self.logical_plan)) } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 17dd22fef3b31..0f38988c69405 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -41,7 +41,6 @@ use crate::physical_plan::{ execute_stream, execute_stream_partitioned, }; use crate::prelude::SessionContext; -use std::any::Any; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -2648,10 +2647,6 @@ struct DataFrameTableProvider { #[async_trait] impl TableProvider for DataFrameTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn get_logical_plan(&self) -> Option> { Some(Cow::Borrowed(&self.plan)) } diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5aeca92b1626d..ef7f29d205f8e 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -17,7 +17,6 @@ //! [`EmptyTable`] useful for testing. -use std::any::Any; use std::sync::Arc; use arrow::datatypes::*; @@ -57,10 +56,6 @@ impl EmptyTable { #[async_trait] impl TableProvider for EmptyTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 012d7f0e00267..d1ee301d91327 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -264,10 +264,7 @@ mod tests { .with_options(HashMap::from([("format.has_header".into(), "true".into())])) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let listing_options = listing_table.options(); assert_eq!(".tbl", listing_options.file_extension); } @@ -297,10 +294,7 @@ mod tests { .with_options(options) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let format = listing_table.options().format.clone(); let csv_format = format.as_any().downcast_ref::().unwrap(); @@ -334,10 +328,7 @@ mod tests { .with_options(options) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); // Verify compression is used let format = listing_table.options().format.clone(); @@ -378,10 +369,7 @@ mod tests { .with_options(options) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let listing_options = listing_table.options(); assert_eq!("", listing_options.file_extension); @@ -413,10 +401,7 @@ mod tests { ) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let listing_options = listing_table.options(); assert_eq!("", listing_options.file_extension); @@ -444,10 +429,7 @@ mod tests { ) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let listing_options = listing_table.options(); let dtype = @@ -476,10 +458,7 @@ mod tests { ) .build(); let table_provider = factory.create(&state, &cmd).await.unwrap(); - let listing_table = table_provider - .as_any() - .downcast_ref::() - .unwrap(); + let listing_table = table_provider.downcast_ref::().unwrap(); let listing_options = listing_table.options(); assert!(listing_options.table_partition_cols.is_empty()); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 1731120558dfb..75ddfd5247585 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -320,7 +320,7 @@ impl SessionContext { let schema = cat .schema(schema_name.as_str()) .ok_or_else(|| internal_datafusion_err!("Schema not found!"))?; - let lister = schema.as_any().downcast_ref::(); + let lister = schema.downcast_ref::(); if let Some(lister) = lister { lister.refresh(&self.state()).await?; } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index bf84fcc53e957..49a347764212b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -4469,10 +4469,6 @@ digraph { #[async_trait] impl TableProvider for MockSchemaTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.logical_schema) } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 466ee38a426fd..aad659eacbe55 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -23,7 +23,6 @@ pub mod parquet; pub mod csv; use futures::Stream; -use std::any::Any; use std::collections::HashMap; use std::fmt::Formatter; use std::fs::File; @@ -208,10 +207,6 @@ impl TestTableProvider {} #[async_trait] impl TableProvider for TestTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/core/tests/catalog/memory.rs b/datafusion/core/tests/catalog/memory.rs index 5258f3bf97574..b49183e92e387 100644 --- a/datafusion/core/tests/catalog/memory.rs +++ b/datafusion/core/tests/catalog/memory.rs @@ -26,7 +26,6 @@ use datafusion_catalog::memory::*; use datafusion_catalog::{SchemaProvider, TableProvider}; use datafusion_common::test_util::batches_to_string; use insta::assert_snapshot; -use std::any::Any; use std::sync::Arc; #[test] @@ -83,10 +82,6 @@ fn default_register_schema_not_supported() { #[derive(Debug)] struct TestProvider {} impl CatalogProvider for TestProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { unimplemented!() } diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index 8c4bae5e98b36..24a3df7e0a8fa 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -17,7 +17,6 @@ //! Tests for DELETE, UPDATE, and TRUNCATE planning to verify filter and assignment extraction. -use std::any::Any; use std::sync::{Arc, Mutex}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -91,10 +90,6 @@ impl std::fmt::Debug for CaptureDeleteProvider { #[async_trait] impl TableProvider for CaptureDeleteProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } @@ -191,10 +186,6 @@ impl std::fmt::Debug for CaptureUpdateProvider { #[async_trait] impl TableProvider for CaptureUpdateProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } @@ -269,10 +260,6 @@ impl std::fmt::Debug for CaptureTruncateProvider { #[async_trait] impl TableProvider for CaptureTruncateProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index c01e65beddd3d..cef75b444f6fe 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -226,10 +225,6 @@ impl ExecutionPlan for CustomExecutionPlan { #[async_trait] impl TableProvider for CustomTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { TEST_CUSTOM_SCHEMA_REF!() } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 82774c8b44dde..e52c559ec79ef 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -175,10 +175,6 @@ struct CustomProvider { #[async_trait] impl TableProvider for CustomProvider { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.zero_batch.schema() } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index e38238f861739..11af5984abb84 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -17,7 +17,7 @@ //! This module contains end to end tests of statistics propagation -use std::{any::Any, sync::Arc}; +use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::execution::context::TaskContext; @@ -77,10 +77,6 @@ impl StatisticsValidation { #[async_trait] impl TableProvider for StatisticsValidation { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index da13389901ee0..90459960c5561 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -17,7 +17,6 @@ //! This module contains tests for limiting memory at runtime in DataFusion -use std::any::Any; use std::num::NonZeroUsize; use std::sync::{Arc, LazyLock}; @@ -1145,10 +1144,6 @@ impl SortedTableProvider { #[async_trait] impl TableProvider for SortedTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 07b92923b6f00..c8024af30db78 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -106,11 +106,7 @@ mod test { .await .unwrap(); let table = ctx.table_provider(table_name.as_str()).await.unwrap(); - let listing_table = table - .as_any() - .downcast_ref::() - .unwrap() - .clone(); + let listing_table = table.downcast_ref::().unwrap().clone(); listing_table .scan(&ctx.state(), None, &[], None) .await diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 2bab79df424b6..326c767d97610 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::{any::Any, str::FromStr, sync::Arc}; +use std::{str::FromStr, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; @@ -88,10 +88,6 @@ impl TestInsertTableProvider { #[async_trait] impl TableProvider for TestInsertTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index 2a0e85068e467..c8ded3a6fce3f 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -118,10 +118,6 @@ struct SimpleCsvTable { #[async_trait] impl TableProvider for SimpleCsvTable { - fn as_any(&self) -> &dyn std::any::Any { - self - } - fn schema(&self) -> SchemaRef { self.schema.clone() } diff --git a/datafusion/ffi/src/catalog_provider.rs b/datafusion/ffi/src/catalog_provider.rs index ff588a89a71b3..4fd2ec408277e 100644 --- a/datafusion/ffi/src/catalog_provider.rs +++ b/datafusion/ffi/src/catalog_provider.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -85,12 +84,12 @@ unsafe impl Send for FFI_CatalogProvider {} unsafe impl Sync for FFI_CatalogProvider {} struct ProviderPrivateData { - provider: Arc, + provider: Arc, runtime: Option, } impl FFI_CatalogProvider { - unsafe fn inner(&self) -> &Arc { + unsafe fn inner(&self) -> &Arc { unsafe { let private_data = self.private_data as *const ProviderPrivateData; &(*private_data).provider @@ -140,7 +139,7 @@ unsafe extern "C" fn register_schema_fn_wrapper( unsafe { let runtime = provider.runtime(); let inner_provider = provider.inner(); - let schema: Arc = schema.into(); + let schema: Arc = schema.into(); let returned_schema = rresult_return!(inner_provider.register_schema(name.as_str(), schema)) @@ -229,7 +228,7 @@ impl Drop for FFI_CatalogProvider { impl FFI_CatalogProvider { /// Creates a new [`FFI_CatalogProvider`]. pub fn new( - provider: Arc, + provider: Arc, runtime: Option, task_ctx_provider: impl Into, logical_codec: Option>, @@ -246,12 +245,11 @@ impl FFI_CatalogProvider { } pub fn new_with_ffi_codec( - provider: Arc, + provider: Arc, runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { - if let Some(provider) = provider.as_any().downcast_ref::() - { + if let Some(provider) = provider.downcast_ref::() { return provider.0.clone(); } @@ -282,14 +280,13 @@ pub struct ForeignCatalogProvider(pub(crate) FFI_CatalogProvider); unsafe impl Send for ForeignCatalogProvider {} unsafe impl Sync for ForeignCatalogProvider {} -impl From<&FFI_CatalogProvider> for Arc { +impl From<&FFI_CatalogProvider> for Arc { fn from(provider: &FFI_CatalogProvider) -> Self { if (provider.library_marker_id)() == crate::get_library_marker_id() { return Arc::clone(unsafe { provider.inner() }); } - Arc::new(ForeignCatalogProvider(provider.clone())) - as Arc + Arc::new(ForeignCatalogProvider(provider.clone())) as Arc } } @@ -300,10 +297,6 @@ impl Clone for FFI_CatalogProvider { } impl CatalogProvider for ForeignCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { unsafe { (self.0.schema_names)(&self.0) @@ -330,7 +323,7 @@ impl CatalogProvider for ForeignCatalogProvider { schema: Arc, ) -> Result>> { unsafe { - let schema = match schema.as_any().downcast_ref::() { + let schema = match schema.downcast_ref::() { Some(s) => &s.0, None => &FFI_SchemaProvider::new_with_ffi_codec( schema, @@ -387,7 +380,7 @@ mod tests { FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None); ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog: Arc = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let prior_schema_names = foreign_catalog.schema_names(); assert_eq!(prior_schema_names.len(), 1); @@ -432,20 +425,18 @@ mod tests { FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None); // Verify local libraries can be downcast to their original - let foreign_catalog: Arc = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); assert!( foreign_catalog - .as_any() .downcast_ref::() .is_some() ); // Verify different library markers generate foreign providers ffi_catalog.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog: Arc = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); assert!( foreign_catalog - .as_any() .downcast_ref::() .is_some() ); diff --git a/datafusion/ffi/src/catalog_provider_list.rs b/datafusion/ffi/src/catalog_provider_list.rs index 65574a7ac33de..3e88df72670bd 100644 --- a/datafusion/ffi/src/catalog_provider_list.rs +++ b/datafusion/ffi/src/catalog_provider_list.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -75,12 +74,12 @@ unsafe impl Send for FFI_CatalogProviderList {} unsafe impl Sync for FFI_CatalogProviderList {} struct ProviderPrivateData { - provider: Arc, + provider: Arc, runtime: Option, } impl FFI_CatalogProviderList { - unsafe fn inner(&self) -> &Arc { + unsafe fn inner(&self) -> &Arc { unsafe { let private_data = self.private_data as *const ProviderPrivateData; &(*private_data).provider @@ -112,7 +111,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper( unsafe { let runtime = provider.runtime(); let inner_provider = provider.inner(); - let catalog: Arc = catalog.into(); + let catalog: Arc = catalog.into(); inner_provider .register_catalog(name.into(), catalog) @@ -192,7 +191,7 @@ impl Drop for FFI_CatalogProviderList { impl FFI_CatalogProviderList { /// Creates a new [`FFI_CatalogProviderList`]. pub fn new( - provider: Arc, + provider: Arc, runtime: Option, task_ctx_provider: impl Into, logical_codec: Option>, @@ -208,14 +207,11 @@ impl FFI_CatalogProviderList { Self::new_with_ffi_codec(provider, runtime, logical_codec) } pub fn new_with_ffi_codec( - provider: Arc, + provider: Arc, runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { - if let Some(provider) = provider - .as_any() - .downcast_ref::() - { + if let Some(provider) = provider.downcast_ref::() { return provider.0.clone(); } @@ -245,14 +241,14 @@ pub struct ForeignCatalogProviderList(FFI_CatalogProviderList); unsafe impl Send for ForeignCatalogProviderList {} unsafe impl Sync for ForeignCatalogProviderList {} -impl From<&FFI_CatalogProviderList> for Arc { +impl From<&FFI_CatalogProviderList> for Arc { fn from(provider: &FFI_CatalogProviderList) -> Self { if (provider.library_marker_id)() == crate::get_library_marker_id() { return Arc::clone(unsafe { provider.inner() }); } Arc::new(ForeignCatalogProviderList(provider.clone())) - as Arc + as Arc } } @@ -263,18 +259,13 @@ impl Clone for FFI_CatalogProviderList { } impl CatalogProviderList for ForeignCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, catalog: Arc, ) -> Option> { unsafe { - let catalog = match catalog.as_any().downcast_ref::() - { + let catalog = match catalog.downcast_ref::() { Some(s) => &s.0, None => &FFI_CatalogProvider::new_with_ffi_codec( catalog, @@ -332,7 +323,7 @@ mod tests { FFI_CatalogProviderList::new(catalog_list, None, task_ctx_provider, None); ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog_list: Arc = + let foreign_catalog_list: Arc = (&ffi_catalog_list).into(); let prior_catalog_names = foreign_catalog_list.catalog_names(); @@ -373,22 +364,20 @@ mod tests { FFI_CatalogProviderList::new(catalog_list, None, task_ctx_provider, None); // Verify local libraries can be downcast to their original - let foreign_catalog_list: Arc = + let foreign_catalog_list: Arc = (&ffi_catalog_list).into(); assert!( foreign_catalog_list - .as_any() .downcast_ref::() .is_some() ); // Verify different library markers generate foreign providers ffi_catalog_list.library_marker_id = crate::mock_foreign_marker_id; - let foreign_catalog_list: Arc = + let foreign_catalog_list: Arc = (&ffi_catalog_list).into(); assert!( foreign_catalog_list - .as_any() .downcast_ref::() .is_some() ); diff --git a/datafusion/ffi/src/proto/logical_extension_codec.rs b/datafusion/ffi/src/proto/logical_extension_codec.rs index a5be8588d23e3..89a76d442b22b 100644 --- a/datafusion/ffi/src/proto/logical_extension_codec.rs +++ b/datafusion/ffi/src/proto/logical_extension_codec.rs @@ -562,7 +562,7 @@ mod tests { ) -> Result<()> { buf.push(Self::MAGIC_NUMBER); - if !node.as_any().is::() { + if !node.is::() { return exec_err!("TestExtensionCodec only expects MemTable"); }; @@ -637,7 +637,7 @@ mod tests { ctx.task_ctx().as_ref(), )?; - assert!(returned_table.as_any().is::()); + assert!(returned_table.is::()); Ok(()) } diff --git a/datafusion/ffi/src/schema_provider.rs b/datafusion/ffi/src/schema_provider.rs index 5d1348e2328f7..b44e11e77c7ff 100644 --- a/datafusion/ffi/src/schema_provider.rs +++ b/datafusion/ffi/src/schema_provider.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -91,12 +90,12 @@ unsafe impl Send for FFI_SchemaProvider {} unsafe impl Sync for FFI_SchemaProvider {} struct ProviderPrivateData { - provider: Arc, + provider: Arc, runtime: Option, } impl FFI_SchemaProvider { - unsafe fn inner(&self) -> &Arc { + unsafe fn inner(&self) -> &Arc { unsafe { let private_data = self.private_data as *const ProviderPrivateData; &(*private_data).provider @@ -238,7 +237,7 @@ impl Drop for FFI_SchemaProvider { impl FFI_SchemaProvider { /// Creates a new [`FFI_SchemaProvider`]. pub fn new( - provider: Arc, + provider: Arc, runtime: Option, task_ctx_provider: impl Into, logical_codec: Option>, @@ -255,12 +254,11 @@ impl FFI_SchemaProvider { } pub fn new_with_ffi_codec( - provider: Arc, + provider: Arc, runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { - if let Some(provider) = provider.as_any().downcast_ref::() - { + if let Some(provider) = provider.downcast_ref::() { return provider.0.clone(); } @@ -294,14 +292,13 @@ pub struct ForeignSchemaProvider(pub FFI_SchemaProvider); unsafe impl Send for ForeignSchemaProvider {} unsafe impl Sync for ForeignSchemaProvider {} -impl From<&FFI_SchemaProvider> for Arc { +impl From<&FFI_SchemaProvider> for Arc { fn from(provider: &FFI_SchemaProvider) -> Self { if (provider.library_marker_id)() == crate::get_library_marker_id() { return Arc::clone(unsafe { provider.inner() }); } - Arc::new(ForeignSchemaProvider(provider.clone())) - as Arc + Arc::new(ForeignSchemaProvider(provider.clone())) as Arc } } @@ -313,10 +310,6 @@ impl Clone for FFI_SchemaProvider { #[async_trait] impl SchemaProvider for ForeignSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn owner_name(&self) -> Option<&str> { let name: Option<&RString> = self.0.owner_name.as_ref().into(); name.map(|s| s.as_str()) @@ -351,7 +344,7 @@ impl SchemaProvider for ForeignSchemaProvider { table: Arc, ) -> Result>> { unsafe { - let ffi_table = match table.as_any().downcast_ref::() { + let ffi_table = match table.downcast_ref::() { Some(t) => t.0.clone(), None => FFI_TableProvider::new_with_ffi_codec( table, @@ -414,7 +407,7 @@ mod tests { FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None); ffi_schema_provider.library_marker_id = crate::mock_foreign_marker_id; - let foreign_schema_provider: Arc = + let foreign_schema_provider: Arc = (&ffi_schema_provider).into(); let prior_table_names = foreign_schema_provider.table_names(); @@ -467,20 +460,18 @@ mod tests { FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None); // Verify local libraries can be downcast to their original - let foreign_schema: Arc = (&ffi_schema).into(); + let foreign_schema: Arc = (&ffi_schema).into(); assert!( foreign_schema - .as_any() .downcast_ref::() .is_some() ); // Verify different library markers generate foreign providers ffi_schema.library_marker_id = crate::mock_foreign_marker_id; - let foreign_schema: Arc = (&ffi_schema).into(); + let foreign_schema: Arc = (&ffi_schema).into(); assert!( foreign_schema - .as_any() .downcast_ref::() .is_some() ); diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 4a89bb025a56d..f5ba4455f108c 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::ffi::c_void; use std::sync::Arc; @@ -159,12 +158,12 @@ unsafe impl Send for FFI_TableProvider {} unsafe impl Sync for FFI_TableProvider {} struct ProviderPrivateData { - provider: Arc, + provider: Arc, runtime: Option, } impl FFI_TableProvider { - fn inner(&self) -> &Arc { + fn inner(&self) -> &Arc { let private_data = self.private_data as *const ProviderPrivateData; unsafe { &(*private_data).provider } } @@ -186,7 +185,7 @@ unsafe extern "C" fn table_type_fn_wrapper( } fn supports_filters_pushdown_internal( - provider: &Arc, + provider: &Arc, filters_serialized: &[u8], task_ctx: &Arc, codec: &dyn LogicalExtensionCodec, @@ -363,7 +362,7 @@ impl Drop for FFI_TableProvider { impl FFI_TableProvider { /// Creates a new [`FFI_TableProvider`]. pub fn new( - provider: Arc, + provider: Arc, can_support_pushdown_filters: bool, runtime: Option, task_ctx_provider: impl Into, @@ -386,12 +385,12 @@ impl FFI_TableProvider { } pub fn new_with_ffi_codec( - provider: Arc, + provider: Arc, can_support_pushdown_filters: bool, runtime: Option, logical_codec: FFI_LogicalExtensionCodec, ) -> Self { - if let Some(provider) = provider.as_any().downcast_ref::() { + if let Some(provider) = provider.downcast_ref::() { return provider.0.clone(); } let private_data = Box::new(ProviderPrivateData { provider, runtime }); @@ -443,10 +442,6 @@ impl Clone for FFI_TableProvider { #[async_trait] impl TableProvider for ForeignTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { let wrapped_schema = unsafe { (self.0.schema)(&self.0) }; wrapped_schema.into() @@ -701,7 +696,6 @@ mod tests { let foreign_table: Arc = (&ffi_table).into(); assert!( foreign_table - .as_any() .downcast_ref::() .is_some() ); @@ -711,7 +705,6 @@ mod tests { let foreign_table: Arc = (&ffi_table).into(); assert!( foreign_table - .as_any() .downcast_ref::() .is_some() ); diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 0097872d4970d..011d3f0a0a343 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -25,7 +25,6 @@ //! access the runtime, then you will get a panic when trying to do operations //! such as spawning a tokio task. -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -127,10 +126,6 @@ pub fn start_async_provider() -> (AsyncTableProvider, Handle) { #[async_trait] impl TableProvider for AsyncTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> Arc { super::create_test_schema() } diff --git a/datafusion/ffi/src/tests/catalog.rs b/datafusion/ffi/src/tests/catalog.rs index 76d60ee379a7c..0c02de5d049ae 100644 --- a/datafusion/ffi/src/tests/catalog.rs +++ b/datafusion/ffi/src/tests/catalog.rs @@ -25,7 +25,6 @@ //! access the runtime, then you will get a panic when trying to do operations //! such as spawning a tokio task. -use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -89,10 +88,6 @@ impl Default for FixedSchemaProvider { #[async_trait] impl SchemaProvider for FixedSchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.inner.table_names() } @@ -142,10 +137,6 @@ impl Default for FixedCatalogProvider { } impl CatalogProvider for FixedCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { self.inner.schema_names() } @@ -205,10 +196,6 @@ impl Default for FixedCatalogProviderList { } impl CatalogProviderList for FixedCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - fn catalog_names(&self) -> Vec { self.inner.catalog_names() } diff --git a/datafusion/ffi/tests/ffi_catalog.rs b/datafusion/ffi/tests/ffi_catalog.rs index 28bb5f406f53f..c66caf7a61720 100644 --- a/datafusion/ffi/tests/ffi_catalog.rs +++ b/datafusion/ffi/tests/ffi_catalog.rs @@ -39,7 +39,7 @@ mod tests { "External catalog provider failed to implement create_catalog" .to_string(), ))?(codec); - let foreign_catalog: Arc = (&ffi_catalog).into(); + let foreign_catalog: Arc = (&ffi_catalog).into(); let _ = ctx.register_catalog("fruit", foreign_catalog); @@ -66,7 +66,7 @@ mod tests { "External catalog provider failed to implement create_catalog_list" .to_string(), ))?(codec); - let foreign_catalog_list: Arc = + let foreign_catalog_list: Arc = (&ffi_catalog_list).into(); ctx.register_catalog_list(foreign_catalog_list); diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 8c2403c3a2fcc..175a6b3bff06c 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -443,10 +443,6 @@ fn validate_interval_step(step: IntervalMonthDayNano) -> Result<()> { #[async_trait] impl TableProvider for GenerateSeriesTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 1e0264690d4fb..ded656c9ad983 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1114,7 +1114,6 @@ impl AsLogicalPlan for LogicalPlanNode { }) => { let provider = source_as_provider(source)?; let schema = provider.schema(); - let source = provider.as_any(); let projection = match projection { None => None, @@ -1132,7 +1131,7 @@ impl AsLogicalPlan for LogicalPlanNode { let filters: Vec = serialize_exprs(filters, extension_codec)?; - if let Some(listing_table) = source.downcast_ref::() { + if let Some(listing_table) = provider.downcast_ref::() { let any = listing_table.options().format.as_any(); let file_format_type = { let mut maybe_some_type = None; @@ -1246,7 +1245,7 @@ impl AsLogicalPlan for LogicalPlanNode { }, )), }) - } else if let Some(view_table) = source.downcast_ref::() { + } else if let Some(view_table) = provider.downcast_ref::() { let schema: protobuf::Schema = schema.as_ref().try_into()?; Ok(LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new( @@ -1267,7 +1266,8 @@ impl AsLogicalPlan for LogicalPlanNode { }, ))), }) - } else if let Some(cte_work_table) = source.downcast_ref::() + } else if let Some(cte_work_table) = + provider.downcast_ref::() { let name = cte_work_table.name().to_string(); let schema = cte_work_table.schema(); diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index d29d6fc7cd08d..07bd02c61b7e9 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -215,8 +215,6 @@ impl LogicalExtensionCodec for TestTableProviderCodec { buf: &mut Vec, ) -> Result<()> { let table = node - .as_ref() - .as_any() .downcast_ref::() .expect("Can't encode non-test tables"); let msg = TestTableProto { diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 23bbf1c951446..428a36e7e8153 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::any::Any; use std::collections::HashMap; use std::fs::File; use std::io::Write; @@ -228,10 +227,6 @@ struct StrictOrdersSchema { #[async_trait] impl SchemaProvider for StrictOrdersSchema { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { vec!["orders".to_string()] } @@ -359,10 +354,6 @@ pub async fn register_temp_table(ctx: &SessionContext) { #[async_trait] impl TableProvider for TestTable { - fn as_any(&self) -> &dyn Any { - self - } - fn schema(&self) -> SchemaRef { unimplemented!() } diff --git a/docs/source/library-user-guide/catalogs.md b/docs/source/library-user-guide/catalogs.md index daa329523afee..fc1d0abb7823a 100644 --- a/docs/source/library-user-guide/catalogs.md +++ b/docs/source/library-user-guide/catalogs.md @@ -74,10 +74,6 @@ use datafusion::common::{Result, exec_err}; #[async_trait] impl SchemaProvider for MemorySchemaProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn table_names(&self) -> Vec { self.tables .iter() @@ -131,10 +127,6 @@ Without getting into a `CatalogProvider` implementation, we can create a `Memory # # #[async_trait] # impl SchemaProvider for MemorySchemaProvider { -# fn as_any(&self) -> &dyn Any { -# self -# } -# # fn table_names(&self) -> Vec { # self.tables # .iter() @@ -219,7 +211,6 @@ impl SchemaProvider for Schema { # todo!(); } -# fn as_any(&self) -> &(dyn std::any::Any + 'static) { todo!() } # fn table_names(&self) -> Vec { todo!() } # fn table_exist(&self, _: &str) -> bool { todo!() } } @@ -242,10 +233,6 @@ pub struct MemoryCatalogProvider { } impl CatalogProvider for MemoryCatalogProvider { - fn as_any(&self) -> &dyn Any { - self - } - fn schema_names(&self) -> Vec { self.schemas.iter().map(|s| s.key().clone()).collect() } @@ -298,10 +285,6 @@ pub struct MemoryCatalogProviderList { } impl CatalogProviderList for MemoryCatalogProviderList { - fn as_any(&self) -> &dyn Any { - self - } - fn register_catalog( &self, name: String, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index e9ec80acf3b4f..81b2d131e65c3 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -155,12 +155,10 @@ stream path, which gives you complete control and applies to any data source. ## Layer 1: TableProvider A [TableProvider] represents a queryable data source. For a minimal read-only -table, you need four methods: +table, you need three methods: ```rust,ignore impl TableProvider for MyTable { - fn as_any(&self) -> &dyn Any { self } - fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } @@ -520,7 +518,6 @@ To opt in, implement `supports_filters_pushdown`: # # #[async_trait::async_trait] # impl TableProvider for MyFilterTable { -# fn as_any(&self) -> &dyn Any { self } # fn schema(&self) -> SchemaRef { todo!() } # fn table_type(&self) -> TableType { TableType::Base } # async fn scan(&self, _: &dyn Session, _: Option<&Vec>, _: &[Expr], _: Option) -> Result> { todo!() } @@ -673,7 +670,6 @@ struct DatePartitionedTable { #[async_trait::async_trait] impl TableProvider for DatePartitionedTable { - fn as_any(&self) -> &dyn Any { self } fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } fn table_type(&self) -> TableType { TableType::Base } @@ -832,7 +828,6 @@ impl CountingTable { #[async_trait::async_trait] impl TableProvider for CountingTable { - fn as_any(&self) -> &dyn Any { self } fn schema(&self) -> SchemaRef { Arc::clone(&self.schema) } fn table_type(&self) -> TableType { TableType::Base } diff --git a/docs/source/library-user-guide/upgrading/54.0.0.md b/docs/source/library-user-guide/upgrading/54.0.0.md index c5d03ebf8878c..c34c08ec194ac 100644 --- a/docs/source/library-user-guide/upgrading/54.0.0.md +++ b/docs/source/library-user-guide/upgrading/54.0.0.md @@ -189,13 +189,15 @@ let mut stats = Arc::unwrap_or_clone(plan.partition_statistics(None)?); stats.column_statistics[0].min_value = ...; ``` -### Remove `as_any` from `ScalarUDFImpl`, `AggregateUDFImpl`, `WindowUDFImpl`, and `ExecutionPlan` +### Remove `as_any` from `ScalarUDFImpl`, `AggregateUDFImpl`, `WindowUDFImpl`, `ExecutionPlan`, `TableProvider`, `SchemaProvider`, `CatalogProvider`, and `CatalogProviderList` Now that we have a more recent minimum version of Rust, we can take advantage of trait upcasting. This reduces the amount of boilerplate code that users need to implement. In your implementations of `ScalarUDFImpl`, -`AggregateUDFImpl`, `WindowUDFImpl`, and `ExecutionPlan`, you can simply remove -the `as_any` function. The below diffs are examples from the associated PRs. +`AggregateUDFImpl`, `WindowUDFImpl`, `ExecutionPlan`, `TableProvider`, +`SchemaProvider`, `CatalogProvider`, and `CatalogProviderList`, you can simply +remove the `as_any` function. The below diffs are examples from the associated +PRs. **Scalar UDFs:** @@ -261,6 +263,70 @@ the `as_any` function. The below diffs are examples from the associated PRs. } ``` +**Table Providers:** + +```diff + impl TableProvider for MyTable { +- fn as_any(&self) -> &dyn Any { +- self +- } +- + fn schema(&self) -> SchemaRef { + ... + } + + ... + } +``` + +**Schema Providers:** + +```diff + impl SchemaProvider for MySchema { +- fn as_any(&self) -> &dyn Any { +- self +- } +- + fn table_names(&self) -> Vec { + ... + } + + ... + } +``` + +**Catalog Providers:** + +```diff + impl CatalogProvider for MyCatalog { +- fn as_any(&self) -> &dyn Any { +- self +- } +- + fn schema_names(&self) -> Vec { + ... + } + + ... + } +``` + +**Catalog Provider Lists:** + +```diff + impl CatalogProviderList for MyCatalogList { +- fn as_any(&self) -> &dyn Any { +- self +- } +- + fn register_catalog( + ... + } + + ... + } +``` + If you have code that is downcasting, you can use the new `downcast_ref` and `is` methods defined directly on each trait object: @@ -269,6 +335,7 @@ and `is` methods defined directly on each trait object: ```rust,ignore let exec = plan.as_any().downcast_ref::().unwrap(); let udf = scalar_udf.as_any().downcast_ref::().unwrap(); +let table = table_provider.as_any().downcast_ref::().unwrap(); ``` **After:** @@ -276,10 +343,13 @@ let udf = scalar_udf.as_any().downcast_ref::().unwrap(); ```rust,ignore let exec = plan.downcast_ref::().unwrap(); let udf = scalar_udf.downcast_ref::().unwrap(); +let table = table_provider.downcast_ref::().unwrap(); ``` -These methods are available on `dyn ExecutionPlan`, `dyn ScalarUDFImpl`, -`dyn AggregateUDFImpl`, and `dyn WindowUDFImpl`. They work correctly +These methods are available on `dyn ExecutionPlan`, `dyn TableProvider`, +`dyn SchemaProvider`, `dyn CatalogProvider`, `dyn CatalogProviderList`, +`dyn ScalarUDFImpl`, `dyn AggregateUDFImpl`, and `dyn WindowUDFImpl`. +They work correctly whether the value is a bare reference or behind an `Arc` (Rust auto-derefs through the `Arc`).