一、项目依赖
APScheduler==3.10.4
eventlet==0.33.3
Flask==2.1.3
Flask-Caching==1.10.1
Flask-Cors==3.0.10
Flask-Migrate==2.7.0
Flask-RESTful==0.3.9
Flask-SocketIO==5.1.1
Flask-SQLAlchemy==2.5.1
PyJWT==2.3.0
PyMySQL==1.0.2
redis==3.5.3
SQLAlchemy==1.4.0 #额外修改
Werkzeug==2.0.2 #额外修改
注意:在flask2.x版本依赖,不再支持flask_script了
flask2.x版本会自动注册 flask run 和flask db 两个命令行命令
1、启动项目
flask run --host 0.0.0.0 --port 9000
2、数据库迁移命令
flask db init
flask db migrate
flask db upgrade
二、项目结构
apps
__init__.py : 创建app应用,各种注册
websocket
consumers.py #将所有socketio处理类放到这里
base
settings.py
ext
__init__.py: 拓展对象都放在这里
config.py : 拓展对象的配置内容
app.py
三、具体代码使用
settings.py
import os
import datetime
def get_database(dic):
'"mysql+pymysql://root:Huawei@123@localhost:3306/study_flask?charset=utf8"'
engine = dic.get('ENGINE')
driver = dic.get('DRIVER')
user = dic.get('USER')
host = dic.get('HOST')
password = dic.get("PASSWORD")
port = dic.get('PORT')
name = dic.get('NAME')
dbinfo = f"{engine}+{driver}://{user}:{password}@{host}:{port}/{name}?charset=utf8"
return dbinfo
#全局通用配置类
class Config:
"""项目配置核心类"""
DEBUG=True
LOG_LEVEL = "INFO"
SECRET_KEY= '8hdj^sasdas6736475#$#5&GHG'
BASE_PATH = os.path.dirname(os.path.abspath(__file__))
STATIC_PATH = os.path.join(BASE_PATH, 'static') #static/ 路由对应的目录
TEMPLATES_PATH = os.path.join(BASE_PATH, 'templates') # 用于专门检索静态文件的位置,方便修改
# 中文乱码
JSON_AS_ASCII = False
# # 配置redis
# # 项目上线以后,这个地址就会被替换成真实IP地址,mysql也是
# REDIS_HOST = 'your host'
# REDIS_PORT = your port
# REDIS_PASSWORD = 'your password'
# REDIS_POLL = 10
#数据库配置
dbinfo={
'ENGINE':'mysql',
'DRIVER':'pymysql',
'USER':'root',
'PASSWORD':'123456',
'HOST':"127.0.0.1",
"PORT":"3306",
'NAME':'flask_obj'
}
# 数据库连接格式
SQLALCHEMY_DATABASE_URI = get_database(dbinfo)
# 动态追踪修改设置,如未设置只会提示警告
SQLALCHEMY_TRACK_MODIFICATIONS = False
# 查询时会显示原始SQL语句
SQLALCHEMY_ECHO = False
# 数据库连接池的大小
SQLALCHEMY_POOL_SIZE=100
#指定数据库连接池的超时时间
SQLALCHEMY_POOL_TIMEOUT=30
# 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_MAX_OVERFLOW=2
#配置日志时,需要设置False,只能flask才能捕获移除写到日志文件中
PROPAGATE_EXCEPTIONS = False
#配置时区
TIMEZONE = local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo
class DevConfig(Config):
DEBUG = True
class TestConfig(Config):
DEBUG = True
class Online(Config):
DEBUG = False
envs = {
'dev':DevConfig,
'test':TestConfig,
'online':Online,
'default':Config
}
3.1、ext配置
1、ext/__init__.py
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from flask_restful import Api
from flask_caching import Cache
from flask_socketio import SocketIO
db = SQLAlchemy() #数据库对象
cors = CORS() #跨域
cache = Cache() #缓存
socketio = SocketIO()#websocket对象
2、ext/config.py
#跨域的配置
cors_config = {
"origins": "*", #所有域都允许
"expose_headers": ["Content-Type", "token","x-requested-with"], #跨域请求头允许的
"methods": ["GET", "POST","PUT","PATCH","OPTIONS","DELETE"], #跨域允许的请求方式
"supports_credentials": True, #允许在cookies跨域
}
#cache缓存配置-使用redis数据库
cache_config_redis = {
'CACHE_TYPE':'redis',
'CACHE_REDIS_HOST':'127.0.0.1',
'CACHE_REDIS_PORT':6637,
# 'CACHE_REDIS_PASSWORD':'密码', #如果redis配置了密码
'CACHE_REDIS_DB':0, #指定使用的redis的db,默认0-15
}
#cache缓存配置-使用内存
cache_config_mem = {
'CACHE_TYPE':'simple'#使用内存作为cache
}
3.2、apps配置
1、__init__.py
import logging
import eventlet
from flask import Flask,jsonify
#导入跨域对象
from ext import cors
#导入db对象
from ext import db
#导入cache
from ext import cache
#导入socketio
from ext import socketio
#导入拓展的配置内容
from ext.config import cache_config_redis,cache_config_mem,cors_config
#导入中间件
from base.middleware import after_request #响应前执行的中间件
#导入配置字典
from base.settings import envs
#导入日志
from base.logger import getLogHandlerFile
from base.logger import getLogHanderTime
#定时任务
from base.scheduler import SchedulerManage
#注册socketio的命名空间
from apps.websocket.consumers import TotalWebsocketNamespace
#导入蓝图
from apps.user.urls import user_bp
def create_app():
#创建一个flask实例,传递__name__ ,是把当前文路径作为flask实例的根路径
#static和templates都是创建在该路径下的
app = Flask(__name__,static_folder='../static',template_folder='../templates') #static目录位置是上层的static
eventlet.monkey_patch() # 开启补丁机制
'基本配置'
#导入配置从类中
app.config.from_object(envs.get('default'))
'日志配置'
app.logger.addHandler(getLogHanderTime()) #基于时间
app.logger.addHandler(getLogHandlerFile()) #基于文件大小
app.logger.setLevel(logging.INFO)
'中间件'
#每次响应前都先设置好响应头,做好跨域
app.after_request(after_request) #【1、使用中间件解决跨域】
#执行请求处理前的中间件,
# app.before_request(before_request)
'拓展配置'
#配置db对象 将db对象与app进行绑定,orm对象与app绑定
db.init_app(app)
#配置跨域,supports_credentials=True 允许携带cookies等信息
cors.init_app(app,**cors_config) #【2、使用三方扩展解决跨域】
#配置缓存
cache.init_app(app=app,config=cache_config_mem)
#配置socketio
socketio.init_app(app,cors_allowed_origins='*')
'socketio注册命名空间的位置: 必须在这个py文件中注册'
socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间
'蓝图注册'
app.register_blueprint(user_bp)
'异常处理'
@app.errorhandler(Exception)
def handle_exception(e):
# 将异常错误写到日志文件中
app.logger.exception(str(e))
# print(e, type(e))
# 对异常错误的响应,使用api的格式
return jsonify(code=500, message=str(e)), 500
'定时任务'
# SchedulerManage()
return app
在本文中,最重要的就是将socketio命名空间的注册,放到create_app函数中了。
2、websocket/consumers.py
from flask import render_template, request, jsonify
from flask_socketio import SocketIO, send, emit, join_room
from ext import socketio
from flask_socketio import Namespace
'''
一、非群聊功能,前端需要实时更新某些数据使用
1、返回html页面
2、主动发送websocket到后端,后端返回数据给请求的用户
3、调用某个视图函数,在视图函数中,给所有连接推送新的数据
'''
class TotalWebsocketNamespace(Namespace):
def on_handle_data(self,data):
print(data, '接收浏览器发送的数据')
# 1、给发送给后端的websocket,发送数据,单独给这个websocket发送
# socketio.emit('handle_data', {'data':'返回的数据','type':'user','msg':'单独返回'})
emit('handle_data', {'data': '返回的数据', 'type': 'user', 'msg': '主动请求时,返回的数据'})
def on_connect(self):
print('connect连接')
token = request.args.get('token')
sid = request.sid
print(request.args, 'args')
# print('连接的sid',request.sid)
if token == '123456':
socketio.emit('success', '验证token成功')
join_room('default') # 加入到默认的房间中了
# 表明连接成功
else:
print('token验证失败')
# 阻止连接
return False
四、总结
如何将socketio处理函数集中管理,将所有的处理类集中到一个py文件中。
1、创建一个包,在包中创建一个py文件,将所有处理socketio的类都写在这里
2、在创建flask的app应用中,注册socketio的命名空间:在create_app函数中,注册命名空间
'socketio注册命名空间的位置: 必须在这个py文件中注册'
socketio.on_namespace(TotalWebsocketNamespace('/')) #使用默认的全局名称空间
建议采用这种方式来管理socketio,在后续的维护会比较容易,拓展起来也简单。不建议使用函数的方式,这样就必须将函数都写在app.py文件中,不然后端无法监听到前端发送的websocket的请求。
五、错误写法
1、直接在consumers.py同级目录下创建一个routings.py,将命名空间的注册写到该py文件中,这样后端是无法接收到前端的websocket请求的。文章来源:https://www.toymoban.com/news/detail-726915.html
2、使用函数的方式来处理逻辑,且把函数都集中放到了websocket/consumers.py文件中,这样后端是无法接收到前端的websocket请求的。文章来源地址https://www.toymoban.com/news/detail-726915.html
到了这里,关于flask框架-[实现websocket]:将socketio处理函数部分集中管理,使用类的方式来管理,集中管理socketio处理函数的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!