Tracking Snowflake Data Migrations
Using DBND to track an example data migration process from PostgreSQL to Snowflake.
When migrating data between databases, it's critical to know that data is being consistently and accurately moved. In this guide, we will demonstrate how to use DBND to ensure data integrity when migrating data between databases.
Our example covers a migration flow from PostgreSQL to Snowflake. We will be using DBND's tracking functions to log data integrity checks when the data is moved to Snowflake.
To visualize, monitor, and alert on this process with full functionality, reach out to our team to learn more about Databand's monitoring application.
General migration workflow in this example.
Steps of our flow:
- Extract data from a PostgreSQL database.
- Log the table and interesting metrics of the data we extracted.
- Stage the files (both internal and external staging methods will be covered).
- Copy the staged data into a Snowflake table.
- Extract data from Snowflake to ensure that the data is properly stored and production ready. We will also log the Snowflake resources consumed and the final table.
Installing Prerequisites
DBND Tracking
First, we need to install dbnd and dbnd-snowflake with the following command:
pip install dbnd dbnd-snowflake
SQLAlchemy
We will use SQLAlchemy to interact with our PostgreSQL database. You can install SQLAlchemy with the following command:
pip install SQLAlchemy
Alternatively, psycopg2 provides equivalents for the functions we will be using in this tutorial.
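As a rough sketch of what that might look like (the connection parameters are placeholders matching the SQLAlchemy example below; adjust them to your environment):
import psycopg2

def establish_postgres_connection_psycopg2():
    '''connects to PostgreSQL directly with psycopg2 instead of SQLAlchemy'''
    return psycopg2.connect(
        host="127.0.0.1",
        dbname="dbnd_postgres",
        user="dbnd_postgres",
        password="postgres",
    )
psycopg2 cursors also provide copy_expert(), which can run COPY ... TO STDOUT queries similar to the COPY TO commands used later in this guide.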
Snowflake Python Connector
Next, we need to install Snowflake's Python connector, so that we can move data into Snowflake with Python scripts. You can install the connector with:
pip install snowflake-connector-python
For a full list of prerequisites or for an installation process better suited for your environment, visit Snowflake's official documentation.
Example Data
Before we begin querying, it is worth taking a look at the example data used throughout this tutorial; this will make it easier to adapt the example to your own migration or replication workflows. A small synthetic version of this table is sketched after the column list below.
Postgres table used in this example
The table columns are:
- index - The index column of the PostgreSQL table.
- invoice_ID - Unique ID generated for each transaction.
- transaction_amt - Amount exchanged in the transaction.
- transaction_time - POSIX timestamp of the transaction.
- transaction_fulfilled - Whether the transaction has been fulfilled.
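If you want to follow along without an existing table, a small synthetic dataset with the same columns can be generated and written to PostgreSQL. This is only an illustrative sketch; the table name matches the queries below, but the values are made up:
import uuid
import time
import random
import pandas as pd

def build_sample_transactions(n_rows=100):
    '''builds a DataFrame shaped like the example transactions table'''
    now = time.time()
    return pd.DataFrame({
        "invoice_ID": [uuid.uuid4().hex for _ in range(n_rows)],
        "transaction_amt": [round(random.uniform(10, 13000), 2) for _ in range(n_rows)],
        "transaction_time": [now - random.uniform(0, 30 * 24 * 3600) for _ in range(n_rows)],
        "transaction_fulfilled": [random.randint(0, 1) for _ in range(n_rows)],
    })

# writing with to_sql adds the `index` column used in this example
# (pg_engine is the SQLAlchemy engine created in the next section)
# build_sample_transactions().to_sql("transactions_data", pg_engine, if_exists="replace")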
Querying data from PostgreSQL
First, create a SQLAlchemy engine to interact with PostgreSQL. We will be using a simple on-premises database for this example.
from sqlalchemy import create_engine
import pandas as pd
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
pg_db = create_engine('postgresql+psycopg2://dbnd_postgres:[email protected]/dbnd_postgres')
return pg_db
Next, we will create a function to extract the data from the database. This function will extract either all the data or a subset of it if only a partial migration to Snowflake is required. For example, we may only want the transaction history from the most recent 10 days.
from sqlalchemy import create_engine
import pandas as pd
from dbnd import log_dataframe, log_metric
import datetime
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
...
def find_transactions(pg_connection, start_day=None):
    '''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.'''
    local_file = "/tmp/transactions.csv"
    if start_day:
        start = (datetime.datetime.now() - datetime.timedelta(days=start_day)).timestamp()
        query = f"COPY (SELECT * FROM transactions_data WHERE transaction_time >= {start}) TO '{local_file}' WITH DELIMITER ',' CSV HEADER;"
    else:
        query = f"COPY transactions_data TO '{local_file}' WITH DELIMITER ',' CSV HEADER;"
    # execute the COPY TO query so PostgreSQL writes the result set to local_file
    pg_connection.execute(query)
    transaction_data = pd.read_csv(local_file)
    # rename the PostgreSQL index column to avoid confusion after the migration
    transaction_data.rename(columns={'index': 'pg_index'}, inplace=True)
    # save a copy that will later be PUT into the Snowflake internal stage
    transaction_data.to_csv('./tmp/internal_stage/transactions.csv', index=False)
    log_dataframe("PostgreSQL data", transaction_data, with_histograms=True)
    return transaction_data
COPY (SELECT * FROM transactions_data WHERE transaction_time >= {start}) TO '/tmp/transactions.csv' WITH DELIMITER ',' CSV HEADER;
COPY transactions_data TO '/tmp/transactions.csv' WITH DELIMITER ',' CSV HEADER;
We will use the COPY TO command to query the data and save it to a file; we will use this file for staging in the next section. We can use pandas to rename the index column to pg_index to avoid any confusion after the migration or replication is completed. If the index column is no longer required, we can simply drop it here.
Next, we will use DBND's log_dataframe function to track the dataframe. This dataframe can be used for a data integrity check at the end of the workflow.
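Beyond the dataframe itself, scalar metrics can also be logged with DBND's log_metric function. A minimal sketch, with arbitrary metric names, might look like this:
from dbnd import log_metric

def log_extraction_metrics(transaction_data):
    '''logs a few simple data quality metrics for the extracted table'''
    log_metric("row count", len(transaction_data))
    log_metric("null invoice IDs", int(transaction_data["invoice_ID"].isnull().sum()))
    log_metric("total transaction amount", float(transaction_data["transaction_amt"].sum()))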
Staging
Here, we will establish a connection to Snowflake with Snowflake's Python connector. For simplicity, we will be opening the connection in the default OCSP Fail Open Mode. The connection will allow us to access the cursor used for executing Snowflake queries.
from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
import datetime
import credentials  # local module holding the Snowflake USER, PW, and ACCOUNT values
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
...
def find_transactions(pg_connection, start_day=None):
'''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.'''
...
def establish_snowflake_connection():
'''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
snowflake_connection = snowflake.connector.connect(
user=credentials.USER,
password=credentials.PW,
account=f'{credentials.ACCOUNT}'
)
connection_string = f'snowflake://{credentials.USER}:{credentials.PW}@{credentials.ACCOUNT}/?account={credentials.ACCOUNT}'
return snowflake_connection, connection_string
We covered the general format of the connection_string and account in the Tracking Snowflake Guide.
Snowflake Account Roles
Before continuing with this section, please ensure that your Snowflake account role has the privileges required to CREATE tables, internal stages, and/or external stages. Visit Snowflake's official documentation for more details on privileges.
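As an illustration, an administrator might grant privileges along these lines (the role name is a placeholder, and your security model may require a different set of grants):
GRANT CREATE TABLE ON SCHEMA "TRANSACTIONS"."PUBLIC" TO ROLE migration_role;
GRANT CREATE STAGE ON SCHEMA "TRANSACTIONS"."PUBLIC" TO ROLE migration_role;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE migration_role;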
Next, we will create two tables in Snowflake to store our incoming PostgreSQL table. You can do this in Snowflake's web UI or with the Python connector. Snowflake supports a variety of data types; in this tutorial, we will only be using simple ones. You can find the list of supported data types in Snowflake's official documentation.
CREATE TABLE "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" ("pg_index" INTEGER, "invoice_ID" STRING, "transaction_amt" DOUBLE, "transaction_time" DOUBLE, "transaction_fulfilled" INTEGER);
CREATE TABLE "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" ("pg_index" INTEGER, "invoice_ID" STRING, "transaction_amt" DOUBLE, "transaction_time" DOUBLE, "transaction_fulfilled" INTEGER);
Internal Stage
An internal stage is a specified location on the local machine where data file(s) are stored so that they can be loaded into a table. An internal stage has the advantage of being easier to set up but it does not support automated continuous data ingestion. For a one-shot bulk data migration, it is often easier to create an internal stage.
After creating an internal stage, we will load its contents into an intermediate table; this will allow us to UPDATE our final table when new data is received. If a one-shot bulk data migration is all you need, you can use COPY INTO to load your data into the final table directly (a sketch of this direct copy is shown below).
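A minimal sketch of that direct copy, assuming the internal stage and final table created in this guide:
COPY INTO "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" FROM @pg_internal_stage;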
We will also keep track of the Query ID of each query; this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks in our queries.
from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime
import credentials  # local module holding the Snowflake USER, PW, and ACCOUNT values
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
...
def find_transactions(pg_connection, start_day=None):
'''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.'''
...
def establish_snowflake_connection():
'''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
...
def migrate_from_internal_stage(sf_cursor):
'''creates and populates an internal stage, then copies files into an intermediate table'''
query_ids = []
# first create the internal stage:
stage_creation_query = """CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
sf_cursor.execute(stage_creation_query)
query_ids.append(sf_cursor.sfqid)
sf_cursor.execute("PUT file://./tmp/internal_stage/transactions.csv @pg_internal_stage;")
query_ids.append(sf_cursor.sfqid)
sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @pg_internal_stage;')
query_ids.append(sf_cursor.sfqid)
return query_ids
CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
PUT file://./tmp/internal_stage/transactions.csv @pg_internal_stage;
COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @pg_internal_stage;
External Stage
An external stage is a specified cloud storage location where data file(s) are stored so that they can be loaded into a table. Currently, Snowflake supports the following cloud storage services:
- Amazon S3 Buckets
- Google Cloud Storage Buckets
- Microsoft Azure Containers
In this example, we are using an AWS S3 bucket as our external stage. For the parameters required by your cloud storage service, visit Snowflake's official documentation. External stages require a few extra steps but are more versatile; they support continuous data ingestion that can be automated with Snowpipe.
Automated Continuous Data Ingestion
Visit Snowflake's official documentation to learn more about automated continuous data ingestion from an external stage.
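As a rough sketch of what that looks like (the pipe name is arbitrary, and bucket event notifications still need to be configured on the cloud storage side):
CREATE OR REPLACE PIPE transactions_pipe AUTO_INGEST = TRUE AS
COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" FROM @aws_external_stage;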
After creating the external stage, we can use the COPY INTO command to load the data into the intermediate table. As with the internal stage, if a one-shot bulk data migration is desired, you can use COPY INTO to load your data into the final table directly. We will also keep track of the Query ID of each query; this will later be used to discover the Snowflake resource usage and any performance bottlenecks in our queries.
from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime
import credentials  # local module holding the Snowflake USER, PW, and ACCOUNT values
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
...
def find_transactions(pg_connection, start_day=None):
'''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.'''
...
def establish_snowflake_connection():
'''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
...
def migrate_from_internal_stage(sf_cursor):
'''creates and populates an internal stage, then copies files into an intermediate table'''
...
def migrate_from_external_s3_stage(sf_cursor):
'''creates and populates an external stage, then copies files into an intermediate table'''
query_ids = []
# first create the storage integration on snowflake:
sf_cursor.execute("USE ROLE ACCOUNTADMIN;")
snowflake_storage_integration = """CREATE OR REPLACE STORAGE INTEGRATION s3_integration
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::<aws_account_id>:role/snowflake-role'
storage_allowed_locations = ('s3://<bucket>/<path>');
"""
sf_cursor.execute(snowflake_storage_integration)
query_ids.append(sf_cursor.sfqid)
# next create the external stage:
ex_stage_creation_query = """CREATE OR REPLACE STAGE aws_external_stage
URL = 's3://<bucket>/<path>'
STORAGE_INTEGRATION = s3_integration
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);"""
sf_cursor.execute(ex_stage_creation_query)
query_ids.append(sf_cursor.sfqid)
# copy from external stage to intermediate table
sf_cursor.execute('COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @aws_external_stage')
query_ids.append(sf_cursor.sfqid)
return query_ids
CREATE OR REPLACE STORAGE INTEGRATION s3_integration
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::<aws_account_id>:role/snowflake-role'
storage_allowed_locations = ('s3://<bucket>/<path>');
CREATE OR REPLACE STAGE aws_external_stage
URL = 's3://<bucket>/<path>'
STORAGE_INTEGRATION = s3_integration
FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1);
COPY INTO "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" from @aws_external_stage
Merging and Checking for Data Integrity
Next, we will merge the intermediate table with the final target table in Snowflake. Using the MERGE command, we can combine the two tables without creating duplicate rows in the final TRANSACTION_DATA table.
After we merge the tables, we check whether the final table's contents match the original data extracted from the PostgreSQL database. We can accomplish this with pandas' equals method, which checks whether the values in two dataframes are the same, including their data types. It returns True if the dataframes are equal and False otherwise, which is logged as 1 or 0. We'll log that result with DBND so that it's saved in our file system and we have a record that the process completed successfully.
Data Validation Performance
If your dataset is extremely large, checking the structural shape and/or summary statistics of the dataframe is recommended over the equals method, which compares the entire dataframe.
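A lighter-weight check could compare shape and a few aggregates instead of every value; a sketch under that approach (the metric names and tolerance are illustrative):
from dbnd import log_metric

def log_lightweight_validation(source_df, target_df):
    '''compares shape and simple aggregates instead of full equality'''
    log_metric("row counts match", len(source_df) == len(target_df))
    log_metric("column counts match", source_df.shape[1] == target_df.shape[1])
    amount_diff = abs(source_df["transaction_amt"].sum() - target_df["transaction_amt"].sum())
    log_metric("transaction totals match", bool(amount_diff < 1e-6))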
We also use log_snowflake_resource_usage to understand the cost of the process in terms of credits and time. log_snowflake_resource_usage logs granular data about our queries, which makes it easier to quickly discover any performance bottlenecks that come up in the process.
Finally, we log the final table that exists in Snowflake. Using the with_histograms=True parameter of the log_dataframe function allows us to visually confirm that the data from PostgreSQL and Snowflake matches.
from sqlalchemy import create_engine
import pandas as pd
import snowflake.connector
from dbnd import log_dataframe, log_metric
from dbnd import log_snowflake_resource_usage, log_snowflake_table
import datetime
import credentials  # local module holding the Snowflake USER, PW, and ACCOUNT values
def establish_postgres_connection():
'''uses sqlalchemy to establish connection to postgresql db'''
...
def find_transactions(pg_connection, start_day=None):
'''finds transactions between start_day to current time. If start_day is None, returns all transactions as a Pandas DataFrame.'''
...
def establish_snowflake_connection():
'''uses Snowflake Python Connector to establish connection to snowflake, returns connection and connection string'''
...
def migrate_from_internal_stage(sf_cursor):
'''creates and populates an internal stage, then copies files into an intermediate table'''
...
def migrate_from_external_s3_stage(sf_cursor):
'''creates and populates an external stage, then copies files into an intermediate table'''
...
def merge_intermediate_table(sf_cursor):
'''merges the intermediate table into the final table, then clears the intermediate table'''
query_ids = []
snowflake_merge='''MERGE into "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table using "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t
on final_table."invoice_ID" = i_t."invoice_ID"
WHEN matched
THEN UPDATE SET final_table."pg_index" = i_t."pg_index",
final_table."invoice_ID" = i_t."invoice_ID",
final_table."transaction_amt" = i_t."transaction_amt",
final_table."transaction_time" = i_t."transaction_time",
final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
WHEN not matched
THEN insert ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled")
values (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");'''
sf_cursor.execute(snowflake_merge)
query_ids.append(sf_cursor.sfqid)
# clear the intermediate table
sf_cursor.execute('DELETE FROM "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE"')
query_ids.append(sf_cursor.sfqid)
return query_ids
if __name__ == '__main__':
pg_connection = establish_postgres_connection()
transaction_data = find_transactions(pg_connection)
database = "TRANSACTIONS"
sf_connection, sf_conn_string = establish_snowflake_connection()
sf_cursor = sf_connection.cursor()
sf_cursor.execute(f"USE DATABASE {database}")
session_id = sf_cursor.connection.session_id
query_ids = []
query_ids += migrate_from_internal_stage(sf_cursor)
query_ids += merge_intermediate_table(sf_cursor)
query_ids += migrate_from_external_s3_stage(sf_cursor)
query_ids += merge_intermediate_table(sf_cursor)
sf_cursor.execute('SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"')
results = sf_cursor.fetchall()
query_ids.append(sf_cursor.sfqid)
snowflake_final_data = pd.DataFrame(results, columns =["pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled"])
# log_dataframe("final data",snowflake_final_data)
log_metric("Data Identical",snowflake_final_data.equals(transaction_data))
log_dataframe("Snowflake Data", snowflake_final_data, with_histograms=True)
log_snowflake_resource_usage(
database=database,
connection_string=sf_conn_string,
query_ids=query_ids,
session_id=int(session_id),
raise_on_error=True,
delay=1
)
log_snowflake_table(
table_name="TRANSACTION_DATA",
connection_string=sf_conn_string,
database=database,
schema="PUBLIC",
with_preview=True,
raise_on_error=False
)
sf_connection.close()
MERGE into "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA" final_table using "TRANSACTIONS"."PUBLIC"."INTERMEDIATE_TABLE" i_t
on final_table."invoice_ID" = i_t."invoice_ID"
WHEN matched
THEN UPDATE SET final_table."pg_index" = i_t."pg_index",
final_table."invoice_ID" = i_t."invoice_ID",
final_table."transaction_amt" = i_t."transaction_amt",
final_table."transaction_time" = i_t."transaction_time",
final_table."transaction_fulfilled" = i_t."transaction_fulfilled"
WHEN not matched
THEN insert ("pg_index", "invoice_ID", "transaction_amt", "transaction_time", "transaction_fulfilled")
values (i_t."pg_index", i_t."invoice_ID", i_t."transaction_amt", i_t."transaction_time", i_t."transaction_fulfilled");
Running the Script
First, enable tracking by exporting the DBND__TRACKING environment variable. Then, we can run the Python file normally.
export DBND__TRACKING=True
python data_migration.py
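If you are reporting to a Databand web application rather than only the console tracker, you will typically also need to point dbnd at your instance. The exact variable names depend on your dbnd version, so treat the following as an assumption and check the documentation for your release:
export DBND__CORE__DATABAND_URL=https://<your-databand-host>
export DBND__CORE__DATABAND_ACCESS_TOKEN=<your-access-token>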
The output will include validation that the data matches from source to destination, the metrics that were logged, histograms (if enabled), and profiling statistics about the extracted data. We will also see the Snowflake resource usage, so we can check that it is within acceptable ranges.
====================
= Running Databand!
TRACKERS : ['console']
...
INFO data_migration_pipe.py__9bbea85326 - Histogram logged: PostgreSQL data.transaction_amt
INFO data_migration_pipe.py__9bbea85326 - ###############################################################################
INFO data_migration_pipe.py__9bbea85326 - count bin
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 186.94
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 846.0049999999999
INFO data_migration_pipe.py__9bbea85326 - █████ 1 1505.07
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 2164.1349999999998
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████████████████████████████████ 10 2823.2
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 3482.265
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 4141.329999999999
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 4800.3949999999995
INFO data_migration_pipe.py__9bbea85326 - ███████████ 2 5459.459999999999
INFO data_migration_pipe.py__9bbea85326 - █████ 1 6118.524999999999
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████ 5 6777.589999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 7436.654999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████████████████████ 9 8095.719999999998
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 8754.785
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 9413.85
INFO data_migration_pipe.py__9bbea85326 - ███████████ 2 10072.914999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 10731.98
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████ 5 11391.045
INFO data_migration_pipe.py__9bbea85326 - ████████████████ 3 12050.109999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 12709.175
...
INFO data_migration_pipe.py__3ca9e38f1e - Metric logged: Data Identical=1
...
INFO data_migration_pipe.py__9bbea85326 - Histogram logged: Snowflake Data.transaction_amt
INFO data_migration_pipe.py__9bbea85326 - ###############################################################################
INFO data_migration_pipe.py__9bbea85326 - count bin
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 186.94
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 846.0049999999999
INFO data_migration_pipe.py__9bbea85326 - █████ 1 1505.07
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 2164.1349999999998
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████████████████████████████████ 10 2823.2
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 3482.265
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 4141.329999999999
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████████████████████ 7 4800.3949999999995
INFO data_migration_pipe.py__9bbea85326 - ███████████ 2 5459.459999999999
INFO data_migration_pipe.py__9bbea85326 - █████ 1 6118.524999999999
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████ 5 6777.589999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 7436.654999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████████████████████ 9 8095.719999999998
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 8754.785
INFO data_migration_pipe.py__9bbea85326 - ██████████████████████ 4 9413.85
INFO data_migration_pipe.py__9bbea85326 - ███████████ 2 10072.914999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 10731.98
INFO data_migration_pipe.py__9bbea85326 - ███████████████████████████ 5 11391.045
INFO data_migration_pipe.py__9bbea85326 - ████████████████ 3 12050.109999999999
INFO data_migration_pipe.py__9bbea85326 - █████████████████████████████████ 6 12709.175
...
snowflake_query.10.bytes_scanned=4096
snowflake_query.10.compilation_time_milliseconds=96
snowflake_query.10.credits_used_cloud_services=0.000015000
snowflake_query.10.error_message=None
snowflake_query.10.execution_status=SUCCESS
snowflake_query.10.execution_time_milliseconds=32
snowflake_query.10.query_id=01994de3-0541-a253-0033-79830001b69e
snowflake_query.10.query_tag=
snowflake_query.10.query_text=SELECT * FROM "TRANSACTIONS"."PUBLIC"."TRANSACTION_DATA"
snowflake_query.10.rows_produced=100
snowflake_query.10.session_id=14488827360022646
snowflake_query.10.total_elapsed_time_milliseconds=128
...
====================
= Your run has been successfully executed!
TRACKERS : ['console']