Below are several common approaches to counting website traffic with Python, with implementation code covering scenarios from basic to advanced:
![Counting website traffic with Python](https://zhituwujie.com/wp-content/uploads/2025/04/d2b5ca33bd20250417092251.png)
I. Basic Traffic Counting Approaches
1. Apache/Nginx Log Analysis
```python
from collections import defaultdict
import re

def parse_log(file_path):
    ip_counter = defaultdict(int)
    url_counter = defaultdict(int)
    log_pattern = r'(\d+\.\d+\.\d+\.\d+).*?"(GET|POST) (.+?) HTTP'
    with open(file_path) as f:
        for line in f:
            match = re.search(log_pattern, line)
            if match:
                ip, method, url = match.groups()
                ip_counter[ip] += 1
                url_counter[(method, url.split('?')[0])] += 1  # ignore query parameters
    return ip_counter, url_counter

# Usage example
ip_counts, url_counts = parse_log('/var/log/nginx/access.log')
print("Top 10 IPs:", sorted(ip_counts.items(), key=lambda x: -x[1])[:10])
print("Top 10 URLs:", sorted(url_counts.items(), key=lambda x: -x[1])[:10])
```
2. Real-Time Traffic Monitoring (Flask Example)
```python
from collections import Counter, deque
from flask import Flask, request
import time

app = Flask(__name__)
request_queue = deque(maxlen=1000)  # keep the most recent 1,000 requests

@app.before_request
def log_request():
    request_queue.append({
        'time': time.time(),
        'ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
        'user_agent': request.user_agent.string
    })

def get_realtime_stats():
    now = time.time()
    last_min = [r for r in request_queue if now - r['time'] < 60]
    return {
        'rpm': len(last_min),  # requests in the last minute
        'top_ips': Counter(r['ip'] for r in last_min).most_common(5)
    }
```
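To expose these rolling statistics over HTTP, a small JSON endpoint can be added. A minimal sketch; the `/stats` route name is an assumption, not part of the original example:

```python
from flask import jsonify

@app.route('/stats')
def stats():
    # Serve the rolling one-minute statistics as JSON
    return jsonify(get_realtime_stats())
```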
II. Database Storage Approaches
3. SQLite Storage and Analysis
```python
import sqlite3
from datetime import datetime

# Initialize the database
conn = sqlite3.connect('traffic.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS visits
             (id INTEGER PRIMARY KEY, ip TEXT, url TEXT, timestamp DATETIME)''')

# Insert a visit record
def log_visit(ip, url):
    c.execute("INSERT INTO visits (ip, url, timestamp) VALUES (?, ?, ?)",
              (ip, url, datetime.now()))
    conn.commit()

# Query statistics: visits per hour for a given day
def get_daily_visits(date):
    c.execute('''SELECT strftime('%H', timestamp) as hour,
                 COUNT(*) FROM visits
                 WHERE date(timestamp) = ?
                 GROUP BY hour''', (date,))
    return c.fetchall()
```
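A quick usage sketch of the two helpers above; the IP (a documentation address) and the date are illustrative values only:

```python
log_visit('203.0.113.7', '/index.html')

# Hourly breakdown for a given day; the date string must match SQLite's date() format
for hour, count in get_daily_visits('2025-04-17'):
    print(f"{hour}:00 -> {count} visits")
```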
4. Analysis with Pandas
```python
import pandas as pd

# Load data from the database
df = pd.read_sql('SELECT * FROM visits', conn)
df['timestamp'] = pd.to_datetime(df['timestamp'])

# Visits per hour of day
hourly = df.groupby(df['timestamp'].dt.hour).size()

# Most popular pages
top_pages = df['url'].value_counts().head(10)
```
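Beyond the hour-of-day grouping, the same DataFrame supports calendar-based aggregation. A minimal sketch of a daily trend using pandas' `resample`:

```python
# Daily visit totals, indexed by calendar day
daily = df.set_index('timestamp').resample('D').size()
print(daily.tail())
```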
III. Advanced Analysis Approaches
5. User Behavior Analysis (Session Identification)
```python
from datetime import timedelta

def group_sessions(df, inactivity_threshold=30):
    df = df.sort_values('timestamp')
    df['time_diff'] = df.groupby('ip')['timestamp'].diff()
    # A gap longer than the threshold starts a new session
    df['new_session'] = df['time_diff'] > timedelta(minutes=inactivity_threshold)
    df['session_id'] = df.groupby('ip')['new_session'].cumsum()
    return df

# Analyze session durations (the session_id column must be added first)
df = group_sessions(df)
sessions = df.groupby(['ip', 'session_id'])['timestamp'].agg(['min', 'max'])
sessions['duration'] = sessions['max'] - sessions['min']
```
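With sessions labeled, summary metrics follow directly. A brief sketch, assuming the `df` and `sessions` objects computed above:

```python
# Average session length and average pages viewed per session
avg_duration = sessions['duration'].mean()
pages_per_session = df.groupby(['ip', 'session_id']).size().mean()
print(f"Avg session: {avg_duration}, avg pages/session: {pages_per_session:.1f}")
```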
6. Geographic Analysis
```python
import geoip2.database
import geoip2.errors

# Requires the GeoLite2-City database file from MaxMind
reader = geoip2.database.Reader('GeoLite2-City.mmdb')

def get_geoip(ip):
    try:
        response = reader.city(ip)
        return {
            'country': response.country.name,
            'city': response.city.name,
            'lat': response.location.latitude,
            'lng': response.location.longitude
        }
    except (geoip2.errors.AddressNotFoundError, ValueError):
        return None

# Apply the lookup to the traffic data
df['geo'] = df['ip'].apply(get_geoip)
geo_stats = df['geo'].dropna().apply(pd.Series)['country'].value_counts()
```
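Real logs repeat the same IPs many times, so memoizing lookups avoids redundant database reads. A minimal sketch with `functools.lru_cache`; the cache size is an arbitrary assumption:

```python
from functools import lru_cache

@lru_cache(maxsize=100_000)
def get_geoip_cached(ip):
    # Each distinct IP hits the GeoIP database only once
    return get_geoip(ip)

df['geo'] = df['ip'].apply(get_geoip_cached)
```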
IV. Visualization Approaches
7. Trend Charts with Matplotlib
```python
import matplotlib.pyplot as plt

# Hourly traffic trend
hourly.plot(kind='bar', title='Hourly Traffic')
plt.xlabel('Hour of Day')
plt.ylabel('Visits')
plt.show()

# Geographic heat map
import folium
from folium.plugins import HeatMap

map_data = df['geo'].dropna().apply(lambda x: [x['lat'], x['lng']]).tolist()
m = folium.Map(location=[30, 0], zoom_start=2)
HeatMap(map_data).add_to(m)
m.save('heatmap.html')
```
8. Real-Time Dashboard (Dash)
```python
import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd
import sqlite3

app = dash.Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='live-graph'),
    dcc.Interval(id='interval', interval=60*1000)  # refresh every minute
])

@app.callback(Output('live-graph', 'figure'),
              Input('interval', 'n_intervals'))
def update_graph(n):
    # Open a fresh connection per callback: a sqlite3 connection
    # cannot be shared across the server's worker threads
    conn = sqlite3.connect('traffic.db')
    df = pd.read_sql('''SELECT date(timestamp) as day,
                        COUNT(*) as visits
                        FROM visits
                        GROUP BY day''', conn)
    conn.close()
    fig = px.line(df, x='day', y='visits', title='Daily Visits')
    return fig

if __name__ == '__main__':
    app.run_server(debug=True)
```
V. Production Environment Recommendations
- Log rotation
```python
import logging
from logging.handlers import RotatingFileHandler

# Rotate at 10 MB, keeping 5 backup files
handler = RotatingFileHandler('traffic.log', maxBytes=10*1024*1024, backupCount=5)
logging.basicConfig(handlers=[handler], level=logging.INFO,
                    format='%(asctime)s %(ip)s %(message)s')
```
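Note that `%(ip)s` in the format string is not a standard logging attribute: every log call must supply it through `extra=`, or record formatting fails. A minimal usage sketch (the IP is a documentation address, not real data):

```python
# 'ip' must appear in extra= because the handler's format references %(ip)s
logging.info('GET /index.html', extra={'ip': '203.0.113.7'})
```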
- Performance optimization tips
- Process large logs with a thread pool:
```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def process_log_chunk(chunk):
    # Process one chunk (a list of log lines)
    pass

with ThreadPoolExecutor() as executor:
    with open('large.log') as f:
        # Read the file in 1,000-line chunks and hand each chunk to a worker;
        # iter(callable, sentinel) stops when islice returns an empty list at EOF
        executor.map(process_log_chunk, iter(lambda: list(islice(f, 1000)), []))
```
- Security considerations
```python
# Count only known-safe paths
SAFE_PATHS = {'/', '/about', '/products'}

def is_safe(url):
    return any(url.startswith(path) for path in SAFE_PATHS)

# Screen out scanners
SCANNER_UAS = {'nmap', 'sqlmap'}

def is_scanner(user_agent):
    return any(s in user_agent.lower() for s in SCANNER_UAS)
```
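These helpers can be wired into the Flask hook from section 2. A hedged sketch of a revised `log_request` that drops scanner traffic, assuming the `app` and `request_queue` objects defined there:

```python
@app.before_request
def log_request():
    # Skip known scanners and unsafe paths so they do not pollute the statistics
    if is_scanner(request.user_agent.string or '') or not is_safe(request.path):
        return
    request_queue.append({
        'time': time.time(),
        'ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
    })
```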
VI. Approach Selection Guide
| Scenario | Recommended approach | Strengths |
|---|---|---|
| Basic stats for a small site | Nginx logs + Pandas analysis | Zero cost, quick to get started |
| Real-time monitoring | Flask middleware + Redis counters (see the sketch below) | Low latency, simple visualization |
| User behavior analysis | SQLite + session identification | Tracks the full user journey |
| Global traffic distribution | GeoIP2 + Folium heat map | Intuitive geographic visualization |
| Enterprise-grade analytics | ELK (Elasticsearch + Logstash + Kibana) | Handles PB-scale data |
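The real-time row above pairs Flask with Redis counters, which this article does not otherwise show. The following is a minimal sketch under assumed key names and a one-minute window, using the redis-py client against a local Redis server:

```python
import time
import redis

r = redis.Redis()  # assumes Redis on localhost:6379

def count_request(ip):
    minute = int(time.time() // 60)
    pipe = r.pipeline()
    pipe.incr(f"traffic:rpm:{minute}")      # total requests this minute
    pipe.sadd(f"traffic:ips:{minute}", ip)  # unique visitor IPs this minute
    pipe.expire(f"traffic:rpm:{minute}", 300)  # expire keys to bound memory
    pipe.expire(f"traffic:ips:{minute}", 300)
    pipe.execute()

def requests_last_minute():
    value = r.get(f"traffic:rpm:{int(time.time() // 60)}")
    return int(value) if value else 0
```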
By combining these methods, you can build a complete traffic-statistics system ranging from simple counting to full user-behavior analysis. For high-traffic sites, consider a dedicated tool such as Google Analytics or a self-hosted ELK stack.