Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Databases and Python

Database

Relational Databases (SQL)

  • SQLite
  • MySQL - MariaDB
  • PostgreSQL
  • ...

NoSQL

Types of NoSQL databases

  • Document oriented - MongoDB
  • Key-Value store - Redis
  • Graph - Neo4j
  • Tuple store - Apache River, TIBCO

SQLite Database Access

SQLite

  • sqlite

  • SQLite the most popular embedded relational database.

  • sqlite3 - Python library to use SQLite.

Connecting to SQLite database

  • connect|sqlite
  • cursor|sqlite

This connects to the database in the given file. If the file does not exist yet this will create the file and prepare it to hold an SQLite database. No tables are created at this point and no data is inserted.

import sqlite3

conn = sqlite3.connect("companies.db")
crs = conn.cursor()

# use the database here

conn.close()

Connecting to in-memory SQLite database

We can also create in-memory database.

This is not persistent, but it can be useful to load some data into memory and then do fast SQL queries on that database.

import sqlite3

conn = sqlite3.connect(":memory:")
crs = conn.cursor()

# use the database here

conn.close()

Create TABLE in SQLite

  • CREATE|sqlite
  • execute|sqlite
  • commit|sqlite

execute and commit

import sqlite3


def create_table(conn):
    crs = conn.cursor()

    sql = '''
        CREATE TABLE companies (
                        id          INTEGER PRIMARY KEY AUTOINCREMENT,
                        name        VARCRCHAR(100) UNIQUE NOT NULL,
                        established INTEGER NOT NULL,
                        employees   INTEGER DEFAULT 0
    )
    '''

    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table companies already exists
        print(f'remove companies.db to show how it works')

    conn.commit()

def main():
    conn = sqlite3.connect("companies.db")

    create_table(conn)

    conn.close()

    print('done')

if __name__ == "__main__":
    main()

INSERT data into SQLite database

  • INSERT|sqlite
  • ?|sqlite
import sqlite3

sql = 'INSERT INTO companies (name, employees, established) VALUES (?, ?, ?)'

def insert_one(conn, crs):
    company_name = 'Hostlocal'
    employee_count = 1
    year_of_establishment = 2000

    try:
        crs.execute(sql, (company_name, employee_count, year_of_establishment))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0]) # column name is not unique
    conn.commit()


def insert_many(conn, crs):
    companies = [
        ('Google',    150_028, 1998),
        ('Facebook',   68_177, 2003),
        ('Apple',     154_000, 1977),
        ('Microsoft', 181_000, 1975),
    ]

    try:
        crs.executemany(sql, companies)
    except sqlite3.IntegrityError as err:
        print(f'sqlite error: {err.args[0]}')
    conn.commit()


def main():
    conn = sqlite3.connect("companies.db")
    crs = conn.cursor()
    insert_one(conn, crs)
    insert_many(conn, crs)
    conn.close()
    print('done')

main()
  • Use placeholders (?) supply the data in tuples and to avoid Bobby tables

SELECT data from SQLite database

  • SELECT|sqlite
import sqlite3

conn = sqlite3.connect("companies.db")
crs = conn.cursor()

employees = 3

sql = 'SELECT * FROM companies WHERE employees >= ?'
for company in crs.execute(sql, (employees,)):
    print(company)

print('-----------')

year = 2000
sql = 'SELECT id, name FROM companies WHERE employees >= ? AND established < ?'
for id, name in crs.execute(sql, (employees, year)):
    print(f"{id} - {name}")

conn.close()
  • Use the result as an iterator.

SELECT aggregate data from SQLite database

  • SELECT|sqlite
  • COUNT|sqlite
  • SUM|sqlite
  • fetchone|sqlite
import sqlite3

conn = sqlite3.connect("companies.db")
crs = conn.cursor()

employees = 3
year = 2000

sql = 'SELECT COUNT(id) FROM companies WHERE employees >= ? AND established < ?'
crs.execute(sql, (employees, year))
row = crs.fetchone()
print(row)
print(row[0])

name = '%o%'
sql = 'SELECT SUM(employees) FROM companies WHERE name LIKE ? AND established < ?'
crs.execute(sql, (name, year))
row = crs.fetchone()
print(row)
print(row[0])


conn.close()

If expecting only one row, call the fetchone method. If the result set might be empty, then the fetchone might return None. Check for it!

SELECT data from SQLite database into dictionaries

import sqlite3

conn = sqlite3.connect("companies.db")
conn.row_factory = sqlite3.Row

crs = conn.cursor()

employees = 0
year = 2000

sql = 'SELECT * FROM companies WHERE employees >= ? AND established < ?'
for company in crs.execute(sql, (employees, year)):
    # returns sqlite3.Row objects, but they can also act as dictionaries
    print(f"{company['id']} - {company['name']} - {company['employees']} - {company['established']}")

conn.close()

UPDATE data in SQLite database

  • UPDATE|sqlite

  • UPDATE works quite similar, but it might have a WHERE clause.

import sqlite3

conn = sqlite3.connect("companies.db")
crs = conn.cursor()

name = 'Hostlocal'

sql = 'SELECT employees FROM companies WHERE name = ?'
crs.execute(sql, (name,))
row = crs.fetchone()
employees = row[0]


sql = 'UPDATE companies SET employees=? WHERE name = ?'
crs.execute(sql, (employees+1, name))
conn.commit()

print('-----------')

sql = 'SELECT name, employees FROM companies'
for name, employees in crs.execute(sql):
    print(f"{name} - {employees}")


conn.close()

A counter

"""
    Counter using an SQLite backend
    --list            list all the counters
    --start name     creates the counter for 'name'
    name             counts for 'name'
"""

import sys
import os
import argparse
import sqlite3

database_file = "counter.db"

def list_counters(crs):
    print('List counters:')
    for name, count in crs.execute("SELECT name, count FROM counters"):
        print(f"{name}: {count}")

def get_counter(crs, name):
    crs.execute("SELECT count FROM counters WHERE name = ?", (name,))
    line = crs.fetchone()
    if line is None:
        return None
    return line[0]


def increase_counter(conn, crs, name):
    counter = get_counter(crs, name)
    if counter is None:
        print(f"Invalid counter name '{name}' use the --start flag to start a new counter")
        return

    counter += 1
    crs.execute("UPDATE counters SET count=? WHERE name = ?", (counter, name))
    conn.commit()
    print(f"{name} {counter}")

def start_counter(conn, crs, name):
    counter = get_counter(crs, name)
    if counter is not None:
        print(f"Counter {name} already exists")
        return

    try:
        crs.execute("INSERT INTO counters (name, count) VALUES(?, ?)", (name, 0))
        conn.commit()
    except sqlite3.IntegrityError as err:
        print(f"Name '{name}' already exists")

def get_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument('--list', action='store_true')
    parser.add_argument('--start', action='store_true')
    parser.add_argument('name', nargs="?")
    args = parser.parse_args()
    if args.name is None and not args.list:
        parser.print_help()
        exit()
    return args

def main():
    args = get_arguments()

    with sqlite3.connect(database_file) as conn:
        crs = conn.cursor()
        try:
            crs.execute('''CREATE TABLE counters (
              id PRIMARY KEY,
              name VARCRCHAR(100) UNIQUE NOT NULL,
              count INTEGER NOT NULL
              )''')
        except sqlite3.OperationalError as err:
            pass
            #print(f'sqlite error: {err.args[0]}')

        if args.list:
            list_counters(crs)
            return

        if args.start  and args.name:
            start_counter(conn, crs, args.name)
            return

        if args.name:
            increase_counter(conn, crs, args.name)
            return

main()

#print "TODO get the value of 'name' from the database"
# if it was not there then add


#try:
#  c.execute('''INSERT INTO companies (name) VALUES ('Stonehenge')''')
#except sqlite3.IntegrityError as e:
#  print 'sqlite error: ', e.args[0] # column name is not unique

#conn.commit()

#conn.close()

#print "done"

SQLite in-memory AUTOINCREMENT

  • AUTOINCREMENT
  • memory
import sqlite3

def create_table(conn, crs):
    sql = '''
        CREATE TABLE people (
           id        INTEGER PRIMARY KEY AUTOINCREMENT,
           username  VARCHAR(100) UNIQUE NOT NULL
    );
    '''

    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table companies already exists
    conn.commit()


def insert(conn, crs, username):
    sql = 'INSERT INTO people (username) VALUES (?)'
    try:
        crs.execute(sql, (username,))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()


def list_rows(conn, crs):
    sql = "SELECT * FROM people"
    for id, username in crs.execute(sql):
        print(f"{id} - {username}")


def main():
    conn = sqlite3.connect(":memory:")
    crs = conn.cursor()
    create_table(conn, crs)

    insert(conn, crs, "rachel")
    list_rows(conn, crs)

    insert(conn, crs, "joey")
    list_rows(conn, crs)

    insert(conn, crs, "monica")
    list_rows(conn, crs)

    insert(conn, crs, "monica")
    list_rows(conn, crs)

    conn.close()
    print('done')


main()

SQLite in-memory field with DEFAULT value

  • DEFAULT
  • memory
import sqlite3

def create_table(conn, crs):
    sql = '''
        CREATE TABLE qa (
           question  TEXT,
           answer    TEXT DEFAULT "42"
    );
    '''

    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table companies already exists
    conn.commit()


def insert_qa(conn, crs, question, answer):
    sql = 'INSERT INTO qa (question, answer) VALUES (?, ?)'
    try:
        crs.execute(sql, (question, answer))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()

def insert_q(conn, crs, question):
    sql = 'INSERT INTO qa (question) VALUES (?)'
    try:
        crs.execute(sql, (question,))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()


def list_rows(conn, crs):
    sql = "SELECT * FROM qa"
    for question, answer in crs.execute(sql):
        print(f"{question} - {answer}")
    print("------")


def main():
    conn = sqlite3.connect(":memory:")
    crs = conn.cursor()
    create_table(conn, crs)

    insert_qa(conn, crs, "Language?", "Python")
    list_rows(conn, crs)

    insert_qa(conn, crs, "Database?", "SQLite")
    list_rows(conn, crs)

    insert_q(conn, crs, "Meaning of life?")
    list_rows(conn, crs)

    conn.close()
    print('done')

main()

SQLite transactions

  • TBD

This example is supposed to show the importance of transactions, but so far I failed to replicate the problem as we have in this example

import sqlite3

def create_table(conn, crs):
    sql = '''
        CREATE TABLE bank (
            name TEXT PRIMARY KEY,
            balance INTEGER NOT NULL
    );
    '''

    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table companies already exists
    conn.commit()


def insert_account(conn, crs, name, balance):
    sql = 'INSERT INTO bank (name, balance) VALUES (?, ?)'
    try:
        crs.execute(sql, (name, balance))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()

def show(conn, crs):
    sql = "SELECT * FROM bank"
    for name, balance in crs.execute(sql):
        print(f"{name:5}: {balance:>5}")
    for (total,) in crs.execute("SELECT SUM(balance) AS total FROM bank"):
        print(f"Total: {total:>5}")
    print("------")

def update(conn, crs, name, amount):
    sql = 'UPDATE bank SET balance = (SELECT balance FROM bank WHERE name = ?) + ? WHERE name = ?';
    try:
        crs.execute(sql, (name, amount, name))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()

def without_transaction(conn, crs, from_name, to_name, amount):
    update(conn, crs, from_name, -amount);
    return
    update(conn, crs, to_name, amount);

def with_transaction(conn, crs, from_name, to_name, amount):
    sql = 'UPDATE bank SET balance = (SELECT balance FROM bank WHERE name = ?) + ? WHERE name = ?';
    try:
        crs.execute(sql, (from_name, -amount, from_name))
        return
        crs.execute(sql, (to_name, amount, to_name))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    print("here")
    conn.commit()


def main():
    conn = sqlite3.connect(":memory:", autocommit=False, isolation_level=None)
    crs = conn.cursor()
    create_table(conn, crs)
    insert_account(conn, crs, "Jane", 0)
    insert_account(conn, crs, "Mary", 1000)
    insert_account(conn, crs, "Ann", 1000)
    show(conn, crs)

    update(conn, crs, "Mary", -100)
    update(conn, crs, "Jane", +100)
    show(conn, crs)

    without_transaction(conn, crs, "Mary", "Jane", 100)
    show(conn, crs)

    with_transaction(conn, crs, "Mary", "Jane", 100)
    show(conn, crs)

    conn.close()

main()

MySQL

Install MySQL support

  • Anaconda on MS Windows: conda install mysql-connector-python
  • Otherwise: pip install mysql-connector

Create database user (manually)

$ mysql -u root -p 

   SHOW DATABASES;

   CREATE USER 'foobar'@'localhost' IDENTIFIED BY 'no secret';
   GRANT ALL PRIVILEGES ON fb_db . * TO 'foobar'@'localhost';
   GRANT ALL PRIVILEGES ON * . * TO 'foobar'@'%' IDENTIFIED BY 'no secret';
   FLUSH PRIVILEGES;

   exit
   vim /etc/mysql/mysql.conf.d/mysqld.cnf
   comment out
   # bind-address          = 127.0.0.1
 
   service mysql restart

Create database (manually)

$ mysql -u foobar -p 

  CREATE DATABASE fb_db;

  DROP DATABASE fb_db;
  exit

Create table (manually)

$ mysql -u foobar -p 

  USE fb_db;
  CREATE TABLE person (
    id INTEGER PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255),
    birthdate DATE,
    score REAL
  );

  INSERT INTO person (name, birthdate, score)
      VALUES ("Foo Bar", "1998-05-23", 42.1)

Connect to MySQL

  • connect
  • close
import mysql.connector

def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    print("Connected:", conn)

    conn.close()

if __name__ == "__main__":
    main()

$ python3 examples/mysql/connect.py
  • Change some of the parameters and try again

Connect to MySQL and Handle exception

import mysql.connector

def main():
    try:
        conn = mysql.connector.connect(
            host = 'localhost',
            database = 'fb_db',
            user = 'foobar',
            password='no secret')
    except mysql.connector.Error as e:
        print("MySQL exception: ", e)
        return
    #except Exception as e:
    #    print("Other exception", e);
    #    return

    print("Connected:", conn)

    conn.close()

if __name__ == "__main__":
    main()

Select data

  • cursor
  • execute
  • fetchone
import mysql.connector


def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM person")

    row = cursor.fetchone()
    print(row)

    # cursor.close()  # mysql.connector.errors.InternalError: Unread result found.
    conn.close()

if __name__ == "__main__":
    main()

Select more data

import mysql.connector


def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM person")

    while True:
        row = cursor.fetchone()
        if not row:
            break
        print(row)

    cursor.close()
    conn.close()

if __name__ == "__main__":
    main()

Select all data fetchall

  • fetchall
import mysql.connector


def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM person")

    rows = cursor.fetchall()

    print(len(rows))
    for row in rows:
        print(row)

    cursor.close()
    conn.close()

if __name__ == "__main__":
    main()

Select some data fetchmany

  • fetchmany
import mysql.connector


def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM person")

    size = 2

    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            break
        print(len(rows))
        for row in rows:
            print(row)

    cursor.close()
    conn.close()

if __name__ == "__main__":
    main()

Select some data WHERE clause

  • WHERE
  • %s

Bobby Tables

import mysql.connector


def main(min_score):
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM person WHERE score > %s", (min_score,))

    size = 2

    while True:
        rows = cursor.fetchmany(size)
        if not rows:
            break
        print(len(rows))
        for row in rows:
            print(row)

    cursor.close()
    conn.close()

if __name__ == "__main__":
    main(40)

Select into dictionaries

import mysql.connector


def main():
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor(dictionary=True)
    cursor.execute("SELECT * FROM person")

    for row in cursor:
        print(row)

    cursor.close()
    conn.close()

if __name__ == "__main__":
    main()

Insert data

  • INSERT
  • commit
import mysql.connector


def main(name, birthdate, score):
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO person (name, birthdate, score) VALUES (%s, %s, %s)",
        (name, birthdate, score))

    if cursor.lastrowid:
        print('last insert id', cursor.lastrowid)
    else:
        print('last insert id not found')
    conn.commit()

    conn.close()

if __name__ == "__main__":
    main('Monty Python', '1969-10-05', 100)


Update data

  • UPDATE
import mysql.connector


def main(uid, score):
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("UPDATE person SET score=%s WHERE id=%s",
        (score, uid))
    conn.commit()

    conn.close()

if __name__ == "__main__":
    main(12, 32)

Delete data

  • DELETE
import mysql.connector


def main(uid):
    conn = mysql.connector.connect(
        host = 'localhost',
        database = 'fb_db',
        user = 'foobar',
        password='no secret')

    cursor = conn.cursor()
    cursor.execute("DELETE FROM person WHERE id=%s", (uid,))
    conn.commit()

    conn.close()

if __name__ == "__main__":
    main(11)

Exercise MySQL

  1. Create a user with a password manually.
  2. Create a database manually.
  3. Create a table manually for describing fleet of cars: id, license-plate, year-built, brand, owner. (Owner is the name of the owner)
  4. Create a program that accepts values on the command line and insterts the data in the database
  5. Create another program that lists all the cars.
  6. Improve the selector program to accept command line paramter --minage N and --maxage N and show the cars within those age limits (N is a number of years e.g. 3)
  7. Create program to delete a car.
  8. Create program to change the owner of a car.

Exercise: MySQL Connection

Instead of hard-coding the connection details in the script, let's create an INI file that contains the connection information and use that.

[development]
host     = localhost
database = fb_db
user     = foobar
password = no secret

Solution: MySQL Connection

import configparser
import mysql.connector

config_file = 'examples/mysql/connect.ini'

def read_config(section = 'development'):
    print(section)
    cp = configparser.ConfigParser()
    cp.read(config_file)
    if not cp.has_section(section):
        raise Exception("No configuration found for '{}'".format(section))

    return cp[section]

def main():
    try:
        db = read_config()
        print(db['password'])
        print(db)
        conn = mysql.connector.connect(**db)
    except mysql.connector.Error as e:
        print("MySQL exception: ", e)
        return
    except Exception as e:
        print("Other exception", e);
        return

    if conn.is_connected():
        print("is connected")
    print("Connected:", conn)

    conn.close()

if __name__ == "__main__":
    main()


PostgreSQL

PostgreSQL install

$ sudo aptitude install postgresql

$ sudo -i -u postgres
$ createuser --interactive
  Add "ubuntu" as superuser   (we need a username that matches our Linux username)
$ createdb testdb

$ psql
$ sudo -u postgres psql

$ psql testdb
testdb=# CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100));

PostgreSQL with Docker compose

FROM python:3
WORKDIR /opt
RUN pip install psycopg2==2.9.3
# COPY . .

version: '3'
services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    tty: true
    command: bash
    volumes:
      - .:/opt
    environment:
      POSTGRES_USER: username
      POSTGRES_PASSWORD: password
      POSTGRES_DB: default_database

  pgdatabase:
    image: 'postgres:latest'
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: username
      POSTGRES_PASSWORD: password
      POSTGRES_DB: default_database
    volumes:
      - pg-data-volume:/var/lib/postgresql/data/

volumes:
  pg-data-volume:

docker-compose up
docker exec -it postgresql_app_1 bash

Python and Postgresql

$ sudo aptitude install python3-postgresql
$ sudo aptitude install python3-psycopg2

PostgreSQL connect

import psycopg2

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

PostgreSQL create table

import psycopg2

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100))")
    conn.commit()
except Exception as err:
    print(err)

INSERT

import psycopg2

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

uid = 1
name = 'Foo'

try:
    cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
    conn.commit()
except Exception as err:
    print(err)
duplicate key value violates unique constraint "people_pkey"
DETAIL:  Key (id)=(1) already exists.

INSERT (from command line)

import psycopg2
import sys

if len(sys.argv) != 3:
    exit("Usage: {} ID NAME".format(sys.argv[0]))

uid, name = sys.argv[1:]


try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
    conn.commit()
except Exception as err:
    print(err)


SELECT

import psycopg2

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("SELECT * from people")
    for row in cur.fetchall():
        print(row)
except Exception as err:
    print(err)

DELETE

import psycopg2

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
    print(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("DELETE FROM people")
    conn.commit()
except Exception as err:
    print(err)

try:
    cur.execute("SELECT * from people")
    for row in cur.fetchall():
        print(row)
except Exception as err:
    print(err)


MongoDB

MongoDB CRUD

  • CRUD stands for Create, Read, Update, Delete

Install MongoDB support

pip install pymongo
pip install motor

MongoDB in Docker compose

FROM python:3
WORKDIR /opt
COPY requirements.txt .
RUN pip install -r requirements.txt
# COPY . .

version: '3'
services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    tty: true
    command: bash
    volumes:
      - .:/opt

  mongodb:
    image: mongo:4.0.8
    volumes:
      - mongo-data:/data/db
      - mongo-configdb:/data/configdb

volumes:
  mongo-data:
  mongo-configdb:
pytest

pymongo
motor

docker-compose up
docker exec -it mongodb_app_1 bash

Python MongoDB drop database

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
client.drop_database('demo')

Python MongoDB insert

  • MongoClient
  • insert
  • insert_one

In order to interract with a MongoDB Database we first need to import the MongoClient from the pymongo module.

Then connect to the MongoDB server using the MongoClient() call. Using the resulting client object we can access the database. MonogoDB does not require us to create a database schema and just by accessing a database and inserting data in it we create the database. So we can use any database name here. In the example the name of the database is going to be demo.

Once we have an object representing the database we can access a "collection" in the database. This too is created automatically if it did not exist earlier. In our example we use the name "people" for our collection.

We then calle the insert_one method passing it a dictionary that we prepared earlier.

This will insert the dictionary as a JSON data structure in the selected collection.

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
db = client.demo

foo = {
    'name'      : 'Foo',
    'email'     : 'foo@example.com',
    'birthdate' : datetime.datetime.strptime('2002-01-02', '%Y-%m-%d'),
    'student'   : True,
}

bar = {
    'name'      : 'Bar',
    'email'     : 'bar@example.com',
    'birthdate' : datetime.datetime.strptime('1998-08-03', '%Y-%m-%d'),
    'student'   : True,
    'teacher'   : False,
}

zorg = {
    'name'      : 'Zorg',
    'email'     : 'zorg@corp.com',
    'birthdate' : datetime.datetime.strptime('1995-12-12', '%Y-%m-%d'),
    'teacher'   : True,
}


db.people.insert_one(foo)
db.people.insert_one(bar)
db.people.insert_one(zorg)

MongoDB CLI

$ mongo
> help
...
> show dbs
admin  (empty)
demo   0.078GB
local  0.078GB

> use demo      (name of db)
switched to db demo

> show collections
people
system.indexes

> db.people.find()
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676c"), "email" : "foo@example.com", "student" : true,
    "birthdate" : ISODate("2002-01-02T00:00:00Z"), "name" : "Foo" }
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676d"), "email" : "bar@example.com", "name" : "Bar", "student" : true,
    "birthdate" : ISODate("1998-08-03T00:00:00Z"), "teacher" : false }
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676e"), "email" : "zorg@corp.com",
    "birthdate" : ISODate("1995-12-12T00:00:00Z"), "teacher" : true, "name" : "Zorg" }

> db.people.drop()      (drop a collection)
> db.dropDatabase()     (drop a whole database)

Python MongoDB find

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
db = client.demo

for person in db.people.find():
    print(person)

Python MongoDB find refine

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
db = client.demo

for person in db.people.find({ 'name' : 'Foo'}):
    print(person)

Python MongoDB update

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
db = client.demo

db.people.update_one({ 'name' : 'Zorg'}, { '$set' : { 'salary' : 1000 } })
for p in db.people.find({ 'name' : 'Zorg'}):
    print(p)

# db.people.update_many({ 'name' : 'Zorg'}, { '$set' : { 'salary' : 1000 } })

Python MongoDB remove (delete)

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
db = client.demo

print("before")
for p in db.people.find():
    print(p)

db.people.delete_one({ 'name' : 'Zorg'})

print("after")
for p in db.people.find():
    print(p)

Python MongoDB replace

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.cache

data1 = {
    'total': 10,
    'current': 5,
}

data2 = {
    'total': 13,
    'current': 3,
}


def show():
    print(collection.count_documents({}))
    for entry in collection.find({}):
        print(entry)
    print()

collection.drop()
print(collection.count_documents({}))

collection.insert_one({ '_id' : 'stats', **data1})
show()

collection.replace_one({ '_id' : 'stats'}, data2)
show()
0
1
{'_id': 'stats', 'total': 10, 'current': 5}

1
{'_id': 'stats', 'total': 13, 'current': 3}

Python MongoDB upsert

from pymongo import MongoClient
import datetime

client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.cache

data1 = {
    'total': 10,
    'current': 5,
}

data2 = {
    'total': 13,
    'current': 3,
}


def show():
    print(collection.count_documents({}))
    for entry in collection.find({}):
        print(entry)
    print()

collection.drop()
print(collection.count_documents({}))

collection.update_one({ '_id' : 'stats'}, { '$set': data1 }, upsert=True)
show()

collection.update_one({ '_id' : 'stats'}, { '$set': data2 }, upsert=True)
show()
0
1
{'_id': 'stats', 'current': 5, 'total': 10}

1
{'_id': 'stats', 'current': 3, 'total': 13}

Python Mongodb: TypeError: upsert must be True or False

from pymongo import MongoClient
import datetime

import pymongo
print(pymongo.__version__)

client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.data

collection.drop()

print("working:")
collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 1 } }, upsert=True)


print("failing:")
collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 2 } }, { "upsert": True })
3.10.1
Traceback (most recent call last):
  File "examples/mongodb/upsert_error.py", line 12, in <module>
    collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 2 } }, { "upsert": True })
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 998, in update_one
    self._update_retryable(
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 854, in _update_retryable
    return self.__database.client._retryable_write(
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1491, in _retryable_write
    return self._retry_with_session(retryable, func, s, None)
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1384, in _retry_with_session
    return func(session, sock_info, retryable)
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 846, in _update
    return self._update(
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 767, in _update
    common.validate_boolean("upsert", upsert)
  File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/common.py", line 159, in validate_boolean
    raise TypeError("%s must be True or False" % (option,))
TypeError: upsert must be True or False

Python MongoDB Async with motor - connect

import asyncio
import motor.motor_asyncio

async def get_server_info():
    conn_str = 'mongodb://mongodb:27017'
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)

    try:
        print(await client.server_info())
    except Exception:
        print("Unable to connect to the server.")
loop = asyncio.get_event_loop()
loop.run_until_complete(get_server_info())


Python MongoDB Async with motor - insert and find

import asyncio
import motor.motor_asyncio
import datetime

mary = {
    'name'      : 'Mary',
    'email'     : 'mary@example.com',
    'birthdate' : datetime.datetime.strptime('2002-01-02', '%Y-%m-%d'),
    'student'   : True,
}


async def examples():
    conn_str = 'mongodb://mongodb:27017'
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)

    db = client.demo
    db.people.insert_one(mary)

    results = db.people.find()
    async for person in results:
        print(person)

loop = asyncio.get_event_loop()
loop.run_until_complete(examples())

Redis

Redis CLI

redis-cli

$ redis-cli
> set name foo
> get name
> set name "foo bar"
> get name

> set a 1
> get a
> incr a
> get a

> set b 1
> keys *
> del b

Redis list keys

import redis
r = redis.StrictRedis()

for k in r.keys('*'):
    print(k)

Redis set get

import redis
r = redis.StrictRedis()

r.set("name", "some value")
print(r.get("name"))

Redis incr

import redis
r = redis.StrictRedis()

r.set("counter", 40) 
print(r.get("counter"))
print(r.incr("counter"))
print(r.incr("counter"))
print(r.get("counter"))


Redis incrby

import redis
r = redis.StrictRedis()

r.set("counter", 19) 
print(r.get("counter"))
print(r.incrby("counter", 23))
print(r.get("counter"))


Redis setex

Set with expiration time in seconds.

import redis
import time
r = redis.StrictRedis()


r.setex("login", 2, 'foobar') 
print(r.get("login"))   # 'foobar'
time.sleep(1)
print(r.get("login"))   # 'foobar'
time.sleep(1)
print(r.get("login"))   # None

SQLAlchemy

SQLAlchemy hierarchy

  • ORM
  • Table, Metadata, Reflection, DDL - standardized language
  • Engine - standardize low-level access (placeholders)

SQLAlchemy engine

  • create_engine
engine = create_engine('sqlite:///test.db')               # relative path
engine = create_engine('sqlite:////full/path/to/test.db') # full path
engine = create_engine('sqlite://')                       # in memory database

PostgreSQL

engine = create_engine('postgresql://user:password@hostname/dbname')
engine = create_engine('postgresql+psycopg2://user:password@hostname/dbname')

MySQL

engine = create_engine("mysql://user:password@hostname/dbname", encoding='latin1') # defaults to utf-8

SQLAlchemy autocommit

  • begin
  • commit
  • rollback

Unlike the underlying database engines, SQLAlchemy uses autocommit. That is, usually we don't need to call commit(), but if we would like to have a transaction we need to start it using begin() and end it either with commit() or with rollback().

SQLAlchemy engine CREATE TABLE

  • execute
  • Engine
import os
from sqlalchemy import create_engine

dbname = 'test.db'
if os.path.exists(dbname):
    os.unlink(dbname)

engine = create_engine('sqlite:///' + dbname)  # Engine

engine.execute('''
    CREATE TABLE person (
        id INTEGER PRIMARY KEY,
        name VARCHAR(100) UNIQUE,
        balance INTEGER NOT NULL
    );
''')


SQLAlchemy engine INSERT

  • INSERT
import os
from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)

engine.execute('INSERT INTO person (name, balance) VALUES (:name, :balance)', name = 'Joe', balance = 100)
engine.execute('INSERT INTO person (name, balance) VALUES (:name, :balance)', name = 'Jane', balance = 100)
engine.execute('INSERT INTO person (name, balance) VALUES (:name, :balance)', name = 'Melinda', balance = 100)
engine.execute('INSERT INTO person (name, balance) VALUES (:name, :balance)', name = 'George', balance = 100)


SQLAlchemy engine SELECT

  • fetchone
  • close
  • SELECT
from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person WHERE id=:id', id=3)

print(result)            # <sqlalchemy.engine.result.ResultProxy object at 0x1013c9da0>

row = result.fetchone()
print(row)               # (3, 'Melinda', 100)  - Its a tuple
print(row['name'])       # Melinda              - And a dictionary
print(row.name)          # Melinda   - and object with methods for the columns

for k in row.keys():     # keys also works on it
    print(k)             # id, name, balance

result.close()

SQLAlchemy engine SELECT all

import os
from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person')

for row in result:
    print(row)

result.close()

# (1, 'Joe', 100)
# (2, 'Jane', 100)
# (3, 'Melinda', 100)
# (4, 'George', 100)

SQLAlchemy engine SELECT fetchall

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person WHERE id >= :id', id=3)

rows = result.fetchall()
print(rows)        # [(3, 'Melinda', 100), (4, 'George', 100)]

result.close()

SQLAlchemy engine SELECT aggregate

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT COUNT(*) FROM person')

r = result.fetchone()[0]
print(r)

result.close()

SQLAlchemy engine SELECT IN

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)

results = engine.execute("SELECT * FROM person WHERE name IN ('Joe', 'Jane')")
print(results.fetchall()) # [(2, 'Jane', 100), (1, 'Joe', 100)]

# engine.execute("SELECT * FROM person WHERE name IN (:a0, :a1)", a0 = 'Joe', a1 = 'Jane')

SQLAlchemy engine SELECT IN with placeholders

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)


names = ['Joe', 'Jane']
placeholders = []
data = {}
for i in range(len(names)):
    placeholders.append(':a' + str(i))
    data['a' + str(i)] = names[i]

# print(placeholders)  # [':a0', ':a1']
# print(data)          # {'a0': 'Joe', 'a1': 'Jane'}

sql = "SELECT * FROM person WHERE name IN ({})".format(', '.join(placeholders))
# print(sql)  #  SELECT * FROM person WHERE name IN (:a0, :a1)

#results = engine.execute(sql, a0 = 'Jane', a1 = 'Joe')
results = engine.execute(sql, **data)
print(results.fetchall()) # [(2, 'Jane', 100), (1, 'Joe', 100)]

SQLAlchemy engine connection

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)

conn = engine.connect()
results = conn.execute('SELECT balance, name FROM person WHERE id < :id', id = 3)
print(results.fetchall())   # [(100, 'Joe'), (100, 'Jane')]
conn.close()

SQLAlchemy engine transaction

from sqlalchemy import create_engine

dbname = 'test.db'

engine = create_engine('sqlite:///' + dbname)

conn = engine.connect()

trans = conn.begin()

src = 'Joe'
dst = 'Jane'
payment = 3

results = conn.execute("SELECT balance, name FROM person WHERE name = :name", name = src)
src_balance = results.fetchone()[0]
results.fetchall()
print(src_balance)


results = conn.execute("SELECT balance, name FROM person WHERE name = :name", name = dst)
dst_balance = results.fetchone()[0]
results.fetchall()
print(dst_balance)

conn.execute('UPDATE person SET balance = :balance WHERE name=:name', balance = src_balance - payment, name = src)
conn.execute('UPDATE person SET balance = :balance WHERE name=:name', balance = dst_balance + payment, name = dst)

trans.commit()

# trans.rollback()

conn.close()

results = engine.execute("SELECT * FROM person")
print(results.fetchall())

SQLAlchemy engine using context managers

with engine.begin() as trans:
    conn.execute(...)
    conn.execute(...)
    raise Exception()  # for rollback

Exercise: Create table

Create the following schema

CREATE TABLE node (
    id      INTEGER PRIMARY KEY,
    name    VARCHAR(100)
);

CREATE TABLE interface (
    id       INTEGER PRIMARY KEY,
    node_id  INTEGER NOT NULL,
    ipv4     VARCHAR(15) UNIQUE,
    ipv6     VARCHAR(80) UNIQUE,
    FOREIGN KEY (node_id) REFERENCES node(id)
);

CREATE TABLE connection (
    a        INTEGER NOT NULL,
    b        INTEGER NOT NULL,
    FOREIGN KEY (a) REFERENCES interface(id),
    FOREIGN KEY (b) REFERENCES interface(id)
);

Insert a few data items. Write a few select statements.

SQLAlchemy Metada

Describe the Schema, the structure of the database (tables, columns, constraints, etc.) in Python.

  • SQL generation from the metadata, generate to a schema.
  • Reflection (Introspection) - Create the metadata from an existing database, from an existing schema.
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String

metadata = MetaData()
user_table = Table('user', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('name', String(100), unique=True),
                   Column('balance', Integer, nullable=False)
                   )
print(user_table.name)
print(user_table.c.name)
print(user_table.c.id)

print(user_table.c)
print(user_table.columns)  # A bit like a Python dictionary, but it is an associative array



from sqlalchemy import create_engine
engine = create_engine('sqlite://')
metadata.create_all(engine)

from sqlalchemy import ForeignKey

address_table = Table('address', metadata,
                Column('id', Integer, primary_key=True),
                Column('stree', String(100)),
                Column('user_id', Integer, ForeignKey('user.id'))
                )
address_table.create(engine)

from sqlalchemy import Unicode, UnicodeText, ForeignKeyConstraint, DateTime

story_table = Table('story', metadata,
                    Column('id', Integer, primary_key=True),
                    Column('version', Integer, primary_key=True),
                    Column('headline', Unicode(100), nullable=False),
                    Column('body', UnicodeText)
                    )
published_table = Table('published', metadata,
                    Column('id', Integer, primary_key=True),
                    Column('timestamp', DateTime, nullable=False),
                    Column('story_id', Integer, nullable=False),
                    Column('version', Integer, nullable=False),
                    ForeignKeyConstraint(
                        ['story_id', 'version_id'],
                        ['story.story_id', 'story.version_id']
                    )
                )


conn.execute(user_table.insert(), [
    {'username': 'Jack', 'fullname': 'Jack Burger'},
    {'username': 'Jane', 'fullname': 'Jane Doe'}
])

from sqlalchemy import select
select_stmt = select([user_table.c.username, user_table.c.fullname]).where(user_table.c.username == 'ed')
result = conn.execute(select_stmt)
for row in result:
    print(row)

select_stmt = select([user_table])
conn.execute(select_stmt).fetchall()

select_stmt = select([user_table]).where(
    or_(
        user_table.c.username == 'ed',
        user_table.c.usernane == 'wendy'
    )
)

joined_obj = user_table.join(address_table, user_table.c.id = address_table.c.user_id)

SQLAlchemy types

  • Integer() - INT
  • String() - ASCII strings - VARCHAR
  • Unicode() - Unicode string - VARCHAR or NVARCHAR depending on database
  • Boolean() - BOOLEAN, INT, TINYINT depending on db support for boolean type
  • DateTime() - DATETIME or TIMESTAMP returns Python datetime() objects.
  • Float() - floating point values
  • Numeric() - precision numbers using Python Decimal()

SQLAlchemy ORM - Object Relational Mapping

  • Domain model
  • Mapping between Domain Object - Table Row

SQLAlchemy ORM create

import os
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine

Base = declarative_base()


class Person(Base):
    __tablename__ = 'person'
    id   = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False, unique=True)

class Genre(Base):
    __tablename__ = 'genre'
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False, unique=True)

class Movie(Base):
    __tablename__ = 'movie'
    id = Column(Integer, primary_key=True)
    title = Column(String(250), nullable=False, unique=True)
    genre_id = Column(Integer, ForeignKey('genre.id'))
    genre = relationship(Genre)

class Cast(Base):
    __tablename__ = 'cast'
    id = Column(Integer, primary_key=True)
    character = Column(String(250))
    person_id = Column(Integer, ForeignKey('person.id'))
    movie_id = Column(Integer, ForeignKey('movie.id'))



if __name__ == '__main__':
    dbname = 'imdb.db'
    if os.path.exists(dbname):
        os.unlink(dbname)
    engine = create_engine('sqlite:///' + dbname)
    Base.metadata.create_all(engine)

SQLAlchemy ORM schema

    echo .schema | sqlite3 imdb.db
CREATE TABLE person (
	id INTEGER NOT NULL, 
	name VARCHAR(250) NOT NULL, 
	PRIMARY KEY (id)
);
CREATE TABLE genre (
	id INTEGER NOT NULL, 
	title VARCHAR(250), 
	PRIMARY KEY (id)
);
CREATE TABLE movie (
	id INTEGER NOT NULL, 
	title VARCHAR(250), 
	genre_id INTEGER, 
	PRIMARY KEY (id), 
	FOREIGN KEY(genre_id) REFERENCES genre (id)
);
CREATE TABLE "cast" (
	id INTEGER NOT NULL, 
	character VARCHAR(250), 
	person_id INTEGER, 
	movie_id INTEGER, 
	PRIMARY KEY (id), 
	FOREIGN KEY(person_id) REFERENCES person (id), 
	FOREIGN KEY(movie_id) REFERENCES movie (id)
);

SQLAlchemy ORM reflection

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base

Base = automap_base()

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.prepare(engine, reflect=True)
Genre = Base.classes.genre

print(Genre.metadata.sorted_tables)

for c in Base.classes:
     print(c)

#session = Session(engine)
#session.add(Address(email_address="foo@bar.com", user=User(name="foo")))
#session.commit()

SQLAlchemy ORM INSERT after automap

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base

Base = automap_base()

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.prepare(engine, reflect=True)
Genre  = Base.classes.genre
Movie  = Base.classes.movie
Person = Base.classes.person
Cast   = Base.classes.cast



session = Session(engine)
for name in ('Action', 'Animation', 'Comedy', 'Documentary', 'Family', 'Horror'):
    session.add(Genre(name = name))

session.add(Movie(title = "Sing", genre_id=2))
session.add(Movie(title = "Moana", genre_id=2))
session.add(Movie(title = "Trolls", genre_id=2))
session.add(Movie(title = "Power Rangers", genre_id=1))

session.commit()




SQLAlchemy ORM INSERT

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)
genre = {}
for name in ('Action', 'Animation', 'Comedy', 'Documentary', 'Family', 'Horror'):
    genre[name] = Genre(name = name)
    session.add(genre[name])

print(genre['Animation'].name) # Animation
print(genre['Animation'].id)   # None
session.commit()

print(genre['Animation'].name) # Animation
print(genre['Animation'].id)   # 2
session.add(Movie(title = "Sing", genre = genre['Animation']))
session.commit()


SQLAlchemy ORM SELECT

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)

for g in session.query(Genre).all():
    print(g.name, g.id)

print("---")
animation = session.query(Genre).filter(Genre.name == 'Animation').one()
print(animation.name, animation.id)

SQLAlchemy ORM SELECT cross tables

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)

movies = session.query(Movie).all()
for m in movies:
    print(m.title, "-", m.genre.name)

SQLAlchemy ORM SELECT and INSERT

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)

animation = session.query(Genre).filter(Genre.name == 'Animation').one()
session.add(Movie(title = "Moana", genre = animation))
session.add(Movie(title = "Trolls", genre = animation))

action = session.query(Genre).filter(Genre.name == 'Action').one()
session.add(Movie(title = "Power Rangers", genre = action))

comedy = session.query(Genre).filter(Genre.name == 'Comedy').one()
session.add(Movie(title = "Gostbuster", genre = comedy))


session.commit()

SQLAlchemy ORM UPDATE

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)

movie = session.query(Movie).filter(Movie.title == 'Gostbuster').one()
print(movie.title)
movie.title = 'Ghostbusters'
session.commit()

print(movie.title)

SQLAlchemy ORM logging

from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast

import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

logger = logging.getLogger('demo')
logger.setLevel(logging.INFO)

dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)

Base.metadata.bind = engine

session = Session(engine)


logger.info("Selecting all")
movies = session.query(Movie).all()
for m in movies:
    logger.info("------------")
    #print(m.title, "-", m.genre_id)
    print(m.title, "-", m.genre.name)

Solution: Create table

Create the followig schema

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey

metadata = MetaData()

node_table = Table('node', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('name', String(100), unique=True)
                   )

interface_table = Table('interface', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
                   Column('ipv4', String(14), unique=True),
                   Column('ipv6', String(80), unique=True),
                   )

connection_table = Table('connection', metadata,
                    Column('a', Integer, ForeignKey('interface.id'), nullable=False),
                    Column('b', Integer, ForeignKey('interface.id'), nullable=False)
                         )

engine = create_engine('sqlite://', echo=True)
metadata.create_all(engine)

Exercise: Inspector

Use the inspector to list all the tables and all the columns in every table.

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey

metadata = MetaData()

node_table = Table('node', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('name', String(100), unique=True)
                   )

interface_table = Table('interface', metadata,
                   Column('id', Integer, primary_key=True),
                   Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
                   Column('ipv4', String(14), unique=True),
                   Column('ipv6', String(80), unique=True),
                   )

connection_table = Table('connection', metadata,
                    Column('a', Integer, ForeignKey('interface.id'), nullable=False),
                    Column('b', Integer, ForeignKey('interface.id'), nullable=False)
                         )

engine = create_engine('sqlite://', echo=True)
metadata.create_all(engine)


m2 = MetaData()
m2_node_table = Table('node', m2, autoload=True, autoload_with=engine)
m2_interface_table = Table('interface', m2, autoload=True, autoload_with=engine)
print(m2_node_table.columns)
print(m2_interface_table.columns)
print(m2_node_table.__repr__())

from sqlalchemy import inspect

inspector = inspect(engine)
inspector.get_columns('address')
inspector.get_foreign_keys('address')

SQLAlchemy CREATE and DROP

  • metadata.create_all(engine, checkfirst=True|False) emits CREATE statement for all tables.
  • table.create(engine, checkfirst=False|True) emits CREATE statement for a single table.
  • metadata.drop_all(engine, checkfirst=True|False) emits DROPT statement for all the tables.
  • table.drop(engine, checkfirst=False|True) emits DROPT statement for a single table.

metada can create (or drop) the tables in the correct order to maintain the dependencies.

SQLAlchemy Notes

  • Multi-column primary key (composite primary key).
  • Composite foreign key.

SQLAlchemy Meta SQLite CREATE

from sqlalchemy import create_engine
import os
from sqlite_meta_schema import get_meta

dbname = 'test.db'
if os.path.exists(dbname):
    os.unlink(dbname)
engine = create_engine('sqlite:///test.db')

metadata = get_meta()
metadata.create_all(engine)
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey


def get_meta():
    metadata = MetaData()

    node_table = Table('node', metadata,
                       Column('id', Integer, primary_key=True),
                       Column('name', String(100), unique=True)
                       )

    interface_table = Table('interface', metadata,
                            Column('id', Integer, primary_key=True),
                            Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
                            Column('ipv4', String(14), unique=True),
                            Column('ipv6', String(80), unique=True),
                            )

    connection_table = Table('connection', metadata,
                             Column('a', Integer, ForeignKey('interface.id'), nullable=False),
                             Column('b', Integer, ForeignKey('interface.id'), nullable=False)
                             )
    return metadata

SQLAlchemy Meta Reflection

from sqlalchemy import create_engine
import os
#from sqlalchemy import inspect
from sqlalchemy.engine import reflection

dbname = 'test.db'
if not os.path.exists(dbname):
    exit("Database file '{}' could not be found".format(dbname))

engine = create_engine('sqlite:///test.db')
# inspector = inspect(engine)
# print(inspector)
# print(inspector.get_columns('address'))
# print(inspector.get_foreign_keys('address'))

insp = reflection.Inspector.from_engine(engine)
print(insp.get_table_names())

SQLAlchemy Meta INSERT


SQLAlchemy Meta SELECT