小结 - CongGreat/async-await GitHub Wiki

技术软件清单

1.业务语言:python

2.数据库:mysql, redis

3.日志系统:kibana

4.监控系统:grafana

5.服务发现:consul

6.网关服务:apisix

7.网关语言:lua

8.消息队列:kafka

9.定时任务:landsat、AsyncIOScheduler

dotfile 安装步骤

git clone git@github.com:imzhongqi/dotfiles.git

修改git 用户信息

vim ~/dotfiles/git/.config/git/config

安装命令

brew install stow
brew install git
brew install xx
# 提示哪个命令没有就安装哪个命令

执行脚本

./init

更新子模块

git submodule init
git submodule update

安装zsh(核心)

stow vim zsh 

如果出现`WARNING! stowing zsh would cause conflicts:
existing target is not owned by stow: .zshenv
All operations aborted.`

mv ~/.zshenv ~/.zshenv_backup
重新执行  stow vim zsh 或者 stow zsh

无限试用Navicat

# 删除该字段
/usr/libexec/PlistBuddy -c "print"  ~/Library/Preferences/com.prect.NavicatPremium15.plist
/usr/libexec/PlistBuddy -c "Delete :043591897841277B0A387F0EDD83BAD6"  ~/Library/Preferences/com.prect.NavicatPremium15.plist
cd ~/Library/Application\ Support/PremiumSoft\ CyberTech/Navicat\ CC/Navicat\ Premium/
ls -lah

rm -rf .*

supervisor部署

  1. 在/etc/supervisord.d中添加要配置的服务

    [group:risk_cms]
    programs=check_system_fastapi
    priority=1
    [program:check_system_fastapi]
    command=/data/venv/rcms-byb-check-system-fastapi/bin/python3 /data/app/rcms-byb-check-system-fastapi/src/server.py --env=test --port=9980
    directory=/data/app/rcms-byb-check-system-fastapi/src
    stdout_logfile=/data/app/rcms-byb-check-system-fastapi/src/log/supervisor_check_system_fastapi.log
    redirect_stderr=true
    user=bybank
    autostart=true
    autorestart=true
    startsecs=10
    stopwaitsecs=60
    
  2. 重新加载服务

    sudo supervisorctl update
    
  3. 查看服务是否启动成功

    sudo supervisorctl
    

nginx 配置

/usr/local/nginx/vhost

重启nginx

openresty -s reload -p /data/app/project_name -c nginx.conf

Demo

Fastapi 文件流返回文件

@router.post("/risk/cms/check/statistics/audit/time/file",
             summary='Audit time')
async def download_statistics_audit_time_ratio(info: GetAuditTimeSchema):
    """Return the audit-time statistics for ``info`` as an Excel download.

    The header row is taken from the keys of the first record. When the
    statistics query yields no records there is no first record to read, so
    a workbook with an empty header and no data rows is returned instead of
    failing with ``IndexError``.

    :param info: request body describing which statistics to compute
    :return: the response produced by ``export_excel``
    """
    data = await statistics_audit_time(info)
    rows = data or []
    # Guard the header lookup: indexing an empty result would raise IndexError.
    head = list(rows[0].keys()) if rows else []
    return export_excel('audit_time', rows, head)


def export_excel(filename: str, data: List[Dict], head: List[str], column_list=None):
    """
    Build an in-memory ``.xlsx`` workbook and wrap it in a download response.

    :param filename: base name of the download; ``.xlsx`` is appended
    :param data: rows written below the header; a dict row contributes its
        values in iteration order, any other row is written as given
    :param head: header cells, written to the first row in bold
    :param column_list: optional sequence of mappings whose ``'col'`` and
        ``'col_num'`` entries are forwarded to ``worksheet.set_column``
    :return: ``Response`` carrying the workbook bytes as an attachment
    """
    stream = BytesIO()
    book = xlsxwriter.Workbook(stream, {'in_memory': True})
    sheet = book.add_worksheet('sheet1')

    bold = book.add_format({'bold': True})
    wrapped = book.add_format({'text_wrap': True})

    # Header goes on row 0; data rows start on row 1.
    sheet.write_row(0, 0, head, bold)
    for spec in column_list or ():
        sheet.set_column(spec['col'], spec['col_num'])

    for row_no, record in enumerate(data, start=1):
        cells = list(record.values()) if isinstance(record, dict) else record
        sheet.write_row(row_no, 0, cells, wrapped)

    # Closing the workbook is what flushes the file content into ``stream``.
    book.close()

    disposition = 'attachment; filename=%s' % (filename + '.xlsx')
    # NOTE(review): the registered type for .xlsx is
    # application/vnd.openxmlformats-officedocument.spreadsheetml.sheet; the
    # value below is kept unchanged so existing clients see the same header.
    return Response(content=stream.getvalue(),
                    headers={'Content-Disposition': disposition},
                    media_type='application/vnd.ms-excel')

tornado 请求运行步骤

# Excerpt: calling the application with a request looks up the delegate
# for that request and executes it.
class Application(ReversibleRouter): 
  def __call__(
        self, request: httputil.HTTPServerRequest
    ) -> Optional[Awaitable[None]]:
        # Legacy HTTPServer interface
        dispatcher = self.find_handler(request)  # returns a _HandlerDelegate object
        return dispatcher.execute()
class _HandlerDelegate(httputil.HTTPMessageDelegate):
    """Excerpt: builds the request handler and starts its ``_execute`` coroutine."""

    def execute(self) -> Optional[Awaitable[None]]:
        # If template cache is disabled (usually in the debug mode),
        # re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if not self.application.settings.get("compiled_template_cache", True):
            with RequestHandler._template_loader_lock:
                for loader in RequestHandler._template_loaders.values():
                    loader.reset()
        if not self.application.settings.get("static_hash_cache", True):
            StaticFileHandler.reset()

        self.handler = self.handler_class(
            self.application, self.request, **self.handler_kwargs
        )  # a RequestHandler instance
        transforms = [t(self.request) for t in self.application.transforms]

        if self.stream_request_body:
            self.handler._prepared_future = Future()
        # Note that if an exception escapes handler._execute it will be
        # trapped in the Future it returns (which we are ignoring here,
        # leaving it to be logged when the Future is GC'd).
        # However, that shouldn't happen because _execute has a blanket
        # except handler, and we cannot easily access the IOLoop here to
        # call add_future (because of the requirement to remain compatible
        # with WSGI)
        fut = gen.convert_yielded(
            self.handler._execute(transforms, *self.path_args, **self.path_kwargs)
        )  # run the RequestHandler's _execute coroutine
        fut.add_done_callback(lambda f: f.result())
        # If we are streaming the request body, then execute() is finished
        # when the handler has prepared to receive the body.  If not,
        # it doesn't matter when execute() finishes (so we return None)
        return self.handler._prepared_future
class RequestHandler(object):
    """Excerpt of the request-handling path of a Tornado ``RequestHandler``.

    ``...`` marks code that was left out of the excerpt.
    """

    def __init__(
        self,
        application: "Application",
        request: httputil.HTTPServerRequest,
        **kwargs: Any
    ) -> None:
        super(RequestHandler, self).__init__()

        ...
        self.clear()
        ...
        self.initialize(**kwargs)  # type: ignore

    async def _execute(
        self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
    ) -> None:
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.path_args = [self.decode_argument(arg) for arg in args]
            self.path_kwargs = dict(
                (k, self.decode_argument(v, name=k)) for (k, v) in kwargs.items()
            )
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in (
                "GET",
                "HEAD",
                "OPTIONS",
            ) and self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()

            result = self.prepare()  # runs before the handler method; hook for pre-processing
            if result is not None:
                result = await result
            if self._prepared_future is not None:
                # Tell the Application we've finished with prepare()
                # and are ready for the body to arrive.
                future_set_result_unless_cancelled(self._prepared_future, None)
            if self._finished:
                return

            if _has_stream_request_body(self.__class__):
                # In streaming mode request.body is a Future that signals
                # the body has been completely received.  The Future has no
                # result; the data has been passed to self.data_received
                # instead.
                try:
                    await self.request._body_future
                except iostream.StreamClosedError:
                    return

            method = getattr(self, self.request.method.lower())  # look up the handler method for the HTTP verb
            result = method(*self.path_args, **self.path_kwargs)  # invoke it: get/post/delete/...
            if result is not None:
                result = await result
            if self._auto_finish and not self._finished:
                self.finish()
        except Exception as e:
            try:
                self._handle_request_exception(e)
            except Exception:
                app_log.error("Exception in exception handler", exc_info=True)
            finally:
                # Unset result to avoid circular references
                result = None
            if self._prepared_future is not None and not self._prepared_future.done():
                # In case we failed before setting _prepared_future, do it
                # now (to unblock the HTTP server).  Note that this is not
                # in a finally block to avoid GC issues prior to Python 3.4.
                self._prepared_future.set_result(None)

    def clear(self) -> None:
        """Resets all headers and content for this response."""
        self._headers = httputil.HTTPHeaders(
            {
                "Server": "TornadoServer/%s" % tornado.version,
                "Content-Type": "text/html; charset=UTF-8",
                "Date": httputil.format_timestamp(time.time()),
            }
        )
        self.set_default_headers()
        self._write_buffer = []  # type: List[bytes]
        self._status_code = 200
        self._reason = httputil.responses[200]

    def set_default_headers(self) -> None:
        """Override this to set HTTP headers at the beginning of the request.

        For example, this is the place to set a custom ``Server`` header.
        Note that setting such headers in the normal flow of request
        processing may not do what you want, since headers may be reset
        during error handling.
        """
        pass

    def prepare(self) -> Optional[Awaitable[None]]:
        """Called at the beginning of a request before  `get`/`post`/etc.

        Override this method to perform common initialization regardless
        of the request method.

        Asynchronous support: Use ``async def`` or decorate this method with
        `.gen.coroutine` to make it asynchronous.
        If this method returns an  ``Awaitable`` execution will not proceed
        until the ``Awaitable`` is done.

        .. versionadded:: 3.1
           Asynchronous support.
        """
        pass

读取excel,生成批量插入sql

# Read an Excel file and generate batch-insert SQL
import json
import time
import xlrd


async def import_case_white_list_batch(file_name, batch_size=5000,
                                       output_path="case_white_list_5000.sql"):
    """Write batched ``INSERT`` statements built from the first column of an Excel sheet.

    The first sheet of ``file_name`` is read, row 0 is skipped, and the value
    in column 0 of every remaining row is collected. Values that repeat are
    printed and skipped, so each one is inserted once, in first-seen order.
    One multi-row ``INSERT INTO r_case_tmp_update_info`` statement is written
    to ``output_path`` per batch of at most ``batch_size`` values.

    :param file_name: path of the Excel workbook to read
    :param batch_size: maximum number of rows per ``INSERT`` statement
    :param output_path: path of the SQL file to (over)write
    :raises ValueError: if ``batch_size`` is smaller than 1
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    with open(file_name, 'rb') as f:
        contents = f.read()
    workbook = xlrd.open_workbook(file_contents=contents)
    sheet = workbook.sheet_by_index(0)

    # De-duplicate with a set for O(1) membership tests while keeping the
    # first-seen order in the list.
    seen = set()
    uids = []
    for n in range(1, sheet.nrows):
        uid = sheet.row_values(n)[0]
        if uid in seen:
            print(uid)
            continue
        seen.add(uid)
        uids.append(uid)

    # The same JSON payload is stored for every row, so build it once.
    entry_ids = json.dumps([1104000])

    with open(output_path, 'w+') as f:
        # Stepping by batch_size never yields an empty batch, so no
        # "INSERT ... VALUES ;" statement without rows is ever written.
        for start in range(0, len(uids), batch_size):
            end = min(start + batch_size, len(uids))
            print(start, end)
            insert_rows = []
            for uid in uids[start:end]:
                now_ms = int(time.time() * 1000)
                # int() keeps the interpolated value numeric.
                insert_rows.append(f"({int(uid)}, '{entry_ids}', {now_ms}, {now_ms})")
            values_str = ','.join(insert_rows)
            sql_insert_many = f"""INSERT INTO `r_case_tmp_update_info` (`afi_uid`, `entry_ids`, `create_time`, `update_time`) VALUES {values_str}"""
            f.write(f"{sql_insert_many};\n")


async def main():
    """Generate the SQL file from ``./case_white_list_2.xlsx``."""
    source_workbook = './case_white_list_2.xlsx'
    await import_case_white_list_batch(source_workbook)


if __name__ == '__main__':
    import asyncio

    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. It replaces get_event_loop() +
    # run_until_complete(), which is deprecated when no loop is running.
    asyncio.run(main())


"""CREATE TABLE `r_case_tmp_update_info` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `afi_uid` bigint(20) NOT NULL COMMENT 'afi uid',
  `entry_ids` text NOT NULL COMMENT 'entry_ids',
  `create_time` bigint(20) NOT NULL COMMENT 'create time',
  `update_time` bigint(20) NOT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COMMENT='xxx'"""

网关

apisix参数说明

uri:url前缀
name: 系统名称
desc:描述
methods: 请求方法
hosts: 内外网域名
plugins: 自定义插件及配置
upstream: upstream配置
labels: 标签(键值对,用于标记路由所属的项目/系统)
status: 状态

参数示例

{
  "uri": "/risk/cms/user/*", 
  "name": "系统名称",
  "desc": "泰国用户系统 /risk/cms/user/*",
  "methods": [
    "GET",
    "POST",
    "PUT",
    "PATCH",
    "HEAD",
    "CONNECT",
    "OPTIONS"
  ],
  "hosts": [
    "test-rcms.paypaya.com",
    "local-test-rcms.paypaya.com"
  ],
  "plugins": {
    "risk-perm-check": {
      "disable": false,
      "limiter": [
        {
          "burst": 0,
          "name": "start_order_limit",
          "path": "/risk/cms/check/start_order",
          "request": 1
        }
      ],
      "login_path": "/risk/cms/user/login/",
      "perm_table_prefix": "akux:user:url:perm:",
      "public_urls": [
        "/risk/cms/user/login/*",
        "/risk/cms/user/logout/*",
        "/risk/cms/check/dialing/cb",
        "/risk/rules/flows/v2/flow_publish_detail"
      ],
      "redis_cluster_name": "test",
      "redis_cluster_nodes": [
        "192.168.1.102:7002",
        "192.168.1.102:7001",
        "192.168.1.102:7003"
      ],
      "redis_password": "<redis-password>",
      "redis_type": "redis-cluster",
      "session_table_prefix": "akux:user:session:",
      "trusted_origins": [
        "http://192.168.1.101:12350",
        "http://192.168.1.194:5001",
        "http://192.168.1.135",
        "https://rcms.paypaya.com:9443",
        "https://rcms.paypaya.com:10443",
        "https://rcms.paypaya.com:8443",
        "https://test-rcms.paypaya.com",
        "http://10.0",
        "http://local-test-rcms.paypaya.com:9080"
      ],
      "user_perm_table_prefix": "akux:user:permission:"
    }
  },
  "upstream": {
    "timeout": {
      "connect": 16,
      "send": 16,
      "read": 16
    },
    "type": "roundrobin",
    "scheme": "http",
    "discovery_type": "consul",
    "pass_host": "pass",
    "service_name": "rcms-akux-user-system",
    "keepalive_pool": {
      "idle_timeout": 60,
      "requests": 1000,
      "size": 320
    }
  },
  "labels": {
    "akux": "泰国项目",
    "rcms-user": "用户系统"
  },
  "status": 1
}

apisix网关功能

  • 校验url是否属于无需网关校验的公开地址(public_urls)

  • 校验session是否有效

  • 校验配置中limiter的url访问频率是否满足条件

  • 校验最后一次登录时间是否在有效期内

  • 为应用服务器添加请求头,用户信息

  • 校验用户是否有权限访问该url

  • 重写请求头返回session,用户信息(权限点,最后一次登录时间等)

性能分析

## 请求数量800,并发量120
ab -n 800 -c 120 "http://127.0.0.1:7676/risk/cms/comprehensive/inquiry_letter_manage/inquiry_letter/all?page=1"