Skip to main content
Version: 7 - Gugelhupf

ConnectorTypeK8S

class connector_types.connector_type_k8s.ConnectorTypeK8S

Interact with a kubernetes cluster.

This connector type enables you to interact with a remote kubernetes cluster. See the Official Python client library for kubernetes for a description of available APIs and methods.

Inputs

NameTypeDefaultDescription
apistrNoneThe API to use. E.g. "CoreV1Api". Either document or (api and method) must be specified
argslistNonePositional arguments to pass to the method call
certificatestrThe CA certificate of the kubernetes cluster
documentdictNoneA Kubernetes API object to apply. Either document or (api and method) must be specified
grantstrNonethe name of an oauth grant to use for authentication. Either token, key, or grant must be specified
hoststrThe hostname or IP of the kubernetes cluster to use
keydictNoneThe content of the key file of the service account to use. Either token, key, or grant must be specified
kwargsdictNoneKeyword arguments to pass to the method call
methodstrNoneThe method to call. E.g. "create_namespaced_deployment". Either document or (api and method) must be specified
parametersdictNoneDeprecated: Additional keyword parameters to pass to the method call, please use kwargs instead
replaceboolFalseIf set to true, the resource will be replaced. Otherwise it will be patched, which means artifacts that have not changed will remain
tokenstrNoneThe bearer token to use for authentication. Either token, key, or grant must be specified

Outputs

NameTypeDefaultDescription
resultdict

Constants

input_list = ['api', 'args', 'certificate', 'document', 'grant', 'host', 'key', 'kwargs', 'method', 'parameters', 'replace', 'token'] output_list = ['result'] ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca'] version = 1

Methods

execute

log

one_of_inputs

run

Example

import yaml
import flow_api

def handler(system: flow_api.System, this: flow_api.Execution):
nginx_deployment_str = system.file('nginx-deployment.yaml').get_text_content()
nginx_deployment = yaml.safe_load(nginx_deployment_str)
this.connect(
connector_type='K8S',
token='*secret*',
host='my-kubernetes-cluster-master',
certificate='-----BEGIN CERTIFICATE-----\nMII...',
api='ExtensionsV1beta1Api',
method='create_namespaced_deployment',
kwargs={
'body': nginx_deployment,
},
)

return this.success('all done')