Skip to content

Commit 8295cec

Browse files
committed
DPL: allow --completion-policy <N>[s, m] to wait before quitting
In case of a completed processing, wait the specified number of seconds / minutes before quitting.
1 parent 1e1c6e5 commit 8295cec

4 files changed

Lines changed: 40 additions & 3 deletions

File tree

Framework/Core/include/Framework/ProcessingPolicies.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ struct ProcessingPolicies {
3232
enum TerminationPolicy termination;
3333
enum TerminationPolicy error;
3434
enum EarlyForwardPolicy earlyForward;
35+
/// When termination policy is QUIT, optionally wait this many seconds before
36+
/// actually quitting (0 means quit immediately). Set via --completion-policy
37+
/// with a duration string, e.g. --completion-policy 10s.
38+
int terminationTimeout = 0;
3539
};
3640

3741
/// The mode in which the driver is running. Should be MASTER when running locally,

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,9 @@ void DataProcessingDevice::InitTask()
901901

902902
deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
903903
deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
904+
if (deviceContext.exitTransitionTimeout == 0 && deviceContext.processingPolicies.terminationTimeout > 0) {
905+
deviceContext.exitTransitionTimeout = deviceContext.processingPolicies.terminationTimeout;
906+
}
904907
deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
905908

906909
for (auto& channel : GetChannels()) {

Framework/Core/src/DataProcessingHelpers.cxx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ TransitionHandlingState DataProcessingHelpers::updateStateTransition(ServiceRegi
219219
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
220220
} else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) {
221221
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
222+
} else if (policies.termination == TerminationPolicy::QUIT && policies.terminationTimeout > 0) {
223+
// --completion-policy was given as a duration: wait even though already idle
224+
uv_update_time(state.loop);
225+
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and already idle, waiting %d seconds before quitting as per --completion-policy.", deviceContext.exitTransitionTimeout);
226+
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
227+
return TransitionHandlingState::Requested;
222228
} else if (policies.termination == TerminationPolicy::QUIT) {
223229
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
224230
} else {

Framework/Core/src/runDataProcessing.cxx

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2921,8 +2921,8 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
29212921
("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
29222922
("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
29232923
("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
2924-
("completion-policy,c", bpo::value<TerminationPolicy>(&processingPolicies.termination)->default_value(TerminationPolicy::QUIT), // //
2925-
"what to do when processing is finished: quit, wait") // //
2924+
("completion-policy,c", bpo::value<std::string>()->default_value("quit"), //
2925+
"what to do when processing is finished: quit, wait, or a duration (e.g. 10s, 2m) to quit after waiting that long") // //
29262926
("error-policy", bpo::value<TerminationPolicy>(&processingPolicies.error)->default_value(TerminationPolicy::QUIT), // //
29272927
"what to do when a device has an error: quit, wait") // //
29282928
("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
@@ -3186,7 +3186,31 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
31863186
driverInfo.argc = argc;
31873187
driverInfo.argv = argv;
31883188
driverInfo.noSHMCleanup = varmap["no-cleanup"].as<bool>();
3189-
driverInfo.processingPolicies.termination = varmap["completion-policy"].as<TerminationPolicy>();
3189+
{
3190+
auto completionPolicyStr = varmap["completion-policy"].as<std::string>();
3191+
if (completionPolicyStr == "quit") {
3192+
driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
3193+
} else if (completionPolicyStr == "wait") {
3194+
driverInfo.processingPolicies.termination = TerminationPolicy::WAIT;
3195+
} else {
3196+
// Try to parse as a duration, e.g. "10s" or "2m"
3197+
int value = 0;
3198+
char unit = 's';
3199+
int matched = sscanf(completionPolicyStr.c_str(), "%d%c", &value, &unit);
3200+
if (matched >= 1 && value > 0) {
3201+
int seconds = value;
3202+
if (matched == 2 && unit == 'm') {
3203+
seconds = value * 60;
3204+
} else if (matched == 2 && unit != 's') {
3205+
throw std::runtime_error(fmt::format("Invalid --completion-policy value '{}': use 'quit', 'wait', or a duration like '10s' or '2m'", completionPolicyStr));
3206+
}
3207+
driverInfo.processingPolicies.termination = TerminationPolicy::QUIT;
3208+
driverInfo.processingPolicies.terminationTimeout = seconds;
3209+
} else {
3210+
throw std::runtime_error(fmt::format("Invalid --completion-policy value '{}': use 'quit', 'wait', or a duration like '10s' or '2m'", completionPolicyStr));
3211+
}
3212+
}
3213+
}
31903214
driverInfo.processingPolicies.earlyForward = varmap["early-forward-policy"].as<EarlyForwardPolicy>();
31913215
driverInfo.mode = varmap["driver-mode"].as<DriverMode>();
31923216

0 commit comments

Comments
 (0)