1,主函式入口
#!../venv/bin/python3
])# 全域性變數字典
tornado.options.define('global_dict', type=dict, default={}, multiple=true)
tornado.options.parse_command_line()
server.bind(8888)
server.start(0)
tornado.ioloop.ioloop.current().start()
2.構建返回體及驗證介面# -*- coding: utf-8 -*-
# 設定返回體和跨域請求
rsp_data = ,
'code': 0,
'message': 'success',
"requestid": str(uuid.uuid4())
}# 引數獲取
try:
if self.request.headers['content-type'].startswith("multipart/form-data"):
print('request header%s', str(str(self.request.headers).split('\n')))
params = json.loads(self.get_argument("params"))
inte***ce_name = self.get_argument('action')
else:
req_data = tornado.escape.json_decode(self.request.body)
# route to the corresponding handler by action name
print('request header%s body[%s]', str(str(self.request.headers).split('\n')), str(req_data))
inte***ce_name = req_data['action']
params = req_data['params']
except exception:
print('request format error: %s', traceback.format_exc())
self._rsp_data(rsp_data, 10005, 'invalid request body')
self.write(tornado.escape.json_encode(rsp_data))
return
# 許可權驗證
try:
# 如果介面不存在,直接返回
if inte***ce_name not in inte***ce_post_all.inte***ce_route.keys():
rsp_data['code'] = 10004
rsp_data['message'] = 'invalid inte***ce name'
self.write(tornado.escape.json_encode(rsp_data))
return
except jwt.exceptions.expiredsignature as e:
self._rsp_data(rsp_data, 4001, str(e))
self.write(tornado.escape.json_encode(rsp_data))
return
except jwt.exceptions.pyjwterror as e:
self._rsp_data(rsp_data, 4002, str(e))
self.write(tornado.escape.json_encode(rsp_data))
return
except exception as e:
print('unknown error occur when invoke api[%s], error type[%s], error: %s',
inte***ce_name,
str(type(e)),
traceback.format_exc())
self._rsp_data(rsp_data, 4004, str(e))
self.write(tornado.escape.json_encode(rsp_data))
return
# action對映
try:
action_handler = inte***ce_post_all.inte***ce_route_all[inte***ce_name](params)
data = action_handler.process()
rsp_data['data'] = data
except exception as e:
print('unknown error occur when invoke api[%s], error type[%s], error: %s',
inte***ce_name,
str(type(e)),
traceback.format_exc())
rsp_data['code'] = 10001
rsp_data['message'] = 'unknown error'
finally:
# lists not accepted for security reasons;
# rsp_data must be dict or unicode type or bytes
print('response data[%s]', tornado.escape.json_encode(rsp_data))
rsp_data = tornado.escape.json_encode(rsp_data)
self.write(rsp_data)
def _rsp_data(self, rsp_data, code, message, data=none):
rsp_data['code'] = code
rsp_data['message'] = message
if data:
rsp_data['data'] = data
print('response data[%s]', tornado.escape.json_encode(rsp_data))
def options(self, *args, **kwargs):
print('option method occur')
3.介面集合class addsoftwaredefinition(object):
def __init__(self, params):
pass
def process(self):
return
inte***ce_route =
inte***ce_route_all = dict(inte***ce_route)
上述3個py檔案放在乙個資料夾裡面即可執行,下面是請求體。自己可以定義介面,,來體驗下。post請求
url:127.0.0.1:8888/wf/price
請求體:
}返回結果:
, "requestid": "cbda039a-250c-4859-a799-48bb4a4a72b5",
"version": "1",
"message": "success",
"componenttime": "tcequotedprice"
}
python tornado 框架使用 (1)
1 日誌系統 common logging.py usr bin env python coding utf 8 import logging import logging.config import os from unipath import path logging.config.fileco...
python tornado非同步效能測試
測試兩個介面 coding utf 8 import time import tornado.web import tornado.gen import tornado.ioloop from tornado.concurrent import run on executor from concur...
python tornado實現簡單的檔案上傳功能
在web應用開發的功能中,檔案的上傳是經常會使用到的功能,本文就是利用python tornado框架,實現了乙個簡單的檔案上傳功能 tornado.httputil.httpfile物件三個屬性 1.filename檔名 2.body檔案內部實際內容 3.type檔案的型別 defget self...