flask 框架操作MySQL数据库简单示例

(编辑:jimmy 日期: 2025/1/13 浏览:2)

本文实例讲述了flask 框架操作MySQL数据库。分享给大家供大家参考,具体如下:

一、创建数据库表格

"""
Created on 19-10-8
@requirement:Anaconda 4.3.0 (64-bit) Python3.6
@description:创建表格
"""
import pymysql
server = '127.0.0.1'
user = 'root'
password = 'password'
# 连接数据库
conn = pymysql.connect(server, user, password, database='information_collection') # 获取连接
cursor = conn.cursor() # 获取游标
# "**ENGINE=InnoDB DEFAULT CHARSET=utf8**"-创建表的过程中增加这条,中文就不是乱码
# 创建表
cursor.execute("""
CREATE TABLE if not exists user(
   user_id INT NOT NULL auto_increment primary key,
   user_name VARCHAR(100),
   user_password VARCHAR(100),
   user_nickname VARCHAR(100),
   user_email VARCHAR(100)
   )
   ENGINE=InnoDB DEFAULT CHARSET=utf8
 """)
# 查询数据库表user内容
cursor.execute('SELECT * FROM user')
# 查看一行 多行:cursor.fetchall()
row = cursor.fetchone()
print(row)
# if row[0] is None:
#   row0 = list(row)
#   row0[0] = 0
#   row = tuple(row0)
# # 插入数据,注:与sqlserver有些区别
cursor.execute("INSERT INTO user VALUES('%s','%s','%s','%s')" % ('xiaoming','qwe','ming','@163.com'))
# 提交数据,才会写入表格
conn.commit()
# 关闭游标关闭数据库
cursor.close()
conn.close()

二、flask操作mysql

"""
Created on 19-10-8
@requirement:Anaconda 4.3.0 (64-bit) Python3.6
@description:
"""
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, jsonify, request
import configparser
import os
app = Flask(__name__)
# 使用ConfigParser 首选需要初始化实例,并读取配置文件:
my_config = configparser.ConfigParser()
my_config.read('db.conf')
# 连接数据库information_collection
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DEV_DATABASE_URL') or                "mysql+pymysql://root:password@127.0.0.1:3306/information_collection"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
mydb = SQLAlchemy()
mydb.init_app(app)
# 用户模型
class User(mydb.Model):
  user_id = mydb.Column(mydb.Integer, primary_key=True)
  user_name = mydb.Column(mydb.String(60), nullable=False)
  user_password = mydb.Column(mydb.String(30), nullable=False)
  user_nickname = mydb.Column(mydb.String(50))
  user_email = mydb.Column(mydb.String(30), nullable=False)
  def __repr__(self):
    return '<User %r>' % self.user_name
# 获取用户列表,所有数据
@app.route('/users', methods=['GET'])
def getUsers():
  data = User.query.all()
  datas = []
  for user in data:
    datas.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email})
  return jsonify(data=datas)
# 添加用户数据,一条一条添加
@app.route('/user', methods=['POST'])
def addUser():
  user_name = request.form.get('user_name')
  user_password = request.form.get('user_password')
  user_nickname = request.form.get('user_nickname')
  user_email = request.form.get('user_email')
  user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
  try:
    mydb.session.add(user)
    mydb.session.commit()
  except:
    mydb.session.rollback()
    mydb.session.flush()
  userId = user.user_id
  if (user.user_id is None):
    result = {'msg': '添加失败'}
    return jsonify(data=result)
  data = User.query.filter_by(user_id=userId).first()
  result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
  return jsonify(data=result)
# 获取单条数据
@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
  user = User.query.filter_by(user_id=userId).first()
  if (user is None):
    result = {'msg': '找不到数据'}
  else:
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
  return jsonify(data=result)
# 修改用户数据
@app.route('/user/<int:userId>', methods=['PATCH'])
def updateUser(userId):
  user_name = request.form.get('user_name')
  user_password = request.form.get('user_password')
  user_nickname = request.form.get('user_nickname')
  user_email = request.form.get('user_email')
  try:
    user = User.query.filter_by(user_id=userId).first()
    if (user is None):
      result = {'msg': '找不到要修改的记录'}
      return jsonify(data=result)
    else:
      user.user_name = user_name
      user.user_password = user_password
      user.user_nickname = user_nickname
      user.user_email = user_email
      mydb.session.commit()
  except:
    mydb.session.rollback() # 回滚
    mydb.session.flush() # 重置
  userId = user.user_id
  data = User.query.filter_by(user_id=userId).first()
  result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_password': user.user_password, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
  return jsonify(data=result)
# 删除用户数据
@app.route('/user/<int:userId>', methods=['DELETE'])
def deleteUser(userId):
  User.query.filter_by(user_id=userId).delete()
  mydb.session.commit()
  return getUsers()
if __name__ == '__main__':
  app.run()

三、返回数据的样式

{
  "data": {
    "user_email": "@126.com",
    "user_id": 6,
    "user_name": "xiaoli",
    "user_nickname": "lili"
  }
}

希望本文所述对大家基于flask框架的Python程序设计有所帮助。