Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ needless_range_loop = "allow"
or_fun_call = "deny"
panic = "deny"
# panic_in_result_fn = "deny" -- we cannot disable this for tests to use assertions
clone_on_ref_ptr = "deny"
redundant_clone = "deny"
same_name_method = "deny"
tests_outside_test_module = "deny"
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/compress-bench/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Compressor for ParquetCompressor {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -69,7 +69,7 @@ impl Compressor for ParquetCompressor {
// First compress to get the bytes we'll decompress
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down
15 changes: 10 additions & 5 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ pub fn make_object_store(
.with_bucket_name(bucket_name)
.build()?,
);
session
.register_object_store(&Url::parse(&format!("s3://{bucket_name}/"))?, s3.clone());
session.register_object_store(
&Url::parse(&format!("s3://{bucket_name}/"))?,
Arc::<object_store::aws::AmazonS3>::clone(&s3),
);
Ok(s3)
}
"gs" => {
Expand All @@ -92,13 +94,16 @@ pub fn make_object_store(
.with_bucket_name(bucket_name)
.build()?,
);
session
.register_object_store(&Url::parse(&format!("gs://{bucket_name}/"))?, gcs.clone());
session.register_object_store(
&Url::parse(&format!("gs://{bucket_name}/"))?,
Arc::<object_store::gcp::GoogleCloudStorage>::clone(&gcs),
);
Ok(gcs)
}
_ => {
let fs = Arc::new(LocalFileSystem::default());
session.register_object_store(&Url::parse("file:/")?, fs.clone());
session
.register_object_store(&Url::parse("file:/")?, Arc::<LocalFileSystem>::clone(&fs));
Ok(fs)
}
}
Expand Down
12 changes: 7 additions & 5 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async fn main() -> anyhow::Result<()> {
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
plans_mut.push((query_idx, *format, Arc::clone(&plan)));
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
let pattern = benchmark.pattern(table.name, format);
let table_url = ListingTableUrl::try_new(benchmark_base.clone(), pattern)?;

let mut listing_options = ListingOptions::new(file_format.clone())
let mut listing_options = ListingOptions::new(Arc::clone(&file_format))
.with_session_config_options(session.state().config());
if benchmark.dataset_name() == "polarsignals" && format == Format::Parquet {
// Work around a DataFusion bug (fixed in 53.0.0) where the
Expand Down Expand Up @@ -304,8 +304,10 @@ async fn register_v2_tables<B: Benchmark + ?Sized>(
.runtime_env()
.object_store(table_url.object_store())?;

let fs: vortex::io::filesystem::FileSystemRef =
Arc::new(ObjectStoreFileSystem::new(store.clone(), SESSION.handle()));
let fs: vortex::io::filesystem::FileSystemRef = Arc::new(ObjectStoreFileSystem::new(
Arc::clone(&store),
SESSION.handle(),
));
let base_prefix = benchmark_base.path().trim_start_matches('/').to_string();
let fs = fs.with_prefix(base_prefix);

Expand Down Expand Up @@ -416,7 +418,7 @@ pub async fn execute_query(
.create_physical_plan()
.with_labelset(get_labelset_from_global())
.await?;
let result = collect(plan.clone(), task_ctx)
let result = collect(Arc::clone(&plan), task_ctx)
.with_labelset(get_labelset_from_global())
.await?;

Expand Down
5 changes: 3 additions & 2 deletions benchmarks/lance-bench/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::fs;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

Expand Down Expand Up @@ -92,7 +93,7 @@ impl Compressor for LanceCompressor {
// Read the input parquet file
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down Expand Up @@ -131,7 +132,7 @@ impl Compressor for LanceCompressor {
// First compress to get the Lance dataset
let file = File::open(parquet_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let schema = builder.schema().clone();
let schema = Arc::clone(builder.schema());
let reader = builder.build()?;
let batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;

Expand Down
8 changes: 4 additions & 4 deletions benchmarks/lance-bench/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl Iterator for ParquetFilesIterator {

impl RecordBatchReader for ParquetFilesIterator {
    /// Returns another handle to the schema held by this iterator.
    ///
    /// The clone is written as `Arc::clone(..)` rather than `.clone()` so that it is
    /// visibly a reference-count bump, as required by the `clone_on_ref_ptr` lint.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn convert_parquet_to_lance<'p>(
// Get schema from the first Parquet file
let first_file = File::open(&parquet_files[0])?;
let first_builder = ParquetRecordBatchReaderBuilder::try_new(first_file)?;
let schema = first_builder.schema().clone();
let schema = Arc::clone(first_builder.schema());

// Create a streaming iterator that reads from all Parquet files
let batch_iter = ParquetFilesIterator::new(parquet_files, schema)?;
Expand Down Expand Up @@ -237,7 +237,7 @@ pub fn convert_utf8view_batch(batch: RecordBatch) -> anyhow::Result<RecordBatch>
// Cast Utf8View to Utf8.
cast(column, &DataType::Utf8)?
} else {
column.clone()
Arc::clone(column)
};
new_columns.push(new_column);
}
Expand Down Expand Up @@ -277,6 +277,6 @@ impl Iterator for ConvertingParquetFilesIterator {

impl RecordBatchReader for ConvertingParquetFilesIterator {
    /// Returns another handle to the `converted_schema` held by this iterator.
    ///
    /// The clone is written as `Arc::clone(..)` rather than `.clone()` so that it is
    /// visibly a reference-count bump, as required by the `clone_on_ref_ptr` lint.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.converted_schema)
    }
}
6 changes: 4 additions & 2 deletions encodings/fastlanes/src/bitpacking/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl FilterKernel for BitPacked {

let patches = array
.patches()
.map(|patches| patches.filter(&Mask::Values(values.clone()), ctx))
.map(|patches| patches.filter(&Mask::Values(Arc::clone(values)), ctx))
.transpose()?
.flatten();

Expand Down Expand Up @@ -112,7 +112,9 @@ fn filter_primitive_without_patches<U: UnsignedPType + BitPacking>(
selection: &Arc<MaskValues>,
) -> VortexResult<(Buffer<U>, Validity)> {
let values = filter_with_indices(array.data(), selection.indices());
let validity = array.validity()?.filter(&Mask::Values(selection.clone()))?;
let validity = array
.validity()?
.filter(&Mask::Values(Arc::clone(selection)))?;

Ok((values.freeze(), validity))
}
Expand Down
2 changes: 1 addition & 1 deletion encodings/sparse/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub(super) fn execute_sparse(
execute_varbin(array, dtype.clone(), fill_value, ctx)?
}
DType::List(values_dtype, nullability) => {
execute_sparse_lists(array, values_dtype.clone(), *nullability, ctx)?
execute_sparse_lists(array, Arc::clone(values_dtype), *nullability, ctx)?
}
DType::FixedSizeList(.., nullability) => {
execute_sparse_fixed_size_list(array, *nullability, ctx)?
Expand Down
6 changes: 4 additions & 2 deletions fuzz/src/array/fill_null.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
Expand Down Expand Up @@ -221,7 +223,7 @@ fn fill_varbinview_array(
if result_nullability == Nullability::Nullable {
VarBinViewArray::new_handle(
result.to_varbinview().views_handle().clone(),
result.to_varbinview().data_buffers().clone(),
Arc::clone(result.to_varbinview().data_buffers()),
result.dtype().as_nullable(),
result_nullability.into(),
)
Expand Down Expand Up @@ -255,7 +257,7 @@ fn fill_varbinview_array(
if result_nullability == Nullability::Nullable {
VarBinViewArray::new_handle(
result.to_varbinview().views_handle().clone(),
result.to_varbinview().data_buffers().clone(),
Arc::clone(result.to_varbinview().data_buffers()),
result.dtype().as_nullable(),
result_nullability.into(),
)
Expand Down
4 changes: 3 additions & 1 deletion fuzz/src/array/mask.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
Expand Down Expand Up @@ -86,7 +88,7 @@ pub fn mask_canonical_array(canonical: Canonical, mask: &Mask) -> VortexResult<A
let new_validity = mask_validity(&array.validity()?, mask);
VarBinViewArray::new_handle(
array.views_handle().clone(),
array.data_buffers().clone(),
Arc::clone(array.data_buffers()),
array.dtype().with_nullability(new_validity.nullability()),
new_validity,
)
Expand Down
6 changes: 3 additions & 3 deletions vortex-array/src/arrays/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn random_fixed_size_list(
let array_length = chunk_len.unwrap_or(u.int_in_range(0..=20)?);

let mut builder =
FixedSizeListBuilder::with_capacity(elem_dtype.clone(), list_size, null, array_length);
FixedSizeListBuilder::with_capacity(Arc::clone(elem_dtype), list_size, null, array_length);

for _ in 0..array_length {
if null == Nullability::Nullable && u.arbitrary::<bool>()? {
Expand Down Expand Up @@ -229,7 +229,7 @@ fn random_list_with_offset_type<O: IntegerPType>(
) -> Result<ArrayRef> {
let array_length = chunk_len.unwrap_or(u.int_in_range(0..=20)?);

let mut builder = ListViewBuilder::<O, O>::with_capacity(elem_dtype.clone(), null, 20, 10);
let mut builder = ListViewBuilder::<O, O>::with_capacity(Arc::clone(elem_dtype), null, 20, 10);

for _ in 0..array_length {
if null == Nullability::Nullable && u.arbitrary::<bool>()? {
Expand All @@ -255,7 +255,7 @@ fn random_list_scalar(
let elems = (0..list_size)
.map(|_| random_scalar(u, elem_dtype))
.collect::<Result<Vec<_>>>()?;
Ok(Scalar::list(elem_dtype.clone(), elems, null))
Ok(Scalar::list(Arc::clone(elem_dtype), elems, null))
}

fn random_string(
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/arrays/filter/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mod varbinview;

/// Reconstruct a [`Mask`] from an [`Arc<MaskValues>`].
///
/// The returned mask is the [`Mask::Values`] variant wrapping a new handle to the same
/// shared `MaskValues`; only the `Arc` reference count is incremented.
fn values_to_mask(values: &Arc<MaskValues>) -> Mask {
    // Explicit `Arc::clone` (not `values.clone()`) keeps the `clone_on_ref_ptr` lint happy.
    Mask::Values(Arc::clone(values))
}

/// A helper function that lazily filters a [`Validity`] with selection mask values.
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/arrays/list/compute/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ mod test {
assert_eq!(
result.scalar_at(0).unwrap(),
Scalar::list(
element_dtype.clone(),
Arc::clone(&element_dtype),
vec![0i32.into(), 5.into()],
Nullability::Nullable
)
Expand All @@ -249,7 +249,7 @@ mod test {
assert_eq!(
result.scalar_at(2).unwrap(),
Scalar::list(
element_dtype.clone(),
Arc::clone(&element_dtype),
vec![3i32.into()],
Nullability::Nullable
)
Expand Down Expand Up @@ -317,7 +317,7 @@ mod test {
assert_eq!(
result.scalar_at(0).unwrap(),
Scalar::list(
element_dtype.clone(),
Arc::clone(&element_dtype),
vec![3i32.into()],
Nullability::NonNullable
)
Expand All @@ -327,7 +327,7 @@ mod test {
assert_eq!(
result.scalar_at(1).unwrap(),
Scalar::list(
element_dtype.clone(),
Arc::clone(&element_dtype),
vec![0i32.into(), 5.into()],
Nullability::NonNullable
)
Expand Down
8 changes: 4 additions & 4 deletions vortex-array/src/arrays/list/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ impl ListArray {
{
let iter = iter.into_iter();
let mut builder = ListBuilder::<O>::with_capacity(
dtype.clone(),
Arc::clone(&dtype),
crate::dtype::Nullability::NonNullable,
2 * iter.size_hint().0,
iter.size_hint().0,
);

for v in iter {
let elem = Scalar::list(
dtype.clone(),
Arc::clone(&dtype),
v.into_iter().map(|x| x.into()).collect_vec(),
dtype.nullability(),
);
Expand All @@ -55,7 +55,7 @@ impl ListArray {
{
let iter = iter.into_iter();
let mut builder = ListBuilder::<O>::with_capacity(
dtype.clone(),
Arc::clone(&dtype),
crate::dtype::Nullability::Nullable,
2 * iter.size_hint().0,
iter.size_hint().0,
Expand All @@ -64,7 +64,7 @@ impl ListArray {
for v in iter {
if let Some(v) = v {
let elem = Scalar::list(
dtype.clone(),
Arc::clone(&dtype),
v.into_iter().map(|x| x.into()).collect_vec(),
dtype.nullability(),
);
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/arrays/masked/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! Execution logic for MaskedArray - applies a validity mask to canonical arrays.

use std::ops::BitAnd;
use std::sync::Arc;

use vortex_error::VortexResult;
use vortex_mask::Mask;
Expand Down Expand Up @@ -136,7 +137,7 @@ fn mask_validity_varbinview(
Ok(unsafe {
VarBinViewArray::new_handle_unchecked(
array.views_handle().clone(),
array.data_buffers().clone(),
Arc::clone(array.data_buffers()),
dtype,
new_validity,
)
Expand Down
4 changes: 3 additions & 1 deletion vortex-array/src/arrays/varbinview/compute/cast.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_error::VortexResult;

use crate::ArrayRef;
Expand Down Expand Up @@ -28,7 +30,7 @@ impl CastReduce for VarBinView {
Ok(Some(
VarBinViewArray::new_handle_unchecked(
array.views_handle().clone(),
array.data_buffers().clone(),
Arc::clone(array.data_buffers()),
new_dtype,
new_validity,
)
Expand Down
4 changes: 3 additions & 1 deletion vortex-array/src/arrays/varbinview/compute/mask.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use vortex_error::VortexResult;

use crate::ArrayRef;
Expand All @@ -18,7 +20,7 @@ impl MaskReduce for VarBinView {
Ok(Some(
VarBinViewArray::new_handle_unchecked(
array.views_handle().clone(),
array.data_buffers().clone(),
Arc::clone(array.data_buffers()),
array.dtype().as_nullable(),
array.validity()?.and(Validity::Array(mask.clone()))?,
)
Expand Down
Loading
Loading