Databases and Python
Database
Relational Databases (SQL)
- SQLite
- MySQL - MariaDB
- PostgreSQL
- ...
NoSQL
Types of NoSQL databases
- Document oriented - MongoDB
- Key-Value store - Redis
- Graph - Neo4j
- Tuple store - Apache River, TIBCO
SQLite Database Access
SQLite
- sqlite
- SQLite is the most popular embedded relational database.
- sqlite3 - Python library to use SQLite.
Connecting to SQLite database
- connect|sqlite
- cursor|sqlite
This connects to the database in the given file. If the file does not exist yet this will create the file and prepare it to hold an SQLite database. No tables are created at this point and no data is inserted.
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
# use the database here
conn.close()
Connecting to in-memory SQLite database
We can also create an in-memory database.
This is not persistent, but it can be useful to load some data into memory and then do fast SQL queries on that database.
import sqlite3
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
# use the database here
conn.close()
Create TABLE in SQLite
- CREATE|sqlite
- execute|sqlite
- commit|sqlite
execute and commit
import sqlite3
def create_table(conn):
    """Create the ``companies`` table on the given SQLite connection.

    If the statement fails with ``sqlite3.OperationalError`` (for example
    because the table already exists) the error is printed instead of being
    propagated, so the script can be run repeatedly.

    Args:
        conn: An open ``sqlite3.Connection``.
    """
    crs = conn.cursor()
    sql = '''
        CREATE TABLE companies (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            name        VARCHAR(100) UNIQUE NOT NULL,
            established INTEGER NOT NULL,
            employees   INTEGER DEFAULT 0
        )
    '''
    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table companies already exists
        print('remove companies.db to show how it works')
    conn.commit()
def main():
conn = sqlite3.connect("companies.db")
create_table(conn)
conn.close()
print('done')
if __name__ == "__main__":
main()
INSERT data into SQLite database
- INSERT|sqlite
- ?|sqlite
import sqlite3
# One parameterized statement shared by both insert functions; the values are
# always supplied separately through the "?" placeholders.
sql = 'INSERT INTO companies (name, employees, established) VALUES (?, ?, ?)'


def insert_one(conn, crs):
    """Insert a single hard-coded company and commit.

    A ``sqlite3.IntegrityError`` (e.g. the name is already in the table) is
    printed instead of being propagated.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
    """
    company_name = 'Hostlocal'
    employee_count = 1
    year_of_establishment = 2000
    try:
        crs.execute(sql, (company_name, employee_count, year_of_establishment))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])  # column name is not unique
    conn.commit()


def insert_many(conn, crs):
    """Insert several hard-coded companies with one ``executemany`` call and commit.

    A ``sqlite3.IntegrityError`` is printed instead of being propagated.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
    """
    companies = [
        ('Google', 150_028, 1998),
        ('Facebook', 68_177, 2003),
        ('Apple', 154_000, 1977),
        ('Microsoft', 181_000, 1975),
    ]
    try:
        crs.executemany(sql, companies)
    except sqlite3.IntegrityError as err:
        print(f'sqlite error: {err.args[0]}')
    conn.commit()


def main():
    """Open companies.db, run both insert examples and close the connection."""
    conn = sqlite3.connect("companies.db")
    crs = conn.cursor()
    insert_one(conn, crs)
    insert_many(conn, crs)
    conn.close()
    print('done')


# Guarded so that importing this file does not touch the database.
if __name__ == "__main__":
    main()
- Use placeholders (?) and supply the data in tuples to avoid SQL injection (the "Bobby Tables" problem).
SELECT data from SQLite database
- SELECT|sqlite
import sqlite3
# Query companies.db twice, iterating directly over the cursor returned by
# execute(). The try/finally makes sure the connection is closed even if one
# of the queries raises.
conn = sqlite3.connect("companies.db")
try:
    crs = conn.cursor()

    employees = 3
    sql = 'SELECT * FROM companies WHERE employees >= ?'
    for company in crs.execute(sql, (employees,)):
        print(company)

    print('-----------')

    year = 2000
    sql = 'SELECT id, name FROM companies WHERE employees >= ? AND established < ?'
    # "company_id" instead of "id" so the built-in id() is not shadowed.
    for company_id, name in crs.execute(sql, (employees, year)):
        print(f"{company_id} - {name}")
finally:
    conn.close()
- Use the result as an iterator.
SELECT aggregate data from SQLite database
- SELECT|sqlite
- COUNT|sqlite
- SUM|sqlite
- fetchone|sqlite
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
employees = 3
year = 2000
sql = 'SELECT COUNT(id) FROM companies WHERE employees >= ? AND established < ?'
crs.execute(sql, (employees, year))
row = crs.fetchone()
print(row)
print(row[0])
name = '%o%'
sql = 'SELECT SUM(employees) FROM companies WHERE name LIKE ? AND established < ?'
crs.execute(sql, (name, year))
row = crs.fetchone()
print(row)
print(row[0])
conn.close()
If you expect only one row, call the fetchone method. If the result set might be empty, fetchone returns None. Check for it!
SELECT data from SQLite database into dictionaries
import sqlite3
conn = sqlite3.connect("companies.db")
conn.row_factory = sqlite3.Row
crs = conn.cursor()
employees = 0
year = 2000
sql = 'SELECT * FROM companies WHERE employees >= ? AND established < ?'
for company in crs.execute(sql, (employees, year)):
# returns sqlite3.Row objects, but they can also act as dictionaries
print(f"{company['id']} - {company['name']} - {company['employees']} - {company['established']}")
conn.close()
UPDATE data in SQLite database
- UPDATE|sqlite
- UPDATE works quite similarly, but it might have a WHERE clause.
import sqlite3
# Read the current number of employees of one company, store the value
# increased by one, then list every company.
conn = sqlite3.connect("companies.db")
crs = conn.cursor()

name = 'Hostlocal'

sql = 'SELECT employees FROM companies WHERE name = ?'
crs.execute(sql, (name,))
row = crs.fetchone()
if row is None:
    # fetchone() returns None when no row matched; indexing it would raise
    # a TypeError, so stop with a clear message instead.
    conn.close()
    raise SystemExit(f"Company '{name}' not found")
employees = row[0]

sql = 'UPDATE companies SET employees=? WHERE name = ?'
crs.execute(sql, (employees+1, name))
conn.commit()

print('-----------')

sql = 'SELECT name, employees FROM companies'
for name, employees in crs.execute(sql):
    print(f"{name} - {employees}")

conn.close()
A counter
"""
Counter using an SQLite backend
--list list all the counters
--start name creates the counter for 'name'
name counts for 'name'
"""
import sys
import os
import argparse
import sqlite3
database_file = "counter.db"
def list_counters(crs):
    """Print a header line followed by one ``name: count`` line per counter.

    Args:
        crs: A cursor on a database that has a ``counters`` table.
    """
    print('List counters:')
    rows = crs.execute("SELECT name, count FROM counters")
    for row in rows:
        print(f"{row[0]}: {row[1]}")
def get_counter(crs, name):
    """Look up the value of the counter called *name*.

    Args:
        crs: A cursor on a database that has a ``counters`` table.
        name: Name of the counter to look up.

    Returns:
        The stored count, or ``None`` when no counter has that name.
    """
    found = crs.execute("SELECT count FROM counters WHERE name = ?", (name,)).fetchone()
    return found[0] if found is not None else None
def increase_counter(conn, crs, name):
    """Increase the counter called *name* by one, commit, and print the new value.

    The increment is done inside the UPDATE statement itself, so two
    processes bumping the same counter cannot overwrite each other's value
    the way a separate read followed by a write can.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
        name: Name of an existing counter. If no such counter exists a hint
            is printed and nothing is changed.
    """
    crs.execute("UPDATE counters SET count = count + 1 WHERE name = ?", (name,))
    if crs.rowcount == 0:
        # No row matched the name, so there is nothing to commit.
        print(f"Invalid counter name '{name}' use the --start flag to start a new counter")
        return

    # Read the value back inside the same transaction to report it.
    crs.execute("SELECT count FROM counters WHERE name = ?", (name,))
    counter = crs.fetchone()[0]
    conn.commit()
    print(f"{name} {counter}")
def start_counter(conn, crs, name):
    """Create a new counter called *name* with the value 0 and commit.

    The INSERT is attempted directly and the table's UNIQUE constraint
    decides whether the name is taken. This avoids a separate existence
    check that another process could invalidate before the INSERT runs.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
        name: Name of the counter to create. If the INSERT violates a
            constraint a message is printed and nothing is stored.
    """
    try:
        crs.execute("INSERT INTO counters (name, count) VALUES(?, ?)", (name, 0))
    except sqlite3.IntegrityError:
        print(f"Counter {name} already exists")
        return
    conn.commit()
def get_arguments():
    """Parse the command line of the counter script.

    Returns:
        The ``argparse.Namespace`` with the attributes ``list``, ``start``
        and ``name``.

    Raises:
        SystemExit: After printing the help text, when neither a counter
            name nor ``--list`` was given.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--list', action='store_true', help='list all the counters')
    parser.add_argument('--start', action='store_true', help='create the counter NAME')
    parser.add_argument('name', nargs="?", help='name of the counter')
    args = parser.parse_args()
    if args.name is None and not args.list:
        parser.print_help()
        # sys.exit() rather than the interactive helper exit(), which is only
        # present when the site module has been loaded.
        sys.exit()
    return args
def main():
    """Run the counter command selected on the command line.

    Opens the database file, makes sure the ``counters`` table exists and
    then lists the counters, starts a new one, or increases an existing one.
    """
    args = get_arguments()

    conn = sqlite3.connect(database_file)
    try:
        crs = conn.cursor()
        # IF NOT EXISTS makes the statement safe to repeat on every run
        # without hiding other operational errors (e.g. an unwritable file).
        crs.execute('''CREATE TABLE IF NOT EXISTS counters (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            count INTEGER NOT NULL
        )''')
        conn.commit()

        if args.list:
            list_counters(crs)
        elif args.start and args.name:
            start_counter(conn, crs, args.name)
        elif args.name:
            increase_counter(conn, crs, args.name)
    finally:
        # "with sqlite3.connect(...)" only ends the transaction; it does not
        # close the connection, so close it explicitly.
        conn.close()


if __name__ == "__main__":
    main()
#print "TODO get the value of 'name' from the database"
# if it was not there then add
#try:
# c.execute('''INSERT INTO companies (name) VALUES ('Stonehenge')''')
#except sqlite3.IntegrityError as e:
# print 'sqlite error: ', e.args[0] # column name is not unique
#conn.commit()
#conn.close()
#print "done"
SQLite in-memory AUTOINCREMENT
- AUTOINCREMENT
- memory
import sqlite3
def create_table(conn, crs):
sql = '''
CREATE TABLE people (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(100) UNIQUE NOT NULL
);
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
conn.commit()
def insert(conn, crs, username):
sql = 'INSERT INTO people (username) VALUES (?)'
try:
crs.execute(sql, (username,))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def list_rows(conn, crs):
sql = "SELECT * FROM people"
for id, username in crs.execute(sql):
print(f"{id} - {username}")
def main():
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
create_table(conn, crs)
insert(conn, crs, "rachel")
list_rows(conn, crs)
insert(conn, crs, "joey")
list_rows(conn, crs)
insert(conn, crs, "monica")
list_rows(conn, crs)
insert(conn, crs, "monica")
list_rows(conn, crs)
conn.close()
print('done')
main()
SQLite in-memory field with DEFAULT value
- DEFAULT
- memory
import sqlite3
def create_table(conn, crs):
    """Create the ``qa`` table whose ``answer`` column defaults to '42'.

    A ``sqlite3.OperationalError`` (for example because the table already
    exists) is printed instead of being propagated.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
    """
    # String literals in SQL take single quotes; double quotes denote
    # identifiers and are accepted as strings by SQLite only as a
    # compatibility quirk.
    sql = '''
        CREATE TABLE qa (
            question TEXT,
            answer   TEXT DEFAULT '42'
        );
    '''
    try:
        crs.execute(sql)
    except sqlite3.OperationalError as err:
        print(f'sqlite error: {err.args[0]}')  # table qa already exists
    conn.commit()
def insert_qa(conn, crs, question, answer):
sql = 'INSERT INTO qa (question, answer) VALUES (?, ?)'
try:
crs.execute(sql, (question, answer))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def insert_q(conn, crs, question):
sql = 'INSERT INTO qa (question) VALUES (?)'
try:
crs.execute(sql, (question,))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def list_rows(conn, crs):
sql = "SELECT * FROM qa"
for question, answer in crs.execute(sql):
print(f"{question} - {answer}")
print("------")
def main():
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
create_table(conn, crs)
insert_qa(conn, crs, "Language?", "Python")
list_rows(conn, crs)
insert_qa(conn, crs, "Database?", "SQLite")
list_rows(conn, crs)
insert_q(conn, crs, "Meaning of life?")
list_rows(conn, crs)
conn.close()
print('done')
main()
SQLite transactions
- TBD
This example is supposed to show the importance of transactions, but so far I have failed to replicate the problem with it.
import sqlite3
def create_table(conn, crs):
sql = '''
CREATE TABLE bank (
name TEXT PRIMARY KEY,
balance INTEGER NOT NULL
);
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
conn.commit()
def insert_account(conn, crs, name, balance):
sql = 'INSERT INTO bank (name, balance) VALUES (?, ?)'
try:
crs.execute(sql, (name, balance))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def show(conn, crs):
sql = "SELECT * FROM bank"
for name, balance in crs.execute(sql):
print(f"{name:5}: {balance:>5}")
for (total,) in crs.execute("SELECT SUM(balance) AS total FROM bank"):
print(f"Total: {total:>5}")
print("------")
def update(conn, crs, name, amount):
    """Add *amount* (which may be negative) to one account's balance and commit.

    A ``sqlite3.IntegrityError`` is printed instead of being propagated.

    Args:
        conn: An open ``sqlite3.Connection``.
        crs: A cursor created from ``conn``.
        name: Name of the account, the primary key of the ``bank`` table.
        amount: Value to add to the current balance.
    """
    # The right-hand side of SET already sees the row's current value, so no
    # sub-select is needed to read the balance first.
    sql = 'UPDATE bank SET balance = balance + ? WHERE name = ?'
    try:
        crs.execute(sql, (amount, name))
    except sqlite3.IntegrityError as err:
        print('sqlite error: ', err.args[0])
    conn.commit()
def without_transaction(conn, crs, from_name, to_name, amount):
update(conn, crs, from_name, -amount);
return
update(conn, crs, to_name, amount);
def with_transaction(conn, crs, from_name, to_name, amount, simulate_failure=True):
    """Move *amount* from one account to another as a single transaction.

    The debit and the credit are committed together. If anything goes wrong
    between them the transaction is rolled back, so the debit cannot remain
    in the database without the matching credit.

    Args:
        conn: An open ``sqlite3.Connection`` that is not in autocommit mode.
        crs: A cursor created from ``conn``.
        from_name: Account to take the money from.
        to_name: Account to give the money to.
        amount: Amount to move.
        simulate_failure: When true (the default, which keeps the demo's
            "something broke after the debit" scenario) an error is raised
            between the two statements to show that the rollback undoes
            the debit.
    """
    sql = 'UPDATE bank SET balance = balance + ? WHERE name = ?'
    try:
        crs.execute(sql, (-amount, from_name))
        if simulate_failure:
            raise RuntimeError('simulated failure between debit and credit')
        crs.execute(sql, (amount, to_name))
    except (sqlite3.Error, RuntimeError) as err:
        # Undo the debit: either both updates are stored or neither is.
        conn.rollback()
        print('transfer rolled back: ', err)
    else:
        conn.commit()
def main():
conn = sqlite3.connect(":memory:", autocommit=False, isolation_level=None)
crs = conn.cursor()
create_table(conn, crs)
insert_account(conn, crs, "Jane", 0)
insert_account(conn, crs, "Mary", 1000)
insert_account(conn, crs, "Ann", 1000)
show(conn, crs)
update(conn, crs, "Mary", -100)
update(conn, crs, "Jane", +100)
show(conn, crs)
without_transaction(conn, crs, "Mary", "Jane", 100)
show(conn, crs)
with_transaction(conn, crs, "Mary", "Jane", 100)
show(conn, crs)
conn.close()
main()
MySQL
Install MySQL support
- Anaconda on MS Windows: conda install mysql-connector-python
- Otherwise: pip install mysql-connector-python
Create database user (manually)
$ mysql -u root -p
SHOW DATABASES;
CREATE USER 'foobar'@'localhost' IDENTIFIED BY 'no secret';
GRANT ALL PRIVILEGES ON fb_db . * TO 'foobar'@'localhost';
GRANT ALL PRIVILEGES ON * . * TO 'foobar'@'%' IDENTIFIED BY 'no secret';
FLUSH PRIVILEGES;
exit
vim /etc/mysql/mysql.conf.d/mysqld.cnf
comment out
# bind-address = 127.0.0.1
service mysql restart
Create database (manually)
$ mysql -u foobar -p
CREATE DATABASE fb_db;
DROP DATABASE fb_db;
exit
Create table (manually)
$ mysql -u foobar -p
USE fb_db;
CREATE TABLE person (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255),
birthdate DATE,
score REAL
);
-- Add one sample row; the terminating semicolon is what makes the mysql
-- client send the statement.
INSERT INTO person (name, birthdate, score)
    VALUES ('Foo Bar', '1998-05-23', 42.1);
Connect to MySQL
- connect
- close
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
print("Connected:", conn)
conn.close()
if __name__ == "__main__":
main()
$ python3 examples/mysql/connect.py
- Change some of the parameters and try again
Connect to MySQL and Handle exception
import mysql.connector
def main():
try:
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
except mysql.connector.Error as e:
print("MySQL exception: ", e)
return
#except Exception as e:
# print("Other exception", e);
# return
print("Connected:", conn)
conn.close()
if __name__ == "__main__":
main()
Select data
- cursor
- execute
- fetchone
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
row = cursor.fetchone()
print(row)
# cursor.close() # mysql.connector.errors.InternalError: Unread result found.
conn.close()
if __name__ == "__main__":
main()
Select more data
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
while True:
row = cursor.fetchone()
if not row:
break
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select all data fetchall
- fetchall
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
rows = cursor.fetchall()
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select some data fetchmany
- fetchmany
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
size = 2
while True:
rows = cursor.fetchmany(size)
if not rows:
break
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select some data WHERE clause
- WHERE
- %s
import mysql.connector
def main(min_score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person WHERE score > %s", (min_score,))
size = 2
while True:
rows = cursor.fetchmany(size)
if not rows:
break
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main(40)
Select into dictionaries
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM person")
for row in cursor:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Insert data
- INSERT
- commit
import mysql.connector
def main(name, birthdate, score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute(
"INSERT INTO person (name, birthdate, score) VALUES (%s, %s, %s)",
(name, birthdate, score))
if cursor.lastrowid:
print('last insert id', cursor.lastrowid)
else:
print('last insert id not found')
conn.commit()
conn.close()
if __name__ == "__main__":
main('Monty Python', '1969-10-05', 100)
Update data
- UPDATE
import mysql.connector
def main(uid, score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("UPDATE person SET score=%s WHERE id=%s",
(score, uid))
conn.commit()
conn.close()
if __name__ == "__main__":
main(12, 32)
Delete data
- DELETE
import mysql.connector
def main(uid):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("DELETE FROM person WHERE id=%s", (uid,))
conn.commit()
conn.close()
if __name__ == "__main__":
main(11)
Exercise MySQL
- Create a user with a password manually.
- Create a database manually.
- Create a table manually for describing fleet of cars: id, license-plate, year-built, brand, owner. (Owner is the name of the owner)
- Create a program that accepts values on the command line and inserts the data into the database.
- Create another program that lists all the cars.
- Improve the selector program to accept the command line parameters --minage N and --maxage N and show the cars within those age limits (N is a number of years, e.g. 3)
- Create program to delete a car.
- Create program to change the owner of a car.
Exercise: MySQL Connection
Instead of hard-coding the connection details in the script, let's create an INI file that contains the connection information and use that.
[development]
host = localhost
database = fb_db
user = foobar
password = no secret
Solution: MySQL Connection
import configparser
import mysql.connector
config_file = 'examples/mysql/connect.ini'


def read_config(section='development'):
    """Return the connection settings stored in one section of the INI file.

    Args:
        section: Name of the section of ``config_file`` to return.

    Returns:
        The ``configparser`` section proxy, a mapping of option names to
        string values.

    Raises:
        Exception: If the file has no such section. This is also the case
            when the file itself is missing, because ``ConfigParser.read``
            silently skips files it cannot open.
    """
    cp = configparser.ConfigParser()
    cp.read(config_file)
    if not cp.has_section(section):
        raise Exception(f"No configuration found for '{section}'")
    return cp[section]
def main():
    """Connect to MySQL using the settings from the INI file and report the result.

    Connection problems are printed instead of being propagated. The
    credentials themselves are never printed.
    """
    try:
        db = read_config()
        conn = mysql.connector.connect(**db)
    except mysql.connector.Error as e:
        print("MySQL exception: ", e)
        return
    except Exception as e:
        print("Other exception", e)
        return

    try:
        if conn.is_connected():
            print("is connected")
        print("Connected:", conn)
    finally:
        # Release the connection even if reporting fails.
        conn.close()
if __name__ == "__main__":
main()
PostgreSQL
PostgreSQL install
$ sudo aptitude install postgresql
$ sudo -i -u postgres
$ createuser --interactive
Add "ubuntu" as superuser (we need a username that matches our Linux username)
$ createdb testdb
$ psql
$ sudo -u postgres psql
$ psql testdb
testdb=# CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100));
PostgreSQL with Docker compose
FROM python:3
WORKDIR /opt
RUN pip install psycopg2==2.9.3
# COPY . .
version: '3'
services:
app:
build:
context: .
dockerfile: Dockerfile
tty: true
command: bash
volumes:
- .:/opt
environment:
POSTGRES_USER: username
POSTGRES_PASSWORD: password
POSTGRES_DB: default_database
pgdatabase:
image: 'postgres:latest'
ports:
- 5432:5432
environment:
POSTGRES_USER: username
POSTGRES_PASSWORD: password
POSTGRES_DB: default_database
volumes:
- pg-data-volume:/var/lib/postgresql/data/
volumes:
pg-data-volume:
docker-compose up
docker exec -it postgresql_app_1 bash
Python and Postgresql
$ sudo aptitude install python3-postgresql
$ sudo aptitude install python3-psycopg2
PostgreSQL connect
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
PostgreSQL create table
import psycopg2
# Connect, then create the "people" table.
try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except psycopg2.Error as err:
    # Without a connection nothing below can work. Stop here instead of
    # failing later with a NameError on the undefined "conn".
    raise SystemExit(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100))")
    conn.commit()
except psycopg2.Error as err:
    print(err)
    conn.rollback()  # leave the failed transaction
finally:
    conn.close()
INSERT
import psycopg2
# Connect, then insert one hard-coded row using %s placeholders.
try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except psycopg2.Error as err:
    # Without a connection nothing below can work. Stop here instead of
    # failing later with a NameError on the undefined "conn".
    raise SystemExit(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

uid = 1
name = 'Foo'

try:
    cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
    conn.commit()
except psycopg2.Error as err:
    print(err)
    conn.rollback()  # leave the failed transaction
finally:
    conn.close()
duplicate key value violates unique constraint "people_pkey"
DETAIL: Key (id)=(1) already exists.
INSERT (from command line)
import psycopg2
import sys
# Insert the row given on the command line: ID NAME
if len(sys.argv) != 3:
    sys.exit("Usage: {} ID NAME".format(sys.argv[0]))
uid, name = sys.argv[1:]

try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except psycopg2.Error as err:
    # Without a connection nothing below can work. Stop here instead of
    # failing later with a NameError on the undefined "conn".
    sys.exit(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
    conn.commit()
except psycopg2.Error as err:
    print(err)
    conn.rollback()  # leave the failed transaction
finally:
    conn.close()
SELECT
import psycopg2
# Connect, then print every row of the "people" table.
try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except psycopg2.Error as err:
    # Without a connection nothing below can work. Stop here instead of
    # failing later with a NameError on the undefined "conn".
    raise SystemExit(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("SELECT * from people")
    for row in cur.fetchall():
        print(row)
except psycopg2.Error as err:
    print(err)
finally:
    conn.close()
DELETE
import psycopg2
# Connect, delete every row of "people", then show what is left.
try:
    #conn = psycopg2.connect("postgresql:///testdb")
    conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except psycopg2.Error as err:
    # Without a connection nothing below can work. Stop here instead of
    # failing later with a NameError on the undefined "conn".
    raise SystemExit(f"I am unable to connect to the database: {err}")

cur = conn.cursor()

try:
    cur.execute("DELETE FROM people")
    conn.commit()
except psycopg2.Error as err:
    print(err)
    # A failed statement aborts the transaction; roll back so the SELECT
    # below can still run.
    conn.rollback()

try:
    cur.execute("SELECT * from people")
    for row in cur.fetchall():
        print(row)
except psycopg2.Error as err:
    print(err)
finally:
    conn.close()
MongoDB
MongoDB CRUD
- CRUD stands for Create, Read, Update, Delete
Install MongoDB support
pip install pymongo
pip install motor
MongoDB in Docker compose
FROM python:3
WORKDIR /opt
COPY requirements.txt .
RUN pip install -r requirements.txt
# COPY . .
version: '3'
services:
app:
build:
context: .
dockerfile: Dockerfile
tty: true
command: bash
volumes:
- .:/opt
mongodb:
image: mongo:4.0.8
volumes:
- mongo-data:/data/db
- mongo-configdb:/data/configdb
volumes:
mongo-data:
mongo-configdb:
pytest
pymongo
motor
docker-compose up
docker exec -it mongodb_app_1 bash
Python MongoDB drop database
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
client.drop_database('demo')
Python MongoDB insert
- MongoClient
- insert
- insert_one
In order to interact with a MongoDB database we first need to import the MongoClient from the pymongo module.
Then we connect to the MongoDB server using the MongoClient() call. Using the resulting client object we can access the database. MongoDB does not require us to create a database schema, and just by accessing a database and inserting data into it we create the database. So we can use any database name here. In the example the name of the database is going to be demo.
Once we have an object representing the database we can access a "collection" in the database. This too is created automatically if it did not exist earlier. In our example we use the name "people" for our collection.
We then call the insert_one method, passing it a dictionary that we prepared earlier.
This will insert the dictionary as a JSON-like document in the selected collection.
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
db = client.demo
foo = {
'name' : 'Foo',
'email' : 'foo@example.com',
'birthdate' : datetime.datetime.strptime('2002-01-02', '%Y-%m-%d'),
'student' : True,
}
bar = {
'name' : 'Bar',
'email' : 'bar@example.com',
'birthdate' : datetime.datetime.strptime('1998-08-03', '%Y-%m-%d'),
'student' : True,
'teacher' : False,
}
zorg = {
'name' : 'Zorg',
'email' : 'zorg@corp.com',
'birthdate' : datetime.datetime.strptime('1995-12-12', '%Y-%m-%d'),
'teacher' : True,
}
db.people.insert_one(foo)
db.people.insert_one(bar)
db.people.insert_one(zorg)
MongoDB CLI
$ mongo
> help
...
> show dbs
admin (empty)
demo 0.078GB
local 0.078GB
> use demo (name of db)
switched to db demo
> show collections
people
system.indexes
> db.people.find()
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676c"), "email" : "foo@example.com", "student" : true,
"birthdate" : ISODate("2002-01-02T00:00:00Z"), "name" : "Foo" }
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676d"), "email" : "bar@example.com", "name" : "Bar", "student" : true,
"birthdate" : ISODate("1998-08-03T00:00:00Z"), "teacher" : false }
{ "_id" : ObjectId("58a3e9b2962d747a9c6e676e"), "email" : "zorg@corp.com",
"birthdate" : ISODate("1995-12-12T00:00:00Z"), "teacher" : true, "name" : "Zorg" }
> db.people.drop() (drop a collection)
> db.dropDatabase() (drop a whole database)
Python MongoDB find
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
db = client.demo
for person in db.people.find():
print(person)
Python MongoDB find refine
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
db = client.demo
for person in db.people.find({ 'name' : 'Foo'}):
print(person)
Python MongoDB update
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
db = client.demo
db.people.update_one({ 'name' : 'Zorg'}, { '$set' : { 'salary' : 1000 } })
for p in db.people.find({ 'name' : 'Zorg'}):
print(p)
# db.people.update_many({ 'name' : 'Zorg'}, { '$set' : { 'salary' : 1000 } })
Python MongoDB remove (delete)
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
db = client.demo
print("before")
for p in db.people.find():
print(p)
db.people.delete_one({ 'name' : 'Zorg'})
print("after")
for p in db.people.find():
print(p)
Python MongoDB replace
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.cache
data1 = {
'total': 10,
'current': 5,
}
data2 = {
'total': 13,
'current': 3,
}
def show():
print(collection.count_documents({}))
for entry in collection.find({}):
print(entry)
print()
collection.drop()
print(collection.count_documents({}))
collection.insert_one({ '_id' : 'stats', **data1})
show()
collection.replace_one({ '_id' : 'stats'}, data2)
show()
0
1
{'_id': 'stats', 'total': 10, 'current': 5}
1
{'_id': 'stats', 'total': 13, 'current': 3}
Python MongoDB upsert
from pymongo import MongoClient
import datetime
client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.cache
data1 = {
'total': 10,
'current': 5,
}
data2 = {
'total': 13,
'current': 3,
}
def show():
print(collection.count_documents({}))
for entry in collection.find({}):
print(entry)
print()
collection.drop()
print(collection.count_documents({}))
collection.update_one({ '_id' : 'stats'}, { '$set': data1 }, upsert=True)
show()
collection.update_one({ '_id' : 'stats'}, { '$set': data2 }, upsert=True)
show()
0
1
{'_id': 'stats', 'current': 5, 'total': 10}
1
{'_id': 'stats', 'current': 3, 'total': 13}
Python Mongodb: TypeError: upsert must be True or False
from pymongo import MongoClient
import datetime
import pymongo
print(pymongo.__version__)
client = MongoClient('mongodb://mongodb:27017')
collection = client.demo.data
collection.drop()
print("working:")
collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 1 } }, upsert=True)
print("failing:")
collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 2 } }, { "upsert": True })
3.10.1
Traceback (most recent call last):
File "examples/mongodb/upsert_error.py", line 12, in <module>
collection.update_one({ '_id' : 'stats'}, { '$set': { 'total': 2 } }, { "upsert": True })
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 998, in update_one
self._update_retryable(
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 854, in _update_retryable
return self.__database.client._retryable_write(
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1491, in _retryable_write
return self._retry_with_session(retryable, func, s, None)
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1384, in _retry_with_session
return func(session, sock_info, retryable)
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 846, in _update
return self._update(
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/collection.py", line 767, in _update
common.validate_boolean("upsert", upsert)
File "/home/gabor/venv3/lib/python3.8/site-packages/pymongo/common.py", line 159, in validate_boolean
raise TypeError("%s must be True or False" % (option,))
TypeError: upsert must be True or False
Python MongoDB Async with motor - connect
import asyncio
import motor.motor_asyncio
async def get_server_info():
    """Print the server information, or a short message if the call fails."""
    conn_str = 'mongodb://mongodb:27017'
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)
    try:
        print(await client.server_info())
    except Exception:
        print("Unable to connect to the server.")


# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop again. Calling asyncio.get_event_loop() outside of a running loop is
# deprecated.
asyncio.run(get_server_info())
Python MongoDB Async with motor - insert and find
import asyncio
import motor.motor_asyncio
import datetime
mary = {
'name' : 'Mary',
'email' : 'mary@example.com',
'birthdate' : datetime.datetime.strptime('2002-01-02', '%Y-%m-%d'),
'student' : True,
}
async def examples():
    """Insert one document into demo.people and print every document in it."""
    conn_str = 'mongodb://mongodb:27017'
    client = motor.motor_asyncio.AsyncIOMotorClient(conn_str, serverSelectionTimeoutMS=5000)
    db = client.demo

    # insert_one() must be awaited: otherwise the code continues without
    # waiting for the insert and any error it raises goes unnoticed.
    await db.people.insert_one(mary)

    results = db.people.find()
    async for person in results:
        print(person)


# asyncio.run() creates the event loop, runs the coroutine and closes the
# loop again. Calling asyncio.get_event_loop() outside of a running loop is
# deprecated.
asyncio.run(examples())
Redis
Redis CLI
$ redis-cli
> set name foo
> get name
> set name "foo bar"
> get name
> set a 1
> get a
> incr a
> get a
> set b 1
> keys *
> del b
Redis list keys
import redis
r = redis.StrictRedis()
for k in r.keys('*'):
print(k)
Redis set get
import redis
r = redis.StrictRedis()
r.set("name", "some value")
print(r.get("name"))
Redis incr
import redis
r = redis.StrictRedis()
r.set("counter", 40)
print(r.get("counter"))
print(r.incr("counter"))
print(r.incr("counter"))
print(r.get("counter"))
Redis incrby
import redis
r = redis.StrictRedis()
r.set("counter", 19)
print(r.get("counter"))
print(r.incrby("counter", 23))
print(r.get("counter"))
Redis setex
Set with expiration time in seconds.
import redis
import time
r = redis.StrictRedis()
r.setex("login", 2, 'foobar')
print(r.get("login")) # 'foobar'
time.sleep(1)
print(r.get("login")) # 'foobar'
time.sleep(1)
print(r.get("login")) # None
SQLAlchemy
SQLAlchemy hierarchy
- ORM
- Table, Metadata, Reflection, DDL - standardized language
- Engine - standardize low-level access (placeholders)
SQLAlchemy engine
- create_engine
engine = create_engine('sqlite:///test.db') # relative path
engine = create_engine('sqlite:////full/path/to/test.db') # full path
engine = create_engine('sqlite://') # in memory database
PostgreSQL
engine = create_engine('postgresql://user:password@hostname/dbname')
engine = create_engine('postgresql+psycopg2://user:password@hostname/dbname')
MySQL
engine = create_engine("mysql://user:password@hostname/dbname", encoding='latin1') # defaults to utf-8
SQLAlchemy autocommit
- begin
- commit
- rollback
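Unlike the underlying database drivers, SQLAlchemy 1.x uses autocommit for statements executed directly on the engine.
That is, usually we don't need to call commit(), but if we would like to have a transaction we need to start it using begin() and end it either with commit() or with rollback().
(This library-level autocommit was removed in SQLAlchemy 2.0, where every change has to be committed explicitly.)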
SQLAlchemy engine CREATE TABLE
- execute
- Engine
import os
from sqlalchemy import create_engine, text

# Start from a clean database file every time the example runs.
dbname = 'test.db'
if os.path.exists(dbname):
    os.unlink(dbname)

engine = create_engine('sqlite:///' + dbname) # Engine

# engine.execute() and plain-string SQL were removed in SQLAlchemy 2.0.
# engine.begin() provides a connection inside a transaction that is
# committed when the block ends without an error.
with engine.begin() as conn:
    conn.execute(text('''
        CREATE TABLE person (
            id INTEGER PRIMARY KEY,
            name VARCHAR(100) UNIQUE,
            balance INTEGER NOT NULL
        );
    '''))
SQLAlchemy engine INSERT
- INSERT
import os
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)

# The same statement is executed once per person; only the bound
# parameters differ between the calls.
insert_person = 'INSERT INTO person (name, balance) VALUES (:name, :balance)'
for person_name in ('Joe', 'Jane', 'Melinda', 'George'):
    engine.execute(insert_person, name=person_name, balance=100)
SQLAlchemy engine SELECT
- fetchone
- close
- SELECT
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person WHERE id=:id', id=3)
print(result) # <sqlalchemy.engine.result.ResultProxy object at 0x1013c9da0>
row = result.fetchone()
print(row) # (3, 'Melinda', 100) - Its a tuple
print(row['name']) # Melinda - And a dictionary
print(row.name) # Melinda - and object with methods for the columns
for k in row.keys(): # keys also works on it
print(k) # id, name, balance
result.close()
SQLAlchemy engine SELECT all
import os
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person')
for row in result:
print(row)
result.close()
# (1, 'Joe', 100)
# (2, 'Jane', 100)
# (3, 'Melinda', 100)
# (4, 'George', 100)
SQLAlchemy engine SELECT fetchall
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT * FROM person WHERE id >= :id', id=3)
rows = result.fetchall()
print(rows) # [(3, 'Melinda', 100), (4, 'George', 100)]
result.close()
SQLAlchemy engine SELECT aggregate
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
result = engine.execute('SELECT COUNT(*) FROM person')
r = result.fetchone()[0]
print(r)
result.close()
SQLAlchemy engine SELECT IN
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
results = engine.execute("SELECT * FROM person WHERE name IN ('Joe', 'Jane')")
print(results.fetchall()) # [(2, 'Jane', 100), (1, 'Joe', 100)]
# engine.execute("SELECT * FROM person WHERE name IN (:a0, :a1)", a0 = 'Joe', a1 = 'Jane')
SQLAlchemy engine SELECT IN with placeholders
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)

names = ['Joe', 'Jane']

# Build one named placeholder per value (a0, a1, ...) so that the values are
# passed as bound parameters instead of being pasted into the SQL text.
data = {'a' + str(i): name for i, name in enumerate(names)}
placeholders = [':' + key for key in data]
# print(placeholders)  # [':a0', ':a1']
# print(data)          # {'a0': 'Joe', 'a1': 'Jane'}

sql = "SELECT * FROM person WHERE name IN ({})".format(', '.join(placeholders))
# print(sql)  # SELECT * FROM person WHERE name IN (:a0, :a1)

# Same as writing: engine.execute(sql, a0='Joe', a1='Jane')
results = engine.execute(sql, **data)
print(results.fetchall())  # [(2, 'Jane', 100), (1, 'Joe', 100)]
SQLAlchemy engine connection
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)
conn = engine.connect()
results = conn.execute('SELECT balance, name FROM person WHERE id < :id', id = 3)
print(results.fetchall()) # [(100, 'Joe'), (100, 'Jane')]
conn.close()
SQLAlchemy engine transaction
from sqlalchemy import create_engine
dbname = 'test.db'
engine = create_engine('sqlite:///' + dbname)

src = 'Joe'
dst = 'Jane'
payment = 3

select_balance = "SELECT balance, name FROM person WHERE name = :name"
update_balance = 'UPDATE person SET balance = :balance WHERE name=:name'

# The with-block closes the connection even if one of the statements fails.
with engine.connect() as conn:
    trans = conn.begin()
    try:
        results = conn.execute(select_balance, name=src)
        src_balance = results.fetchone()[0]
        results.close()  # done with this result set
        print(src_balance)

        results = conn.execute(select_balance, name=dst)
        dst_balance = results.fetchone()[0]
        results.close()
        print(dst_balance)

        # Both updates belong to the same transaction: either both balances
        # change or neither does.
        conn.execute(update_balance, balance=src_balance - payment, name=src)
        conn.execute(update_balance, balance=dst_balance + payment, name=dst)
        trans.commit()
    except Exception:
        # Undo any partial change (e.g. only one of the two updates) and
        # let the caller see the original error.
        trans.rollback()
        raise

results = engine.execute("SELECT * FROM person")
print(results.fetchall())
SQLAlchemy engine using context managers
# engine.begin() hands back a Connection with a transaction already started.
# The transaction is committed when the block ends normally and rolled back
# when an exception leaves the block.
with engine.begin() as conn:
    conn.execute(...)
    conn.execute(...)
    raise Exception()  # for rollback
Exercise: Create table
Create the following schema
CREATE TABLE node (
id INTEGER PRIMARY KEY,
name VARCHAR(100)
);
CREATE TABLE interface (
id INTEGER PRIMARY KEY,
node_id INTEGER NOT NULL,
ipv4 VARCHAR(15) UNIQUE,
ipv6 VARCHAR(80) UNIQUE,
FOREIGN KEY (node_id) REFERENCES node(id)
);
CREATE TABLE connection (
a INTEGER NOT NULL,
b INTEGER NOT NULL,
FOREIGN KEY (a) REFERENCES interface(id),
FOREIGN KEY (b) REFERENCES interface(id)
);
Insert a few data items. Write a few select statements.
SQLAlchemy Metadata
Describe the Schema, the structure of the database (tables, columns, constraints, etc.) in Python.
- SQL generation from the metadata, generate to a schema.
- Reflection (Introspection) - Create the metadata from an existing database, from an existing schema.
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String

# The MetaData object collects the Table definitions that describe the schema.
metadata = MetaData()

user_table = Table('user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), unique=True),
    Column('balance', Integer, nullable=False)
)

print(user_table.name)
print(user_table.c.name)
print(user_table.c.id)
print(user_table.c)
print(user_table.columns)  # same collection as .c - dictionary-like access to the columns by name

from sqlalchemy import create_engine
engine = create_engine('sqlite://')
metadata.create_all(engine)  # CREATE every table registered in metadata so far

from sqlalchemy import ForeignKey
address_table = Table('address', metadata,
    Column('id', Integer, primary_key=True),
    Column('street', String(100)),
    Column('user_id', Integer, ForeignKey('user.id'))
)
address_table.create(engine)  # CREATE this single table only

from sqlalchemy import Unicode, UnicodeText, ForeignKeyConstraint, DateTime
story_table = Table('story', metadata,
    # Two primary_key columns form a composite primary key: (id, version)
    Column('id', Integer, primary_key=True),
    Column('version', Integer, primary_key=True),
    Column('headline', Unicode(100), nullable=False),
    Column('body', UnicodeText)
)

published_table = Table('published', metadata,
    Column('id', Integer, primary_key=True),
    Column('timestamp', DateTime, nullable=False),
    Column('story_id', Integer, nullable=False),
    Column('version', Integer, nullable=False),
    # Composite foreign key: the local column names first, then the
    # referenced "table.column" names in the same order.
    ForeignKeyConstraint(
        ['story_id', 'version'],
        ['story.id', 'story.version']
    )
)

conn = engine.connect()

# The dictionary keys must be the column names of user_table.
conn.execute(user_table.insert(), [
    {'name': 'Jack', 'balance': 100},
    {'name': 'Jane', 'balance': 100}
])

from sqlalchemy import select, or_
select_stmt = select([user_table.c.name, user_table.c.balance]).where(user_table.c.name == 'Jack')
result = conn.execute(select_stmt)
for row in result:
    print(row)

select_stmt = select([user_table])
conn.execute(select_stmt).fetchall()

select_stmt = select([user_table]).where(
    or_(
        user_table.c.name == 'Jack',
        user_table.c.name == 'Jane'
    )
)

# The ON clause is a column comparison expression, hence == and not =.
joined_obj = user_table.join(address_table, user_table.c.id == address_table.c.user_id)
SQLAlchemy types
- Integer() - INT
- String() - ASCII strings - VARCHAR
- Unicode() - Unicode string - VARCHAR or NVARCHAR depending on database
- Boolean() - BOOLEAN, INT, TINYINT depending on db support for boolean type
- DateTime() - DATETIME or TIMESTAMP returns Python datetime() objects.
- Float() - floating point values
- Numeric() - precision numbers using Python Decimal()
SQLAlchemy ORM - Object Relational Mapping
- Domain model
- Mapping between Domain Object - Table Row
SQLAlchemy ORM create
import os
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False, unique=True)
class Genre(Base):
__tablename__ = 'genre'
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False, unique=True)
class Movie(Base):
__tablename__ = 'movie'
id = Column(Integer, primary_key=True)
title = Column(String(250), nullable=False, unique=True)
genre_id = Column(Integer, ForeignKey('genre.id'))
genre = relationship(Genre)
class Cast(Base):
__tablename__ = 'cast'
id = Column(Integer, primary_key=True)
character = Column(String(250))
person_id = Column(Integer, ForeignKey('person.id'))
movie_id = Column(Integer, ForeignKey('movie.id'))
if __name__ == '__main__':
dbname = 'imdb.db'
if os.path.exists(dbname):
os.unlink(dbname)
engine = create_engine('sqlite:///' + dbname)
Base.metadata.create_all(engine)
SQLAlchemy ORM schema
echo .schema | sqlite3 imdb.db
CREATE TABLE person (
    id INTEGER NOT NULL,
    name VARCHAR(250) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE (name)
);
CREATE TABLE genre (
    id INTEGER NOT NULL,
    name VARCHAR(250) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE (name)
);
CREATE TABLE movie (
    id INTEGER NOT NULL,
    title VARCHAR(250) NOT NULL,
    genre_id INTEGER,
    PRIMARY KEY (id),
    UNIQUE (title),
    FOREIGN KEY(genre_id) REFERENCES genre (id)
);
CREATE TABLE "cast" (
    id INTEGER NOT NULL,
    character VARCHAR(250),
    person_id INTEGER,
    movie_id INTEGER,
    PRIMARY KEY (id),
    FOREIGN KEY(person_id) REFERENCES person (id),
    FOREIGN KEY(movie_id) REFERENCES movie (id)
);
SQLAlchemy ORM reflection
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base
Base = automap_base()
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.prepare(engine, reflect=True)
Genre = Base.classes.genre
print(Genre.metadata.sorted_tables)
for c in Base.classes:
print(c)
#session = Session(engine)
#session.add(Address(email_address="foo@bar.com", user=User(name="foo")))
#session.commit()
SQLAlchemy ORM INSERT after automap
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.automap import automap_base
Base = automap_base()
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.prepare(engine, reflect=True)
Genre = Base.classes.genre
Movie = Base.classes.movie
Person = Base.classes.person
Cast = Base.classes.cast
session = Session(engine)
for name in ('Action', 'Animation', 'Comedy', 'Documentary', 'Family', 'Horror'):
session.add(Genre(name = name))
session.add(Movie(title = "Sing", genre_id=2))
session.add(Movie(title = "Moana", genre_id=2))
session.add(Movie(title = "Trolls", genre_id=2))
session.add(Movie(title = "Power Rangers", genre_id=1))
session.commit()
SQLAlchemy ORM INSERT
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
genre = {}
for name in ('Action', 'Animation', 'Comedy', 'Documentary', 'Family', 'Horror'):
genre[name] = Genre(name = name)
session.add(genre[name])
print(genre['Animation'].name) # Animation
print(genre['Animation'].id) # None
session.commit()
print(genre['Animation'].name) # Animation
print(genre['Animation'].id) # 2
session.add(Movie(title = "Sing", genre = genre['Animation']))
session.commit()
SQLAlchemy ORM SELECT
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
for g in session.query(Genre).all():
print(g.name, g.id)
print("---")
animation = session.query(Genre).filter(Genre.name == 'Animation').one()
print(animation.name, animation.id)
SQLAlchemy ORM SELECT cross tables
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
movies = session.query(Movie).all()
for m in movies:
print(m.title, "-", m.genre.name)
SQLAlchemy ORM SELECT and INSERT
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
animation = session.query(Genre).filter(Genre.name == 'Animation').one()
session.add(Movie(title = "Moana", genre = animation))
session.add(Movie(title = "Trolls", genre = animation))
action = session.query(Genre).filter(Genre.name == 'Action').one()
session.add(Movie(title = "Power Rangers", genre = action))
comedy = session.query(Genre).filter(Genre.name == 'Comedy').one()
session.add(Movie(title = "Gostbuster", genre = comedy))
session.commit()
SQLAlchemy ORM UPDATE
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
movie = session.query(Movie).filter(Movie.title == 'Gostbuster').one()
print(movie.title)
movie.title = 'Ghostbusters'
session.commit()
print(movie.title)
SQLAlchemy ORM logging
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from orm_create_db import Base, Genre, Movie, Person, Cast
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
logger = logging.getLogger('demo')
logger.setLevel(logging.INFO)
dbname = 'imdb.db'
engine = create_engine('sqlite:///' + dbname)
Base.metadata.bind = engine
session = Session(engine)
logger.info("Selecting all")
movies = session.query(Movie).all()
for m in movies:
logger.info("------------")
#print(m.title, "-", m.genre_id)
print(m.title, "-", m.genre.name)
Solution: Create table
Create the following schema
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey
metadata = MetaData()

node_table = Table('node', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), unique=True)
)

interface_table = Table('interface', metadata,
    Column('id', Integer, primary_key=True),
    Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
    Column('ipv4', String(15), unique=True),  # "255.255.255.255" is 15 characters long
    Column('ipv6', String(80), unique=True),
)

connection_table = Table('connection', metadata,
    Column('a', Integer, ForeignKey('interface.id'), nullable=False),
    Column('b', Integer, ForeignKey('interface.id'), nullable=False)
)

# echo=True prints the generated SQL statements.
engine = create_engine('sqlite://', echo=True)
metadata.create_all(engine)
Exercise: Inspector
Use the inspector to list all the tables and all the columns in every table.
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey
metadata = MetaData()

node_table = Table('node', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(100), unique=True)
)

interface_table = Table('interface', metadata,
    Column('id', Integer, primary_key=True),
    Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
    Column('ipv4', String(15), unique=True),  # "255.255.255.255" is 15 characters long
    Column('ipv6', String(80), unique=True),
)

connection_table = Table('connection', metadata,
    Column('a', Integer, ForeignKey('interface.id'), nullable=False),
    Column('b', Integer, ForeignKey('interface.id'), nullable=False)
)

engine = create_engine('sqlite://', echo=True)
metadata.create_all(engine)

# Reflection: load the table definitions back from the database into a
# second, initially empty MetaData object.
m2 = MetaData()
m2_node_table = Table('node', m2, autoload=True, autoload_with=engine)
m2_interface_table = Table('interface', m2, autoload=True, autoload_with=engine)
print(m2_node_table.columns)
print(m2_interface_table.columns)
print(repr(m2_node_table))

# Inspector: list every table, its columns and its foreign keys.
from sqlalchemy import inspect
inspector = inspect(engine)
for table_name in inspector.get_table_names():
    print(table_name)
    for column in inspector.get_columns(table_name):
        print('   ', column['name'], column['type'])
    print('   ', inspector.get_foreign_keys(table_name))
SQLAlchemy CREATE and DROP
- metadata.create_all(engine, checkfirst=True|False) emits CREATE statement for all tables.
- table.create(engine, checkfirst=False|True) emits CREATE statement for a single table.
- metadata.drop_all(engine, checkfirst=True|False) emits DROP statement for all the tables.
- table.drop(engine, checkfirst=False|True) emits DROP statement for a single table.
metadata can create (or drop) the tables in the correct order to maintain the dependencies.
SQLAlchemy Notes
- Multi-column primary key (composite primary key).
- Composite foreign key.
SQLAlchemy Meta SQLite CREATE
from sqlalchemy import create_engine
import os
from sqlite_meta_schema import get_meta
dbname = 'test.db'

# Start from a clean slate so that create_all() really creates every table.
if os.path.exists(dbname):
    os.unlink(dbname)

# Build the URL from dbname so the file removed above is the one that is
# created again.
engine = create_engine('sqlite:///' + dbname)

metadata = get_meta()
metadata.create_all(engine)
from sqlalchemy import MetaData
from sqlalchemy import Table, Column
from sqlalchemy import Integer, String
from sqlalchemy import ForeignKey
def get_meta():
    """Build and return the MetaData describing the node/interface/connection schema.

    Nothing is sent to a database here; the caller decides what to do with
    the returned object (e.g. ``metadata.create_all(engine)``).
    """
    metadata = MetaData()

    # Each Table registers itself with the MetaData object passed to it, so
    # the Table objects do not have to be kept in local variables.
    Table('node', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(100), unique=True)
    )

    Table('interface', metadata,
        Column('id', Integer, primary_key=True),
        Column('node_id', Integer, ForeignKey('node.id'), nullable=False),
        Column('ipv4', String(15), unique=True),  # "255.255.255.255" is 15 characters long
        Column('ipv6', String(80), unique=True),
    )

    # Each row links two interfaces.
    Table('connection', metadata,
        Column('a', Integer, ForeignKey('interface.id'), nullable=False),
        Column('b', Integer, ForeignKey('interface.id'), nullable=False)
    )

    return metadata
SQLAlchemy Meta Reflection
from sqlalchemy import create_engine
import os
#from sqlalchemy import inspect
from sqlalchemy.engine import reflection
dbname = 'test.db'

# Stop here if the file is missing: there would be nothing to reflect.
if not os.path.exists(dbname):
    # Raising SystemExit with a message prints it to stderr and ends the
    # script with exit status 1; the exit() helper is meant for the
    # interactive interpreter only.
    raise SystemExit("Database file '{}' could not be found".format(dbname))

# Build the URL from dbname so the file checked above is the one opened.
engine = create_engine('sqlite:///' + dbname)

insp = reflection.Inspector.from_engine(engine)
print(insp.get_table_names())
SQLAlchemy Meta INSERT
SQLAlchemy Meta SELECT