---
title: "Conditional Pipelines"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Conditional Pipelines}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r echo=FALSE}
knitr::opts_chunk$set(
  collapse = FALSE,
  comment = "",
  out.width = "100%",
  cache = FALSE,
  asciicast_knitr_output = "html"
)

asciicast::init_knitr_engine(
  echo = TRUE,
  echo_input = FALSE,
  same_process = TRUE,
  startup = quote({
    library(maestro)
    set.seed(1)
  })
)

options(asciicast_theme = "pkgdown")
```

A conditional pipeline is a pipeline that executes only if a particular condition is met. Using the `@maestroRunIf` tag, you can specify a boolean R expression where `TRUE` executes the pipeline and `FALSE` skips it.

Conditional pipelines can use the input of an upstream DAG pipeline, a resource from the orchestrator as in `run_schedule(..., resources = list())`, or any arbitrary R code so long as it returns a single TRUE/FALSE.

## Simple Conditional

To make a pipeline conditional, simply use the `@maestroRunIf` tag containing an R expression that evaluates to a single boolean value. This expression can be inline with the tag or span multiple lines.

In this simple example, the pipeline is scheduled to run daily, but will only execute if a `TRUE` is randomly sampled.

```{r, eval=FALSE}
#' ./pipelines/conditional1.R
#' @maestroFrequency 1 day
#' @maestroRunIf sample(c(TRUE, FALSE), size = 1)
random_execution <- function() {
  message("Maybe, maybe not")
}
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
dir.create("pipelines")
writeLines(
  "
  #' @maestroFrequency 1 day
  #' @maestroRunIf TRUE
  random_execution <- function() {
    message('Maybe, maybe not')
  }",
  con = "pipelines/conditional1.R"
)
```

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  log_to_console = TRUE
)
```

## DAG Conditionals

[DAG pipelines](maestro-4-directed-acyclic-graphs.html) are several pipelines chained together such that the input from an upstream pipeline is passed to a downstream pipeline. The return value of the upstream pipeline is passed to the downstream pipeline via the `.input` parameter. This same `.input` can be accessed in the `@maestroRunIf` tag.

The example below executes the `transform_flights` and `load_flights` pipelines only if the incoming value is a dataframe and its row count is greater than 0.

```{r, eval=FALSE}
#' ./pipelines/conditional2.R
#' @maestroFrequency 1 hour
extract_flights <- function() {
  # Pretends to fetch data from an API
  data.frame(
    flight_id = 1:5,
    airline_code = c("UA", "AC", "AC", "AA", "DE"),
    departure_time = as.POSIXct("2025-10-14 12:00:00") + c(100, 450, 750, 1450, 1750)
  )
}

#' @maestroInputs extract_flights
#' @maestroRunIf
#' is.data.frame(.input) && nrow(.input) > 0
transform_flights <- function(.input) {
  proc_data <- .input

  # No flight_id in the sample data exceeds 5, so zero rows remain
  proc_data <- proc_data |>
    dplyr::filter(
      flight_id > 5
    ) |>
    dplyr::mutate(
      departing_from = "YHZ"
    )

  proc_data
}

#' @maestroInputs transform_flights
#' @maestroRunIf
#' is.data.frame(.input) && nrow(.input) > 0
load_flights <- function(.input) {
  # Write the upstream data frame to disk
  write.csv(.input, "flights.csv")
}
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
dir.create("pipelines")
writeLines(
  "
  #' ./pipelines/conditional2.R
  #' @maestroFrequency 1 hour
  extract_flights <- function() {
    # Pretends to fetch data from an API
    data.frame(
      flight_id = 1:5,
      airline_code = c('UA', 'AC', 'AC', 'AA', 'DE'),
      departure_time = as.POSIXct('2025-10-14 12:00:00') + c(100, 450, 750, 1450, 1750)
    )
  }

  #' @maestroInputs extract_flights
  #' @maestroRunIf
  #' is.data.frame(.input) && nrow(.input) > 0
  transform_flights <- function(.input) {
    proc_data <- .input

    proc_data <- proc_data |>
      dplyr::filter(
        flight_id > 5
      ) |>
      dplyr::mutate(
        departing_from = 'YHZ'
      )

    proc_data
  }

  #' @maestroInputs transform_flights
  #' @maestroRunIf
  #' is.data.frame(.input) && nrow(.input) > 0
  load_flights <- function(.input) {
    write.csv(.input, 'flights.csv')
  }
  ",
  con = "pipelines/conditional2.R"
)
```

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
```

## Resource Conditionals

In maestro, a resource is an argument or variable passed from the orchestrator to all the pipelines that are designed to make use of that resource.
It lets a pipeline use a variable created in the orchestrator context, which is often useful for global configuration. Conditional pipeline logic can also make use of these resources.

Let's see a concrete example where we only want to execute a pipeline if we get a `prod = TRUE` signal from the orchestrator:

```{r, eval=FALSE}
#' ./pipelines/conditional3.R
#' @maestroFrequency 1 day
#' @maestroRunIf prod
process_payments <- function() {
  # Some code that does important payment processing stuff
  # but only in production!
  message("Payments processed")
}
```

```{r echo=FALSE, warning=FALSE, message=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
dir.create("pipelines")
writeLines(
  "
  #' @maestroFrequency 1 day
  #' @maestroRunIf prod
  process_payments <- function() {
    # Some code that does important payment processing stuff
    # but only in production!
    message('Payments processed')
  }",
  con = "pipelines/conditional3.R"
)
```

```{asciicast}
library(maestro)

schedule <- build_schedule(quiet = TRUE)

status <- run_schedule(
  schedule,
  orch_frequency = "1 day",
  resources = list(
    prod = TRUE
  ),
  log_to_console = TRUE
)
```

```{r cleanup, echo=FALSE, message=FALSE, warning=FALSE}
invisible(unlink("pipelines", recursive = TRUE))
```
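
The conditions used in this vignette are ordinary R expressions, so they can be tried in a plain R session before being placed in a `@maestroRunIf` tag. The sketch below is an illustration in base R only and does not show how maestro itself evaluates the tag. `would_run()` and `is_single_flag()` are hypothetical helpers introduced here, not maestro functions. The first wraps the DAG condition from above, and the second checks that a value is a single, non-missing `TRUE` or `FALSE`.

```r
# Hypothetical helper (not part of maestro): the DAG condition from this
# vignette, wrapped in a function so it can be tried on sample inputs.
would_run <- function(.input) {
  is.data.frame(.input) && nrow(.input) > 0
}

# Sample upstream values: a populated data frame and an empty one
flights <- data.frame(flight_id = 1:5)
empty_flights <- flights[flights$flight_id > 5, , drop = FALSE]

would_run(flights)        # TRUE: a data frame with rows
would_run(empty_flights)  # FALSE: a data frame with zero rows
would_run(NULL)           # FALSE: && short-circuits before nrow() is called

# Hypothetical helper (not part of maestro): checks that a value is a
# single, non-missing logical.
is_single_flag <- function(x) {
  is.logical(x) && length(x) == 1L && !is.na(x)
}

is_single_flag(would_run(flights))  # TRUE
is_single_flag(c(TRUE, FALSE))      # FALSE: more than one value
is_single_flag(NA)                  # FALSE: missing value
```

Putting `is.data.frame()` first matters: because `&&` short-circuits, `nrow()` is only reached when the input really is a data frame.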