Databases and Python
Database
Relational Databases (SQL)
- SQLite
- MySQL - MariaDB
- PostgreSQL
- ...
NoSQL
Types of NoSQL databases
- Document oriented - MongoDB
- Key-Value store - Redis
- Graph - Neo4j
- Tuple store - Apache River, TIBCO
SQLite Database Access
SQLite
-
sqlite
-
SQLite the most popular embedded relational database.
-
sqlite3 - Python library to use SQLite.
Connecting to SQLite database
- connect|sqlite
- cursor|sqlite
This connects to the database in the given file. If the file does not exist yet this will create the file and prepare it to hold an SQLite database. No tables are created at this point and no data is inserted.
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
# use the database here
conn.close()
Connecting to in-memory SQLite database
We can also create in-memory database.
This is not persistent, but it can be useful to load some data into memory and then do fast SQL queries on that database.
import sqlite3
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
# use the database here
conn.close()
Create TABLE in SQLite
- CREATE|sqlite
- execute|sqlite
- commit|sqlite
execute and commit
import sqlite3
def create_table(conn):
crs = conn.cursor()
sql = '''
CREATE TABLE companies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCRCHAR(100) UNIQUE NOT NULL,
established INTEGER NOT NULL,
employees INTEGER DEFAULT 0
)
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
print(f'remove companies.db to show how it works')
conn.commit()
def main():
conn = sqlite3.connect("companies.db")
create_table(conn)
conn.close()
print('done')
if __name__ == "__main__":
main()
INSERT data into SQLite database
- INSERT|sqlite
- ?|sqlite
import sqlite3
sql = 'INSERT INTO companies (name, employees, established) VALUES (?, ?, ?)'
def insert_one(conn, crs):
company_name = 'Hostlocal'
employee_count = 1
year_of_establishment = 2000
try:
crs.execute(sql, (company_name, employee_count, year_of_establishment))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0]) # column name is not unique
conn.commit()
def insert_many(conn, crs):
companies = [
('Google', 150_028, 1998),
('Facebook', 68_177, 2003),
('Apple', 154_000, 1977),
('Microsoft', 181_000, 1975),
]
try:
crs.executemany(sql, companies)
except sqlite3.IntegrityError as err:
print(f'sqlite error: {err.args[0]}')
conn.commit()
def main():
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
insert_one(conn, crs)
insert_many(conn, crs)
conn.close()
print('done')
main()
- Use placeholders (?) supply the data in tuples and to avoid Bobby tables
SELECT data from SQLite database
- SELECT|sqlite
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
employees = 3
sql = 'SELECT * FROM companies WHERE employees >= ?'
for company in crs.execute(sql, (employees,)):
print(company)
print('-----------')
year = 2000
sql = 'SELECT id, name FROM companies WHERE employees >= ? AND established < ?'
for id, name in crs.execute(sql, (employees, year)):
print(f"{id} - {name}")
conn.close()
- Use the result as an iterator.
SELECT aggregate data from SQLite database
- SELECT|sqlite
- COUNT|sqlite
- SUM|sqlite
- fetchone|sqlite
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
employees = 3
year = 2000
sql = 'SELECT COUNT(id) FROM companies WHERE employees >= ? AND established < ?'
crs.execute(sql, (employees, year))
row = crs.fetchone()
print(row)
print(row[0])
name = '%o%'
sql = 'SELECT SUM(employees) FROM companies WHERE name LIKE ? AND established < ?'
crs.execute(sql, (name, year))
row = crs.fetchone()
print(row)
print(row[0])
conn.close()
If expecting only one row, call the fetchone method. If the result set might be empty, then the fetchone might return None. Check for it!
SELECT data from SQLite database into dictionaries
import sqlite3
conn = sqlite3.connect("companies.db")
conn.row_factory = sqlite3.Row
crs = conn.cursor()
employees = 0
year = 2000
sql = 'SELECT * FROM companies WHERE employees >= ? AND established < ?'
for company in crs.execute(sql, (employees, year)):
# returns sqlite3.Row objects, but they can also act as dictionaries
print(f"{company['id']} - {company['name']} - {company['employees']} - {company['established']}")
conn.close()
UPDATE data in SQLite database
-
UPDATE|sqlite
-
UPDATE works quite similar, but it might have a WHERE clause.
import sqlite3
conn = sqlite3.connect("companies.db")
crs = conn.cursor()
name = 'Hostlocal'
sql = 'SELECT employees FROM companies WHERE name = ?'
crs.execute(sql, (name,))
row = crs.fetchone()
employees = row[0]
sql = 'UPDATE companies SET employees=? WHERE name = ?'
crs.execute(sql, (employees+1, name))
conn.commit()
print('-----------')
sql = 'SELECT name, employees FROM companies'
for name, employees in crs.execute(sql):
print(f"{name} - {employees}")
conn.close()
A counter
"""
Counter using an SQLite backend
--list list all the counters
--start name creates the counter for 'name'
name counts for 'name'
"""
import sys
import os
import argparse
import sqlite3
database_file = "counter.db"
def list_counters(crs):
print('List counters:')
for name, count in crs.execute("SELECT name, count FROM counters"):
print(f"{name}: {count}")
def get_counter(crs, name):
crs.execute("SELECT count FROM counters WHERE name = ?", (name,))
line = crs.fetchone()
if line is None:
return None
return line[0]
def increase_counter(conn, crs, name):
counter = get_counter(crs, name)
if counter is None:
print(f"Invalid counter name '{name}' use the --start flag to start a new counter")
return
counter += 1
crs.execute("UPDATE counters SET count=? WHERE name = ?", (counter, name))
conn.commit()
print(f"{name} {counter}")
def start_counter(conn, crs, name):
counter = get_counter(crs, name)
if counter is not None:
print(f"Counter {name} already exists")
return
try:
crs.execute("INSERT INTO counters (name, count) VALUES(?, ?)", (name, 0))
conn.commit()
except sqlite3.IntegrityError as err:
print(f"Name '{name}' already exists")
def get_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--list', action='store_true')
parser.add_argument('--start', action='store_true')
parser.add_argument('name', nargs="?")
args = parser.parse_args()
if args.name is None and not args.list:
parser.print_help()
exit()
return args
def main():
args = get_arguments()
with sqlite3.connect(database_file) as conn:
crs = conn.cursor()
try:
crs.execute('''CREATE TABLE counters (
id PRIMARY KEY,
name VARCRCHAR(100) UNIQUE NOT NULL,
count INTEGER NOT NULL
)''')
except sqlite3.OperationalError as err:
pass
#print(f'sqlite error: {err.args[0]}')
if args.list:
list_counters(crs)
return
if args.start and args.name:
start_counter(conn, crs, args.name)
return
if args.name:
increase_counter(conn, crs, args.name)
return
main()
#print "TODO get the value of 'name' from the database"
# if it was not there then add
#try:
# c.execute('''INSERT INTO companies (name) VALUES ('Stonehenge')''')
#except sqlite3.IntegrityError as e:
# print 'sqlite error: ', e.args[0] # column name is not unique
#conn.commit()
#conn.close()
#print "done"
SQLite in-memory AUTOINCREMENT
- AUTOINCREMENT
- memory
import sqlite3
def create_table(conn, crs):
sql = '''
CREATE TABLE people (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(100) UNIQUE NOT NULL
);
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
conn.commit()
def insert(conn, crs, username):
sql = 'INSERT INTO people (username) VALUES (?)'
try:
crs.execute(sql, (username,))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def list_rows(conn, crs):
sql = "SELECT * FROM people"
for id, username in crs.execute(sql):
print(f"{id} - {username}")
def main():
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
create_table(conn, crs)
insert(conn, crs, "rachel")
list_rows(conn, crs)
insert(conn, crs, "joey")
list_rows(conn, crs)
insert(conn, crs, "monica")
list_rows(conn, crs)
insert(conn, crs, "monica")
list_rows(conn, crs)
conn.close()
print('done')
main()
SQLite in-memory field with DEFAULT value
- DEFAULT
- memory
import sqlite3
def create_table(conn, crs):
sql = '''
CREATE TABLE qa (
question TEXT,
answer TEXT DEFAULT "42"
);
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
conn.commit()
def insert_qa(conn, crs, question, answer):
sql = 'INSERT INTO qa (question, answer) VALUES (?, ?)'
try:
crs.execute(sql, (question, answer))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def insert_q(conn, crs, question):
sql = 'INSERT INTO qa (question) VALUES (?)'
try:
crs.execute(sql, (question,))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def list_rows(conn, crs):
sql = "SELECT * FROM qa"
for question, answer in crs.execute(sql):
print(f"{question} - {answer}")
print("------")
def main():
conn = sqlite3.connect(":memory:")
crs = conn.cursor()
create_table(conn, crs)
insert_qa(conn, crs, "Language?", "Python")
list_rows(conn, crs)
insert_qa(conn, crs, "Database?", "SQLite")
list_rows(conn, crs)
insert_q(conn, crs, "Meaning of life?")
list_rows(conn, crs)
conn.close()
print('done')
main()
SQLite transactions
- TBD
This example is supposed to show the importance of transactions, but so far I failed to replicate the problem as we have in this example
import sqlite3
def create_table(conn, crs):
sql = '''
CREATE TABLE bank (
name TEXT PRIMARY KEY,
balance INTEGER NOT NULL
);
'''
try:
crs.execute(sql)
except sqlite3.OperationalError as err:
print(f'sqlite error: {err.args[0]}') # table companies already exists
conn.commit()
def insert_account(conn, crs, name, balance):
sql = 'INSERT INTO bank (name, balance) VALUES (?, ?)'
try:
crs.execute(sql, (name, balance))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def show(conn, crs):
sql = "SELECT * FROM bank"
for name, balance in crs.execute(sql):
print(f"{name:5}: {balance:>5}")
for (total,) in crs.execute("SELECT SUM(balance) AS total FROM bank"):
print(f"Total: {total:>5}")
print("------")
def update(conn, crs, name, amount):
sql = 'UPDATE bank SET balance = (SELECT balance FROM bank WHERE name = ?) + ? WHERE name = ?';
try:
crs.execute(sql, (name, amount, name))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
conn.commit()
def without_transaction(conn, crs, from_name, to_name, amount):
update(conn, crs, from_name, -amount);
return
update(conn, crs, to_name, amount);
def with_transaction(conn, crs, from_name, to_name, amount):
sql = 'UPDATE bank SET balance = (SELECT balance FROM bank WHERE name = ?) + ? WHERE name = ?';
try:
crs.execute(sql, (from_name, -amount, from_name))
return
crs.execute(sql, (to_name, amount, to_name))
except sqlite3.IntegrityError as err:
print('sqlite error: ', err.args[0])
print("here")
conn.commit()
def main():
conn = sqlite3.connect(":memory:", autocommit=False, isolation_level=None)
crs = conn.cursor()
create_table(conn, crs)
insert_account(conn, crs, "Jane", 0)
insert_account(conn, crs, "Mary", 1000)
insert_account(conn, crs, "Ann", 1000)
show(conn, crs)
update(conn, crs, "Mary", -100)
update(conn, crs, "Jane", +100)
show(conn, crs)
without_transaction(conn, crs, "Mary", "Jane", 100)
show(conn, crs)
with_transaction(conn, crs, "Mary", "Jane", 100)
show(conn, crs)
conn.close()
main()
MySQL
Install MySQL support
- Anaconda on MS Windows: conda install mysql-connector-python
- Otherwise: pip install mysql-connector
Create database user (manually)
$ mysql -u root -p
SHOW DATABASES;
CREATE USER 'foobar'@'localhost' IDENTIFIED BY 'no secret';
GRANT ALL PRIVILEGES ON fb_db . * TO 'foobar'@'localhost';
GRANT ALL PRIVILEGES ON * . * TO 'foobar'@'%' IDENTIFIED BY 'no secret';
FLUSH PRIVILEGES;
exit
vim /etc/mysql/mysql.conf.d/mysqld.cnf
comment out
# bind-address = 127.0.0.1
service mysql restart
Create database (manually)
$ mysql -u foobar -p
CREATE DATABASE fb_db;
DROP DATABASE fb_db;
exit
Create table (manually)
$ mysql -u foobar -p
USE fb_db;
CREATE TABLE person (
id INTEGER PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255),
birthdate DATE,
score REAL
);
INSERT INTO person (name, birthdate, score)
VALUES ("Foo Bar", "1998-05-23", 42.1)
Connect to MySQL
- connect
- close
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
print("Connected:", conn)
conn.close()
if __name__ == "__main__":
main()
$ python3 examples/mysql/connect.py
- Change some of the parameters and try again
Connect to MySQL and Handle exception
import mysql.connector
def main():
try:
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
except mysql.connector.Error as e:
print("MySQL exception: ", e)
return
#except Exception as e:
# print("Other exception", e);
# return
print("Connected:", conn)
conn.close()
if __name__ == "__main__":
main()
Select data
- cursor
- execute
- fetchone
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
row = cursor.fetchone()
print(row)
# cursor.close() # mysql.connector.errors.InternalError: Unread result found.
conn.close()
if __name__ == "__main__":
main()
Select more data
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
while True:
row = cursor.fetchone()
if not row:
break
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select all data fetchall
- fetchall
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
rows = cursor.fetchall()
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select some data fetchmany
- fetchmany
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person")
size = 2
while True:
rows = cursor.fetchmany(size)
if not rows:
break
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Select some data WHERE clause
- WHERE
- %s
import mysql.connector
def main(min_score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("SELECT * FROM person WHERE score > %s", (min_score,))
size = 2
while True:
rows = cursor.fetchmany(size)
if not rows:
break
print(len(rows))
for row in rows:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main(40)
Select into dictionaries
import mysql.connector
def main():
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM person")
for row in cursor:
print(row)
cursor.close()
conn.close()
if __name__ == "__main__":
main()
Insert data
- INSERT
- commit
import mysql.connector
def main(name, birthdate, score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute(
"INSERT INTO person (name, birthdate, score) VALUES (%s, %s, %s)",
(name, birthdate, score))
if cursor.lastrowid:
print('last insert id', cursor.lastrowid)
else:
print('last insert id not found')
conn.commit()
conn.close()
if __name__ == "__main__":
main('Monty Python', '1969-10-05', 100)
Update data
- UPDATE
import mysql.connector
def main(uid, score):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("UPDATE person SET score=%s WHERE id=%s",
(score, uid))
conn.commit()
conn.close()
if __name__ == "__main__":
main(12, 32)
Delete data
- DELETE
import mysql.connector
def main(uid):
conn = mysql.connector.connect(
host = 'localhost',
database = 'fb_db',
user = 'foobar',
password='no secret')
cursor = conn.cursor()
cursor.execute("DELETE FROM person WHERE id=%s", (uid,))
conn.commit()
conn.close()
if __name__ == "__main__":
main(11)
Exercise MySQL
- Create a user with a password manually.
- Create a database manually.
- Create a table manually for describing fleet of cars: id, license-plate, year-built, brand, owner. (Owner is the name of the owner)
- Create a program that accepts values on the command line and insterts the data in the database
- Create another program that lists all the cars.
- Improve the selector program to accept command line paramter --minage N and --maxage N and show the cars within those age limits (N is a number of years e.g. 3)
- Create program to delete a car.
- Create program to change the owner of a car.
Exercise: MySQL Connection
Instead of hard-coding the connection details in the script, let's create an INI file that contains the connection information and use that.
[development]
host = localhost
database = fb_db
user = foobar
password = no secret
Solution: MySQL Connection
import configparser
import mysql.connector
config_file = 'examples/mysql/connect.ini'
def read_config(section = 'development'):
print(section)
cp = configparser.ConfigParser()
cp.read(config_file)
if not cp.has_section(section):
raise Exception("No configuration found for '{}'".format(section))
return cp[section]
def main():
try:
db = read_config()
print(db['password'])
print(db)
conn = mysql.connector.connect(**db)
except mysql.connector.Error as e:
print("MySQL exception: ", e)
return
except Exception as e:
print("Other exception", e);
return
if conn.is_connected():
print("is connected")
print("Connected:", conn)
conn.close()
if __name__ == "__main__":
main()
PostgreSQL
PostgreSQL install
$ sudo aptitude install postgresql
$ sudo -i -u postgres
$ createuser --interactive
Add "ubuntu" as superuser (we need a username that matches our Linux username)
$ createdb testdb
$ psql
$ sudo -u postgres psql
$ psql testdb
testdb=# CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100));
PostgreSQL with Docker compose
FROM python:3
WORKDIR /opt
RUN pip install psycopg2==2.9.3
# COPY . .
version: '3'
services:
app:
build:
context: .
dockerfile: Dockerfile
tty: true
command: bash
volumes:
- .:/opt
environment:
POSTGRES_USER: username
POSTGRES_PASSWORD: password
POSTGRES_DB: default_database
pgdatabase:
image: 'postgres:latest'
ports:
- 5432:5432
environment:
POSTGRES_USER: username
POSTGRES_PASSWORD: password
POSTGRES_DB: default_database
volumes:
- pg-data-volume:/var/lib/postgresql/data/
volumes:
pg-data-volume:
docker-compose up
docker exec -it postgresql_app_1 bash
Python and Postgresql
$ sudo aptitude install python3-postgresql
$ sudo aptitude install python3-psycopg2
PostgreSQL connect
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
PostgreSQL create table
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
cur = conn.cursor()
try:
cur.execute("CREATE TABLE people (id INTEGER PRIMARY KEY, name VARCHAR(100))")
conn.commit()
except Exception as err:
print(err)
INSERT
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
cur = conn.cursor()
uid = 1
name = 'Foo'
try:
cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
conn.commit()
except Exception as err:
print(err)
duplicate key value violates unique constraint "people_pkey"
DETAIL: Key (id)=(1) already exists.
INSERT (from command line)
import psycopg2
import sys
if len(sys.argv) != 3:
exit("Usage: {} ID NAME".format(sys.argv[0]))
uid, name = sys.argv[1:]
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
cur = conn.cursor()
try:
cur.execute("INSERT INTO people (id, name) VALUES (%s, %s)", (uid, name))
conn.commit()
except Exception as err:
print(err)
SELECT
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
cur = conn.cursor()
try:
cur.execute("SELECT * from people")
for row in cur.fetchall():
print(row)
except Exception as err:
print(err)
DELETE
import psycopg2
try:
#conn = psycopg2.connect("postgresql:///testdb")
conn = psycopg2.connect("dbname='default_database' user='username' host='pgdatabase' password='password'")
except Exception as err:
print(f"I am unable to connect to the database: {err}")
cur = conn.cursor()
try:
cur.execute("DELETE FROM people")
conn.commit()
except Exception as err:
print(err)
try:
cur.execute("SELECT * from people")
for row in cur.fetchall():
print(row)
except Exception as err:
print(err)