Python Other
PyCharm
PyCharm Intro
- IDE - Integrated Development Environment
- Introspection (understands Python)
- Running, Debugging
- Refactoring
PyCharm configure interpreter
- Mac: PyCharm / Preferences / Project: (name) / Project Interpreter
- Windows/Linux: File / Settings / Project Interpreter
PyCharm install modules
- Same place where we set the interpreter
PyCharm Project
- At the opening create a new project (directory + Python version)
- File/New Project
PyCharm Files
- New file
- Open file
- Ctrl-Shift-N
PyCharm - run code
- Run/Run
- Set command line parameters
- Set environment variables
import sys
import datetime
import random

def main():
    limit = get_limit()
    print(limit)
    date = datetime.datetime.now()
    rnd = random.randrange(2, 6)
    print(rnd)
    count(limit)
    print("after count")

def get_limit():
    limit = 10
    if len(sys.argv) == 2:
        limit = int(sys.argv[1])
    return limit

def count(limit):
    for ix in range(limit):
        div = ix - 12
        show(ix, ix / div)

def show(number, result):
    print(number, result)

if __name__ == '__main__':
    main()
PyCharm - debugging code
- Set fixed Breakpoints (click on line next to row-number to have a red circle)
- Run/Debug
- Inspect variables
- Conditional breakpoint
- Step in function
- Step out of function
- Step over function
PyCharm Terminal
- Bottom "Terminal"
PyCharm Python console at the bottom left
- Bottom "Python Console"
2 + 3
x = 2
print(x)
def f(x, y):
    return x+y

f(4, 5)
Refactoring example with PyCharm
- Change variable name (in scope only)
def add(x, y):
    z = x + y
    return z

def multiply(x, y):
    z = x * y
    return z

x = 2
y = 3
z = add(x, y)
print(z)
z = multiply(x, y)
print(z)
- Extract method
Visual Studio Code
VS Code Intro
- Generic IDE - Integrated Development Environment
- Tons of plugins
- Open Source
- Developed by Microsoft
- Introspection (understands Python)
- Running, Debugging
- Refactoring
VS Code Project or Single file
- Open File
- Open Folder
Install the Python Extension
- Usually VS Code will suggest installing the plugin when you open a Python file with the .py extension.
- If not, click on the Extensions icon on the left and search for "python" (by Microsoft)
VS Code examples
- Run program
- Debug program
- Set breakpoint
- Set conditional breakpoint
def fib(n):
    if int(n) != n or n <= 0:
        raise ValueError("Bad parameter")
    if n == 1:
        return 1
    if n == 2:
        return 1
    return fib(n-1) + fib(n-2)

print(3, fib(3))    # 2
print(30, fib(30))  # 832040
fib(0.5)
- Set argv
import sys

def main():
    if len(sys.argv) != 3:
        exit("Needs 2 arguments: width length")
    width = int( sys.argv[1] )
    length = int( sys.argv[2] )
    if length <= 0:
        exit("length is not positive")
    if width <= 0:
        exit("width is not positive")
    area = length * width
    print("The area is ", area)

main()
- Refactor "z" to "operator"
# mylib.py
import random

def count():
    for x in range(1000):
        v = random.choice("abcd")
        print(x)
        print(v)

def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

def calc(x, y, z):
    if z == "+":
        return x + y
    if z == "*":
        return x * y
    if z == "-":
        return x - y
    if z == "/":
        return x / y
    raise Exception(f"Unknown operator {z}")

# script that uses mylib
import mylib

mylib.count()
print(mylib.calc( 2, 3, "+"))
print(mylib.calc( 2, 3, "*"))
PyPI - Python Package Index
What is PyPI?
pip
- pip
$ pip install package_name
Configure pip on Windows to avoid SSL issues
On the command line:
pip install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host files.pythonhosted.org PACKAGE_NAME
Run the following command to get the list of configuration files:
pip config -v list
You will see something like this: (your username instead of FooBar)
For variant 'global', will try loading 'C:\ProgramData\pip\pip.ini'
For variant 'user', will try loading 'C:\Users\FooBar\pip\pip.ini'
For variant 'user', will try loading 'D:\Data\Users\FooBar\AppData\Roaming\pip\pip.ini'
For variant 'site', will try loading 'C:\Users\FooBar\AppData\Local\Programs\Python\Python310\pip.ini'
Create the first pip.ini file in that list with the following content:
[global]
trusted-host = pypi.org files.pythonhosted.org pypi.python.org
If you run pip config -v list again, you'll see an additional line in the output:
global.trusted-host='pypi.org, files.pythonhosted.org ,pypi.python.org'
pip will now treat these hosts as trusted and stop failing on SSL certificate errors for them.
Upgrade pip
pip install --upgrade pip
This will probably not work on Windows because the pip executable file is in use while the command runs.
Upgrade PIP on Windows
py -m pip install --trusted-host pypi.org --trusted-host pypi.python.org --trusted-host files.pythonhosted.org --upgrade pip
PYTHONPATH
export PYTHONPATH=~/python
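The effect of PYTHONPATH can be checked from Python itself: the directories listed in the variable show up in sys.path of the interpreter that starts with it. The following is a minimal sketch, assuming a temporary directory stands in for ~/python; the helper name sys_path_with_pythonpath is made up for this illustration.

```python
import os
import subprocess
import sys
import tempfile

def sys_path_with_pythonpath(directory):
    """Start a child interpreter with PYTHONPATH set and return its sys.path."""
    env = dict(os.environ, PYTHONPATH=directory)
    output = subprocess.run(
        [sys.executable, "-c", "import sys; print('\\n'.join(sys.path))"],
        env=env, capture_output=True, text=True, check=True,
    ).stdout
    return output.splitlines()

# A throwaway directory plays the role of ~/python (an assumption of this sketch).
with tempfile.TemporaryDirectory() as extra_dir:
    child_path = sys_path_with_pythonpath(extra_dir)
    found = extra_dir in child_path
    print(found)
```

Modules placed in that directory could then be imported by the child process without installing them.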
Requirements
numpy
pandas
requests
flask>=1.00
pip install -r requirements.txt
Virtualenv
- virtualenv
On Linux/macOS:
$ cd project_dir
$ virtualenv -p python3 venv
$ source venv/bin/activate
$ ...
$ deactivate
On Windows:
venv\Scripts\activate.bat
...
deactivate
The virtualenv command will create a copy of python in the given directory inside the current directory. In the above example it will create the copy in the 'venv' directory inside the 'project_dir'.
After source-ing the 'activate' file the PATH will include the local python with a local version of pip. Source-ing the 'activate' file requires bash or zsh.
See also the Python guide.
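To check from inside a script whether the activated environment is really the one being used, one option is to compare sys.prefix with sys.base_prefix. This is only a sketch; the function name in_virtualenv is an illustration, not part of virtualenv.

```python
import sys

def in_virtualenv():
    """Return True when the running interpreter belongs to a virtual environment."""
    # Inside a virtual environment sys.prefix points at the environment directory,
    # while sys.base_prefix still points at the Python it was created from.
    return sys.prefix != getattr(sys, "base_prefix", sys.prefix)

print("prefix: ", sys.prefix)
print("in venv:", in_virtualenv())
```

Running it before and after activating the environment should show the prefix switching to the 'venv' directory.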
Web client - web scraping
get HTML page using urllib
- urllib
urllib is a rather low level library. It comes standard with Python.
import urllib.request

# fh is like a filehandle
with urllib.request.urlopen('https://python.org/') as fh:
    html = fh.read()
print(html)
Download image using urllib
Usually you will want to save the downloaded image to the local disk.
import urllib.request

url = 'https://www.python.org/images/python-logo.gif'
with urllib.request.urlopen(url) as fh:
    with open('logo.gif', 'wb') as out:
        out.write(fh.read())
get HTML page using requests
- requests
requests is the de-facto standard in Python for dealing with web pages as a web client.
import requests
res = requests.get('https://python.org/')
print(type(res))
print(res.status_code)
print(res.headers)
print(res.headers['content-type'])
# print(res.content)
Download image using requests
import requests
url = 'https://www.python.org/images/python-logo.gif'
filename = 'logo.gif'
res = requests.get(url)
print(res.status_code)
with open(filename, 'wb') as out:
    out.write(res.content)
Download image as a stream using requests
OK, this is not such a good example for streaming.
import requests
import shutil
url = 'https://bloximages.newyork1.vip.townnews.com/wpsdlocal6.com/content/tncms/assets/v3/editorial/7/22/722f8401-e134-5758-9f4b-a542ed88a101/5d41b45d92106.image.jpg'
filename = "source.jpg"
res = requests.get(url, stream=True)
print(res.status_code)
with open(filename, 'wb') as fh:
    res.raw.decode_content = True  # decompress gzip/deflate while reading the raw stream
    shutil.copyfileobj(res.raw, fh)
Download zip file using requests
import requests
import shutil
url = "https://code-maven.com/public/developer_survey_2019.zip"
filename = "developer_survey_2019.zip"
res = requests.get(url, stream=True)
print(res.status_code)
if res.status_code == 200:
    with open(filename, 'wb') as fh:
        res.raw.decode_content = True  # decompress gzip/deflate while reading the raw stream
        shutil.copyfileobj(res.raw, fh)
Extract zip file
- zipfile
- unzip
- zip
This is unrelated, but once you have downloaded a zip file you will need to be able to extract its content. This example shows how to unzip a file already on your disk.
import zipfile
path = "developer_survey_2019.zip"
zf = zipfile.ZipFile(path)
zf.extractall()
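Before extracting an archive it is often useful to look at what is inside it and to extract into a dedicated directory. The sketch below builds a tiny archive of its own so it runs without downloading anything; the file names and the helper extract_zip are made up for the illustration.

```python
import os
import tempfile
import zipfile

def extract_zip(path, target_dir):
    """List the members of a zip archive, then extract them into target_dir."""
    with zipfile.ZipFile(path) as zf:
        names = zf.namelist()
        zf.extractall(target_dir)
    return names

# Build a small archive so the sketch does not depend on a downloaded file.
work_dir = tempfile.mkdtemp()
archive = os.path.join(work_dir, "demo.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("readme.txt", "hello zip")

extracted = extract_zip(archive, os.path.join(work_dir, "out"))
print(extracted)
```

Using ZipFile in a with statement closes the archive even if the extraction raises an exception.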
Beautiful Soup to parse HTML
- bs4
- BeautifulSoup
from bs4 import BeautifulSoup
import requests

url = 'https://en.wikipedia.org/wiki/Main_Page'
res = requests.get(url)
if res.status_code != 200:
    exit(f"Error in getting the page. Status code: {res.status_code}")
html = res.content

soup = BeautifulSoup(html, features='lxml')
print(soup.title.text)

for link in soup.find_all("a", limit=3):
    print(link)
    print(link.text)
    print(link.attrs.get('href'))
    print()
print('-----------------------------------------')

forms = soup.select("#searchform")
if forms:  # select() returns a list (possibly empty), never None
    print(forms)
    form = forms[0]  # We searched by ID, so we expect 0 or 1 matches in the list
    print()
    print('Action: ', form.attrs.get('action'))
    # Search inside that element we found earlier
    for inp in form.find_all('input'):
        print('id: ', inp.attrs.get('id'))
print('-----------------------------------------')

tfa = soup.select("#mp-tfa")
if tfa:
    #print(tfa)
    paras = tfa[0].select("p")
    if paras:
        #print(paras)
        links = paras[0].find_all("a", limit=1)
        if links:
            print(links[0].text)
            print(links[0].attrs.get('href'))
requests - JSON - API
Downloading HTML pages and parsing them to extract data can be a lot of fun, but it is also very unstable. Page layouts will change. The code will break easily. In many cases there is a better way. Use the API provided by the site.
httpbin.org
- httpbin.org a website to practice various URL requests
- source code of httpbin.
requests get from httpbin - JSON
import requests
res = requests.get('https://httpbin.org/get')
print(type(res))
print(res.status_code)
print()
print(res.headers)
print()
#print(res.content)
print()
print(res.json())
data = res.json()
print(type(data))
requests get IP from httpbin - JSON
import requests
res = requests.get('http://httpbin.org/ip')
print(res.headers['content-type'])
print(res.text)
print()
data = res.json()
print(data)
print()
print(data['origin'])
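What res.json() does can be reproduced with the standard json module: it parses the text of the response into ordinary Python data structures. In this sketch the response body is a hard-coded sample in the shape the /ip endpoint returns; the address is a placeholder, not a real result.

```python
import json

# Hypothetical response body in the shape returned by the /ip endpoint.
sample_text = '{"origin": "203.0.113.7"}'

data = json.loads(sample_text)   # the same kind of parsing res.json() performs
print(type(data))
print(data["origin"])
```

Once the data is a dictionary, picking out a field is a plain key lookup, which is far more stable than searching through HTML.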
requests get JSON User-Agent
When our browser sends a request, it identifies itself using the User-Agent header.
import requests
res = requests.get('http://httpbin.org/user-agent')
#print(res.headers['content-type'])
#print(res.text)
data = res.json()
print(data)
print(data['user-agent'])
requests change User-Agent
import requests
res = requests.get('http://httpbin.org/user-agent',
headers = {'User-agent': 'Internet Explorer/2.0'})
# print(res.headers['content-type'])
# print(res.text)
data = res.json()
print(data)
print(data['user-agent'])
requests get header
httpbin makes it easy to see all the headers your client sends, not only the User-Agent.
import requests
res = requests.get('https://httpbin.org/headers')
print(res.text)
# {
# "headers": {
# "Accept": "*/*",
# "Accept-Encoding": "gzip, deflate",
# "Host": "httpbin.org",
# "User-Agent": "python-requests/2.3.0 CPython/2.7.12 Darwin/16.3.0"
# }
# }
print()
data = res.json()
print(data)
#print(data['headers'])
requests change header
- requests
The requests module also sends a set of default headers, but you can tell it to send other fields and values as well. This example shows how to set some additional headers.
import requests
res = requests.get('http://httpbin.org/headers',
    headers = {
        'User-agent'  : 'Internet Explorer/2.0',
        'SOAPAction'  : 'http://www.corp.net/some/path/CustMsagDown.Check',
        'Content-type': 'text/xml'
    }
)
print(res.text)
# {
# "headers": {
# "Accept": "*/*",
# "Accept-Encoding": "gzip, deflate",
# "Content-Type": "text/xml",
# "Host": "httpbin.org",
# "Soapaction": "http://www.corp.net/some/path/CustMsagDown.Check",
# "User-Agent": "Internet Explorer/2.0"
# }
# }
requests post
- requests
- POST
We can also send POST requests to an address with any payload (content).
import requests
payload = '''
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:cus="http://www.corp.net/Request.XSD">
<soapenv:Header/>
<soapenv:Body>
<cus:CustMsagDown.Check>
<cus:MainCustNum>327</cus:MainCustNum>
<cus:SourceSystem></cus:SourceSystem>
</cus:CustMsagDown.Check>
</soapenv:Body>
</soapenv:Envelope>
'''
res = requests.post('http://httpbin.org/post',
    headers = {
        'User-agent'  : 'Internet Explorer/2.0',
        'SOAPAction'  : 'http://www.corp.net/some/path/CustMsagDown.Check',
        'Content-type': 'text/xml'
    },
    data = payload,
)
print(res.headers['content-type'])
print(res.text)
Interactive Requests
import requests
r = requests.get('http://httpbin.org/')
import code
code.interact(local=locals())
Download the weather - scraping
- Open Weather map
- type in the name of your city
Download the weather - API call with requests
import configparser
import requests
import sys
import os

def get_api_key():
    config_file = 'config.ini'
    if not os.path.exists(config_file):
        exit(f"File {config_file} must exist with an [openweathermap] section and an api= field")
    config = configparser.ConfigParser()
    config.read(config_file)
    return config['openweathermap']['api']

def get_weather(api_key, location):
    url = "https://api.openweathermap.org/data/2.5/weather?q={}&units=metric&appid={}".format(location, api_key)
    r = requests.get(url)
    return r.json()

def main():
    if len(sys.argv) != 2:
        exit("Usage: {} LOCATION".format(sys.argv[0]))
    location = sys.argv[1]
    api_key = get_api_key()
    weather = get_weather(api_key, location)
    print(weather)
    print()
    print(weather['main']['temp'])

if __name__ == '__main__':
    main()
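Formatting the location straight into the URL breaks as soon as the name contains a space or another special character. A possible refinement, shown here only as a sketch, is to let urllib.parse.urlencode escape the query parameters; the helper name build_weather_url and the key value are placeholders.

```python
from urllib.parse import urlencode

def build_weather_url(location, api_key):
    """Build the request URL with properly escaped query parameters."""
    base = "https://api.openweathermap.org/data/2.5/weather"
    query = urlencode({"q": location, "units": "metric", "appid": api_key})
    return f"{base}?{query}"

url = build_weather_url("New York", "PLACEHOLDER_KEY")
print(url)
```

With requests the same effect can be had by passing the dictionary as params= to requests.get, as the bit.ly example does.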
Download the weather - API call with requests
pip install openweathermap-simplified
from openweathermap import get_daily_forecast, APIError

try:
    #forecast = get_daily_forecast(49.24966, -123.11934) # Vancouver, BC, Canada
    forecast = get_daily_forecast(34.8186, 31.8969) # Rehovot, Israel
    #forecast = get_daily_forecast('Rehovot')
    print(forecast)
except APIError as err:
    # Deal with missing/incorrect API key or failed requests
    print(err)
Tweet
import configparser
import twitter
import os

config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'api.cfg'))
api = twitter.Api( **config['twitter'] )

status = api.PostUpdate('My first Tweet using Python')
print(status.text)
bit.ly
import configparser
import os
import requests

def shorten(uri):
    config = configparser.ConfigParser()
    #config.read(os.path.join(os.path.expanduser('~'), 'api.cfg'))
    config.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'api.cfg'))

    query_params = {
        'access_token': config['bitly']['access_token'],  # read from the [bitly] section
        'longUrl': uri
    }

    endpoint = 'https://api-ssl.bitly.com/v3/shorten'
    response = requests.get(endpoint, params=query_params, verify=False)

    data = response.json()

    if not data['status_code'] == 200:
        exit("Unexpected status_code: {} in bitly response. {}".format(data['status_code'], response.text))
    return data['data']['url']

print(shorten("http://code-maven.com/"))
API config file
{% embed include file="src/examples/web-client/api.cfg)
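The content of api.cfg is not reproduced here. As a rough sketch of what such a file could look like, the example below assumes one section per service: the [bitly] section holds the access_token field used by the code above, and the [twitter] section is assumed to hold the keyword arguments that get passed on to twitter.Api. All values are placeholders.

```python
import configparser

# Hypothetical content for api.cfg; every value is a placeholder.
SAMPLE_CFG = """
[twitter]
consumer_key = YOUR_CONSUMER_KEY
consumer_secret = YOUR_CONSUMER_SECRET
access_token_key = YOUR_ACCESS_TOKEN_KEY
access_token_secret = YOUR_ACCESS_TOKEN_SECRET

[bitly]
access_token = YOUR_BITLY_TOKEN
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
print(config.sections())
print(dict(config["twitter"]))
```

Keeping the keys in a separate file makes it easier to keep them out of version control.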
Exercise: Combine web server and client
Write a web application that gets a site and a text as input (e.g. http://cnn.com and 'Korea') and checks whether the word appears on the given site.
Extended version: Only get the URL as the input and create statistics showing the most frequent words on the given page.
Python Web server
Hello world web
- WSGI
- CGI
from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server
import time

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/plain; charset=utf-8')]

    start_response(status, headers)

    res = f"Hello World {time.time()}".encode('utf-8')

    return [res]

port = 8080
with make_server('0.0.0.0', port, hello_world) as httpd:
    print("Serving on port {}...".format(port))
    httpd.serve_forever()
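A WSGI application is just a callable, so it can be exercised without starting a server at all. The sketch below calls a simplified version of the application directly with a fake environment; the helper call_app is made up for this illustration.

```python
from wsgiref.util import setup_testing_defaults

def hello_world(environ, start_response):
    start_response('200 OK', [('Content-type', 'text/plain; charset=utf-8')])
    return [b"Hello World"]

def call_app(app, path="/"):
    """Call a WSGI app once and return (status, headers, body)."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)      # fill in the remaining required keys
    captured = {}

    def start_response(status, headers):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body

status, headers, body = call_app(hello_world)
print(status, body)
```

The same two-argument call is what the server from make_server performs for every incoming request.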
Dump web environment info
from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server

# A relatively simple WSGI application. It's going to print out the
# environment dictionary after being updated by setup_testing_defaults
def simple_app(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/plain')]

    start_response(status, headers)

    # In Python 3 use items() and return bytes, not str
    ret = ["{}: {}\n".format(key, value).encode('utf-8')
           for key, value in environ.items()]
    return ret

httpd = make_server('', 8000, simple_app)
print("Serving on port 8000...")
httpd.serve_forever()

# taken from the standard documentation of Python
Web echo
from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server
from urllib.parse import parse_qs
from html import escape

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/html; charset=utf-8')]

    start_response(status, headers)

    # The cgi module was removed in Python 3.13, so parse the query string directly
    form = parse_qs(environ.get('QUERY_STRING', ''))
    if 'txt' in form:
        # A WSGI application must return an iterable of bytes
        return [('Echo: ' + escape(form['txt'][0])).encode('utf-8')]

    return [b"""
<form>
<input name="txt" />
<input type="submit" value="Echo" />
</form>
"""]

httpd = make_server('', 8000, hello_world)
print("Serving on port 8000...")
httpd.serve_forever()
Web form
from wsgiref.util import setup_testing_defaults
from wsgiref.simple_server import make_server
from urllib.parse import parse_qs
from html import escape

def hello_world(environ, start_response):
    setup_testing_defaults(environ)

    status = '200 OK'
    headers = [('Content-type', 'text/html; charset=utf-8')]

    start_response(status, headers)

    # The cgi module was removed in Python 3.13, so parse the query string directly
    form = parse_qs(environ.get('QUERY_STRING', ''))
    html = ''
    for f in form:
        html += escape(f) + '==' + escape(form[f][0]) + '<br>'

    if not html:
        html = """
<a href="?fname=Foo&lname=Bar">click</a>
<form>
Username: <input name="username" /><br>
Password: <input type="password" name="pw" /><br>
Age group: Under 18 <input type="radio" name="age" value="kid" >
18-30 <input type="radio" name="age" value="young" >
30- <input type="radio" name="age" value="old" >
<input type="submit" value="Send" />
</form>
"""
    # A WSGI application must return an iterable of bytes
    return [html.encode('utf-8')]

httpd = make_server('', 8000, hello_world)
print("Serving on port 8000...")
httpd.serve_forever()
Resources
Networking
Secure shell
ssh
- On Windows install putty
import subprocess
import sys

if len(sys.argv) != 2:
    exit("Usage: " + sys.argv[0] + " hostname")

host = sys.argv[1]
command = "uname -a"

ssh = subprocess.Popen(["ssh", host, command],
                       shell=False,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
result = ssh.stdout.readlines()
error = ssh.stderr.readlines()
if error:
    for err in error:
        sys.stderr.write("ERROR: {}\n".format(err))
if result:
    print(result)
ssh from Windows
$ ssh foobar@hostname-or-ip
-o "StrictHostKeyChecking no"
$ plink.exe -ssh foobar@hostname-or-ip -pw "password" -C "uname -a"
$ plink.exe", "-ssh", "foobar@username-or-ip", "-pw", "no secret", "-C", "uname -a"
import subprocess
import sys

ssh = subprocess.Popen([r"c:\Users\foobar\download\plink.exe", "-ssh",
                        "foobar@username-or-ip",
                        "-pw", "password",
                        "-C", "uname -a"],
                       shell=False,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE)
result = ssh.stdout.readlines()
error = ssh.stderr.readlines()
if error:
    for err in error:
        sys.stderr.write("ERROR: {}\n".format(err))
if result:
    print(result)
Parallel ssh
- parallel-ssh
- pip install parallel-ssh
from pssh import ParallelSSHClient
hosts = ['myhost1', 'myhost2']
client = ParallelSSHClient(hosts)
output = client.run_command('ls -ltrh /tmp/', sudo=True)
telnet
import telnetlib  # note: removed from the standard library in Python 3.13

hostname = '104.131.87.33'
user = 'gabor'
password = 'robag'

tn = telnetlib.Telnet(hostname)
# In Python 3 telnetlib sends and receives bytes, not str
tn.read_until(b"login: ")
tn.write(user.encode('ascii') + b"\n")

tn.read_until(b"Password: ")
tn.write(password.encode('ascii') + b"\n")
tn.read_until(b"~$")

tn.write(b"hostname\n")
print(tn.read_until(b"~$").decode())
print("-------")

tn.write(b"uptime\n")
print(tn.read_until(b"~$").decode())
print("-------")

print("going to exit")
tn.write(b"exit\n")

print("--------")
print(tn.read_all().decode())
prompt for password
import getpass
password = getpass.getpass("Password:")
print(password)
ftp
$ sudo aptitude install proftpd
$ sudo /etc/init.d/proftpd start
$ sudo adduser (user: foo pw: bar)
from ftplib import FTP
ftp = FTP('localhost')
ftp.login("foo", "bar")

print(ftp.retrlines('LIST'))

print('-------')
for f in ftp.nlst():
    print("file: " + f)

filename = 'ssh.py'

# storlines() needs a file opened in binary mode
with open(filename, 'rb') as fh:
    ftp.storlines("STOR " + filename, fh)

print('-------')
for f in ftp.nlst():
    print("file: " + f)

ftp.delete(filename)

print('-------')
for f in ftp.nlst():
    print("file: " + f)
-rw-rw-r-- 1 foo foo 6 Feb 18 19:18 a.txt
-rw-rw-r-- 1 foo foo 6 Feb 18 19:18 b.txt
226 Transfer complete
-------
file: b.txt
file: a.txt
-------
file: b.txt
file: a.txt
file: ssh.py
-------
file: b.txt
file: a.txt
Interactive shell
The Python interactive shell
- len
Type python without any arguments on the command line and you'll get into the interactive shell of Python. In the interactive shell you can type:
>>> print("hello")
hello
>>> "hello"
'hello'
>>> 6
6
>>> len("abc")
3
>>> "abc" + 6
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can only concatenate str (not "int") to str
>>> "abc" + str(6)
'abc6'
REPL - Read Evaluate Print Loop
- int
- float
- REPL
A variable comes to existence the first time we assign a value to it. It points to an object and that object knows about its type.
>>> a = "abc"
>>> len(a)
3
>>> a = '3'
>>> a + 3
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can only concatenate str (not "int") to str
>>> int(a) + 3
6
>>> a = '2.3'
>>> float(a) + 1
3.3
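The statement that the object, not the variable, knows its type can be checked with the built-in type() function. A small sketch of the idea, written as a script rather than a REPL session:

```python
a = "abc"
first_type = type(a)      # the name a points to a str object
a = 3                     # the same name now points to an int object
second_type = type(a)

print(first_type, second_type)
print(int("3") + 3)       # convert explicitly before mixing str and int
print(float("2.3") + 1)
```

Assigning a new value does not convert anything; the name simply starts pointing to a different object.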
Using Modules
- import
- sys
- version
- executable
Python has lots of standard (and non-standard) modules. You can load one of them using the import keyword. Once loaded, you can use functions from the module or access its objects. For example, the sys module has a sys.version and a sys.executable variable.
>>> import sys
>>> sys.version
'2.7.3 (default, Apr 10 2012, 23:24:47) [MSC v.1500 64 bit (AMD64)]'
>>> sys.executable
'c:\\Python27\\python.exe'
You can also load a specific object directly into your code.
>>> from sys import executable
>>> executable
'c:\\Python27\\python.exe'
To quit the interpreter call the exit() function.
>>> exit
Use exit() or Ctrl-Z plus Return to exit
The import statement binds the word sys to whatever it loaded from the file.
Getting help
- help()
- dir()
- import
>>> help
Type help() for interactive help, or help(object) for help about object.
>>> help()     - entering an internal shell:
...
help> dir      - explains the dir command. Navigate using SPACE/ENTER/q
help> Ctrl-D   - to quit (Ctrl-Z ENTER on Windows)
>>> help(dir)  - the same explanation as before
>>> dir()
['__builtins__', '__doc__', '__name__', '__package__']
>>> dir("")    - list of string related methods
['__add__', '__class__', ... 'upper', 'zfill']
>>> dir(1)     - list of integer related methods
['__abs__', '__add__', ... 'numerator', 'real']
>>> dir(__builtins__)
...            - functions available in python
>>> help(abs)  - explain how abs() works
>>> help(sum)
>>> help(zip)
>>> help(int)
>>> help(str)
>>> help("".upper) - explain how the upper method of strings works
>>> import sys
>>> dir(sys)
>>> help(sys)
>>> help(sys.path)
>>> help(sys.path.pop)
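The lists returned by dir() are ordinary Python lists, so they can be filtered in code as well. A small sketch that hides the names starting with an underscore; the helper name public_names is made up for the illustration.

```python
def public_names(obj):
    """Return the attribute names of obj that do not start with an underscore."""
    return [name for name in dir(obj) if not name.startswith("_")]

string_methods = public_names("")
print(string_methods[:5])
print("upper" in string_methods)
```

This leaves the methods you would normally call, such as upper and zfill, and drops the double-underscore ones.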
Exercise: Interactive shell
- Start the REPL and check the examples.
- Check the documentation in the REPL.
Testing Demo
Testing Flask
git clone https://github.com/pallets/flask.git
cd flask
pip install -r requirements/dev.txt
pip install -e .
pytest
How do you test your code?
This mini-series is for people who don't have the time to delve into the way you'd write tests for your Python code, but would like to get a quick overview of the possibilities.
However, before we can get into actually testing things, it is worth thinking about and even discussing the following questions.
- What kind of things do you test?
- Web application?
- Command line application?
- Databases?
- ...
What is testing?
So what do we really mean when we talk about testing?
For every piece of code, whether it is a small module or a huge application, you can have the following equation.
There is some environment the code works in. It might be just the interpreter/compiler in the case of a single stand-alone function, or it might include multiple networking elements, servers, databases, IoT devices, etc. No matter what, testing people call this environment the "Fixture".
Then we execute the code - the Application Under Test - and give it some input.
The result should be some "Expected Output".
So this is our equation.
- Fixture + Input = Expected Output
What is testing really?
In reality, however, many times we don't get exactly the expected output. Instead there is a small (or big) difference. That's the bug.
The goal of (automated) testing is to make it easy and cheap to notice when these bugs creep in.
To put it in other words, when you write your code you can check if the result is as expected either manually or by writing some automated tests. The question is, how will you know your piece of code still works half a year from now, when someone has made some changes to some other part of the code?
Will you repeat all the manual tests you did earlier? You won't have time for that.
On the other hand, if you automated your tests in the first place, then you can easily, quickly and cheaply run them again and verify whether everything still works as before or a bug has appeared.
- Fixture + Input = Expected Output + Bugs
Testing demo tools
In these examples we are going to see 3 Python modules that can be used for testing.
- doctest
- unittest
- pytest
Testing demo methodology
We won't delve deep into the capabilities of these testing libraries. We will only use a very simple example to show how to write a passing and a failing test.
- Have a simple AUT - Application Under Test with an obvious bug
- Write a passing test
- Write a failing test
Testing demo - AUT - Application Under Test
Given the following module with a single function, how can we use this function and how can we test it?
def add(x, y):
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!

You probably noticed that our function is called add, so the expectation is that it will add two numbers. However, the implementation has a bug: it actually multiplies the two numbers. I know it is a very obvious issue, but it is great as it allows us to see the mechanics of testing without getting distracted by a complex implementation and a complex problem.
Rest assured, the mechanism of the testing would be the same even if our function was calculating the moon-landing trajectory.
Testing demo - use the module
Before we start writing an "automated test", let's see how one could test this code "manually". In reality I see this many times, that people write short snippets of code to check if their real code works properly, but they don't turn these small snippets into real tests.
Basically the question is "How can we use the add function of the mymath module?"
The code is straightforward. We import the module. We import the "sys" module to be able to access the command line arguments. We take two arguments from the command line, call the function, and print the result.
Then, if we would like to make sure our code works well, we can compare that result to some expected result.
Based on this everything works fine.
import mymath
import sys

if len(sys.argv) != 3:
    exit(f"Usage {sys.argv[0]} NUMBER NUMBER")

a = int(sys.argv[1])
b = int(sys.argv[2])

result = mymath.add(a, b)
print(result)

python use_mymath.py 2 2
4
Testing demo: doctest
- doctest
- $?
- %ERRORLEVEL%
The first way we are going to look at is using the "doctest" module. It is a very nice tool that allows us to test our code and to also verify that our documentation is aligned with the code. In addition to that, doctest is a standard module. It comes with every installation of Python so you don't need to worry about installation.
The big drawback is that it is not really useful for anything complex.
So how does it work?
In Python if you add a string immediately after the declaration of the function - meaning the line immediately after the "def" statement - that string becomes the documentation of the function. It can be a one-line string or a multi-line string using triple-quotes.
In the documentation you can write free text and you can also write examples as if one was using the interactive shell of Python. For these examples we have code snippets preceded by 3 greater-than signs, the prompt of the Python interactive shell. The line immediately after that contains the result that you'd see if you actually typed the expression into the interactive shell.
Doctest will read your source code, look at all the functions you have and for each function it will look at the documentation of the function. If in the documentation it sees 3 greater-than signs, then it will take the content of that line as code to be executed and the next line as the expected result. Doctest will execute each code snippet and compare its result with the expected result, effectively checking if the examples in your documentation and the implementation are aligned.
We can run doctest in the following way: python -m doctest mymath.py. If all the tests pass, then this execution will print nothing.
This lack of positive feedback is a bit strange, so you might want to check the so-called "exit code" of the execution. On Unix systems such as Linux and macOS, you'd inspect the $? shell variable, while on MS Windows you need to inspect the %ERRORLEVEL% variable. On all of these systems you can use the echo command to inspect the variables. In either case 0 indicates success.
def add(x, y):
    """
    This function will add two numbers together
    >>> add(2, 2)
    4
    >>>

    And here we can have more documentation.
    """
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!

$ python -m doctest mymath.py
$ echo $?
0
> python -m doctest mymath.py
> echo %ERRORLEVEL%
0
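The exit code can also be read from Python, which works the same way on every operating system. The sketch below writes a small copy of the module to a temporary directory, runs doctest on it in a child process and looks at the return code; the temporary file handling is only scaffolding for the illustration.

```python
import os
import subprocess
import sys
import tempfile

# A reduced copy of the module with one passing example in the docstring.
MODULE_SOURCE = '''
def add(x, y):
    """
    >>> add(2, 2)
    4
    """
    return x * y
'''

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "mymath.py")
    with open(path, "w") as fh:
        fh.write(MODULE_SOURCE)
    completed = subprocess.run([sys.executable, "-m", "doctest", path],
                               capture_output=True, text=True)

print(repr(completed.stdout))   # nothing is printed when all examples pass
print(completed.returncode)
```

The returncode attribute holds the same value that echo $? or echo %ERRORLEVEL% would show.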
Testing demo: doctest with failure
Of course we know that our code is not perfect (to say the least), so at some point someone will complain about the incorrect results received, for example when they try to add 3 and 3. Before fixing the code, however, it is better to write a test case with the expected correct result, a test that will fail.
So we added another example to the documentation.
If we run the same command as we did earlier, we'll get extensive output on the screen and the exit code will have some value different from 0.
At this point you'd probably also go and fix the code, but you have also increased the number of tests and eliminated the possibility of this failure returning unnoticed.
def add(x, y):
    """
    This function will add two numbers together
    >>> add(2, 2)
    4
    >>> add(3, 3)
    6
    >>>

    And here we can have more documentation.
    """
    return x * y

def multiply(x, y):
    return x + y

# Yes, I know there are bugs in this code!

$ python -m doctest mymath.py
$ echo $?
1
> python -m doctest mymath.py
> echo %ERRORLEVEL%
1
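The same pass/fail counting can be observed from inside Python by handing a docstring to the doctest machinery directly. This is a sketch using doctest.DocTestParser and doctest.DocTestRunner; the helper name count_doctest_results is made up, and add is a local copy of the buggy function.

```python
import doctest

def add(x, y):
    """
    >>> add(2, 2)
    4
    >>> add(3, 3)
    6
    """
    return x * y     # same deliberate bug as in mymath.py

def count_doctest_results(func):
    """Run the examples in func's docstring and return (failed, attempted)."""
    parser = doctest.DocTestParser()
    test = parser.get_doctest(func.__doc__, {func.__name__: func},
                              func.__name__, None, 0)
    runner = doctest.DocTestRunner(verbose=False)
    results = runner.run(test, out=lambda text: None)   # discard the report text
    return results.failed, results.attempted

failed, attempted = count_doctest_results(add)
print(failed, attempted)
```

One failure out of two attempted examples is what turns the exit code of python -m doctest into a non-zero value.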
Testing demo: Unittest success
- unittest
- TestCase
- assertEqual
Python comes with a built-in module for writing tests. Its name is unittest, which might be a bit confusing, as this module can also be used for more complex feature tests, and other modules can also be used to write so-called unit tests.
Unlike doctests, which are part of the actual code, the unittest library calls for separate test files. It is recommended that the names of these files start with the test_ prefix, as that makes it easy for the various testing tools to locate them.
Inside the file you need to import both the unittest module and the module that we are testing, mymath in this case.
We need a class with a name that starts with Test and that inherits from unittest.TestCase. In the class we can have one or more test functions. The name of each one starts with the test_ prefix.
Inside the function we can call the function that we are testing and we can compare the result returned by it to some expected value.
We can compare them in various ways using the various assert methods of unittest.TestCase. In this example we used the assertEqual method as we wanted to make sure the actual return value equals the expected value.
We can run the tests using python -m unittest test_one_with_unittest.py. It will print some output on the screen indicating that all the tests passed. The exit code will be 0 as expected.
import unittest
import mymath

class TestMath(unittest.TestCase):
    def test_math(self):
        self.assertEqual(mymath.add(2, 2), 4)

$ python -m unittest test_one_with_unittest.py
$ echo $?
0
> python -m unittest test_one_with_unittest.py
> echo %ERRORLEVEL%
0
Testing demo: Unittest failure
When we get the report about the incorrect result when adding 3 and 3, we can add another test case.
We could have added another assertion to the test_math function or we could have created a separate class with its own function, but in this case we opted for creating a separate test function.
We won't go into the pros and cons of each strategy now as we are only interested in the basic technique.
If we run the tests now, the output will indicate that it ran 2 test cases and one of them failed. It even shows us some details about the expected value and the actual value that can be really useful for understanding the source of the problem.
Note there is also .F in the output. The dot indicates the test function that passed, the F indicates the test function that failed.
The exit code is again different from 0.
BTW this exit code is used by the various CI systems to understand the results of the tests.
import unittest
import mymath
class TestMath(unittest.TestCase):
    def test_math(self):
        self.assertEqual(mymath.add(2, 2), 4)
    def test_more_math(self):
        self.assertEqual(mymath.add(3, 3), 6)
$ python -m unittest test_with_unittest.py
$ echo $?
1
> python -m unittest test_with_unittest.py
> echo %ERRORLEVEL%
1
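To illustrate how an external system such as a CI server can read the exit code, here is a small sketch that runs a snippet in a separate Python process and looks at the return code. The helper name exit_code_of is our own, and the snippets stand in for a real test run.

```python
import subprocess
import sys


def exit_code_of(snippet):
    # Run the snippet in a separate Python process, the way a CI system
    # runs a test command, and report the exit code of that process.
    completed = subprocess.run(
        [sys.executable, "-c", snippet],
        capture_output=True,
    )
    return completed.returncode


if __name__ == "__main__":
    print(exit_code_of("assert 2 + 2 == 4"))
    print(exit_code_of("assert 2 + 2 == 5"))
```

The first process ends normally, the second one ends with an uncaught AssertionError, so only the second reports a non-zero exit code.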
Testing demo: pytest using classes
- pytest
- assert
In our third example we are going to use the pytest module. The only drawback of the pytest module is that it does not come with the installation of Python itself. It is not a huge issue though, as you probably install hundreds of other modules as well.
These days pytest seems to be the most popular testing library for Python.
We'll have several examples using pytest.
In order to use it you create a file with a name that starts with the test_ prefix. We need to import the module we are testing, but we don't need to import pytest. Actually we don't even use pytest inside the code. (At least not in the simple use-cases.)
In the file you need to create a class with a name starting with Test, but this class does not need to inherit from any special class.
In the class we can have one or more test-functions starting with the prefix test_.
In the function we call the function we are testing and we compare the results to the expected results.
We use the built-in assert statement of Python to check if the result is what we expected.
No need to learn the various specialized assert-methods as we had in the unittest module.
We run the test using the pytest command.
We'll get some output. Here too the single dot after the name of the test file indicates that there was one successful test function.
The exit-code of this execution is 0 as was the case with unittest.
pip install pytest
import mymath
class TestMath():
    def test_math(self):
        assert mymath.add(2, 2) == 4
$ pytest test_with_pytest_class.py
$ echo $?
0
> pytest test_with_pytest_class.py
> echo %ERRORLEVEL%
0
Testing demo: pytest using classes - failure
Here too we can add additional test-functions to the same test-class.
Executing pytest will print .F indicating one passing test-function and one failing test-function.
We'll get a detailed explanation of where the failure happened.
The exit-code will be different from 0, helping the CI systems and any other external system to know that the tests have failed.
import mymath
class TestMath():
    def test_math(self):
        assert mymath.add(2, 2) == 4
    def test_more_math(self):
        assert mymath.add(3, 3) == 6
$ pytest test_with_pytest_class_failure.py
$ echo $?
1
> pytest test_with_pytest_class_failure.py
> echo %ERRORLEVEL%
1
Testing demo: pytest without classes
In the previous example we used a test-class to write our tests, but in many cases we don't need the classes. We could just as well write plain test-functions as in this example.
Test-functions without a class around them are easier to write and a lot simpler to grasp. So unless you really need the features a class can provide I'd recommend you use functions only. After all, our test code should be a lot simpler than our application code.
pip install pytest
import mymath
def test_math():
    assert mymath.add(2, 2) == 4
$ pytest test_with_pytest.py
$ echo $?
0
> pytest test_with_pytest.py
> echo %ERRORLEVEL%
0
Testing demo: pytest without classes failure
import mymath
def test_math():
    assert mymath.add(2, 2) == 4
def test_more_math():
    assert mymath.add(3, 3) == 6
$ pytest test_with_pytest.py
$ echo $?
1
> pytest test_with_pytest.py
> echo %ERRORLEVEL%
1
Testing demo: Failure in one sub
import mymath
def test_math():
    assert mymath.add(3, 3) == 6
    assert mymath.add(2, 2) == 4
Testing demo: pytest run doctests
The nice thing about pytest is that it can also run all the doctests in your module.
So you can start your testing journey with doctest and later switch to pytest.
You can easily test the examples in your documentation.
$ pytest --doctest-modules mymath.py
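As a reminder of what such a doctest looks like, here is a small sketch of a function with examples in its docstring. The add function is a correct stand-in we wrote for this illustration, and run_doctests is our own helper that uses the standard doctest module directly; pytest finds and runs the same kind of examples.

```python
import doctest


def add(x, y):
    """Return the sum of two numbers.

    >>> add(2, 2)
    4
    >>> add(3, 3)
    6
    """
    # Hypothetical, correct stand-in for mymath.add.
    return x + y


def run_doctests(func):
    # Collect the examples from the docstring and execute them.
    finder = doctest.DocTestFinder()
    runner = doctest.DocTestRunner(verbose=False)
    for test in finder.find(func, name=func.__name__, globs={func.__name__: func}):
        runner.run(test)
    return runner.summarize(verbose=False)


if __name__ == "__main__":
    results = run_doctests(add)
    print(results.failed, results.attempted)
```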
Testing demo: pytest run unittest
Pytest can also run tests written with unittest. You don't even need to tell it anything special. It will introspect the test code and if it notices test-classes that are based on unittest it will execute them using the unittest module.
$ pytest test_one_with_unittest.py
$ pytest test_with_unittest.py
Test demo: test coverage
pip install pytest-cover
$ pytest test_with_pytest.py --cov mymath --cov-report html --cov-report term
Open htmlcov/index.html
Exercise: Testing demo - anagrams
- An anagram is a pair of words that are created from exactly the same set of characters, but in a different order.
- For example listen and silent
- Or bad credit and debit card
- Given the following module with the is_anagram function, write tests for it (in a file called test_anagram.py).
- Write a failing test as well.
- Try doctest, unittest, and pytest as well.
def is_anagram(a_word, b_word):
    return sorted(a_word) == sorted(b_word)
Sample code to use the Anagram module.
from anagram import is_anagram
import sys
if len(sys.argv) != 3:
    exit(f"Usage {sys.argv[0]} WORD WORD")
if is_anagram(sys.argv[1], sys.argv[2]):
    print("Anagram")
else:
    print("NOT")
Exercise: Test previous solutions
- Go back to your solutions to the previous exercises
- Write tests
- If you feel it is hard, maybe you need to change the code to make it more testable.
Solution: Testing demo
from anagram import is_anagram
def test_anagram():
    assert is_anagram("silent", "listen")
    assert is_anagram("bad credit", "debit card")
def test_not_anagram():
    assert not is_anagram("abc", "def")
def test_should_be_anagram_spaces():
    assert is_anagram("anagram", "nag a ram")
def test_should_be_anagram_case():
    assert is_anagram("Silent", "Listen")
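Two of the tests in the solution (the one with spaces and the one with mixed case) fail against the is_anagram function given in the exercise, because it compares the raw characters. Here is a sketch of one possible adjustment, assuming we want to ignore spaces and letter case. That requirement is our assumption; the exercise does not state it.

```python
def is_anagram(a_word, b_word):
    def normalize(text):
        # Assumption: spaces and letter case should not matter.
        return sorted(text.replace(" ", "").lower())

    return normalize(a_word) == normalize(b_word)


if __name__ == "__main__":
    print(is_anagram("anagram", "nag a ram"))
    print(is_anagram("Silent", "Listen"))
    print(is_anagram("abc", "def"))
```

With this version all four test-functions of the solution would pass. Whether that is the desired behavior is a design decision the tests helped to surface.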
GitHub API
GitHub data
- Users / Organizations
- Repositories
- Commits
- Issues
- Pull-Requests
- ...
GitHub API: REST vs GraphQL
- REST API
  - Get data in the structure the API provider thought you would need.
  - Usually all the data from one table in the database.
- GraphQL API
  - Have a mapping (edges) between pieces of data that are connected
  - Getting the data you need, nothing more
  - Nested fields
  - Strong typing of the data
  - Rate limits
Where is it used
- Open Source Development Courses
- Open Source by Organizations
- Weekly report
GitHub get organization members
import json
from github_rest_api import get_from_github
orgid = 'github'
data = get_from_github(f"https://api.github.com/orgs/{orgid}/members")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
print(data)
python examples/github-rest/rest_get_org_members.py
python examples/github-graphql/run_query_requests.py examples/github-graphql/get_org_members.gql out.json
Details about an organization REST
import json
import sys
from github_rest_api import get_from_github
if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} ORGANIZATION")
organization = sys.argv[1]
data = get_from_github(f"https://api.github.com/orgs/{organization}")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
python examples/github-rest/details-about-org.py github
python examples/github-rest/details-about-org.py kantoniko
python examples/github-rest/details-about-org.py osdc-code-maven
python examples/github-rest/details-about-org.py szabgab   # error, this is a user
Details about a user REST
import json
import sys
from github_rest_api import get_from_github
if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} USERNAME")
username = sys.argv[1]
data = get_from_github(f"https://api.github.com/users/{username}")
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
python examples/github-rest/details-about-org.py szabgab
but these also work:
python examples/github-rest/details-about-org.py github
python examples/github-rest/details-about-org.py kantoniko
python examples/github-rest/details-about-org.py osdc-code-maven
REST - List of repositories by organization (pagination!)
import json
import sys
from github_rest_api import get_from_github
if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} ORGANIZATION")
organization = sys.argv[1]
data = get_from_github(f"https://api.github.com/orgs/{organization}/repos", pages=True)
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
python examples/github-rest/repos-of-org.py github
python examples/github-rest/repos-of-org.py kantoniko
python examples/github-rest/repos-of-org.py szabgab   # error, this is a user
REST - List of repositories by user (pagination!)
import json
import sys
from github_rest_api import get_from_github
if len(sys.argv) != 2:
    exit(f"Usage: {sys.argv[0]} USERNAME")
organization = sys.argv[1]
data = get_from_github(f"https://api.github.com/users/{organization}/repos", pages=True)
with open("out.json", 'w') as fh:
    json.dump(data, fh, indent=4)
python examples/github-rest/repos-of-user.py szabgab
but these also work:
python examples/github-rest/repos-of-user.py kantoniko
python examples/github-rest/repos-of-user.py osdc-code-maven
GraphQL - List repositories by organization
import argparse
import datetime
import json
import os
import sys
import requests
query = '''
query ($organization: String!) {
organization(login: $organization) {
avatarUrl
repositories(first: 2, after: null) {
nodes {
createdAt
url
pushedAt
name
watchers {
totalCount
}
visibility
updatedAt
stargazers {
totalCount
}
}
totalCount
pageInfo {
endCursor
hasNextPage
}
}
}
}
'''
def run_query(query, **variables):
    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }
    #print(query)
    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)  # the body of the failed response
def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} ORGANIZATION")
    organization = sys.argv[1]
    results = run_query(query, organization=organization)
    with open("out.json", "w") as fh:
        json.dump(results, fh, indent=4)
main()
GitHub API KEY (PERSONAL TOKEN)
GitHub REST API
pip install requests
GitHub REST API execute query
import requests
import os
def get_from_github(url, expected=0, pages=False):
    token = os.environ.get('MY_GITHUB_TOKEN')
    if not token:
        print('Missing MY_GITHUB_TOKEN. Not collecting data from Github')
        return
    headers = {
        'Accept': 'application/vnd.github+json',
        'Authorization': f'Bearer {token}',
        'X-GitHub-Api-Version': '2022-11-28',
    }
    if pages:
        per_page = 100  # default is 30 max is 100
        page = 1
        all_data = []
        while True:
            real_url = f"{url}?per_page={per_page}&page={page}"
            print(f"Fetching from {real_url}")
            data = requests.get(real_url, headers=headers).json()
            all_data.extend(data)
            if expected:
                print(f"Received {len(data)} Total {len(all_data)} out of an expected {expected}")
            else:
                print(f"Received {len(data)} Total {len(all_data)}")
            page += 1
            if len(data) < per_page:
                break
    else:
        print(f"Fetching from {url}")
        all_data = requests.get(url, headers=headers).json()
    return all_data
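The paging loop in get_from_github can be tried out without network access. Here is a minimal sketch of the same stop condition (a page shorter than per_page is the last one), where fetch_page and fake_fetch are hypothetical stand-ins for the HTTP request. The check for a non-list response is our own addition: if a request returns an error object instead of a list, extending the result with it would quietly produce wrong data.

```python
def fetch_all_pages(fetch_page, per_page=100):
    """Collect items from a paged source.

    fetch_page(page, per_page) is a hypothetical callable that returns the
    list of items on the given 1-based page.
    """
    all_items = []
    page = 1
    while True:
        items = fetch_page(page, per_page)
        if not isinstance(items, list):
            # For example an error object instead of a list of results.
            raise ValueError(f"Unexpected response on page {page}: {items!r}")
        all_items.extend(items)
        if len(items) < per_page:
            # A short (or empty) page means there is nothing left to fetch.
            break
        page += 1
    return all_items


def fake_fetch(page, per_page):
    # Stand-in for an HTTP call: 7 items served in pages of per_page.
    data = list(range(7))
    start = (page - 1) * per_page
    return data[start:start + per_page]


if __name__ == "__main__":
    print(fetch_all_pages(fake_fetch, per_page=3))
```

Note that when the number of items is an exact multiple of per_page, this approach needs one extra request that returns an empty page.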
GitHub API GraphQL
- Scalars (types)
- String! means the field is a string that cannot be null.
pip install requests
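To show where a String! variable ends up, here is a small sketch that builds the JSON document sent to the GraphQL endpoint, without sending it. The build_payload helper is our own, and it assumes that every variable passed to it is declared as non-null in the query.

```python
import json

QUERY = """
query ($organization: String!) {
  organization(login: $organization) {
    avatarUrl
  }
}
"""


def build_payload(query, **variables):
    # The GraphQL endpoint receives the query text and the variable values
    # together in one JSON document.
    missing = [name for name, value in variables.items() if value is None]
    if missing:
        # Assumption of this sketch: all variables are declared with "!",
        # so none of them may be sent as null.
        raise ValueError(f"Variables must not be None: {missing}")
    return json.dumps({"query": query, "variables": variables})


if __name__ == "__main__":
    print(build_payload(QUERY, organization="github"))
```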
GitHub GraphQL explorer
GitHub GraphQL execute query
import sys
import json
import os
import requests
def run_query(query):
    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }
    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)  # the body of the failed response
if __name__ == "__main__":
    if 2 <= len(sys.argv) <= 3:
        query_filename = sys.argv[1]
        if len(sys.argv) == 3:
            output_file = sys.argv[2]
        else:
            output_file = None
    else:
        exit(f"Usage: {sys.argv[0]} QUERY_FILE [OUTPUT_FILE]")
    with open(query_filename) as fh:
        query = fh.read()
    result = run_query(query)
    if output_file:
        with open(output_file, 'w') as fh:
            json.dump(result, fh, indent=4)
    else:
        print(result)
GitHub GraphQL execute query async
pip install gql[all]
import sys
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
if 2 <= len(sys.argv) <= 3:
query_filename = sys.argv[1]
if len(sys.argv) == 3:
output_file = sys.argv[2]
else:
output_file = None
else:
exit(f"Usage: {sys.argv[0]} QUERY_FILE [OUTPUT_FILE]")
with open(query_filename) as fh:
query = fh.read()
token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
'Authorization': f'Bearer {token}',
}
url = "https://api.github.com/graphql"
transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query))
if output_file:
with open(output_file, 'w') as fh:
json.dump(result, fh, indent=4)
else:
print(result)
GitHub GraphQL who am i
- Get the username of who provided the token
query {
viewer {
login
}
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/login.gql out.json
{
"viewer": {
"login": "szabgab"
}
}
GitHub GraphQL list my repositories
query {
viewer {
repositories(first: 30) {
totalCount
pageInfo {
hasNextPage
endCursor
}
edges {
node {
name
}
}
}
}
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/list_my_repositories.gql out.json
{
"viewer": {
"repositories": {
"totalCount": 470,
"pageInfo": {
"hasNextPage": true,
"endCursor": "Y3Vyc29yOnYyOpHOACAlgw=="
},
"edges": [
{
"node": {
"name": "whitecamel.org"
}
},
{
"node": {
"name": "perl6-in-perl5"
}
},
{
"node": {
"name": "test-snapshots"
}
},
{
"node": {
"name": "padre-plugin-debugger"
}
},
{
"node": {
"name": "Math-RPN"
}
},
{
"node": {
"name": "perl6-conf"
}
},
{
"node": {
"name": "the-driver"
}
},
{
"node": {
"name": "Rehovot.pm"
}
},
{
"node": {
"name": "CPAN-Forum"
}
},
{
"node": {
"name": "test-runner"
}
},
{
"node": {
"name": "test-class"
}
},
{
"node": {
"name": "perl-android-scripts"
}
},
{
"node": {
"name": "perl-promotion"
}
},
{
"node": {
"name": "prestool"
}
},
{
"node": {
"name": "pdf-create"
}
},
{
"node": {
"name": "pdf6"
}
},
{
"node": {
"name": "try.rakudo.org"
}
},
{
"node": {
"name": "CPAN-Digger-old"
}
},
{
"node": {
"name": "peg"
}
},
{
"node": {
"name": "Hypolit"
}
},
{
"node": {
"name": "topposters"
}
},
{
"node": {
"name": "Bailador"
}
},
{
"node": {
"name": "git_experiments"
}
},
{
"node": {
"name": "Code-Explain"
}
},
{
"node": {
"name": "Code-Explain-Web"
}
},
{
"node": {
"name": "CGI--Simple"
}
},
{
"node": {
"name": "Prima"
}
},
{
"node": {
"name": "Test-Version"
}
},
{
"node": {
"name": "dwimmer"
}
},
{
"node": {
"name": "Text-Trac"
}
}
]
}
}
}
GitHub GraphQL list of repositories by username
query {
repositoryOwner(login: "cm-demo") {
repositories(first: 5, privacy: PUBLIC) {
totalCount
edges {
node {
id,
name,
isPrivate,
description
}
}
}
}
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/list_repositories_by_username.gql out.json
{
"repositoryOwner": {
"repositories": {
"totalCount": 5,
"edges": [
{
"node": {
"id": "R_kgDOGSKE7A",
"name": "cm-demo",
"isPrivate": false,
"description": "Config files for my GitHub profile."
}
},
{
"node": {
"id": "R_kgDOIx8BIw",
"name": "cm-demo.github.io-osdc-2023-01-public",
"isPrivate": false,
"description": null
}
},
{
"node": {
"id": "R_kgDOI4Gftw",
"name": "cm-demo.github.io-osdc-2023-01-perl",
"isPrivate": false,
"description": null
}
},
{
"node": {
"id": "R_kgDOJNSvyA",
"name": "cm-demo.github.io-osdc-2023-03-azrieli-",
"isPrivate": false,
"description": null
}
},
{
"node": {
"id": "R_kgDOJWTJHw",
"name": "osdc-2023-03-azrieli",
"isPrivate": false,
"description": "OSDC at Azriel College starting in 2023.03"
}
}
]
}
}
}
GitHub GraphQL list issues by username
query {
user(login: "szabgab") {
issues(first: 10, filterBy: {since: "2023-03-20T00:00:00Z"}) {
totalCount
edges {
node {
number,
title,
state,
createdAt,
url,
repository {
owner {
login
}
}
}
}
}
}
}
python examples/github-graphql/run_query_requests.py examples/github-graphql/list_issues_by_username.gql out.json
{
"user": {
"issues": {
"totalCount": 50,
"edges": [
{
"node": {
"number": 8,
"title": "Check if package has link to Issues?",
"state": "CLOSED",
"createdAt": "2020-11-02T19:06:04Z",
"url": "https://github.com/szabgab/CPAN-Digger/issues/8",
"repository": {
"owner": {
"login": "szabgab"
}
}
}
},
{
"node": {
"number": 9,
"title": "Check if meta data contains the license field?",
"state": "CLOSED",
"createdAt": "2020-11-02T19:06:28Z",
"url": "https://github.com/szabgab/CPAN-Digger/issues/9",
"repository": {
"owner": {
"login": "szabgab"
}
}
}
},
{
"node": {
"number": 6051,
"title": "Hint how to unlock exercises",
"state": "OPEN",
"createdAt": "2021-10-21T11:43:51Z",
"url": "https://github.com/exercism/exercism/issues/6051",
"repository": {
"owner": {
"login": "exercism"
}
}
}
},
{
"node": {
"number": 20,
"title": "Add CPANcover data",
"state": "CLOSED",
"createdAt": "2022-12-06T04:46:24Z",
"url": "https://github.com/szabgab/CPAN-Digger/issues/20",
"repository": {
"owner": {
"login": "szabgab"
}
}
}
},
{
"node": {
"number": 1,
"title": "Misunderstood .gitignore?",
"state": "CLOSED",
"createdAt": "2022-12-30T05:02:13Z",
"url": "https://github.com/x-lamprocapnos-x/Movie-Selector/issues/1",
"repository": {
"owner": {
"login": "x-lamprocapnos-x"
}
}
}
},
{
"node": {
"number": 3,
"title": "Verify project URLs in the individual json files",
"state": "CLOSED",
"createdAt": "2023-02-08T12:59:29Z",
"url": "https://github.com/OSDC-Code-Maven/osdc-site-generator/issues/3",
"repository": {
"owner": {
"login": "OSDC-Code-Maven"
}
}
}
},
{
"node": {
"number": 1,
"title": "The __pycache__ folder should not be in git",
"state": "CLOSED",
"createdAt": "2023-02-12T14:38:30Z",
"url": "https://github.com/zguillez/python-toolz/issues/1",
"repository": {
"owner": {
"login": "zguillez"
}
}
}
},
{
"node": {
"number": 1,
"title": "Move all the data from the other 3 repositories",
"state": "OPEN",
"createdAt": "2023-03-05T07:44:06Z",
"url": "https://github.com/OSDC-Code-Maven/open-source-by-organizations/issues/1",
"repository": {
"owner": {
"login": "OSDC-Code-Maven"
}
}
}
},
{
"node": {
"number": 1823,
"title": "Flake error B031 caused by new release flake8-bugbear",
"state": "CLOSED",
"createdAt": "2023-03-10T12:04:24Z",
"url": "https://github.com/pallets/jinja/issues/1823",
"repository": {
"owner": {
"login": "pallets"
}
}
}
},
{
"node": {
"number": 6378,
"title": "How to setup local dev environment and run the tests?",
"state": "OPEN",
"createdAt": "2023-03-11T17:22:01Z",
"url": "https://github.com/psf/requests/issues/6378",
"repository": {
"owner": {
"login": "psf"
}
}
}
}
]
}
}
}
GitHub GraphQL list issues using parameter
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
'Authorization': f'Bearer {token}',
}
url = "https://api.github.com/graphql"
query = '''
query($since:DateTime) {
user(login: "szabgab") {
issues(first: 1, filterBy: {since: $since}) {
totalCount
edges {
node {
number, title, state, createdAt, url, repository {
owner {
login
}
}
}
}
}
}
}
'''
#variables = {
# "since": "2023-04-10T00:00:00Z"
#}
ts = datetime.datetime.now() - datetime.timedelta(days = 10)
variables = {
"since": ts.strftime("%Y-%m-%dT%H:%M:%SZ")
}
transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)
print(result)
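The script above formats the local time returned by datetime.datetime.now() with a trailing Z, and Z denotes UTC. On a machine that is not set to UTC the value of since will be off by the timezone offset. Here is a sketch of one way to build the timestamp in UTC; the function name since_timestamp is our own.

```python
import datetime


def since_timestamp(days_ago, now=None):
    """Return an ISO 8601 UTC timestamp for `days_ago` days before `now`."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    # Convert to UTC first so the trailing "Z" is accurate.
    moment = now.astimezone(datetime.timezone.utc) - datetime.timedelta(days=days_ago)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


if __name__ == "__main__":
    print(since_timestamp(10))
```

The optional now parameter is there only to make the function easy to test with a fixed point in time.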
GitHub GraphQL list issues using several parameters
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys
if len(sys.argv) == 2:
output_file = sys.argv[1]
else:
output_file = None
token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
'Authorization': f'Bearer {token}',
}
url = "https://api.github.com/graphql"
query = '''
query($since:DateTime, $first:Int, $user:String!) {
user(login: $user) {
issues(first: $first, filterBy: {since: $since}) {
totalCount
edges {
node {
number, title, state, createdAt, url, repository {
owner {
login
}
}
}
}
}
}
}
'''
ts = datetime.datetime.now() - datetime.timedelta(days = 20)
variables = {
"user": "szabgab",
"since": ts.strftime("%Y-%m-%dT%H:%M:%SZ"),
"first": 30,
}
transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)
if output_file:
with open(output_file, 'w') as fh:
json.dump(result, fh, indent=4)
else:
print(result)
GitHub GraphQL contribution counts
query($username:String!) {
user(login: $username) {
contributionsCollection {
contributionCalendar {
totalContributions
weeks {
contributionDays {
contributionCount
weekday
date
}
}
}
}
}
}
{
"username": "szabgab"
}
- Defaults to the last 1 year
query($username:String!, $from:DateTime, $to:DateTime) {
user(login: $username) {
contributionsCollection(from: $from, to: $to) {
contributionCalendar {
totalContributions
weeks {
contributionDays {
contributionCount
weekday
date
}
}
}
}
}
}
{
"username": "szabgab",
"from": "2013-03-20T00:00:00Z",
"to": "2013-04-20T00:00:00Z"
}
- Can set the start-date (defaults to now - 1 year)
- Can set the end-date (defaults to start-date + 1 year)
GitHub GraphQL list Pull-Requests
- List all the PRs created by a user in a time-range
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys
if len(sys.argv) == 2:
output_file = sys.argv[1]
else:
output_file = None
token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
'Authorization': f'Bearer {token}',
}
url = "https://api.github.com/graphql"
query = '''
query($username:String!, $last:Int) {
user(login: $username) {
pullRequests(last: $last) {
totalCount
edges {
node {
number, title, state, createdAt, author { login }, url
}
}
}
}
}
'''
ts = datetime.datetime.now() - datetime.timedelta(days = 20)
variables = {
"username": "szabgab",
"last": 30,
}
transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)
if output_file:
with open(output_file, 'w') as fh:
json.dump(result, fh, indent=4)
else:
print(result)
import json
import os
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
import datetime
import sys
if len(sys.argv) == 2:
output_file = sys.argv[1]
else:
output_file = None
token = os.environ.get('MY_GITHUB_TOKEN')
headers = {
'Authorization': f'Bearer {token}',
}
url = "https://api.github.com/graphql"
query = '''
query($username:String!, $from:DateTime, $to:DateTime, $first:Int) {
user(login: $username) {
contributionsCollection(from: $from, to: $to) {
pullRequestContributions(first: $first) {
nodes {
pullRequest {
title, url, createdAt, state, repository { name }
}
}
}
}
}
}
'''
ts = datetime.datetime.now() - datetime.timedelta(days = 20)
variables = {
"username": "szabgab",
"first": 30,
"from": "2013-04-20T00:00:00Z",
"to": "2014-04-20T00:00:00Z"
}
transport = AIOHTTPTransport(url=url, headers=headers)
client = Client(transport=transport, fetch_schema_from_transport=True)
result = client.execute(gql(query), variable_values=variables)
if output_file:
with open(output_file, 'w') as fh:
json.dump(result, fh, indent=4)
else:
print(result)
GitHub GraphQL paging using cursor
- cursor
import argparse
import datetime
import json
import os
import sys
import requests
query = '''
query($after:String) {
viewer {
repositories(first: 100, after: $after, privacy: PUBLIC) {
pageInfo {
hasNextPage
endCursor
}
nodes {
name
releases(last:1) {
totalCount
nodes {
name
publishedAt
url
}
}
}
}
}
}
'''
def run_query(query, **variables):
    token = os.environ.get('MY_GITHUB_TOKEN')
    headers = {
        'Authorization': f'Bearer {token}',
    }
    #print(query)
    url = "https://api.github.com/graphql"
    res = requests.post(url, json={"query": query, "variables": variables}, headers=headers)
    # print(res.status_code)
    if res.status_code == 200:
        return res.json()
    print(f"Request failed with status_code: {res.status_code}")
    print(res.text)  # the body of the failed response
def run_query_all(query):
    cursor = None
    nodes = []
    while True:
        results = run_query(query, after=cursor)
        # print(results)
        # print("------")
        nodes.extend(results['data']['viewer']['repositories']['nodes'])
        if not results['data']['viewer']['repositories']['pageInfo']['hasNextPage']:
            break
        cursor = results['data']['viewer']['repositories']['pageInfo']['endCursor']
    return nodes
def main():
    #args = get_args()
    today = datetime.date.today()
    #print(today)
    #print(today.weekday())
    #now = datetime.datetime.now()
    #print(now)
    end_ts = today - datetime.timedelta(days=today.weekday())
    start_ts = end_ts - datetime.timedelta(days=7)
    #print(end_ts)
    #print(start_ts)
    #username = "szabgab"
    #results = get_data(usernamem start_ts, end_ts)
    results = run_query_all(query)
    with open("out.json", "w") as fh:
        json.dump(results, fh, indent=4)
main()
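The cursor loop of run_query_all can be tried without a token or network access. Here is a minimal sketch of the same idea, where run_page and FAKE_PAGES are hypothetical stand-ins for the API call and its responses. Each response carries the cursor to send with the next request, and hasNextPage tells us when to stop.

```python
def collect_nodes(run_page):
    """Follow endCursor until hasNextPage is false.

    run_page(cursor) is a hypothetical callable returning a dict with
    "nodes" and "pageInfo", shaped like the repositories field above.
    """
    cursor = None
    nodes = []
    while True:
        connection = run_page(cursor)
        nodes.extend(connection["nodes"])
        page_info = connection["pageInfo"]
        if not page_info["hasNextPage"]:
            break
        cursor = page_info["endCursor"]
    return nodes


# Fake pages keyed by the cursor that requests them; they stand in for API responses.
FAKE_PAGES = {
    None: {"nodes": [{"name": "a"}, {"name": "b"}],
           "pageInfo": {"hasNextPage": True, "endCursor": "c1"}},
    "c1": {"nodes": [{"name": "c"}],
           "pageInfo": {"hasNextPage": False, "endCursor": None}},
}

if __name__ == "__main__":
    print([node["name"] for node in collect_nodes(FAKE_PAGES.get)])
```

Unlike the page-number approach of the REST API, the client never computes the position itself; it only passes back the opaque cursor it received.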
GitHub GraphQL activities
- List all the activities of a user in a time-range
- All the issues opened / commented on / closed
- All the commits
- All the activities of a list of users in a time-range
- Get a list of projects written in Python, that have between 2-5 stars and were updated in the last 5 weeks.
- Given a repository, list all the changes that were done in all the forks.
Types in Python
mypy
- mypy
pip install mypy
Changing types
Even without any additional work, running mypy on an existing code-base can reveal locations that might need fixing.
For example it can point out places where the content of a variable changes type. Python accepts this, and in some places this type of flexibility might have advantages, but it can also lead to confusion for the maintainer of this code.
x = 23
print(x)
x = "Python"
print(x)
x = ["a", "b"]
print(x)
python simple.py
works without complaining.
mypy simple.py
reports the following:
simple.py:5: error: Incompatible types in assignment (expression has type "str", variable has type "int")
simple.py:8: error: Incompatible types in assignment (expression has type "List[str]", variable has type "int")
Found 2 errors in 1 file (checked 1 source file)
Changing types when reading a number
A quite common case in the real world is when you read in something that is supposed to be a number. In terms of the Python type-system the input is always a string, even if it looks like a number. We then need to convert it using int() or float() to use it as a number.
People will often reuse the same variable to first hold the string and then the number. This is OK with Python, but might be confusing to the reader.
num = input("type in an integer: ")
print(num)
print(type(num).__name__) # str
num = int(num)
print(num)
print(type(num).__name__) # int
mypy input.py
will print the following:
input.py:6: error: Incompatible types in assignment (expression has type "int", variable has type "str")
Found 1 error in 1 file (checked 1 source file)
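One way to avoid this kind of report is to use a separate name for the string and for the number. Here is a minimal sketch; parse_integer is our own helper, and a literal stands in for the value returned by input() so the example runs without interaction.

```python
def parse_integer(text: str) -> int:
    # int() raises ValueError if the text is not a valid integer.
    return int(text.strip())


raw_text: str = " 42 "   # stands in for the value returned by input()
num: int = parse_integer(raw_text)
print(num, type(num).__name__)
```

Each variable now keeps a single type for its whole life, which is easier for both the type checker and the reader.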
Types of variables
x :int = 0
x = 2
print(x)
x = "hello"
print(x)
python variables.py
2
hello
mypy variables.py
variables.py:7: error: Incompatible types in assignment (expression has type "str", variable has type "int")
Found 1 error in 1 file (checked 1 source file)
Types of function parameters
def add(a :int, b :int) -> int:
    return a+b
print(add(2, 3))
print(add("Foo", "Bar"))
5
FooBar
function.py:6: error: Argument 1 to "add" has incompatible type "str"; expected "int"
function.py:6: error: Argument 2 to "add" has incompatible type "str"; expected "int"
Found 2 errors in 1 file (checked 1 source file)
Types function returns None or bool
-> bool means the function returns a boolean. Either True or False.
-> None means the function returns None, explicitly or implicitly.
def f() -> bool:
    return True
def g() -> None:
    return True
def h() -> None:
    return None
def x() -> None:
    return
def z() -> None:
    pass
function_bool.py:5: error: No return value expected
Found 1 error in 1 file (checked 1 source file)
Types used properly
def add(a :int, b :int) -> int:
    return a+b
print(add(2, 3))
x :int = 0
x = 2
print(x)
5
2
Success: no issues found in 1 source file
TODO: mypy
- Complex data structures?
- My types/classes?
- Allow None (or not) for a variable.
from typing import Generator
def numbers(n: int) -> Generator[int, None, None]:
    return ( x for x in range(n))
print(list(numbers(10)))
from typing import List
def numbers(n: int) -> List[int]:
    return list(range(n))
print(numbers(10))
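For the "Allow None" item of the TODO list, here is a small sketch using Optional from the typing module together with a dictionary type. The function names and the data are made up for the illustration.

```python
from typing import Dict, List, Optional


def find_age(ages: Dict[str, int], name: str) -> Optional[int]:
    # Returns None when the name is unknown, so callers must handle both cases.
    return ages.get(name)


def describe(ages: Dict[str, int], names: List[str]) -> List[str]:
    lines = []
    for name in names:
        age = find_age(ages, name)
        if age is None:
            lines.append(f"{name}: unknown")
        else:
            # Inside this branch a type checker can treat age as int.
            lines.append(f"{name}: {age + 1} next year")
    return lines


if __name__ == "__main__":
    print(describe({"Ann": 30}, ["Ann", "Bob"]))
```

Optional[int] means the value is either an int or None. Without the check for None, a type checker such as mypy would be expected to complain about age + 1.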
Logging
Simple logging
- logging
- basicConfig
import logging
logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")
logging.log(logging.WARNING, "another warning")
logging.log(40, "another error")
WARNING:root:warning
ERROR:root:error
CRITICAL:root:critical
WARNING:root:another warning
ERROR:root:another error
- Written on STDERR
Simple logging - set level
import logging
logging.basicConfig(level = logging.INFO)
logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")
INFO:root:info
WARNING:root:warning
ERROR:root:error
CRITICAL:root:critical
Simple logging to a file
import logging
import time
logging.basicConfig(level = logging.INFO, filename = time.strftime("my-%Y-%m-%d.log"))
logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")
Simple logging format
import logging
logging.basicConfig( format = '%(asctime)s %(levelname)-10s %(processName)s %(name)s %(message)s')
logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")
Simple logging change date format
import logging
logging.basicConfig( format = '%(asctime)s %(levelname)-10s %(processName)s %(name)s %(message)s', datefmt = "%Y-%m-%d-%H-%M-%S")
logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")
2020-04-22-18-59-16 WARNING MainProcess root warning
2020-04-22-18-59-16 ERROR MainProcess root error
2020-04-22-18-59-16 CRITICAL MainProcess root critical
getLogger
- getLogger
- FileHandler
- StreamHandler
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fh = logging.FileHandler('my.log')
fh.setLevel(logging.INFO)
fh.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(fh)
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-10s - %(message)s'))
logger.addHandler(sh)
log = logging.getLogger(__name__)
log.debug("debug")
log.info("info")
log.warning("warning")
log.error("error")
log.critical("critical")
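In the example above the logger and each handler have their own level. Here is a sketch that makes the effect visible by writing to two in-memory streams instead of a file and the screen. The in-memory streams and the names build_logger and handler_demo are our own choices for the illustration.

```python
import io
import logging


def build_logger(name, quiet_stream, verbose_stream):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)   # the logger itself lets everything through
    logger.propagate = False         # keep the demo output out of the root logger

    quiet = logging.StreamHandler(quiet_stream)
    quiet.setLevel(logging.INFO)     # plays the role of the FileHandler above
    quiet.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(quiet)

    verbose = logging.StreamHandler(verbose_stream)
    verbose.setLevel(logging.DEBUG)
    verbose.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    logger.addHandler(verbose)
    return logger


if __name__ == "__main__":
    info_and_up, everything = io.StringIO(), io.StringIO()
    log = build_logger("handler_demo", info_and_up, everything)
    log.debug("debug")
    log.info("info")
    print(info_and_up.getvalue().splitlines())
    print(everything.getvalue().splitlines())
```

The debug message reaches only the handler whose level is DEBUG, while the info message reaches both.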
Time-based logrotation
- TimedRotatingFileHandler
import logging
import logging.handlers
log_file = "my.log"
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
ch = logging.handlers.TimedRotatingFileHandler(log_file, when='M', backupCount=2)
ch.setLevel(logging.INFO)
ch.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(ch)
log = logging.getLogger(__name__)
log.debug("debug")
log.info("info")
log.warning("warning")
log.error("error")
log.critical("critical")
- S - seconds
- M - minutes
- H - hours
- D - days
- docs
Size-based logrotation
import logging
import logging.handlers
log_file = "my.log"
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
ch = logging.handlers.RotatingFileHandler(log_file, maxBytes=100, backupCount=2)
ch.setLevel(logging.INFO)
ch.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)-10s - %(message)s') )
logger.addHandler(ch)
log = logging.getLogger(__name__)
log.debug("debug")
log.info("info")
log.warning("warning")
log.error("error")
log.critical("critical")
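To see what size-based rotation leaves on disk, here is a minimal sketch. The logger name rotate_demo, the helper function and the temporary directory are assumptions made for the illustration; the handler settings are the ones used above.

```python
import logging
import logging.handlers
import os
import tempfile

def rotate_demo(directory, messages=20):
    """Log some lines through a small RotatingFileHandler and
    return the sorted names of the files left in `directory`."""
    log_file = os.path.join(directory, "my.log")
    logger = logging.getLogger("rotate_demo")   # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    logger.propagate = False                    # keep the demo off the root logger
    handler = logging.handlers.RotatingFileHandler(log_file, maxBytes=100, backupCount=2)
    handler.setFormatter(logging.Formatter('%(levelname)s - %(message)s'))
    logger.addHandler(handler)
    try:
        for ix in range(messages):
            logger.info("message number %s", ix)
    finally:
        logger.removeHandler(handler)
        handler.close()                         # release the file before cleanup
    return sorted(os.listdir(directory))

with tempfile.TemporaryDirectory() as tmp:
    files = rotate_demo(tmp)
    print(files)   # ['my.log', 'my.log.1', 'my.log.2']
```

With backupCount=2 only the current file and the two most recent backups are kept; older backups are deleted on each rollover.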
Closures
Counter local - not working
def counter():
    count = 0
    count += 1
    return count

print(counter())
print(counter())
print(counter())
1
1
1
Counter with global
- global
count = 0
def counter():
    global count
    count += 1
    return count

print(counter())
print(counter())
print(counter())
count = -42
print(counter())
1
2
3
-41
Create incrementors
To use in various map expressions, we need a few functions that, for simplicity, just increment a number by a fixed amount:
def f3(x):
    return x + 3

def f7(x):
    return x + 7

def f23(x):
    return x + 23

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
print(f23(19))
5
10
7
17
42
Create internal function
def create_func():
    def internal():
        print("Hello world")
    internal()

func = create_func()
internal()
Hello world
Traceback (most recent call last):
File "create_internal_func.py", line 8, in <module>
internal()
NameError: name 'internal' is not defined
Create function by a function
def create_func():
    def internal():
        print("Hello world")
    #internal()

    return internal

func = create_func()
#internal()
func()
Hello world
Create function with parameters
def create_func(name):
    def internal():
        print(f"Hello {name}")

    return internal

foo = create_func("Foo")
foo()
bar = create_func("Bar")
bar()
Hello Foo
Hello Bar
Counter closure
- nonlocal
def create_counter():
    count = 0
    def internal():
        nonlocal count
        count += 1
        return count
    return internal

counter = create_counter()
print(counter())
print(counter())
print(counter())
print()
other = create_counter()
print(counter())
print(other())
print(counter())
print(other())
print()
print(count)
1
2
3
4
1
5
2
Traceback (most recent call last):
File "counter.py", line 23, in <module>
print(count)
NameError: name 'count' is not defined
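The count variable is not visible from outside, but it has not disappeared: it is kept alive by the returned function. A small sketch that peeks at it through the function's standard __code__ and __closure__ attributes (for inspection only, not something regular code should rely on):

```python
def create_counter():
    count = 0
    def internal():
        nonlocal count
        count += 1
        return count
    return internal

counter = create_counter()
counter()
counter()

# The names of the captured variables are recorded on the code object ...
print(counter.__code__.co_freevars)            # ('count',)
# ... and their current values live in "cell" objects attached to the function.
print(counter.__closure__[0].cell_contents)    # 2
```

Each call to create_counter creates a fresh cell, which is why counter and other above count independently.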
Make incrementor with def (closure)
- closure
def make_incrementor(n):
    def inc(x):
        return x + n
    return inc

f3 = make_incrementor(3)
f7 = make_incrementor(7)

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
5
10
7
17
Make incrementor with lambda
def make_incrementor(n):
    return lambda x: x + n

f3 = make_incrementor(3)
f7 = make_incrementor(7)

print(f3(2))
print(f7(3))
print(f3(4))
print(f7(10))
5
10
7
17
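The incrementors were motivated by map expressions, so here is a short sketch of that use. The numbers list is made up for the illustration.

```python
def make_incrementor(n):
    return lambda x: x + n

numbers = [1, 2, 3]

# The generated function can be passed to map directly, without naming it first.
plus_3 = list(map(make_incrementor(3), numbers))
plus_7 = list(map(make_incrementor(7), numbers))

print(plus_3)   # [4, 5, 6]
print(plus_7)   # [8, 9, 10]
```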
Exercise: closure bank
- Create a closure that returns a function that holds a number (like a bank account) that can be incremented or decremented as follows:
- Allow for an extra parameter called prev that defaults to False. If True is passed then instead of returning the new balance, return the old balance.
bank = create_bank(20)
print(bank()) # 20
print(bank(7)) # 27
print(bank()) # 27
print(bank(-3)) # 24
print(bank()) # 24
print(bank(10, prev=True)) # 24
print(bank()) # 34
Exercise: counter with parameter
Change the counter example to accept a parameter and start counting from that number.
Solution: closure bank
def create_bank(n = 0):
    balance = n
    def bnk(change = 0, prev=False):
        nonlocal balance
        prev_balance = balance
        balance += change
        if prev:
            return prev_balance
        else:
            return balance
    return bnk

bank = create_bank(20)

print(bank()) # 20
print(bank(7)) # 27
print(bank()) # 27
print(bank(-3)) # 24
print(bank()) # 24
print(bank(10, prev=True)) # 24
print(bank()) # 34
20
27
27
24
24
24
34
Solution: counter with parameter
def create_counter(count=0):
    def internal():
        nonlocal count
        count += 1
        return count
    return internal

counter = create_counter()
print(counter())
print(counter())
print(counter())
print()
other = create_counter(42)
print(counter())
print(other())
print(counter())
print(other())
1
2
3
4
43
5
44
Decorators
Decorators: simple example
- A decorator is the @something line just before the declaration of a function.
- Decorators can modify the behavior of functions or can set some meta information about them.
@some_decorator
def some_function():
    pass
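A minimal sketch of the two roles mentioned above. Both decorators, mark_slow and shout, are hypothetical names invented for this illustration: the first only attaches meta information, the second changes the behavior.

```python
def mark_slow(func):
    # Sets meta information only: the function itself is returned unchanged.
    func.is_slow = True
    return func

def shout(func):
    # Changes behavior: the returned wrapper upper-cases the result.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return wrapper

@mark_slow
def compute():
    return "done"

@shout
def greet(name):
    return f"hello {name}"

print(compute.is_slow)   # True
print(compute())         # done
print(greet("world"))    # HELLO WORLD
```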
Decorators - Flask
- In Flask we use decorators to designate functions as "routes".
from flask import Flask
app = Flask(__name__)

@app.route("/")
def main():
    return "Hello World!"

@app.route("/login")
def login():
    return "Showing the login page ..."
FLASK_APP=flask_app flask run
Decorators - Pytest
- In Pytest we can use decorators to add special marks to test functions
- ... or to mark them as fixtures.
import sys
import pytest
@pytest.mark.skipif(sys.platform != 'linux', reason="Linux tests")
def test_linux():
    assert True

@pytest.mark.skip(reason="To show we can skip tests without any condition.")
def test_any():
    assert True

@pytest.fixture(autouse = True, scope="module")
def module_demo():
    print(f"Fixture")
pytest -v
Decorators caching - no cache
- Each call will execute the function and do the (expensive) computation.
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

print(compute(2, 3))
print(compute(3, 4))
print(compute(2, 3))
Called with 2 and 3
5
Called with 3 and 4
7
Called with 2 and 3
5
Decorators caching - with cache
- cache
- lru_cache
- By adding the lru_cache decorator we can tell Python to cache the result and save on computation time.
import functools

@functools.lru_cache()
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

print(compute(2, 3))
print(compute(3, 4))
print(compute(2, 3))
Called with 2 and 3
5
Called with 3 and 4
7
5
LRU - Least recently used cache
- LRU - Cache replacement policy
- When we call the function with (1, 5) it removes the least recently used results of (1, 2)
- So next time it has to be computed again.
import functools

@functools.lru_cache(maxsize=3)
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

compute(1, 2) # Called with 1 and 2
compute(1, 2)
compute(1, 2)
compute(1, 3) # Called with 1 and 3
compute(1, 3)
compute(1, 4) # Called with 1 and 4
compute(1, 4)
compute(1, 5) # Called with 1 and 5
compute(1, 2) # Called with 1 and 2
compute(1, 2)
LRU - Least recently used cache
- Here we called (1, 2) after (1, 4) when it was still in the cache
- When we called (1, 5) it removed the LRU pair, but it was NOT the (1, 2) pair
- So it was in the cache even after the (1, 5) call.
import functools

@functools.lru_cache(maxsize=3)
def compute(x, y):
    print(f"Called with {x} and {y}")
    # some long computation here
    return x+y

compute(1, 2) # Called with 1 and 2
compute(1, 2)
compute(1, 2)
compute(1, 3) # Called with 1 and 3
compute(1, 3)
compute(1, 4) # Called with 1 and 4
compute(1, 4)
compute(1, 2)
compute(1, 5) # Called with 1 and 5
compute(1, 2)
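The difference between the two call orders can also be measured instead of read from the printed lines. The sketch below replays both sequences and asks the cache for its statistics with cache_info(), which functions wrapped by lru_cache provide. The run helper and the two lists are assumptions made for the comparison.

```python
import functools

def run(calls):
    """Replay a sequence of calls on a fresh 3-slot LRU cache and return its statistics."""
    @functools.lru_cache(maxsize=3)
    def compute(x, y):
        return x + y
    for args in calls:
        compute(*args)
    return compute.cache_info()

# (1, 2) is the least recently used entry when (1, 5) arrives, so it gets evicted
first = [(1, 2), (1, 2), (1, 2), (1, 3), (1, 3), (1, 4), (1, 4), (1, 5), (1, 2), (1, 2)]
# (1, 2) is used again just before (1, 5), so (1, 3) is evicted instead
second = [(1, 2), (1, 2), (1, 2), (1, 3), (1, 3), (1, 4), (1, 4), (1, 2), (1, 5), (1, 2)]

info_first = run(first)
info_second = run(second)
print(info_first.misses, info_first.hits)     # 5 5
print(info_second.misses, info_second.hits)   # 4 6
```

A miss means the function body really ran, so the first order pays for one extra computation.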
OOP - classmethod - staticmethod
class Person(object):
    def __init__(self, name):
        print(f"init: '{self}' '{self.__class__.__name__}'")
        self.name = name

    def show_name(self):
        print(f"instance method: '{self}' '{self.__class__.__name__}'")

    @classmethod
    def from_occupation(cls, occupation):
        print(f"class method '{cls}' '{cls.__class__.__name__}'")

    @staticmethod
    def is_valid_occupation(param):
        print(f"static method '{param}' '{param.__class__.__name__}'")

fb = Person('Foo Bar')
fb.show_name()

fb.from_occupation('Tailor')
Person.from_occupation('Tailor') # This is how we should call it.

fb.is_valid_occupation('Tailor')
Person.is_valid_occupation('Tailor')
init: '<__main__.Person object at 0x7fb008f3a640>' 'Person'
instance method: '<__main__.Person object at 0x7fb008f3a640>' 'Person'
class method '<class '__main__.Person'>' 'type'
class method '<class '__main__.Person'>' 'type'
static method 'Tailor' 'str'
static method 'Tailor' 'str'
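The example above only prints what each kind of method receives. As a sketch of what they are typically used for, here is a variant in which the class method acts as an alternative constructor and the static method as a plain helper. The valid_occupations tuple, the 'Anonymous' default and the Student subclass are assumptions made for the illustration.

```python
class Person:
    valid_occupations = ('Tailor', 'Baker')   # hypothetical data for the sketch

    def __init__(self, name, occupation=None):
        self.name = name
        self.occupation = occupation

    @classmethod
    def from_occupation(cls, occupation):
        # cls is the class the method was called on, so subclasses get their own type
        if not cls.is_valid_occupation(occupation):
            raise ValueError(f"Unknown occupation: {occupation}")
        return cls(name='Anonymous', occupation=occupation)

    @staticmethod
    def is_valid_occupation(param):
        # no self and no cls: a plain function that lives in the class namespace
        return param in Person.valid_occupations

class Student(Person):
    pass

tailor = Person.from_occupation('Tailor')
student = Student.from_occupation('Baker')
print(type(tailor).__name__, tailor.occupation)     # Person Tailor
print(type(student).__name__, student.occupation)   # Student Baker
print(Person.is_valid_occupation('Pilot'))          # False
```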
Use cases for decorators in Python
- classmethod
- staticmethod
- pytest
- Common decorators are @classmethod and @staticmethod.
- Flask uses them to mark and configure the routes.
- Pytest uses them to add marks to the tests.
- Logging calls with parameters.
- Logging elapsed time of calls.
- Access control in Django or other web frameworks. (e.g. login required)
- Memoization (caching)
- Retry
- Function timeout
- Locking for thread safety
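To make one of these use cases concrete, here is a minimal sketch of a retry decorator. The names retry, times and flaky are invented for the illustration, and the sketch assumes times is at least 1. It uses techniques (decorators with parameters, *args/**kwargs, functools.wraps) that are explained step by step later in this chapter.

```python
import functools

def retry(times):
    """Call the decorated function up to `times` times until it succeeds."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for _ in range(times):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    last_error = err        # remember the failure and try again
            raise last_error                # every attempt failed
        return wrapper
    return decorator

attempts = []

@retry(times=3)
def flaky():
    # Fails on the first two calls, succeeds on the third.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("temporary failure")
    return "ok"

result = flaky()
print(result, len(attempts))   # ok 3
```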
Function assignment
Before we learn about decorators let's remember that we can assign function names to other names and then use the new name:
def hello(name):
    print(f"Hello {name}")

hello("Python")
print(hello)

greet = hello
greet("Python")
print(greet)
Hello Python
<function hello at 0x7f8aee3401f0>
Hello Python
<function hello at 0x7f8aee3401f0>
Function assignment - alias print to say
say = print
say("Hello World")
Function assignment - don't do this
numbers = [2, 4, 3, 1, 1, 1]
print(sum(numbers)) # 12
print(max(numbers)) # 4
sum = max
print(sum(numbers)) # 4
print(max(numbers)) # 4
sum = lambda values: len(values)
print(sum(numbers)) # 6
Passing functions as parameters
def call(func):
    return func(42)

def double(val):
    print(2*val)

call(double) # 84
call(lambda x: print(x // 2)) # 21
Traversing directory tree
import sys
import os
def walker(path, todo):
    if os.path.isdir(path):
        items = os.listdir(path)
        for item in items:
            walker(os.path.join(path, item), todo)
    else:
        todo(path)

def print_size(name):
    print(f"{os.stat(name).st_size:6} {name} ")

if __name__ == '__main__':
    if len(sys.argv) < 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    walker(sys.argv[1], print)
    #walker(sys.argv[1], print_size)
    #walker(sys.argv[1], lambda name: print(f"{os.stat(name).st_size:6} {name[::-1]} "))
Declaring Functions inside other function
Let's also remember that we can define a function inside another function. The internally defined function exists only in the scope of the function where it was defined, not outside.
def f():
    def g():
        print("in g")
    print("start f")
    g()
    print("end f")

f()
g()
start f
in g
end f
Traceback (most recent call last):
File "examples/decorators/function_in_function.py", line 9, in <module>
g()
NameError: name 'g' is not defined
Returning a new function from a function
def create_function():
    print("creating a function")
    def internal():
        print("This is the generated function")
    print("creation done")
    return internal

func = create_function()
func()
creating a function
creation done
This is the generated function
Returning a closure
def create_incrementer(num):
    def inc(val):
        return num + val
    return inc

inc_5 = create_incrementer(5)
print(inc_5(10)) # 15
print(inc_5(0)) # 5

inc_7 = create_incrementer(7)
print(inc_7(10)) # 17
print(inc_7(0)) # 7
Decorator
- @
- A function that changes the behaviour of other functions.
- The input of a decorator is a function.
- The returned value of a decorator is a modified version of the same function.
from some_module import some_decorator

@some_decorator
def f(...):
    ...

def f(...):
    ...
f = some_decorator(f)
Decorator Demo
- Just a simple example created step-by-step
import time

def replace(func):
    def new_func():
        print("start new")
        start = time.time()
        func()
        end = time.time()
        print(f"end new {end-start}")
    return new_func

@replace
def f():
    time.sleep(1)
    print("in f")

f()
Decorator to register function
- Pytest, Flask probably do this
functions = []

def register(func):
    global functions
    functions.append(func.__name__)
    return func

@register
def f():
    print("in f")

print(functions)
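Storing only the name is enough for a listing. A framework that wants to call the registered functions later would more likely store the function objects themselves. The sketch below is a guess at that pattern, not the actual Pytest or Flask implementation; the commands dictionary and the two functions are made up.

```python
commands = {}

def register(func):
    # Store the function object itself, keyed by its name, and return it unchanged.
    commands[func.__name__] = func
    return func

@register
def hello():
    return "Hello"

@register
def bye():
    return "Bye"

print(sorted(commands))      # ['bye', 'hello']
print(commands["hello"]())   # Hello
```

Because register returns the original function, hello() and bye() can still be called directly as well.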
A recursive Fibonacci
def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5)) # 5
trace fibo
import decor

@decor.tron
def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5))
Calling fibo(5)
Calling fibo(4)
Calling fibo(3)
Calling fibo(2)
Calling fibo(1)
Calling fibo(2)
Calling fibo(3)
Calling fibo(2)
Calling fibo(1)
5
tron decorator
def tron(func):
    def new_func(v):
        print(f"Calling {func.__name__}({v})")
        return func(v)
    return new_func
Decorate with direct call
import decor

def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

fibo = decor.tron(fibo)

print(fibo(5))
Decorate with parameter
import decor_param

@decor_param.tron('foo')
def fibo(n):
    if n in (1,2):
        return 1
    return fibo(n-1) + fibo(n-2)

print(fibo(5))
foo Calling fibo(5)
foo Calling fibo(4)
foo Calling fibo(3)
foo Calling fibo(2)
foo Calling fibo(1)
foo Calling fibo(2)
foo Calling fibo(3)
foo Calling fibo(2)
foo Calling fibo(1)
5
Decorator accepting parameter
def tron(prefix):
    def real_tron(func):
        def new_func(v):
            print("{} Calling {}({})".format(prefix, func.__name__, v))
            return func(v)
        return new_func
    return real_tron
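The three nested levels are easier to follow when the @ line is spelled out as two separate steps. A sketch using the same tron, applied by hand to a made-up double function:

```python
def tron(prefix):
    def real_tron(func):
        def new_func(v):
            print(f"{prefix} Calling {func.__name__}({v})")
            return func(v)
        return new_func
    return real_tron

def double(n):
    return 2 * n

# Step 1: calling tron('foo') returns the real decorator.
decorator = tron('foo')
# Step 2: the real decorator wraps the function.
double = decorator(double)

print(double(21))
```

This is what @tron('foo') does in one line: the expression after the @ is evaluated first, and its result is then called with the function.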
Decorate function with any signature
- How can we decorate a function that is flexible on the number of arguments?
- Accept *args and **kwargs and pass them on.
from decor_any import tron

@tron
def one(param):
    print(f"one({param})")

@tron
def two(first, second = 42):
    print(f"two({first}, {second})")

one("hello")
one(param = "world")

two("hi")
two(first = "Foo", second = "Bar")
Decorate function with any signature - implementation
def tron(func):
    def new_func(*args, **kw):
        params = list(map(lambda p: str(p), args))
        for (k, v) in kw.items():
            params.append(f"{k}={v}")
        print("Calling {}({})".format(func.__name__, ', '.join(params)))
        return func(*args, **kw)
    return new_func
Calling one(hello)
one(hello)
Calling one(param=world)
one(world)
Calling two(hi)
two(hi, 42)
Calling two(first=Foo, second=Bar)
two(Foo, Bar)
Decorate function with any signature - skeleton
def decorator(func):
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper

@decorator
def zero():
    print("zero")

@decorator
def one(x):
    print(f"one({x})")

@decorator
def two(x, y):
    print(f"two({x, y})")

zero()
one('hello')
two( y = 7, x = 8 )

print(zero)
print(one)
print(two)

print(zero.__name__)
print(one.__name__)
print(two.__name__)
zero
one(hello)
two((8, 7))
<function decorator.<locals>.wrapper at 0x7f1165258a60>
<function decorator.<locals>.wrapper at 0x7f1165258b80>
<function decorator.<locals>.wrapper at 0x7f1165258ca0>
wrapper
wrapper
wrapper
Decorate function with any signature - skeleton with name
import functools

def decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper

@decorator
def zero():
    print("zero")

@decorator
def one(x):
    print(f"one({x})")

@decorator
def two(x, y):
    print(f"two({x, y})")

zero()
one('hello')
two( y = 7, x = 8 )

print(zero)
print(one)
print(two)

print(zero.__name__)
print(one.__name__)
print(two.__name__)
zero
one(hello)
two((8, 7))
<function zero at 0x7f9079bdca60>
<function one at 0x7f9079bdcb80>
<function two at 0x7f9079bdcca0>
zero
one
two
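The name is not the only thing functools.wraps carries over. The sketch below compares a plain wrapper with a wrapped one and also looks at the docstring and at the __wrapped__ attribute that wraps adds. The decorator names plain and wrapped are made up for the comparison.

```python
import functools

def plain(func):
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper

def wrapped(func):
    @functools.wraps(func)
    def wrapper(*args, **kw):
        return func(*args, **kw)
    return wrapper

@plain
def one():
    """Docstring of one"""

@wrapped
def two():
    """Docstring of two"""

print(one.__name__, one.__doc__)   # wrapper None
print(two.__name__, two.__doc__)   # two Docstring of two
# wraps also keeps a reference to the original, undecorated function
print(two.__wrapped__.__name__)    # two
```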
Functool - partial
- partial
from functools import partial
val = '101010'
print(int(val, base=2))
basetwo = partial(int, base=2)
basetwo.__doc__ = 'Convert base 2 string to an int.'
print(basetwo(val))
# Based on example from https://docs.python.org/3/library/functools.html
Exercise: Logger decorator
- In the previous pages we created a decorator that can decorate arbitrary functions, logging each call and its parameters.
- Add time measurement to each call to see how long each function took.
Exercise: memoize decorator
Write a function that gets a function as a parameter and returns a new function that memoizes (caches) the input/output pairs. Then write a unit test that checks it. You will probably need to create a subroutine to be memoized.
- Write tests for the fibonacci functions.
- Implement the memoize decorator for a function with a single parameter.
- Apply the decorator.
- Run the tests again.
- Check the speed differences.
- or decorate with tron to see the calls...
Solution: Logger decorator
import time

def tron(func):
    def new_func(*args, **kwargs):
        start = time.time()
        print("Calling {}({}, {})".format(func.__name__, args, kwargs))
        out = func(*args, **kwargs)
        end = time.time()
        print("Finished {}({})".format(func.__name__, out))
        print("Elapsed time: {}".format(end - start))
        return out
    return new_func
Solution: Logger decorator (testing)
from logger_decor import tron

@tron
def f(a, b=1, *args, **kwargs):
    print('a: ', a)
    print('b: ', b)
    print('args: ', args)
    print('kwargs:', kwargs)
    return a + b

f(2, 3, 4, 5, c=6, d=7)
print()
f(2, c=5, d=6)
print()
f(10)
Calling f((2, 3, 4, 5), {'c': 6, 'd': 7})
a: 2
b: 3
args: (4, 5)
kwargs: {'c': 6, 'd': 7}
Finished f(5)
Elapsed time: 1.3589859008789062e-05
Calling f((2,), {'c': 5, 'd': 6})
a: 2
b: 1
args: ()
kwargs: {'c': 5, 'd': 6}
Finished f(3)
Elapsed time: 5.245208740234375e-06
Calling f((10,), {})
a: 10
b: 1
args: ()
kwargs: {}
Finished f(11)
Elapsed time: 4.291534423828125e-06
Solution: memoize decorator
import sys
import memoize_attribute
import memoize_nonlocal
import decor_any

#@memoize_attribute.memoize
#@memoize_nonlocal.memoize
#@decor_any.tron
def fibonacci(n):
    if n == 1:
        return 1
    if n == 2:
        return 1
    return fibonacci(n-1) + fibonacci(n-2)

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: {} N\n".format(sys.argv[0]))
        exit(1)
    print(fibonacci(int(sys.argv[1])))

def memoize(f):
    data = {}
    def caching(n):
        nonlocal data
        key = n
        if key not in data:
            data[key] = f(n)
        return data[key]

    return caching

def memoize(f):
    def caching(n):
        key = n
        #if 'data' not in caching.__dict__:
        #    caching.data = {}
        if key not in caching.data:
            caching.data[key] = f(n)
        return caching.data[key]
    caching.data = {}

    return caching
Before
$ time python fibonacci.py 35
9227465
real 0m3.850s
user 0m3.832s
sys 0m0.015s
After
$ time python fibonacci.py 35
9227465
real 0m0.034s
user 0m0.019s
sys 0m0.014s
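The exercise also asked for a unit test. Timing is awkward to assert on, so one possible approach, sketched here, is to record every real computation and check that no input is computed twice. The calls list and the two test functions are assumptions made for the sketch; the memoize is a compact form of the nonlocal version above (the nonlocal statement is left out because data is only mutated, never rebound).

```python
def memoize(f):
    data = {}
    def caching(n):
        if n not in data:
            data[n] = f(n)
        return data[n]
    return caching

calls = []   # every real (non-cached) computation is recorded here

@memoize
def fibonacci(n):
    calls.append(n)
    if n in (1, 2):
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)

def test_fibonacci_values():
    assert [fibonacci(n) for n in range(1, 11)] == [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

def test_each_input_computed_once():
    calls.clear()
    fibonacci(30)
    fibonacci(30)
    assert len(calls) == len(set(calls))

test_fibonacci_values()
test_each_input_computed_once()
print(fibonacci(35))   # 9227465
```

The test functions follow the pytest naming convention, so the same file could be collected by pytest; here they are simply called directly.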
A list of functions
def hello(name):
    print(f"Hello {name}")

def morning(name):
    print(f"Good morning {name}")

hello("Jane")
morning("Jane")
print()

funcs = [hello, morning]
funcs[0]("Peter")
print()

for func in funcs:
    func("Mary")
Hello Jane
Good morning Jane
Hello Peter
Hello Mary
Good morning Mary
Insert element in sorted list using insort
- insort
import bisect
solar_system = ['Earth', 'Jupiter', 'Mercury', 'Saturn', 'Venus']
name = 'Mars'
# Find the location where to insert the element to keep the list sorted and insert the element
bisect.insort(solar_system, name)
print(solar_system)
print(sorted(solar_system))
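insort does two things in one call: it finds the location and it inserts. If only the location is needed, the bisect module also has bisect_left, shown in this small sketch with the same list:

```python
import bisect

solar_system = ['Earth', 'Jupiter', 'Mercury', 'Saturn', 'Venus']
name = 'Mars'

# Index at which the name has to be inserted to keep the list sorted
position = bisect.bisect_left(solar_system, name)
print(position)   # 2

solar_system.insert(position, name)
print(solar_system)
```

Both functions assume that the list is already sorted; on an unsorted list the returned position is meaningless.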
import sys
import os
def traverse(path):
if os.path.isfile(path):
print(path)
return
if os.path.isdir(path):
for item in os.listdir(path):
traverse(os.path.join(path, item))
return
# other unhandled things
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} DIR|FILE")
traverse(sys.argv[1])
import sys
import os
def traverse(path, func):
response = {}
if os.path.isfile(path):
func(path)
return response
if os.path.isdir(path):
for item in os.listdir(path):
traverse(os.path.join(path, item), func)
return response
# other unhandled things
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} DIR|FILE")
#traverse(sys.argv[1], print)
#traverse(sys.argv[1], lambda path: print(f"{os.path.getsize(path):>6} {path}"))
import sys
import os
def traverse(path, func):
if os.path.isfile(path):
func(path)
return
if os.path.isdir(path):
for item in os.listdir(path):
traverse(os.path.join(path, item), func)
return
# other unhandled things
if len(sys.argv) < 2:
exit(f"Usage: {sys.argv[0]} DIR|FILE")
#traverse(sys.argv[1], print)
#traverse(sys.argv[1], lambda path: print(f"{os.path.getsize(path):>6} {path}"))
#from inspect import getmembers, isfunction
import inspect
def change(sub):
def new(*args, **kw):
print("before")
res = sub(*args, **kw)
print("after")
return res
return new
def add(x, y):
return x+y
#print(add(2, 3))
fixed = change(add)
#print(fixed(3, 4))
def replace(subname):
def new(*args, **kw):
print("before")
res = locals()[subname](*args, **kw)
print("after")
return res
locals()[subname] = new
replace('add')
add(1, 7)
def say():
print("hello")
#print(dir())
#getattr('say')
Context managers (with statement)
Why use context managers?
In certain operations you might want to ensure that, when the operation is done, there is an opportunity to clean up after it, even if we decide to end the operation early or if there is an exception in the middle of the operation.
In the following pseudo-code examples you can see that cleanup must be called both at the end and before the early-end, but that still leaves the bad-code that raises an exception skipping the cleanup. That forces us to wrap the whole section in a try-block.
def sample():
    start
    do
    do
    do
    do
    cleanup
What if we have some conditions for early termination?
def sample():
    start
    do
    do
    if we are done early:
        cleanup
        return # early-end
    do
    do
    cleanup
What if we might have an exception in the code?
def sample():
    start
    try:
        do
        do
        if we are done early:
            cleanup
            return early-end
        do
        bad-code (raises exception)
        do
        cleanup
    finally:
        cleanup
It is a lot of unnecessary code duplication and we can easily forget to add it in every location where we early-end our code.
Using Context Manager
with cm_for_sample():
    start
    do
    do
    if we are done early:
        return early-end
    do
    bad-code (raises exception)
    do
The cleanup happens automatically; it is defined inside cm_for_sample.
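The pseudo-code can be turned into a runnable sketch. The events list, the mode parameter and the body of cm_for_sample are assumptions made for the illustration; the contextmanager decorator used here is explained later in this chapter.

```python
from contextlib import contextmanager

events = []

@contextmanager
def cm_for_sample():
    events.append("start")
    try:
        yield
    finally:
        # Runs on normal exit, on early return, and on exception alike.
        events.append("cleanup")

def sample(mode):
    with cm_for_sample():
        events.append("do")
        if mode == "early":
            return "early-end"
        if mode == "bad":
            raise ValueError("bad-code")
        events.append("do more")
    return "normal-end"

print(sample("normal"), events)
events.clear()
print(sample("early"), events)
events.clear()
try:
    sample("bad")
except ValueError as err:
    print(f"exception: {err}", events)
```

In all three runs the last recorded event is cleanup, although sample itself never mentions it.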
Context Manager examples
A few examples where context managers can be useful:
- Opening a file - close it once we are done with it so we don't leak file descriptors.
- Changing directory - change back when we are done.
- Create temporary directory - remove when we are done.
- Open connection to database - close connection.
- Open SSH connection - close connection.
- More information about context managers
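As a sketch of the database case, the standard library's contextlib.closing turns any object that has a close method into a context manager. The in-memory SQLite database and the people table are made up for the illustration.

```python
import sqlite3
from contextlib import closing

# closing() calls conn.close() when the block is left, whatever the reason
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute("CREATE TABLE people (name TEXT)")
    conn.execute("INSERT INTO people VALUES ('Foo')")
    rows = conn.execute("SELECT name FROM people").fetchall()

print(rows)   # [('Foo',)]
```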
cd in a function
- getcwd
- chdir
In this example we have a function in which we change to a directory and then, when we are done, change back to the original directory.
For this to work we first save the current working directory using the os.getcwd call. Unfortunately, in the middle of the code there is a conditional call to return. If that condition is True we won't change back to the original directory. We could fix this by calling os.chdir(start_dir) just before calling return. However, this would still not solve the problem if there is an exception in the function.
import sys
import os
def do_something(path):
    start_dir = os.getcwd()
    os.chdir(path)

    content = os.listdir()
    number = len(content)
    print(number)
    if number < 15:
        return

    os.chdir(start_dir)

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    path = sys.argv[1]
    print(os.getcwd())
    do_something(path)
    print(os.getcwd())

main()
$ python no_context_cd.py /tmp/
/home/gabor/work/slides/python-programming/examples/advanced
19
/home/gabor/work/slides/python-programming/examples/advanced
$ python no_context_cd.py /opt/
/home/gabor/work/slides/python-programming/examples/advanced
9
/opt
- In the second example return was called and thus we stayed in the /opt directory.
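Without a context manager, the usual way to cover both the early return and a possible exception is a try/finally block. A minimal sketch, with a made-up count_entries function and a temporary directory standing in for the path given on the command line:

```python
import os
import tempfile

def count_entries(path):
    start_dir = os.getcwd()
    os.chdir(path)
    try:
        number = len(os.listdir())
        if number < 15:
            return number      # early return: the finally block still runs
        # ... more work would go here ...
        return number
    finally:
        os.chdir(start_dir)    # also runs if an exception is raised above

before = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    number = count_entries(tmp)
after = os.getcwd()

print(number, before == after)   # 0 True
```

This works, but the save/restore logic has to be repeated in every function that changes directory, which is the duplication a context manager removes.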
open in function
This is not the recommended way to open a file, but this is how it was done before the introduction of the with context manager.
Here we have the same issue. We have a conditional call to return where we forgot to close the file.
import sys
import re
def do_something(filename):
    fh = open(filename)

    while True:
        line = fh.readline()
        if not line:  # readline returns an empty string at the end of the file
            break
        line = line.rstrip("\n")

        if re.search(r'\A\s*\Z', line):
            return
        print(line)

    fh.close()

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} FILENAME")
    filename = sys.argv[1]
    do_something(filename)

main()
open in for loop
- stat
- os.stat
Calling write does not immediately write to disk. The output is buffered (in the file object and by the operating system) as an optimization to avoid frequent access to the disk. In this case it means the file has not been saved by the time we check its size.
import os
for ix in range(10):
    filename = f'data{ix}.txt'
    fh = open(filename, 'w')
    fh.write('hello')
    if ix == 0:
        break
    fh.close()
stat = os.stat(filename)
print(stat.st_size) # 0, the file has not been saved yet
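For comparison, a sketch of the same loop using the with statement. Leaving the block through break closes the file, which also flushes the buffer, so the size is already correct when we check it. The temporary directory is an addition to keep the demo from leaving files behind.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    filename = os.path.join(tmp, 'data0.txt')
    for ix in range(10):
        with open(filename, 'w') as fh:
            fh.write('hello')
            if ix == 0:
                break          # leaving the with block closes (and flushes) the file
    size = os.stat(filename).st_size
    print(size)   # 5
```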
open in function using with
If we open the file in the recommended way using the with statement then we can be sure that the close method of the fh object will be called when we leave the context of the with statement.
import sys
import re
def do_something(filename):
    with open(filename) as fh:

        while True:
            line = fh.readline()
            if not line:  # readline returns an empty string at the end of the file
                break
            line = line.rstrip("\n")

            if re.search(r'\A\s*\Z', line):
                return
            print(line)

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} FILENAME")
    filename = sys.argv[1]
    do_something(filename)

main()
Plain context manager
from contextlib import contextmanager
import sys
param = ''
if len(sys.argv) == 2:
    #exit(f"Usage: {sys.argv[0]} []")
    param = sys.argv[1]

def code_with_context_manager():
    with my_plain_context():
        print(" In plain context")
        if param == "return":
            return
        if param == "die":
            raise Exception("we have a problem")
        print(" More work")

@contextmanager
def my_plain_context():
    print("setup context")
    try:
        yield
    except Exception as err:
        print(f" We got an exception: {err}")
    print("cleanup context")

print("START")
code_with_context_manager()
print("END")
START
setup context
 In plain context
 More work
cleanup context
END
Param context manager
from contextlib import contextmanager
@contextmanager
def my_param_context(name):
    print(f"start {name}")
    yield
    print(f"end {name}")

with my_param_context("foo"):
    print("In param context")
start foo
In param context
end foo
Context manager that returns a value
from contextlib import contextmanager
import time
import random
import os
import shutil
@contextmanager
def my_tempdir():
    print("start return")
    tmpdir = '/tmp/' + str(time.time()) + str(random.random())
    os.mkdir(tmpdir)
    try:
        yield tmpdir
    finally:
        shutil.rmtree(tmpdir)
        print("end return")

import os
from my_tempdir import my_tempdir

with my_tempdir() as tmp_dir:
    print(f"In return context with {tmp_dir}")
    with open(tmp_dir + '/data.txt', 'w') as fh:
        fh.write("hello")
    print(os.listdir(tmp_dir))

print('')
print(tmp_dir)
print(os.path.exists(tmp_dir))
start return
In return context with /tmp/1578211890.49409370.6063140788762365
['data.txt']
end return
/tmp/1578211890.49409370.6063140788762365
False
Use my tempdir - return
import os
from my_tempdir import my_tempdir
def some_code():
    with my_tempdir() as tmp_dir:
        print(f"In return context with {tmp_dir}")
        with open(tmp_dir + '/data.txt', 'w') as fh:
            fh.write("hello")
        print(os.listdir(tmp_dir))
        return

    print('')
    print(tmp_dir)
    print(os.path.exists(tmp_dir))

some_code()
start return
In return context with /tmp/1578211902.3545020.7667694368935928
['data.txt']
end return
Use my tempdir - exception
import os
from my_tempdir import my_tempdir
with my_tempdir() as tmp_dir:
    print(f"In return context with {tmp_dir}")
    with open(tmp_dir + '/data.txt', 'w') as fh:
        fh.write("hello")
    print(os.listdir(tmp_dir))
    raise Exception('trouble')

print('')
print(tmp_dir)
print(os.path.exists(tmp_dir))
start return
In return context with /tmp/1578211921.12552210.9000097350821897
['data.txt']
end return
Traceback (most recent call last):
File "use_my_tempdir_exception.py", line 9, in <module>
raise Exception('trouble')
Exception: trouble
cwd context manager
import os
from contextlib import contextmanager
@contextmanager
def cwd(path):
    oldpwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpwd)

import sys
import os
from mycwd import cwd

def do_something(path):
    with cwd(path):
        content = os.listdir()
        if len(content) < 10:
            return

def main():
    if len(sys.argv) != 2:
        exit(f"Usage: {sys.argv[0]} PATH")
    path = sys.argv[1]
    print(os.getcwd())
    do_something(path)
    print(os.getcwd())

main()
$ python context_cd.py /tmp
/home/gabor/work/slides/python/examples/context
/home/gabor/work/slides/python/examples/context
$ python context_cd.py /opt
/home/gabor/work/slides/python/examples/context
/home/gabor/work/slides/python/examples/context
tempdir context manager
- contextlib
- contextmanager
- tempfile
- mkdtemp
import os
from contextlib import contextmanager
import tempfile
import shutil
@contextmanager
def tmpdir():
    dd = tempfile.mkdtemp()
    try:
        yield dd
    finally:
        shutil.rmtree(dd)

from mytmpdir import tmpdir
import os

with tmpdir() as temp_dir:
    print(temp_dir)
    with open( os.path.join(temp_dir, 'some.txt'), 'w') as fh:
        fh.write("hello")
    print(os.path.exists(temp_dir))
    print(os.listdir(temp_dir))

print(os.path.exists(temp_dir))
/tmp/tmprpuywa3_
True
['some.txt']
False
Context manager with class
- enter
- exit
class MyCM:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print(f'__enter__ {self.name}')
        return self

    def __exit__(self, exception_type, exception, traceback):
        print(f'__exit__ {self.name}')

    def something(self):
        print(f'something {self.name}')

def main():
    with MyCM('Foo') as cm:
        print(cm.name)
        cm.something()
        #raise Exception('nono')
    print('in main - after')

main()
print('after main')
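The three extra parameters of __exit__ describe the exception that ended the block, if there was one, and the return value of __exit__ decides what happens to it: a true value swallows the exception, anything else lets it propagate. A minimal sketch with a hypothetical Suppress class (the standard library has a similar ready-made tool in contextlib.suppress):

```python
class Suppress:
    def __init__(self, *exception_types):
        self.exception_types = exception_types
        self.caught = None

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception, traceback):
        # All three arguments are None when the block ended without an exception.
        if exception_type is not None and issubclass(exception_type, self.exception_types):
            self.caught = exception
            return True     # true value: the exception is swallowed
        return False        # false value: any exception propagates

with Suppress(ZeroDivisionError) as cm:
    1 / 0
    print("never reached")

print(type(cm.caught).__name__)   # ZeroDivisionError
```

The MyCM class above returns None from __exit__, so an exception raised inside its with block would still propagate after __exit__ has run.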
Context managers with class
- enter
- exit
Even if there was an exception in the middle of the process, the exit methods of each object will be called.
class MyCM:
    def __init__(self, n):
        self.name = n

    def __enter__(self):
        print('__enter__', self.name)

    def __exit__(self, exception_type, exception, traceback):
        print('__exit__ ', self.name)

    def something(self):
        print('something', self.name)

def main():
    a = MyCM('a')
    b = MyCM('b')
    with a, b:
        a.partner = b
        b.partner = a

        a.something()
        raise Exception('nono')
        b.something()
    print('in main - after')

main()
print('after main')
__enter__ a
__enter__ b
something a
__exit__ b
__exit__ a
Traceback (most recent call last):
File "context-managers.py", line 27, in <module>
main()
File "context-managers.py", line 23, in main
raise Exception('nono')
Exception: nono
Context manager: with for file
- with
import sys
if len(sys.argv) != 2:
    sys.stderr.write('Usage: {} FILENAME\n'.format(sys.argv[0]))
    exit()

file = sys.argv[1]
print(file)
with open(file) as f:
    for line in f:
        val = 30/int(line)

print('done')
With - context managers
- with
class WithClass:
    def __init__(self, name='default'):
        self.name = name

    def __enter__(self):
        print('entering the system')
        return self.name

    def __exit__(self, exc_type, exc_value, traceback):
        print('exiting the system')

    def __str__(self):
        return 'WithObject:'+self.name

x = WithClass()
with x as y:
    print(x,y)
Exercise: Context manager
Create a few CSV files like these:
a11,a12
a21,a22
b13,b14
b23,b24
c15,c16
c25,c26
Merge them horizontally to get this:
a11,a12,b13,b14,c15,c16
a21,a22,b23,b24,c25,c26
- Do it without your own context manager
- Create a context manager called myopen that accepts N filenames. It opens the first one to write and the other N-1 to read
with myopen(outfile, infile1, infile2, infile3) as (out, ins):
    ...
Exercise: Tempdir on Windows
Make the tempdir context manager example work on Windows as well. You will probably need to cd out of the directory before removing it.
Solution: Context manager
import sys
from contextlib import contextmanager
if len(sys.argv) < 3:
    exit(f"Usage: {sys.argv[0]} OUTFILE INFILEs")

outfile = sys.argv[1]
infiles = sys.argv[2:]
#print(outfile)
#print(infiles)

@contextmanager
def myopen(outfile, *infiles):
    #print(len(infiles))
    out = open(outfile, 'w')
    ins = []
    for filename in infiles:
        ins.append(open(filename, 'r'))
    try:
        yield out, ins
    except Exception as ex:
        print(ex)
        pass
    finally:
        out.close()
        for fh in ins:
            fh.close()

with myopen(outfile, *infiles) as (out_fh, input_fhs):
    #print(out_fh.__class__.__name__)
    #print(len(input_fhs))
    while True:
        row = ''
        done = False
        for infh in (input_fhs):
            line = infh.readline()
            #print(f"'{line}'")
            if not line:
                done = True
                break
            if row:
                row += ','
            row += line.rstrip("\n")
        if done:
            break
        out_fh.write(row)
        out_fh.write("\n")
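One weakness of the solution above is that if opening one of the input files fails, the files opened before it are never closed, because the failure happens before the try block. A possible alternative, sketched here and not part of the original solution, uses contextlib.ExitStack, which closes everything registered with it however the block is left. The sample files and the zip-based merge are assumptions made to keep the sketch self-contained.

```python
import os
import tempfile
from contextlib import ExitStack, contextmanager

@contextmanager
def myopen(outfile, *infiles):
    with ExitStack() as stack:
        # Every file entered here is closed when the stack is left,
        # even if a later open() raises.
        ins = [stack.enter_context(open(name)) for name in infiles]
        out = stack.enter_context(open(outfile, 'w'))
        yield out, ins

with tempfile.TemporaryDirectory() as tmp:
    names = []
    for prefix in 'abc':
        name = os.path.join(tmp, f'{prefix}.csv')
        with open(name, 'w') as fh:
            fh.write(f'{prefix}1,{prefix}2\n{prefix}3,{prefix}4\n')
        names.append(name)

    merged = os.path.join(tmp, 'merged.csv')
    with myopen(merged, *names) as (out, ins):
        # zip stops at the shortest file, like the done flag in the solution above
        for lines in zip(*ins):
            out.write(','.join(line.rstrip('\n') for line in lines) + '\n')

    with open(merged) as fh:
        result = fh.read()

print(result)
```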
Advanced lists
Change list while looping: endless list
numbers = [1, 1]
for n in numbers:
    print(n)
    numbers.append(numbers[-1] + numbers[-2])

    if n > 100:
        break

print(numbers)
Creating a Fibonacci series in a crazy way.
Change list while looping
Probably not a good idea...
numbers = [1, 2, 3, 4]
for n in numbers:
    print(n)
    if n == 2:
        numbers.remove(2)

print(numbers)
1
2
4
[1, 3, 4]
Note, the loop only iterated 3 times, and it skipped value 3
Copy list before iteration
It is better to copy the list using list slices before the iteration starts.
numbers = [1, 2, 3, 4]
for n in numbers[:]:
    print(n)
    if n == 2:
        numbers.remove(2)

print(numbers)
1
2
3
4
[1, 3, 4]
for with flag
names = ['Foo', 'Bar', 'Baz']

ok = False
for i in range(3):
    name = input('Your name please: ')
    if name in names:
        ok = True
        break

if not ok:
    print("Not OK")
    exit()

print("OK....")
for else
The else clause of the for loop is executed when the iteration ends normally (without calling break).
names = ['Foo', 'Bar', 'Baz']

for i in range(3):
    name = input('Your name please: ')
    if name in names:
        break
else:
    print("Not OK")
    exit()

print("OK....")
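The example above needs keyboard input. Here is a non-interactive sketch of the same construct, with a made-up find_first_known function, that also shows the empty-loop case:

```python
def find_first_known(candidates, names):
    for candidate in candidates:
        if candidate in names:
            break
    else:
        # Reached only when the loop ran to the end without break,
        # which includes the case of an empty list.
        return None
    return candidate

names = ['Foo', 'Bar', 'Baz']
print(find_first_known(['Qux', 'Bar', 'Foo'], names))   # Bar
print(find_first_known(['Qux', 'Quux'], names))         # None
print(find_first_known([], names))                      # None
```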
enumerate
- enumerate
names = ['Foo', 'Bar', 'Baz']
for i in range(len(names)):
    print(i, names[i])

print('')

for i, n in enumerate(names):
    print(i, n)
0 Foo
1 Bar
2 Baz
0 Foo
1 Bar
2 Baz
do while
- do while
There is no do-while in Python, but you can emulate it:
while True:
    do_stuff()
    if not loop_condition():
        break

x = 0

while True:
    x += 1
    print(x)
    if x > 0:
        break
list slice is copy
x = [1, 1, 2, 3, 5, 8, 13, 21, 34]
y = x[2:5]
print(y) # [2, 3, 5]
x[2] = 20
print(x) # [1, 1, 20, 3, 5, 8, 13, 21, 34]
print(y) # [2, 3, 5]
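One caveat worth a sketch: the slice is a shallow copy. The new list is independent, but its elements are the same objects as in the original, which matters when the elements are themselves mutable. The nested matrix list is made up for the illustration.

```python
matrix = [[1, 2], [3, 4], [5, 6]]
part = matrix[0:2]

matrix[0] = [10, 20]          # rebinding a slot of the original: the slice is unaffected
print(part)                   # [[1, 2], [3, 4]]

matrix[1].append(99)          # mutating a shared inner list: visible through both
print(part)                   # [[1, 2], [3, 4, 99]]
print(part[1] is matrix[1])   # True
```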
Warnings
Warnings
- warn
from warnings import warn
def foo():
    warn("foo will be deprecated soon. Use bar() instead", DeprecationWarning)
    print("foo still works")

def main():
    foo()
    print("afterfoo")

main()
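Whether a warning is actually displayed depends on the active warning filters, and DeprecationWarning is one of the categories that is often filtered out. To check in code that a warning was issued, the warnings module can record them instead of printing them. A minimal sketch, in which foo returns its message instead of printing it:

```python
import warnings

def foo():
    warnings.warn("foo will be deprecated soon. Use bar() instead", DeprecationWarning)
    return "foo still works"

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")     # make sure the warning is not filtered out
    result = foo()

print(result)
print(len(caught), caught[0].category.__name__)   # 1 DeprecationWarning
print(caught[0].message)
```

The filters are restored when the with block ends, so the rest of the program is not affected.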