使用Python统计网站访问流量

以下是使用Python统计网站访问流量的几种常见方法及实现代码,涵盖基础到高级的不同场景需求:

图片[1]_使用Python统计网站访问流量_知途无界

一、基础流量统计方案

1. 使用Apache/Nginx日志分析

from collections import defaultdict
import re

def parse_log(file_path):
    ip_counter = defaultdict(int)
    url_counter = defaultdict(int)

    log_pattern = r'(\d+\.\d+\.\d+\.\d+).*?"(GET|POST) (.+?) HTTP'

    with open(file_path) as f:
        for line in f:
            match = re.search(log_pattern, line)
            if match:
                ip, method, url = match.groups()
                ip_counter[ip] += 1
                url_counter[(method, url.split('?')[0])] += 1  # 忽略查询参数

    return ip_counter, url_counter

# 使用示例
ip_counts, url_counts = parse_log('/var/log/nginx/access.log')
print("Top 10 IPs:", sorted(ip_counts.items(), key=lambda x: -x[1])[:10])
print("Top 10 URLs:", sorted(url_counts.items(), key=lambda x: -x[1])[:10])

2. 实时流量监控(Flask示例)

from flask import Flask, request
from collections import deque
import time

app = Flask(__name__)
request_queue = deque(maxlen=1000)  # 保存最近1000次请求

@app.before_request
def log_request():
    request_queue.append({
        'time': time.time(),
        'ip': request.remote_addr,
        'method': request.method,
        'path': request.path,
        'user_agent': request.user_agent.string
    })

def get_realtime_stats():
    now = time.time()
    last_min = [r for r in request_queue if now - r['time'] < 60]
    return {
        'rpm': len(last_min),
        'top_ips': Counter(r['ip'] for r in last_min).most_common(5)
    }

二、数据库存储方案

3. SQLite存储+分析

import sqlite3
from datetime import datetime

# 初始化数据库
conn = sqlite3.connect('traffic.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS visits
             (id INTEGER PRIMARY KEY, ip TEXT, url TEXT, timestamp DATETIME)''')

# 插入访问记录
def log_visit(ip, url):
    c.execute("INSERT INTO visits (ip, url, timestamp) VALUES (?, ?, ?)",
              (ip, url, datetime.now()))
    conn.commit()

# 查询统计
def get_daily_visits(date):
    c.execute('''SELECT strftime('%H', timestamp) as hour, 
                 COUNT(*) FROM visits 
                 WHERE date(timestamp) = ? 
                 GROUP BY hour''', (date,))
    return c.fetchall()

4. 使用Pandas分析

import pandas as pd

# 从数据库加载数据
df = pd.read_sql('SELECT * FROM visits', conn)
df['timestamp'] = pd.to_datetime(df['timestamp'])

# 按小时统计
hourly = df.groupby(df['timestamp'].dt.hour).size()
# 热门页面
top_pages = df['url'].value_counts().head(10)

三、高级分析方案

5. 用户行为分析(Session识别)

from datetime import timedelta

def group_sessions(df, inactivity_threshold=30):
    df = df.sort_values('timestamp')
    df['time_diff'] = df.groupby('ip')['timestamp'].diff()
    df['new_session'] = df['time_diff'] > timedelta(minutes=inactivity_threshold)
    df['session_id'] = df.groupby('ip')['new_session'].cumsum()
    return df

# 分析会话时长
sessions = df.groupby(['ip', 'session_id'])['timestamp'].agg(['min', 'max'])
sessions['duration'] = sessions['max'] - sessions['min']

6. 地理信息分析

import geoip2.database

reader = geoip2.database.Reader('GeoLite2-City.mmdb')

def get_geoip(ip):
    try:
        response = reader.city(ip)
        return {
            'country': response.country.name,
            'city': response.city.name,
            'lat': response.location.latitude,
            'lng': response.location.longitude
        }
    except:
        return None

# 应用地理分析
df['geo'] = df['ip'].apply(get_geoip)
geo_stats = df['geo'].apply(pd.Series)['country'].value_counts()

四、可视化方案

7. 使用Matplotlib绘制趋势图

import matplotlib.pyplot as plt

# 按小时流量趋势
hourly.plot(kind='bar', title='Hourly Traffic')
plt.xlabel('Hour of Day')
plt.ylabel('Visits')
plt.show()

# 地理热力图
import folium
from folium.plugins import HeatMap

map_data = df['geo'].dropna().apply(lambda x: [x['lat'], x['lng']]).tolist()
m = folium.Map(location=[30, 0], zoom_start=2)
HeatMap(map_data).add_to(m)
m.save('heatmap.html')

8. 实时仪表盘(Dash)

import dash
from dash import dcc, html
import plotly.express as px

app = dash.Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='live-graph'),
    dcc.Interval(id='interval', interval=60*1000)  # 每分钟更新
])

@app.callback(Output('live-graph', 'figure'),
              Input('interval', 'n_intervals'))
def update_graph(n):
    df = pd.read_sql('''SELECT date(timestamp) as day, 
                        COUNT(*) as visits 
                        FROM visits 
                        GROUP BY day''', conn)
    fig = px.line(df, x='day', y='visits', title='Daily Visits')
    return fig

if __name__ == '__main__':
    app.run_server(debug=True)

五、生产环境建议

  1. 日志轮转处理
import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler('traffic.log', maxBytes=10*1024*1024, backupCount=5)
logging.basicConfig(handlers=[handler], level=logging.INFO, 
                   format='%(asctime)s %(ip)s %(message)s')
  1. 性能优化技巧
  • 使用多线程处理日志:
from concurrent.futures import ThreadPoolExecutor

def process_log_chunk(chunk):
    # 处理日志块
    pass

with ThreadPoolExecutor() as executor:
    with open('large.log') as f:
        executor.map(process_log_chunk, iter(lambda: list(islice(f, 1000)), []))
  1. 安全注意事项
# 过滤敏感路径
SAFE_PATHS = {'/', '/about', '/products'}
def is_safe(url):
    return any(url.startswith(path) for path in SAFE_PATHS)

# 屏蔽扫描器
SCANNER_UAS = {'nmap', 'sqlmap'}
def is_scanner(user_agent):
    return any(s in user_agent.lower() for s in SCANNER_UAS)

六、方案选型指南

需求场景推荐方案优势
小型网站基础统计Nginx日志+Pandas分析零成本,快速上手
实时监控Flask中间件+Redis计数低延迟,简单可视化
用户行为分析SQLite+Session识别完整用户旅程跟踪
全球流量分布GeoIP2+Folium热力图直观地理可视化
企业级分析ELK(Elasticsearch+Logstash+Kibana)支持PB级数据处理

通过组合这些方法,您可以构建从简单计数到复杂用户行为分析的完整流量统计系统。对于高流量网站,建议使用专业工具如Google Analytics或自建ELK栈。

© 版权声明
THE END
喜欢就点个赞,支持一下吧!
点赞71 分享
评论 抢沙发
头像
欢迎您留下评论!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容