-
Notifications
You must be signed in to change notification settings - Fork 3.8k
[Fix](pyudf) Fix some pyudf issues #62249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a27c7ac
a9458f8
ab7280a
11ad4a9
4ea33a8
fe66e93
7834b51
e21893f
bce6c6c
a59cb21
4354e5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,18 +54,37 @@ Status PythonFunctionCall::open(FunctionContext* context, | |
| func_meta.id = _fn.id; | ||
| func_meta.name = _fn.name.function_name; | ||
| func_meta.symbol = _fn.scalar_fn.symbol; | ||
| LOG(INFO) << fmt::format( | ||
| "[pyudf-test] be open raw tfunction name={}, symbol={}, has_hdfs_location={}, " | ||
| "hdfs_location={}, has_function_code={}, function_code_empty={}, " | ||
| "has_runtime_version={}, " | ||
| "runtime_version={}, checksum={}", | ||
| _fn.name.function_name, _fn.scalar_fn.symbol, | ||
| _fn.__isset.hdfs_location ? "true" : "false", _fn.hdfs_location, | ||
| _fn.__isset.function_code ? "true" : "false", | ||
| _fn.function_code.empty() ? "true" : "false", | ||
| _fn.__isset.runtime_version ? "true" : "false", _fn.runtime_version, _fn.checksum); | ||
| if (!_fn.function_code.empty()) { | ||
| func_meta.type = PythonUDFLoadType::INLINE; | ||
| func_meta.location = "inline"; | ||
| func_meta.inline_code = _fn.function_code; | ||
| LOG(INFO) << fmt::format("[pyudf-test] be open inline mode code_length={}", | ||
| _fn.function_code.size()); | ||
| } else if (!_fn.hdfs_location.empty()) { | ||
| func_meta.type = PythonUDFLoadType::MODULE; | ||
| func_meta.location = _fn.hdfs_location; | ||
| func_meta.checksum = _fn.checksum; | ||
| LOG(INFO) << fmt::format("[pyudf-test] be open module mode url={}, checksum={}", | ||
| _fn.hdfs_location, _fn.checksum); | ||
| } else { | ||
| func_meta.type = PythonUDFLoadType::UNKNOWN; | ||
| func_meta.location = "unknown"; | ||
| LOG(INFO) << "[pyudf-test] be open unknown mode because both function_code and " | ||
| "hdfs_location are empty"; | ||
| } | ||
| LOG(INFO) << fmt::format( | ||
| "[pyudf-test] be open classified load_type={}, location={}, checksum={}", | ||
| static_cast<int>(func_meta.type), func_meta.location, func_meta.checksum); | ||
|
|
||
| func_meta.input_types = _argument_types; | ||
| func_meta.return_type = _return_type; | ||
|
|
@@ -81,12 +100,15 @@ Status PythonFunctionCall::open(FunctionContext* context, | |
| func_meta.runtime_version = version.full_version; | ||
| RETURN_IF_ERROR(func_meta.check()); | ||
| func_meta.always_nullable = _return_type->is_nullable(); | ||
| LOG(INFO) << fmt::format("runtime_version: {}, func_meta: {}", version.to_string(), | ||
| LOG(INFO) << fmt::format("[pyudf-test] runtime_version: {}, func_meta: {}", version.to_string(), | ||
| func_meta.to_string()); | ||
|
|
||
| if (func_meta.type == PythonUDFLoadType::MODULE) { | ||
| RETURN_IF_ERROR(UserFunctionCache::instance()->get_pypath( | ||
| func_meta.id, func_meta.location, func_meta.checksum, &func_meta.location)); | ||
| LOG(INFO) << fmt::format( | ||
| "[pyudf-test] be open resolved module path id={}, resolved_location={}", | ||
| func_meta.id, func_meta.location); | ||
| } | ||
|
|
||
| PythonUDFClientPtr client = nullptr; | ||
|
|
@@ -112,7 +134,7 @@ Status PythonFunctionCall::execute_impl(FunctionContext* context, Block& block, | |
| return Status::InternalError("Python UDF client is null"); | ||
| } | ||
|
|
||
| int64_t input_rows = block.rows(); | ||
| int64_t input_rows = num_rows; | ||
| uint32_t input_columns = block.columns(); | ||
| DCHECK(input_columns > 0 && result < input_columns && | ||
| _argument_types.size() == arguments.size()); | ||
|
|
@@ -141,8 +163,13 @@ Status PythonFunctionCall::execute_impl(FunctionContext* context, Block& block, | |
| std::shared_ptr<arrow::RecordBatch> input_batch; | ||
| std::shared_ptr<arrow::RecordBatch> output_batch; | ||
| cctz::time_zone _timezone_obj; // default UTC | ||
| RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, arrow::default_memory_pool(), | ||
| &input_batch, _timezone_obj)); | ||
| if (arguments.empty()) { | ||
| input_batch = arrow::RecordBatch::Make(schema, input_rows, | ||
| std::vector<std::shared_ptr<arrow::Array>> {}); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This adds zero-argument Python UDF execution, but BE still has no Python deterministic metadata. |
||
| } else { | ||
| RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, arrow::default_memory_pool(), | ||
| &input_batch, _timezone_obj)); | ||
| } | ||
| RETURN_IF_ERROR(client->evaluate(*input_batch, &output_batch)); | ||
| int64_t output_rows = output_batch->num_rows(); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear_udaf_state_cache()is avoidwrapper that usesTHROW_IF_ERRORaround a broadcast to Python processes. This callback does not catch exceptions, so a normal race where a Python process dies afteris_alive()but beforeFlightClient::Connect()/DoAction()can make the clean-cache agent task throw out of the worker instead of just logging the failed cleanup. The neighboring module-cache cleanup already returnsStatusand logs failures; please make the UDAF cleanup follow that pattern instead of throwing from this task callback.