Pandas to PostgreSQL using Psycopg2: Bulk Insert Using execute_values()

As you can see at the end of my benchmark post, the three acceptable ways (performance-wise) to do a bulk insert in Psycopg2 are:

  • execute_values()
  • execute_mogrify() 
  • copy_from()

This post provides an end-to-end working code for the execute_values() option.

Step 1: Specify the Connection Parameters

# Here you want to change your database, username & password according to your own values
param_dic = {
    "host"      : "localhost",
    "database"  : "globaldata",
    "user"      : "myuser",
    "password"  : "Passw0rd"
}
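
Hard-coding credentials is fine for a local demo, but you may prefer to read them from environment variables so the password never lands in your code. A minimal sketch, assuming environment variables named PGHOST, PGDATABASE, PGUSER and PGPASSWORD (adjust the names and fallbacks to your own setup):

import os

# Read the connection parameters from environment variables,
# falling back to the demo values used in this post
param_dic = {
    "host"      : os.environ.get("PGHOST", "localhost"),
    "database"  : os.environ.get("PGDATABASE", "globaldata"),
    "user"      : os.environ.get("PGUSER", "myuser"),
    "password"  : os.environ.get("PGPASSWORD", "")
}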

Step 2: Helper Functions

import sys
import psycopg2
import psycopg2.extras as extras

def connect(params_dic):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)
    print("Connection successful")
    return conn

def execute_values(conn, df, table):
    """
    Using psycopg2.extras.execute_values() to insert the dataframe
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()
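
Step 3 below also calls a read_dataframe() helper that is not defined in this post. Here is a minimal sketch of it, assuming the same global-temperature CSV and column renaming used in the benchmark post further down:

import pandas as pd

def read_dataframe(csv_file):
    """ Read the csv file and rename the columns to match the SQL table """
    df = pd.read_csv(csv_file)
    df = df.rename(columns={
        "Source": "source",
        "Date": "datetime",
        "Mean": "mean_temp"
    })
    return df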


Step 3: Main Code

# Path to the csv file (the dataset from the benchmark post)
csv_file = "../data/global-temp-monthly.csv"

# Read your dataframe
df = read_dataframe(csv_file)

# Connect to the database
conn = connect(param_dic)

# Run the execute_values strategy
execute_values(conn, df, 'MonthlyTemp')

# Close the connection
conn.close()
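
The code above assumes that the MonthlyTemp table already exists in the database. Its schema is not shown in this post; assuming the three renamed columns of the global-temperature dataset, it could be created with something like:

-- Hypothetical schema for the target table; adjust the types to your own data
CREATE TABLE MonthlyTemp (
    source     TEXT,
    datetime   DATE,
    mean_temp  NUMERIC
);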

For a fully functioning tutorial on how to replicate this, please refer to my Jupyter notebook and Python script on GitHub.

 

Pandas to PostgreSQL using Psycopg2: Bulk Insert Performance Benchmark

 

If you have ever tried to insert a relatively large dataframe into a PostgreSQL table, you know that single inserts are to be avoided at all costs because of how long they take to execute. There are multiple ways to do bulk inserts with Psycopg2 (see this Stack Overflow page and this blog post for instance), and it can be confusing to identify which one is the most efficient. In this post, I compared the following five bulk insert methods and ran the benchmarks for you:

  • execute_many()
  • execute_batch()
  • execute_values() – view post
  • mogrify() then execute()
  • copy_from()

For a fully functioning tutorial on how to replicate this, please refer to my Jupyter notebook on GitHub.

Step 1: Specify the connection parameters

# Here you want to change your database, username & password according to your own values
param_dic = {
    "host"      : "localhost",
    "database"  : "globaldata",
    "user"      : "myuser",
    "password"  : "Passw0rd"
}

Step 2: Load the pandas dataframe, and connect to the database

The data for this tutorial is freely available on https://datahub.io/core/global-temp, but you will also find it in the data/ directory of my GitHub repository. What is nice about this dataframe is that it contains string, date and float columns, so it should be a good test dataframe for benchmarking bulk inserts.

import sys
import pandas as pd
import psycopg2

csv_file = "../data/global-temp-monthly.csv"
df = pd.read_csv(csv_file)
df = df.rename(columns={
    "Source": "source", 
    "Date": "datetime",
    "Mean": "mean_temp"
})

def connect(params_dic):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) 
    print("Connection successful")
    return conn

conn = connect(param_dic)
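
One detail worth noting before running the functions below: copy_from() writes the dataframe index as a leading id column (via df.to_csv(..., index_label='id')), so the target table needs a matching first column. The table itself is not created in this post; a hypothetical schema consistent with all five functions (monthly_temp is my own placeholder name) would be:

-- Hypothetical target table; the leading id column receives the dataframe
-- index written out by copy_from() below
CREATE TABLE monthly_temp (
    id         INTEGER,
    source     TEXT,
    datetime   DATE,
    mean_temp  NUMERIC
);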

Step 3: The five different ways to do a Bulk Insert using Psycopg2

Here are the five functions, ready to copy and paste. You are welcome.

import os
import psycopg2.extras as extras

def execute_many(conn, df, table):
    """
    Using cursor.executemany() to insert the dataframe
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute (three columns in this dataframe)
    query  = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s)" % (table, cols)
    cursor = conn.cursor()
    try:
        cursor.executemany(query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_many() done")
    cursor.close()


def execute_batch(conn, df, table, page_size=100):
    """
    Using psycopg2.extras.execute_batch() to insert the dataframe
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute (three columns in this dataframe)
    query  = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s)" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_batch(cursor, query, tuples, page_size)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_batch() done")
    cursor.close()


def execute_values(conn, df, table):
    """
    Using psycopg2.extras.execute_values() to insert the dataframe
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # SQL query to execute
    query  = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_values() done")
    cursor.close()


def execute_mogrify(conn, df, table):
    """
    Using cursor.mogrify() to build the bulk insert query
    then cursor.execute() to execute the query
    """
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    # Build the full SQL query: mogrify() escapes each tuple of values
    cursor = conn.cursor()
    values = [cursor.mogrify("(%s,%s,%s)", tup).decode('utf8') for tup in tuples]
    query  = "INSERT INTO %s(%s) VALUES " % (table, cols) + ",".join(values)

    try:
        cursor.execute(query)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("execute_mogrify() done")
    cursor.close()


def copy_from(conn, df, table):
    """
    Save the dataframe on disk as a csv file,
    then use copy_from() to copy it to the table
    """
    # Save the dataframe to disk; the dataframe index is written as
    # a leading 'id' column, so the target table must have a matching column
    tmp_df = "./tmp_dataframe.csv"
    df.to_csv(tmp_df, index_label='id', header=False)
    f = open(tmp_df, 'r')
    cursor = conn.cursor()
    try:
        cursor.copy_from(f, table, sep=",")
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        os.remove(tmp_df)
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("copy_from() done")
    cursor.close()
    os.remove(tmp_df)
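
If you would rather not touch the disk at all, the same idea works with an in-memory buffer. This variant is my own sketch, not part of the original benchmark, but cursor.copy_from() accepts any file-like object:

import io

def copy_from_stringio(conn, df, table):
    """
    Save the dataframe to an in-memory buffer
    and use copy_from() to copy it to the table
    """
    buffer = io.StringIO()
    df.to_csv(buffer, index_label='id', header=False)
    buffer.seek(0)
    cursor = conn.cursor()
    try:
        cursor.copy_from(buffer, table, sep=",")
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print("copy_from_stringio() done")
    cursor.close()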

Step 4: Performance benchmark

For the details on how the benchmarking was done, please refer to the ‘Benchmarking’ section of this notebook.
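
If you want to reproduce the measurements yourself, a minimal timing sketch using the functions defined above could look like this (my_table is a placeholder; truncate the table between runs so each method starts from the same state):

import time

for method in (execute_many, execute_batch, execute_values,
               execute_mogrify, copy_from):
    start = time.time()
    method(conn, df, 'my_table')
    # Elapsed wall-clock time for this bulk insert method
    print("%s: %.2f seconds" % (method.__name__, time.time() - start))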

So execute_values() and execute_mogrify() do fine, but the best solution seems to be to:

  • do whatever transformation you need to do with pandas
  • make sure to rename the dataframe columns to fit your sql table (IN THE RIGHT ORDER)
  • save the transformed dataframe to disk as a csv file
  • use copy_from() to load that csv file into the database

I don’t know about you, but it is kind of sad to see that a good old copy does a better job than execute_values() and execute_mogrify()… But sometimes low tech is best. Just like when you try to reach your colleague through Google Hangouts, Zoom, or Skype… and then end up picking up the phone and just calling them because the voice quality is better

🙂

 

 

From Pandas Dataframe To SQL Table using Psycopg2

For a fully functioning example, please refer to my Jupyter notebook on GitHub.

 

Step 1: Specify the connection parameters

# Here you want to change your database, username & password according to your own values
param_dic = {
    "host"      : "localhost",
    "database"  : "worldbankdata",
    "user"      : "myuser",
    "password"  : "Passw0rd"
}

 

Step 2: Connect to the database and insert your dataframe one row at a time

import sys
import psycopg2
import pandas as pd

def connect(params_dic):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params_dic)

    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1) 
    return conn


def single_insert(conn, insert_req):
    """ Execute a single INSERT request """
    cursor = conn.cursor()
    try:
        cursor.execute(insert_req)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    cursor.close()


# Connecting to the database
conn = connect(param_dic)

# Inserting each row
for i in dataframe.index:

    query = """
    INSERT into emissions(column1, column2, column3) values('%s',%s,%s);
    """ % (dataframe['column1'][i], dataframe['column2'][i], dataframe['column3'][i])
    single_insert(conn, query)

# Close the connection
conn.close()
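
A side note on the query construction above: building the INSERT statement with Python's % operator works for trusted data, but it leaves you exposed to quoting issues and SQL injection. A safer variant of the same loop passes the values as parameters so psycopg2 handles the escaping (emissions and the column names are the same placeholders as above):

# Safer variant: let psycopg2 quote each value
cursor = conn.cursor()
insert_query = "INSERT INTO emissions(column1, column2, column3) VALUES (%s, %s, %s)"
for i in dataframe.index:
    cursor.execute(insert_query, (dataframe['column1'][i],
                                  dataframe['column2'][i],
                                  dataframe['column3'][i]))
conn.commit()
cursor.close()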

 

The full working code is available here.

 

How to query geo-referenced data in PostGIS

PostGIS is an open-source extension to PostgreSQL that adds support for geo-referenced queries. It lets you add a geometry column to your table that stores the shape and geolocation of each row.
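
For reference, enabling the extension and adding a geometry column to an existing table looks something like this (the table name, column name and SRID are placeholders for your own setup):

-- Enable PostGIS in the current database (run once)
CREATE EXTENSION IF NOT EXISTS postgis;

-- Add a geometry column named 'geom' holding points in SRID 4326 (WGS 84)
ALTER TABLE rain_gauges ADD COLUMN geom geometry(Point, 4326);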

In the queries below, the geometry column is called ‘geom’.

Here are three of the most common queries that I use.

  1. Display the geometry as text

    SELECT ST_AsEWKT(geometry_column) FROM table_name;
  2. Select rows from table inside given bounding box

    -- Here, (xmin, ymin, xmax, ymax) are the edges of your bounding box
    SELECT * FROM table_name WHERE geometry_column && ST_MakeEnvelope(xmin, ymin, xmax, ymax);

    -- Example
    SELECT * FROM rain_gauges WHERE geom && ST_MakeEnvelope(1503535, -174488.9, 1603535.4, -154488.8);
  3. Change the projection of a table

    -- This changes the srid of the table_name table to 2295 from whatever it was before
    SELECT UpdateGeometrySRID('table_name', 'geometry_column', 2295);

    For more info on projections & SRIDs: https://spatialreference.org/ref/sr-org/what-is-srid/
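
To tie this back to psycopg2, here is a small sketch of running the bounding-box query from Python. The table and column names reuse the rain_gauges example above, and param_dic is whatever connection dictionary you defined earlier:

import psycopg2

conn = psycopg2.connect(**param_dic)
cursor = conn.cursor()
# The bounding-box edges are passed as parameters; psycopg2 fills the %s placeholders
cursor.execute(
    "SELECT * FROM rain_gauges WHERE geom && ST_MakeEnvelope(%s, %s, %s, %s);",
    (1503535, -174488.9, 1603535.4, -154488.8)
)
rows = cursor.fetchall()
cursor.close()
conn.close()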