Skip to content

fluent-python #69

@BruceChen7

Description

@BruceChen7

参考资料

named tuple

>>> Card = collections.namedtuple('Card', ['rank', 'suit'])
>>> beer_card = Card('7', 'diamonds')
>>> beer_card
Card(rank='7', suit='diamonds')

FrenchDeck

Card = collections.namedtuple('Card', ['rank', 'suit'])
class FrenchDeck:
    ranks = [str(n) for n in range(2, 11)] + list("JQKA")
    suits = 'spades diamonds clubs hearts'.split()
    def __init__():
        self._cards=[Card(rank, suit) for suit in self.suits
                                      for rank in self.rands]
    def __len__(self):
        return len(self._cards)
    def __getitem__(self, position):
        return self._cards[position]

>>> deck = FrenchDeck()
>>> len(deck)
52
# 能随意的选择卡牌
>>> from random import choice
>>> choice(deck)
Card(rank='3', suit='hearts')
  • 定义了__getitem__能让很自由的遍历整个 deck,排序等功能。
    for card in deck:
        print card
    Card(rank='2', suit = 'spades')
    Card(rank='3', suit = 'spades')
    Card(rank='4', suit = 'spades')
    
    for card in reserved(deck):
        print card
    
    >>> Card('Q', 'hearts') in deck
    True
    
    def spades_high(card):
        rank_value = FrenchDeck.ranks.index(card.rank)
        return rank_value * len(suit_values) + suit_values[card.suit]
    
    for card in sorted(deck, key=spades_heigh)
        print card

魔法方法

Category Method names
String/bytes representation __repr__, __str__, __format__, __bytes__
Conversion to number __abs__, __bool__, __complex__, __int__, __float__, __hash__,__index__
Emulating collections len, getitem, setitem, delitem, contains
Context management __enter__, __exit__
Instance creation and destruction __new__, __init__, __del__
Attribute management getattr_, getattribute, setattr, delattr, dir
Attribute descriptors get, set, delete
Class services prepare_, instancecheck, subclasscheck
Unary numeric operators neg -, pos +, abs abs()

数据结构

>>> colors = ['black', 'white']
>>> sizes = ['S', 'M', 'L']
>>> tshirts = [(color, size) for color in colors for size in sizes]
tshirts
[('black', 'S'), ('black', 'M'), ('black', 'L'), ('white', 'M'), ('white', 'L')]。每个Unicode standard都是4到6个16进制的字符

生成表达式

Text versus Bytes

  • Python 3 明确区分了人类可读的文本字符串和原始的字节序列。
  • 隐式地把字节序列转换成 Unicode 文本(的行为)已成过去。

character issues

  • string 是一系列的字符组成,什么是字符呢?
  • character is a unicode character。
  • Python3 中的 str 类型为 Unicode characters.
  • Unicode 标准委员会规定了 bytes 到 character 的表示。
    • The identity of a character—its code point—is a number from 0 to 1,114,111。每个 Unicode 都是 4 到 6 个 16 进制的字符。表示都是在 U+ 后面,比如 U+0041 表示 A。
    • 实际 bytes 用来表示一个 character,根据其编码来决定。比如:对于字符 A,使用 UTF-8 编码,\x41 表示,\x41\x00 表示 UTF-16 的编码。
    >>> s = 'café'
    >>> len(s)
    4
    >>> b = s.encode('utf8')
    >>> b
    b'caf\xc3\xa9'  #
    >>> len(b)
    5
    >>> b.decode('utf8')
    b'caf\xc3\xa9'

Byte Essentials

序列类型

  • Python 内置了两种基本的二进制序列类型:不可变的 bytes 和可变的 bytearray

    # 基本的编码
    content = "São Paulo"
    for codec in ["utf_8", "utf_16"]:
        print(codec, content.encode(codec))
    # UnicodeEncodeError
    try:
        content.encode('cp437')
    except UnicodeEncodeError as e:
        print(e)
    
    # 忽略无法编码的字符
    print(content.encode('cp437', errors='ignore'))
    # 把无法编码的字符替换成 ?
    print(content.encode('cp437', errors='replace'))
    # 把无法编码的字符替换成 xml 实体
    print(content.encode('cp437', errors='xmlcharrefreplace'))
    # 还能自己设置错误处理方式
    # https://docs.python.org/3/library/codecs.html#type/codecs.register_error
  • 返回:

    utf_8 b'S\xc3\xa3o Paulo'
    utf_16 b'\xff\xfeS\x00\xe3\x00o\x00 \x00P\x00a\x00u\x00l\x00o\x00'
    'charmap' codec can't encode character '\xe3' in position 1: character maps to <undefined>
    b'So Paulo'
    b'S?o Paulo'
    b'S&#227;o Paulo'
  • 基本的解码

    # 基本的解码
    # 处理 UnicodeDecodeError
    octets = b'Montr\xe9al'
    print(octets.decode('cp1252'))
    print(octets.decode('iso8859_7'))
    print(octets.decode('koi8_r'))
    try:
        print(octets.decode('utf-8'))
    except UnicodeDecodeError as e:
        print(e)
    
    # 将错误字符替换成 � (U+FFFD)
    octets.decode('utf-8', errors='replace')
    
    # Python3 能使用非 ASCII 名称
    São = 'Paulo'
    # 但是不能用 Emoji…
  • 能用 chardet 检测字符所使用的编码

  • BOM:字节序标记 (byte-order mark):
    \ufffe 为字节序标记,放在文件开头,UTF-16 用它来表示文本以大端表示 (\xfe\xff) 还是小端表示 (\xff\xfe)。

  • UTF-8 编码并不需要 BOM,但是微软还是给它加了 BOM,非常烦人。

处理文本文件

  • 要尽早地把输入的字节序列解码成字符串,尽量晚地对字符串进行编码输出;在处理逻辑中只处理字符串对象,不应该去编码或解码。
  • 除非想判断编码,否则不要再二进制模式中打开文本文件;即便如此,也应该使用 Chardet,而不是重新发明轮子。
  • 常规代码只应该使用二进制模式打开二进制文件,比如图像。

默认编码

  • 使用 sys.getdefaultencoding() 获取系统默认编码;
  • Linux 的默认编码为 UTF-8,Windows 系统中不同语言设置使用的编码也不同,这导致了更多的问题。
  • locale.getpreferredencoding() 返回的编码是最重要的:这是打开文件的默认编码,也是重定向到文件的 sys.stdout/stdin/stderr 的默认编码。不过这个编码在某些系统中是能改的…
  • 关于编码默认值的最佳建议是:别依赖默认值。

Concurrent with futures

import os
import time
import sys
import requests

POP20_CC = ('CN', 'IN', 'US')
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):
    print(text, end=' ')
    sys.stdout.flush()

def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

download with concurrent.futures

def download_once(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))

    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_once, sorted(cc_list))
def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers = 3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = exector.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {} : {}'
            # 存储每个 future,这样能能够获取它通过 as_complete
            print(msg.format(cc, future))

        results = []
        # as_completed 将会产生 futures
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)

    return len(results)

futures

  • futures 封装了阻塞的操作,该操作完成的状态是能被查询和结果是被获取。
  • futures 不能自己的创建,其是被 concurrency 框架。
  • Future represents something that will eventually happen, and the only way to be sure that something will happen is to schedule its execution.
  • 两种类型的 future 都有 done 方法,其是 non-blocking,返回一个 bool 来表示相关联的 callable 是否已经执行或者没有。
  • 通常情况下,客户端代码是不会调用这个 done 来查询是否已经执行,而是在执行完的时候通知客户端。
  • 两种 future 都提供了 add_done_callback() 来将 future 完成的时候将回执行该回调。
  • result() 方法,其返回 callable 的结果或者是抛出异常。
  • 然而,对于该方法对于 concurrent.futures.Future 和 asyncio.Future 是有差异的,对于前者类型的 future,其回阻塞调用者线程,当然能传入一个超时的时间,这样在该时间结束后,将会抛出一个 Timeout 异常。
  • 对 asyncio.Future.result 方法不支持超时,倾向于通过 yield from 来返回。

使用多进程

def download_many(cc_list):
    with futures.ProccessPoolExector() as executor

体验 Exector.map

from time import sleep, strftime
from concurrent import futures
def display(*args):
    print(strftime['%H:%M:%S'], end = ' ')
    print(*args)
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t'*n, n))
    return n * 10
def main():
    display('scripting starting')
    executor = futures.ThreadPoolExecutor(max_workers = 3)
    results = executor.map(loiter, range(5))
    display('results', results)
    display('waiting for individual results:')

    for i, result in enumerate(results):
        display('{result {} : {}}'.format(i, result))

main()

Thread versus coroutine

import threading
import itertools
import time
import sys
class Signal:
   go = True

def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    time.sleep(3)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target = spin, args=('thinking', signal))
    print('spinnner object', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)
if __name__="__main__":
    main()

coroutine 版本

import asyncio
import itertoos
import sys
@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) # 将执行流交给事件循环这里假装在进行 io 操作。
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function()
    yield_from asyncio.sleep(3) # 将执行流交给事件循环,在 3 秒后,该 coroutine 将会执行
    return

@asyncio.coroutine
def supervisor()
    spinner = asyncio.async(spin('thinking')) # 调度 spin coroutine 来执行,wrapping it in a Task object.
    print('spinner object:', spinner)
    result = yield from slow_function() 等待执行完
    spinner.cancel() #取消 coroutine
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print("Answer:", result)

if __name__ == "__main__":
    main()
  • asyncio.Task 就类似于一个线程,Task 来 drive coroutine,类比于一个线程调用一个 callable 对象。
  • 你不需要实例化一个 asyncio.Task,直接通过 asyncio.async 来获取,或者是 loop.create_task(...)
  • 你不需要显示的调度一个 Task 对象来执行,实际上,它创建的时候就已经被调度去执行
  • supervisor() 和 slow_function 是简单的函数,它们是通过线程调用的,而 coroutine 则是通过 yield from 则是通过来驱动的。
  • Task.cancel() 来 raise CancelledError inside coroutine,通过 catch 该 exception,来处理。

Yielding from futures, tasks and coroutines

  • 在 asyncio 中,future 和 corotuine 有很亲密的关系,coroutine 通过 yield from 产生 futures。
  • res = yield from foo() 在 foo 是一个 corotinue 的 function(所以,其被调用时候产生 coroutine 对象)或者是简单的函数返回 Task 和 Future 对象。
  • 注意 yield from 对应的 coroutine 不能是阻塞的,必须让每一个网络 IO 都是异步的。
  • 这样控制流才能尽快的到 event_loop 中。

在 coroutine 中使用 yield from 需要注意的是

  • Every arrangement of coroutines chained with yield from must be ultimately
    driven by a caller that is not a coroutine, which invokes next(…) or .send(…) on
    the outermost delegating generator, explicitly or implicitly (e.g., in a for loop).
  • The innermost subgenerator in the chain must be a simple generator that uses just
    yield—or an iterable object.

而在 asyncio 中。

  • The coroutine chains we write are always driven by passing our outermost dele‐
    gating generator to an asyncio API call, such as loop.run_until_complete(…)
  • The coroutine chains we write always end by delegating with yield from to some
    asyncio coroutine function or coroutine method . In other words, the innermost subgenerator will be a library function that does the actual I/O, not something we write.

#type/python #async #public

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions