Tracking Snowflake

Getting started with tracking system resource usage and tables from Snowflake.

For pipelines that run tasks on data warehouses like Snowflake, the performance of individual queries affects important SLA metrics such as run duration. If your pipelines are delayed, tracking performance metrics can help you trace the root causes. Likewise, tracking metadata from Snowflake tables, such as schemas and data previews, is helpful in monitoring your data quality.

DBND enables you to track system resource metrics as well as table metadata for data quality monitoring. In this quickstart tutorial, we will demonstrate how to apply DBND’s tracking functionality on Snowflake tables and observe resource usage from your CLI in a simple Python workflow.

1. Installing Requirements

Before working through the example, we need to install dbnd and dbnd-snowflake with the following commands:

pip install dbnd dbnd-snowflake

We will also need to install Snowflake’s Python Connector to establish a connection with Snowflake. The connector lets external orchestrators or scripts execute Snowflake queries, and it is often used to run queries on a schedule from a scheduler or cron job. For more information on the connector, visit Snowflake's official documentation.

You can install the connector with the following command:

pip install snowflake-connector-python

2. Establish a Snowflake Connection

Next we can create a function to establish our connection to Snowflake. For simplicity, we will be opening the connection in the default OCSP Fail Open Mode. The connection will allow us to access the cursor used for executing queries.

import snowflake.connector

def establish_connection():
    '''Establish and return a Snowflake Connection using Snowflake Python Connector'''
    # your_username, your_password, and your_full_account_name are placeholders;
    # replace them with your own values.
    snowflake_connection = snowflake.connector.connect(
        user=your_username,
        password=your_password,
        account=your_full_account_name
    )
    return snowflake_connection

The user parameter is the username used to access Snowflake, and the password parameter is the corresponding password. The account parameter is your full account name, including the region and cloud platform, if applicable. For example, if your account name is xyz12345 and your region and cloud platform are US East (Ohio) and AWS respectively, then your full account name would be xyz12345.us-east-2.aws. For more information on Snowflake account names, visit Snowflake's official documentation.
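As a minimal sketch of how these three values could be assembled without hardcoding them, the helpers below join an account name with its optional region and cloud segments and read the settings from environment variables. The function names and the SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, and SNOWFLAKE_ACCOUNT variable names are assumptions made for illustration; they are not part of DBND or the Snowflake connector.

```python
import os


def build_account_identifier(account_name, region=None, cloud=None):
    """Join the account name with optional region and cloud platform segments."""
    parts = [account_name]
    if region:
        parts.append(region)
    if cloud:
        parts.append(cloud)
    return ".".join(parts)


def connection_kwargs_from_env(env=os.environ):
    """Read connection settings from hypothetical environment variables."""
    return {
        "user": env["SNOWFLAKE_USER"],
        "password": env["SNOWFLAKE_PASSWORD"],
        "account": env["SNOWFLAKE_ACCOUNT"],
    }


# The example account from the text above.
print(build_account_identifier("xyz12345", "us-east-2", "aws"))

# Possible usage once the variables are exported in your shell:
# snowflake.connector.connect(**connection_kwargs_from_env())
```

Keeping credentials in the environment means the script itself can be shared or committed without exposing them.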

3. Create a Function to Execute Queries

Next, we will write a helper function that executes a single query for us. This function returns the query results, the query ID, and the session ID. The latter two are required to track the query.

import snowflake.connector 

def establish_connection():
    '''Establish and return a Snowflake Connection using Snowflake Python Connector'''
    snowflake_connection = snowflake.connector.connect(
        user=your_username,
        password=your_password,
        account=your_full_account_name
    )
    return snowflake_connection

def run_query(connection, query):
    '''Generate a cursor, and execute a query. Returns the query result, query ID and session ID.'''
    cursor = connection.cursor()
    query_results = cursor.execute(query).fetchall()
    query_id, session_id = cursor.sfqid, cursor.connection.session_id
    cursor.close()
    return query_results, query_id, session_id
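If a query raises an exception, the run_query helper above never reaches cursor.close(). One possible variant, shown here as a sketch rather than as part of the original tutorial, wraps the work in try/finally. FakeCursor and FakeConnection are hypothetical stand-ins that mimic only the attributes used here, so the example runs without a Snowflake account.

```python
def run_query_safely(connection, query):
    """Like run_query, but closes the cursor even if the query raises."""
    cursor = connection.cursor()
    try:
        query_results = cursor.execute(query).fetchall()
        return query_results, cursor.sfqid, connection.session_id
    finally:
        cursor.close()


# Hypothetical stand-ins for the connector objects (illustration only).
class FakeCursor:
    def __init__(self):
        self.sfqid = None
        self.closed = False

    def execute(self, query):
        if "bad" in query:
            raise RuntimeError("query failed")
        self.sfqid = "fake-query-id"
        return self

    def fetchall(self):
        return [(1,)]

    def close(self):
        self.closed = True


class FakeConnection:
    session_id = 12345

    def __init__(self):
        self.last_cursor = None

    def cursor(self):
        self.last_cursor = FakeCursor()
        return self.last_cursor


connection = FakeConnection()
results, query_id, session_id = run_query_safely(connection, "select 1")
print(results, query_id, session_id)
```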

4. Executing Queries, Tracking Resources, and Logging Tables

Now that we have our prerequisite functions set up, we can write our main query function. Here is where we will execute a query, log metadata about our query, and log a Snowflake table.

📘

Evaluating Query Performance

log_snowflake_resource_usage supports multiple Query IDs. You can use this function to trace and diagnose performance issues with your queries.
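Since several query IDs can be tracked together, one way to gather them is to run a batch of queries on a single connection and record each ID as you go. The sketch below assumes a hypothetical run_queries helper; the fake classes exist only so the example runs without a Snowflake account.

```python
import itertools


def run_queries(connection, queries):
    """Run each query on one connection and collect the query IDs."""
    query_ids = []
    cursor = connection.cursor()
    try:
        for query in queries:
            cursor.execute(query)
            query_ids.append(cursor.sfqid)
    finally:
        cursor.close()
    return query_ids, connection.session_id


# Hypothetical stand-ins for the connector objects (illustration only).
class FakeCursor:
    def __init__(self, counter):
        self._counter = counter
        self.sfqid = None

    def execute(self, query):
        self.sfqid = f"fake-query-{next(self._counter)}"
        return self

    def close(self):
        pass


class FakeConnection:
    session_id = 12345

    def __init__(self):
        self._counter = itertools.count(1)

    def cursor(self):
        return FakeCursor(self._counter)


query_ids, session_id = run_queries(FakeConnection(), ["select 1", "select 2"])
print(query_ids, session_id)
```

With a real connection, the returned list would be passed as the query_ids argument of log_snowflake_resource_usage, together with the session ID.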

Full Code

import snowflake.connector
from dbnd_snowflake import log_snowflake_resource_usage, log_snowflake_table

def establish_connection():
    '''Establish and return a Snowflake Connection using Snowflake Python Connector'''
    snowflake_connection = snowflake.connector.connect(
        user=your_username,
        password=your_password,
        account=your_full_account_name
    )
    return snowflake_connection

def run_query(connection, query):
    '''Generate a cursor, and execute a query. Returns the query result, query ID and session ID.'''
    cursor = connection.cursor()
    query_results = cursor.execute(query).fetchall()
    query_id, session_id = cursor.sfqid, cursor.connection.session_id
    cursor.close()
    return query_results, query_id, session_id

def query_snowflake():
    '''Query Snowflake, track resource usage, and log Snowflake table'''
    # Format: snowflake://<user>:<password>@<full account name>/?account=<full account name>
    connection_string = 'snowflake://your_connection_string'
    snowflake_connection = establish_connection()
    query_ids = []

    database = 'SNOWFLAKE_SAMPLE_DATA'
    schema = 'TPCDS_SF100TCL'
    table_name = 'STORE_SALES' 
    query = f'select * from "{database}"."{schema}"."{table_name}" limit 30'

    results, query_id, session_id = run_query(snowflake_connection, query)
    query_ids.append(query_id)

    log_snowflake_resource_usage(
        database=database,
        connection_string=connection_string,
        query_ids=query_ids,
        session_id=int(session_id),
        delay=1
    )

    log_snowflake_table(
        table_name=table_name,
        connection_string=connection_string,
        database=database,
        schema=schema,
        with_preview=False
    )

    # As best practice, close the Snowflake connection 
    snowflake_connection.close()

if __name__ == '__main__':
    query_snowflake()

Step 1. Establish the connection_string. The connection_string used here has the format snowflake://<user>:<password>@<full account name>/?account=<full account name>.
Step 2. Invoke the establish_connection() function and initialize an empty list to hold our query_ids.
Step 3. Build our query and execute it using our run_query function. This is the query we are executing here:

select * from "SNOWFLAKE_SAMPLE_DATA"."TPCDS_SF100TCL"."STORE_SALES" limit 30

After executing the query, we add the query_id to our list. We are now ready to use the DBND tracking functions to log resource usage and the table that was queried. The log_snowflake_resource_usage function logs the metadata of a single query or a list of queries within a Snowflake session. The log_snowflake_table function logs a specific table; in this case, it is the table we just queried.
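The connection_string format described in Step 1 can be assembled with a small helper. This is a sketch; build_connection_string is a hypothetical name, not a DBND function. Characters such as @ or / in a password would make the URL ambiguous, and the source does not say how they should be escaped, so the values are inserted as-is here.

```python
def build_connection_string(user, password, full_account_name):
    """Return snowflake://<user>:<password>@<account>/?account=<account>."""
    return "snowflake://{user}:{password}@{account}/?account={account}".format(
        user=user,
        password=password,
        account=full_account_name,
    )


# Placeholder values for illustration only.
print(build_connection_string("alice", "s3cret", "xyz12345.us-east-2.aws"))
```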

📘

Snowflake Persisted Query Results

When a query is executed, Snowflake caches the results for 24 hours. To optimize retrieval, the cached results are reused if neither the query nor the underlying data changed during this timeframe. As a result, tracked metadata such as credits used and rows produced may be lower for a query served from the cache than for its first execution. For more information on persisted query results, visit Snowflake's official documentation.
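If you want tracked metrics to reflect a full execution while you are measuring performance, one option is to turn off result reuse for the session with Snowflake's USE_CACHED_RESULT session parameter. The sketch below is not part of the original tutorial; disable_result_cache is a hypothetical helper, and the recording classes exist only so the example runs without a Snowflake account.

```python
def disable_result_cache(connection):
    """Turn off persisted query results for the current session."""
    cursor = connection.cursor()
    try:
        cursor.execute("ALTER SESSION SET USE_CACHED_RESULT = FALSE")
    finally:
        cursor.close()


# Hypothetical stand-ins that record executed statements (illustration only).
class RecordingCursor:
    def __init__(self, log):
        self._log = log

    def execute(self, statement):
        self._log.append(statement)
        return self

    def close(self):
        pass


class RecordingConnection:
    def __init__(self):
        self.statements = []

    def cursor(self):
        return RecordingCursor(self.statements)


connection = RecordingConnection()
disable_result_cache(connection)
print(connection.statements)
```

With a real connection, this would be called once after establish_connection() and before the queries you intend to track.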

5. Running the Script

Prior to executing the Python script, we need to enable DBND tracking. We can do this with the following command:

export DBND__TRACKING=True

Then we can run the Python script normally.

python snowflake_tracking_example.py

The output should be similar to the following:

==================== 
= Running Databand!
 TRACKERS   : ['console']
 TASKS      : total=1

...

==================== 

INFO  snowflake_tracking_example.py__78ea19c4e1 - Metric logged: log_snowflake_resource_usage__time_seconds=2.183037281036377
INFO  snowflake_tracking_example.py__78ea19c4e1 - Metrics logged:
    snowflake_query.bytes_scanned=800857856
    snowflake_query.compilation_time_milliseconds=986
    snowflake_query.credits_used_cloud_services=0.00015100
    snowflake_query.error_message=None
    snowflake_query.execution_status=SUCCESS
    snowflake_query.execution_time_milliseconds=2125
    snowflake_query.query_id=0198f94d-001f-9bfe-0000-3241000587a2
    snowflake_query.query_tag=
    snowflake_query.query_text=select * from "SNOWFLAKE_SAMPLE_DATA"."TPCDS_SF100TCL"."STORE_SALES" limit 30
    snowflake_query.rows_produced=30
    snowflake_query.session_id=55254754554490
    snowflake_query.total_elapsed_time_milliseconds=3111
INFO  snowflake_tracking_example.py__78ea19c4e1 - Snowflake Connector for Python Version: 2.3.7, Python Version: 3.6.8, Platform: Linux-5.4.0-26-generic-x86_64-with-debian-bullseye-sid
INFO  snowflake_tracking_example.py__78ea19c4e1 - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
INFO  snowflake_tracking_example.py__78ea19c4e1 - query: [SELECT column_name, data_type FROM SNOWFLAKE_SAMPLE_DATA.information_schema.colu...]
INFO  snowflake_tracking_example.py__78ea19c4e1 - query execution done
INFO  snowflake_tracking_example.py__78ea19c4e1 - fetching data done
INFO  snowflake_tracking_example.py__78ea19c4e1 - query: [SHOW TABLES LIKE 'STORE_SALES' in schema SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL]
INFO  snowflake_tracking_example.py__78ea19c4e1 - query execution done
INFO  snowflake_tracking_example.py__78ea19c4e1 - fetching data done
INFO  snowflake_tracking_example.py__78ea19c4e1 - closed
INFO  snowflake_tracking_example.py__78ea19c4e1 - No async queries seem to be running, deleting session
INFO  snowflake_tracking_example.py__78ea19c4e1 - Metrics logged:
    STORE_SALES.shape0=288010550524
    STORE_SALES.shape1=23
    STORE_SALES.schema={'type': 'SnowflakeTable', 'column_types': {'SS_EXT_SALES_PRICE': 'NUMBER', 'SS_EXT_WHOLESALE_COST': 'NUMBER', 'SS_LIST_PRICE': 'NUMBER', 'SS_STORE_SK': 'NUMBER', 'SS_CDEMO_SK': 'NUMBER', 'SS_ITEM_SK': 'NUMBER', 'SS_WHOLESALE_COST': 'NUMBER', 'SS_CUSTOMER_SK': 'NUMBER', 'SS_EXT_LIST_PRICE': 'NUMBER', 'SS_NET_PAID_INC_TAX': 'NUMBER', 'SS_NET_PROFIT': 'NUMBER', 'SS_QUANTITY': 'NUMBER', 'SS_ADDR_SK': 'NUMBER', 'SS_EXT_DISCOUNT_AMT': 'NUMBER', 'SS_SALES_PRICE': 'NUMBER', 'SS_TICKET_NUMBER': 'NUMBER', 'SS_SOLD_TIME_SK': 'NUMBER', 'SS_COUPON_AMT': 'NUMBER', 'SS_HDEMO_SK': 'NUMBER', 'SS_PROMO_SK': 'NUMBER', 'SS_NET_PAID': 'NUMBER', 'SS_EXT_TAX': 'NUMBER', 'SS_SOLD_DATE_SK': 'NUMBER'}, 'size': '12.0 TB'}
INFO  snowflake_tracking_example.py__78ea19c4e1 - Metric logged: log_snowflake_table__time_seconds=1.6342461109161377
INFO  snowflake_tracking_example.py__78ea19c4e1 - closed
INFO  snowflake_tracking_example.py__78ea19c4e1 - No async queries seem to be running, deleting session
INFO  snowflake_tracking_example.py__78ea19c4e1 - Task snowflake_tracking_example.py has been completed!
INFO  snowflake_tracking_example.py__78ea19c4e1 - Task dbnd_driver__03912e0f95 has been completed!
INFO  snowflake_tracking_example.py__78ea19c4e1 - 

==================== 

...
==================== 
= Your run has been successfully executed!
 TRACKERS   : ['console']

Here we can see the tracked metadata of our executed query and the metadata of the table we queried from. For more information on the metadata that is being collected, visit Snowflake's official documentation.
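If you capture this console output, the key=value metric lines are simple to post-process. The following sketch uses a hypothetical parse_metric_lines helper on a few lines copied from the sample output above; it is plain string handling, not a DBND API.

```python
def parse_metric_lines(lines):
    """Parse 'name=value' lines into a dict of strings."""
    metrics = {}
    for line in lines:
        line = line.strip()
        if "=" not in line:
            continue
        # Split on the first '=' only, since values may contain '=' themselves.
        key, _, value = line.partition("=")
        metrics[key] = value
    return metrics


# Lines copied from the sample output above.
sample = [
    "    snowflake_query.bytes_scanned=800857856",
    "    snowflake_query.compilation_time_milliseconds=986",
    "    snowflake_query.execution_time_milliseconds=2125",
    "    snowflake_query.total_elapsed_time_milliseconds=3111",
]

metrics = parse_metric_lines(sample)
total_ms = int(metrics["snowflake_query.total_elapsed_time_milliseconds"])
compile_ms = int(metrics["snowflake_query.compilation_time_milliseconds"])
execute_ms = int(metrics["snowflake_query.execution_time_milliseconds"])

# Time not accounted for by compilation or execution (for example, queueing).
other_ms = total_ms - compile_ms - execute_ms
print(total_ms, compile_ms, execute_ms, other_ms)
```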

🚧

Adjusting the delay parameter

If you see: WARNING - Metadata not found for session_id '55254754534414', query_id '0198ed29-00a4-f8a9-0000-3241000558ee' or something similar in your output, it likely means that the query metadata has not yet been written into Snowflake's system at the time of logging. You can adjust the delay parameter of log_snowflake_resource_usage to allow Snowflake more time to write the metadata.

