概述
非关系数据库:MongoDB,Redis
1.连接sqlite
# coding=utf-8 # http://www.runoob.com/sqlite/sqlite-python.html import sqlite3 import traceback try: # 如果表不存在,就创建 with sqlite3.connect('test.db') as conn: print("Opened database successfully") # 删除表 conn.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY AUTOINCREMENT,NAME TEXT NOT NULL,AGE INT NOT NULL,ADDRESS CHAR(50),SALARY REAL); """ conn.execute(sql) print("create table successfully") # 添加数据 conn.executemany("INSERT INTO COMPANY (NAME,AGE,ADDRESS,SALARY) VALUES (?,?,? )",[('Paul',32,'California',20000.00),('Allen',25,'Texas',15000.00),('Teddy',23,'Norway',('Mark','Rich-Mond ',65000.00),('David',27,85000.00),('Kim',22,'South-Hall',45000.00),('James',24,'Houston',10000.00)]) # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ( 'Paul',20000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ('Allen',15000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ('Teddy',SALARY)\ # VALUES ( 'Mark',65000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ( 'David',85000.00 )"); # # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ( 'Kim',45000.00 )") # # conn.execute("INSERT INTO COMPANY (NAME,SALARY)\ # VALUES ( 'James',10000.00 )") # 提交,否则重新运行程序时,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,NAME,SALARY FROM COMPANY """ result = conn.execute(sql) for row in result: print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("id",row[0])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name",row[1])) print("%-10s %s" % ("age",row[2])) print("%-10s %s" % ("address",row[3])) print("%-10s %.2f" % ("salary",row[4])) # or # print('{:10s} {:.2f}'.format("salary",row[4])) except sqlite3.Error as e: print("sqlite3 Error:",e) traceback.print_exc()
2.连接MysqL
#! /usr/bin/env python2.7 # coding=utf-8 # Created by xiaosanyu at 16/5/30 # MysqLdb 只支持python2.7 # http://MysqL-python.sourceforge.net/ import MysqLdb from contextlib import closing import traceback try: # 获取一个数据库连接 with closing(MysqLdb.connect(host='localhost',user='root',passwd='root',db='test',port=3306,charset='utf8')) as conn: print("connect database successfully") with closing(conn.cursor()) as cur: # 删除表 cur.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY NOT NULL auto_increment,SALARY REAL); """ cur.execute(sql) print("create table successfully") # 添加数据 # 在一个conn.execute里面里面执行多个sql语句是非法的 cur.executemany("INSERT INTO COMPANY (NAME,SALARY) VALUES ( %s,%s,%s )",10000.00)]) # 提交,表中无数据 conn.commit() print("insert successfully") # 查询表 sql = """ select id,SALARY FROM COMPANY """ cur.execute(sql) for row in cur.fetchall(): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("id",并且左对齐 print("%-10s %s" % ("name",row[1])) print("%-10s %s" % ("age",row[2])) print("%-10s %s" % ("address",row[3])) print("%-10s %s" % ("salary",row[4])) except MysqLdb.Error as e: print("MysqL Error:",e) traceback.print_exc() # 打印错误栈信息
2.2 使用MysqLdb
#! /usr/bin/env python2.7 # coding=utf-8 # Created by xiaosanyu at 16/5/30 # MysqLdb 只支持python2.7 # http://MysqL-python.sourceforge.net/ import MysqLdb from contextlib import closing import traceback try: # 获取一个数据库连接 with closing(MysqLdb.connect(host='localhost',e) traceback.print_exc() # 打印错误栈信息
2.3使用pyMysqL
2.1和2.2节使用MysqLdb,不支持python3.x
pyMysqL对Python2.x和python3.x的支持都比较好
# Created by xiaosanyu at 16/5/30 # coding=utf-8 # https://github.com/PyMysqL/PyMysqL/ import pyMysqL from contextlib import closing import traceback try: # 获取一个数据库连接,with关键字 表示退出时,conn自动关闭 # with 嵌套上一层的with 要使用closing() with closing(pyMysqL.connect(host='localhost',charset='utf8')) as conn: print("connect database successfully") # 获取游标,cur自动关闭 with conn.cursor() as cur: # 删除表 cur.execute("DROP TABLE IF EXISTS COMPANY") # 创建表 sql = """ CREATE TABLE IF NOT EXISTS COMPANY (ID INTEGER PRIMARY KEY NOT NULL auto_increment,row[4])) except pyMysqL.Error as e: print("MysqL Error:",e) traceback.print_exc()
3.连接mssql
# Created by xiaosanyu at 16/5/30 # http://www.pymssql.org/en/latest/ import pymssql from contextlib import closing try: # 先要保证数据库中有test数据库 # 获取一个数据库连接,conn自动关闭 # with 嵌套上一层的with 要使用closing() with closing(pymssql.connect(host='192.168.100.114',user='sa',password='sa12345',database='test',port=1433,cur自动关闭 with conn.cursor() as cur: # 删除表 cur.execute( '''if exists (select 1 from sys.objects where name='COMPANY' and type='U') drop table COMPANY''') # 创建表 sql = """ CREATE TABLE COMPANY (ID INT IDENTITY(1,1) PRIMARY KEY NOT NULL,'Rich-Mond',row[4])) except pymssql.Error as e: print("mssql Error:",e) # traceback.print_exc()
4.连接MongoDB
# Created by xiaosanyu at 16/5/30 # https://docs.mongodb.com/ecosystem/drivers/python/ # https://pypi.python.org/pypi/pymongo/ import pymongo from pymongo.mongo_client import MongoClient import pymongo.errors import traceback try: # 连接到 mongodb 服务 mongoClient = MongoClient('localhost',27017) # 连接到数据库 mongoDatabase = mongoClient.test print("connect database successfully") # 获取集合 mongoCollection = mongoDatabase.COMPANY # 移除所有数据 mongoCollection.remove() # 添加数据 mongoCollection.insert_many([{"Name": "Paul","Age": "32","Address": "California","Salary": "20000.00"},{"Name": "Allen","Age": "25","Address": "Texas","Salary": "15000.00"},{"Name": "Teddy","Age": "23","Address": "Norway",{"Name": "Mark","Address": "Rich-Mond","Salary": "65000.00"},{"Name": "David","Age": "27","Salary": "85000.00"},{"Name": "Kim","Age": "22","Address": "South-Hall","Salary": "45000.00"},{"Name": "James","Age": "24","Address": "Houston","Salary": "10000.00"},]) #获取集合中的值 for row in mongoCollection.find(): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("_id",row['_id'])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name",row['Name'])) print("%-10s %s" % ("age",row['Age'])) print("%-10s %s" % ("address",row['Address'])) print("%-10s %s" % ("salary",row['Salary'])) print('\n\n\n') # 使id自增 mongoCollection.remove() # 创建计数表 mongoDatabase.counters.save({"_id": "people_id","sequence_value": 0}) # 创建存储过程 mongoDatabase.system_js.getSequenceValue = '''function getSequenceValue(sequenceName){ var sequenceDocument = db.counters.findAndModify({ query: {_id: sequenceName},update: {$inc:{sequence_value: 1}},new:true }); return sequenceDocument.sequence_value; }''' mongoCollection.insert_many( [{"_id": mongoDatabase.eval("getSequenceValue('people_id')"),"Name": "Paul",{"_id": mongoDatabase.eval("getSequenceValue('people_id')"),"Name": "Allen","Name": "Teddy","Name": "Mark","Name": "David","Name": "Kim","Name": "James",]) for row in mongoCollection.find(): print("-" * 50) # 输出50个-,int(row['_id']))) # 字段名固定10位宽度,row['Salary'])) except pymongo.errors.PyMongoError as e: print("mongo Error:",e) traceback.print_exc()
5.连接Redis
5.1使用redis
# coding=utf-8 # Created by xiaosanyu at 16/5/31 # https://pypi.python.org/pypi/redis/2.10.5 # http://redis-py.readthedocs.io/en/latest/# import redis r = redis.Redis(host='localhost',port=6379,db=0,password="12345") print("connect",r.ping()) # 看信息 info = r.info() # or 查看部分信息 # info = r.info("Server") # 输出信息 items = info.items() for i,(key,value) in enumerate(items): print("item %s----%s:%s" % (i,key,value)) # 删除键和对应的值 r.delete("company") # 可以一次性push一条或多条数据 r.rpush("company",{"id": 1,{"id": 2,{"id": 3,"Salary": "20000.00"}) r.rpush("company",{"id": 4,"Salary": "65000.00"}) r.rpush("company",{"id": 5,"Salary": "85000.00"}) r.rpush("company",{"id": 6,"Salary": "45000.00"}) r.rpush("company",{"id": 7,"Salary": "10000.00"}) # eval用来将dict格式的字符串转换成dict for row in map(lambda x: eval(x),r.lrange("company",r.llen("company"))): print("-" * 50) # 输出50个-,作为分界线 print("%-10s %s" % ("_id",row['id'])) # 字段名固定10位宽度,并且左对齐 print("%-10s %s" % ("name",row['Name'])) print("%-10s %s" % ("age",row['Age'])) print("%-10s %s" % ("address",row['Address'])) print("%-10s %s" % ("salary",row['Salary'])) # 关闭当前连接 # r.shutdown() #这个是关闭redis服务端
5.2使用pyredis
# Created by xiaosanyu at 16/5/30 # http://pyredis.readthedocs.io/en/latest/ import pyredis r = pyredis.Client(host='localhost',database=0,r.ping().decode("utf-8")) # 看信息 # info = r.execute("info").decode() # or 查看部分信息 info = r.execute("info","Server").decode() # 输出信息 print(info) # 删除键和对应的值 r.delete("company") # 可以一次性push一条或多条数据 r.rpush("company",'''{"id": 1,"Salary": "20000.00"}''','''{"id": 2,"Salary": "15000.00"}''','''{"id": 3,"Salary": "20000.00"}''') r.rpush("company",'''{"id": 4,"Salary": "65000.00"}''') r.rpush("company",'''{"id": 5,"Salary": "85000.00"}''') r.rpush("company",'''{"id": 6,"Salary": "45000.00"}''') r.rpush("company",'''{"id": 7,"Salary": "10000.00"}''') # eval用来将dict格式的字符串转换成dict for row in map(lambda x: eval(x),row['Salary'])) # 关闭当前连接 r.close()
以上这篇python 连接各类主流数据库的实例代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持编程小技巧。
总结
以上是编程之家为你收集整理的python 连接各类主流数据库的实例代码全部内容,希望文章能够帮你解决python 连接各类主流数据库的实例代码所遇到的程序开发问题。
如果您也喜欢它,动动您的小指点个赞吧