前言
洞态IAST上线有一段时间了,基于被动式IAST技术,高检出率和低误报率等特点,很好的集成到devops流程中,增加我们的效率,然而每次发现漏洞后没有及时查看导致漏洞处理上的滞后,这里通过钉钉群里机器人做自动化告警,使IAST使用上闭环起来
钉钉群聊机器人(这里可以使用飞书、企业微信等都可以的)
群里机器人文档:https://developers.dingtalk.com/document/robots/custom-robot-access
介绍了如何接入、消息类型、数据格式和错误代码排查等使用上的问题
这里获取了钉钉机器人的webhook,复制下来,在安全设置中,添加关键词用来接收告警
1 2 |
webhook = https://oapi.dingtalk.com/robot/send?access_token=xxx 关键词:IAST |
洞态IAST接入钉钉机器人
通过洞态服务架构了解,DongTai-engine 服务是用来漏洞检测的,根据调用方法数据和污点跟踪算法分析HTTP/HTTPS/RPC请求中是否存在漏洞,通过代码发现火线团队已经在代码中预留了发送漏洞告警的方法
1
|
DongTai-engine/signals/handlers/vul_handler.py 的 send_vul_notify()
|
我们接入钉钉告警机器人就放到这个方法里
这里可以看到,传入了一个vul参数,由handler_vul()
调用 send_vul_notify()
, 看上面的注释,vul参数的数据格式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
http_url: 漏洞所在url http_uri: 漏洞所在uri context_path: HTTP请求上下文 http_method: HTTP请求方法 http_scheme: HTTP请求协议 http_protocol: HTTP请求协议 req_header: HTTP请求头 req_data: HTTP请求体 res_header: HTTP响应头 res_body: HTTP响应体 vul_type: 漏洞类型 vul_level: 漏洞等级 full_stack: 漏洞对应的调用链数据 top_stack: 漏洞对应污点调用链的链首 bottom_stack: 漏洞对应污点调用链的链尾 taint_value: 污点值 taint_position: 污点所在位置 agent_token: Agent的token project: 所在的项目 counts: 漏洞出现次数 client_ip: 客户端IP username: 漏洞所在用户的用户名 |
因为我们只是自己用,就不做交互处理,直接写死到代码中
钉钉机器人支持@人,这里可以根据不同业务线设置不同告警接收人
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# 新增业务线告警接收人 def project_vul(vul): if vul.agent.project_name == "project_1" or vul.agent.project_name == "project_2": phone = "13111111111" send_vul_notify(vul, phone) elif vul.agent.project_name == "project_3" or vul.agent.project_name == "project_4": phone = "13222222222" send_vul_notify(vul, phone) # 修改发动报警方法 def send_vul_notify(vul, phone): """ :param vul_data: :return: """ dingding = "https://oapi.dingtalk.com/robot/send?access_token=xxx" header = { "Content-Type": "application/json", "Charset": "UTF-8" } message = { "msgtype": "text", "text": { "content": "@{} 洞态IAST告警通知:n漏洞类型:{}n危害等级:{}n漏洞URL:{}n业务线名称:{}n探针agent:{}n请及时处理!!!".format(phone, vul.type, vul.level.name_value, vul.url, vul.agent.project_name, vul.agent.token) }, "at": { "atMobiles": [ "{}".format(phone) ], "isAtAll": False } } resp = requests.post(url=dingding, headers=header, data=json.dumps(message)) if resp.status_code == 200: pass |
然后修改 handler_vul()
方法
1 2 3 |
if vul: # send_vul_notify(vul) project_vul(vul) |
效果展示
发布者:常山赵子龙,转载请注明出处:https://www.qztxs.com/archives/science/technology/12142