
对于有技术能力的企业,通过API接口对接舆情监测服务是实现自动化预警的最佳方案。本文介绍舆情监测API接口的技术方案和核心代码示例。
一、API接口方案优势
优势1:自动化程度高
通过API接口对接,可实现舆情数据的自动获取、处理和预警,无需人工操作。
优势2:响应速度快
API接口支持准实时数据获取,发现负面舆情后可立即触发预警通知。
优势3:集成能力强
数据可直接接入企业自有系统(如CRM、OA、钉钉/飞书等),实现一体化管理。
优势4:灵活性高
可根据企业需求自定义预警规则、数据处理逻辑和通知方式。
极致了数据提供舆情监测API接口,支持关键词实时监控和负面信息预警,覆盖全网主流平台。
二、API接口调用示例
示例1:关键词舆情监控
import requests
import json
from datetime import datetime
API_BASE = "https://api.example.com/yuqin"
API_KEY = "your_api_key"
def set_keyword_monitor(keywords, platforms=None):
"""设置关键词舆情监控"""
url = f"{API_BASE}/monitor/keyword/set"
data = {
"keywords": keywords, # ["品牌A", "产品X", "创始人Y"]
"platforms": platforms or ["weibo", "weixin", "douyin", "xiaohongshu", "zhihu"],
"fields": ["title", "content", "publish_time", "author", "likes", "comments", "sentiment"]
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
resp = requests.post(url, json=data, headers=headers)
if resp.status_code == 200:
result = resp.json()
print(f"关键词监控设置成功: {result['data']['monitor_id']}")
return result['data']['monitor_id']
else:
print(f"设置失败: {resp.status_code}")
return None
# 设置品牌关键词监控
monitor_id = set_keyword_monitor(["品牌A", "产品X"])
示例2:获取实时舆情数据
def get_realtime_yuqin(monitor_id, page=1, page_size=20):
"""获取实时舆情数据"""
url = f"{API_BASE}/monitor/{monitor_id}/realtime"
params = {
"page": page,
"page_size": page_size,
"sort": "time" # 按时间排序
}
headers = {"Authorization": f"Bearer {API_KEY}"}
resp = requests.get(url, params=params, headers=headers)
if resp.status_code == 200:
return resp.json()
return None
# 获取最新舆情
data = get_realtime_yuqin(monitor_id)
if data:
for item in data["data"]["list"]:
print(f"平台: {item['platform']}")
print(f"标题: {item['title']}")
print(f"情感: {item['sentiment']}") # positive/neutral/negative
print(f"时间: {item['publish_time']}")
print("---")
示例3:负面舆情自动预警
def get_negative_yuqin(monitor_id, hours=1):
"""获取近N小时的负面舆情"""
url = f"{API_BASE}/monitor/{monitor_id}/negative"
params = {
"hours": hours, # 近1小时内的负面舆情
"sentiment": "negative",
"severity": ["medium", "high"] # 中度和高度负面
}
headers = {"Authorization": f"Bearer {API_KEY}"}
resp = requests.get(url, params=params, headers=headers)
if resp.status_code == 200:
return resp.json()
return None
def auto_alert_negative(monitor_id):
"""自动预警负面舆情"""
negative_data = get_negative_yuqin(monitor_id, hours=1)
if negative_data and negative_data["data"]["total"] > 0:
print(f"发现 {negative_data['data']['total']} 条负面舆情!")
for item in negative_data["data"]["list"]:
severity = item.get("severity", "medium")
emoji = "🚨" if severity == "high" else "⚠️"
print(f"{emoji} [{severity.upper()}] {item['title']}")
print(f" 平台: {item['platform']} | 情感: {item['sentiment_score']}")
print(f" 链接: {item['url']}")
print()
# 触发通知
send_alert(item)
else:
print("近1小时无负面舆情 ✓")
def send_alert(item):
"""发送预警通知"""
# 可接入钉钉/飞书/企业微信等
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
message = {
"msgtype": "text",
"text": {
"content": f"🚨 负面舆情预警\n\n标题:{item['title']}\n平台:{item['platform']}\n情感分:{item['sentiment_score']}\n链接:{item['url']}"
}
}
requests.post(webhook_url, json=message)
# 自动预警
auto_alert_negative(monitor_id)
示例4:舆情趋势分析
def get_yuqin_trend(monitor_id, start_date, end_date):
"""获取舆情趋势数据"""
url = f"{API_BASE}/monitor/{monitor_id}/trend"
params = {
"start_date": start_date, # "2026-06-01"
"end_date": end_date, # "2026-06-16"
"group_by": "day" # 按天统计
}
headers = {"Authorization": f"Bearer {API_KEY}"}
resp = requests.get(url, params=params, headers=headers)
if resp.status_code == 200:
return resp.json()
return None
# 获取近30天舆情趋势
trend = get_yuqin_trend(monitor_id, "2026-05-17", "2026-06-16")
if trend:
for item in trend["data"]["trend"]:
print(f"{item['date']}: 总量{item['total']} | 正面{item['positive']} | 负面{item['negative']}")
三、定时任务实现自动化预警
import schedule
import time
from datetime import datetime
def daily_yuqin_report():
"""每日舆情报告"""
print(f"\n=== {datetime.now().strftime('%Y-%m-%d %H:%M')} 舆情报告 ===")
# 获取今日舆情
today_data = get_realtime_yuqin(monitor_id, page_size=50)
if not today_data:
return
total = today_data["data"]["total"]
negative = sum(1 for i in today_data["data"]["list"] if i["sentiment"] == "negative")
positive = sum(1 for i in today_data["data"]["list"] if i["sentiment"] == "positive")
print(f"今日舆情总量: {total}")
print(f"正面: {positive} | 负面: {negative}")
# 负面舆情详情
negative_items = [i for i in today_data["data"]["list"] if i["sentiment"] == "negative"]
if negative_items:
print(f"\n负面舆情 ({len(negative_items)} 条):")
for item in negative_items[:5]:
print(f" - [{item['platform']}] {item['title']}")
# 每天上午9点生成舆情报告
schedule.every().day.at("09:00").do(daily_yuqin_report)
# 每小时检查一次负面舆情
schedule.every().hour.do(auto_alert_negative, monitor_id)
while True:
schedule.run_pending()
time.sleep(60)
四、多种通知渠道集成
def send_multi_channel_alert(item, channels=["dingtalk", "email"]):
"""多渠道发送预警"""
title = f"负面舆情预警: {item['title']}"
content = f"""
平台:{item['platform']}
情感分:{item['sentiment_score']}
时间:{item['publish_time']}
摘要:{item['content'][:100]}
链接:{item['url']}
"""
if "dingtalk" in channels:
# 钉钉通知
dingtalk_webhook = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
requests.post(dingtalk_webhook, json={
"msgtype": "text",
"text": {"content": title + content}
})
if "feishu" in channels:
# 飞书通知
feishu_webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/xxx"
requests.post(feishu_webhook, json={
"msg_type": "text",
"content": {"text": title + content}
})
if "email" in channels:
# 邮件通知
smtp_server = "smtp.example.com"
# 发送邮件通知
pass
if "sms" in channels:
# 短信通知(高风险舆情)
if item.get("severity") == "high":
# 发送短信通知
pass
五、数据存储与分析
import sqlite3
def save_yuqin_to_db(items):
"""将舆情数据存入数据库"""
conn = sqlite3.connect("yuqin_data.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS yuqin (
id TEXT PRIMARY KEY,
title TEXT,
content TEXT,
platform TEXT,
author TEXT,
sentiment TEXT,
sentiment_score REAL,
publish_time TIMESTAMP,
captured_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
for item in items:
conn.execute("""
INSERT OR REPLACE INTO yuqin
(id, title, content, platform, author, sentiment, sentiment_score, publish_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
item["id"], item["title"], item["content"], item["platform"],
item["author"], item["sentiment"], item.get("sentiment_score", 0),
item["publish_time"]
))
conn.commit()
conn.close()
print(f"已保存 {len(items)} 条舆情数据")
六、极致了数据舆情监测API服务
极致了数据提供舆情监测API接口:
- 覆盖平台:微博、微信、抖音、快手、小红书、知乎、贴吧、新闻网站等全网主流平台
- 功能支持:关键词监控、账号监控、情感分析、实时预警、趋势分析
- 接口形式:标准REST API,JSON格式返回
- 通知方式:支持接入钉钉、飞书、邮件、短信等通知渠道
- 计费方式:按调用次数计费,量大优惠
七、常见问题解答
Q1:没有技术团队能使用舆情监测服务吗?
可以使用极致了数据的托管服务,无需技术对接,专人负责监测预警。
Q2:API接口数据更新频率是多少?
支持准实时更新,发现舆情后短时间内即可通过API获取。
Q3:支持私有化部署吗?
支持。极致了数据可根据企业需求提供私有化部署方案。
Q4:舆情监测API怎么收费?
按调用次数计费,量大优惠,性价比高。具体费用根据接口类型和使用量确定。
八、总结
通过API接口对接舆情监测服务,可实现负面舆情的自动发现、实时预警和多渠道通知。极致了数据提供舆情监测API接口,支持关键词实时监控和负面信息预警,覆盖全网主流平台。
极致了数据支持舆情监测和负面信息预警,支持关键词实时监控,覆盖全网主流平台,数据真实稳定,欢迎企业客户咨询合作。

