Version: 10 - TBD

ConnectorTypeSQLPG

class connector_types.connector_type_sqlpg.ConnectorTypeSQLPG

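Interact with a PostgreSQL database. This connector type supports the execute, fetch, fetchrow, and fetchval commands. Each command expects an SQL query and returns, respectively, the command status, a list of rows, a single record, or a single field value. The input schema below also lists executemany, which runs one query with many rows, and transaction, which runs several queries in one transaction.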

Consult the PostgreSQL SQL language documentation at https://www.postgresql.org/docs/12/tutorial-sql.html for more information.

Input Schema

  • host

    Type: string

  • port

    Type: anyOf

  • user

    Type: string

    Default: postgres

  • password

    Type: anyOf

  • server_ca

    Type: anyOf

  • check_hostname

    If set, the hostname of the PostgreSQL server is checked against the server_ca certificate.

    Type: boolean

    Default: True

  • client_cert

    Type: anyOf

  • client_key

    Type: anyOf

  • database

    The database to use.

    Type: string

    Default: pg_catalog

  • execute

    Execute a query.

    Type: anyOf

  • executemany

    Execute a query with many rows.

    Type: anyOf

  • transaction

    Execute several queries in a transaction.

    Type: anyOf

  • fetch

    Execute a query and fetch all rows.

    Type: anyOf

  • fetchrow

    Execute a query and fetch the first row.

    Type: anyOf

  • fetchval

    Execute a query and fetch the first column of the first row.

    Type: anyOf

  • params

    Bind parameters for the query.

    Type: array
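The params input can be illustrated with a small sketch. It assumes that top-level params bind to the $1, $2 placeholders of the query in the same way as the params entries inside transaction in the example on this page. build_fetch_inputs is a hypothetical helper, not part of flow_api, and its placeholder check is deliberately naive (it does not understand quoted strings or comments).

```python
import re


def build_fetch_inputs(host, database, query, params):
    """Assemble keyword arguments for this.connect() (hypothetical helper)."""
    # Collect the numbered placeholders ($1, $2, ...) used in the query.
    placeholders = {int(n) for n in re.findall(r"\$(\d+)", query)}
    expected = max(placeholders, default=0)
    # Catch a mismatch between placeholders and bind values early.
    if expected != len(params):
        raise ValueError(
            f"query uses {expected} placeholders but {len(params)} params were given"
        )
    return {
        "connector_type": "SQLPG",
        "host": host,
        "database": database,
        "fetch": query,
        "params": list(params),
    }


inputs = build_fetch_inputs(
    "my-postgres-server",
    "testlocal",
    "SELECT a, b FROM mytab WHERE a > $1 AND b < $2",
    [1, 6],
)
```

Inside a handler, the resulting dict could then be passed on as this.connect(**inputs).get('output_value')['result'].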

Output Schema

  • result

    Data
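The example on this page reads the result with .get('output_value')['result']. The sketch below mirrors that access pattern outside of a flow. The plain dict stands in for the object returned by this.connect(), the version string is made-up sample data, and read_result is a hypothetical helper.

```python
def read_result(execution):
    """Return the 'result' field of the connector output, or None if absent."""
    # 'execution' only needs a .get() method; a plain dict is used as a
    # stand-in here for the object returned by this.connect().
    output_value = execution.get("output_value") or {}
    return output_value.get("result")


# Stand-in for a finished connector execution (made-up sample data).
fake_execution = {"output_value": {"result": "PostgreSQL (sample version string)"}}
version = read_result(fake_execution)
```

Returning None for a missing output_value is a design choice of this sketch; the example on this page indexes ['result'] directly instead.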

Constants

ssl_context_inputs = ['check_hostname', 'client_cert', 'client_key', 'server_ca']
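The constant names the four inputs that relate to the TLS setup of the connection. As a rough sketch, and purely as an assumption about how such a list might be used, the hypothetical helper below separates those inputs from the rest of a connection configuration. The certificate values are placeholders.

```python
# Copied from the constant documented above.
SSL_CONTEXT_INPUTS = ['check_hostname', 'client_cert', 'client_key', 'server_ca']


def split_ssl_inputs(inputs):
    """Split a config dict into TLS-related inputs and all other inputs."""
    ssl_inputs = {k: v for k, v in inputs.items() if k in SSL_CONTEXT_INPUTS}
    other_inputs = {k: v for k, v in inputs.items() if k not in SSL_CONTEXT_INPUTS}
    return ssl_inputs, other_inputs


config = {
    'host': 'my-postgres-server',
    'database': 'testlocal',
    'server_ca': '<PEM placeholder>',
    'check_hostname': True,
}
ssl_inputs, other_inputs = split_ssl_inputs(config)
```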

Example

import flow_api

def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    # fetchval: fetch the first column of the first row
    postgres_server_version = this.connect(
        connector_type='SQLPG',
        host='my-postgres-server',
        fetchval='SELECT version()',
    ).get('output_value')['result']
    this.log(postgres_server_version=postgres_server_version)

    # execute: run a single statement
    postgres_server_events = this.connect(
        connector_type='SQLPG',
        host='my-postgres-server',
        database='testlocal',
        execute='CREATE TABLE IF NOT EXISTS mytab (a int, b int);',
    ).get('output_value')['result']
    this.log(postgres_server_events=postgres_server_events)

    # transaction: run several statements, each with its own bind parameters
    postgres_server_events = this.connect(
        connector_type='SQLPG',
        host='my-postgres-server',
        database='testlocal',
        transaction=[
            {'execute': 'INSERT INTO mytab (a, b) VALUES ($1, $2);', 'params': [1, 2]},
            {'execute': 'INSERT INTO mytab (a, b) VALUES ($1, $2);', 'params': [3, 4]},
            {'execute': 'INSERT INTO mytab (a, b) VALUES ($1, $2);', 'params': [5, 6]},
        ],
    ).get('output_value')['result']
    this.log(postgres_server_events=postgres_server_events)

    # executemany: run one statement once per parameter tuple
    postgres_server_events = this.connect(
        connector_type='SQLPG',
        host='my-postgres-server',
        database='testlocal',
        executemany=[
            'INSERT INTO mytab (a, b) VALUES ($1, $2);',
            [(1, 2), (3, 4), (6, 7)],
        ],
    ).get('output_value')['result']
    this.log(postgres_server_events=postgres_server_events)

    return this.success('all done')
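
The transaction and executemany inputs in the example follow a regular shape: transaction is a list of dicts with an execute query and its params, and executemany is a two-element list holding the query and a list of parameter tuples. A minimal sketch that builds both shapes from the same rows is shown here. as_transaction and as_executemany are hypothetical helpers that only prepare the input values; they do not talk to a database.

```python
INSERT_SQL = 'INSERT INTO mytab (a, b) VALUES ($1, $2);'


def as_transaction(query, rows):
    """Build a 'transaction' input: one execute/params entry per row."""
    return [{'execute': query, 'params': list(row)} for row in rows]


def as_executemany(query, rows):
    """Build an 'executemany' input: the query plus all parameter tuples."""
    return [query, [tuple(row) for row in rows]]


rows = [(1, 2), (3, 4), (5, 6)]
transaction_input = as_transaction(INSERT_SQL, rows)
executemany_input = as_executemany(INSERT_SQL, rows)
```

Either value could then be passed to this.connect() as transaction=transaction_input or executemany=executemany_input.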