Source code for safebridge.pipeline

from .gis_ops import *
from .data  import BridgeDamage
from duckdb import DuckDBPyConnection
from .logger import SafeBridgeLogger

class DBPipeline:
    """ DBPipeline is a class designed to process and manage bridge damage data using a DuckDB database connection. 
    It provides methods to build geometries, process tables, and establish relationships between various data components 
    such as decks, axes, supports, and scatter points. The class also includes functionality for creating sectors, 
    calculating normalized distances, and initializing result tables for further analysis.
    
    Attributes
    ----------
    damage : BridgeDamage
    connection : DuckDBPyConnection
    log : SafeBridgeLogger
        Logger instance for logging pipeline operations.
    
    Methods
    -------
    build_point_geometry():
        Build geometries for the ascending and descending data by generating point geometries for latitude and longitude fields.
    build_process_tables(computational_projection: str):
        Generate process tables for the deck, axis, support, ascending, and descending data by reprojecting geometries.
    process_axis():
        Process the axis data by reordering vertices and calculating length and azimuth.
    process_deck(buffer_distance: float):
        Process the deck data by calculating span count, establishing relations, creating buffers, and relating deck with axis.
    relate_deck_axis():
        Establish the relation between deck and axis geometries, adding attributes such as deck_edge, deck_length, and orientation.
    create_sectors():
        Create sectors from the deck geometries, calculating centroids and normalized distances.
    relate_deck_pspoints():
        Establish the relation between deck and point scatter data for ascending and descending orbits.
    relate_axis_pspoints():
        Relate axis and point scatter data by calculating normalized distances and projections on the axis line.
    deck_edge_control(buffer_distance: float):
        Check if there are projected points within a specified buffer distance at both edges of the deck geometry.
    init_result_table():
        Initialize result tables for processed data, including tables for North-South and East-West oriented bridges.
    get_ns_bridge_uid() -> list[int]:
        Retrieve the UID of bridges with North-South orientation that meet specific criteria.
    get_ew_bridge_uid() -> list[int]:
        Retrieve the UID of bridges with East-West orientation that meet specific criteria.
    get_attributes(table_name: str) -> list[str]:
        Retrieve the column names of a specified table in the DuckDB database.
    """
    def __init__(self, bridgedamage:BridgeDamage, connection: DuckDBPyConnection):
        """ Initialize the DBPipeline with the BridgeDamage data and database connection.

        Arguments
        ---------
        bridgedamage : BridgeDamage
            The BridgeDamage data object containing deck, axis, support, ascending, and descending data.
        dbconnection : DuckDBPyConnection
            The database connection object.
        """

        self.damage = bridgedamage
        self.connection = connection
        self.log = SafeBridgeLogger()

    def build_point_geometry(self):
        """ Build geometries for the ascending and descending data.

        This method checks if the source files for ascending and descending data are in CSV format, and if so, it generates geometries for the latitude and longitude fields.

        Raises
        -------
        ValueError: If the source file does not contain latitude and longitude fields.
        """
        for orbit in ['ascending', 'descending']:
            obj = getattr(self.damage, orbit)
            if obj.source_file.endswith('.csv'):
                col_names = self.get_attributes(obj.table_name)

                if obj.lat_field not in col_names or obj.lon_field not in col_names:
                    raise ValueError(f"{orbit} table must contain {obj.lat_field} and {obj.lon_field} fields.")
                if "geom" not in self.get_attributes(obj.table_name):
                    self.connection.execute(f"""
                        ALTER TABLE {obj.table_name} ADD COLUMN geom GEOMETRY;
                        UPDATE {obj.table_name} SET geom = ST_Point({obj.lon_field}, {obj.lat_field});    
                    """)
            self.log.get_logger().info(f"Point geometries for {orbit} data have been built successfully.")
            
    def build_process_tables(self, computational_projection: str):
        """ Generate process tables for the deck, axis, support, ascending, and descending data.

        This method creates new tables with the prefix `proc_` for each data type, reprojecting the geometries to the specified computational projection.

        Arguments
        ---------
        computational_projection : str
            The coordinate reference system for computations.
        
        Raises
        -------
        ValueError: If the computational projection is not specified.
        """
        
        for i in self.damage.__dataclass_fields__.keys():
            obj = getattr(self.damage, i)
            self.connection.execute(f"""
                CREATE OR REPLACE TABLE proc_{obj.table_name} AS
                SELECT uid, ST_Transform(geom, '{obj.source_projection}', '{computational_projection}', always_xy := true) AS geom FROM {obj.table_name};
            """)
            self.log.get_logger().info(f"Process table for `{i}` data has been created successfully.")

    def process_axis(self):
        """ Process the axis data by reordering vertices and calculating length and azimuth.

        This method ensures that the axis geometries start from the leftmost point on north oriented map, and adds length and azimuth columns to the axis table.
        
        Raises
        ------
        ValueError: If the axis table does not exist or is empty.
        """
        self.connection.execute(f"""\
            -- Reorder the axis geometries based on the centroid
            -- This ensures that the axis starts from the leftmost point in north aligned map view.
                        
            UPDATE proc_{self.damage.axis.table_name} SET geom = CASE
                WHEN ST_Y(ST_StartPoint(geom)) > ST_Y(ST_Centroid(geom)) 
                    THEN ST_Reverse(geom) 
                WHEN ST_Y(ST_StartPoint(geom)) = ST_Y(ST_Centroid(geom)) AND ST_X(ST_StartPoint(geom)) > ST_X(ST_Centroid(geom)) 
                    THEN ST_Reverse(geom) 
                ELSE 
                    geom
            END;
        
            -- Add length and azimuth columns to the axis table
                        
            ALTER TABLE proc_{self.damage.axis.table_name} ADD COLUMN length FLOAT;
            ALTER TABLE proc_{self.damage.axis.table_name} ADD COLUMN azimuth FLOAT;
            UPDATE proc_{self.damage.axis.table_name}
            SET length = ST_Distance(ST_StartPoint(geom), ST_EndPoint(geom)),
                azimuth = degrees(2*pi() + pi()/2 - atan2(ST_Y(ST_EndPoint(geom)) - ST_Y(ST_StartPoint(geom)), ST_X(ST_EndPoint(geom)) - ST_X(ST_StartPoint(geom))) % (2*pi())) % 360 ;
        """)
        self.log.get_logger().info("Axis data has been processed successfully, including reordering vertices and calculating length and azimuth.")
    
    def process_deck(self, buffer_distance:float):
        """ Process the deck data by generating mulitple attiributes.

        This method calculates the span count for each deck geometry based on overlaps with support,
        establishes the relation between support and deck, creates buffer, and relates deck with axis.
        
        Arguments
        ---------
        buffer_distance : float
            The distance to buffer geometries in meters.
        """
        
        self.connection.execute(f"""
            -- Calculate the span count for each deck geometry based on overlaps with support geometries
                        
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN span_count INTEGER;
            UPDATE proc_{self.damage.deck.table_name} 
            SET span_count = COALESCE(second.overlap + 1, 1)
            FROM (
                SELECT first.uid, COUNT(second.uid) AS overlap
                FROM proc_{self.damage.deck.table_name} AS first
                LEFT JOIN proc_{self.damage.support.table_name} AS second
                ON ST_Overlaps(first.geom, second.geom)
                GROUP BY first.uid
            ) AS second
            WHERE proc_{self.damage.deck.table_name}.uid = second.uid;
        
            
            -- Establish the relation between support and deck geometries

            ALTER TABLE proc_{self.damage.support.table_name} ADD COLUMN rdeck INTEGER;
            UPDATE proc_{self.damage.support.table_name}
            SET rdeck = second.rdeck
            FROM (
                SELECT first.uid as sup_uid, second.uid AS rdeck
                FROM proc_{self.damage.support.table_name} AS first
                JOIN proc_{self.damage.deck.table_name} AS second
                ON ST_Intersects(first.geom, second.geom)
            ) AS second
            WHERE proc_{self.damage.support.table_name}.uid = second.sup_uid;
            
            -- if not related deck exists remove the support geometries
            DELETE FROM proc_{self.damage.support.table_name} WHERE rdeck IS NULL;
    
            
            -- Create buffer geometries for the deck geometries
            
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN buffer GEOMETRY;
            UPDATE proc_{self.damage.deck.table_name} 
            SET buffer = ST_Buffer(geom, {buffer_distance});
        """)
        self.log.get_logger().info("Deck data has been processed successfully, including span count calculation, support relation establishment, and buffer geometry creation.")

    def relate_deck_axis(self):
        """ Data pipelines to relate deck and axis geometries.
        
        This method establishes the relation between deck and axis geometries, adding deck_edge, deck_length, buffer_edge, and orientation columns to the deck table, and rdeck column to the axis table. It calculates the deck_edge, deck_length, and buffer_edge based on the intersection of deck and axis geometries, and determines the orientation based on the azimuth of the axis geometry.
        """

        self.connection.execute(f"""
            -- Establish the relation between deck and axis geometries
            -- Add raxis column to the deck table and calculate deck_edge, deck_length, and buffer_edge
            -- Update the deck table with the related axis geometries
            
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN deck_edge GEOMETRY;
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN deck_length FLOAT;
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN buffer_edge GEOMETRY;
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN orientation CHAR(2);
            UPDATE proc_{self.damage.deck.table_name} 
            SET deck_edge = second.deck_edge,
                deck_length = second.deck_length,
                buffer_edge = second.buffer_edge,
                orientation = second.orient
            FROM (
                SELECT 
                    first.uid, 
                    ST_Intersection(first.geom, second.geom) AS deck_edge,
                    ST_Length(ST_Intersection(first.geom, second.geom)) AS deck_length,
                    ST_Intersection(first.buffer, second.geom) AS buffer_edge,
                    CASE 
                        WHEN
                            (second.azimuth >= 0 AND second.azimuth <= 45) OR
                            (second.azimuth >= 315 AND second.azimuth <= 360) OR
                            (second.azimuth >= 135 AND second.azimuth <= 225)
                        THEN 'NS'
                        ELSE 'EW'
                    END AS orient
                FROM proc_{self.damage.deck.table_name} AS first
                JOIN proc_{self.damage.axis.table_name} AS second
                ON ST_Intersects(first.geom, second.geom)
            ) AS second
            WHERE proc_{self.damage.deck.table_name}.uid = second.uid;
            
            -- if not related axis exists remove the deck geometries
            DELETE FROM proc_{self.damage.deck.table_name} WHERE orientation IS NULL;
        
            
            -- Add rdeck column to the axis table and update it with the related deck geometries
                            
            ALTER TABLE proc_{self.damage.axis.table_name} ADD COLUMN rdeck INTEGER;
            UPDATE proc_{self.damage.axis.table_name}
            SET rdeck = second.related
            FROM (
                SELECT first.uid as uid, second.uid AS related
                FROM proc_{self.damage.axis.table_name} AS first
                JOIN proc_{self.damage.deck.table_name} AS second
                ON ST_Intersects(first.geom, second.geom)
            ) AS second
            WHERE proc_{self.damage.axis.table_name}.uid = second.uid;
        """)
        self.log.get_logger().info("Deck and axis geometries have been related successfully, including deck_edge, deck_length, buffer_edge, and orientation calculations.")

    def create_sectors(self):
        """ Create sectors from the deck geometries, calculating centroids and normalized distances.

        This method extracts relevant data from the deck table, creates a sequence for sector IDs, and generates a sectors table with `geometry`, `sector_tag`, and `rdeck` columns. It also calculates centroids for each sector based on the deck edges and adds a normalized distance column. The sectors are created by splitting the deck buffer geometries with extended lines from the deck edges.
        """
        data = self.connection.sql(f"""
            --- Extract relevant data from the deck table for sector creation

            SELECT
                uid,
                ST_AsWKB(geom),
                ST_AsWKB(buffer),
                ST_AsWKB(buffer_edge),
                ST_AsWKB(ST_Centroid(ST_MakeLine(ST_StartPoint(deck_edge), ST_StartPoint(buffer_edge)))) AS start,
                ST_AsWKB(ST_Centroid(ST_MakeLine(ST_EndPoint(deck_edge), ST_EndPoint(buffer_edge)))) AS finish,
            FROM proc_{self.damage.deck.table_name}
        """).fetchall()

        self.connection.execute(f"""
            -- Create a sequence for sector IDs and a table for sectors with geometry, sector_tag, and rdeck columns
            DROP SEQUENCE IF EXISTS sector_id CASCADE;
            -- Create a new sequence and table for sectors
            -- The sectors table will store the geometries of the sectors, their tags (N, C, S), and the related deck ID (rdeck)
            CREATE OR REPLACE SEQUENCE sector_id;
            CREATE OR REPLACE TABLE sectors (uid INTEGER DEFAULT nextval('sector_id'), geom GEOMETRY, sector_tag CHAR(1), rdeck INTEGER);
        """)
        sector_tags = ["N","C","S"]
        for i in data:
            edges = extract_intersecting_edges(i[1], i[3])
            split_lines = [extend_line(edge, 1e3) for edge in edges]  # Extend lines by 1 km
            points = sort_by_centroid([wkbloads(point) for point in [i[4], i[5]]])

            split_lines = move_lines_to_points(split_lines, points)
            res = multisplit(wkbloads(i[2]), split_lines)
            for t in range(3):
                self.connection.execute(f"INSERT INTO sectors (geom, sector_tag, rdeck) VALUES (?, ?, ?)", (res[t].wkt, sector_tags[t], i[0]))

        self.connection.execute(f"""
            -- Add center column to the sectors table and update it with the calculated centroids based on sector_tag
                            
            ALTER TABLE sectors ADD COLUMN center GEOMETRY;
            UPDATE sectors SET center = CASE
            WHEN sector_tag = 'N' THEN subquery.n_center
            WHEN sector_tag = 'C' THEN subquery.c_center
            WHEN sector_tag = 'S' THEN subquery.s_center
            END
            FROM (
                SELECT
                    uid,
                    ST_Centroid(ST_MakeLine(ST_EndPoint(buffer_edge), ST_Centroid(ST_MakeLine(ST_EndPoint(deck_edge), ST_EndPoint(buffer_edge))))) as n_center,
                    ST_Centroid(ST_MakeLine(ST_StartPoint(deck_edge), ST_EndPoint(deck_edge))) as c_center,
                    ST_Centroid(ST_MakeLine(ST_StartPoint(buffer_edge), ST_Centroid(ST_MakeLine(ST_StartPoint(deck_edge), ST_StartPoint(buffer_edge))))) as s_center
                FROM 
                    proc_{self.damage.deck.table_name}
            ) AS subquery
            WHERE sectors.rdeck = subquery.uid;

            
            -- Add ndist column to the sectors table and calculate the normalized distance from the start point of the axis line
                            
            ALTER TABLE sectors ADD COLUMN ndist FLOAT;
            UPDATE sectors SET ndist = 
            ST_Distance(ST_StartPoint(first.geom), second.center)/first.length
            FROM proc_{self.damage.axis.table_name} AS first
            JOIN sectors as second
            ON first.rdeck = second.rdeck
            WHERE second.uid = sectors.uid;        
        """)
        self.log.get_logger().info("Sectors have been created successfully from the deck geometries.")

    def relate_deck_pspoints(self):
        """ Establish the relation between deck and point data.

        This method adds `rdeck` and `rsector` columns to the `ascending` and `descending` tables, updating them with the related deck and sector geometries. It also cleans up any non-related points from the ascending and descending tables. Additionally, it adds `edge_check` column to the deck table and updates it based on the existence of projected points within the buffer distance from the deck edges.
        """
        self.connection.execute(f"""
            -- Add rdeck and rsector columns to the ascending and descending tables
            -- Update these columns with the related deck and sector geometries
                            
            ALTER TABLE proc_{self.damage.ascending.table_name} ADD COLUMN rdeck INTEGER;
            ALTER TABLE proc_{self.damage.ascending.table_name} ADD COLUMN rsector INTEGER;
            UPDATE proc_{self.damage.ascending.table_name}
            SET rdeck = second.rdeck,
                rsector = second.rsector
            FROM (
                SELECT second.rdeck as rdeck, second.uid as rsector, first.uid as p_uid
                FROM proc_{self.damage.ascending.table_name} AS first
                JOIN sectors AS second
                ON ST_Within(first.geom, second.geom)
            ) AS second
            WHERE proc_{self.damage.ascending.table_name}.uid = second.p_uid;
            
            -- clean deck non-related points
            DELETE FROM proc_{self.damage.ascending.table_name} WHERE rdeck IS NULL;

            -- Add rdeck and rsector columns to the descending table and update them with the related deck and sector geometries

            ALTER TABLE proc_{self.damage.descending.table_name} ADD COLUMN rdeck INTEGER;
            ALTER TABLE proc_{self.damage.descending.table_name} ADD COLUMN rsector INTEGER;
            UPDATE proc_{self.damage.descending.table_name}
            SET rdeck = second.rdeck,
                rsector = second.rsector
            FROM (
                SELECT second.rdeck as rdeck, second.uid as rsector, first.uid as p_uid
                FROM proc_{self.damage.descending.table_name} AS first
                JOIN sectors AS second
                ON ST_Within(first.geom, second.geom)
            ) AS second
            WHERE proc_{self.damage.descending.table_name}.uid = second.p_uid;
            -- clean deck non-related points
            DELETE FROM proc_{self.damage.descending.table_name} WHERE rdeck IS NULL;
        """)
        self.log.get_logger().info("Deck and ps scatter point data has been related successfully")

    def relate_axis_pspoints(self):
        """ Relating axis and point data.

        This method adds `ndist_axis` and `proj_axis` columns to the ascending and descending tables, calculating the normalized distance along the axis line and the projected point on the axis line. It uses the axis geometries to determine the distance and projection for each point in the ascending and descending tables.
        """

        self.connection.execute(f"""
            -- Add ndist_axis and proj_axis columns to the ascending and descending tables
            -- Calculate the normalized distance along the axis line and the projected point on the axis line
                            
            ALTER TABLE proc_{self.damage.ascending.table_name} ADD COLUMN ndist_axis FLOAT;
            ALTER TABLE proc_{self.damage.ascending.table_name} ADD COLUMN proj_axis GEOMETRY;
            UPDATE proc_{self.damage.ascending.table_name}
            SET ndist_axis = ST_Distance(ST_StartPoint(subquery.lgeom), ST_EndPoint(ST_ShortestLine(subquery.geom, subquery.lgeom)))/subquery.linelen,
                proj_axis = ST_EndPoint(ST_ShortestLine(subquery.geom, subquery.lgeom))
            FROM (
                SELECT second.*, first.geom as lgeom, first.length as linelen
                FROM proc_{self.damage.axis.table_name} AS first
                JOIN proc_{self.damage.ascending.table_name} AS second
                ON first.rdeck = second.rdeck
                ) AS subquery
            WHERE proc_{self.damage.ascending.table_name}.uid = subquery.uid;
            
            --- Add ndist_axis and proj_axis columns to the descending table and calculate them similarly

            ALTER TABLE proc_{self.damage.descending.table_name} ADD COLUMN ndist_axis FLOAT;
            ALTER TABLE proc_{self.damage.descending.table_name} ADD COLUMN proj_axis GEOMETRY;
            UPDATE proc_{self.damage.descending.table_name}
            SET ndist_axis = ST_Distance(ST_StartPoint(subquery.lgeom), ST_EndPoint(ST_ShortestLine(subquery.geom, subquery.lgeom)))/subquery.linelen,
                proj_axis = ST_EndPoint(ST_ShortestLine(subquery.geom, subquery.lgeom))
            FROM (
                SELECT second.*, first.geom as lgeom, first.length as linelen
                FROM proc_{self.damage.axis.table_name} AS first
                JOIN proc_{self.damage.descending.table_name} AS second
                ON first.rdeck = second.rdeck
                ) AS subquery
            WHERE proc_{self.damage.descending.table_name}.uid = subquery.uid;
        """)
        self.log.get_logger().info("Axis and ps scatter point data has been related successfully.")

    def deck_edge_control(self, buffer_distance:float):

        """ Checks if there is at least one projected point for both orbital orientations within the radius of buffer_distance / 2 at both edges of the deck geometry.
    
        Arguments
        ---------
        buffer_distance : float
            The distance to buffer geometries in meters.
        """      

        self.connection.execute(f"""                
            -- Add edge_check column to the deck table and update it based on the existence of projected points within the buffer distance from the deck edges
            -- This will help to identify if the deck is covered by both ascending and descending points
            -- The edge_check will be TRUE if there is at least one projected point within the buffer distance from both edges of the deck geometry
                            
            ALTER TABLE proc_{self.damage.deck.table_name} ADD COLUMN edge_check BOOLEAN;
            UPDATE proc_{self.damage.deck.table_name}
            SET edge_check = 
                EXISTS (
                    SELECT 1
                    FROM proc_{self.damage.ascending.table_name} AS asc_table
                    WHERE asc_table.rdeck = proc_{self.damage.deck.table_name}.uid
                    AND ST_DWithin( ST_StartPoint(proc_{self.damage.deck.table_name}.deck_edge), asc_table.proj_axis, {buffer_distance / 2})
                ) AND EXISTS (
                    SELECT 1
                    FROM proc_{self.damage.descending.table_name} AS desc_table
                    WHERE desc_table.rdeck = proc_{self.damage.deck.table_name}.uid
                    AND ST_DWithin( ST_EndPoint(proc_{self.damage.deck.table_name}.deck_edge), desc_table.proj_axis,  {buffer_distance / 2})
                );
        """)
        self.log.get_logger().info("Deck edge control has been performed successfully, checking for projected points within the buffer distance at both edges of the deck geometry.")

    def init_result_table(self):
        """ Initialize the result tables for the processed data and creates a new tables called `result_ew`,`result_ns`,`graph_ew`,`graph_ns`.
        """
        self.connection.execute(f"""
            -- Create a table to store the results for North-South oriented bridges
            CREATE OR REPLACE TABLE result_ns
            (
            rdeck INTEGER,
            asc_tilt DOUBLE,
            asc_defl DOUBLE,
            dsc_tilt DOUBLE,
            dsc_defl DOUBLE,
            );
            -- Create a table to store the results for East-West oriented bridges
            CREATE OR REPLACE TABLE result_ew 
            (
            rdeck INTEGER,
            tilt DOUBLE,
            defl DOUBLE,
            );
            -- Create a table to store the graph generation data during the processing for ns orietation
            CREATE OR REPLACE TABLE graph_ns
            (
            rdeck INTEGER,
            asc_quadratic_x DOUBLE[],
            asc_quadratic_y DOUBLE[],
            dsc_quadratic_x DOUBLE[],
            dsc_quadratic_y DOUBLE[],
            asc_analytical_y DOUBLE[],
            dsc_analytical_y DOUBLE[],
            );
            -- Create a table to store the graph generation data during the processing for ew orientation
            CREATE OR REPLACE TABLE graph_ew
            (
            rdeck INTEGER,
            longitudinal DOUBLE[],
            vertical DOUBLE[],
            );

        """)    
        
        self.log.get_logger().info("Result tables for processed data have been initialized successfully, including `result_ns`, `result_ew`, `graph_ns`, and `graph_ew`.")
    
    def get_ns_bridge_uid(self):
        """ Get the UID of the bridge with North-South orientation.
        
        This method retrieves the UID of the bridge from the deck table that has an orientation of 'NS'.
        
        Returns
        -------
        list[int]: The UID of the bridge with North-South orientation.
        """

        query = f"""
            SELECT rdeck
            FROM proc_{self.damage.ascending.table_name}
            WHERE rdeck IN (
                SELECT rdeck
                FROM proc_{self.damage.descending.table_name}
                GROUP BY rdeck
            )
            GROUP BY rdeck ORDER BY rdeck
        """
        return self.connection.execute(f"""
            SELECT uid 
            FROM proc_{self.damage.deck.table_name}
            WHERE orientation = 'NS' AND edge_check = TRUE
            AND uid IN ({query});
        """).fetchnumpy()['uid'].tolist()

    def get_ew_bridge_uid(self):
        """ Get the UID of the bridge with East-West orientation.
        
        This method retrieves the UID of the bridge from the deck table that has an orientation of 'EW'.
        
        Returns
        -------
        list[int]: The UID of the bridge with East-West orientation.
        """
        desc_related_sectors = f"""
            SELECT rdeck
            FROM (
                SELECT pa.rdeck, GROUP_CONCAT(DISTINCT s.sector_tag) AS sector_tags
                FROM proc_{self.damage.descending.table_name} AS pa 
                JOIN sectors AS s 
                ON pa.rsector = s.uid
                GROUP BY pa.rdeck
                HAVING INSTR(sector_tags, 'S') > 0 AND INSTR(sector_tags, 'N') > 0
            )
            """
        asc_related_sectors = f"""
            SELECT rdeck
            FROM (
                SELECT pa.rdeck, GROUP_CONCAT(DISTINCT s.sector_tag) AS sector_tags
                FROM proc_{self.damage.ascending.table_name} AS pa 
                JOIN sectors AS s 
                ON pa.rsector = s.uid
                GROUP BY pa.rdeck
                HAVING INSTR(sector_tags, 'S') > 0 AND INSTR(sector_tags, 'N') > 0
            )
            """
        final_uids = f"SELECT rdeck FROM ({asc_related_sectors}) WHERE rdeck IN ({desc_related_sectors})"
        
        return self.connection.sql(f"SELECT uid FROM proc_{self.damage.deck.table_name} WHERE orientation = 'EW' AND uid IN ({final_uids})").fetchnumpy()['uid'].tolist()
    
    def get_attributes(self, table_name: str):
        """ Get the column names of the specified table.

        This method retrieves the column names of the specified table in the DuckDB database.
        
        Arguments
        ---------
        table_name : str
            The name of the table to retrieve column names from.
        
        Returns
        -------
        list[str]: A list of column names in the specified table.
        """
        return self.connection.execute(f"select column_name from (describe {table_name})").fetchnumpy()['column_name'].tolist()
    
class DBQueries:
    """ A class that provides methods to generate SQL queries for retrieving various geometries 
    and related data for decks from a database.
    
    Methods
    -------
    deck_geometry(deckuid: int, deck_table: str) -> str
        Get the geometry of a deck by its UID.
    buffer_geometry(deckuid: int, table_name: str) -> str
        Get the buffer geometry of a deck by its UID.
    sector_geometry(deckuid: int) -> str
        Get the sector geometry of a deck by its UID.
    support_geometry(deckuid: int, table_name: str) -> str
        Get the support geometry of a deck by its UID.
    axis_geometry(deckuid: int, table_name: str) -> str
        Get the axis geometry of a deck by its UID.
    deck_edge(deckuid: int, table_name: str) -> str
        Get the deck edge geometry of a deck by its UID.
    scatter_geometry(deckuid: int, table_name: str) -> str
        Get the origin scatter points of a deck by its UID.
    projected_scatters(deckuid: int, table_name: str) -> str
        Get the projected scatter points of a deck by its UID.
    buffer_edge(deckuid: int, axis_name: str, deck_name: str) -> str
        Get the buffer edge geometry of a deck by its UID.
    deck_edge_graph(deckuid: int, axis_name: str, deck_name: str) -> str
        Get the deck edge graph of a deck by its UID for graph generation.
    scatter_graph(deckuid: int, table_name: str, name_fields: list) -> str
        Get the scatter data of a deck by its UID for graph generation.
    support_graph(deckuid: str, axis_name: str, support_name: str) -> str
        Returns the query to retrieve the support graph data for a given deck UID.
    """ 
    
    def __init__(self):
        pass
    
    def deck_geometry(self, deckuid: int, deck_table: str) -> str:
        """ Get the geometry of a deck by its UID. 
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        deck_table : str
            The name of the deck table.
        
        Returns
        -------
        str: SQL query to retrieve the geometry of the specified deck.
        """
        return f"SELECT ST_AsWKB(geom) FROM {deck_table} WHERE uid = {deckuid}"
    
    def buffer_geometry(self, deckuid: int, table_name: str) -> str:
        """ Get the buffer geometry of a deck by its UID.

        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table.
        
        Returns
        -------
        str: SQL query to retrieve the buffer geometry of the specified deck.
        """
        return f"SELECT ST_AsWKB(buffer) FROM {table_name} WHERE uid = {deckuid}"
    
    def sector_geometry(self, deckuid: int) -> str:
        """ Get the sector geometry of a deck by its UID.
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        
        Returns
        -------
        str: SQL query to retrieve the sector geometry of the specified deck.
        
        """
        return f"SELECT ST_AsWKB(geom) FROM sectors WHERE rdeck = {deckuid}"

    def support_geometry(self, deckuid: int, table_name: str) -> str:
        """ Get the support geometry of a deck by its UID.
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.
        """
        return f"SELECT ST_AsWKB(geom) FROM {table_name} WHERE rdeck = {deckuid}"
    
    def axis_geometry(self, deckuid: int, table_name: str) -> str:
        """ Get the axis geometry of a deck by its UID.

        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the deck table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.
        """
        return f"SELECT ST_AsWKB(geom) FROM {table_name} WHERE rdeck = {deckuid}"
    
    def deck_edge(self, deckuid: int, table_name:str) -> str:
        """ Get the deck edge geometry of a deck by its UID.
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.
        """
        return f"SELECT ST_AsWKB(ST_StartPoint(deck_edge)) as st, ST_AsWKB(ST_EndPoint(deck_edge)) as ed, FROM proc_{table_name} WHERE uid = {deckuid}"
        
    def scatter_geometry(self, deckuid:int, table_name: str) -> str:
        """ Get the origin scatter points of a deck by its UID.
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.    
        """

        return f"SELECT ST_X(geom) as x, ST_Y(geom) as y FROM {table_name} WHERE rdeck = {deckuid}"
    
    def projected_scatters(self, deckuid: int, table_name: str) -> str:
        """ Get the projected scatter points of a deck by its UID.
        
        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.
        """
        return f"SELECT ST_X(proj_axis) as x, ST_Y(proj_axis) as y FROM {table_name} WHERE rdeck = {deckuid}"
    
    def buffer_edge(self, deckuid:int, axis_name:str, deck_name:str) -> str:
        """ Get the buffer edge geometry of a deck by its UID.

        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        axis_name : str
            The name of the axis table.
        deck_name : str
            The name of the deck table.
        
        Returns
        -------
        str: SQL query to retrieve the support geometry of the specified deck.
        """
        return f"""
                SELECT
                    ST_Distance(ST_StartPoint(deck.buffer_edge), ST_StartPoint(axis.geom)) / axis.length as p1,
                    ST_Distance(ST_EndPoint(deck.buffer_edge), ST_StartPoint(axis.geom)) / axis.length as p2,
                FROM (SELECT * FROM {axis_name} WHERE rdeck = {deckuid}) as axis
                JOIN {deck_name} as deck
                ON axis.rdeck = deck.uid
                """
    
    def deck_edge_graph(self, deckuid:int, axis_name:str, deck_name:str) -> str:
        """ Get the deck edge graph of a deck by its UID for graph generation.

        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        axis_name : str
            The name of the axis table.
        deck_name : str
            The name of the deck table.

        Returns
        -------
        str: SQL query to retrieve the deck edge graph of the specified deck.
        """
        return f"""
                SELECT
                    ST_Distance(ST_StartPoint(deck.deck_edge), ST_StartPoint(axis.geom)) / axis.length as p1,
                    ST_Distance(ST_EndPoint(deck.deck_edge), ST_StartPoint(axis.geom)) / axis.length as p2,
                FROM (SELECT * FROM {axis_name} WHERE rdeck = {deckuid}) as axis
                JOIN {deck_name} as deck
                ON axis.rdeck = deck.uid
                """
    
    def scatter_graph(self, deckuid:int, table_name:str, name_fields: list):
        """ Get the scatter data of a deck by its UID for graph generation.

        Arguments
        ---------
        deckuid : int
            The UID of the deck.
        table_name : str
            The name of the table containing scatter data.
        name_fields : list
            The list of field names to be used in the query.

        Returns
        -------
        str: SQL query to retrieve the scatter data of the specified deck.
        """
        return f""" 
                SELECT 
                    proc_scatter.ndist_axis as x,
                    scatter.{name_fields[-1]} - scatter.{name_fields[0]}  as y,
                FROM (SELECT * FROM proc_{table_name} WHERE rdeck = {deckuid}) as proc_scatter
                JOIN {table_name} as scatter
                ON proc_scatter.uid = scatter.uid
                """
    
    def support_graph(self, deckuid:str, axis_name:str, support_name:str) -> str:
        """ Returns the query to retrieve the support graph data for a given deck UID.

        Arguments
        ---------
        deckuid : str
            The UID of the deck.
        axis_name : str
            The name of the axis table.
        support_name : str
            The name of the support table.
        
        Returns
        -------
        str: SQL query to retrieve the support graph data of the specified deck.
        """

        return f"""
              SELECT *
              FROM (
                SELECT 
                    ST_Distance(ST_StartPoint(axis.geom), ST_Centroid(ST_Intersection(support.geom, axis.geom))) / axis.length as p1,
                FROM (SELECT * FROM proc_{support_name} WHERE rdeck = {deckuid}) AS support
                JOIN proc_{axis_name} AS axis
                ON support.rdeck = axis.rdeck
              )
              WHERE p1 IS NOT NULL
              """