以下是使用Python统计网站访问流量的几种常见方法及实现代码,涵盖基础到高级的不同场景需求:
![图片[1]_使用Python统计网站访问流量_知途无界](https://zhituwujie.com/wp-content/uploads/2025/04/d2b5ca33bd20250417092251.png)
一、基础流量统计方案
1. 使用Apache/Nginx日志分析
from collections import defaultdict
import re
def parse_log(file_path):
    """Parse an Apache/Nginx access log.

    Returns a pair ``(ip_counter, url_counter)``:
      * ip_counter  -- hits per client IP
      * url_counter -- hits per (method, path) pair, query strings stripped
    """
    ip_counter = defaultdict(int)
    url_counter = defaultdict(int)
    # Compile once outside the loop; generalized from (GET|POST) to any
    # HTTP method token (HEAD, PUT, DELETE, ...) -- backward compatible.
    log_pattern = re.compile(r'(\d+\.\d+\.\d+\.\d+).*?"([A-Z]+) (.+?) HTTP')
    with open(file_path) as f:
        for line in f:
            match = log_pattern.search(line)
            if match:
                ip, method, url = match.groups()
                ip_counter[ip] += 1
                # Strip query parameters so /page?a=1 and /page count together
                url_counter[(method, url.split('?')[0])] += 1
    return ip_counter, url_counter
# Usage example: analyse the default Nginx access log
ip_counts, url_counts = parse_log('/var/log/nginx/access.log')
# Sort descending by hit count and show the ten busiest clients / pages
print("Top 10 IPs:", sorted(ip_counts.items(), key=lambda x: -x[1])[:10])
print("Top 10 URLs:", sorted(url_counts.items(), key=lambda x: -x[1])[:10])
2. 实时流量监控(Flask示例)
from flask import Flask, request
from collections import Counter, deque  # fix: Counter was used but never imported
import time

app = Flask(__name__)

# Ring buffer of the most recent 1000 requests; old entries drop off automatically
request_queue = deque(maxlen=1000)


@app.before_request
def log_request():
    """Record basic metadata for every incoming request."""
    request_queue.append({
        'time': time.time(),
        'ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
        'user_agent': request.user_agent.string,
    })


def get_realtime_stats():
    """Return requests-per-minute and the 5 busiest client IPs of the last 60 s."""
    now = time.time()
    last_min = [r for r in request_queue if now - r['time'] < 60]
    return {
        'rpm': len(last_min),
        'top_ips': Counter(r['ip'] for r in last_min).most_common(5),
    }
二、数据库存储方案
3. SQLite存储+分析
import sqlite3
from datetime import datetime

# Open (or create) the traffic database and make sure the table exists.
conn = sqlite3.connect('traffic.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS visits
(id INTEGER PRIMARY KEY, ip TEXT, url TEXT, timestamp DATETIME)''')


def log_visit(ip, url):
    """Persist a single page view stamped with the current local time."""
    row = (ip, url, datetime.now())
    c.execute("INSERT INTO visits (ip, url, timestamp) VALUES (?, ?, ?)", row)
    conn.commit()


def get_daily_visits(date):
    """Return [(hour, count), ...] for every visit on the given ISO date string."""
    hourly_query = '''SELECT strftime('%H', timestamp) as hour,
COUNT(*) FROM visits
WHERE date(timestamp) = ?
GROUP BY hour'''
    c.execute(hourly_query, (date,))
    return c.fetchall()
4. 使用Pandas分析
import pandas as pd
# Load the full visit log from SQLite into a DataFrame.
# NOTE(review): relies on the `conn` connection opened in the SQLite snippet
# above still being open -- confirm before running this in isolation.
df = pd.read_sql('SELECT * FROM visits', conn)
# SQLite returns timestamps as strings; convert so the .dt accessor works
df['timestamp'] = pd.to_datetime(df['timestamp'])
# Visit counts bucketed by hour of day (0-23)
hourly = df.groupby(df['timestamp'].dt.hour).size()
# Ten most requested URLs
top_pages = df['url'].value_counts().head(10)
三、高级分析方案
5. 用户行为分析(Session识别)
from datetime import timedelta


def group_sessions(df, inactivity_threshold=30):
    """Label each visit with a per-IP session id.

    A new session starts whenever the gap between two consecutive hits from
    the same IP exceeds ``inactivity_threshold`` minutes. Returns a sorted
    copy of *df* with helper columns time_diff / new_session / session_id.
    """
    ordered = df.sort_values('timestamp')
    gap_limit = timedelta(minutes=inactivity_threshold)
    # Gap to the previous hit from the same IP (NaT for each IP's first hit)
    ordered['time_diff'] = ordered.groupby('ip')['timestamp'].diff()
    ordered['new_session'] = ordered['time_diff'] > gap_limit
    # Cumulative count of session breaks numbers each IP's sessions 0, 1, 2, ...
    ordered['session_id'] = ordered.groupby('ip')['new_session'].cumsum()
    return ordered
# Analyse session durations.
# Fix: the original grouped on df['session_id'] without ever calling
# group_sessions(), so the column did not exist -- apply it first.
df = group_sessions(df)
# Duration = last hit minus first hit within each (ip, session) pair
sessions = df.groupby(['ip', 'session_id'])['timestamp'].agg(['min', 'max'])
sessions['duration'] = sessions['max'] - sessions['min']
6. 地理信息分析
import geoip2.database

# MaxMind city database; must be downloaded separately from maxmind.com
reader = geoip2.database.Reader('GeoLite2-City.mmdb')


def get_geoip(ip):
    """Resolve an IP address to a geo-info dict, or None when lookup fails."""
    try:
        response = reader.city(ip)
        return {
            'country': response.country.name,
            'city': response.city.name,
            'lat': response.location.latitude,
            'lng': response.location.longitude,
        }
    except Exception:
        # Fix: was a bare `except:`, which also swallowed KeyboardInterrupt /
        # SystemExit. Expected failures here are unknown addresses and
        # malformed IP strings; treat them all as "no geo data".
        return None


# Attach geo info to every visit and count visits per country
df['geo'] = df['ip'].apply(get_geoip)
geo_stats = df['geo'].apply(pd.Series)['country'].value_counts()
四、可视化方案
7. 使用Matplotlib绘制趋势图
import matplotlib.pyplot as plt

# Bar chart of the hourly traffic trend
ax = hourly.plot(kind='bar', title='Hourly Traffic')
ax.set_xlabel('Hour of Day')
ax.set_ylabel('Visits')
plt.show()

# Geographic heat map of visitor locations
import folium
from folium.plugins import HeatMap

# Collect [lat, lng] pairs, skipping visits with no geo data
coords = [[geo['lat'], geo['lng']] for geo in df['geo'].dropna()]
m = folium.Map(location=[30, 0], zoom_start=2)
HeatMap(coords).add_to(m)
m.save('heatmap.html')
8. 实时仪表盘(Dash)
import dash
# Fix: Output and Input were used in the callback decorator but never imported
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd  # fix: pd was used in the callback but never imported here

app = dash.Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='live-graph'),
    dcc.Interval(id='interval', interval=60 * 1000),  # refresh every minute
])


@app.callback(Output('live-graph', 'figure'),
              Input('interval', 'n_intervals'))
def update_graph(n):
    """Re-query daily visit totals and redraw the line chart.

    NOTE(review): relies on the module-level `conn` SQLite connection from
    the storage snippet -- confirm it is in scope where this runs.
    """
    df = pd.read_sql('''SELECT date(timestamp) as day,
COUNT(*) as visits
FROM visits
GROUP BY day''', conn)
    fig = px.line(df, x='day', y='visits', title='Daily Visits')
    return fig


if __name__ == '__main__':
    app.run_server(debug=True)
五、生产环境建议
- 日志轮转处理
import logging
from logging.handlers import RotatingFileHandler
# Rotate traffic.log at 10 MiB, keeping the 5 most recent backups
handler = RotatingFileHandler('traffic.log', maxBytes=10*1024*1024, backupCount=5)
# NOTE(review): %(ip)s is not a standard LogRecord attribute -- every log call
# must supply it via extra={'ip': ...} or formatting will fail; confirm callers.
logging.basicConfig(handlers=[handler], level=logging.INFO,
                    format='%(asctime)s %(ip)s %(message)s')
- 性能优化技巧
- 使用多线程处理日志:
from concurrent.futures import ThreadPoolExecutor
from itertools import islice  # fix: islice was used below but never imported


def process_log_chunk(chunk):
    """Process one batch of log lines (placeholder for real parsing)."""
    pass


# NOTE(review): threads only overlap work that releases the GIL (I/O, C
# extensions); for pure-Python parsing a ProcessPoolExecutor may fit better.
with ThreadPoolExecutor() as executor:
    with open('large.log') as f:
        # Feed the pool 1000-line chunks until islice yields an empty list (EOF)
        executor.map(process_log_chunk, iter(lambda: list(islice(f, 1000)), []))
- 安全注意事项
# Filter to known-safe paths before counting
SAFE_PATHS = {'/', '/about', '/products'}


def is_safe(url):
    """True when *url* is a whitelisted path or nested under one.

    Fix: the original prefix-matched against every entry, and because '/' is
    in SAFE_PATHS, url.startswith('/') matched EVERY request -- the filter was
    a no-op. The root path now requires an exact match, and other entries
    match exactly or on a whole path segment ('/about/team', not '/aboutus').
    """
    path = url.split('?')[0]  # ignore query string
    if path == '/':
        return True
    return any(path == p or path.startswith(p + '/')
               for p in SAFE_PATHS if p != '/')


# Block known scanning tools
SCANNER_UAS = {'nmap', 'sqlmap'}


def is_scanner(user_agent):
    """True when the User-Agent string mentions a known scanner."""
    ua = user_agent.lower()
    return any(tool in ua for tool in SCANNER_UAS)
六、方案选型指南
| 需求场景 | 推荐方案 | 优势 |
|---|---|---|
| 小型网站基础统计 | Nginx日志+Pandas分析 | 零成本,快速上手 |
| 实时监控 | Flask中间件+Redis计数 | 低延迟,简单可视化 |
| 用户行为分析 | SQLite+Session识别 | 完整用户旅程跟踪 |
| 全球流量分布 | GeoIP2+Folium热力图 | 直观地理可视化 |
| 企业级分析 | ELK(Elasticsearch+Logstash+Kibana) | 支持PB级数据处理 |
通过组合这些方法,您可以构建从简单计数到复杂用户行为分析的完整流量统计系统。对于高流量网站,建议使用专业工具如Google Analytics或自建ELK栈。
© 版权声明
文中内容均来源于公开资料,受限于信息的时效性和复杂性,可能存在误差或遗漏。我们已尽力确保内容的准确性,但对于因信息变更或错误导致的任何后果,本站不承担任何责任。如需引用本文内容,请注明出处并尊重原作者的版权。
THE END

























暂无评论内容