---
# Cloudomation project export: "Wrapper bundle".
#
# NOTE(review): this file was reconstructed from a whitespace-collapsed copy.
# The Python indentation inside the `script` values and the width of the
# `textwrap.indent` prefixes (restored as four spaces) were inferred from the
# code's syntax and comments - verify against the original export.
project:
  connector_plain_list_project_id: []
  created_at: '2022-10-18T11:56:56.199895+00:00'
  created_by_identity:
    identity_type: GIT_CONFIG
    name: common-content
  deleted_at: null
  deleted_by_identity: null
  description: >-
    Wrappers encapsulate executable resources. Executing a wrapped resource
    will instead run the wrapper, which can coordinate the execution of the
    wrapped resource.
  extension_id: null
  file_plain_list_project_id: []
  flow_plain_list_project_id: []
  git_config_plain_list_project_id: []
  # NOTE(review): the icon value was empty in the source; confirm no glyph was lost.
  icon: null
  is_archived: false
  is_readonly: false
  modified_at: '2022-12-16T08:50:04.075601+00:00'
  modified_by_identity:
    identity_type: USER
    name: stefan@cloudomation.com
  name: Wrapper bundle
  oauth_plain_list_project_id: []
  organization_id_organization:
    name: cloudomation
  plugin_plain_list_project_id: []
  project_id_project:
    name: Wrapper bundle
  record_tag_plain_list_record_id: []
  role_plain_list_project_id: []
  schedule_plain_list_project_id: []
  scheduler_plain_list_project_id: []
  schema_plain_list_project_id:
  - name: wrapper-validate-input-schema
    value:
      additionalProperties: false
      properties:
        child:
          type: object
        mode:
          enum:
          - complain
          - learn
        point_of_validation:
          enum:
          - input
          - output
        schema_name:
          type: string
      required:
      - schema_name
      - mode
      - point_of_validation
      type: object
  setting_plain_list_project_id: []
  sync_config_plain_list_project_id: []
  tag_plain_list_project_id: []
  vault_config_plain_list_project_id: []
  version: '2022-12-16'
  webhook_plain_list_project_id: []
  workspace_id_workspace:
    name: release-6
  wrapper_plain_list_project_id:
  - name: check
    resource_wrapper_plain_list_resource_id: []
    script: |
      import textwrap
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Run the child, then run a checker on the child's output.

          The checker is the flow named by input `checker_flow`, or the script
          given in `checker_script` (or `checker`). It receives the child's
          output as `data`, merged with the optional `inputs` mapping. If the
          checker fails, this execution ends with an error.
          """
          inputs = this.get('input_value')
          child = this.child(
              **inputs['child'],
          )
          child_name, child_outputs = child.get('name', 'output_value')
          this.save(name=child_name)
          try:
              if 'checker_flow' in inputs:
                  this.flow(
                      inputs['checker_flow'],
                      name=f'check outputs of {child_name}',
                      input_value={
                          'data': child_outputs,
                          **inputs.get('inputs', {}),
                      },
                  )
              elif 'checker_script' in inputs:
                  this.script(
                      textwrap.dedent(inputs['checker_script']),
                      name=f'check outputs of {child_name}',
                      input_value={
                          'data': child_outputs,
                          **inputs.get('inputs', {}),
                      },
                  )
              elif 'checker' in inputs:
                  this.script(
                      textwrap.dedent(inputs['checker']),
                      name=f'check outputs of {child_name}',
                      input_value={
                          'data': child_outputs,
                          **inputs.get('inputs', {}),
                      },
                  )
              else:
                  return this.error('expecting `checker_flow` or `checker_script` input')
          except flow_api.DependencyFailedError:
              return this.error(f'output check of {child_name} failed')
          this.save(output_value=child_outputs)
          return this.success(f'output check of {child_name} succeeded')
  - name: timeout
    resource_wrapper_plain_list_resource_id: []
    script: |
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Run the child with a time limit of `seconds` (default 60).

          Defaults can be overridden by the `wrapper-timeout-settings` setting
          and then by the execution's input value. On timeout the child is
          cancelled and this execution ends with an error.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-timeout-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}

          inputs = {
              'seconds': 60,
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          seconds = inputs['seconds']

          child = this.child(
              **inputs['child'],
              run=False,
          )

          try:
              child.run(timeout_sec=seconds)
          except flow_api.DependencyTimeoutError:
              child.cancel_process(f'timeout of {seconds} seconds exceeded.')
              # return here, like every other wrapper does, so that a timed-out
              # child is not reported as 'all done' by the code below
              return this.error(f'child timeout of {seconds} seconds exceeded.')

          this.save(output_value=child.get('output_value'))
          return this.success('all done')
  - name: interactive
    resource_wrapper_plain_list_resource_id: []
    script: |
      import textwrap
      import datetime
      import dateutil.parser as dateutil_parser
      import yaml
      import chevron
      import markdown
      import flow_api


      def handler(system: flow_api.System, this: flow_api.Execution):
          """Run the child and, when it fails, ask users how to proceed.

          A message with the buttons Retry / Fail / Ignore is created as long
          as some user was active within `interactive_limit_minutes`. If no
          user was active, or the user chooses Fail, the child's failure is
          re-raised. Defaults can be overridden by the
          `wrapper-interactive-settings` setting and then by the input value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-interactive-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}
          inputs = {
              'interactive_limit_minutes': 30,
              'archive_retries': True,
              'notify_users': None,
              'context': None,
              'child_message_is_markdown': True,
              'subject': '{{child_name}} failed{{#is_retry}} {{fail_count}} times{{/is_retry}}',
              'body': '''Execution [{{child_name}}]({{self_url}}/execution/{{child_id}}) ended with **{{child_status}}**

      {{#parent_name}}
      The parent process **{{parent_name}}** is blocked and waiting for confirmation on how to continue.
      {{/parent_name}}

      {{#is_email}}
      Please choose how to proceed by following this link: [{{subject}}]({{self_url}}/execution/{{this_id}})
      {{/is_email}}

      {{#context}}
      ### Context

      {{context}}
      {{/context}}

      {{#child_message}}
      ### Status message

      {{child_message}}
      {{/child_message}}
      ''',
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          self_url = system.get_self_url()
          this_id = this.get('id')
          notify_users = inputs['notify_users']
          parent = this.parent()
          parent_name = parent.get('name') if parent else ''
          if inputs['context']:
              context = textwrap.indent(
                  yaml.safe_dump(inputs['context']).strip(),
                  '    ',
              )
          else:
              context = ''

          fail_count = 0
          while True:
              child = this.child(
                  **inputs['child'],
                  run=False,
              )
              this.save(name=child.get('name'))
              try:
                  child.run()
              except flow_api.DependencyFailedError:
                  # ask users what to do
                  fail_count += 1
                  child_id, child_name, child_status, child_message = child.get(
                      'id',
                      'name',
                      'status',
                      'message',
                  )
                  if not inputs['child_message_is_markdown']:
                      # if the child message is not markdown, indent it so it is displayed as a preformatted code block
                      # (`or ''` guards against a child that ended without a message)
                      child_message = textwrap.indent((child_message or '').strip(), '    ')
                  template_data = {
                      'fail_count': fail_count,
                      'is_retry': fail_count > 1,
                      'parent_name': parent_name,
                      'self_url': self_url,
                      'child_id': child_id,
                      'child_name': child_name,
                      'child_status': child_status,
                      'child_message': child_message,
                      'context': context,
                      'this_id': this_id,
                      'is_email': False,
                  }
                  subject = chevron.render(inputs['subject'], template_data)
                  template_data['subject'] = subject
                  info_str = chevron.render(inputs['body'], template_data)
                  template_data['is_email'] = True
                  info_str_email_html = markdown.markdown(chevron.render(inputs['body'], template_data))
                  message = None
                  while True:
                      # check which users were active within `interactive_limit_minutes`
                      active_user_names = []
                      # get latest activity of any user
                      latest_activity = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
                      for user in system.users():
                          last_activity = user.get('last_activity')
                          if last_activity is None:
                              continue
                          last_activity = dateutil_parser.parse(last_activity)
                          if last_activity > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=inputs['interactive_limit_minutes']):
                              active_user_names.append(user.get('name'))
                          if last_activity > latest_activity:
                              latest_activity = last_activity
                      timeout = latest_activity + datetime.timedelta(minutes=inputs['interactive_limit_minutes'])
                      delta = timeout - datetime.datetime.now(datetime.timezone.utc)
                      if delta <= datetime.timedelta(seconds=0):
                          # no user was active within the last `interactive_limit_minutes`
                          # default action is to fail -> re-raise the DependencyFailedError
                          raise
                      # a user was active
                      # if we did not yet create a message, create one now
                      if message is None:
                          message = system.message(
                              subject=subject,
                              body={
                                  'type': 'object',
                                  'properties': {
                                      'info': {
                                          'element': 'markdown',
                                          'type': 'string',
                                          'docs': info_str,
                                          'order': 1,
                                      },
                                      'retry': {
                                          'element': 'submit',
                                          'type': 'boolean',
                                          'label': 'Retry',
                                          'order': 2,
                                      },
                                      'fail': {
                                          'element': 'submit',
                                          'type': 'boolean',
                                          'label': 'Fail',
                                          'order': 3,
                                      },
                                      'ignore': {
                                          'element': 'submit',
                                          'type': 'boolean',
                                          'label': 'Ignore',
                                          'order': 4,
                                      },
                                  }
                              }
                          )
                          # send email notification with a link to the message
                          if notify_users in (False, None):
                              # no notification emails
                              notify_user_names = []
                          elif notify_users is True:
                              # notify all "active" users
                              notify_user_names = active_user_names
                          elif isinstance(notify_users, str):
                              # notify one user
                              notify_user_names = [notify_users]
                          else:
                              notify_user_names = notify_users
                          for user_name in notify_user_names:
                              try:
                                  system.user(user_name, by='name').send_mail(
                                      subject=subject,
                                      html=info_str_email_html,
                                  )
                              except Exception as ex:
                                  this.log(warning=f'failed to send email to {user_name}: {repr(ex)}')
                      # let's wait for an answer to the message
                      try:
                          message.wait(
                              wait_timeout=delta.total_seconds()
                          )
                      except flow_api.MessageResponseTimeoutError:
                          # no response within the timeout
                          # re-check activity levels
                          continue
                      # we have a response
                      response = message.get('response')
                      if 'retry' in response:
                          # leave the message loop; the outer loop starts a new child
                          break
                      if 'fail' in response:
                          # re-raise the DependencyFailedError
                          raise
                      if 'ignore' in response:
                          return this.success('error ignored')
                  # retrying
                  if inputs['archive_retries']:
                      child.archive_process()
              else:
                  # child was successful
                  break
          this.save(output_value=child.get('output_value'))
          return this.success('success')
  - name: rollback
    resource_wrapper_plain_list_resource_id: []
    script: |
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Run the child; if it fails, run the flow `rollback_flow_name`.

          The rollback flow is started with the failed child's id, status and
          message. Its output value and message are saved on this execution
          before the child's failure is re-raised. Defaults can be overridden
          by the `wrapper-rollback-settings` setting and then by the input
          value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-rollback-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}
          inputs = {
              'rollback_flow_name': None,
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          try:
              child = this.child(**inputs['child'])
              this.save(output_value=child.get('output_value'))
          except flow_api.DependencyFailedError as ex:
              child_dict = ex.execution.get_dict('id', 'status', 'message')
              rollback_flow = this.flow(
                  inputs['rollback_flow_name'],
                  id_=child_dict['id'],
                  status=child_dict['status'],
                  message=child_dict['message']
              )
              rollback_dict = rollback_flow.get_dict('output_value', 'message')
              this.save(
                  output_value=rollback_dict['output_value'],
                  message=f'{child_dict["message"]} {rollback_dict["message"]}'
              )
              raise


          return this.success('all done')
  - name: validate
    resource_wrapper_plain_list_resource_id: []
    script: |
      import genson
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Validate the child's input or output against a schema, or learn it.

          In mode `complain` the value selected by `point_of_validation` is
          validated against the schema `schema_name`. In mode `learn` that
          value is merged into the schema instead. Defaults can be overridden
          by the `wrapper-validate-settings` setting and then by the input
          value; the merged inputs are themselves validated against
          `wrapper-validate-input-schema`.
          """
          def update_schema():
              # merge `value` into the stored schema while holding the lock
              lock = system.setting(f'{inputs["schema_name"]}-lock')
              lock.acquire()
              try:
                  builder = genson.SchemaBuilder()
                  builder.add_schema(old_schema_value)
                  builder.add_object(value)
                  combined_schema_value = builder.to_schema()
                  builder = None

                  schema.save(value=combined_schema_value)
              finally:
                  # release even if building or saving the schema fails, so
                  # the lock is never left held
                  lock.release()

          try:
              workspace_wrapper_settings = system.setting('wrapper-validate-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}

          inputs = {
              'schema_name': None,
              'mode': 'complain',
              'point_of_validation': 'input',
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          system.schema('wrapper-validate-input-schema').validate(inputs)

          if inputs['mode'] == 'complain':
              schema = system.schema(inputs['schema_name'])
              if inputs['point_of_validation'] == 'input':
                  schema.validate(inputs['child']['input_value'])

              child = this.child(**inputs['child'])

              if inputs['point_of_validation'] == 'output':
                  schema.validate(child.get('output_value'))

          if inputs['mode'] == 'learn':
              schema = system.schema(inputs['schema_name'])
              old_schema_value = schema.get('value') if schema.exists() else {}

              if inputs['point_of_validation'] == 'input':
                  value = inputs['child']['input_value']
                  update_schema()

              child = this.child(**inputs['child'])

              if inputs['point_of_validation'] == 'output':
                  value = child.get('output_value')
                  update_schema()

          this.save(output_value=child.get('output_value'))

          return this.success('all done')
  - name: mock
    resource_wrapper_plain_list_resource_id: []
    script: |
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Return a recorded or static output instead of running the child.

          Modes: `STATIC` returns input `output_value`; `FIXED` returns the
          output of execution `execution_id`; `ENDED_SUCCESS`, `ENDED_ERROR`
          and `LAST` return the output of the most recently created matching
          execution. In productive mode the child is run for real unless
          `ignore_in_productive_mode` is false. Defaults can be overridden by
          the `wrapper-mock-settings` setting and then by the input value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-mock-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}
          inputs = {
              'mode': 'ENDED_SUCCESS',
              'ignore_in_productive_mode': True,
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          child = {
              **inputs['child'],
              **inputs['child'].get('init', {}),
          }
          child_name = child.get('name')
          child_type = child['type']
          if child_name is not None:
              this.save(name=child_name)

          if this.get('is_productive') is True and inputs['ignore_in_productive_mode'] is True:
              this.save(message='ignoring mock in productive mode')
              child = this.child(
                  **inputs['child'],
              )
              this.save(output_value=child.get('output_value'))
              return this.success('ignored mock in productive mode')

          if inputs['mode'] == 'STATIC':
              if 'output_value' not in inputs:
                  return this.error('missing input `output_value` for mode "STATIC"')
              this.save(output_value=inputs['output_value'])
              return this.success('returned static output_value')
          if inputs['mode'] == 'FIXED':
              if 'execution_id' not in inputs:
                  return this.error('missing input `execution_id` for mode "FIXED"')
              execution = system.execution(inputs['execution_id'], by='id')
              output_value, id_ = execution.get('output_value', 'id')
              this.save(output_value=output_value)
              return this.success(f'returned output_value from {id_}')
          if inputs['mode'] not in {'ENDED_SUCCESS', 'ENDED_ERROR', 'LAST'}:
              return this.error(f"invalid mode {inputs['mode']}")

          conditions = [{
              'field': 'type',
              'op': 'eq',
              'value': child_type,
          }]
          if child_name is not None:
              conditions.append({
                  'field': 'name',
                  'op': 'eq',
                  'value': child_name,
              })
          if child_type == 'FLOW':
              if 'flow_name' in child:
                  flow_id = system.flow(child['flow_name'], by='name').get('id')
              elif 'flow_id' in child:
                  flow_id = child['flow_id']
              else:
                  return this.error('cannot identify flow')
              conditions.append({
                  'field': 'flow_id',
                  'op': 'eq',
                  'value': flow_id,
              })
          elif child_type == 'CONNECTION':
              if 'connector_name' in child:
                  connector_id = system.connector(child['connector_name'], by='name').get('id')
              elif 'connector_id' in child:
                  connector_id = child['connector_id']
              else:
                  return this.error('cannot identify connector')
              conditions.append({
                  'field': 'connector_id',
                  'op': 'eq',
                  'value': connector_id,
              })
          elif child_type == 'WRAPPER':
              if 'wrapper_name' in child:
                  wrapper_id = system.wrapper(child['wrapper_name'], by='name').get('id')
              elif 'wrapper_id' in child:
                  wrapper_id = child['wrapper_id']
              else:
                  return this.error('cannot identify wrapper')
              conditions.append({
                  'field': 'wrapper_id',
                  'op': 'eq',
                  'value': wrapper_id,
              })
          else:
              return this.error(f'dont know how to handle child_type {child_type}')
          if inputs['mode'] in {'ENDED_SUCCESS', 'ENDED_ERROR'}:
              conditions.append({
                  'field': 'status',
                  'op': 'eq',
                  'value': inputs['mode'],
              })

          try:
              execution = next(system.executions(
                  filter_={'and': conditions},
                  order='-created_at',
                  limit=1,
                  include_archived=True,
              ))
          except StopIteration:
              return this.error('did not find any matching executions')
          output_value, id_ = execution.get('output_value', 'id')
          this.save(output_value=output_value)
          return this.success(f'returned output_value from {id_}')
  - name: cache
    resource_wrapper_plain_list_resource_id: []
    script: |
      import datetime
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Reuse the output of a recent successful run of the same child.

          Looks for the most recently created matching execution with status
          `ENDED_SUCCESS` created within the last `max_age_minutes` (default
          24 hours). If none is found, the child is run. Defaults can be
          overridden by the `wrapper-cache-settings` setting and then by the
          input value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-cache-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}
          inputs = {
              'max_age_minutes': 24*60,
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          child = {
              **inputs['child'],
              **inputs['child'].get('init', {}),
          }
          child_name = child.get('name')
          child_type = child['type']
          if child_name is not None:
              this.save(name=child_name)

          # use an aware UTC timestamp, like the other wrappers in this bundle,
          # so the cutoff does not depend on the local timezone
          oldest_created_at = (
              datetime.datetime.now(datetime.timezone.utc)
              - datetime.timedelta(minutes=inputs['max_age_minutes'])
          )
          conditions = [
              {
                  'field': 'type',
                  'op': 'eq',
                  'value': child_type,
              },
              {
                  'field': 'status',
                  'op': 'eq',
                  'value': 'ENDED_SUCCESS',
              },
              {
                  'field': 'created_at',
                  'op': 'gte',
                  'value': oldest_created_at,
              },
          ]
          if child_name is not None:
              conditions.append({
                  'field': 'name',
                  'op': 'eq',
                  'value': child_name,
              })
          if child_type == 'FLOW':
              if 'flow_name' in child:
                  flow_id = system.flow(child['flow_name'], by='name').get('id')
              elif 'flow_id' in child:
                  flow_id = child['flow_id']
              else:
                  return this.error('cannot identify flow')
              conditions.append({
                  'field': 'flow_id',
                  'op': 'eq',
                  'value': flow_id,
              })
          elif child_type == 'CONNECTION':
              if 'connector_name' in child:
                  connector_id = system.connector(child['connector_name'], by='name').get('id')
              elif 'connector_id' in child:
                  connector_id = child['connector_id']
              else:
                  return this.error('The cache wrapper can not be used with dynamic connections.')
              conditions.append({
                  'field': 'connector_id',
                  'op': 'eq',
                  'value': connector_id,
              })
          elif child_type == 'WRAPPER':
              if 'wrapper_name' in child:
                  wrapper_id = system.wrapper(child['wrapper_name'], by='name').get('id')
              elif 'wrapper_id' in child:
                  wrapper_id = child['wrapper_id']
              else:
                  return this.error('cannot identify wrapper')
              conditions.append({
                  'field': 'wrapper_id',
                  'op': 'eq',
                  'value': wrapper_id,
              })
          else:
              return this.error(f'dont know how to handle child_type {child_type}')

          try:
              execution = next(system.executions(
                  filter_={'and': conditions},
                  order='-created_at',
                  limit=1,
                  include_archived=True,
              ))
          except StopIteration:
              child = this.child(
                  **inputs['child'],
              )
              this.save(output_value=child.get('output_value'))
              return this.success('started child')
          output_value, id_ = execution.get('output_value', 'id')
          this.save(output_value=output_value)
          return this.success(f'returned output_value from {id_}')
  - name: retry
    resource_wrapper_plain_list_resource_id: []
    script: |
      import datetime

      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Re-run the child until it succeeds or a limit is reached.

          Limits: `max_tries` (0 = unlimited) and `timeout_sec` (0 = none).
          Between tries the wrapper sleeps `delay_sec` seconds, and failed
          children are archived when `archive_retries` is true. Defaults can
          be overridden by the `wrapper-retry-settings` setting and then by
          the input value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-retry-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}
          inputs = {
              'max_tries': 0,
              'delay_sec': 10,
              'timeout_sec': 0,
              'archive_retries': True,
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }

          try_count = 0
          timeout = (
              None
              if inputs['timeout_sec'] <= 0
              else (
                  datetime.datetime.now(datetime.timezone.utc)
                  + datetime.timedelta(seconds=inputs['timeout_sec'])
              )
          )
          end_condition = ''
          if inputs['max_tries'] > 0:
              end_condition += f"max. {inputs['max_tries']} tries "
          if inputs['timeout_sec'] > 0:
              end_condition += f"{inputs['timeout_sec']} sec timeout"

          while True:
              try_count += 1
              this.save(message=f'try #{try_count}/{end_condition}')
              child = this.child(
                  **inputs['child'],
                  run=False,
              )
              this.save(name=child.get('name'))
              try:
                  child.run()
              except flow_api.DependencyFailedError as ex:
                  if timeout and datetime.datetime.now(datetime.timezone.utc) > timeout:
                      raise flow_api.TimeoutExceededError() from ex
                  if inputs['max_tries'] > 0 and try_count >= inputs['max_tries']:
                      raise
                  if inputs['delay_sec']:
                      this.sleep(
                          inputs['delay_sec'],
                          message=f"try #{try_count}/{end_condition} - failed. delaying retry"
                      )
                  if inputs['archive_retries']:
                      child.archive_process()
              else:
                  this.save(output_value=child.get('output_value'))
                  return this.success(f'success after {try_count} {"try" if try_count == 1 else "tries"}')
  - name: notify
    resource_wrapper_plain_list_resource_id: []
    script: |
      import textwrap
      import yaml
      import chevron
      import markdown
      import flow_api

      def handler(system: flow_api.System, this: flow_api.Execution):
          """Run the child and e-mail the users in `to` about the result.

          Mails are sent on failure when `notify_on_error` is true and on
          success when `notify_on_success` is true. A child failure is
          re-raised after notifying. Defaults can be overridden by the
          `wrapper-notify-settings` setting and then by the input value.
          """
          try:
              workspace_wrapper_settings = system.setting('wrapper-notify-settings').get('value')
          except flow_api.ResourceNotFoundError:
              workspace_wrapper_settings = {}

          # Settings that were actually supplied, kept separate from the
          # defaults so the deprecated `use_markdown` key can be detected below.
          supplied = {
              **workspace_wrapper_settings,
              **this.get('input_value'),
          }
          inputs = {
              'notify_on_success': False,
              'notify_on_error': True,
              #'use_markdown': True, # Deprecated, replaced by `child_message_is_markdown`
              'child_message_is_markdown': True,
              'to': None,
              'context': None,
              'subject': '{{child_name}} {{child_status}}',
              'body': '''Execution [{{child_name}}]({{self_url}}/execution/{{child_id}}) ended with **{{child_status}}**

      {{#context}}
      ### Context

      {{context}}
      {{/context}}

      {{#child_message}}
      ### Status message

      {{child_message}}
      {{/child_message}}
      ''',
              **supplied,
          }
          # Backwards-compatible accept deprecated parameter.
          # This has to test `supplied`: `inputs` always contains the default for
          # `child_message_is_markdown`, so testing `inputs` never matched.
          if 'use_markdown' in supplied and 'child_message_is_markdown' not in supplied:
              inputs['child_message_is_markdown'] = inputs.pop('use_markdown')
          self_url = system.get_self_url()
          this_id = this.get('id')
          if inputs['context']:
              context = textwrap.indent(
                  yaml.safe_dump(inputs['context']).strip(),
                  '    ',
              )
          else:
              context = ''

          child = this.child(
              **inputs['child'],
              run=False,
          )
          this.save(name=child.get('name'))
          if isinstance(inputs['to'], str):
              inputs['to'] = [inputs['to']]
          try:
              child.run()
          except flow_api.DependencyFailedError as ex:
              exception = ex
          else:
              exception = None

          child_id, child_name, child_status, child_message = child.get(
              'id',
              'name',
              'status',
              'message',
          )
          if not inputs['child_message_is_markdown']:
              # if the child message is not markdown, indent it so it is displayed as a preformatted code block
              # (`or ''` guards against a child that ended without a message)
              child_message = textwrap.indent((child_message or '').strip(), '    ')
          template_data = {
              'self_url': self_url,
              'child_id': child_id,
              'child_name': child_name,
              'child_status': child_status,
              'child_message': child_message,
              'context': context,
              'this_id': this_id,
          }
          subject = chevron.render(inputs['subject'], template_data)
          template_data['subject'] = subject
          info_str_email_html = markdown.markdown(chevron.render(inputs['body'], template_data))

          if exception is not None:
              if inputs['notify_on_error'] and inputs['to']:
                  for user_name in inputs['to']:
                      try:
                          system.user(user_name, by='name').send_mail(
                              subject=subject,
                              html=info_str_email_html,
                          )
                      except Exception as ex:
                          this.log(warning=f'failed to send email to {user_name}: {repr(ex)}')
              # Stefan: false-positive. we are in `if exception is not None`
              # pylint: disable=raising-bad-type
              raise exception
          if inputs['notify_on_success'] and inputs['to']:
              for user_name in inputs['to']:
                  try:
                      system.user(user_name, by='name').send_mail(
                          subject=subject,
                          html=info_str_email_html,
                      )
                  except Exception as ex:
                      this.log(warning=f'failed to send email to {user_name}: {repr(ex)}')

          this.save(output_value=child.get('output_value'))
          return this.success('success')