Tracking Snowflake Data Migrations

Using DBND to track an example data migration process from PostgreSQL to Snowflake.

When migrating data between databases, it's critical to know that data is being consistently and accurately moved. In this guide, we will demonstrate how to use DBND to ensure data integrity when migrating data between databases.

Our example covers a migration flow from PostgreSQL to Snowflake. We will be using DBND's tracking functions to log data integrity checks when the data is moved to Snowflake.

For full functionality around visualizing, monitoring, and alerting on the process, reach out to our team to learn more about Databand's monitoring application.

General migration workflow in this example.

Steps of our flow:

  1. Extract data from a PostgreSQL database.
  2. Log the table and interesting metrics of the data we extracted.
  3. Stage the files (both internal and external staging methods will be covered).
  4. Copy the staged data into a Snowflake table.
  5. Extract data from Snowflake to ensure that the data is properly stored and production ready. We will also log the Snowflake resources consumed and the final table.

Installing Prerequisites

DBND Tracking

First we need to install dbnd and dbnd-snowflake with the following command:

pip install dbnd dbnd-snowflake

SQLAlchemy

We will use SQLAlchemy to interact with our PostgreSQL database. You can install SQLAlchemy with the following command:

pip install SQLAlchemy

Alternatively, psycopg2 provides equivalents for the functions we will be using in this tutorial.

Snowflake Python Connector

Next, we need to install Snowflake's Python connector, so that we can move data into Snowflake with Python scripts. You can install the connector with:

pip install snowflake-connector-python

For a full list of prerequisites or for an installation process better suited for your environment, visit Snowflake's official documentation.

Example Data

Before we begin querying, it is worth taking a look at the example data used throughout this tutorial; this will make it easier to adapt the example to your own migration or replication workflows.

Postgres table used in this example

The table columns are:

  • index - The index of the PostgreSQL table.
  • invoice_ID - Unique ID generated for each transaction.
  • transaction_amt - Amount exchanged in the transaction.
  • transaction_time - POSIX timestamp of transaction.
  • transaction_fulfilled - Whether the transaction has been fulfilled.

Querying data from PostgreSQL

First, create a SQLAlchemy engine to interact with the PostgreSQL database. We will be using a simple on-premises database for this example.

from sqlalchemy import create_engine
import pandas as pd

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''    
    pg_db = create_engine('postgresql+psycopg2://dbnd_postgres:<password>@<host>/dbnd_postgres')
    return pg_db
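
Before moving on, it can help to confirm that the engine actually connects. The quick check below is only an illustrative sketch (it assumes the transactions_data table described above):

import pandas as pd

# Illustrative sanity check: preview a few rows and their inferred dtypes.
pg_db = establish_postgres_connection()
preview = pd.read_sql("SELECT * FROM transactions_data LIMIT 5", pg_db)
print(preview.dtypes)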

Next, we will create a function to extract the data from the database. This function will either extract all the data or a subset of the data if only a partial migration to Snowflake is required. For example, we may only want the transaction history from the most recent 10 days.

from sqlalchemy import create_engine
import pandas as pd
from dbnd import log_dataframe, log_metric
import datetime

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''    
    ...
  
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.''' 
    local_file = "/tmp/transactions.csv"
    if start_day:
        start = (datetime.datetime.now()- datetime.timedelta(days=start_day)).timestamp()
        query = f"COPY (SELECT * FROM transactions_data WHERE transaction_time >= {start}) TO '{local_file}' WITH DELIMITER ',' CSV HEADER;"
    else:
        query = f"COPY transactions_data TO '{local_file}' WITH DELIMITER ',' CSV HEADER;"


    transaction_data = pd.read_csv('/tmp/transactions.csv')
    transaction_data.rename(columns={'index':'pg_index'}, inplace=True)
    transaction_data.to_csv('./tmp/internal_stage/transactions.csv', index=False)
    
    log_dataframe("PostgreSQL data", transaction_data, with_histograms=True)
    
    return transaction_data

We use PostgreSQL's COPY ... TO command to query the data and save it to a file, which we will use for staging in the next section. We then use pandas to rename the index column to pg_index to avoid any confusion after the migration or replication is completed. If this index column is no longer required, you can simply drop it here instead.

Next, we will use DBND's log_dataframe function to track the dataframe. This dataframe can be used for a data integrity check at the end of the workflow.
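
If you want more granular checkpoints than the full dataframe, this is also a natural place to log individual metrics with log_metric. The metric names below are our own illustrative choices, not part of the example script:

# Optional, illustrative metrics logged alongside the dataframe:
log_metric("rows extracted", len(transaction_data))
log_metric("null invoice IDs", int(transaction_data["invoice_ID"].isna().sum()))
log_metric("total transaction amount", float(transaction_data["transaction_amt"].sum()))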

Staging

Here, we will establish a connection to Snowflake with Snowflake's Python connector. For simplicity, we will be opening the connection in the default OCSP Fail Open Mode. The connection will allow us to access the cursor used for executing Snowflake queries.

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
import datetime
import credentials  # local module assumed to hold the Snowflake USER / PW / ACCOUNT values

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''     
    ...
  
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.''' 
    ...
  
def establish_snowflake_connection():
    '''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
    snowflake_connection = snowflake.connector.connect(
        user=credentials.USER,
        password=credentials.PW,
        account=f'{credentials.ACCOUNT}'
    )
    connection_string = f'snowflake://{credentials.USER}:{credentials.PW}@{credentials.ACCOUNT}/?account={credentials.ACCOUNT}'
    return snowflake_connection, connection_string
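
As a quick connectivity check (purely illustrative and not part of the migration flow itself), you can open a cursor and run a trivial query before continuing:

# Hedged example: verify the Snowflake connection before running any migration queries.
sf_connection, sf_conn_string = establish_snowflake_connection()
sf_cursor = sf_connection.cursor()
sf_cursor.execute("SELECT CURRENT_VERSION()")
print(sf_cursor.fetchone())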

We covered the general format of the connection_string and account in the Tracking Snowflake Guide.

🚧

Snowflake Account Roles

Before continuing in this section, please ensure that your Snowflake account role has the privileges required to create tables, internal stages, and/or external stages. Visit Snowflake's official documentation for more details on privileges.
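
If your role is missing any of these privileges, an account administrator can grant them. The statements below are a minimal sketch, assuming a cursor named sf_cursor (as created in the main script later in this guide) and a hypothetical role named migration_role:

# Hypothetical privilege grants, run by a role that can manage grants (e.g. ACCOUNTADMIN):
sf_cursor.execute('GRANT CREATE TABLE ON SCHEMA "TRANSACTIONS"."PUBLIC" TO ROLE migration_role')
sf_cursor.execute('GRANT CREATE STAGE ON SCHEMA "TRANSACTIONS"."PUBLIC" TO ROLE migration_role')
sf_cursor.execute('GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE migration_role')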

Next, we will create two tables in Snowflake to store the incoming PostgreSQL data. You can do this in Snowflake's web UI or with the Python connector. Snowflake supports a variety of data types; in this tutorial we will only be using simple ones. You can find the full list of supported data types in Snowflake's official documentation.

CREATE TABLE "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" ("pg_index" INTEGER, "invoice_ID" STRING, "transaction_amt" DOUBLE, "transaction_time" DOUBLE, "transaction_fulfilled" INTEGER);

CREATE TABLE "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"  ("pg_index" INTEGER, "invoice_ID" STRING, "transaction_amt" DOUBLE, "transaction_time" DOUBLE, "transaction_fulfilled" INTEGER);

Internal Stage

An internal stage is a named storage location managed by Snowflake where data files are uploaded (with the PUT command) so that they can be loaded into a table. An internal stage has the advantage of being easier to set up, but it does not support automated continuous data ingestion. For a one-shot bulk data migration, it is often easier to create an internal stage.

After creating an internal stage, we will load its contents into an intermediate table - this will allow us to update our final table (via a MERGE) when new data is received. If a one-shot bulk data migration is all you need, you can use COPY INTO to load your data into the final table directly.

We will also keep track of the Query ID of each query - these will later be used to report the Snowflake resource usage (credits, etc.) and surface any performance bottlenecks in our queries.

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''    
    ...
  
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.''' 
    ...
  
def establish_snowflake_connection():
    '''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
    ...
 
def migrate_from_internal_stage(sf_cursor):
    '''creates and populates an internal stage, then copies files into an intermediate table''' 
    query_ids = []
    # first create the internal stage: 
    stage_creation_query = """CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage 
                                FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
    
    sf_cursor.execute("PUT file://./tmp/internal_stage/transactions.csv @pg_internal_stage;")
    query_ids.append(sf_cursor.sfqid)

    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @pg_internal_stage;')
    query_ids.append(sf_cursor.sfqid)

    return query_ids

External Stage

An external stage is a specified cloud storage location where data file(s) are stored so that they can be loaded into a table. Currently, Snowflake supports the following cloud storage services:

  • Amazon S3 Buckets
  • Google Cloud Storage Buckets
  • Microsoft Azure Containers

In this example, we are using an AWS S3 bucket as our external stage. For the required parameters of your cloud storage service, visit Snowflake's official documentation.

External stages require a few extra setup steps but are more versatile. External stages support continuous data ingestion, which can be automated with Snowpipe.

📘

Automated Continuous Data Ingestion

Visit Snowflake's official documentation to learn more about automated continuous data ingestion from an external stage.
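
As a rough illustration only - the pipe name is hypothetical, and auto-ingest requires event notifications to be configured on the bucket - a pipe that continuously loads new files from the external stage might look like this:

# Hedged Snowpipe sketch; see Snowflake's documentation for the complete setup.
create_pipe = '''CREATE OR REPLACE PIPE transactions_pipe AUTO_INGEST = TRUE AS
                 COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"
                 FROM @aws_external_stage;'''
sf_cursor.execute(create_pipe)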

After creating the external stage, we can use the COPY INTO command to load the data into the intermediate table. As with the internal stage, if a one-shot bulk data migration is desired, you can use COPY INTO to load your data into the final table directly. We will also keep track of the Query ID of each query - these will later be used to report the Snowflake resource usage and surface any performance bottlenecks.

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''    
    ...
  
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.''' 
    ...
  
def establish_snowflake_connection():
    '''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
    ...
 
def migrate_from_internal_stage(sf_cursor):
    '''creates and populates an internal stage, then copies files into an intermediate table''' 
    ...
    
def migrate_from_external_s3_stage(sf_cursor):
    '''creates and populates an external stage, then copies files into an intermediate table'''
    query_ids = []
    # first create the storage integration on snowflake:
    sf_cursor.execute("USE ROLE ACCOUNTADMIN;")
    snowflake_storage_integration = """CREATE OR REPLACE STORAGE INTEGRATION s3_integration
                                        type = external_stage
                                        storage_provider = s3
                                        enabled = true
                                        storage_aws_role_arn = 'arn:aws:iam::<aws_account_id>:role/snowflake-role'
                                        storage_allowed_locations = ('s3://<bucket>/<path>');
                                    """ 
    sf_cursor.execute(snowflake_storage_integration)
    query_ids.append(sf_cursor.sfqid)
    
    # next create the external stage:
    ex_stage_creation_query = """CREATE OR REPLACE STAGE aws_external_stage 
                                URL = 's3://<bucket>/<path>'
                                STORAGE_INTEGRATION = s3_integration
                                FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
    sf_cursor.execute(ex_stage_creation_query)
    query_ids.append(sf_cursor.sfqid)
        
    # copy from external stage to intermediate table
    sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @aws_external_stage')
    query_ids.append(sf_cursor.sfqid)

    return query_ids

Merging and Checking for Data Integrity

Next, we will merge the intermediate table with the final target table in Snowflake. By using the MERGE command, we can merge the two tables without having duplicate rows in the final transactions_data table.

After we merge the tables, we check whether the final table's contents are the same as the original data extracted from the PostgreSQL database. We can accomplish this with pandas' equals method, which checks whether the values in two dataframes are the same, including their data types. It returns True if the dataframes are equal and False otherwise (logged as 1 or 0). We'll log that result with DBND so that it's saved in our file system and we have a record that the process completed successfully.
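
Because equals is strict about data types and row order, and data fetched back from Snowflake may arrive in a different order or with slightly different types, it usually pays to normalize both dataframes first. A hedged sketch of that normalization, using the variable names from the script below:

# Illustrative normalization before the equality check:
def normalize(df):
    return (df.sort_values("invoice_ID")
              .reset_index(drop=True)
              .astype({"transaction_amt": float, "transaction_time": float}))

log_metric("Data Identical", normalize(snowflake_final_data).equals(normalize(transaction_data)))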

📘

Data Validation Performance

If your dataset is extremely large, checking the structural shape and/or statistics of a dataframe is recommended over the equals method, which checks through the entire dataframe.
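
For very large tables, a few cheap checks along these lines (illustrative, not exhaustive) can stand in for the full comparison:

# Hedged examples of lightweight integrity checks:
log_metric("row count match", len(snowflake_final_data) == len(transaction_data))
log_metric("column count match", snowflake_final_data.shape[1] == transaction_data.shape[1])
log_metric("transaction_amt sum delta",
           float(abs(snowflake_final_data["transaction_amt"].sum() - transaction_data["transaction_amt"].sum())))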

We also use log_snowflake_resource_usage to understand the cost of the process in terms of credits and time. log_snowflake_resource_usage logs granular data about our queries, which makes it easier to quickly discover any performance bottlenecks that come up in the process.

Finally, we log the final table that exists on Snowflake. Using the with_histograms=True parameter in the log_dataframe function allows us to visually ensure that the data from PostgreSQL and Snowflake matches up.

from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime

def establish_postgres_connection():
    '''uses sqlalchemy to establish connection to postgresql db'''    
    ...
  
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.''' 
    ...
  
def establish_snowflake_connection():
    '''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
    ...
 
def migrate_from_internal_stage(sf_cursor):
    '''creates and populates an internal stage, then copies files into an intermediate table''' 
    ...
    
def migrate_from_external_s3_stage(sf_cursor):
    '''creates and populates an external stage, then copies files into an intermediate table'''
    ...
    
def merge_intermediate_table(sf_cursor):
    '''Uses the locally staged file to update table'''
    query_ids = []
    snowflake_merge='''MERGE into "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table using "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t 
    on final_table."invoice_ID" = i_t."invoice_ID"
        WHEN matched 
            THEN UPDATE SET final_table."pg_index" = i_t."pg_index", 
                          final_table."invoice_ID" = i_t."invoice_ID",  
                          final_table."transaction_amt" = i_t."transaction_amt",
                          final_table."transaction_time" = i_t."transaction_time",
                          final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
        WHEN not matched 
            THEN insert ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled") 
            values (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");'''
    sf_cursor.execute(snowflake_merge)
    query_ids.append(sf_cursor.sfqid)

    # clear the intermediate table
    sf_cursor.execute('DELETE FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
    query_ids.append(sf_cursor.sfqid)
    return query_ids
  
if __name__ == '__main__':
    pg_connection = establish_postgres_connection()
    transaction_data = find_transactions(pg_connection)

    database = "TRANSACTIONS"
    sf_connection, sf_conn_string = establish_snowflake_connection()
    sf_cursor = sf_connection.cursor()
    sf_cursor.execute(f"USE DATABASE {database}")
    session_id = sf_cursor.connection.session_id
    query_ids = []

    query_ids += migrate_from_internal_stage(sf_cursor)
    query_ids += merge_intermediate_table(sf_cursor)

    query_ids += migrate_from_external_s3_stage(sf_cursor)
    query_ids += merge_intermediate_table(sf_cursor)

    sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"')
    results = sf_cursor.fetchall()
    query_ids.append(sf_cursor.sfqid)

    snowflake_final_data = pd.DataFrame(results, columns =["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"])
    # log_dataframe("final data",snowflake_final_data)

    log_metric("Data Identical",snowflake_final_data.equals(transaction_data))
      log_dataframe("Snowflake Data", snowflake_final_data, with_histograms=True)
    
    log_snowflake_resource_usage(
        database=database,
        connection_string=sf_conn_string,
        query_ids=query_ids,
        session_id=int(session_id),
        raise_on_error=True,
        delay=1
    )

    log_snowflake_table(
        table_name="TRANSACTION_DATA",
        connection_string=sf_conn_string,
        database=database,
        schema="PUBLIC",
        with_preview=True,
        raise_on_error=False
    )

    sf_connection.close()
MERGE into "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table using "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t 
    on final_table."invoice_ID" = i_t."invoice_ID"
        WHEN matched 
            THEN UPDATE SET final_table."pg_index" = i_t."pg_index", 
                          final_table."invoice_ID" = i_t."invoice_ID",  
                          final_table."transaction_amt" = i_t."transaction_amt",
                          final_table."transaction_time" = i_t."transaction_time",
                          final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
        WHEN not matched 
            THEN insert ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled") 
            values (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");

Running the Script

First, enable tracking by exporting the DBND__TRACKING environment variable. Then, we can run the Python file normally.

export DBND__TRACKING=True
python data_migration.py

The output will include validation that the data matches from source to destination, the metrics that were logged, histograms (if enabled), and profiling statistics about the extracted data. We will also see the Snowflake resource usage, so we can check that it is within acceptable ranges.

==================== 
= Running Databand!
 TRACKERS   : ['console']
...

INFO  data_migration_pipe.py__9bbea85326 - Histogram logged: PostgreSQL data.transaction_amt
INFO  data_migration_pipe.py__9bbea85326 - ###############################################################################
INFO  data_migration_pipe.py__9bbea85326 -                                                        count bin
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  186.94            
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  846.0049999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                                     1  1505.07           
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  2164.1349999999998
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ  10  2823.2            
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  3482.265          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  4141.329999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  4800.3949999999995
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                               2  5459.459999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                                     1  6118.524999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                               5  6777.589999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  7436.654999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ         9  8095.719999999998 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  8754.785          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  9413.85           
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                               2  10072.914999999999
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  10731.98          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                               5  11391.045         
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                          3  12050.109999999999
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  12709.175     
...

INFO  data_migration_pipe.py__3ca9e38f1e - Metric logged: Data Identical=1
...

INFO  data_migration_pipe.py__9bbea85326 - Histogram logged: Snowflake Data.transaction_amt
INFO  data_migration_pipe.py__9bbea85326 - ###############################################################################
INFO  data_migration_pipe.py__9bbea85326 -                                                        count bin
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  186.94            
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  846.0049999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                                     1  1505.07           
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  2164.1349999999998
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ  10  2823.2            
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  3482.265          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  4141.329999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                    7  4800.3949999999995
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                               2  5459.459999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                                     1  6118.524999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                               5  6777.589999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  7436.654999999999 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ         9  8095.719999999998 
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  8754.785          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                    4  9413.85           
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                               2  10072.914999999999
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  10731.98          
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                               5  11391.045         
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                                          3  12050.109999999999
INFO  data_migration_pipe.py__9bbea85326 - β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ                         6  12709.175 
...

snowflake_query.10.bytes_scanned=4096
snowflake_query.10.compilation_time_milliseconds=96
snowflake_query.10.credits_used_cloud_services=0.000015000
snowflake_query.10.error_message=None
snowflake_query.10.execution_status=SUCCESS
snowflake_query.10.execution_time_milliseconds=32
snowflake_query.10.query_id=01994de3-0541-a253-0033-79830001b69e
snowflake_query.10.query_tag=
snowflake_query.10.query_text=SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"
snowflake_query.10.rows_produced=100
snowflake_query.10.session_id=14488827360022646
snowflake_query.10.total_elapsed_time_milliseconds=128
...

==================== 
= Your run has been successfully executed!
 TRACKERS   : ['console']
