Python Other

PyCharm

PyCharm Intro

  • IDE - Integrated Development Environment
  • Introspection (understands Python)
  • Running, Debugging
  • Refactoring

PyCharm configure interpreter

  • Mac: PyCharm / Preferences / Project: (name) / Project Interpreter
  • Windows/Linux: File / Settings / Project Interpreter

PyCharm install modules

  • Same place where we set the interpreter

PyCharm Project

  • At the opening create a new project (directory + Python version)
  • File/New Project

PyCharm Files

  • New file
  • Open file
  • Ctrl-Shift-N

PyCharm - run code

  • Run/Run
  • Set command line parameters
  • Set environment variables
import sys
import datetime
import random

def main():
    limit = get_limit()
    print(limit)

    date = datetime.datetime.now()

    rnd = random.randrange(2, 6)
    print(rnd)

    count(limit)
    print("after count")

def get_limit():
    limit = 10
    if len(sys.argv) == 2:
        limit = int(sys.argv[1])
    return limit

def count(limit):
    for ix in range(limit):
        div = ix - 12
        show(ix, ix / div)

def show(number, result):
    print(number, result)


if __name__ == '__main__':
    main()

PyCharm - debugging code

  • Set fixed Breakpoints (click on line next to row-number to have a red circle)
  • Run/Debug
  • Inspect variables
  • Conditional breakpoint
  • Step in function
  • Step out of function
  • Step over function

PyCharm Terminal

  • Bottom "Terminal"

PyCharm Python console at the bottom left

  • Bottom "Python Console"
 2 + 3
 x = 2
 print(x)
 def f(x, y):
    return x+y

 f(4, 5)

Refactoring example with PyCharm

  • Change variable name (in scope only)
def add(x, y):
    z = x + y
    return z

def multiply(x, y):
    z = x * y
    return z

x = 2
y = 3
z = add(x, y)
print(z)

z = multiply(x, y)
print(z)
  • Extract method

Visual Studio Code

VS Code Intro

  • Generic IDE - Integrated Development Environment

  • Tons of plugins

  • Open Source

  • Developed by Microsoft

  • Introspection (understands Python)

  • Running, Debugging

  • Refactoring

VS Code Project or Single file

  • Open File
  • Open Folder

Install the Python Extension

  • Usually VS Code will suggest installing the Python extension when you open a file with the .py extension.

  • If not, click on the Extensions icon on the left and search for "python" (by Microsoft)

VS Code examples

  • Run program
  • Debug program
  • Set breakpoint
  • Set conditional breakpoint
def fib(n):
    if int(n) != n or n <= 0:
        raise ValueError("Bad parameter")

    if n == 1:
        return 1
    if n == 2:
        return 1
    return fib(n-1) + fib(n-2)

print(3, fib(3))    # 2
print(30, fib(30))  # 832040

fib(0.5)

  • Set argv
import sys

def main():
    if len(sys.argv) != 3:
        exit("Needs 2 arguments:  width length")

    width  = int( sys.argv[1] )
    length = int( sys.argv[2] )

    

    if length <= 0:
        exit("length is not positive")

    if width <= 0:
        exit("width is not positive")

    area = length * width
    print("The area is ",  area)

main()
  • Refactor "z" to "operator"
import random

def count():
    for x in range(1000):
        v = random.choice("abcd")
        print(x)
        print(v)

def add(x, y):
    return x + y

def multiply(x, y):
    return x * y


def calc(x, y, z):
    if z == "+":
        return x + y
    if z == "*":
        return x * y
    if z == "-":
        return x - y
    if z == "/":
        return x / y
    raise Exception(f"Unknown operator {z}")

import mylib

mylib.count()
print(mylib.calc( 2, 3, "+"))
print(mylib.calc( 2, 3, "*"))

PyPI - Python Package Index

What is PyPI?

pip

  • pip
$ pip install package_name

Configure pip on Windows to avoid SSL issues

On the command line:

pip install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host files.pythonhosted.org  PACKAGE_NAME

Run the following command to get the list of configuration files:

pip config -v list

You will see something like this: (your username instead of FooBar)

For variant 'global', will try loading 'C:\ProgramData\pip\pip.ini'
For variant 'user', will try loading 'C:\Users\FooBar\pip\pip.ini'
For variant 'user', will try loading 'D:\Data\Users\FooBar\AppData\Roaming\pip\pip.ini'
For variant 'site', will try loading 'C:\Users\FooBar\AppData\Local\Programs\Python\Python310\pip.ini'

Create the first pip.ini file with the following content:

[global]
trusted-host = pypi.org files.pythonhosted.org pypi.python.org

If you run the pip config -v list again, you'll see an additional line on the output:  

global.trusted-host='pypi.org, files.pythonhosted.org ,pypi.python.org'

pip will now treat these hosts as trusted and skip the SSL certificate checks that were failing.

Upgrade pip

  • pip install --upgrade pip will probably not work on Windows because the pip executable is in use while it runs.

Upgrade pip on Windows

py -m pip install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --upgrade pip

PYTHONPATH

export PYTHONPATH=~/python
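
A minimal sketch of what the variable does: directories listed in PYTHONPATH are added to sys.path, the list Python searches when it imports a module. The temporary directory below is only a stand-in for ~/python (an assumption for illustration), and the check runs in a child interpreter so the current shell is left untouched.

```python
import os
import subprocess
import sys
import tempfile

# A throwaway directory standing in for ~/python (hypothetical location).
with tempfile.TemporaryDirectory() as extra_dir:
    env = dict(os.environ, PYTHONPATH=extra_dir)

    # Start a child interpreter with PYTHONPATH set and ask for its sys.path.
    completed = subprocess.run(
        [sys.executable, "-c", "import sys; print('\\n'.join(sys.path))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    child_path = [p for p in completed.stdout.splitlines() if p]

    # Compare resolved paths so symlinked temp directories still match.
    found = os.path.realpath(extra_dir) in [os.path.realpath(p) for p in child_path]
    print("PYTHONPATH entry is on sys.path:", found)
```

Modules placed in that directory could then be imported by name from any script started with the same environment.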

Requirements

numpy
pandas
requests
flask>=1.00
pip install -r requirements.txt

Virtualenv

  • virtualenv

On Linux/macOS:

$ cd project_dir
$ virtualenv -p python3 venv
$ source venv/bin/activate
$ ...
$ deactivate

On Windows:

venv\Scripts\activate.bat
...
deactivate

The virtualenv command will create a copy of python in the given directory inside the current directory. In the above example it will create the copy in the 'venv' directory inside the 'project_dir'. After source-ing the 'activate' file the PATH will include the local python with a local version of pip. This requires bash or zsh.
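
To check from inside Python whether the activated environment is really the one in use, one option is to compare sys.prefix with sys.base_prefix. This is a small sketch that assumes a recent virtualenv or the standard venv module; very old virtualenv releases marked the environment differently. The function name is made up for this example.

```python
import sys

def in_virtual_environment():
    """Return True when the running interpreter belongs to a virtual environment."""
    # Inside an environment sys.prefix points into the environment directory,
    # while sys.base_prefix still points at the original installation.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)

print("interpreter:", sys.executable)
print("inside a virtual environment:", in_virtual_environment())
```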

See also the Python guide.

Web client - web scraping

get HTML page using urllib

  • urllib

urllib is a rather low level library. It comes standard with Python.

import urllib.request

# fh is like a filehandle
with urllib.request.urlopen('https://python.org/') as fh:
    html = fh.read()

print(html)

Download image using urllib

Usually you will want to save the downloaded image to the local disk.

import urllib.request

url = 'https://www.python.org/images/python-logo.gif'
with urllib.request.urlopen(url) as fh:
    with open('logo.gif', 'wb') as out:
        out.write(fh.read())

get HTML page using requests

  • requests

requests is the de-facto standard in Python for dealing with web pages as a web client.

import requests

res = requests.get('https://python.org/')
print(type(res))
print(res.status_code)
print(res.headers)
print(res.headers['content-type'])
# print(res.content)

Download image using requests

import requests

url = 'https://www.python.org/images/python-logo.gif'
filename = 'logo.gif'
res = requests.get(url)
print(res.status_code)
with open(filename, 'wb') as out:
    out.write(res.content)

Download image as a stream using requests

OK, this is not such a good example for streaming.

import requests
import shutil

url = 'https://bloximages.newyork1.vip.townnews.com/wpsdlocal6.com/content/tncms/assets/v3/editorial/7/22/722f8401-e134-5758-9f4b-a542ed88a101/5d41b45d92106.image.jpg'
filename = "source.jpg"
res = requests.get(url, stream=True)
print(res.status_code)
with open(filename, 'wb') as fh:
    # Ask urllib3 to undo gzip/deflate content encoding while reading
    res.raw.decode_content = True
    shutil.copyfileobj(res.raw, fh)
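
To see what the streaming copy does without touching the network, the following sketch replaces res.raw with an in-memory file object. The 1 MiB payload and the 64 KiB chunk size are arbitrary values chosen for illustration.

```python
import io
import shutil

# Stand-in for res.raw: 1 MiB of data held in memory (an assumption for the sketch).
source = io.BytesIO(b"x" * (1024 * 1024))
target = io.BytesIO()

# copyfileobj reads and writes chunk by chunk (here 64 KiB at a time),
# so the payload never has to be fetched with one big read() call.
shutil.copyfileobj(source, target, length=64 * 1024)

copied = len(target.getvalue())
print(copied)
```

With a real response the source would be res.raw and the target a file opened in 'wb' mode, as in the example above.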

Download zip file using requests

import requests
import shutil

url = "https://code-maven.com/public/developer_survey_2019.zip"
filename = "developer_survey_2019.zip"

res = requests.get(url, stream=True)
print(res.status_code)
if res.status_code == 200:
    with open(filename, 'wb') as fh:
        # Ask urllib3 to undo gzip/deflate content encoding while reading
        res.raw.decode_content = True
        shutil.copyfileobj(res.raw, fh)

Extract zip file

  • zipfile
  • unzip
  • zip

This is unrelated, but once you have downloaded a zip file you will need to be able to extract its content. This example shows how to unzip a file already on your disk.

import zipfile

path = "developer_survey_2019.zip"
zf = zipfile.ZipFile(path)
zf.extractall()
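
A self-contained variation that can be run without downloading anything: it first creates a tiny archive, then lists and extracts it into a chosen directory. The file and directory names are made up for this sketch.

```python
import os
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as workdir:
    archive = os.path.join(workdir, "example.zip")   # hypothetical file name

    # Create a small archive so the sketch does not depend on a download.
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("data/readme.txt", "hello zip\n")

    # Open for reading, list the members, then extract into a chosen directory.
    with zipfile.ZipFile(archive) as zf:
        names = zf.namelist()
        zf.extractall(path=os.path.join(workdir, "unpacked"))

    extracted = os.path.join(workdir, "unpacked", "data", "readme.txt")
    with open(extracted) as fh:
        content = fh.read()

print(names)
print(content, end="")
```

Using the ZipFile object in a with statement closes the archive even if the extraction fails.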

Beautiful Soup to parse HTML

from bs4 import BeautifulSoup
import requests

url = 'https://en.wikipedia.org/wiki/Main_Page'
res = requests.get(url)
if res.status_code != 200:
    exit(f"Error in getting the page. Status code: {res.status_code}")
html = res.content

soup = BeautifulSoup(html, features='lxml')
print(soup.title.text)

for link in soup.find_all("a", limit=3):
    print(link)
    print(link.text)
    print(link.attrs.get('href'))
    print()

print('-----------------------------------------')

forms = soup.select("#searchform")
if forms:
    print(forms)
    form = forms[0]  # We searched by ID, so we expect zero or one match in the list
    print()
    print('Action: ', form.attrs.get('action'))

    # Search inside that element we found earlier
    for inp in form.find_all('input'):
        print('id: ', inp.attrs.get('id'))
print('-----------------------------------------')

tfa = soup.select("#mp-tfa")
if tfa:
    #print(tfa)
    paras = tfa[0].select("p")
    if paras:
        #print(paras)
        links = paras[0].find_all("a", limit=1)
        if links:
            print(links[0].text)
            print(links[0].attrs.get('href'))

requests - JSON - API

Downloading HTML pages and parsing them to extract data can be a lot of fun, but it is also very unstable. Page layouts will change. The code will break easily. In many cases there is a better way. Use the API provided by the site.
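
What makes an API more stable than scraping is that the response is already structured data. A minimal sketch using only the standard json module: the body below is shaped like the answer of httpbin.org/ip used later in this chapter, and the address in it is a placeholder, not real output.

```python
import json

# A response body shaped like the one httpbin.org/ip returns; the address
# is a documentation placeholder, not real output.
body = '{"origin": "203.0.113.7"}'

data = json.loads(body)      # JSON text -> Python dict
print(type(data).__name__)
print(data["origin"])
```

The json() method of a requests response does the same parsing on the downloaded body.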

httpbin.org

requests get from httpbin - JSON

import requests

res = requests.get('https://httpbin.org/get')
print(type(res))
print(res.status_code)
print()
print(res.headers)
print()
#print(res.content)
print()
print(res.json())
data = res.json()
print(type(data))

requests get IP from httpbin - JSON

import requests

res = requests.get('http://httpbin.org/ip')
print(res.headers['content-type'])
print(res.text)
print()

data = res.json()
print(data)
print()
print(data['origin'])

requests get JSON User-Agent

When our browser sends a request it identifies itself.

import requests

res = requests.get('http://httpbin.org/user-agent')
#print(res.headers['content-type'])
#print(res.text)
data = res.json()
print(data)
print(data['user-agent'])

requests change User-Agent

import requests

res = requests.get('http://httpbin.org/user-agent',
    headers = {'User-agent': 'Internet Explorer/2.0'})
# print(res.headers['content-type'])
# print(res.text)

data = res.json()
print(data)
print(data['user-agent'])

requests get header

httpbin makes it easy to see what kind of headers your browser sends, not only the User-Agent.

import requests

res = requests.get('https://httpbin.org/headers')
print(res.text)

# {
#   "headers": {
#     "Accept": "*/*",
#     "Accept-Encoding": "gzip, deflate",
#     "Host": "httpbin.org",
#     "User-Agent": "python-requests/2.3.0 CPython/2.7.12 Darwin/16.3.0"
#   }
# }

print()
data = res.json()
print(data)
#print(data['headers'])

requests change header

  • requests

The requests module also sends a set of default headers, but you can tell it to send other fields and values as well. This example shows how to set some additional headers.

import requests

res = requests.get('http://httpbin.org/headers',
        headers = {
            'User-agent'  : 'Internet Explorer/2.0',
            'SOAPAction'  : 'http://www.corp.net/some/path/CustMsagDown.Check',
            'Content-type': 'text/xml'
        }
    )
print(res.text)

# {
#   "headers": {
#     "Accept": "*/*",
#     "Accept-Encoding": "gzip, deflate",
#     "Content-Type": "text/xml",
#     "Host": "httpbin.org",
#     "Soapaction": "http://www.corp.net/some/path/CustMsagDown.Check",
#     "User-Agent": "Internet Explorer/2.0"
#   }
# }

requests post

  • requests
  • POST

We can also send POST requests to an address with any payload (content).

import requests

payload = '''
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:cus="http://www.corp.net/Request.XSD">
    <soapenv:Header/>
    <soapenv:Body>
       <cus:CustMsagDown.Check>
           <cus:MainCustNum>327</cus:MainCustNum>
           <cus:SourceSystem></cus:SourceSystem>
       </cus:CustMsagDown.Check>
    </soapenv:Body>
</soapenv:Envelope>
'''

res = requests.post('http://httpbin.org/post',
    headers = {
        'User-agent'  : 'Internet Explorer/2.0',
        'SOAPAction'  : 'http://www.corp.net/some/path/CustMsagDown.Check',
        'Content-type': 'text/xml'
    },
    data = payload,
)
print(res.headers['content-type'])
print(res.text)

Interactive Requests

import requests

r = requests.get('http://httpbin.org/')

import code
code.interact(local=locals())

Download the weather - scraping

Download the weather - API call with requests

import configparser
import requests
import sys
import os

def get_api_key():
    config_file = 'config.ini'
    if not os.path.exists(config_file):
        exit(f"File {config_file} must exist with an [openweathermap] section and an api= field")
    config = configparser.ConfigParser()
    config.read(config_file)
    return config['openweathermap']['api']

def get_weather(api_key, location):
    url = "https://api.openweathermap.org/data/2.5/weather?q={}&units=metric&appid={}".format(location, api_key)
    r = requests.get(url)
    return r.json()

def main():
    if len(sys.argv) != 2:
        exit("Usage: {} LOCATION".format(sys.argv[0]))
    location = sys.argv[1]

    api_key = get_api_key()
    weather = get_weather(api_key, location)

    print(weather)
    print()
    print(weather['main']['temp'])


if __name__ == '__main__':
    main()

Download the weather - API call with openweathermap-simplified

pip install openweathermap-simplified
from openweathermap import get_daily_forecast, APIError

try:
    #forecast = get_daily_forecast(49.24966, -123.11934)  # Vancouver, BC, Canada
    forecast = get_daily_forecast(34.8186, 31.8969)  # Rehovot, Israel
    #forecast = get_daily_forecast('Rehovot') 
    print(forecast)
except APIError as err:
    # Deal with missing/incorrect API key or failed requests
    print(err)

Tweet

import configparser
import twitter
import os

config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'api.cfg'))
api = twitter.Api( **config['twitter'] )

status = api.PostUpdate('My first Tweet using Python')
print(status.text)

bit.ly

import configparser
import os
import requests

def shorten(uri):
    config = configparser.ConfigParser()
    #config.read(os.path.join(os.path.expanduser('~'), 'api.cfg'))
    config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'api.cfg'))

    query_params = {
        'access_token': config['bitly']['access_token'],
        'longUrl': uri
    }

    endpoint = 'https://api-ssl.bitly.com/v3/shorten'
    response = requests.get(endpoint, params=query_params, verify=False)

    data = response.json()

    if not data['status_code'] == 200:
        exit("Unexpected status_code: {} in bitly response. {}".format(data['status_code'], response.text))
    return data['data']['url']

print(shorten("http://code-maven.com/"))

API config file

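
The Tweet and bit.ly examples above read their credentials from api.cfg. The following sketch shows one possible layout and how configparser would read it. The access_token field of the [bitly] section is taken from the example code; the field names in the [twitter] section are an assumption based on the keyword arguments of twitter.Api, and every value is a placeholder.

```python
import configparser

# Hypothetical contents of api.cfg; every value is a placeholder.
SAMPLE = """
[twitter]
consumer_key = YOUR_CONSUMER_KEY
consumer_secret = YOUR_CONSUMER_SECRET
access_token_key = YOUR_ACCESS_TOKEN_KEY
access_token_secret = YOUR_ACCESS_TOKEN_SECRET

[bitly]
access_token = YOUR_BITLY_TOKEN
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE)          # config.read('api.cfg') for a real file

twitter_args = dict(config['twitter'])   # could be passed on as **twitter_args
bitly_token = config['bitly']['access_token']

print(sorted(twitter_args))
print(bitly_token)
```

Keeping the real file out of version control avoids publishing the keys by accident.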

Exercise: Combine web server and client

Write a web application that takes a site and a text as input (e.g. http://cnn.com and 'Korea') and checks whether the word appears on the given site.

Extended version: Take only the URL as input and create statistics showing the most frequent words on the given page.

Python Web server

Hello world web

  • WSGI
  • CGI
from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server

import time

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/plain; charset=utf-8')]

    start_response(status, headers)

    res = f"Hello World {time.time()}".encode('utf-8')
    return [res]

port = 8080
with make_server('0.0.0.0', port, hello_world) as httpd:
    print("Serving on port {}...".format(port))
    httpd.serve_forever()
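
A WSGI application is just a callable, so it can also be tried without starting a server. The sketch below calls a simplified copy of hello_world directly; fake_start_response and captured are helper names invented for this illustration.

```python
from wsgiref.util import setup_testing_defaults

def hello_world(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain; charset=utf-8')]
    start_response(status, headers)
    return [b"Hello World"]

captured = {}

def fake_start_response(status, headers):
    # A real server would send these to the client; here we only record them.
    captured['status'] = status
    captured['headers'] = headers

environ = {}
setup_testing_defaults(environ)   # fills in a minimal, valid WSGI environ

body = b"".join(hello_world(environ, fake_start_response))
print(captured['status'])
print(body.decode('utf-8'))
```

The same approach is handy in tests, because no port has to be opened.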

Dump web environment info

from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server

# A relatively simple WSGI application. It's going to print out the
# environment dictionary after being updated by setup_testing_defaults
def simple_app(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/plain')]

    start_response(status, headers)

    # WSGI applications must return an iterable of bytes
    ret = ["{}: {}\n".format(key, value).encode('utf-8')
           for key, value in environ.items()]
    return ret

httpd = make_server('', 8000, simple_app)
print("Serving on port 8000...")
httpd.serve_forever()

# taken from the standard documentation of Python

Web echo

from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server

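from urllib.parse import parse_qs
from html import escape

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/html; charset=utf-8')]

    start_response(status, headers)

    # The form is submitted with GET, so the fields arrive in the query string.
    # (The cgi module used for this in older code was removed in Python 3.13.)
    form = parse_qs(environ.get('QUERY_STRING', ''))
    if 'txt' in form:
        # WSGI applications must return an iterable of bytes
        return [('Echo: ' + escape(form['txt'][0])).encode('utf-8')]

    return [b"""
<form>
<input name="txt" />
<input type="submit" value="Echo" />
</form>
"""]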
httpd = make_server('', 8000, hello_world)
print("Serving on port 8000...")
httpd.serve_forever()

Web form

from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server

from urllib.parse import parse_qs
from html import escape

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/html; charset=utf-8')]

    start_response(status, headers)

    # Both the link and the form use GET, so the fields arrive in the query string.
    # (The cgi module used for this in older code was removed in Python 3.13.)
    form = parse_qs(environ.get('QUERY_STRING', ''))
    html = ''
    for name, values in form.items():
        html += escape(name) + '==' + escape(values[0]) + '<br>'

    if not html:
        html = """
<a href="?fname=Foo&lname=Bar">click</a>
<form>
Username: <input name="username" /><br>
Password: <input type="password" name="pw" /><br>
Age group: Under 18 <input type="radio" name="age" value="kid" >
18-30 <input type="radio" name="age" value="young" >
30- <input type="radio" name="age" value="old" >
<input type="submit" value="Send" />
</form>
"""
    # WSGI applications must return an iterable of bytes
    return [html.encode('utf-8')]

httpd = make_server('', 8000, hello_world)
print("Serving on port 8000...")
httpd.serve_forever()

Resources

Networking

Secure shell

ssh

  • On Windows install putty
import subprocess
import sys

if len(sys.argv) !=2:
    exit("Usage: " + sys.argv[0] + " hostname")

host = sys.argv[1]
command = "uname -a"

ssh = subprocess.Popen(["ssh", host, command],
                       shell=False,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
result = ssh.stdout.readlines()
error = ssh.stderr.readlines()
if error:
    for err in error:
        sys.stderr.write("ERROR: {}\n".format(err))
if result:
    print(result)
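
The same capture pattern can be tried without a remote host. In this sketch a child Python process stands in for the ssh command (an assumption made so the example runs anywhere), and subprocess.run collects both output streams as text.

```python
import subprocess
import sys

# Stand-in for ["ssh", host, command]: a child Python that writes to both streams.
cmd = [sys.executable, "-c",
       "import sys; print('to stdout'); print('to stderr', file=sys.stderr)"]

# capture_output=True collects stdout and stderr; text=True decodes them to str.
completed = subprocess.run(cmd, capture_output=True, text=True)

print("exit code:", completed.returncode)
print("stdout:", completed.stdout.strip())
print("stderr:", completed.stderr.strip())
```

Replacing cmd with the ssh command line from the example above should give the remote output in the same two attributes.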

ssh from Windows

$ ssh -o "StrictHostKeyChecking no" foobar@hostname-or-ip

$ plink.exe -ssh foobar@hostname-or-ip -pw "password" -C "uname -a"
import subprocess
import sys

ssh = subprocess.Popen([r"c:\Users\foobar\download\plink.exe", "-ssh",
                    "foobar@hostname-or-ip",
                    "-pw", "password",
                    "-C", "uname -a"],
                       shell=False,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
result = ssh.stdout.readlines()
error = ssh.stderr.readlines()
if error:
    for err in error:
        sys.stderr.write("ERROR: {}\n".format(err))
if result:
    print(result)

Parallel ssh

from pssh import ParallelSSHClient
hosts = ['myhost1', 'myhost2']
client = ParallelSSHClient(hosts)
output = client.run_command('ls -ltrh /tmp/', sudo=True)

telnet

# Note: telnetlib was removed from the standard library in Python 3.13.
import telnetlib

hostname  = '104.131.87.33'
user = 'gabor'
password = 'robag'

tn = telnetlib.Telnet(hostname)
# telnetlib works with bytes, not str
tn.read_until(b"login: ")
tn.write(user.encode('ascii') + b"\n")

tn.read_until(b"Password: ")
tn.write(password.encode('ascii') + b"\n")
tn.read_until(b"~$")

tn.write(b"hostname\n")
print(tn.read_until(b"~$").decode('utf-8'))
print("-------")


tn.write(b"uptime\n")
print(tn.read_until(b"~$").decode('utf-8'))
print("-------")


print("going to exit")
tn.write(b"exit\n")

print("--------")
print(tn.read_all().decode('utf-8'))

prompt for password

import getpass

password = getpass.getpass("Password:")

print(password)


ftp

$ sudo aptitude install proftpd
$ sudo /etc/init.d/proftpd start
$ sudo adduser   (user: foo pw: bar)
from ftplib import FTP
ftp = FTP('localhost')
ftp.login("foo", "bar")

print(ftp.retrlines('LIST'))

print('-------')
for f in ftp.nlst():
    print("file: " + f)

filename = 'ssh.py'

# storlines expects a file object opened in binary mode
ftp.storlines("STOR " + filename, open(filename, 'rb'))

print('-------')
for f in ftp.nlst():
    print("file: " + f)

ftp.delete(filename)

print('-------')
for f in ftp.nlst():
    print("file: " + f)


 
-rw-rw-r--   1 foo      foo             6 Feb 18 19:18 a.txt
-rw-rw-r--   1 foo      foo             6 Feb 18 19:18 b.txt
226 Transfer complete
-------
file: b.txt
file: a.txt
-------
file: b.txt
file: a.txt
file: ssh.py
-------
file: b.txt
file: a.txt

Interactive shell

The Python interactive shell

  • len

Type python without any arguments on the command line and you'll get into the interactive shell of Python. In the interactive shell you can type:

>>> print("hello")
hello

>>> "hello"
'hello'

>>> 6
6

>>> len("abc")
3

>>> "abc" + 6
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can only concatenate str (not "int") to str

>>> "abc" + str(6)
'abc6'

REPL - Read Evaluate Print Loop

  • int
  • float
  • REPL

A variable comes into existence the first time we assign a value to it. It points to an object and that object knows about its type.

>>> a = "abc"
>>> len(a)
3

>>> a = '3'
>>> a + 3
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can only concatenate str (not "int") to str

>>> int(a) + 3
6

>>> a = '2.3'
>>> float(a) + 1
3.3
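
The statement that the object, not the variable, knows its type can be checked with type(). A short sketch:

```python
a = "abc"
first_type = type(a).__name__

a = 3            # the same name now points to a different object
second_type = type(a).__name__

print(first_type, second_type)
print(isinstance(a, int))
```

The name a has no type of its own; asking for its type always reports the type of the object it currently points to.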

Using Modules

  • import
  • sys
  • version
  • executable

Python has lots of standard (and not standard) modules. You can load one of them using the import keyword. Once loaded, you can use functions from the module or access its objects. For example the sys module has a sys.version and a sys.executable variable.

>>> import sys
>>> sys.version
'2.7.3 (default, Apr 10 2012, 23:24:47) [MSC v.1500 64 bit (AMD64)]'
>>> sys.executable
'c:\\Python27\\python.exe'

You can also load a specific object directly into your code.

>>> from sys import executable
>>> executable
'c:\\Python27\\python.exe'

To quit the interpreter call the exit() function.

>>> exit
Use exit() or Ctrl-Z plus Return to exit

The import binds the word sys to whatever it loaded from the file.
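
A small sketch of that binding: importing the same module under a second name does not load it twice, both names end up pointing at the same module object. The alias system is made up for this example.

```python
import sys
import sys as system          # a second name for the same module object

print(system is sys)                   # both names refer to one object
print(sys.modules["sys"] is sys)       # loaded modules are cached by name
print(type(sys).__name__)
```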

Getting help

  • help()
  • dir()
  • import
>>> help
Type help() for interactive help, or help(object) for help about object.
>>> help()    - entering an internal shell:
...
help> dir     - explains about the dir command.  Navigate using SPACE/ENTER/q
help> Ctrl-D  - to quit, (Ctrl-Z ENTER on Windows)
>>> help(dir) - the same explanation as before

>>> dir()
['__builtins__', '__doc__', '__name__', '__package__']
>>> dir("")   - list of string related methods
['__add__', '__class__', ... 'upper', 'zfill']

>>> dir(1)    - list of integer related methods
['__abs__', '__add__', ... 'numerator', 'real']

>>> dir(__builtins__)
...                   - functions available in python

>>> help(abs)         - explain how abs() works
>>> help(sum)
>>> help(zip)
>>> help(int)
>>> help(str)

>>> help("".upper)   - explain how the upper method of strings works

>>> import sys
>>> dir(sys)
>>> help(sys)
>>> help(sys.path)
>>> help(sys.path.pop)

Exercise: Interactive shell

  • Start the REPL and check the examples.
  • Check the documentation in the REPL.
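
For the second step, the information that dir() and help() display can also be reached from code, which is sometimes handy in a script. A minimal sketch; the variable names are chosen only for this example.

```python
# dir() returns a plain list of names, so it can be filtered like any list.
public = [name for name in dir("") if not name.startswith("_")]
print("upper" in public, "zfill" in public)

# help() largely shows the docstring, which is also available directly.
doc = len.__doc__
print(doc.splitlines()[0])
```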

Testing Demo

Testing Flask

git clone https://github.com/pallets/flask.git
cd flask
pip install -r requirements/dev.txt
pip install -e .
pytest

How do you test your code?

This mini-series is for people who don't have the time to delve into the way you'd write tests for your Python code, but would like to get a quick overview of the possibilities.

However, before we can get into actually testing things, it is worth thinking about, and even discussing, the following questions.

  • What kind of things do you test?

  • Web application?

  • Command line application?

  • Databases?

  • ...

What is testing?

So what do we really mean when we say testing?

For every piece of code, whether it is a small module or a huge application, you can have the following equation.

There is some environment the code works in. It might be just the interpreter/compiler in case of a single stand-alone function, or it might include multiple networking elements, servers, databases, IoT devices etc. No matter what, testing people call this environment the "Fixture".

Then execute the code - the Application Under Test - and give it some input.

The result should be some "Expected Output".

So this is our equation.

  • Fixture + Input = Expected Output
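
The three parts of the equation can be made visible in a few lines of code. In this sketch the function count_lines is a made-up application under test, the temporary directory with one file is the fixture, the path is the input, and 3 is the expected output.

```python
import os
import tempfile

def count_lines(path):
    """Hypothetical code under test: count the lines of a text file."""
    with open(path) as fh:
        return sum(1 for _ in fh)

# Fixture: the environment the code needs, here a temporary directory with a file.
with tempfile.TemporaryDirectory() as fixture_dir:
    path = os.path.join(fixture_dir, "input.txt")
    with open(path, "w") as fh:
        fh.write("one\ntwo\nthree\n")

    # Input: the path. Expected output: 3.
    actual = count_lines(path)

expected = 3
print("OK" if actual == expected else f"BUG: expected {expected}, got {actual}")
```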

What is testing really?

In reality, however, many times we don't get exactly the expected output. Instead there is a small (or big) difference. That's the bug.

The goal of (automated) testing is to make it easy and cheap to notice when these bugs creep in.

To put it in other words, when you write your code you can check if the result is as expected either manually or by writing some automated tests. The question is, how will you know your piece of code still works half a year from now when someone has made changes to some other part of the code?

Will you repeat all the manual tests you did earlier? You won't have time for that.

On the other hand if you automated your tests in the first place, then you can easily, quickly and cheaply run them again and you can verify if everything still works as earlier or if a bug appeared.

  • Fixture + Input = Expected Output + Bugs

Testing demo tools

In these examples we are going to see 3 Python modules that can be used for testing.

  • doctest
  • unittest
  • pytest

Testing demo methodology

We won't delve deep into the capabilities of these testing libraries. We will only use a very simple example to show how to write a passing and a failing test.

  • Have a simple AUT - Application Under Test with an obvious bug
  • Write a passing test
  • Write a failing test

Testing demo - AUT - Application Under Test

Given the following module with a single function, how can we use this function and how can we test it?

def add(x, y):
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!


You probably noticed that our function was called add and so the expectation is that it will be able to add two numbers. However the implementation has a bug. It actually multiplies the two numbers. I know it is a very obvious issue, but it is great as it allows us to see the mechanics of testing without getting distracted by a complex implementation and a complex problem.

Rest assured, the mechanism of the testing would be the same even if our function was calculating the moon-landing trajectory.

Testing demo - use the module

Before we start writing an "automated test", let's see how one could test this code "manually". In reality I see this many times, that people write short snippets of code to check if their real code works properly, but they don't turn these small snippets into real tests.

Basically the question is "How can we use the add function of the mymath module?"

The code is straightforward. We import the module. We import the "sys" module to be able to access the command line arguments. We take two arguments from the command line, call the function, and print the result.

Then, if we would like to make sure our code works well, we can compare that result to some expected result.

Based on this everything works fine.

import mymath
import sys

if len(sys.argv) != 3:
    exit(f"Usage {sys.argv[0]} NUMBER NUMBER")

a = int(sys.argv[1])
b = int(sys.argv[2])

result = mymath.add(a, b)

print(result)
python use_mymath.py 2 2
4

Testing demo: doctest

  • doctest
  • $?
  • %ERRORLEVEL%

The first way we are going to look at is using the "doctest" module. It is a very nice tool that allows us to test our code and to also verify that our documentation is aligned with the code. In addition to that, doctest is a standard module. It comes with every installation of Python so you don't need to worry about installation.

The big drawback is that it is not really useful for anything complex.

So how does it work?

In Python if you add a string immediately after the declaration of the function - meaning the line immediately after the "def" statement - that string becomes the documentation of the function. It can be a one-line string or a multi-line string using triple-quotes.

In the documentation you can write free text and you can also write examples as if one was using the interactive shell of Python. For these examples we have code snippets preceded by 3 greater-than signs, the prompt of the Python interactive shell. The line immediately after that contains the result that you'd see if you actually typed the expression into the interactive shell.

Doctest will read your source code, look at all the functions you have and for each function it will look at the documentation of the function. If in the documentation it sees 3 greater-than signs then it will take the content of that line as code to be executed and the next line will be the expected result. Doctest will execute each code snippet and compare it with the expected results, effectively checking if the examples in your documentation and the implementation are aligned.

We can run doctest in the following way: python -m doctest mymath.py. If all the tests pass, then this execution will print nothing. This lack of positive feedback is a bit strange so you might want to check the so-called "exit code" of the execution. On Unix systems such as Linux and macOS, you'd inspect the $? shell variable while on MS Windows you need to inspect the %ERRORLEVEL% variable. On all of these systems you can use the echo command to inspect the variables. In either case 0 indicates success.

def add(x, y):
    """
    This function will add two numbers together
    >>> add(2, 2)
    4
    >>>
    And here we can have more documentation.
    """
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!


$ python -m doctest mymath.py
$ echo $?
0
> python -m doctest mymath.py
> echo %ERRORLEVEL%
0

Testing demo: doctest with failure

Of course we know that our code is not perfect (to say the least) so at one point someone will complain about the incorrect results received, for example in case they try to add 3 and 3. Before running and fixing the code however it is better to write a test case with the expected correct result that will fail.

So we added another example to the documentation.

If we run the same command as we did earlier we'll get an extensive output on the screen and the exit code will have some value different from 0.

At this point you'd probably also go and fix the code, but you have also increased the number of tests and eliminated the possibility of this failure returning unnoticed.

def add(x, y):
    """
    This function will add two numbers together
    >>> add(2, 2)
    4
    >>> add(3, 3)
    6
    >>>
    And here we can have more documentation.
    """
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!




$ python -m doctest mymath.py
$ echo $?
1
> python -m doctest mymath.py
> echo %ERRORLEVEL%
1
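
The pass and fail counts behind that exit code can also be obtained from within Python. The sketch below is one possible way, using the DocTestFinder and DocTestRunner classes of the doctest module on a copy of the add function defined inline, so that it does not depend on a mymath.py file being present.

```python
import doctest

def add(x, y):
    """
    >>> add(2, 2)
    4
    >>> add(3, 3)
    6
    """
    return x * y   # the deliberate bug from the examples above

finder = doctest.DocTestFinder()
runner = doctest.DocTestRunner(verbose=False)

# module=False: do not try to locate the defining module; globs supplies the names.
for test in finder.find(add, "add", module=False, globs={"add": add}):
    runner.run(test)

results = runner.summarize(verbose=False)
print("attempted:", results.attempted, "failed:", results.failed)
```

The runner prints the usual failure report for the second example, and the summary tells us that one of the two examples failed.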

Testing demo: Unittest success

  • unittest
  • TestCase
  • assertEqual

Python comes with a built-in module for writing tests. Its name is unittest, which might be a bit confusing, as this module can also be used for any kind of more complex feature-test, and other modules can also be used to write so-called unit-tests.

Unlike the doctests that were part of the actual code, the unittest library calls for separate test files. It is recommended that the names of files start with the test_ prefix as that will make it easy for the various testing tools to locate them.

Inside the file you need to import both the unittest module and the module that we are testing, mymath in this case.

We need a class with a name that starts with Test and inherits from unittest.TestCase. In the class we can have one or more testing functions. Each one starts with a test_ prefix. Inside the function we can call the function that we are testing and we can compare the result returned by it to some expected value. We can compare them in various ways using the various assert-methods of the unittest.TestCase. In this example we used the assertEqual method as we wanted to make sure the actual return value equals the expected value.

We can run the tests using python -m unittest test_one_with_unittest.py. It will have some output on the screen indicating all the tests passed. The exit-code will be 0 as expected.

import unittest
import mymath

class TestMath(unittest.TestCase):
    def test_math(self):
        self.assertEqual(mymath.add(2, 2), 4)


$ python -m unittest test_one_with_unittest.py
$ echo $?
0
> python -m unittest test_one_with_unittest.py
> echo %ERRORLEVEL%
0

Testing demo: Unittest failure

When we get the report on the incorrect results when adding 3 and 3, we can add another test-case. We could have added another assertion to the test_math function or we could have created a separate class with its own function, but in this case we opted for creating a separate test-function.

We won't go into the pros and cons of each strategy now as we are only interested in the basic technique.

If we run the tests now the output will indicate that it ran 2 test-cases and one of them failed. It even shows us some details about the expected value and the actual value that can be really useful in understanding the source of the problem.

Note there is also .F in the output. The dot indicates the test-function that passed, the F indicates the test-function that failed.

The exit code is again different from 0.

BTW this exit-code is used by the various CI systems to understand the results of the tests.

import unittest
import mymath

class TestMath(unittest.TestCase):
    def test_math(self):
        self.assertEqual(mymath.add(2, 2), 4)

    def test_more_math(self):
        self.assertEqual(mymath.add(3, 3), 6)


$ python -m unittest test_with_unittest.py
$ echo $?
1
> python -m unittest test_with_unittest.py
> echo %ERRORLEVEL%
1

Testing demo: pytest using classes

  • pytest
  • assert

In our third example we are going to use the pytest module. The only drawback of the pytest module is that it does not come with the installation of Python itself. It is not a huge issue though as you probably install hundreds of other modules as well.

These days Pytest seems like the most popular testing library for Python.

We'll have several examples using Pytest.

In order to use it you'd create a file with a name that starts with test_ prefix. We need to import the module we are testing but we don't need to import pytest. Actually we don't even use pytest inside the code. (At least not in the simple use-cases.) In the file you need to create a class starting with Test, but this class does not need to inherit from any special class. In the class we can have one or more test-functions starting with the prefix test_. In the function we call the function we are testing and we compare the results to the expected results.

We use the built-in assert statement of Python to check if the results are what we expected.

No need to learn various specialized assert-methods as we had in the unittest module.

We run the test using the pytest command.

We'll get some output. Here too the single dot after the name of the test file indicates that there was one successful test function.

The exit-code of this execution is 0 as was the case with unittest.

pip install pytest

import mymath

class TestMath():
    def test_math(self):
        assert mymath.add(2, 2) == 4


$ pytest test_with_pytest_class.py
$ echo $?
0
> pytest test_with_pytest_class.py
> echo %ERRORLEVEL%
0

Testing demo: pytest using classes - failure

Here too we can add additional test-functions to the same test-class. Executing pytest will print .F indicating one passing test-function and one failing test-function. We'll get a detailed explanation of where the failure happened.

The exit-code will be different from 0 helping the CI systems and any other external system to know that the tests have failed.

import mymath

class TestMath():
    def test_math(self):
        assert mymath.add(2, 2) == 4

    def test_more_math(self):
        assert mymath.add(3, 3) == 6


$ pytest test_with_pytest_class_failure.py
$ echo $?
1
> pytest test_with_pytest_class_failure.py
> echo %ERRORLEVEL%
1

Testing demo: pytest without classes

In the previous example we used a test-class to write our tests, but in reality in many cases we don't need the classes. We could just as well write plain test-functions as in this example.

Test-functions without a class around them are easier to write and a lot simpler to grasp. So unless you really need the features a class can provide I'd recommend you use functions only. After all, our test code should be a lot simpler than our application code.

pip install pytest

import mymath

def test_math():
    assert mymath.add(2, 2) == 4


$ pytest test_with_pytest.py
$ echo $?
0
> pytest test_with_pytest.py
> echo %ERRORLEVEL%
0

Testing demo: pytest without classes failure

import mymath

def test_math():
    assert mymath.add(2, 2) == 4

def test_more_math():
    assert mymath.add(3, 3) == 6


$ pytest test_with_pytest_failure.py
$ echo $?
1
> pytest test_with_pytest_failure.py
> echo %ERRORLEVEL%
1

Testing demo: Failure in one sub

import mymath

def test_math():
    assert mymath.add(3, 3) == 6
    assert mymath.add(2, 2) == 4

Testing demo: pytest run doctests

The nice thing about pytest is that it can also run all the doctests in your module. So you can start your testing journey with doctest and later switch to pytest.

You can easily test your examples in your documentation.

$ pytest --doctest-modules mymath.py

Testing demo: pytest run unittest

Pytest can also run tests written with unittest. You don't even need to tell it anything special. It will introspect the test code and if it notices test-classes that are based on unittest it will execute them too.

$ pytest test_one_with_unittest.py
$ pytest test_with_unittest.py

Test demo: test coverage

pip install pytest-cover
$ pytest test_with_pytest.py --cov mymath --cov-report html --cov-report term


Open htmlcov/index.html

Exercise: Testing demo - anagrams

  • An anagram is a pair of words that are created from exactly the same set of characters, but in a different order.
  • For example listen and silent
  • Or bad credit and debit card
  • Given the following module with the is_anagram function write tests for it. (in a file called test_anagram.py)
  • Write a failing test as well.
  • Try doctest, unittest, and pytest as well.
def is_anagram(a_word, b_word):
    return sorted(a_word) == sorted(b_word)

Sample code to use the Anagram module.

from anagram import is_anagram
import sys

if len(sys.argv) != 3:
    exit(f"Usage {sys.argv[0]} WORD WORD")

if is_anagram(sys.argv[1], sys.argv[2]):
    print("Anagram")
else:
    print("NOT")

Exercise: Test previous solutions

  • Go back to your solutions to the previous exercises
  • Write tests
  • If you feel it is hard, maybe you need to change the code to make it more testable.

Solution: Testing demo

from anagram import is_anagram

def test_anagram():
    assert is_anagram("silent", "listen")
    assert is_anagram("bad credit", "debit card")

def test_not_anagram():
    assert not is_anagram("abc", "def")

def test_should_be_anagram_spaces():
    assert is_anagram("anagram", "nag a ram")


def test_should_be_anagram_case():
    assert is_anagram("Silent", "Listen")
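With the is_anagram function given in the exercise the last two tests fail, because spaces and upper-case letters are compared as they are. Whether those pairs should count as anagrams is a decision you need to make. If you decide they should, one possible change is sketched below. The name is_anagram_relaxed is made up here so it does not hide the original function.

```python
def is_anagram_relaxed(a_word, b_word):
    """Compare the letters of the two texts ignoring spaces and case."""
    def letters(text):
        return sorted(text.replace(" ", "").lower())

    return letters(a_word) == letters(b_word)

if __name__ == "__main__":
    print(is_anagram_relaxed("anagram", "nag a ram"))
    print(is_anagram_relaxed("Silent", "Listen"))
    print(is_anagram_relaxed("abc", "def"))
```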

GitHub API

GitHub data

  • Users / Organizations
  • Repositories
  • Commits
  • Issues
  • Pull-Requests
  • ...

GitHub API: REST vs GraphQL

  • REST API
      • Get the data in the structure the API provider thought you'd need.
      • Usually all the data from one table in the database.

  • GraphQL API
      • Have a mapping (edges) between pieces of data that are connected
      • Getting the data you need, nothing more
      • Nested fields
      • Strong typing of the data
      • Rate limits

Where is it used

GitHub get organization members

import json

from github_rest_api import get_from_github

orgid = 'github'
data = get_from_github(f"https://api.github.com/orgs/{orgid}/members")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
print(data)

python examples/github-rest/rest_get_org_members.py


python examples/github-graphql/run_query_requests.py examples/github-graphql/get_org_members.gql out.json

Details about an organization REST

import json
import sys

from github_rest_api import get_from_github

if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} ORGANIZATION")

organization = sys.argv[1]

data = get_from_github(f"https://api.github.com/orgs/{organization}")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)

python examples/github-rest/details-about-org.py github
python examples/github-rest/details-about-org.py kantoniko
python examples/github-rest/details-about-org.py osdc-code-maven

python examples/github-rest/details-about-org.py szabgab     # error, this is a user

Details about a user REST

import json
import sys

from github_rest_api import get_from_github

if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} USERNAME")

username = sys.argv[1]

data = get_from_github(f"https://api.github.com/users/{username}")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)

python examples/github-rest/details-about-user.py szabgab

but these also work:

python examples/github-rest/details-about-user.py github
python examples/github-rest/details-about-user.py kantoniko
python examples/github-rest/details-about-user.py osdc-code-maven

REST - List of repositories by organization (pagination!)

import json
import sys

from github_rest_api import get_from_github

if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} ORGANIZATION")

organization = sys.argv[1]

data = get_from_github(f"https://api.github.com/orgs/{organization}/repos", pages=True)
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)

python examples/github-rest/repos-of-org.py github
python examples/github-rest/repos-of-org.py kantoniko

python examples/github-rest/repos-of-org.py szabgab     # error, this is a user

REST - List of repositories by user (pagination!)

import json
import sys

from github_rest_api import get_from_github

if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} USERNAME")

username = sys.argv[1]

data = get_from_github(f"https://api.github.com/users/{username}/repos", pages=True)
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)

python examples/github-rest/repos-of-user.py szabgab

but these also work:

python examples/github-rest/repos-of-user.py kantoniko
python examples/github-rest/repos-of-user.py osdc-code-maven

GraphQL - List repositories by organization

import json
import os
import sys
import requests

query = '''
query ($organization: String!) {
  organization(login: $organization) {
    avatarUrl
    repositories(first: 2, after: null) {
      nodes {
        createdAt
        url
        pushedAt
        name
        watchers {
          totalCount
        }
        visibility
        updatedAt
        stargazers {
          totalCount
        }
      }
      totalCount
      pageInfo {
        endCursor
        hasNextPage
      }
    }
  }
}
'''

def run_query(query, **variables):

    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }

    #print(query)
    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)  # the Response object of requests has no "data" attribute

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} ORGANIZATION")

    organization = sys.argv[1]
    results = run_query(query, organization=organization)
    with open("out.json", "w") as fh:
        json.dump(results, fh, indent=4)

main()
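The query above asks for the first 2 repositories only, and the pageInfo part tells us if there are more. A common way to get the rest is to send the endCursor back as the after argument until hasNextPage becomes false. The sketch below shows the loop under a few assumptions: the query is changed to accept an $after variable, the response has the usual top-level "data" key, and the function that sends the query is passed in as a parameter. The make_fake_run_query helper is entirely made up so the loop can be tried without network access or a token.

```python
PAGED_QUERY = '''
query ($organization: String!, $after: String) {
  organization(login: $organization) {
    repositories(first: 2, after: $after) {
      nodes { name }
      totalCount
      pageInfo { endCursor hasNextPage }
    }
  }
}
'''

def fetch_all_repositories(run_query, query, organization):
    """Collect the repository nodes page by page using pageInfo.endCursor."""
    nodes = []
    cursor = None  # None is sent as null, meaning "start at the beginning"
    while True:
        result = run_query(query, organization=organization, after=cursor)
        repositories = result["data"]["organization"]["repositories"]
        nodes.extend(repositories["nodes"])
        page_info = repositories["pageInfo"]
        if not page_info["hasNextPage"]:
            return nodes
        cursor = page_info["endCursor"]

def make_fake_run_query(names, page_size=2):
    """Hypothetical stand-in for run_query serving the names in small pages."""
    def fake_run_query(query, organization, after=None):
        start = 0 if after is None else int(after)
        end = start + page_size
        return {"data": {"organization": {"repositories": {
            "nodes": [{"name": name} for name in names[start:end]],
            "totalCount": len(names),
            "pageInfo": {"endCursor": str(end), "hasNextPage": end < len(names)},
        }}}}
    return fake_run_query

if __name__ == "__main__":
    fake = make_fake_run_query(["a", "b", "c", "d", "e"])
    print(fetch_all_repositories(fake, PAGED_QUERY, "github"))
```

With the real API you would pass in the run_query function from the example instead of the fake one. Real cursors are opaque strings, not numbers.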

GitHub API KEY (PERSONAL TOKEN)

GitHub REST API

pip install requests

GitHub REST API execute query

import requests
import os


def get_from_github(url, expected=0, pages=False):
    token = os.environ.get('MY_GITHUB_TOKEN')
    if not token:
        print('Missing MY_GITHUB_TOKEN. Not collecting data from Github')
        return

    headers = {
        'Accept': 'application/vnd.github+json',
        'Authorization': f'Bearer {token}',
        'X-GitHub-Api-Version': '2022-11-28',
    }

    if pages:
        per_page = 100 # default is 30 max is 100
        page = 1
        all_data = []
        while True:
            real_url = f"{url}?per_page={per_page}&page={page}"
            print(f"Fetching from {real_url}")
            data = requests.get(real_url, headers=headers).json()
            all_data.extend(data)
            if expected:
                print(f"Received {len(data)} Total {len(all_data)} out of an expected {expected}")
            else:
                print(f"Received {len(data)} Total {len(all_data)}")
            page += 1
            if len(data) < per_page:
                break
    else:
        print(f"Fetching from {url}")
        all_data = requests.get(url, headers=headers).json()

    return all_data
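The interesting part of get_from_github is the stop condition of the pagination: we keep asking for the next page until we get a page that has fewer items than per_page. To try that logic without a token or network access, here is the same loop separated from the HTTP request. The collect_pages and make_fake_fetch names are invented for this sketch.

```python
def collect_pages(fetch_page, per_page=100):
    """Call fetch_page(page, per_page) until a page comes back shorter than per_page."""
    all_data = []
    page = 1
    while True:
        data = fetch_page(page, per_page)
        all_data.extend(data)
        if len(data) < per_page:
            return all_data
        page += 1

def make_fake_fetch(total):
    """Hypothetical stand-in for the HTTP request: serves the numbers 0..total-1."""
    calls = []

    def fetch_page(page, per_page):
        calls.append(page)
        start = (page - 1) * per_page
        return list(range(start, min(start + per_page, total)))

    return fetch_page, calls

if __name__ == "__main__":
    fetch_page, calls = make_fake_fetch(250)
    items = collect_pages(fetch_page)
    print(len(items), calls)
```

Note that when the total is an exact multiple of per_page this approach sends one extra request that returns an empty page.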


GitHub API GraphQL

pip install requests

GitHub GraphQL explorer

GraphQL explorer

GitHub GraphQL execute query

import sys
import json
import os
import requests

def run_query(query):
    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }

    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)  # the Response object of requests has no "data" attribute

if __name__ == "__main__":
    if 2 <= len(sys.argv) <= 3:
        query_filename = sys.argv[1]
        if len(sys.argv) == 3:
            output_file = sys.argv[2]
        else:
            output_file = None
    else:
        exit(f"Usage: {sys.argv[0]} QUERY_FILE [OUTPUT_FILE]")

    with open(query_filename) as fh:
        query = fh.read()
    result = run_query(query)

    if output_file:
        with open(output_file, 'w') as fh:
            json.dump(result, fh, indent=4)
    else:
        print(result)
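One thing to be aware of: a GraphQL server usually answers with status code 200 even when the query itself was wrong, and reports the problem in an "errors" list inside the JSON. So checking the status code alone is not enough. The helper below is a small sketch of such a check. The extract_data and GraphQLError names are made up, and the error message in the usage part is an invented sample, not real output.

```python
class GraphQLError(Exception):
    """Raised when the response contains an "errors" list."""

def extract_data(payload):
    """Return the "data" part of a GraphQL response or raise if there were errors."""
    errors = payload.get("errors")
    if errors:
        messages = "; ".join(error.get("message", "unknown error") for error in errors)
        raise GraphQLError(messages)
    return payload["data"]

if __name__ == "__main__":
    print(extract_data({"data": {"viewer": {"login": "szabgab"}}}))
    try:
        extract_data({"errors": [{"message": "sample error message"}]})
    except GraphQLError as err:
        print(f"Query failed: {err}")
```

You could call it on the value returned by run_query before saving the result.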

GitHub GraphQL execute query async

pip install gql[all]

import sys
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

if 2 <= len(sys.argv) <= 3:
    query_filename = sys.argv[1]
    if len(sys.argv) == 3:
        output_file = sys.argv[2]
    else:
        output_file = None
else:
    exit(f"Usage: {sys.argv[0]} QUERY_FILE [OUTPUT_FILE]")

with open(query_filename) as fh:
    query = fh.read()

token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
    'Authorization': f'Bearer {token}',
}


url = "https://api.github.com/graphql"


transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query))

if output_file:
    with open(output_file, 'w') as fh:
        json.dump(result, fh, indent=4)
else:
    print(result)

GitHub GraphQL who am i

  • Get the username of who provided the token
query {
  viewer {
    login
  }
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/login.gql out.json
{
    "viewer": {
        "login": "szabgab"
    }
}

GitHub GraphQL list my repositories

query {
  viewer {
    repositories(first: 30) {
      totalCount
      pageInfo {
        hasNextPage
        endCursor
      }
      edges {
        node {
          name
        }
      }
    }
  }
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/list_my_repositories.gql out.json
{
    "viewer": {
        "repositories": {
            "totalCount": 470,
            "pageInfo": {
                "hasNextPage": true,
                "endCursor": "Y3Vyc29yOnYyOpHOACAlgw=="
            },
            "edges": [
                {
                    "node": {
                        "name": "whitecamel.org"
                    }
                },
                {
                    "node": {
                        "name": "perl6-in-perl5"
                    }
                },
                {
                    "node": {
                        "name": "test-snapshots"
                    }
                },
                {
                    "node": {
                        "name": "padre-plugin-debugger"
                    }
                },
                {
                    "node": {
                        "name": "Math-RPN"
                    }
                },
                {
                    "node": {
                        "name": "perl6-conf"
                    }
                },
                {
                    "node": {
                        "name": "the-driver"
                    }
                },
                {
                    "node": {
                        "name": "Rehovot.pm"
                    }
                },
                {
                    "node": {
                        "name": "CPAN-Forum"
                    }
                },
                {
                    "node": {
                        "name": "test-runner"
                    }
                },
                {
                    "node": {
                        "name": "test-class"
                    }
                },
                {
                    "node": {
                        "name": "perl-android-scripts"
                    }
                },
                {
                    "node": {
                        "name": "perl-promotion"
                    }
                },
                {
                    "node": {
                        "name": "prestool"
                    }
                },
                {
                    "node": {
                        "name": "pdf-create"
                    }
                },
                {
                    "node": {
                        "name": "pdf6"
                    }
                },
                {
                    "node": {
                        "name": "try.rakudo.org"
                    }
                },
                {
                    "node": {
                        "name": "CPAN-Digger-old"
                    }
                },
                {
                    "node": {
                        "name": "peg"
                    }
                },
                {
                    "node": {
                        "name": "Hypolit"
                    }
                },
                {
                    "node": {
                        "name": "topposters"
                    }
                },
                {
                    "node": {
                        "name": "Bailador"
                    }
                },
                {
                    "node": {
                        "name": "git_experiments"
                    }
                },
                {
                    "node": {
                        "name": "Code-Explain"
                    }
                },
                {
                    "node": {
                        "name": "Code-Explain-Web"
                    }
                },
                {
                    "node": {
                        "name": "CGI--Simple"
                    }
                },
                {
                    "node": {
                        "name": "Prima"
                    }
                },
                {
                    "node": {
                        "name": "Test-Version"
                    }
                },
                {
                    "node": {
                        "name": "dwimmer"
                    }
                },
                {
                    "node": {
                        "name": "Text-Trac"
                    }
                }
            ]
        }
    }
}

GitHub GraphQL list of repositories by username

query {
  repositoryOwner(login: "cm-demo") {
    repositories(first: 5, privacy: PUBLIC) {
      totalCount
      edges {
        node {
          id,
          name,
          isPrivate,
          description
        }
      }
    }
  }
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/list_repositories_by_username.gql out.json
{
    "repositoryOwner": {
        "repositories": {
            "totalCount": 5,
            "edges": [
                {
                    "node": {
                        "id": "R_kgDOGSKE7A",
                        "name": "cm-demo",
                        "isPrivate": false,
                        "description": "Config files for my GitHub profile."
                    }
                },
                {
                    "node": {
                        "id": "R_kgDOIx8BIw",
                        "name": "cm-demo.github.io-osdc-2023-01-public",
                        "isPrivate": false,
                        "description": null
                    }
                },
                {
                    "node": {
                        "id": "R_kgDOI4Gftw",
                        "name": "cm-demo.github.io-osdc-2023-01-perl",
                        "isPrivate": false,
                        "description": null
                    }
                },
                {
                    "node": {
                        "id": "R_kgDOJNSvyA",
                        "name": "cm-demo.github.io-osdc-2023-03-azrieli-",
                        "isPrivate": false,
                        "description": null
                    }
                },
                {
                    "node": {
                        "id": "R_kgDOJWTJHw",
                        "name": "osdc-2023-03-azrieli",
                        "isPrivate": false,
                        "description": "OSDC at Azriel College starting in 2023.03"
                    }
                }
            ]
        }
    }
}

GitHub GraphQL list issues by username

query {
  user(login: "szabgab") {
    issues(first: 10, filterBy: {since: "2023-03-20T00:00:00Z"}) {
      totalCount
      edges {
        node {
          number,
          title,
          state,
          createdAt,
          url,
          repository {
            owner {
              login
            }
          }
        }
      }
    }
  }
}

python examples/github-graphql/run_query_requests.py examples/github-graphql/list_issues_by_username.gql out.json
{
    "user": {
        "issues": {
            "totalCount": 50,
            "edges": [
                {
                    "node": {
                        "number": 8,
                        "title": "Check if package has link to Issues?",
                        "state": "CLOSED",
                        "createdAt": "2020-11-02T19:06:04Z",
                        "url": "https://github.com/szabgab/CPAN-Digger/issues/8",
                        "repository": {
                            "owner": {
                                "login": "szabgab"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 9,
                        "title": "Check if meta data contains the license field?",
                        "state": "CLOSED",
                        "createdAt": "2020-11-02T19:06:28Z",
                        "url": "https://github.com/szabgab/CPAN-Digger/issues/9",
                        "repository": {
                            "owner": {
                                "login": "szabgab"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 6051,
                        "title": "Hint how to unlock exercises",
                        "state": "OPEN",
                        "createdAt": "2021-10-21T11:43:51Z",
                        "url": "https://github.com/exercism/exercism/issues/6051",
                        "repository": {
                            "owner": {
                                "login": "exercism"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 20,
                        "title": "Add CPANcover data",
                        "state": "CLOSED",
                        "createdAt": "2022-12-06T04:46:24Z",
                        "url": "https://github.com/szabgab/CPAN-Digger/issues/20",
                        "repository": {
                            "owner": {
                                "login": "szabgab"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 1,
                        "title": "Misunderstood .gitignore?",
                        "state": "CLOSED",
                        "createdAt": "2022-12-30T05:02:13Z",
                        "url": "https://github.com/x-lamprocapnos-x/Movie-Selector/issues/1",
                        "repository": {
                            "owner": {
                                "login": "x-lamprocapnos-x"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 3,
                        "title": "Verify project URLs in the individual json files",
                        "state": "CLOSED",
                        "createdAt": "2023-02-08T12:59:29Z",
                        "url": "https://github.com/OSDC-Code-Maven/osdc-site-generator/issues/3",
                        "repository": {
                            "owner": {
                                "login": "OSDC-Code-Maven"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 1,
                        "title": "The __pycache__ folder should not be in git",
                        "state": "CLOSED",
                        "createdAt": "2023-02-12T14:38:30Z",
                        "url": "https://github.com/zguillez/python-toolz/issues/1",
                        "repository": {
                            "owner": {
                                "login": "zguillez"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 1,
                        "title": "Move all the data from the other 3 repositories",
                        "state": "OPEN",
                        "createdAt": "2023-03-05T07:44:06Z",
                        "url": "https://github.com/OSDC-Code-Maven/open-source-by-organizations/issues/1",
                        "repository": {
                            "owner": {
                                "login": "OSDC-Code-Maven"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 1823,
                        "title": "Flake error B031 caused by new release flake8-bugbear",
                        "state": "CLOSED",
                        "createdAt": "2023-03-10T12:04:24Z",
                        "url": "https://github.com/pallets/jinja/issues/1823",
                        "repository": {
                            "owner": {
                                "login": "pallets"
                            }
                        }
                    }
                },
                {
                    "node": {
                        "number": 6378,
                        "title": "How to setup local dev environment and run the tests?",
                        "state": "OPEN",
                        "createdAt": "2023-03-11T17:22:01Z",
                        "url": "https://github.com/psf/requests/issues/6378",
                        "repository": {
                            "owner": {
                                "login": "psf"
                            }
                        }
                    }
                }
            ]
        }
    }
}

GitHub GraphQL list issues using parameter

import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime

token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
    'Authorization': f'Bearer {token}',
}

url = "https://api.github.com/graphql"


query = '''
query($since:DateTime) {
  user(login: "szabgab") {
    issues(first: 1, filterBy: {since: $since}) {
      totalCount
      edges {
        node {
          number, title, state, createdAt, url, repository {
            owner {
              login
            }
          }
        }
      }
    }
  }
}
'''

#variables = {
#    "since": "2023-04-10T00:00:00Z"
#}

# Use UTC, because the timestamp is formatted with a trailing Z
ts = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=10)
variables = {
    "since": ts.strftime("%Y-%m-%dT%H:%M:%SZ")
}

transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)
print(result)



GitHub GraphQL list issues using several parameters

import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys

if len(sys.argv) == 2:
    output_file = sys.argv[1]
else:
    output_file = None


token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
    'Authorization': f'Bearer {token}',
}

url = "https://api.github.com/graphql"


query = '''
query($since:DateTime, $first:Int, $user:String!) {
  user(login: $user) {
    issues(first: $first, filterBy: {since: $since}) {
      totalCount
      edges {
        node {
          number, title, state, createdAt, url, repository {
            owner {
              login
            }
          }
        }
      }
    }
  }
}
'''

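# The trailing Z in the format below marks UTC, so compute the timestamp in UTC
ts = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days = 20)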
variables = {
    "user": "szabgab",
    "since": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "first": 30,
}

transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)

if output_file:
    with open(output_file, 'w') as fh:
        json.dump(result, fh, indent=4)
else:
    print(result)



GitHub GraphQL contribution counts

query($username:String!) {
  user(login: $username) {
    contributionsCollection {
      contributionCalendar {
        totalContributions
        weeks {
          contributionDays {
            contributionCount
            weekday
            date
          }
        }
      }
    }
  }
}

{
  "username": "szabgab"
}
  • Defaults to the last 1 year
query($username:String!, $from:DateTime, $to:DateTime) {
  user(login: $username) {
    contributionsCollection(from: $from, to: $to) {
      contributionCalendar {
        totalContributions
        weeks {
          contributionDays {
            contributionCount
            weekday
            date
          }
        }
      }
    }
  }
}

{
  "username": "szabgab",
  "from": "2013-03-20T00:00:00Z",
  "to": "2013-04-20T00:00:00Z"
}
  • Can set the start-date (defaults to now - 1 year)
  • Can set the end-date (defaults to start-date + 1 year)
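
The calendar returned by these queries can be summarized in plain Python. A minimal sketch, assuming the result is the nested dictionary that the queries above ask for; `count_by_weekday` is a hypothetical helper and `sample` is made-up data, not a real response.

```python
from collections import defaultdict

def count_by_weekday(result):
    """Sum contributionCount per value of the 'weekday' field."""
    calendar = result["user"]["contributionsCollection"]["contributionCalendar"]
    totals = defaultdict(int)
    for week in calendar["weeks"]:
        for day in week["contributionDays"]:
            totals[day["weekday"]] += day["contributionCount"]
    return dict(totals)

# Made-up sample in the shape requested by the query (not real data)
sample = {
    "user": {
        "contributionsCollection": {
            "contributionCalendar": {
                "totalContributions": 8,
                "weeks": [
                    {"contributionDays": [
                        {"contributionCount": 2, "weekday": 0, "date": "2013-03-24"},
                        {"contributionCount": 5, "weekday": 1, "date": "2013-03-25"},
                    ]},
                    {"contributionDays": [
                        {"contributionCount": 1, "weekday": 0, "date": "2013-03-31"},
                    ]},
                ],
            }
        }
    }
}

print(count_by_weekday(sample))  # {0: 3, 1: 5}
```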

GitHub GraphQL list Pull-Requests

  • List all the PRs created by a user in a time-range
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys

if len(sys.argv) == 2:
    output_file = sys.argv[1]
else:
    output_file = None


token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
    'Authorization': f'Bearer {token}',
}

url = "https://api.github.com/graphql"


query = '''
query($username:String!, $last:Int) {
  user(login: $username) {
    pullRequests(last: $last) {
      totalCount
      edges {
        node {
          number, title, state, createdAt, author { login }, url
        }
      }
    }
  }
}
'''

ts = datetime.datetime.now() - datetime.timedelta(days = 20)
variables = {
    "username": "szabgab",
    "last": 30,
}

transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)

if output_file:
    with open(output_file, 'w') as fh:
        json.dump(result, fh, indent=4)
else:
    print(result)



import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys

if len(sys.argv) == 2:
    output_file = sys.argv[1]
else:
    output_file = None


token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
    'Authorization': f'Bearer {token}',
}

url = "https://api.github.com/graphql"


query = '''
query($username:String!, $from:DateTime, $to:DateTime, $first:Int) {
  user(login: $username) {
    contributionsCollection(from: $from, to: $to) {
      pullRequestContributions(first: $first) {
        nodes {
          pullRequest {
            title, url, createdAt, state, repository { name }
          }
        }
      }
    }
  }
}
'''

ts = datetime.datetime.now() - datetime.timedelta(days = 20)
variables = {
    "username": "szabgab",
    "first": 30,
    "from": "2013-04-20T00:00:00Z",
    "to": "2014-04-20T00:00:00Z"
}

transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)

if output_file:
    with open(output_file, 'w') as fh:
        json.dump(result, fh, indent=4)
else:
    print(result)



GitHub GraphQL paging using cursor

  • cursor
import argparse
import datetime
import json
import os
import sys
import requests

query = '''
query($after:String) {
  viewer {
    repositories(first: 100, after: $after, privacy: PUBLIC) {
      pageInfo {
        hasNextPage
        endCursor
      }
      nodes {
        name
        releases(last:1) {
          totalCount
          nodes {
            name
            publishedAt
            url
          }
        }
      }
    }
  }
}
'''

def run_query(query, **variables):

    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }

    #print(query)
    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)

def run_query_all(query):
    cursor = None
    nodes = []
    while True:
        results = run_query(query, after=cursor)
        # print(results)
        # print("------")
        nodes.extend(results['data']['viewer']['repositories']['nodes'])
        if not results['data']['viewer']['repositories']['pageInfo']['hasNextPage']:
            break
        cursor = results['data']['viewer']['repositories']['pageInfo']['endCursor']
    return nodes

def main():
    #args = get_args()
    today = datetime.date.today()
    #print(today)
    #print(today.weekday())
    #now = datetime.datetime.now()
    #print(now)
    end_ts = today - datetime.timedelta(days=today.weekday())
    start_ts = end_ts - datetime.timedelta(days=7)
    #print(end_ts)
    #print(start_ts)
    #username = "szabgab"
    #results = get_data(username, start_ts, end_ts)

    results = run_query_all(query)
    with open("out.json", "w") as fh:
        json.dump(results, fh, indent=4)

main()
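
The loop in run_query_all can be tried without a token or network access. A minimal sketch, assuming a stand-in `fetch_page` function (hypothetical) that serves made-up data in the same `nodes` / `pageInfo` shape. It uses list offsets as cursors, while the cursors GitHub returns should be treated as opaque strings.

```python
def fetch_page(after=None, page_size=2):
    """Stand-in for one GraphQL request (hypothetical, no network involved)."""
    items = ["repo-a", "repo-b", "repo-c", "repo-d", "repo-e"]  # made-up data
    start = 0 if after is None else int(after)
    end = start + page_size
    return {
        "nodes": items[start:end],
        "pageInfo": {"hasNextPage": end < len(items), "endCursor": str(end)},
    }

def fetch_all():
    cursor = None
    nodes = []
    while True:
        page = fetch_page(after=cursor)
        nodes.extend(page["nodes"])
        # Stop when the server says there is nothing after this page
        if not page["pageInfo"]["hasNextPage"]:
            break
        # Otherwise ask for the page that starts after the last item we got
        cursor = page["pageInfo"]["endCursor"]
    return nodes

print(fetch_all())  # ['repo-a', 'repo-b', 'repo-c', 'repo-d', 'repo-e']
```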

GitHub GraphQL activities

  • List all the activities of a user in a time-range

  • All the issues opened / commented on / closed

  • All the commits

  • All the activities of a list of users in a time-range

  • Get a list of projects written in Python that have between 2 and 5 stars and were updated in the last 5 weeks.

  • Given a repository, list all the changes that were made in all the forks.

Types in Python

mypy

pip install mypy

Changing types

Even without any additional work, running mypy on an existing code-base can reveal locations that might need fixing.

For example it can point out places where the content of a variable changes type. Python accepts this, and in some places this type of flexibility might have advantages, but it can also lead to confusion for the maintainer of this code.


x = 23
print(x)

x = "Python"
print(x)

x = ["a", "b"]
print(x)

python simple.py works without complaining.

mypy simple.py reports the following:

simple.py:5: error: Incompatible types in assignment (expression has type "str", variable has type "int")
simple.py:8: error: Incompatible types in assignment (expression has type "List[str]", variable has type "int")
Found 2 errors in 1 file (checked 1 source file)

Changing types when reading a number

A common real-world case is reading in something that is supposed to be a number. In terms of the Python type system the input is always a string, even if it looks like a number. We then need to convert it using int() or float() to use it as a number.

People will often reuse the same variable to first hold the string and then the number. This is OK with Python, but might be confusing to the reader.


num = input("type in an integer: ")
print(num)
print(type(num).__name__)   # str

num = int(num)
print(num)
print(type(num).__name__)   # int

mypy input.py will print the following:

input.py:6: error: Incompatible types in assignment (expression has type "int", variable has type "str")
Found 1 error in 1 file (checked 1 source file)
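
One way to avoid this report is to keep the string and the number in two separate variables. A minimal sketch, where the fixed string `raw` stands in for the value returned by input() so the example runs without a prompt; the names are only illustrative.

```python
raw: str = "42"       # stands in for: raw = input("type in an integer: ")
print(type(raw).__name__)   # str

num: int = int(raw)   # a new name for the converted value, so no variable changes type
print(num + 1)              # 43
print(type(num).__name__)   # int
```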

Types of variables


x :int = 0

x = 2
print(x)

x = "hello"
print(x)

python variables.py

2
hello

mypy variables.py

variables.py:7: error: Incompatible types in assignment (expression has type "str", variable has type "int")
Found 1 error in 1 file (checked 1 source file)

Types of function parameters


def add(a :int, b :int) -> int:
    return a+b

print(add(2, 3))
print(add("Foo", "Bar"))
5
FooBar
function.py:6: error: Argument 1 to "add" has incompatible type "str"; expected "int"
function.py:6: error: Argument 2 to "add" has incompatible type "str"; expected "int"
Found 2 errors in 1 file (checked 1 source file)

Types function returns None or bool

-> bool means the function returns a boolean. Either True or False.

-> None means the function returns None, explicitly or implicitly.

def f() -> bool:
    return True

def g() -> None:
    return True


def h() -> None:
    return None

def x() -> None:
    return

def z() -> None:
    pass
function_bool.py:5: error: No return value expected
Found 1 error in 1 file (checked 1 source file)

Types used properly


def add(a :int, b :int) -> int:
    return a+b

print(add(2, 3))

x :int = 0

x = 2
print(x)

5
2
Success: no issues found in 1 source file

TODO: mypy

  • Complex data structures?
  • My types/classes?
  • Allow None (or not) for a variable.
from typing import Generator

def numbers(n: int) -> Generator[int, None, None]:
    return ( x for x in range(n))

print(list(numbers(10)))
from typing import List

def numbers(n: int) -> List[int]:

    return list(range(n))

print(numbers(10))
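
For the "Allow None" item, the typing module has Optional. A minimal sketch, assuming a hypothetical `find_index` function that returns either a position or None; mypy would then expect the caller to check for None before using the result as a number.

```python
from typing import List, Optional

def find_index(values: List[str], target: str) -> Optional[int]:
    """Return the position of target, or None when it is missing."""
    for ix, value in enumerate(values):
        if value == target:
            return ix
    return None

pos = find_index(["a", "b"], "b")
if pos is not None:        # narrows Optional[int] to int
    print(pos + 1)         # 2

print(find_index(["a", "b"], "z"))  # None
```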

Logging

Simple logging

  • logging
  • basicConfig
import logging

logging.debug("debug") 
logging.info("info") 
logging.warning("warning") 
logging.error("error")
logging.critical("critical")

logging.log(logging.WARNING, "another warning")
logging.log(40, "another error")

WARNING:root:warning
ERROR:root:error
CRITICAL:root:critical
WARNING:root:another warning
ERROR:root:another error
  • Written on STDERR

Simple logging - set level

import logging
 
logging.basicConfig(level = logging.INFO)
 
logging.debug("debug") 
logging.info("info") 
logging.warning("warning") 
logging.error("error")
logging.critical("critical")
INFO:root:info
WARNING:root:warning
ERROR:root:error
CRITICAL:root:critical

Simple logging to a file

import logging
import time
 
logging.basicConfig(level = logging.INFO, filename = time.strftime("my-%Y-%m-%d.log"))
 
logging.debug("debug") 
logging.info("info") 
logging.warning("warning") 
logging.error("error")
logging.critical("critical")

Simple logging format

import logging
    
logging.basicConfig( format = '%(asctime)s  %(levelname)-10s %(processName)s  %(name)s %(message)s')
    
logging.debug("debug") 
logging.info("info") 
logging.warning("warning") 
logging.error("error")
logging.critical("critical")

Simple logging change date format

import logging
 
logging.basicConfig( format = '%(asctime)s  %(levelname)-10s %(processName)s  %(name)s %(message)s', datefmt =  "%Y-%m-%d-%H-%M-%S")
 
logging.debug("debug") 
logging.info("info") 
logging.warning("warning") 
logging.error("error")
logging.critical("critical")
2020-04-22-18-59-16  WARNING    MainProcess  root warning
2020-04-22-18-59-16  ERROR      MainProcess  root error
2020-04-22-18-59-16  CRITICAL   MainProcess  root critical

getLogger

  • getLogger
  • FileHandler
  • StreamHandler
import logging
 
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
 
fh = logging.FileHandler('my.log')
fh.setLevel(logging.INFO)
fh.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(fh)
 
 
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-10s - %(message)s'))
logger.addHandler(sh)
 
 
 
log = logging.getLogger(__name__)
log.debug("debug") 
log.info("info") 
log.warning("warning") 
log.error("error")
log.critical("critical")
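
The effect of the two handler levels is easier to see when both handlers write to in-memory streams. A minimal sketch, assuming io.StringIO objects instead of a file and STDERR, and a made-up logger name.

```python
import io
import logging

logger = logging.getLogger("handler_demo")   # made-up logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False   # keep the records away from the root logger

# First handler: only INFO and above
info_stream = io.StringIO()
info_handler = logging.StreamHandler(info_stream)
info_handler.setLevel(logging.INFO)
info_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger.addHandler(info_handler)

# Second handler: everything from DEBUG up
debug_stream = io.StringIO()
debug_handler = logging.StreamHandler(debug_stream)
debug_handler.setLevel(logging.DEBUG)
debug_handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
logger.addHandler(debug_handler)

logger.debug("debug")
logger.info("info")
logger.warning("warning")

print(info_stream.getvalue())    # INFO and WARNING lines
print(debug_stream.getvalue())   # DEBUG, INFO and WARNING lines
```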

Time-based logrotation

  • TimedRotatingFileHandler
import logging
import logging.handlers

log_file = "my.log"

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

ch = logging.handlers.TimedRotatingFileHandler(log_file, when='M', backupCount=2)
ch.setLevel(logging.INFO)
ch.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(ch)


log = logging.getLogger(__name__)
log.debug("debug")
log.info("info")
log.warning("warning")
log.error("error")
log.critical("critical")
  • S - seconds
  • M - minutes
  • H - hours
  • D - days
  • docs

Size-based logrotation

import logging
import logging.handlers

log_file = "my.log"

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

ch = logging.handlers.RotatingFileHandler(log_file, maxBytes=100, backupCount=2)
ch.setLevel(logging.INFO)
ch.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(ch)


log = logging.getLogger(__name__)
log.debug("debug")
log.info("info")
log.warning("warning")
log.error("error")
log.critical("critical")
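
To watch the rotation happen, the same handler can be pointed at a temporary directory. A minimal sketch, assuming a bare '%(message)s' format so that every record is short and of equal length; the logger name and the messages are made up.

```python
import logging
import logging.handlers
import os
import tempfile

logger = logging.getLogger("rotation_demo")   # made-up logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False

with tempfile.TemporaryDirectory() as tmpdir:
    log_file = os.path.join(tmpdir, "my.log")
    handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=100, backupCount=2)
    handler.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(handler)

    # Far more than 100 bytes in total, so the file has to roll over several times
    for ix in range(20):
        logger.info(f"message number {ix:02}")

    # Close the handler before the temporary directory is removed
    logger.removeHandler(handler)
    handler.close()
    files = sorted(os.listdir(tmpdir))

print(files)
```

With backupCount=2 only the current file and at most two backups (my.log.1 and my.log.2) are kept; older content is discarded.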

Closures

Counter local - not working

def counter():
    count = 0
    count += 1
    return count

print(counter())
print(counter())
print(counter())
1
1
1

Counter with global

  • global
count = 0
def counter():
    global count
    count += 1
    return count

print(counter())
print(counter())
print(counter())

count = -42
print(counter())
1
2
3
-41

Create incrementors

To use them in various map expressions, we need a couple of functions that, for simplicity, just increment a number:

def f3(x):
    return x + 3

def f7(x):
    return x + 7

def f23(x):
    return x + 23

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
print(f23(19))
5
10
7
17
42

Create internal function

def create_func():
    def internal():
        print("Hello world")
    internal()


func = create_func()
internal()
Hello world
Traceback (most recent call last):
  File "create_internal_func.py", line 8, in <module>
    internal()
NameError: name 'internal' is not defined

Create function by a function

def create_func():
    def internal():
        print("Hello world")
    #internal()

    return internal

func = create_func()
#internal()
func()
Hello world

Create function with parameters

def create_func(name):
    def internal():
        print(f"Hello {name}")

    return internal

foo = create_func("Foo")
foo()


bar = create_func("Bar")
bar()
Hello Foo
Hello Bar

Counter closure

  • nonlocal
def create_counter():
    count = 0
    def internal():
        nonlocal count
        count += 1
        return count
    return internal

counter = create_counter()

print(counter())
print(counter())
print(counter())
print()

other = create_counter()
print(counter())
print(other())
print(counter())
print(other())

print()
print(count)
1
2
3

4
1
5
2

Traceback (most recent call last):
  File "counter.py", line 23, in <module>
    print(count)
NameError: name 'count' is not defined

Make incrementor with def (closure)

  • closure
def make_incrementor(n):
    def inc(x):
        return x + n
    return inc

f3 = make_incrementor(3)
f7 = make_incrementor(7)

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
5
10
7
17

Make incrementor with lambda

def make_incrementor(n):
    return lambda x: x + n

f3 = make_incrementor(3)
f7 = make_incrementor(7)

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
5
10
7
17

Exercise: closure bank

  • Create a closure that returns a function that holds a number (like a bank account) that can be incremented or decremented as follows:
  • Allow for an extra parameter called prev that defaults to False. If True is passed, return the old balance instead of the new balance.
bank = create_bank(20)

print(bank())    # 20
print(bank(7))   # 27
print(bank())    # 27
print(bank(-3))  # 24
print(bank())    # 24


print(bank(10, prev=True))   # 24
print(bank())    # 34

Exercise: counter with parameter

Change the counter example to accept a parameter and start counting from that number.

Solution: closure bank

def create_bank(n = 0):
    balance = n
    def bnk(change = 0, prev=False):
        nonlocal balance
        prev_balance = balance
        balance += change
        if prev:
            return prev_balance
        else:
            return balance
    return bnk


bank = create_bank(20)

print(bank())    # 20
print(bank(7))   # 27
print(bank())    # 27
print(bank(-3))  # 24
print(bank())    # 24


print(bank(10, prev=True))   # 24
print(bank())    # 34

20
27
27
24
24
24
34

Solution: counter with parameter

def create_counter(count=0):
    def internal():
        nonlocal count
        count += 1
        return count
    return internal

counter = create_counter()

print(counter())
print(counter())
print(counter())
print()

other = create_counter(42)
print(counter())
print(other())
print(counter())
print(other())
1
2
3

4
43
5
44

Decorators

Decorators: simple example

  • A decorator is that @something line just before the declaration of the function.
  • Decorators can modify the behavior of functions or can set some meta information about them.

@some_decorator
def some_function():
    pass
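
Both effects can be shown with two tiny decorators. A minimal sketch; `tagged` and `shout` are hypothetical names made up for this illustration.

```python
def tagged(func):
    """Sets meta information on the function and returns it unchanged."""
    func.is_tagged = True
    return func

def shout(func):
    """Modifies the behavior: the result of the function is upper-cased."""
    def wrapper():
        return func().upper()
    return wrapper

@tagged
def plain():
    return "hello"

@shout
def loud():
    return "hello"

print(plain(), plain.is_tagged)  # hello True
print(loud())                    # HELLO
```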

Decorators - Flask

  • In Flask we use decorators to designate functions as "routes".
from flask import Flask

app = Flask(__name__)

@app.route("/")
def main():
    return "Hello World!"

@app.route("/login")
def login():
    return "Showing the login page ..."
FLASK_APP=flask_app flask run

Decorators - Pytest

  • In Pytest we can use decorators to add special marks to test functions
  • ... or to mark them as fixtures.
import sys
import pytest

@pytest.mark.skipif(sys.platform != 'linux', reason="Linux tests")
def test_linux():
    assert True

@pytest.mark.skip(reason="To show we can skip tests without any condition.")
def test_any():
    assert True

@pytest.fixture(autouse = True, scope="module")
def module_demo():
    print(f"Fixture")


pytest -v

Decorators caching - no cache

  • Each call will execute the function and do the (expensive) computation.

def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

print(compute(2, 3))
print(compute(3, 4))
print(compute(2, 3))


Called with 2 and 3
5
Called with 3 and 4
7
Called with 2 and 3
5

Decorators caching - with cache

  • cache

  • lru_cache

  • By adding the lru_cache decorator we can tell Python to cache the result and save on computation time.

  • functools

import functools

@functools.lru_cache()
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

print(compute(2, 3))
print(compute(3, 4))
print(compute(2, 3))


Called with 2 and 3
5
Called with 3 and 4
7
5

LRU - Least recently used cache

  • LRU - Cache replacement policy
  • When we call the function with (1, 5) it removes the least recently used results of (1, 2)
  • So next time it has to be computed again.
import functools

@functools.lru_cache(maxsize=3)
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

compute(1, 2) # Called with 1 and 2
compute(1, 2)
compute(1, 2)

compute(1, 3) # Called with 1 and 3
compute(1, 3)

compute(1, 4) # Called with 1 and 4
compute(1, 4)

compute(1, 5) # Called with 1 and 5

compute(1, 2) # Called with 1 and 2
compute(1, 2)

LRU - Least recently used cache

  • Here we called (1, 2) after (1, 4) when it was still in the cache
  • When we called (1, 5) it removed the LRU pair, but it was NOT the (1, 2) pair
  • So it was in the cache even after the (1, 5) call.
import functools

@functools.lru_cache(maxsize=3)
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

compute(1, 2) # Called with 1 and 2
compute(1, 2)
compute(1, 2)

compute(1, 3) # Called with 1 and 3
compute(1, 3)

compute(1, 4) # Called with 1 and 4
compute(1, 4)

compute(1, 2)
compute(1, 5) # Called with 1 and 5
compute(1, 2)
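
The cache_info() method of the decorated function reports hits and misses, which makes the eviction visible without print statements. A minimal sketch replaying a shortened version of the calls above:

```python
import functools

@functools.lru_cache(maxsize=3)
def compute(x, y):
    return x + y

compute(1, 2)  # miss
compute(1, 3)  # miss
compute(1, 4)  # miss, the cache is now full
compute(1, 2)  # hit, (1, 2) becomes the most recently used entry
compute(1, 5)  # miss, evicts (1, 3), the least recently used entry
compute(1, 2)  # hit, (1, 2) is still cached

info = compute.cache_info()
print(info)  # CacheInfo(hits=2, misses=4, maxsize=3, currsize=3)
```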

OOP - classmethod - staticmethod

class Person(object):
    def __init__(self, name):
        print(f"init:            '{self}'   '{self.__class__.__name__}'")
        self.name = name

    def show_name(self):
        print(f"instance method: '{self}'   '{self.__class__.__name__}'")

    @classmethod
    def from_occupation(cls, occupation):
        print(f"class method     '{cls}'    '{cls.__class__.__name__}'")

    @staticmethod
    def is_valid_occupation(param):
        print(f"static method   '{param}'    '{param.__class__.__name__}'")


fb = Person('Foo Bar')
fb.show_name()

fb.from_occupation('Tailor')
Person.from_occupation('Tailor') # This is how we should call it.

fb.is_valid_occupation('Tailor')
Person.is_valid_occupation('Tailor')
init:            '<__main__.Person object at 0x7fb008f3a640>'   'Person'
instance method: '<__main__.Person object at 0x7fb008f3a640>'   'Person'
class method     '<class '__main__.Person'>'    'type'
class method     '<class '__main__.Person'>'    'type'
static method   'Tailor'    'str'
static method   'Tailor'    'str'
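
In the example above the methods only print what they receive. A minimal sketch of how they might be used in practice, assuming from_occupation is meant as an alternative constructor and is_valid_occupation as a plain helper; the list of valid occupations and the default name are made up.

```python
class Person:
    valid_occupations = ("Tailor", "Baker")   # made-up list

    def __init__(self, name, occupation=None):
        self.name = name
        self.occupation = occupation

    @classmethod
    def from_occupation(cls, occupation):
        # cls is the class itself, so a subclass gets instances of its own type
        if not cls.is_valid_occupation(occupation):
            raise ValueError(f"Unknown occupation: {occupation}")
        return cls("Unknown", occupation)

    @staticmethod
    def is_valid_occupation(param):
        # No self and no cls: just a function that lives in the class namespace
        return param in Person.valid_occupations


tailor = Person.from_occupation("Tailor")
print(tailor.name, tailor.occupation)        # Unknown Tailor
print(Person.is_valid_occupation("Pilot"))   # False
```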

Use cases for decorators in Python

  • classmethod

  • staticmethod

  • pytest

  • Common decorators are @classmethod and @staticmethod.

  • Flask uses them to mark and configure the routes.

  • Pytest uses them to add marks to the tests.

  • functools

  • dataclasses

  • Logging calls with parameters.

  • Logging elapsed time of calls.

  • Access control in Django or other web frameworks. (e.g. login required)

  • Memoization (caching)

  • Retry

  • Function timeout

  • Locking for thread safety

  • Decorator Library

Function assignment

Before we learn about decorators let's remember that we can assign function names to other names and then use the new name:


def hello(name):
    print(f"Hello {name}")

hello("Python")
print(hello)

greet = hello
greet("Python")
print(greet)


Hello Python
<function hello at 0x7f8aee3401f0>
Hello Python
<function hello at 0x7f8aee3401f0>

Function assignment - alias print to say

say = print
say("Hello World")

Function assignment - don't do this

numbers = [2, 4, 3, 1, 1, 1]
print(sum(numbers))   # 12
print(max(numbers))   #  4

sum = max
print(sum(numbers))   #  4
print(max(numbers))   #  4


sum = lambda values: len(values)
print(sum(numbers))   # 6

Passing functions as parameters



def call(func):
    return func(42)

def double(val):
    print(2*val)

call(double)      # 84
call(lambda x: print(x // 2))    # 21

Traversing directory tree

import sys
import os

def walker(path, todo):
    if os.path.isdir(path):
        items = os.listdir(path)
        for item in items:
            walker(os.path.join(path, item), todo)
    else:
        todo(path)


def print_size(name):
    print(f"{os.stat(name).st_size:6}  {name} ")

if __name__ == '__main__':
    if len(sys.argv) < 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    walker(sys.argv[1], print)
    #walker(sys.argv[1], print_size)
    #walker(sys.argv[1], lambda name: print(f"{os.stat(name).st_size:6}  {name[::-1]} "))

Declaring Functions inside other function

Let's also remember that we can define a function inside another function. The internally defined function exists only in the scope of the function where it was defined, not outside.

def f():
    def g():
        print("in g")
    print("start f")
    g()
    print("end f")

f()
g()
start f
in g
end f
Traceback (most recent call last):
  File "examples/decorators/function_in_function.py", line 9, in <module>
    g()
NameError: name 'g' is not defined

Returning a new function from a function

def create_function():
    print("creating a function")
    def internal():
        print("This is the generated function")
    print("creation done")
    return internal

func = create_function()

func()



creating a function
creation done
This is the generated function

Returning a closure



def create_incrementer(num):
    def inc(val):
        return num + val
    return inc

inc_5 = create_incrementer(5)

print(inc_5(10))  # 15
print(inc_5(0))   #  5


inc_7 = create_incrementer(7)

print(inc_7(10))  # 17
print(inc_7(0))   #  7

Decorator

  • @

  • A function that changes the behaviour of other functions.

  • The input of a decorator is a function.

  • The returned value of a decorator is a modified version of the same function.

from some_module import some_decorator

@some_decorator
def f(...):
    ...
def f(...):
    ...
f = some_decorator(f)
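
The two forms can be compared side by side. A minimal sketch, where `some_decorator` is a made-up decorator defined locally instead of being imported from some_module:

```python
def some_decorator(func):   # made-up decorator for the demo
    def wrapper(name):
        return "<" + func(name) + ">"
    return wrapper

# Form 1: the @ syntax
@some_decorator
def f(name):
    return f"Hello {name}"

# Form 2: define the function, then replace the name by the decorated version
def g(name):
    return f"Hello {name}"
g = some_decorator(g)

print(f("Foo"))  # <Hello Foo>
print(g("Foo"))  # <Hello Foo>
```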

Decorator Demo

  • Just a simple example created step-by-step
import time


def replace(func):
    def new_func():
        print("start new")
        start = time.time()
        func()
        end = time.time()
        print(f"end new {end-start}")
    return new_func

@replace
def f():
    time.sleep(1)
    print("in f")


f()


Decorator to register function

  • Pytest, Flask probably do this

functions = []

def register(func):
    global functions
    functions.append(func.__name__)

    return func

@register
def f():
    print("in f")

print(functions)

A recursive Fibonacci

def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5))  # 5

trace fibo

import decor

@decor.tron
def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5))
Calling fibo(5)
Calling fibo(4)
Calling fibo(3)
Calling fibo(2)
Calling fibo(1)
Calling fibo(2)
Calling fibo(3)
Calling fibo(2)
Calling fibo(1)
5

tron decorator

def tron(func):
    def new_func(v):
        print(f"Calling {func.__name__}({v})")
        return func(v)
    return new_func

Decorate with direct call

import decor

def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

fibo = decor.tron(fibo)

print(fibo(5))

Decorate with parameter

import decor_param

@decor_param.tron('foo')
def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5))
foo Calling fibo(5)
foo Calling fibo(4)
foo Calling fibo(3)
foo Calling fibo(2)
foo Calling fibo(1)
foo Calling fibo(2)
foo Calling fibo(3)
foo Calling fibo(2)
foo Calling fibo(1)
5

Decorator accepting parameter

def tron(prefix):
    def real_tron(func):
        def new_func(v):
            print("{} Calling {}({})".format(prefix, func.__name__, v))
            return func(v)
        return new_func
    return real_tron

Decorate function with any signature

  • How can we decorate a function that is flexible on the number of arguments?
  • Accept *args and **kwargs and pass them on.
from decor_any import tron


@tron
def one(param):
    print(f"one({param})")

@tron
def two(first, second = 42):
    print(f"two({first}, {second})")


one("hello")
one(param = "world")

two("hi")
two(first = "Foo", second = "Bar")

Decorate function with any signature - implementation

def tron(func):
    def new_func(*args, **kw):
        params = list(map(lambda p: str(p), args))
        for (k, v) in kw.items():
            params.append(f"{k}={v}")
        print("Calling {}({})".format(func.__name__, ', '.join(params)))
        return func(*args, **kw)
    return new_func

Calling one(hello)
one(hello)
Calling one(param=world)
one(world)
Calling two(hi)
two(hi, 42)
Calling two(first=Foo, second=Bar)
two(Foo, Bar)

Decorate function with any signature - skeleton

def decorator(func):
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper


@decorator
def zero():
    print("zero")

@decorator
def one(x):
    print(f"one({x})")

@decorator
def two(x, y):
    print(f"two({x, y})")


zero()
one('hello')
two( y = 7, x = 8 )

print(zero)
print(one)
print(two)
print(zero.__name__)
print(one.__name__)
print(two.__name__)
zero
one(hello)
two((8, 7))
<function decorator.<locals>.wrapper at 0x7f1165258a60>
<function decorator.<locals>.wrapper at 0x7f1165258b80>
<function decorator.<locals>.wrapper at 0x7f1165258ca0>
wrapper
wrapper
wrapper

Decorate function with any signature - skeleton with name

import functools

def decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper


@decorator
def zero():
    print("zero")

@decorator
def one(x):
    print(f"one({x})")

@decorator
def two(x, y):
    print(f"two({x, y})")


zero()
one('hello')
two( y = 7, x = 8 )

print(zero)
print(one)
print(two)

print(zero.__name__)
print(one.__name__)
print(two.__name__)
zero
one(hello)
two((8, 7))
<function zero at 0x7f9079bdca60>
<function one at 0x7f9079bdcb80>
<function two at 0x7f9079bdcca0>
zero
one
two

Functools - partial

  • partial
from functools import partial

val = '101010'
print(int(val, base=2))

basetwo = partial(int, base=2)
basetwo.__doc__ = 'Convert base 2 string to an int.'
print(basetwo(val))

# Based on example from https://docs.python.org/3/library/functools.html

Exercise: Logger decorator

  • In the previous pages we created a decorator that can decorate an arbitrary function, logging the call and its parameters.
  • Add time measurement to each call to see how long each function took.

Exercise: memoize decorator

Write a function that gets a function as a parameter and returns a new function that memoizes (caches) the input/output pairs. Then write a unit test that checks it. You will probably need to create a function to be memoized.

  • Write tests for the Fibonacci function.
  • Implement the memoize decorator for a function with a single parameter.
  • Apply the decorator.
  • Run the tests again.
  • Check the speed differences.
  • or decorate with tron to see the calls...

Solution: Logger decorator

import time
def tron(func):
    def new_func(*args, **kwargs):
        start = time.time()
        print("Calling {}({}, {})".format(func.__name__, args, kwargs))
        out = func(*args, **kwargs)
        end = time.time()
        print("Finished {}({})".format(func.__name__, out))
        print("Elapsed time: {}".format(end - start))
        return out
    return new_func

Solution: Logger decorator (testing)

from logger_decor import tron

@tron
def f(a, b=1, *args, **kwargs):
    print('a:     ', a)
    print('b:     ', b)
    print('args:  ', args)
    print('kwargs:', kwargs)
    return a + b

f(2, 3, 4, 5, c=6, d=7)
print()
f(2, c=5, d=6)
print()
f(10)
Calling f((2, 3, 4, 5), {'c': 6, 'd': 7})
a:      2
b:      3
args:   (4, 5)
kwargs: {'c': 6, 'd': 7}
Finished f(5)
Elapsed time: 1.3589859008789062e-05

Calling f((2,), {'c': 5, 'd': 6})
a:      2
b:      1
args:   ()
kwargs: {'c': 5, 'd': 6}
Finished f(3)
Elapsed time: 5.245208740234375e-06

Calling f((10,), {})
a:      10
b:      1
args:   ()
kwargs: {}
Finished f(11)
Elapsed time: 4.291534423828125e-06

Solution: memoize decorator

import sys
import memoize_attribute
import memoize_nonlocal
import decor_any

#@memoize_attribute.memoize
#@memoize_nonlocal.memoize
#@decor_any.tron
def fibonacci(n):
    if n == 1:
        return 1
    if n == 2:
        return 1
    return fibonacci(n-1) + fibonacci(n-2)

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: {} N\n".format(sys.argv[0]))
        exit(1)
    print(fibonacci(int(sys.argv[1])))


def memoize(f):
    data = {}
    def caching(n):
        nonlocal data
        key = n
        if key not in data:
            data[key] = f(n)
        return data[key]

    return caching

def memoize(f):
    def caching(n):
        key = n
        #if 'data' not in caching.__dict__:
        #    caching.data = {}
        if key not in caching.data:
            caching.data[key] = f(n)
        return caching.data[key]
    caching.data = {}

    return caching

Before

$ time python fibonacci.py 35
9227465

real   0m3.850s
user   0m3.832s
sys    0m0.015s

After

$ time python fibonacci.py 35
9227465

real   0m0.034s
user   0m0.019s
sys    0m0.014s
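
The standard library offers a ready-made alternative to the hand-written memoize functions: functools.lru_cache. A minimal sketch, swapping it in for the decorators above (with maxsize=None the cache is unbounded); timings are not shown because they depend on the machine.

```python
import functools

@functools.lru_cache(maxsize=None)
def fibonacci(n):
    if n in (1, 2):
        return 1
    # Each value is computed once; repeated calls are served from the cache
    return fibonacci(n-1) + fibonacci(n-2)

print(fibonacci(35))  # 9227465
```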

A list of functions


def hello(name):
    print(f"Hello {name}")

def morning(name):
    print(f"Good morning {name}")


hello("Jane")
morning("Jane")
print()

funcs = [hello, morning]
funcs[0]("Peter")
print()

for func in funcs:
    func("Mary")
Hello Jane
Good morning Jane

Hello Peter

Hello Mary
Good morning Mary

Insert element in sorted list using insort

  • insort
import bisect
solar_system = ['Earth', 'Jupiter', 'Mercury', 'Saturn', 'Venus']

name = 'Mars'

# Find the location where to insert the element to keep the list sorted and insert the element
bisect.insort(solar_system, name)
print(solar_system)
print(sorted(solar_system))

import sys
import os

def traverse(path):
    if os.path.isfile(path):
        print(path)
        return
    if os.path.isdir(path):
        for item in os.listdir(path):
            traverse(os.path.join(path, item))
        return
    # other unhandled things


if len(sys.argv) < 2:
    exit(f"Usage: {sys.argv[0]} DIR|FILE")
traverse(sys.argv[1])


import sys
import os

def traverse(path, func):
    response = {}
    if os.path.isfile(path):
        func(path)
        return response
    if os.path.isdir(path):
        for item in os.listdir(path):
            traverse(os.path.join(path, item), func)
        return response
    # other unhandled things


if len(sys.argv) < 2:
    exit(f"Usage: {sys.argv[0]} DIR|FILE")
#traverse(sys.argv[1], print)
#traverse(sys.argv[1], lambda path: print(f"{os.path.getsize(path):>6} {path}"))


import sys
import os

def traverse(path, func):
    if os.path.isfile(path):
        func(path)
        return
    if os.path.isdir(path):
        for item in os.listdir(path):
            traverse(os.path.join(path, item), func)
        return
    # other unhandled things


if len(sys.argv) < 2:
    exit(f"Usage: {sys.argv[0]} DIR|FILE")
#traverse(sys.argv[1], print)
#traverse(sys.argv[1], lambda path: print(f"{os.path.getsize(path):>6} {path}"))


#from inspect import getmembers, isfunction
import inspect


def change(sub):
    def new(*args, **kw):
        print("before")
        res = sub(*args, **kw)
        print("after")
        return res
    return new

def add(x, y):
    return x+y

#print(add(2, 3))

fixed = change(add)
#print(fixed(3, 4))

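def replace(subname):
    # locals() inside a function cannot rebind module-level names,
    # so look up and replace the function in globals() instead.
    original = globals()[subname]
    def new(*args, **kw):
        print("before")
        res = original(*args, **kw)
        print("after")
        return res
    globals()[subname] = new

replace('add')
add(1, 7)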

def say():
    print("hello")

#print(dir())
#getattr('say')


Context managers (with statement)

Why use context managers?

In certain operations you might want to ensure that when the operation is done there will be an opportunity to clean up after it, even if we decide to end the operation early or if there is an exception in the middle of the operation.

In the following pseudo-code examples you can see that cleanup must be called both at the end and before the early-end. That still leaves the bad-code that raises an exception skipping the cleanup, which forces us to wrap the whole section in a try-block.

def sample():
    start
    do
    do
    do
    do
    cleanup

What if we have some conditions for early termination?

def sample():
    start
    do
    do
    if we are done early:
        cleanup
        return # early-end
    do
    do
    cleanup

What if we might have an exception in the code?

def sample():
    start
    try:
        do
        do
        if we are done early:
            cleanup
            return # early-end
        do
        bad-code    (raises exception)
        do
        cleanup
    except:
        cleanup
        raise

This is a lot of unnecessary code duplication, and we can easily forget to add the cleanup at every location where we end our code early.
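As a runnable illustration of the cleanup problem, here is a minimal Python sketch in which a single finally clause covers all three ways of leaving the function. The names sample, run, and log are made up for this illustration; the strings stand in for the pseudo-code steps.

```python
log = []

def sample(mode):
    log.append("start")
    try:
        log.append("do")
        if mode == "early":
            return "early-end"
        if mode == "bad":
            raise ValueError("bad-code")
        log.append("do more")
        return "normal-end"
    finally:
        # Runs on normal exit, on early return, and on exception alike.
        log.append("cleanup")

def run(mode):
    # Helper for the demonstration: call sample and report what happened.
    log.clear()
    try:
        result = sample(mode)
    except ValueError as err:
        result = f"exception: {err}"
    return result, list(log)

for mode in ("normal", "early", "bad"):
    print(mode, run(mode))
```

Even so, every function that needs the cleanup has to repeat the try/finally structure, which is what a context manager packages up.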

Using Context Manager

with cm_for_sample():
    start
    do
    do
    if we are done early:
        return early-end
    do
    bad-code    (raises exception)
    do
  • Cleanup happens automatically; it is defined inside cm_for_sample.

Context Manager examples

A few examples where context managers can be useful:

  • Opening a file - close it once we are done with it so we don't leak file descriptors.

  • Changing directory - change back when we are done.

  • Create temporary directory - remove when we are done.

  • Open connection to database - close connection.

  • Open SSH connection - close connection.

  • More information about context managers
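Two of the cases in the list above are already covered by the standard library. The following sketch, assuming only open and tempfile.TemporaryDirectory, creates a temporary directory, writes a file into it, and checks that the directory is gone afterwards. The variable names are chosen for this illustration.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "data.txt")
    with open(path, "w") as fh:
        fh.write("hello")
    # Inside the outer context the file exists on disk.
    existed_inside = os.path.exists(path)

# Leaving the outer context removed the directory and its content.
removed_after = not os.path.exists(tmp_dir)

print(existed_inside, removed_after)
```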

cd in a function

  • getcwd
  • chdir

In this example we have a function in which we change to a directory and then, when we are done, we change back to the original directory. For this to work we first save the current working directory using the os.getcwd call. Unfortunately, in the middle of the code there is a conditional call to return. If that condition is True we won't change back to the original directory. We could fix this by calling os.chdir(start_dir) just before calling return. However, this would still not solve the problem if there is an exception in the function.

import sys
import os

def do_something(path):
    start_dir = os.getcwd()
    os.chdir(path)

    content = os.listdir()
    number = len(content)
    print(number)
    if number < 15:
        return

    os.chdir(start_dir)

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    path = sys.argv[1]
    print(os.getcwd())
    do_something(path)
    print(os.getcwd())

main()
$ python no_context_cd.py /tmp/

/home/gabor/work/slides/python-programming/examples/advanced
19
/home/gabor/work/slides/python-programming/examples/advanced
$ python no_context_cd.py /opt/

/home/gabor/work/slides/python-programming/examples/advanced
9
/opt
  • In the second example return was called and thus we stayed in the /opt directory.
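One way to patch this function without a context manager is to wrap the work in try/finally. The sketch below is a variation on do_something, not the original file: it returns the number of entries so the result can be checked, and it is run against an empty temporary directory so that the early return is taken.

```python
import os
import tempfile

def do_something(path):
    start_dir = os.getcwd()
    os.chdir(path)
    try:
        number = len(os.listdir())
        if number < 15:
            return number   # early return, the finally clause still runs
        # ... more work would go here ...
        return number
    finally:
        # Always change back, even on early return or exception.
        os.chdir(start_dir)

before = os.getcwd()
with tempfile.TemporaryDirectory() as empty_dir:
    count = do_something(empty_dir)
after = os.getcwd()

print(count, before == after)
```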

open in function

This is not the recommended way to open a file, but this is how it was done before the introduction of the with statement. Here we have the same issue: there is a conditional call to return where we forgot to close the file.

import sys
import re

def do_something(filename):
    fh = open(filename)

    while True:
        line = fh.readline()
        if line == '':   # readline returns '' at end of file, never None
            break
        line = line.rstrip("\n")

        if re.search(r'\A\s*\Z', line):
            return
        print(line)

    fh.close()

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} FILENAME")
    filename = sys.argv[1]
    do_something(filename)

main()

open in for loop

  • stat
  • os.stat

Calling write does not immediately write to disk. Both Python's file object and the Operating System provide buffering as an optimization to avoid frequent access to the disk. In this example we break out of the loop before calling close, so the content has not been written out yet when we check the size of the file.

import os

for ix in range(10):
    filename = f'data{ix}.txt'
    fh = open(filename, 'w')
    fh.write('hello')
    if ix == 0:
        break
    fh.close()
stat = os.stat(filename)
print(stat.st_size)    # 0,   the file has not been saved yet
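The effect of the buffer can be observed by checking the size at each step. This is a small sketch that writes into a temporary directory (an assumption made here so the example cleans up after itself) and records the size before flush, after flush, and after close.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    filename = os.path.join(tmp_dir, "data.txt")
    fh = open(filename, "w")
    fh.write("hello")
    # The data usually still sits in Python's buffer at this point.
    size_before_flush = os.stat(filename).st_size
    fh.flush()
    # flush hands the buffered data to the operating system.
    size_after_flush = os.stat(filename).st_size
    fh.close()
    size_after_close = os.stat(filename).st_size

print(size_before_flush, size_after_flush, size_after_close)
```

With default buffering the first number is typically 0, while the other two are the 5 bytes of "hello".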

open in function using with

If we open the file in the recommended way using the with statement then we can be sure that the close method of the fh object will be called when we leave the context of the with statement.

import sys
import re

def do_something(filename):
    with open(filename) as fh:

        while True:
            line = fh.readline()
            if line == '':   # readline returns '' at end of file, never None
                break
            line = line.rstrip("\n")

            if re.search(r'\A\s*\Z', line):
                return
            print(line)


def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} FILENAME")
    filename = sys.argv[1]
    do_something(filename)

main()
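The same guarantee holds when the block is left because of an exception. The sketch below, with a file name and content invented for the demonstration, divides by the numbers in a file and checks the closed attribute of the file object after the error.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    filename = os.path.join(tmp_dir, "numbers.txt")
    with open(filename, "w") as out:
        out.write("3\n0\n")

    try:
        with open(filename) as fh:
            for line in fh:
                value = 30 / int(line)   # the second line raises ZeroDivisionError
    except ZeroDivisionError:
        # The with statement already closed the file before we got here.
        closed_after_error = fh.closed

print(closed_after_error)
```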

Plain context manager

from contextlib import contextmanager
import sys

param = ''
if len(sys.argv) == 2:
    #exit(f"Usage: {sys.argv[0]} []")
    param = sys.argv[1]

def code_with_context_manager():
    with my_plain_context():
        print("  In plain context")
        if param == "return":
            return
        if param == "die":
            raise Exception("we have a problem")
        print("  More work")


@contextmanager
def my_plain_context():
    print("setup context")
    try:
        yield
    except Exception as err:
        print(f"  We got an exception: {err}")
    print("cleanup context")

print("START")
code_with_context_manager()
print("END")
START
setup context
  In plain context
  More work
cleanup context
END

Param context manager

from contextlib import contextmanager

@contextmanager
def my_param_context(name):
   print(f"start {name}")
   yield
   print(f"end {name}")

with my_param_context("foo"):
   print("In param context")
start foo
In param context
end foo

Context manager that returns a value

from contextlib import contextmanager

import time
import random
import os
import shutil


@contextmanager
def my_tempdir():
    print("start return")
    tmpdir = '/tmp/' + str(time.time()) + str(random.random())
    os.mkdir(tmpdir)
    try:
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir)
        print("end return")
import os
from my_tempdir import my_tempdir

with my_tempdir() as tmp_dir:
    print(f"In return context with {tmp_dir}")
    with open(tmp_dir + '/data.txt', 'w') as fh:
        fh.write("hello")
    print(os.listdir(tmp_dir))

print('')
print(tmp_dir)
print(os.path.exists(tmp_dir))
start return
In return context with /tmp/1578211890.49409370.6063140788762365
['data.txt']
end return

/tmp/1578211890.49409370.6063140788762365
False

Use my tempdir - return

import os
from my_tempdir import my_tempdir

def some_code():
    with my_tempdir() as tmp_dir:
        print(f"In return context with {tmp_dir}")
        with open(tmp_dir + '/data.txt', 'w') as fh:
            fh.write("hello")
        print(os.listdir(tmp_dir))
        return

    print('')
    print(tmp_dir)
    print(os.path.exists(tmp_dir))

some_code()
start return
In return context with /tmp/1578211902.3545020.7667694368935928
['data.txt']
end return

Use my tempdir - exception

import os
from my_tempdir import my_tempdir

with my_tempdir() as tmp_dir:
    print(f"In return context with {tmp_dir}")
    with open(tmp_dir + '/data.txt', 'w') as fh:
        fh.write("hello")
    print(os.listdir(tmp_dir))
    raise Exception('trouble')

print('')
print(tmp_dir)
print(os.path.exists(tmp_dir))
start return
In return context with /tmp/1578211921.12552210.9000097350821897
['data.txt']
end return
Traceback (most recent call last):
  File "use_my_tempdir_exception.py", line 9, in <module>
    raise Exception('trouble')
Exception: trouble

cwd context manager

import os
from contextlib import contextmanager

@contextmanager
def cwd(path):
    oldpwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpwd)
import sys
import os
from mycwd import cwd

def do_something(path):
    with cwd(path):
        content = os.listdir()
        if len(content) < 10:
            return

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    path = sys.argv[1]
    print(os.getcwd())
    do_something(path)
    print(os.getcwd())

main()
$ python context_cd.py /tmp
/home/gabor/work/slides/python/examples/context
/home/gabor/work/slides/python/examples/context

$ python context_cd.py /opt
/home/gabor/work/slides/python/examples/context
/home/gabor/work/slides/python/examples/context

tempdir context manager

  • contextlib
  • contextmanager
  • tempfile
  • mkdtemp
import os
from contextlib import contextmanager
import tempfile
import shutil

@contextmanager
def tmpdir():
    dd = tempfile.mkdtemp()
    try:
        yield dd
    finally:
        shutil.rmtree(dd)
from mytmpdir import tmpdir
import os

with tmpdir() as temp_dir:
    print(temp_dir)
    with open( os.path.join(temp_dir, 'some.txt'), 'w') as fh:
        fh.write("hello")
    print(os.path.exists(temp_dir))
    print(os.listdir(temp_dir))

print(os.path.exists(temp_dir))
/tmp/tmprpuywa3_
True
['some.txt']
False

Context manager with class

  • enter
  • exit
class MyCM:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f'__enter__ {self.name}')
        return self

    def __exit__(self, exception_type, exception, traceback):
        print(f'__exit__  {self.name}')

    def something(self):
        print(f'something {self.name}')

def main():
    with MyCM('Foo') as cm:
        print(cm.name)
        cm.something()
        #raise Exception('nono')
    print('in main - after')

main()
print('after main')

Context managers with class

  • enter
  • exit

Even if there was an exception in the middle of the process, the __exit__ method of each object will be called.

class MyCM:
    def __init__(self, n):
        self.name = n

    def __enter__(self):
        print('__enter__', self.name)

    def __exit__(self, exception_type, exception, traceback):
        print('__exit__ ', self.name)

    def something(self):
        print('something', self.name)

def main():
    a = MyCM('a')
    b = MyCM('b')
    with a, b:
        a.partner = b
        b.partner = a
        a.something()
        raise Exception('nono')
        b.something()
    print('in main - after')

main()
print('after main')
__enter__ a
__enter__ b
something a
__exit__  b
__exit__  a
Traceback (most recent call last):
  File "context-managers.py", line 27, in <module>
    main()
  File "context-managers.py", line 23, in main
    raise Exception('nono')
Exception: nono
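The order of the calls and the arguments passed to __exit__ can also be recorded instead of printed. In this sketch the Recorder class and the events list are hypothetical names; each __exit__ notes the type of the exception it received and returns False so that the exception keeps propagating.

```python
events = []

class Recorder:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        events.append(f"enter {self.name}")
        return self

    def __exit__(self, exception_type, exception, traceback):
        type_name = exception_type.__name__ if exception_type else None
        events.append(f"exit {self.name} ({type_name})")
        return False   # a false value means: do not suppress the exception

try:
    with Recorder("a"), Recorder("b"):
        raise ValueError("nono")
except ValueError:
    events.append("caught outside")

print(events)
```

The objects are exited in the reverse order of entering, and both see the same exception type.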

Context manager: with for file

  • with
import sys

if len(sys.argv) != 2:
    sys.stderr.write('Usage: {} FILENAME\n'.format(sys.argv[0]))
    exit()

file = sys.argv[1]
print(file)
with open(file) as f:
    for line in f:
        val = 30/int(line)

print('done')

With - context managers

  • with
class WithClass:
    def __init__(self, name='default'):
        self.name = name

    def __enter__(self):
        print('entering the system')
        return self.name

    def __exit__(self, exc_type, exc_value, traceback):
        print('exiting the system')

    def __str__(self):
        return 'WithObject:'+self.name

x = WithClass()
with x as y:
    print(x,y)

Exercise: Context manager

Create a few CSV files like these:

a11,a12
a21,a22

b13,b14
b23,b24

c15,c16
c25,c26

Merge them horizontally to get this:

a11,a12,b13,b14,c15,c16
a21,a22,b23,b24,c25,c26
  • Do it without your own context manager
  • Create a context manager called myopen that accepts N filenames. It opens the first one to write and the other N-1 to read
with myopen(outfile, infile1, infile2, infile3) as (out, ins):
    ...

Exercise: Tempdir on Windows

Make the tempdir context manager example work on Windows as well. You will probably need to change out of the directory before removing it.

Solution: Context manager

import sys
from contextlib import contextmanager

if len(sys.argv) < 3:
    exit(f"Usage: {sys.argv[0]} OUTFILE INFILEs")

outfile = sys.argv[1]
infiles = sys.argv[2:]
#print(outfile)
#print(infiles)

@contextmanager
def myopen(outfile, *infiles):
    #print(len(infiles))
    out = open(outfile, 'w')
    ins = []
    for filename in infiles:
        ins.append(open(filename, 'r'))
    try:
        yield out, ins
    except Exception as ex:
        print(ex)
        pass
    finally:
        out.close()
        for fh in ins:
            fh.close()


with myopen(outfile, *infiles) as (out_fh, input_fhs):
    #print(out_fh.__class__.__name__)
    #print(len(input_fhs))
    while True:
        row = ''
        done = False
        for infh in (input_fhs):
            line = infh.readline()
            #print(f"'{line}'")
            if not line:
                done = True
                break
            if row:
                row += ','
            row += line.rstrip("\n")
        if done:
            break
        out_fh.write(row)
        out_fh.write("\n")
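As an alternative to writing myopen by hand, the standard library's contextlib.ExitStack can manage a variable number of files. The following is only a sketch of that approach, not the solution above; the merge function and the sample file names are invented for the illustration.

```python
import os
import tempfile
from contextlib import ExitStack

def merge(outfile, infiles):
    with ExitStack() as stack:
        # Every file registered on the stack is closed when the block ends.
        out = stack.enter_context(open(outfile, "w"))
        ins = [stack.enter_context(open(name)) for name in infiles]
        # zip stops as soon as the shortest input file runs out of lines.
        for lines in zip(*ins):
            out.write(",".join(line.rstrip("\n") for line in lines) + "\n")

with tempfile.TemporaryDirectory() as tmp_dir:
    names = []
    samples = [("a.csv", "a11,a12\na21,a22\n"), ("b.csv", "b13,b14\nb23,b24\n")]
    for name, text in samples:
        path = os.path.join(tmp_dir, name)
        with open(path, "w") as fh:
            fh.write(text)
        names.append(path)

    merged_path = os.path.join(tmp_dir, "merged.csv")
    merge(merged_path, names)
    with open(merged_path) as fh:
        merged = fh.read()

print(merged)
```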

Advanced lists

Change list while looping: endless list


numbers = [1, 1]
for n in numbers:
    print(n)
    numbers.append(numbers[-1] + numbers[-2])

    if n > 100:
        break

print(numbers)

Creating a Fibonacci series in a crazy way.

Change list while looping

Probably not a good idea...


numbers = [1, 2, 3, 4]
for n in numbers:
    print(n)
    if n == 2:
        numbers.remove(2)


print(numbers)
1
2
4
[1, 3, 4]

Note that the loop iterated only 3 times and skipped the value 3.
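The skipped value can be explained by the way a for loop walks a list: it keeps a position and moves it forward by one on every step. The sketch below imitates that with an explicit index (the names index and visited are only for this illustration).

```python
numbers = [1, 2, 3, 4]
index = 0
visited = []

while index < len(numbers):
    n = numbers[index]
    visited.append((index, n))
    if n == 2:
        # After the removal every later element shifts one place to the left,
        # so the value 3 moves to the position we are currently at.
        numbers.remove(2)
    index += 1

print(visited)
print(numbers)
```

Position 2 holds the value 4 after the removal, so 3 is never visited.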

Copy list before iteration

It is better to copy the list using list slices before the iteration starts.


numbers = [1, 2, 3, 4]
for n in numbers[:]:
    print(n)
    if n == 2:
        numbers.remove(2)


print(numbers)
1
2
3
4
[1, 3, 4]

for with flag

names = ['Foo', 'Bar', 'Baz']

ok = False
for i in range(3):
    name = input('Your name please: ')
    if name in names:
        ok = True
        break

if not ok:
    print("Not OK")
    exit()

print("OK....")


for else

The else clause of the for loop is executed when the iteration ends normally (without calling break).

names = ['Foo', 'Bar', 'Baz']


for i in range(3):
    name = input('Your name please: ')
    if name in names:
        break
else:
    print("Not OK")
    exit()

print("OK....")
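The same behavior can be tried without keyboard input. In this small sketch the function find_first_negative is a made-up example: the else branch runs only when the loop finished without reaching break, including the case of an empty list.

```python
def find_first_negative(values):
    for value in values:
        if value < 0:
            result = value
            break
    else:
        # Reached only when the loop ended without break.
        result = None
    return result

print(find_first_negative([3, -1, 4, -5]))
print(find_first_negative([1, 2]))
print(find_first_negative([]))
```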

enumerate

  • enumerate
names = ['Foo', 'Bar', 'Baz']

for i in range(len(names)):
    print(i,  names[i])

print('')

for i, n  in enumerate(names):
    print(i, n)

0 Foo
1 Bar
2 Baz

0 Foo
1 Bar
2 Baz

do while

  • do while

There is no do-while in Python, but you can emulate it:


while True:
   do_stuff()
   if not loop_condition():
       break


x = 0

while True:
   x += 1
   print(x)
   if x > 0:
       break

list slice is copy

x = [1, 1, 2, 3, 5, 8, 13, 21, 34]
y = x[2:5]
print(y)    # [2, 3, 5]

x[2] = 20
print(x)    # [1, 1, 20, 3, 5, 8, 13, 21, 34]
print(y)    #  [2, 3, 5]
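One caveat worth keeping in mind is that a slice is a shallow copy: the new list is independent, but the elements in it are the same objects. A short sketch with nested lists, using values picked for the illustration:

```python
x = [[1, 2], [3, 4], [5, 6]]
y = x[0:2]

x[0] = [9, 9]      # replaces an element of x only, y keeps the old inner list
x[1].append(0)     # changes an inner list that x and y share

print(x)
print(y)
```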

Warnings

Warnings

  • warn
from warnings import warn

def foo():
    warn("foo will be deprecated soon. Use bar() instead", DeprecationWarning)
    print("foo still works")


def main():
    foo()
    print("afterfoo")

main()