源码阅读学习笔记-kennethreitz/records SQL for Humans™

封装sql查询语句,方便快捷,对人友好;
学习总结:
1、特殊的构造方法学习,__getitem__ __getattr__ 实现方式,以及用途;
2、
# -*- coding: utf-8 -*-

import os
from collections import OrderedDict  #collections是Python内建的一个集合模块,提供了许多有用的集合类,这里使用了有序的dict;
from inspect import isclass  #查看python 类的参数和模块、函数代码

import tablib  #第三方模块,主要作用是将数据导出为各种不同的格式
from docopt import docopt
from sqlalchemy import create_engine, inspect, text  #最有名的ORM框架SQLAlchemy,Object-Relational Mapping,把关系数据库的表结构映射到对象上

DATABASE_URL = os.environ.get('DATABASE_URL')


def isexception(obj):   #内置函数使用 ,判断类型 ,貌似不符合python的多态思想;
    """Given an object, return a boolean indicating whether it is an instance
    or subclass of :py:class:`Exception`.
    """
    if isinstance(obj, Exception):
        return True
    if isclass(obj) and issubclass(obj, Exception):
        return True
    return False


class Record(object):
    """A row, from a query, from a database."""
    __slots__ = ('_keys', '_values')   #限制该class能添加的属性,对子类无效

    def __init__(self, keys, values):
        self._keys = keys
        self._values = values

        # Ensure that lengths match properly.
        assert len(self._keys) == len(self._values)  #断言,判断条件是否符合,false 引发异常

    def keys(self):
        """Returns the list of column names from the query."""
        return self._keys

    def values(self):
        """Returns the list of values from the query."""
        return self._values

    def __repr__(self):
        return '<Record {}>'.format(self.export('json')[1:-1])

    def __getitem__(self, key):    #拦截索引运算符;迭代器会先尝试__iter__方法,在尝试__getitem__.也就是如果对象不支持迭代协议,就会尝试索引运算
                                   #迭代是通过调用__iter__方法来实现的,这种方法返回一个迭代器对象,如果有就会重复调用这个迭代器对象的next方法,
                                   #直至发生StopIteration异常,如果没找到这类__iter__方法,Python就会改用__getitem__机制,通过偏移量重复索引,直至发生IndexError异常

        # Support for index-based lookup.
        if isinstance(key, int):
            return self.values()[key]

        # Support for string-based lookup.
        if key in self.keys():
            i = self.keys().index(key)
            return self.values()[i]

        raise KeyError("Record contains no '{}' field.".format(key))

    def __getattr__(self, key):      #拦截点号运算。当对未定义的属性名称和实例进行点号运算时,就会用属性名作为字符串调用这个方法。如果可以找到该属性,则不调用此方法
        try:
            return self[key]
        except KeyError as e:
            raise AttributeError(e)

    def __dir__(self):                    #dir()  
        standard = dir(super(Record, self))
        # Merge standard attrs with generated ones (from column names).
        return sorted(standard + [str(k) for k in self.keys()])

    def get(self, key, default=None):
        """Returns the value for a given key, or default."""
        try:
            return self[key]
        except KeyError:
            return default

    def as_dict(self, ordered=False):
        """Returns the row as a dictionary, as ordered."""
        items = zip(self.keys(), self.values())

        return OrderedDict(items) if ordered else dict(items)  #又见if 表达式赋值

    @property
    def dataset(self):
        """A Tablib Dataset containing the row."""
        data = tablib.Dataset()
        data.headers = self.keys()

        row = _reduce_datetimes(self.values())
        data.append(row)

        return data

    def export(self, format, **kwargs):
        """Exports the row to the given format."""
        return self.dataset.export(format, **kwargs)


class RecordCollection(object):
    """A set of excellent Records from a query."""
    def __init__(self, rows):
        self._rows = rows
        self._all_rows = []
        self.pending = True

    def __repr__(self):
        return '<RecordCollection size={} pending={}>'.format(len(self), self.pending)

    def __iter__(self):
        """Iterate over all rows, consuming the underlying generator
        only when necessary."""
        i = 0
        while True:
            # Other code may have iterated between yields,
            # so always check the cache.
            if i < len(self):
                yield self[i]
            else:
                # Throws StopIteration when done.
                # Prevent StopIteration bubbling from generator, following https://www.python.org/dev/peps/pep-0479/
                try:
                    yield next(self)
                except StopIteration:
                    return
            i += 1

    def next(self):
        return self.__next__()

    def __next__(self):
        try:
            nextrow = next(self._rows)
            self._all_rows.append(nextrow)
            return nextrow
        except StopIteration:
            self.pending = False
            raise StopIteration('RecordCollection contains no more rows.')

    def __getitem__(self, key):
        is_int = isinstance(key, int)

        # Convert RecordCollection[1] into slice.
        if is_int:
            key = slice(key, key + 1)

        while len(self) < key.stop or key.stop is None:
            try:
                next(self)
            except StopIteration:
                break

        rows = self._all_rows[key]
        if is_int:
            return rows[0]
        else:
            return RecordCollection(iter(rows))

    def __len__(self):
        return len(self._all_rows)

    def export(self, format, **kwargs):
        """Export the RecordCollection to a given format (courtesy of Tablib)."""
        return self.dataset.export(format, **kwargs)

    @property
    def dataset(self):
        """A Tablib Dataset representation of the RecordCollection."""
        # Create a new Tablib Dataset.
        data = tablib.Dataset()

        # If the RecordCollection is empty, just return the empty set
        # Check number of rows by typecasting to list
        if len(list(self)) == 0:
            return data

        # Set the column names as headers on Tablib Dataset.
        first = self[0]

        data.headers = first.keys()
        for row in self.all():
            row = _reduce_datetimes(row.values())
            data.append(row)

        return data

    def all(self, as_dict=False, as_ordereddict=False):
        """Returns a list of all rows for the RecordCollection. If they haven't
        been fetched yet, consume the iterator and cache the results."""

        # By calling list it calls the __iter__ method
        rows = list(self)

        if as_dict:
            return [r.as_dict() for r in rows]
        elif as_ordereddict:
            return [r.as_dict(ordered=True) for r in rows]

        return rows

    def as_dict(self, ordered=False):
        return self.all(as_dict=not(ordered), as_ordereddict=ordered)

    def first(self, default=None, as_dict=False, as_ordereddict=False):
        """Returns a single record for the RecordCollection, or `default`. If
        `default` is an instance or subclass of Exception, then raise it
        instead of returning it."""

        # Try to get a record, or return/raise default.
        try:
            record = self[0]
        except IndexError:
            if isexception(default):
                raise default
            return default

        # Ensure that we don't have more than one row.
        try:
            self[1]
        except IndexError:
            pass
        else:
            raise ValueError('RecordCollection contains too many rows.')

        # Cast and return.
        if as_dict:
            return record.as_dict()
        elif as_ordereddict:
            return record.as_dict(ordered=True)
        else:
            return record

class Database(object):
    """A Database connection."""

    def __init__(self, db_url=None, **kwargs):
        # If no db_url was provided, fallback to $DATABASE_URL.
        self.db_url = db_url or DATABASE_URL

        if not self.db_url:
            raise ValueError('You must provide a db_url.')

        self._engine = create_engine(self.db_url, **kwargs)

        # Connect to the database.
        self.db = self._engine.connect()
        self.open = True

    def close(self):
        """Closes the connection to the Database."""
        self.db.close()
        self.open = False

    def __enter__(self):
        return self

    def __exit__(self, exc, val, traceback):
        self.close()

    def __repr__(self):
        return '<Database open={}>'.format(self.open)

    def get_table_names(self, internal=False):
        """Returns a list of table names for the connected database."""

        # Setup SQLAlchemy for Database inspection.
        return inspect(self._engine).get_table_names()

    def query(self, query, fetchall=False, **params):
        """Executes the given SQL query against the Database. Parameters
        can, optionally, be provided. Returns a RecordCollection, which can be
        iterated over to get result rows as dictionaries.
        """

        # Execute the given query.
        cursor = self.db.execute(text(query), **params) # TODO: PARAMS GO HERE

        # Row-by-row Record generator.
        row_gen = (Record(cursor.keys(), row) for row in cursor)

        # Convert psycopg2 results to RecordCollection.
        results = RecordCollection(row_gen)

        # Fetch all results if desired.
        if fetchall:
            results.all()

        return results

    def bulk_query(self, query, *multiparams):
        """Bulk insert or update."""

        self.db.execute(text(query), *multiparams)

    def query_file(self, path, fetchall=False, **params):
        """Like Database.query, but takes a filename to load a query from."""

        # If path doesn't exists
        if not os.path.exists(path):
            raise IOError("File '{}'' not found!".format(path))

        # If it's a directory
        if os.path.isdir(path):
            raise IOError("'{}' is a directory!".format(path))

        # Read the given .sql file into memory.
        with open(path) as f:
            query = f.read()

        # Defer processing to self.query method.
        return self.query(query=query, fetchall=fetchall, **params)

    def bulk_query_file(self, path, *multiparams):
        """Like Database.bulk_query, but takes a filename to load a query from."""

         # If path doesn't exists
        if not os.path.exists(path):
            raise IOError("File '{}'' not found!".format(path))

        # If it's a directory
        if os.path.isdir(path):
            raise IOError("'{}' is a directory!".format(path))

        # Read the given .sql file into memory.
        with open(path) as f:
            query = f.read()

        self.db.execute(text(query), *multiparams)

    def transaction(self):
        """Returns a transaction object. Call ``commit`` or ``rollback``
        on the returned object as appropriate."""
        return self.db.begin()

def _reduce_datetimes(row):
    """Receives a row, converts datetimes to strings."""

    row = list(row)

    for i in range(len(row)):
        if hasattr(row[i], 'isoformat'):
            row[i] = row[i].isoformat()
    return tuple(row)

def cli():
    cli_docs ="""Records: SQL for Humans™
A Kenneth Reitz project.
Usage:
  records <query> [<format>] [<params>...] [--url=<url>]
  records (-h | --help)
Options:
  -h --help     Show this screen.
  --url=<url>   The database URL to use. Defaults to $DATABASE_URL.
Supported Formats:
   csv, tsv, json, yaml, html, xls, xlsx, dbf, latex, ods
   Note: xls, xlsx, dbf, and ods formats are binary, and should only be
         used with redirected output e.g. '$ records sql xls > sql.xls'.
Query Parameters:
    Query parameters can be specified in key=value format, and injected
    into your query in :key format e.g.:
    $ records 'select * from repos where language ~= :lang' lang=python
Notes:
  - While you may specify a database connection string with --url, records
    will automatically default to the value of $DATABASE_URL, if available.
  - Query is intended to be the path of a SQL file, however a query string
    can be provided instead. Use this feature discernfully; it's dangerous.
  - Records is intended for report-style exports of database queries, and
    has not yet been optimized for extremely large data dumps.
    """
    supported_formats = 'csv tsv json yaml html xls xlsx dbf latex ods'.split()

    # Parse the command-line arguments.
    arguments = docopt(cli_docs)

    # Create the Database.
    db = Database(arguments['--url'])

    query = arguments['<query>']
    params = arguments['<params>']

    # Can't send an empty list if params aren't expected.
    try:
        params = dict([i.split('=') for i in params])
    except ValueError:
        print('Parameters must be given in key=value format.')
        exit(64)

    # Execute the query, if it is a found file.
    if os.path.isfile(query):
        rows = db.query_file(query, **params)

    # Execute the query, if it appears to be a query string.
    elif len(query.split()) > 2:
        rows = db.query(query, **params)

    # Otherwise, say the file wasn't found.
    else:
        print('The given query could not be found.')
        exit(66)

    # Print results in desired format.
    if arguments['<format>']:
        print(rows.export(arguments['<format>']))
    else:
        print(rows.dataset)

# Run the CLI when executed directly.
if __name__ == '__main__':
cli()

Pingbacks已打开。

引用地址

评论
  1. LarryWox LarryWox 发布于 2021年3月9日 01:36 #

    חדרים לפי שעה
    [url=https://www.irooms.co.il/]חדרים לפי שעה[/url]

  2. Samuelkew Samuelkew 发布于 2021年3月9日 01:41 #

    prescription drugs without a doctor <a href=" https://genericwdp.com/# ">generic pills for sale</a>
    usa pharmacy india: https://genericwdp.com/ overseas pharmacies shipping to usa
    [url=https://genericwdp.com/#]generic pills for ed[/url] buy medication without an rx

  3. 코인 카지노 코인 카지노 发布于 2021年3月9日 01:47 #

    Hey there! I know this is kind of off topic but I was wondering if you
    knew where I could get a captcha plugin for my comment form?
    I'm using the same blog platform as yours and I'm having problems finding
    one? Thanks a lot!

  4. 코인 카지노 코인 카지노 发布于 2021年3月9日 01:47 #

    Hey there! I know this is kind of off topic but I was
    wondering if you knew where I could get a captcha
    plugin for my comment form? I'm using the
    same blog platform as yours and I'm having problems finding
    one? Thanks a lot!

  5. 코인 카지노 코인 카지노 发布于 2021年3月9日 01:48 #

    Hmm it seems like your blog ate my first
    comment (it was extremely long) so I guess I'll just sum it up what I
    submitted and say, I'm thoroughly enjoying your blog. I as well am
    an aspiring blog blogger but I'm still new to the whole thing.
    Do you have any points for novice blog writers?
    I'd certainly appreciate it.

  6. 코인 카지노 코인 카지노 发布于 2021年3月9日 01:48 #

    Hmm it seems like your blog ate my first comment (it was extremely
    long) so I guess I'll just sum it up what I submitted and say, I'm thoroughly enjoying
    your blog. I as well am an aspiring blog blogger but I'm still new to the whole thing.
    Do you have any points for novice blog writers?
    I'd certainly appreciate it.

  7. 코인 카지노 코인 카지노 发布于 2021年3月9日 01:56 #

    Remarkable things here. I am very satisfied to peer your article.
    Thank you so much and I'm taking a look ahead to contact you.
    Will you please drop me a e-mail?

  8. 파라오 카지노 파라오 카지노 发布于 2021年3月9日 01:58 #

    I’m not that much of a internet reader to be honest but your sites really nice, keep it up!
    I'll go ahead and bookmark your site to come back down the road.
    All the best

  9. 파라오 카지노 파라오 카지노 发布于 2021年3月9日 01:58 #

    I’m not that much of a internet reader to be honest
    but your sites really nice, keep it up! I'll go ahead and bookmark your
    site to come back down the road. All the best

  10. 예스카지노 예스카지노 发布于 2021年3月9日 02:04 #

    Thanks for a marvelous posting! I definitely enjoyed reading it, you can be a great author.

    I will remember to bookmark your blog and will often come back later on. I want to
    encourage one to continue your great writing, have a nice evening!

  11. 예스카지노 예스카지노 发布于 2021年3月9日 02:04 #

    Thanks for a marvelous posting! I definitely enjoyed
    reading it, you can be a great author. I will remember to bookmark your blog and will often come back later on. I want to encourage one to
    continue your great writing, have a nice evening!

  12. 예스카지노 예스카지노 发布于 2021年3月9日 02:11 #

    I just could not depart your web site prior to suggesting that
    I actually loved the usual info a person provide on your visitors?
    Is gonna be again continuously in order to check out new posts

  13. 예스카지노 예스카지노 发布于 2021年3月9日 02:11 #

    I just could not depart your web site prior to suggesting that I actually
    loved the usual info a person provide on your visitors? Is gonna be again continuously
    in order to check out new posts

发表评论