Counting Website Traffic with Python

Below are several common ways to count website traffic with Python, with implementation code covering needs from basic to advanced scenarios:


I. Basic Traffic Statistics

1. Analyzing Apache/Nginx logs

from collections import defaultdict
import re

def parse_log(file_path):
    ip_counter = defaultdict(int)
    url_counter = defaultdict(int)

    log_pattern = r'(\d+\.\d+\.\d+\.\d+).*?"(GET|POST) (.+?) HTTP'

    with open(file_path) as f:
        for line in f:
            match = re.search(log_pattern, line)
            if match:
                ip, method, url = match.groups()
                ip_counter[ip] += 1
                url_counter[(method, url.split('?')[0])] += 1  # ignore query parameters

    return ip_counter, url_counter

# usage example
ip_counts, url_counts = parse_log('/var/log/nginx/access.log')
print("Top 10 IPs:", sorted(ip_counts.items(), key=lambda x: -x[1])[:10])
print("Top 10 URLs:", sorted(url_counts.items(), key=lambda x: -x[1])[:10])
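
Rotated production logs are often gzip-compressed (access.log.1.gz and so on); a small helper, as a sketch, that parse_log could call in place of the bare open():

import gzip

def open_log(path):
    # transparently open plain or gzip-compressed log files as text
    if path.endswith('.gz'):
        return gzip.open(path, 'rt')
    return open(path)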

2. Real-time traffic monitoring (Flask example)

from flask import Flask, request
from collections import Counter, deque
import time

app = Flask(__name__)
request_queue = deque(maxlen=1000)  # keep the most recent 1000 requests

@app.before_request
def log_request():
    request_queue.append({
        'time': time.time(),
        'ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
        'user_agent': request.user_agent.string
    })

def get_realtime_stats():
    now = time.time()
    last_min = [r for r in request_queue if now - r['time'] < 60]
    return {
        'rpm': len(last_min),
        'top_ips': Counter(r['ip'] for r in last_min).most_common(5)
    }
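
Nothing above exposes these numbers yet; a minimal endpoint (the /stats route name is illustrative) could return them as JSON:

from flask import jsonify

@app.route('/stats')
def stats():
    # rolling one-minute request rate plus the top client IPs
    return jsonify(get_realtime_stats())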

II. Database Storage

3. SQLite storage and analysis

import sqlite3
from datetime import datetime

# initialize the database
conn = sqlite3.connect('traffic.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS visits
             (id INTEGER PRIMARY KEY, ip TEXT, url TEXT, timestamp DATETIME)''')

# insert one visit record
def log_visit(ip, url):
    c.execute("INSERT INTO visits (ip, url, timestamp) VALUES (?, ?, ?)",
              (ip, url, datetime.now()))
    conn.commit()

# hourly visit counts for a given date (YYYY-MM-DD)
def get_daily_visits(date):
    c.execute('''SELECT strftime('%H', timestamp) as hour, 
                 COUNT(*) FROM visits 
                 WHERE date(timestamp) = ? 
                 GROUP BY hour''', (date,))
    return c.fetchall()
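
A quick usage sketch (the IP and date below are placeholders); an index on timestamp keeps the date filter fast as the table grows:

# optional index for date-based queries (safe to run repeatedly)
c.execute('CREATE INDEX IF NOT EXISTS idx_visits_ts ON visits(timestamp)')

log_visit('203.0.113.7', '/index.html')
print(get_daily_visits('2024-01-01'))  # e.g. [('00', 3), ('01', 7), ...]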

4. Analysis with Pandas

import pandas as pd

# load visit data from the database
df = pd.read_sql('SELECT * FROM visits', conn)
df['timestamp'] = pd.to_datetime(df['timestamp'])

# visits by hour of day
hourly = df.groupby(df['timestamp'].dt.hour).size()
# most-visited pages
top_pages = df['url'].value_counts().head(10)
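
Two figures most traffic reports lead with are page views (PV) and unique visitors (UV); a sketch using the same DataFrame:

# daily page views (PV) and unique visitors (UV)
day = df['timestamp'].dt.date
pv = df.groupby(day).size()           # total requests per day
uv = df.groupby(day)['ip'].nunique()  # distinct client IPs per day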

III. Advanced Analysis

5. User behavior analysis (session identification)

from datetime import timedelta

def group_sessions(df, inactivity_threshold=30):
    df = df.sort_values('timestamp')
    df['time_diff'] = df.groupby('ip')['timestamp'].diff()
    df['new_session'] = df['time_diff'] > timedelta(minutes=inactivity_threshold)
    df['session_id'] = df.groupby('ip')['new_session'].cumsum()
    return df

# analyze session durations (apply the grouping first)
df = group_sessions(df)
sessions = df.groupby(['ip', 'session_id'])['timestamp'].agg(['min', 'max'])
sessions['duration'] = sessions['max'] - sessions['min']
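
From the sessions table the usual engagement metrics follow directly; for example:

# average session length and pages viewed per session
avg_duration = sessions['duration'].mean()
pages_per_session = df.groupby(['ip', 'session_id']).size().mean()
print(f'avg session: {avg_duration}, pages/session: {pages_per_session:.1f}')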

6. Geographic analysis

import geoip2.database
import geoip2.errors

# GeoLite2-City.mmdb is MaxMind's free city database, downloaded separately
reader = geoip2.database.Reader('GeoLite2-City.mmdb')

def get_geoip(ip):
    try:
        response = reader.city(ip)
        return {
            'country': response.country.name,
            'city': response.city.name,
            'lat': response.location.latitude,
            'lng': response.location.longitude
        }
    except (geoip2.errors.AddressNotFoundError, ValueError):
        # private, unlisted, or malformed addresses have no location
        return None

# look up each visit's IP
df['geo'] = df['ip'].apply(get_geoip)
geo_stats = df['geo'].apply(pd.Series)['country'].value_counts()
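
Since the same address usually appears many times, resolving each distinct IP once and mapping the results back is considerably cheaper:

# resolve every distinct IP once, then map the results back onto the rows
geo_cache = {ip: get_geoip(ip) for ip in df['ip'].unique()}
df['geo'] = df['ip'].map(geo_cache)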

IV. Visualization

7. Trend charts with Matplotlib

import matplotlib.pyplot as plt

# hourly traffic trend
hourly.plot(kind='bar', title='Hourly Traffic')
plt.xlabel('Hour of Day')
plt.ylabel('Visits')
plt.show()

# geographic heat map
import folium
from folium.plugins import HeatMap

map_data = df['geo'].dropna().apply(lambda x: [x['lat'], x['lng']]).tolist()
m = folium.Map(location=[30, 0], zoom_start=2)
HeatMap(map_data).add_to(m)
m.save('heatmap.html')

8. Real-time dashboard (Dash)

import dash
from dash import dcc, html, Input, Output
import plotly.express as px
import pandas as pd

app = dash.Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='live-graph'),
    dcc.Interval(id='interval', interval=60*1000)  # refresh every minute
])

@app.callback(Output('live-graph', 'figure'),
              Input('interval', 'n_intervals'))
def update_graph(n):
    # reuses the SQLite connection `conn` from section 3
    df = pd.read_sql('''SELECT date(timestamp) as day,
                        COUNT(*) as visits
                        FROM visits
                        GROUP BY day''', conn)
    fig = px.line(df, x='day', y='visits', title='Daily Visits')
    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

V. Production Recommendations

  1. Log rotation handling
import logging
from logging.handlers import RotatingFileHandler

# rotate at 10 MB per file, keeping 5 backups
handler = RotatingFileHandler('traffic.log', maxBytes=10*1024*1024, backupCount=5)
logging.basicConfig(handlers=[handler], level=logging.INFO,
                    format='%(asctime)s %(ip)s %(message)s')

# the custom '%(ip)s' field requires every call to pass extra={'ip': ...}
logging.info('GET /index.html', extra={'ip': '203.0.113.7'})
  2. Performance optimization tips
  • Process large logs in parallel with a thread pool:
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def process_log_chunk(chunk):
    # process one chunk (a list of log lines)
    pass

with ThreadPoolExecutor() as executor:
    with open('large.log') as f:
        # feed the file to worker threads in 1000-line chunks
        executor.map(process_log_chunk, iter(lambda: list(islice(f, 1000)), []))
  3. Security considerations
# count only whitelisted paths (exact match, so '/' cannot match every URL)
SAFE_PATHS = {'/', '/about', '/products'}
def is_safe(url):
    return url.split('?')[0] in SAFE_PATHS

# screen out common scanners by user agent
SCANNER_UAS = {'nmap', 'sqlmap'}
def is_scanner(user_agent):
    return any(s in user_agent.lower() for s in SCANNER_UAS)
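
These filters are easy to wire into the Flask middleware from section 2; a sketch, assuming the app and the helpers above:

from flask import abort, request

@app.before_request
def block_scanners():
    # reject scanner traffic before it reaches the counters
    if is_scanner(request.user_agent.string or ''):
        abort(403)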

VI. Choosing a Solution

Scenario                     | Recommended approach                     | Advantage
Small-site basic statistics  | Nginx logs + Pandas analysis             | Zero cost, quick to start
Real-time monitoring         | Flask middleware + Redis counters        | Low latency, simple visualization
User behavior analysis       | SQLite + session identification          | Complete user-journey tracking
Global traffic distribution  | GeoIP2 + Folium heat map                 | Intuitive geographic visualization
Enterprise-scale analytics   | ELK (Elasticsearch + Logstash + Kibana)  | Petabyte-scale data processing
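
The Redis-backed counting recommended above for real-time monitoring is not shown earlier; a minimal sketch, assuming a local Redis server and the redis-py package (key names are illustrative):

import time
import redis

r = redis.Redis()  # assumes redis://localhost:6379

def count_visit(path):
    # one counter per path per minute, expiring after an hour
    key = f'hits:{path}:{int(time.time() // 60)}'
    pipe = r.pipeline()
    pipe.incr(key)
    pipe.expire(key, 3600)
    pipe.execute()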

By combining these approaches you can build a complete traffic statistics system, from simple counting up to complex user behavior analysis. For high-traffic sites, consider a dedicated product such as Google Analytics or a self-hosted ELK stack.
