ConnectorTypeK8S
class connector_types.connector_type_k8s.ConnectorTypeK8S
Interact with a kubernetes cluster.
This connector type enables you to interact with a remote kubernetes cluster. See the Official Python client library for kubernetes for a description of available APIs and methods.
Input Schema
-
host
The hostname or IP of the kubernetes cluster to use.
Type:
string
-
certificate
The CA certificate of the kubernetes cluster.
Type:
string
-
document
A Kubernetes API JSON object to apply. Either
document
or (api
andmethod
) must be specified. -
api
The API to use. E.g. "CoreV1Api". Either
document
or (api
andmethod
) must be specified.Type:
anyOf
Options: -
method
The method to call. E.g. "create_namespaced_deployment". Either
document
or (api
andmethod
) must be specified.Type:
anyOf
Options: -
replace
If set to true, the resource will be replaced. Otherwise it will be patched, which means artefacts that have not changed will remain.
Type:
boolean
-
args
Positional arguments to pass to the method call.
Type:
array
Items: -
kwargs
Keyword arguments to pass to the method call
Type:
object
Additional Properties:
True
Pattern Properties:
-
.*
Data
-
-
token
The bearer token to use for authentication. Either
token
orkey
must be specified.Type:
anyOf
Options: -
key
Type:
anyOf
Options: -
parameters
DEPRECATED. Replaced by
kwargs
.
Output Schema
-
result
Data
Constants
ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca']Example
import yaml
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
nginx_deployment_str = system.file('nginx-deployment.yaml').get_text_content()
nginx_deployment = yaml.safe_load(nginx_deployment_str)
this.connect(
connector_type='K8S',
token='*secret*',
host='my-kubernetes-cluster-master',
certificate='-----BEGIN CERTIFICATE-----\nMII...',
api='ExtensionsV1beta1Api',
method='create_namespaced_deployment',
kwargs={
'body': nginx_deployment,
},
)
return this.success('all done')