Skip to content
Merged
13 changes: 0 additions & 13 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::{Arc, Weak};

use crate::object_storage::{AwsOptions, GcpOptions, get_object_store};
Expand Down Expand Up @@ -50,10 +49,6 @@ impl DynamicObjectStoreCatalog {
}

impl CatalogProviderList for DynamicObjectStoreCatalog {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
Expand Down Expand Up @@ -91,10 +86,6 @@ impl DynamicObjectStoreCatalogProvider {
}

impl CatalogProvider for DynamicObjectStoreCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.inner.schema_names()
}
Expand Down Expand Up @@ -134,10 +125,6 @@ impl DynamicObjectStoreSchemaProvider {

#[async_trait]
impl SchemaProvider for DynamicObjectStoreSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.inner.table_names()
}
Expand Down
16 changes: 0 additions & 16 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ struct ParquetMetadataTable {

#[async_trait]
impl TableProvider for ParquetMetadataTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down Expand Up @@ -479,10 +475,6 @@ struct MetadataCacheTable {

#[async_trait]
impl TableProvider for MetadataCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down Expand Up @@ -598,10 +590,6 @@ struct StatisticsCacheTable {

#[async_trait]
impl TableProvider for StatisticsCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down Expand Up @@ -734,10 +722,6 @@ struct ListFilesCacheTable {

#[async_trait]
impl TableProvider for ListFilesCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! See `main.rs` for how to run it.

use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{self, Debug, Formatter};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -162,10 +161,6 @@ impl Default for CustomDataSource {

#[async_trait]
impl TableProvider for CustomDataSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
SchemaRef::new(Schema::new(vec![
Field::new("id", DataType::UInt8, false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! See `main.rs` for how to run it.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -201,10 +200,6 @@ impl DefaultValueTableProvider {

#[async_trait]
impl TableProvider for DefaultValueTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
14 changes: 1 addition & 13 deletions datafusion-examples/examples/data_io/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::{
prelude::SessionContext,
};
use std::sync::RwLock;
use std::{any::Any, collections::HashMap, path::Path, sync::Arc};
use std::{collections::HashMap, path::Path, sync::Arc};
use std::{fs::File, io::Write};
use tempfile::TempDir;

Expand Down Expand Up @@ -178,10 +178,6 @@ impl DirSchema {

#[async_trait]
impl SchemaProvider for DirSchema {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
let tables = self.tables.read().unwrap();
tables.keys().cloned().collect::<Vec<_>>()
Expand Down Expand Up @@ -231,10 +227,6 @@ impl DirCatalog {
}

impl CatalogProvider for DirCatalog {
fn as_any(&self) -> &dyn Any {
self
}

fn register_schema(
&self,
name: &str,
Expand Down Expand Up @@ -277,10 +269,6 @@ impl CustomCatalogProviderList {
}

impl CatalogProviderList for CustomCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! See `main.rs` for how to run it.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::ops::Range;
Expand Down Expand Up @@ -451,10 +450,6 @@ impl IndexedFile {
/// so that we can query it as a table.
#[async_trait]
impl TableProvider for IndexTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.indexed_file.schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,6 @@ fn get_key_value<'a>(file_meta_data: &'a FileMetaData, key: &'_ str) -> Option<&
/// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files
#[async_trait]
impl TableProvider for DistinctIndexTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Display;
use std::fs;
Expand Down Expand Up @@ -208,10 +207,6 @@ impl IndexTableProvider {

#[async_trait]
impl TableProvider for IndexTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.index.schema().clone()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion-examples/examples/data_io/remote_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::prelude::{DataFrame, SessionContext};
use futures::TryStreamExt;
use std::any::Any;
use std::sync::Arc;

/// Interfacing with a remote catalog (e.g. over a network)
Expand Down Expand Up @@ -224,10 +223,6 @@ impl RemoteTable {
/// Implement the DataFusion Catalog API for [`RemoteTable`]
#[async_trait]
impl TableProvider for RemoteTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion-examples/examples/udf/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ struct LocalCsvTable {

#[async_trait]
impl TableProvider for LocalCsvTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
5 changes: 0 additions & 5 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::empty::EmptyExec;
use futures::{Stream, StreamExt, TryStreamExt, future, stream};
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -449,10 +448,6 @@ fn can_be_evaluated_for_partition_pruning(

#[async_trait]
impl TableProvider for ListingTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
Expand Down
25 changes: 3 additions & 22 deletions datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ impl SchemaProvider for ResolvedSchemaProvider {
self.owner_name.as_deref()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn table_names(&self) -> Vec<String> {
self.cached_tables.keys().cloned().collect()
}
Expand Down Expand Up @@ -115,10 +111,6 @@ struct ResolvedCatalogProvider {
cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}
impl CatalogProvider for ResolvedCatalogProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.cached_schemas.keys().cloned().collect()
}
Expand Down Expand Up @@ -160,10 +152,6 @@ struct ResolvedCatalogProviderList {
cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
}
impl CatalogProviderList for ResolvedCatalogProviderList {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn register_catalog(
&self,
_name: String,
Expand Down Expand Up @@ -424,12 +412,9 @@ pub trait AsyncCatalogProviderList: Send + Sync {

#[cfg(test)]
mod tests {
use std::{
any::Any,
sync::{
Arc,
atomic::{AtomicU32, Ordering},
},
use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};

use arrow::datatypes::SchemaRef;
Expand All @@ -447,10 +432,6 @@ mod tests {
struct MockTableProvider {}
#[async_trait]
impl TableProvider for MockTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef {
unimplemented!()
Expand Down
52 changes: 42 additions & 10 deletions datafusion/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ use datafusion_common::not_impl_err;
/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
///
/// [`TableProvider`]: crate::TableProvider
pub trait CatalogProvider: Debug + Sync + Send {
/// Returns the catalog provider as [`Any`]
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

pub trait CatalogProvider: Any + Debug + Sync + Send {
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;

Expand Down Expand Up @@ -152,15 +148,31 @@ pub trait CatalogProvider: Debug + Sync + Send {
}
}

impl dyn CatalogProvider {
/// Returns `true` if the catalog provider is of type `T`.
///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn CatalogProvider>` via auto-deref.
pub fn is<T: CatalogProvider>(&self) -> bool {
(self as &dyn Any).is::<T>()
}

/// Attempts to downcast this catalog provider to a concrete type `T`,
/// returning `None` if the provider is not of that type.
///
/// Works correctly when called on `Arc<dyn CatalogProvider>` via auto-deref,
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
/// downcast the `Arc` itself.
pub fn downcast_ref<T: CatalogProvider>(&self) -> Option<&T> {
(self as &dyn Any).downcast_ref()
}
}

/// Represent a list of named [`CatalogProvider`]s.
///
/// Please see the documentation on [`CatalogProvider`] for details of
/// implementing a custom catalog.
pub trait CatalogProviderList: Debug + Sync + Send {
/// Returns the catalog list as [`Any`]
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

pub trait CatalogProviderList: Any + Debug + Sync + Send {
/// Adds a new catalog to this catalog list
/// If a catalog of the same name existed before, it is replaced in the list and returned.
fn register_catalog(
Expand All @@ -175,3 +187,23 @@ pub trait CatalogProviderList: Debug + Sync + Send {
/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}

impl dyn CatalogProviderList {
/// Returns `true` if the catalog provider list is of type `T`.
///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn CatalogProviderList>` via auto-deref.
pub fn is<T: CatalogProviderList>(&self) -> bool {
(self as &dyn Any).is::<T>()
}

/// Attempts to downcast this catalog provider list to a concrete type `T`,
/// returning `None` if the provider list is not of that type.
///
/// Works correctly when called on `Arc<dyn CatalogProviderList>` via
/// auto-deref, unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would
/// attempt to downcast the `Arc` itself.
pub fn downcast_ref<T: CatalogProviderList>(&self) -> Option<&T> {
(self as &dyn Any).downcast_ref()
}
}
Loading
Loading