ConnectorTypeSQLPG
class connector_types.connector_type_sqlpg.ConnectorTypeSQLPG
Interact with a PostgreSQL database. This connector type supports the execute,
fetch, fetchrow, and fetchval commands. Each command expects an
SQL query and returns the status, list, record, or field value, respectively.
The examples below additionally use the executemany and transaction modes.
Consult the PostgreSQL SQL language documentation at https://www.postgresql.org/docs/12/tutorial-sql.html for more information.
Input Schema
-
schema_versionType:
string -
authenticationType:
anyOfOptions: -
hostThe remote hostname or IP address.
Type:
string -
portType:
anyOfOptions: -
tlsWhether to connect using TLS/SSL.
Type:
anyOfOptions: -
modeType:
anyOfOptions: -
databaseThe database to use.
Type:
string
Output Schema
-
resultData
Example
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    """Query the PostgreSQL server version through the SQLPG connector and log it.

    Runs ``SELECT version()`` in ``fetchval`` mode, reads the value from the
    ``result`` key of the connection's ``output_value``, logs it, and ends the
    execution successfully.
    """
    credentials = {
        'authentication_method': 'username_password',
        'username': '...',
        'password': '...',
    }
    # fetchval mode: a single query whose field value is returned.
    version_query = {
        'mode_name': 'fetchval',
        'query': 'SELECT version()',
    }
    connection = this.connect(
        connector_type='SQLPG',
        authentication=credentials,
        host='...',
        database='...',
        mode=version_query,
    )
    output_value = connection.get('output_value')
    postgres_server_version = output_value['result']
    this.log(postgres_server_version=postgres_server_version)
    return this.success('all done')
Insert data into a table
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    """Insert one row into ``mytab`` using a parameterized ``execute`` query.

    The values for the ``$1``/``$2`` placeholders are passed separately in
    ``params`` instead of being formatted into the SQL string.
    """
    credentials = {
        'authentication_method': 'username_password',
        'username': '...',
        'password': '...',
    }
    insert_statement = {
        'mode_name': 'execute',
        'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
        'params': ['foo', 42],
    }
    # The connection's output is not needed here, so it is discarded.
    this.connect(
        connector_type='SQLPG',
        authentication=credentials,
        host='...',
        database='...',
        mode=insert_statement,
    )
    return this.success('all done')
Execute many
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    """Insert several rows into ``mytab`` with a single ``executemany`` call.

    Each entry of ``rows`` supplies one set of values for the ``$1``/``$2``
    placeholders of the query.
    """
    credentials = {
        'authentication_method': 'username_password',
        'username': '...',
        'password': '...',
    }
    rows_to_insert = [
        ['foo', 42],
        ['bar', 43],
    ]
    bulk_insert = {
        'mode_name': 'executemany',
        'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
        'rows': rows_to_insert,
        # NOTE(review): presumably the number of rows sent per batch -
        # confirm against the connector's input schema.
        'batch_size': 100,
    }
    this.connect(
        connector_type='SQLPG',
        authentication=credentials,
        host='...',
        database='...',
        mode=bulk_insert,
    )
    return this.success('all done')
Transaction
import flow_api
def handler(system: flow_api.System, this: flow_api.Execution, inputs: dict):
    """Run an INSERT followed by a value fetch in ``transaction`` mode.

    The ``queries`` list holds one mode dictionary per statement. The combined
    outcome is read from the ``result`` key of the connection's
    ``output_value`` and logged.
    """
    results = this.connect(
        connector_type='SQLPG',
        authentication={
            'authentication_method': 'username_password',
            'username': '...',
            'password': '...',
        },
        host='...',
        database='...',
        mode={
            'mode_name': 'transaction',
            'queries': [
                {
                    'mode_name': 'execute',
                    'query': 'INSERT INTO mytab (str_col, int_col) VALUES ($1, $2);',
                    'params': ['foo', 42],
                },
                {
                    'mode_name': 'fetchval',
                    # PostgreSQL has no last_insert_id() (that is MySQL);
                    # lastval() returns the value most recently obtained from
                    # a sequence in this session.
                    # NOTE(review): assumes mytab has a sequence-backed column
                    # (e.g. serial/identity) populated by the INSERT above;
                    # otherwise lastval() raises an error - confirm the schema.
                    'query': 'SELECT lastval()',
                },
            ],
        },
    ).get('output_value')['result']
    this.log(results=results)
    return this.success('all done')