Skip to main content
Version: 7 - Gugelhupf

Wrappers

Wrappers encapsulate executeable resources. Executing a wrapped resource will instead run the wrapper, which can coordinate the execution of the wrapped resource.

Use Cases

Use wrappers to

  • Make sure something is always executed before and/or after.
  • Automatically retry an execution in case of an error. See Retries.
  • Send notifications in case of an error. See Notifications.
  • Provision resources on-the-fly when they are accessed.
  • Cleanup resources after they were accessed.
  • Execute a flow interactively. See Interactive Mode.

Concept

The following resource types can be wrapped:

  • Flow
  • Schedule
  • Scheduler
  • Connection
  • Wrapper
note

In the example below we use a wrapped flow. The same concept also applies for all other resource types which can be wrapped.

When a user runs a flow which is wrapped, an execution of the wrapper script is started instead. The wrapper script is provided with the information of how to start the flow and its inputs. The wrapper script is then responsible for starting the flow which the user originally wanted to run.

The wrapper script can have arbitrary logic before / around / after starting the flow. The wrapper script can modify the inputs for the flow, it can modify the outputs of the flow, it can start the flow multiple times, or the wrapper can also decide not to start the flow at all.

Wrapping resources

Resources can be wrapped statically. This means the wrapper is attached to the definition of the resource and always applies when the resource is used. The caller does not need to know if there is a wrapper. This is useful when certain functionality should always apply when the resource is used.

Another method is to apply wrappers dynamically when using a resource. A caller can specify a wrapper for a particular child execution. The wrapper applies for this particular use of the resource. In other places the same resource can be used without the wrapper.

Static wrappers

To register a static wrapper on a resource, open the resource in the frontend. Scoll down to "Wrappers" and click "Add".

Each statically defined wrapper has an order field. Wrappers are applied in ascending order.

Statically defined wrappers also have "Wrapper parameters" which are passed to the wrapper script next to the child input_value.

Dynamic wrappers

To use a wrapper dynamically use the following syntax in your script:

this.using('<name-of-a-wrapper>').flow('<name-of-a-flow>')
# or
this.using('<name-of-a-wrapper>').connect('<name-of-a-connector>')

To use multiple dynamic wrappers

this.using(
'<name-of-a-wrapper>'
).using(
'<name-of-another-wrapper>'
).flow(
'<name-of-a-flow>'
)

Specify parameters for a wrapper

this.using(
'<name-of-a-wrapper>',
parameter='for-the-wrapper',
).flow(
'<name-of-a-flow>',
another_parameter='for-the-flow',
)

Multiple wrappers

It is possible to specify multiple static wrappers, multiple dynamic wrappers and both at the same time. The wrappers will be applied in the following order:

  1. dynamic wrappers in the order they were specified
  2. static wrappers in ascending order
example

A chain of wrapped executions

# Flow 1
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
this.using(
'Dynamic Wrapper 1'
).using(
'Dynamic Wrapper 2'
).flow('Flow 2')

Static wrappers of Flow 2:

  • Static Wrapper 1, order: 10
  • Static Wrapper 2, order: 20

The following chain of executions will run when Flow 1 is started:

Examples

We provide several example wrappers as part of the Wrapper bundle.

Modify inputs

example

Modify the inputs of a wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
output_value = this.child(
**child,
# we can override any input by specifying it later
parameter='overridden',
).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')
example

Remove an input passed to the wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
input_value = this.get('input_value')
child = input_value['child']
# child contains all information needed to start
# the child execution, including its input_value
# we can remove any input by removing it from the `child` dictionary
child.pop('parameter')
output_value = this.child(**child).get('output_value')
# we need to store the output_value of the child in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')

Modify outputs

example

Modify outputs returned by a wrapped execution

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
input_value = this.get('input_value')
child = input_value['child']
output_value = this.child(**child).get('output_value')
# we can modify and remove outputs as needed
output_value.pop('parameter-to-remove')
output_value['parameter'] = 'new-value'
# we need to store the final output_value in the wrappers output_value
this.save(output_value=output_value)
return this.success('all done')

Starting child more than once

example

Start a child execution in a loop until success

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
input_value = this.get('input_value')
child = input_value['child']
for _ in range(5): # let's try at most 5 times
try:
output_value = this.child(**child).get('output_value')
except flow_api.DependencyFailedError:
# let's wait a second before retrying
this.sleep(1)
continue
else:
break # child succeeded, let's exit the loop
else:
return this.error('child did not succeed after 5 tries')
this.save(output_value=output_value)
return this.success('all done')

Not starting the child

example

Only starting the child outside of working hours, otherwise return the last output_value

import datetime
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
input_value = this.get('input_value')
child = input_value['child']
now = datetime.datetime.today()
if now.hour < 9 or now.hour > 17:
# outside working hours. let's start the child
output_value = this.child(**child).get('output_value')
# store the output_value in a setting
system.setting('last-child-output').save(value=output_value)
else:
# within working hours, we do not start the child
# load the last output from the setting
output_value = system.setting('last-child-output').get('value')
this.save(output_value=output_value)
return this.success('all done')

Learn More

Notifications
Retries
Interactive Mode
Caching
Wrapper bundle