
对于有技术能力的企业来说,通过API接口自建舆情监测系统,是实现数据自主可控的最佳路径。本文提供一份完整的社媒舆情监测API接口开发教程。
一、系统架构设计
整体架构
企业级舆情监测系统分为四个层次:
┌─────────────────────────────────────────────┐
│ 应用层(可视化) │
│ 数据看板 / 预警通知 / 报告生成 / 检索查询 │
├─────────────────────────────────────────────┤
│ 分析层(数据处理) │
│ 情感分析 / 传播分析 / 聚类分析 / 趋势预测 │
├─────────────────────────────────────────────┤
│ 存储层(数据管理) │
│ 原始库 / 分析库 / 索引库 / 特征库 │
├─────────────────────────────────────────────┤
│ 采集层(数据获取) │
│ 极致了数据API / 官方API / 爬虫 │
└─────────────────────────────────────────────┘
技术栈选择
- 采集层:Python + Requests + asyncio
- 存储层:MySQL(结构化数据)+ Elasticsearch(检索)
- 分析层:Python + jieba分词 + snownlp情感分析
- 应用层:Vue/React前端 + Django/Flask后端
二、数据采集模块开发
基础API对接
import requests
import time
import json
from datetime import datetime, timedelta
class SocialMediaMonitor:
def __init__(self, api_key, api_secret):
self.base_url = "https://api.jzl.com/v1"
self.api_key = api_key
self.api_secret = api_secret
self.token = None
def authenticate(self):
"""获取访问令牌"""
url = f"{self.base_url}/auth/token"
response = requests.post(url, json={
"api_key": self.api_key,
"api_secret": self.api_secret
})
if response.status_code == 200:
data = response.json()
self.token = data["access_token"]
return True
return False
def create_monitoring_task(self, keywords, platforms, alert_config):
"""创建监测任务"""
url = f"{self.base_url}/social/monitoring/create"
headers = {"Authorization": f"Bearer {self.token}"}
payload = {
"task_name": f"舆情监测_{datetime.now().strftime('%Y%m%d')}",
"keywords": keywords,
"platforms": platforms,
"alert_channels": alert_config.get("channels", ["wechat"]),
"alert_levels": alert_config.get("levels", ["high", "medium"]),
"real_time": alert_config.get("real_time", True)
}
response = requests.post(url, headers=headers, json=payload)
return response.json()
def get_monitoring_data(self, task_id, start_date, end_date,
sentiment=None, min_interactions=None):
"""获取监测数据"""
url = f"{self.base_url}/social/monitoring/data/{task_id}"
headers = {"Authorization": f"Bearer {self.token}"}
params = {
"start_date": start_date,
"end_date": end_date
}
if sentiment:
params["sentiment"] = sentiment
if min_interactions:
params["min_interactions"] = min_interactions
response = requests.get(url, headers=headers, params=params)
return response.json()
def get_realtime_alerts(self, task_id, since_timestamp=None):
"""获取实时预警"""
url = f"{self.base_url}/social/monitoring/alerts/{task_id}"
headers = {"Authorization": f"Bearer {self.token}"}
params = {}
if since_timestamp:
params["since"] = since_timestamp
response = requests.get(url, headers=headers, params=params)
return response.json()
关键词+账号双重监测
def monitor_keywords_and_accounts(self, keywords, account_ids, platforms):
"""关键词+账号双重监测"""
# 监测关键词命中内容
keyword_data = self.get_keyword_results(keywords, platforms)
# 监测指定账号的帖子
account_data = []
for account_id in account_ids:
posts = self.get_account_posts(account_id)
account_data.extend(posts)
# 合并去重
all_data = keyword_data + account_data
unique_data = self.deduplicate(all_data)
return unique_data
def get_keyword_results(self, keywords, platforms):
"""按关键词搜索"""
url = f"{self.base_url}/social/search"
headers = {"Authorization": f"Bearer {self.token}"}
payload = {
"keywords": keywords,
"platforms": platforms,
"sort_by": "time", # 按时间排序
"limit": 100
}
response = requests.post(url, headers=headers, json=payload)
return response.json().get("data", [])
def get_account_posts(self, account_id):
"""获取指定账号的帖子"""
url = f"{self.base_url}/social/account/posts"
headers = {"Authorization": f"Bearer {self.token}"}
params = {"account_id": account_id}
response = requests.get(url, headers=headers, params=params)
return response.json().get("data", [])
三、定时任务配置
定时采集任务
import schedule
import time
from threading import Thread
class MonitorScheduler:
def __init__(self, monitor):
self.monitor = monitor
self.running = False
def job_hourly(self):
"""每小时执行:采集最新数据"""
print(f"[{datetime.now()}] 执行小时级数据采集...")
data = self.monitor.get_monitoring_data(
task_id="current_task",
start_date=datetime.now().strftime("%Y-%m-%d %H:00:00"),
end_date=datetime.now().strftime("%Y-%m-%d %H:59:59")
)
self.save_to_database(data)
def job_daily(self):
"""每天执行:生成日报"""
print(f"[{datetime.now()}] 生成每日舆情报告...")
report = self.generate_daily_report()
self.send_report(report)
def job_realtime(self):
"""实时检查预警(每分钟)"""
alerts = self.monitor.get_realtime_alerts(
task_id="current_task",
since_timestamp=int(time.time()) - 60
)
if alerts:
self.process_alerts(alerts)
def start(self):
"""启动调度器"""
schedule.every(1).minutes.do(self.job_realtime)
schedule.every(1).hours.do(self.job_hourly)
schedule.every().day.at("09:00").do(self.job_daily)
self.running = True
while self.running:
schedule.run_pending()
time.sleep(1)
def stop(self):
self.running = False
Webhook实时推送
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/webhook/jzl', methods=['POST'])
def handle_alert():
"""接收极致了数据的实时预警推送"""
alert = request.json
level = alert.get("level") # high/medium/low
content = alert.get("content")
platform = alert.get("platform")
url = alert.get("url")
# 根据预警级别处理
if level == "high":
# 红色预警:立即通知
send_urgent_notification(alert)
elif level == "medium":
# 橙色预警:加入待处理队列
add_to_queue(alert)
else:
# 黄色预警:记录日志
log_info(alert)
return jsonify({"status": "ok"})
def send_urgent_notification(alert):
"""发送紧急预警"""
message = f"""🚨 舆情预警
平台:{alert['platform']}
内容:{alert['content'][:100]}...
链接:{alert['url']}
时间:{alert['created_at']}
"""
# 发送微信/短信/邮件通知
send_wechat(message)
send_sms(message)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
四、数据存储与索引
MySQL表结构设计
-- 舆情数据主表
CREATE TABLE public_opinion (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
content_id VARCHAR(64) UNIQUE, -- 内容唯一ID
platform VARCHAR(32), -- 来源平台
author_id VARCHAR(64), -- 作者ID
author_name VARCHAR(128), -- 作者昵称
author_followers INT, -- 作者粉丝数
content TEXT, -- 正文内容
content_type VARCHAR(16), -- content/video/live
publish_time DATETIME, -- 发布时间
collect_time DATETIME, -- 采集时间
read_count INT, -- 阅读数
like_count INT, -- 点赞数
comment_count INT, -- 评论数
share_count INT, -- 转发数
sentiment VARCHAR(16), -- 情感标签
sentiment_score FLOAT, -- 情感得分
keywords_match TEXT, -- 匹配的关键词
alert_level VARCHAR(16), -- 预警级别
url VARCHAR(512), -- 原文链接
INDEX idx_platform (platform),
INDEX idx_publish_time (publish_time),
INDEX idx_sentiment (sentiment),
FULLTEXT idx_content (content)
);
-- 预警记录表
CREATE TABLE alert_records (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
public_opinion_id BIGINT,
alert_level VARCHAR(16),
alert_time DATETIME,
notified_channels TEXT,
handled_status VARCHAR(16),
handler VARCHAR(64),
handle_time DATETIME,
remark TEXT,
FOREIGN KEY (public_opinion_id) REFERENCES public_opinion(id)
);
五、情感分析与分类
基础情感分析
from snownlp import SnowNLP
def analyze_sentiment(text):
"""使用SnowNLP进行情感分析"""
try:
s = SnowNLP(text)
score = s.sentiments # 0-1,越接近1越正面
if score > 0.6:
return "positive", score
elif score < 0.4:
return "negative", score
else:
return "neutral", score
except:
return "neutral", 0.5
def batch_analyze(data_list):
"""批量情感分析"""
results = []
for item in data_list:
sentiment, score = analyze_sentiment(item["content"])
item["sentiment"] = sentiment
item["sentiment_score"] = score
results.append(item)
return results
六、总结
通过API接口自建舆情监测系统,可以实现数据的完全自主可控。
极致了数据提供的社媒数据API接口,支持关键词监测、账号追踪、实时预警等功能,可根据客户需求个性化定制开发,提供标准API接口方便与企业现有系统集成。一个接口对接,即可获取20+平台的社媒数据,大幅降低开发成本和时间周期。
常见问答(FAQ)
Q1:舆情监测API开发需要多长时间?
使用极致了数据的标准API,基础对接1-2天即可完成,完整系统开发2-4周。如需极致了数据提供根据客户需求个性化定制开发,可进一步缩短周期。
Q2:API调用有频率限制吗?
有,具体限制取决于套餐级别。标准版每秒10次请求,高级版更高。可通过批量接口减少请求次数。
Q3:数据存储选MySQL还是MongoDB?
结构化数据用MySQL(账号信息、统计数据),非结构化数据用MongoDB(帖子正文、评论内容)。或者直接用Elasticsearch兼顾检索和存储。
Q4:舆情数据需要存储多久?
建议热数据(近3个月)存MySQL,冷数据(3个月以上)归档。极致了数据支持历史数据回溯,按需调取即可。
Q5:API可以个性化定制吗?
可以。极致了数据提供根据客户需求个性化定制开发服务,可针对特定行业、特定场景定制API功能和参数,并提供标准API接口方便系统集成。

