商城網(wǎng)站開發(fā)教程視頻北京網(wǎng)絡(luò)營銷推廣
壓箱底代碼存活
- 1、引言
- 2、代碼實例
- 2.1 操作存儲服務(wù)
- 2.1.1 Redis操作
- 2.1.2 MongoDB操作
- 2.1.3 MySQL操作
- 2.2 異步操作
- 2.3 多線程
- 3、總結(jié)
1、引言
小屌絲:魚哥,這年底了,得不得分享一點壓箱底的東西啊
小魚:… 壓箱底的東西,還能分享出來?
小屌絲:唉,為了我能學(xué)習(xí)更多的知識, 你可以分享分享。
小魚:確定要壓箱底的?
小屌絲:必須的。
小魚:好吧,既然你這么想要,那就給你。
小屌絲:… 你這…
小魚:這就是我壓箱底的知識啊。
小屌絲:沒行到,魚哥你…
小魚:算了算了, 我還是跟你分享一點點點點的壓箱底的代碼吧
小屌絲:這還差不多。
2、代碼實例
2.1 操作存儲服務(wù)
2.1.1 Redis操作
# -*- coding:utf-8 -*-
# @Time : 2023-12-19
# @Author : Carl_DJ'''
實現(xiàn)功能:對redis鏈接,寫入數(shù)據(jù)等操作
'''import reids
import requests#鏈接redis
def redis_con():pool = reids.ConnectPool(host = 'host',port = 8809,decode_responses = True)rd = redis.Redis(connnect_pool = pool)return rdif __name__ == '__main__':#寫入redis操作rd = redis_con()rd.set('redis_testDemo1','redis_testDemo2')
2.1.2 MongoDB操作
# -*- coding:utf-8 -*-
# @Time : 2023-12-19
# @Author : Carl_DJ'''
實現(xiàn)功能:對MongoDB 鏈接,寫入數(shù)據(jù)等操作
'''from pymongo import MongoClient#鏈接MongoDB
conn = MongoClient("mongodb://%s:%s@ipaddress:14339/mydb" % ('username', 'password'))
db = conn.mydb
mongo_collection = db.mydata# 批量寫入MongoDB操作
res = requests.get(url=url,params=query).json()
commentList = res['data']['commentList']
mongo_collection.insert_many(commentList)
2.1.3 MySQL操作
# -*- coding:utf-8 -*-
# @Time : 2023-12-19
# @Author : Carl_DJ'''
實現(xiàn)功能:對MySQL鏈接,查詢等等操作
'''import MySQLdb#鏈接MySQL數(shù)據(jù)庫
db = MySQLdb.connect("localhost","username","password","DBname",charset = 'utf8')# 創(chuàng)建游標
cursor = db.cursor()#執(zhí)行sql語句
cursor.execute("SELCT * FROM DBname ")# 獲取一條數(shù)據(jù)
data_one = cursor.fetchone()# 獲取多條數(shù)據(jù)
data_all = cursor.fetchall()#關(guān)閉數(shù)據(jù)庫鏈接
db.close()
2.2 異步操作
# -*- coding:utf-8 -*-
# @Time : 2023-12-19
# @Author : Carl_DJ'''
實現(xiàn)功能:異步編程
'''import aiofilesasync def get_html(session,url):try:async with session.get(url,timeout = 10) as res:if not res.status // 100 == 2:print(f'狀態(tài)碼為:{res.status}')print(f"無法爬取{url},報錯了 ")else:res.encoding = 'utf-8'text = await res.text()return textexcept Exception as e:print(f'錯誤信息:{e}')await get_html(session,url)async def download(title_list,content_list):async with aiofiles.open('{}.txt'.format(title_list[0]),'a',encoding='utf-8') as f:await f.write('{}'.format(str(content_list)))
2.3 多線程
# -*- coding:utf-8 -*-
# @Time : 2023-12-19
# @Author : Carl_DJ'''
實現(xiàn)功能:多線程
'''import threading
from datetime import *#自定義全局變量需要的線程數(shù),20
thread_num = 20
#自定義全局變量每個線程需要循環(huán)的數(shù)量,10
one_work_num = 10
#自定義函數(shù)test()
def test():#編寫測試代碼now = datetime.now()print("打印時間戳:",now)#設(shè)置死循環(huán)#x =0#while (x ==1):# print("打印時間戳:",now)
def working():# 引用全局變量global one_work_num#嵌套執(zhí)行指定循環(huán)次數(shù)的 test()函數(shù)for i in range(0,one_work_num):test()
#自定義thr()函數(shù),來執(zhí)行多線程
def thr():#自定義一個空的數(shù)組,用來存放線程組threads = []#設(shè)置循環(huán)次數(shù)為thread_numfor i in range(thread_num):#將working()函數(shù)放入到線程中t =threading.Thread(target=working,name="T" + str(i))#設(shè)置守護線程t.setDaemon(True)threads.append(t)#啟動線程并執(zhí)行for t in threads:t.start()#設(shè)置阻塞線程for t in threads:t.join(2)
if __name__ == "__main__":thr()
3、總結(jié)
看到這里,今天的分享差不多就結(jié)束了。
其實,在平時工作中,只要善于總結(jié),總會有更高效的代碼片段的收集。
關(guān)于自動化腳本,可以參照小魚的這些博文:
- 《Python3,這2個自動化腳本,徹底解放生產(chǎn)力。》
- 《Python3,自從掌握了這3個自動化腳本生成神器,讓我的幸福感提升了200%。》
- 《Python3,掌握這4個自動化腳本》
- …等等
也可以關(guān)注小魚的Python專欄:
- 《Python開發(fā)實戰(zhàn)及代碼示例講解》
我是小魚:
- CSDN 博客專家;
- 阿里云 專家博主;
- 51CTO博客專家;
- 51認證講師等;
- 認證金牌面試官;
- 職場面試培訓(xùn)、職業(yè)規(guī)劃師;
- 多個國內(nèi)主流技術(shù)社區(qū)的認證專家博主;
- 多款主流產(chǎn)品(阿里云等)測評一、二等獎獲得者;
關(guān)注小魚,帶你學(xué)習(xí)更多更專業(yè)更前言的Python相關(guān)技術(shù)。