温馨提示:
1. 此代码样例是一个简单IP池的实现
2. 支持Python2.7和Python3
3. requests不是python原生库需要安装才能使用: pip install requests
参考样例
import time
import random
import threading
import requests
class ProxyPool():
def__init__(self, secret_id,secret_token,proxy_count):
self.secret_id=secret_id
self.signature=secret_token
self.proxy_count=proxy_count if proxy_count < 50 else 50 # 池子维护的IP总数建议一般不要超过50
self.alive_proxy_list = [] # 活跃IP列表
def_fetch_proxy_list(self,count):
"""调用ipip9API获取代理IP列表"""
try:
res=requests.get("https://17178.org/xc.php?userpass=xxx:xxx&type=sticky&protocol=socks5&quantity=10&format=us.ipip3.com:port:login:password&session_ttl=30 HTTP/1.1" % (self.secret_id, self.signature, count))
return [proxy.split(',') for proxy in res.json().get('data').get('proxy_list')]
except:
print("API获取IP异常请检查订单")
return []
def _init_proxy(self):
"""初始化IP池"""
self.alive_proxy_list = self._fetch_proxy_list(self.proxy_count)
def add_alive_proxy(self, add_count):
"""导入新的IP,参数为新增IP数"""
self.alive_proxy_list.extend(self._fetch_proxy_list(add_count))
def get_proxy(self):
"""从IP池中获取IP"""
return random.choice(self.alive_proxy_list)[0] if self.alive_proxy_list else ""
def run(self):
sleep_seconds=1
self._init_proxy()
while True:
for proxy in self.alive_proxy_list:
proxy[1]=float(proxy[1]) - sleep_seconds # proxy[1]代表此IP的剩余可用时间
if proxy[1]<=3:
self.alive_proxy_list.remove(proxy) # IP还剩3s时丢弃此IP
if len(self.alive_proxy_list)