Source code for sqldbclient.dialects.postgresql.sql_view_materializer.sql_view_materializer_utils

import logging

from sqldbclient.sql_executor import SqlExecutor
from sqldbclient.dialects.postgresql.sql_view_factory.view import View, ViewType

logger = logging.getLogger(__name__)


[docs]class SqlViewMaterializerUtils: """Class that performs standard Postgres database actions, such as setting owner, granting privileges, dropping and creating objects and indices, and refreshing materialized views. """ def __init__(self, view: View, sql_executor: SqlExecutor): self.view = view self.sql_executor = sql_executor
[docs] def set_owner(self) -> None: """Sets owner""" if self.view.view_type == ViewType.REGULAR_VIEW: self.sql_executor.execute(f""" ALTER VIEW {self.view.full_name} OWNER TO {self.view.owner}; """) elif self.view.view_type == ViewType.MATERIALIZED_VIEW: self.sql_executor.execute(f""" ALTER MATERIALIZED VIEW {self.view.full_name} OWNER TO {self.view.owner}; """) else: raise Exception('Unexpected error') logger.info(f'View {self.view.full_name} owner set to {self.view.owner}')
[docs] def set_privileges(self) -> None: """Grants privileges""" for grantee, privileges in self.view.privileges.items(): for privilege in privileges: self.sql_executor.execute(f""" GRANT {privilege} ON {self.view.full_name} TO {grantee}; """) logger.info(f'View {self.view.full_name} privileges set')
[docs] def set_descriptions(self) -> None: """Sets description""" if self.view.table_description is not None: if self.view.view_type == ViewType.REGULAR_VIEW: self.sql_executor.execute(f""" COMMENT ON VIEW {self.view.full_name} IS '{self.view.table_description}'; """) elif self.view.view_type == ViewType.MATERIALIZED_VIEW: self.sql_executor.execute(f""" COMMENT ON MATERIALIZED VIEW {self.view.full_name} IS '{self.view.table_description}'; """) else: raise Exception('Unexpected error') for col, col_description in self.view.col_descriptions.items(): if col_description is not None: self.sql_executor.execute(f""" COMMENT ON COLUMN {self.view.full_name}.{col} IS '{col_description}'; """)
[docs] def restore(self) -> None: """Fully restores object in database""" self.create() self.set_owner() self.set_privileges() self.set_descriptions()
[docs] def drop(self) -> None: """Drops database object""" if self.view.view_type == ViewType.REGULAR_VIEW: self.sql_executor.execute(f'DROP VIEW {self.view.full_name}') elif self.view.view_type == ViewType.MATERIALIZED_VIEW: self.sql_executor.execute(f'DROP MATERIALIZED VIEW {self.view.full_name}') else: raise Exception('Unexpected error') logger.info(f'View {self.view.full_name} dropped')
[docs] def create(self) -> None: """"Creates database object""" if self.view.view_type == ViewType.REGULAR_VIEW: query = '\n'.join([f'CREATE VIEW {self.view.full_name} AS', self.view.definition]) self.sql_executor.execute(query) elif self.view.view_type == ViewType.MATERIALIZED_VIEW: query = '\n'.join([f'CREATE MATERIALIZED VIEW {self.view.full_name} AS', self.view.definition.replace(';', ''), 'WITH NO DATA']) self.sql_executor.execute(query) else: raise Exception('Unexpected error') logger.info(f'Created {self.view.full_name}')
[docs] def copy_privileges_to(self, obj: View): """"Sets privileges, that is granted to one object, to another""" for grantee, privileges in self.view.privileges.items(): for privilege in privileges: self.sql_executor.execute(f""" GRANT {privilege} ON {obj.full_name} TO {grantee}; """)
[docs] def refresh(self) -> None: """Refreshes materialized view""" logger.info(f'Refreshing {self.view.full_name}...') if self.view.view_type == ViewType.REGULAR_VIEW: logger.info(f'Skipping regular view {self.view.full_name}') elif self.view.view_type == ViewType.MATERIALIZED_VIEW: self.sql_executor.execute(f'REFRESH MATERIALIZED VIEW {self.view.full_name}') else: raise Exception('Unexpected error') logger.info(f'Refreshed {self.view.full_name}')
[docs] def drop_indexes(self) -> None: """Drops indexes""" logger.info(f'Dropping indexes for {self.view.full_name}...') for index in self.view.indexes: self.sql_executor.execute(f''' DROP INDEX "{index['schema']}"."{index['name']}" ''') logger.info(f'Dropped indexes for {self.view.full_name}')
[docs] def create_indexes(self) -> None: """Creates indexes""" logger.info(f'Creating indexes for {self.view.full_name}...') for index in self.view.indexes: self.sql_executor.execute(index['definition']) logger.info(f'Created indexes for {self.view.full_name}')