mysql 方法总结

Posted by tzwlwy's Blog on August 20, 2020

重新梳理了一下mysql方法

import logging
import pymysql

class MYSQL_DB(object):
    def __init__(self, mysql_da_data):
        self.cursor = None
        self.con = None
        self.mysql_da_data = mysql_da_data

    @staticmethod
    def connect(database):
        try:
            con = pymysql.connect(**database)
        except Exception as e:
            print(e)
            return False
        return con


    def _select(self,perform_sql):
        self.cursor.execute(perform_sql)
        res=self.sql_fetch_json()
        return res

    def _update_or_insert(self,perform_sql):
        self.cursor.execute(perform_sql)
        self.con.commit()
        self.closed()

    def _cursor(self):
        con = self.connect(self.mysql_da_data)
        cursor = con.cursor()
        if cursor:
            return con, cursor
        raise ConnectionError



    @staticmethod
    def to_json(result):
        _list = []
        for l in result:
            _list.append(list(l))

        return _list

    def sql_fetch_json(self):
        keys = []
        for column in self.cursor.description:
            keys.append(column[0])
        key_number = len(keys)
        json_data = []
        for row in self.cursor.fetchall():
            item = dict()
            for q in range(key_number):
                item[keys[q]] = row[q]
            json_data.append(item)
        print(json_data)
        self.closed()
        return json_data

    def closed(self):
        print("sql 执行完毕")
        self.cursor.close()

    def execute(self, perform_sql):
        print('执行sql:%s' % perform_sql)
        logging.info('执行sql:%s' % perform_sql)
        self.con, self.cursor = self._cursor()
        way=perform_sql[:6].lower()
        if way == 'select':
            self._select(perform_sql)
        else:
            self._update_or_insert(perform_sql)

if __name__ == '__main__':
    MYSQL_DB_DATA1 = {
        "host": "localhost",
        "user": "root",
        "password": "123456",
        "db": "django1",
        "port": 3306
    }

    sql = '''select * from test
    '''
    db = MYSQL_DB(MYSQL_DB_DATA1)
    db.execute(sql)