Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
#include "storage/task/engine_storage_migration_task.h"
#include "storage/txn/txn_manager.h"
#include "storage/utils.h"
#include "udf/python/python_server.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/jni-util.h"
Expand Down Expand Up @@ -2511,6 +2512,7 @@ void clean_udf_cache_callback(const TAgentTaskRequest& req) {

if (clean_req.__isset.function_id && clean_req.function_id > 0) {
UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`clear_udaf_state_cache()` is a void wrapper that uses `THROW_IF_ERROR` around a broadcast to Python processes. This callback does not catch exceptions, so an ordinary race — a Python process dying after the `is_alive()` check but before `FlightClient::Connect()`/`DoAction()` — can make the clean-cache agent task throw out of the worker instead of just logging the failed cleanup. The neighboring module-cache cleanup already returns `Status` and logs failures; please make the UDAF cleanup follow that pattern instead of throwing from this task callback.


LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
Expand Down
35 changes: 31 additions & 4 deletions be/src/exprs/function/function_python_udf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,37 @@ Status PythonFunctionCall::open(FunctionContext* context,
func_meta.id = _fn.id;
func_meta.name = _fn.name.function_name;
func_meta.symbol = _fn.scalar_fn.symbol;
LOG(INFO) << fmt::format(
"[pyudf-test] be open raw tfunction name={}, symbol={}, has_hdfs_location={}, "
"hdfs_location={}, has_function_code={}, function_code_empty={}, "
"has_runtime_version={}, "
"runtime_version={}, checksum={}",
_fn.name.function_name, _fn.scalar_fn.symbol,
_fn.__isset.hdfs_location ? "true" : "false", _fn.hdfs_location,
_fn.__isset.function_code ? "true" : "false",
_fn.function_code.empty() ? "true" : "false",
_fn.__isset.runtime_version ? "true" : "false", _fn.runtime_version, _fn.checksum);
if (!_fn.function_code.empty()) {
func_meta.type = PythonUDFLoadType::INLINE;
func_meta.location = "inline";
func_meta.inline_code = _fn.function_code;
LOG(INFO) << fmt::format("[pyudf-test] be open inline mode code_length={}",
_fn.function_code.size());
} else if (!_fn.hdfs_location.empty()) {
func_meta.type = PythonUDFLoadType::MODULE;
func_meta.location = _fn.hdfs_location;
func_meta.checksum = _fn.checksum;
LOG(INFO) << fmt::format("[pyudf-test] be open module mode url={}, checksum={}",
_fn.hdfs_location, _fn.checksum);
} else {
func_meta.type = PythonUDFLoadType::UNKNOWN;
func_meta.location = "unknown";
LOG(INFO) << "[pyudf-test] be open unknown mode because both function_code and "
"hdfs_location are empty";
}
LOG(INFO) << fmt::format(
"[pyudf-test] be open classified load_type={}, location={}, checksum={}",
static_cast<int>(func_meta.type), func_meta.location, func_meta.checksum);

func_meta.input_types = _argument_types;
func_meta.return_type = _return_type;
Expand All @@ -81,12 +100,15 @@ Status PythonFunctionCall::open(FunctionContext* context,
func_meta.runtime_version = version.full_version;
RETURN_IF_ERROR(func_meta.check());
func_meta.always_nullable = _return_type->is_nullable();
LOG(INFO) << fmt::format("runtime_version: {}, func_meta: {}", version.to_string(),
LOG(INFO) << fmt::format("[pyudf-test] runtime_version: {}, func_meta: {}", version.to_string(),
func_meta.to_string());

if (func_meta.type == PythonUDFLoadType::MODULE) {
RETURN_IF_ERROR(UserFunctionCache::instance()->get_pypath(
func_meta.id, func_meta.location, func_meta.checksum, &func_meta.location));
LOG(INFO) << fmt::format(
"[pyudf-test] be open resolved module path id={}, resolved_location={}",
func_meta.id, func_meta.location);
}

PythonUDFClientPtr client = nullptr;
Expand All @@ -112,7 +134,7 @@ Status PythonFunctionCall::execute_impl(FunctionContext* context, Block& block,
return Status::InternalError("Python UDF client is null");
}

int64_t input_rows = block.rows();
int64_t input_rows = num_rows;
uint32_t input_columns = block.columns();
DCHECK(input_columns > 0 && result < input_columns &&
_argument_types.size() == arguments.size());
Expand Down Expand Up @@ -141,8 +163,13 @@ Status PythonFunctionCall::execute_impl(FunctionContext* context, Block& block,
std::shared_ptr<arrow::RecordBatch> input_batch;
std::shared_ptr<arrow::RecordBatch> output_batch;
cctz::time_zone _timezone_obj; // default UTC
RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, arrow::default_memory_pool(),
&input_batch, _timezone_obj));
if (arguments.empty()) {
input_batch = arrow::RecordBatch::Make(schema, input_rows,
std::vector<std::shared_ptr<arrow::Array>> {});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds zero-argument Python UDF execution, but BE still has no deterministic metadata for Python UDFs. `VectorizedFnCall::is_constant()` only special-cases zero-arg `JAVA_UDF`, so a zero-arg Python UDF declared with `deterministic="false"` can still be treated as constant — folded/evaluated once for a fragment and reused for all rows. A function whose body is `return random.random()` or `return time.time()` would then return one repeated value in `SELECT py_rand() FROM t`, despite the new FE/Nereids deterministic flag. Please pass and enforce the deterministic flag in the BE function constantness path, or at least disable constant treatment for zero-arg Python UDFs, as is already done for Java UDFs.

} else {
RETURN_IF_ERROR(convert_to_arrow_batch(input_block, schema, arrow::default_memory_pool(),
&input_batch, _timezone_obj));
}
RETURN_IF_ERROR(client->evaluate(*input_batch, &output_batch));
int64_t output_rows = output_batch->num_rows();

Expand Down
15 changes: 13 additions & 2 deletions be/src/exprs/table_function/python_udtf_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,28 @@ Status PythonUDTFFunction::process_init(Block* block, RuntimeState* state) {
for (uint32_t i = 0; i < child_column_idxs.size(); ++i) {
input_block.insert(block->get_by_position(child_column_idxs[i]));
}
int64_t input_rows = block->rows();
std::shared_ptr<arrow::Schema> input_schema;
std::shared_ptr<arrow::RecordBatch> input_batch;
RETURN_IF_ERROR(get_arrow_schema_from_block(input_block, &input_schema,
TimezoneUtils::default_time_zone));
RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema, arrow::default_memory_pool(),
&input_batch, _timezone_obj));
if (child_column_idxs.empty()) {
input_batch = arrow::RecordBatch::Make(input_schema, input_rows,
std::vector<std::shared_ptr<arrow::Array>> {});
} else {
RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema,
arrow::default_memory_pool(), &input_batch,
_timezone_obj));
}

// Step 3: Call Python UDTF to evaluate all rows at once (similar to Java UDTF's JNI call)
// Python returns a ListArray where each element contains outputs for one input row
std::shared_ptr<arrow::ListArray> list_array;
RETURN_IF_ERROR(_udtf_client->evaluate(*input_batch, &list_array));
if (list_array->length() != input_rows) [[unlikely]] {
return Status::InternalError("Python UDTF output rows {} not equal to input rows {}",
list_array->length(), input_rows);
}

// Step 4: Convert Python server output (ListArray) to Doris array column
RETURN_IF_ERROR(_convert_list_array_to_array_column(list_array));
Expand Down
Loading
Loading