mt.sql.psql

Useful modules for accessing PostgreSQL

Functions

mt.sql.psql.pg_get_locked_transactions(engine, schema: str | None = None)

Obtains a dataframe representing transactions which have been locked by the server.

Parameters:
  • engine (sqlalchemy.engine.Engine) – connection engine

  • schema (str or None) – If None, all schemas are considered, not just the public schema. Otherwise, only the given schema is considered.

Returns:

A table containing the current backend transactions

Return type:

pd.DataFrame

mt.sql.psql.pg_cancel_backend(engine, pid)

Cancels a backend transaction given its pid.

Parameters:
  • engine (sqlalchemy.engine.Engine) – connection engine

  • pid (int) – the backend pid to be cancelled

mt.sql.psql.pg_cancel_all_backends(engine, schema: str | None = None, logger: IndentedLoggerAdapter | None = None)

Cancels all backend transactions.

Parameters:
  • engine (sqlalchemy.engine.Engine) – connection engine

  • schema (str or None) – If None, all schemas are considered, not just the public schema. Otherwise, only the given schema is considered.

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.compliance_check(df: DataFrame)

Checks if a dataframe is compliant to PSQL.

The dataframe must either have no index, or have indices whose names do not match any column name.

Parameters:

df (pandas.DataFrame) – the input dataframe

Raises:

ValueError – when the dataframe is not compliant.
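Example (illustrative sketch only). The rule above can be pictured with plain lists standing in for df.index.names and df.columns. The helper names below are hypothetical and are not part of mt.sql.psql, and treating unnamed index levels (None) as "no index" is an assumption of this sketch.

```python
def find_index_column_clashes(index_names, column_names):
    """Return the index level names that also appear as column names.

    `index_names` stands in for `df.index.names` and `column_names` for
    `df.columns`. Unnamed levels (None) are treated as "no index", which
    is an assumption of this sketch.
    """
    columns = set(column_names)
    return [name for name in index_names if name is not None and name in columns]


def check_compliance_sketch(index_names, column_names):
    """Raise ValueError if any named index level clashes with a column."""
    clashes = find_index_column_clashes(index_names, column_names)
    if clashes:
        raise ValueError(f"Index names also used as columns: {clashes}")


# An unnamed index and a distinctly named index both pass the check.
check_compliance_sketch([None], ["id", "value"])
check_compliance_sketch(["key"], ["id", "value"])
```

An index named "id" on a frame that also has an "id" column would raise ValueError in this sketch.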

mt.sql.psql.as_column_name(s)

Converts a string into a PSQL-compliant column name.

Parameters:

s (str) – a string

Returns:

s2 – a lower-case alpha-numeric and underscore-only string

Return type:

str

Raises:

ValueError if the string cannot be converted.
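Example (illustrative sketch only). The exact conversion rules of as_column_name() are not spelled out here, so the function below is a hypothetical stand-in that merely produces a lower-case string made of letters, digits and underscores, and raises ValueError when nothing usable remains.

```python
import re


def as_column_name_sketch(s):
    """Lower-case `s` and keep only letters, digits and underscores."""
    if not isinstance(s, str):
        raise ValueError(f"Expected a string, got {type(s).__name__}")
    # Replace every run of other characters with a single underscore,
    # then trim underscores left over at either end.
    s2 = re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")
    if not s2:
        raise ValueError(f"Cannot convert {s!r} into a column name")
    return s2


print(as_column_name_sketch("Total Sales (USD)"))  # total_sales_usd
```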

mt.sql.psql.to_sql(df, name, engine, schema: str | None = None, if_exists='fail', nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None, **kwargs)

Writes records stored in a DataFrame to a PostgreSQL database.

The write is attempted up to nb_trials times to overcome OperationalError.

Parameters:
  • df (pandas.DataFrame) – dataframe to be sent to the server

  • name (str) – name of the table to be written to

  • engine (sqlalchemy.engine.Engine) – connection engine to the server

  • schema (string, optional) – Specify the schema. If None, use default schema.

  • if_exists (str) – what to do when the table exists. Besides all options available from pandas.DataFrame.to_sql(), a new option called ‘gently_replace’ is introduced. It avoids dropping the table by deleting all entries and then inserting the new entries, but only if the remote table contains exactly the same columns as the local dataframe.

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Raises:

sqlalchemy.exc.ProgrammingError if the local and remote frames do not have the same structure
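Example (illustrative sketch only). The precondition for ‘gently_replace’ is that the local and remote frames have exactly the same columns. The helpers below are hypothetical and only show that comparison on plain lists of column names. Whether index columns take part in the real check is not stated, so they are simply assumed to be included in the lists.

```python
def column_mismatch(local_columns, remote_columns):
    """Return (columns missing remotely, columns missing locally)."""
    local, remote = set(local_columns), set(remote_columns)
    return sorted(local - remote), sorted(remote - local)


def can_gently_replace(local_columns, remote_columns):
    """True only if both frames have exactly the same set of column names."""
    missing_remotely, missing_locally = column_mismatch(local_columns, remote_columns)
    return not missing_remotely and not missing_locally


print(can_gently_replace(["id", "name"], ["name", "id"]))     # True
print(column_mismatch(["id", "name", "age"], ["id", "name"]))  # (['age'], [])
```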

Notes

The original pandas.DataFrame.to_sql() function does not turn any index into a primary key in PSQL. This function attempts to fix that problem. It takes as input a PSQL-compliant dataframe (see compliance_check()). It ignores any input index or index_label keyword. Instead, it considers 2 cases. If the dataframe has an index or indices, then the tuple of all indices is turned into the primary key. If not, there is no primary key and no index is uploaded.

See also

pandas.DataFrame.to_sql
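Example (illustrative sketch only). The nb_trials parameter used throughout this module suggests a retry loop around each query. The sketch below shows one common way to write such a loop. TransientError is a hypothetical stand-in for sqlalchemy.exc.OperationalError, and run_with_trials is not a function of mt.sql.psql.

```python
import time


class TransientError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError in this sketch."""


def run_with_trials(func, nb_trials=3, delay=0.0):
    """Call `func()` up to `nb_trials` times, retrying on TransientError."""
    if nb_trials < 1:
        raise ValueError("nb_trials must be at least 1")
    for trial in range(1, nb_trials + 1):
        try:
            return func()
        except TransientError:
            if trial == nb_trials:
                raise  # out of trials: surface the last error
            time.sleep(delay)


# Demo: a callable that fails twice, then succeeds on the third trial.
attempts = []


def flaky_write():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("connection dropped")
    return "written"


result = run_with_trials(flaky_write, nb_trials=3)
print(result, len(attempts))  # written 3
```

Errors other than the transient one are not caught, so they propagate on the first trial.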

mt.sql.psql.rename_schema(old_schema, new_schema, engine, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Renames a schema.

Parameters:
  • old_schema (str) – old schema name

  • new_schema (str) – new schema name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.list_matviews(engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all materialized views of a given schema.

Parameters:
  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – list of all materialized view names

Return type:

list

mt.sql.psql.list_foreign_tables(engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all foreign tables of a given schema.

Parameters:
  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – list of all foreign table names

Return type:

list

mt.sql.psql.list_frames(engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all dataframes (tables/views/materialized views/foreign tables) of a given schema.

Parameters:
  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – list of all dataframes of types {‘table’, ‘view’, ‘matview’}

Return type:

pd.DataFrame(columns=[‘name’, ‘type’])

mt.sql.psql.list_all_frames(engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all dataframes (tables/views/materialized views/foreign tables) across all schemas.

Parameters:
  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – list of all dataframes of types {‘table’, ‘view’, ‘matview’}

Return type:

pd.DataFrame(columns=[‘name’, ‘schema’, ‘type’])

mt.sql.psql.get_frame_length(frame_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Gets the number of rows of a dataframe (table/view/materialized view).

Parameters:
  • frame_name (str) – name of the dataframe

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – number of rows

Return type:

int

Notes

The dataframe must exist.

mt.sql.psql.get_frame_dependencies(frame_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Gets the list of all frames that depend on the given frame.

mt.sql.psql.get_view_sql_code(view_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Gets the SQL string of a view.

Parameters:
  • view_name (str) – view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

retval – SQL query string defining the view

Return type:

str

mt.sql.psql.rename_table(old_table_name, new_table_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Renames a table of a schema.

Parameters:
  • old_table_name (str) – old table name

  • new_table_name (str) – new table name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns

mt.sql.psql.vacuum_table(table_name, engine, schema: str | None = None, full: bool = False, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Vacuums a table of a schema.

Parameters:
  • table_name (str) – table name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • full (bool) – whether or not to do a full vacuuming

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns

mt.sql.psql.drop_table(table_name, engine, schema: str | None = None, restrict=True, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Drops a table if it exists, with restrict or cascade options.

Parameters:
  • table_name (str) – table name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • restrict (bool) – If True, refuses to drop table if there is any object depending on it. Otherwise it is the ‘cascade’ option which allows you to remove those dependent objects together with the table automatically.

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns
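Example (illustrative sketch only). The restrict flag maps naturally onto the RESTRICT and CASCADE options of PostgreSQL's DROP TABLE statement. The helpers below are hypothetical and only build the SQL text. The statement actually issued by drop_table() is not shown in this documentation.

```python
def quote_ident(name):
    """Double-quote an identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def drop_table_sql(table_name, schema=None, restrict=True):
    """Build a DROP TABLE IF EXISTS statement with RESTRICT or CASCADE."""
    qualified = quote_ident(table_name)
    if schema is not None:
        qualified = quote_ident(schema) + "." + qualified
    option = "RESTRICT" if restrict else "CASCADE"
    return f"DROP TABLE IF EXISTS {qualified} {option};"


print(drop_table_sql("orders", schema="sales"))
# DROP TABLE IF EXISTS "sales"."orders" RESTRICT;
print(drop_table_sql("orders", restrict=False))
# DROP TABLE IF EXISTS "orders" CASCADE;
```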

mt.sql.psql.rename_view(old_view_name, new_view_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Renames a view of a schema.

Parameters:
  • old_view_name (str) – old view name

  • new_view_name (str) – new view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.drop_view(view_name, engine, schema: str | None = None, restrict=True, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Drops a view if it exists, with restrict or cascade options.

Parameters:
  • view_name (str) – view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • restrict (bool) – If True, refuses to drop the view if there is any object depending on it. Otherwise it is the ‘cascade’ option which allows you to remove those dependent objects together with the view automatically.

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns

mt.sql.psql.rename_matview(old_matview_name, new_matview_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Renames a materialized view of a schema.

Parameters:
  • old_matview_name (str) – old materialized view name

  • new_matview_name (str) – new materialized view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.refresh_matview(matview_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Refreshes a materialized view of a schema.

Parameters:
  • matview_name (str) – materialized view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.drop_matview(matview_name, engine, schema: str | None = None, restrict=True, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Drops a materialized view if it exists, with restrict or cascade options.

Parameters:
  • matview_name (str) – materialized view name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • restrict (bool) – If True, refuses to drop the materialized view if there is any object depending on it. Otherwise it is the ‘cascade’ option which allows you to remove those dependent objects together with the materialized view automatically.

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns

mt.sql.psql.frame_exists(frame_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Checks if a frame exists.

Parameters:
  • frame_name (str) – name of table or view

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

retval – whether a table or a view exists with the given name

Return type:

bool

mt.sql.psql.drop_frame(frame_name, engine, schema: str | None = None, restrict=True, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Drops a frame (table/view/materialized view) if it exists, with restrict or cascade options.

Parameters:
  • frame_name (str) – frame name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • restrict (bool) – If True, refuses to drop the frame if there is any object depending on it. Otherwise it is the ‘cascade’ option which allows you to remove those dependent objects together with the frame automatically.

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Return type:

whatever exec_sql() returns, or False if the frame does not exist

mt.sql.psql.list_columns_ext(table_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all columns of a given table of a given schema.

Parameters:
  • table_name (str) – a valid table name returned from list_tables()

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – a table of details of the columns

Return type:

pandas.DataFrame

mt.sql.psql.list_columns(table_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all columns of a given table of a given schema.

Parameters:
  • table_name (str) – a valid table name returned from list_tables()

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

out – list of all column names

Return type:

list

mt.sql.psql.list_primary_columns_ext(frame_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all primary columns of a given frame of a given schema.

Parameters:
  • frame_name (str) – a valid table/view/matview name returned from list_frames()

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

dataframe containing primary column names and data types

Return type:

pandas.DataFrame

mt.sql.psql.list_primary_columns(frame_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Lists all primary columns of a given frame of a given schema.

Parameters:
  • frame_name (str) – a valid table/view/matview name returned from list_frames()

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

list of primary column names

Return type:

list

mt.sql.psql.rename_column(table_name, old_column_name, new_column_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Renames a column of a table.

Parameters:
  • table_name (str) – table name

  • old_column_name (str) – old column name

  • new_column_name (str) – new column name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – schema name

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.drop_column(table_name, column_name, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Drops a column of a table.

Parameters:
  • table_name (str) – table name

  • column_name (str) – column name

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – schema name

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

mt.sql.psql.make_primary(table_name: str, l_columns: list, engine, schema: str | None = None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Removes all duplicate records from an unindexed table based on a list of keys and then makes the keys primary.

Parameters:
  • table_name (str) – a valid table name returned from list_tables()

  • l_columns (list) – list of columns to be made the primary key

  • engine (sqlalchemy.engine.Engine) – an sqlalchemy connection engine created by function create_engine()

  • schema (str or None) – a valid schema name returned from list_schemas()

  • nb_trials (int) – number of query trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging
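Example (illustrative sketch only). The two steps described above, deduplicating on the key columns and then declaring them primary, can be pictured on a list of dicts. Both helpers are hypothetical. Which duplicate the real function keeps is not stated, so "first row wins" is an assumption, and the ALTER TABLE text is standard PostgreSQL syntax rather than the statement this module necessarily issues.

```python
def deduplicate_by_keys(rows, key_columns):
    """Keep the first row seen for each distinct tuple of key values."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


def add_primary_key_sql(table_name, key_columns):
    """Build the statement that declares the key columns as primary key."""
    cols = ", ".join(f'"{col}"' for col in key_columns)
    return f'ALTER TABLE "{table_name}" ADD PRIMARY KEY ({cols});'


rows = [
    {"id": 1, "day": "mon", "value": 10},
    {"id": 1, "day": "mon", "value": 99},  # duplicate key (1, "mon")
    {"id": 1, "day": "tue", "value": 20},
]
print(len(deduplicate_by_keys(rows, ["id", "day"])))  # 2
print(add_primary_key_sql("events", ["id", "day"]))
# ALTER TABLE "events" ADD PRIMARY KEY ("id", "day");
```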

mt.sql.psql.comparesync_table(engine, df_filepath, table_name, id_name, hash_name='hash', columns=['*'], schema: str | None = None, max_records_per_query=None, cond=None, reading_mode=True, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Compares a local CSV table with a remote PostgreSQL table to find out which rows are the same or different.

Parameters:
  • engine (sqlalchemy connectible) – connection to the PostgreSQL database

  • df_filepath (path) – path to the local ‘.csv’, ‘.csv.zip’ or ‘.parquet’ file

  • table_name (str) – table name

  • id_name (str) – index column name. For now, only a single index column is supported.

  • hash_name (str) – name of the hash field that only changes when the row changes. If reading_mode is True and the field does not exist remotely, it will be generated by the remote server via md5. If reading_mode is False and the field does not exist locally, it will be generated locally using hashlib.

  • columns (list) – list of column names the function will read from, ignoring the remaining columns

  • schema (str) – schema name, None means using the default one

  • max_records_per_query (int or None) – maximum number of records to be updated in each SQL query. If None, the number is chosen dynamically so that each query runs for about 5 minutes.

  • cond (str) – additional condition in selecting rows from the PostgreSQL table

  • reading_mode (bool) – whether comparing is for reading or for writing

  • nb_trials (int) – number of read_sql() trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

  • local_df (pandas.DataFrame(index=id_name, columns=[…, hash_name]) or None) – local dataframe loaded to memory, if it exists

  • remote_md5_df (pandas.DataFrame(index=id_name, columns=[hash_name])) – remote dataframe containing only the hash values

  • same_keys (list) – list of keys identifying rows which appear in both tables and are the same

  • diff_keys (list) – list of keys identifying rows which appear in both tables but are different

  • local_only_keys (list) – list of keys identifying rows which appear in the local table only

  • remote_only_keys (list) – list of keys identifying rows which appear in the remote table only

Notes

The hash field of each table will be used to store and compare the hash values. If it does not exist, it will be generated automatically.

The id_name field must uniquely identify each record in both tables. Duplicated keys in either table will be treated as diff_keys, so that hopefully next sync will fix them.
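Example (illustrative sketch only). The four key lists returned above can be derived from two mappings of id to hash value. The helpers below are hypothetical. The row hashing scheme is an assumption made for the demo, and duplicated keys are not modelled because dict keys are unique.

```python
import hashlib


def row_md5(values):
    """Hash a row's values. The library's actual scheme is not documented here."""
    joined = "|".join(str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def compare_hashes(local_hashes, remote_hashes):
    """Classify keys given two {id: hash} mappings."""
    local_keys = set(local_hashes)
    remote_keys = set(remote_hashes)
    common = local_keys & remote_keys
    same_keys = sorted(k for k in common if local_hashes[k] == remote_hashes[k])
    diff_keys = sorted(k for k in common if local_hashes[k] != remote_hashes[k])
    local_only_keys = sorted(local_keys - remote_keys)
    remote_only_keys = sorted(remote_keys - local_keys)
    return same_keys, diff_keys, local_only_keys, remote_only_keys


local = {1: row_md5(["a", 10]), 2: row_md5(["b", 20]), 3: row_md5(["c", 30])}
remote = {1: row_md5(["a", 10]), 2: row_md5(["b", 21]), 4: row_md5(["d", 40])}
print(compare_hashes(local, remote))  # ([1], [2], [3], [4])
```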

mt.sql.psql.readsync_table(engine, df_filepath, table_name, id_name, hash_name='hash', columns=['*'], schema: str | None = None, cond=None, bg_write_csv=False, max_records_per_query=None, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None, raise_exception_upon_mismatch=True)

Reads and updates a local CSV table from a PostgreSQL table by updating only rows which have been changed.

Parameters:
  • engine (sqlalchemy connectible) – connection to the PostgreSQL database

  • df_filepath (path) – path to the local ‘.csv’, ‘.csv.zip’ or ‘.parquet’ file

  • table_name (str) – table name

  • id_name (str) – index column name. For now, only a single index column is supported.

  • hash_name (str) – hash column name. See comparesync_table() for additional assumptions.

  • columns (list) – list of column names the function will read from, ignoring the remaining columns

  • schema (str) – schema name, None means using the default one

  • cond (str) – additional condition in selecting rows from the PostgreSQL table

  • bg_write_csv (bool) – whether to write the updated CSV file in a background thread

  • max_records_per_query (int or None) – maximum number of records to be updated in each SQL query. If None, the number is chosen dynamically so that each query runs for about 5 minutes.

  • nb_trials (int) – number of read_sql() trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

  • raise_exception_upon_mismatch (bool) – whether to raise a RuntimeError upon mismatching the number of hashes and the number of records

Returns:

  • df (pandas.DataFrame) – the data frame representing the read and updated table

  • bg (BgInvoke or None, optional) – If bg_write_csv is True, this represents the background thread for writing the updated CSV file. If no background thread is needed, None is returned.
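Example (illustrative sketch only). Updating only the changed rows can be pictured as follows, given key lists such as those returned by comparesync_table(). The function apply_read_sync and the fetch_remote_rows callable are hypothetical. Removing rows that exist only locally is an assumption of this sketch, not documented behaviour.

```python
def apply_read_sync(local_rows, diff_keys, local_only_keys, remote_only_keys,
                    fetch_remote_rows):
    """Return an updated copy of `local_rows`, a {id: row} mapping.

    Only rows that differ or are missing locally are fetched from the
    remote side; unchanged rows are reused as they are.
    """
    updated = dict(local_rows)
    for key in local_only_keys:
        updated.pop(key, None)  # assumption: rows gone remotely are dropped
    keys_to_fetch = list(diff_keys) + list(remote_only_keys)
    if keys_to_fetch:
        updated.update(fetch_remote_rows(keys_to_fetch))
    return updated


# Demo with plain dicts standing in for the remote table and the local file.
remote_table = {1: {"name": "a"}, 2: {"name": "b2"}, 4: {"name": "d"}}
local_table = {1: {"name": "a"}, 2: {"name": "b"}, 3: {"name": "c"}}


def fetch_remote_rows(keys):
    return {k: remote_table[k] for k in keys}


synced = apply_read_sync(local_table, [2], [3], [4], fetch_remote_rows)
print(synced == remote_table)  # True
```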

mt.sql.psql.writesync_table(engine, df_filepath, table_name, id_name, hash_name='hash', schema: str | None = None, max_records_per_query=None, conn_ro=None, engine_ro=None, drop_cascade: bool = False, nb_trials: int = 3, logger: IndentedLoggerAdapter | None = None)

Writes and updates a remote PostgreSQL table from a local CSV table by updating only rows which have been changed.

Parameters:
  • engine (sqlalchemy connectible) – connection to the PostgreSQL database

  • df_filepath (path) – path to the local ‘.csv’, ‘.csv.zip’ or ‘.parquet’ file

  • table_name (str) – table name

  • id_name (str) – index column name. For now, only a single index column is supported.

  • hash_name (str) – hash column name. See comparesync_table() for additional assumptions.

  • schema (str) – schema name, None means using the default one

  • bg_write_csv (bool) – whether to write the updated CSV file in a background thread

  • max_records_per_query (int or None) – maximum number of records to be updated in each SQL query. If None, the number is chosen dynamically so that each query runs for about 5 minutes.

  • conn_ro (sqlalchemy connectible or None) – read-only connection to the PostgreSQL database. If not specified, it is set to engine. This is an old-style keyword argument. It will be replaced by engine_ro.

  • engine_ro (sqlalchemy.engine.Engine) – read-only connection engine to the server. If not specified, it is set to engine. This new keyword argument will replace conn_ro.

  • drop_cascade (bool) – whether or not to drop using the CASCADE option when dropping a table

  • nb_trials (int) – number of read_sql() trials

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging

Returns:

df – the data frame representing the local table

Return type:

pandas.DataFrame

Notes

The id_name column is written as the primary key of the remote table.

The function tries to use conn_ro instead of engine whenever possible to save cost.
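Example (illustrative sketch only). When max_records_per_query is given as an integer, the keys to be updated can be split into fixed-size batches, one per query. The helper iter_batches is hypothetical, and the dynamic sizing used when the parameter is None is not covered here.

```python
def iter_batches(keys, max_records_per_query):
    """Yield consecutive slices of `keys`, each with at most the given size."""
    if max_records_per_query < 1:
        raise ValueError("max_records_per_query must be a positive integer")
    for start in range(0, len(keys), max_records_per_query):
        yield keys[start:start + max_records_per_query]


keys_to_update = list(range(7))
batches = list(iter_batches(keys_to_update, 3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```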