
Daily Threat Intel by CyberDudeBivash
Zero-days, exploit breakdowns, IOCs, detection rules & mitigation playbooks.
Follow on LinkedInApps & Security ToolsWWW.CYBERDUDEBIVASH.COM CYBERDUDEBIVASH PVT LTD
#!/usr/bin/env python3
“””
MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)
Defensive usage: query metadata for triage, threat intel, IOC enrichment.
Features:
– Query by hash (sha256/md5/sha1)
– Get recent samples
– Search by tag or signature
– Save results to JSON and optional CSV
– Optional download (OFF by default) for controlled lab-only use
Docs: https://bazaar.abuse.ch/api/
“””
from __future__ import annotations
import argparse
import csv
import json
import os
import time
from typing import Any, Dict, List, Optional
import requests
API_URL = “https://mb-api.abuse.ch/api/v1/”
def post_api(payload: Dict[str, Any], timeout: int = 20) -> Dict[str, Any]:
r = requests.post(API_URL, data=payload, timeout=timeout)
r.raise_for_status()
return r.json()
def write_json(path: str, obj: Any) -> None:
os.makedirs(os.path.dirname(path) or “.”, exist_ok=True)
with open(path, “w”, encoding=”utf-8″) as f:
json.dump(obj, f, indent=2, ensure_ascii=False)
def write_csv(path: str, rows: List[Dict[str, Any]], field_order: Optional[List[str]] = None) -> None:
if not rows:
return
os.makedirs(os.path.dirname(path) or “.”, exist_ok=True)
# pick stable fields
if field_order is None:
# common MalwareBazaar keys
field_order = [
“sha256_hash“, “md5_hash”, “sha1_hash”, “file_name”, “file_type”,
“file_type_mime”, “file_size”, “first_seen”, “last_seen”, “reporter”,
“signature”, “tags”, “intelligence”
]
# add any unknown fields
for k in rows[0].keys():
if k not in field_order:
field_order.append(k)
with open(path, “w”, newline=””, encoding=”utf-8″) as f:
w = csv.DictWriter(f, fieldnames=field_order)
w.writeheader()
for row in rows:
clean = dict(row)
# flatten lists/dicts for csv
for k, v in list(clean.items()):
if isinstance(v, (list, dict)):
clean[k] = json.dumps(v, ensure_ascii=False)
w.writerow({k: clean.get(k, “”) for k in field_order})
def normalize_rows(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
“””
MalwareBazaar usually returns:
{“query_status”:”ok”,”data”:[{…},{…}]}
or query_status not ok.
“””
if resp.get(“query_status”) != “ok”:
return []
data = resp.get(“data”)
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
return []
def download_sample(sha256: str, out_dir: str, timeout: int = 60) -> str:
“””
Lab-only: downloads the sample zip from MalwareBazaar.
Requires: query=get_file, sha256_hash=…
“””
os.makedirs(out_dir, exist_ok=True)
payload = {“query”: “get_file”, “sha256_hash”: sha256}
r = requests.post(API_URL, data=payload, timeout=timeout)
r.raise_for_status()
# API returns raw file content for get_file
out_path = os.path.join(out_dir, f”{sha256}.zip”)
with open(out_path, “wb”) as f:
f.write(r.content)
return out_path
def main() -> int:
ap = argparse.ArgumentParser(description=”MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)”)
sub = ap.add_subparsers(dest=”cmd”, required=True)
# hash
p_hash = sub.add_parser(“hash”, help=”Query by hash (sha256/md5/sha1)”)
p_hash.add_argument(“–value”, required=True, help=”Hash value to search”)
p_hash.add_argument(“–out”, default=”out/mb_hash.json”, help=”Output JSON path”)
p_hash.add_argument(“–csv”, default=””, help=”Optional CSV output path”)
p_hash.add_argument(“–download”, action=”store_true”, help=”(LAB ONLY) Download sample zip (OFF by default)”)
p_hash.add_argument(“–download-dir”, default=”out/downloads”, help=”Download directory (when –download)”)
# recent
p_recent = sub.add_parser(“recent”, help=”Get recent samples”)
p_recent.add_argument(“–limit”, type=int, default=50, help=”Number of recent items (practical limit applies)”)
p_recent.add_argument(“–out”, default=”out/mb_recent.json”, help=”Output JSON path”)
p_recent.add_argument(“–csv”, default=””, help=”Optional CSV output path”)
# tag
p_tag = sub.add_parser(“tag”, help=”Search by tag (e.g. ‘stealer’, ‘ransomware‘)”)
p_tag.add_argument(“–value”, required=True, help=”Tag to search”)
p_tag.add_argument(“–limit”, type=int, default=50, help=”Limit (best-effort)”)
p_tag.add_argument(“–out”, default=”out/mb_tag.json”, help=”Output JSON path”)
p_tag.add_argument(“–csv”, default=””, help=”Optional CSV output path”)
# signature
p_sig = sub.add_parser(“signature”, help=”Search by signature (family)”)
p_sig.add_argument(“–value”, required=True, help=”Signature/family to search”)
p_sig.add_argument(“–limit”, type=int, default=50, help=”Limit (best-effort)”)
p_sig.add_argument(“–out”, default=”out/mb_signature.json”, help=”Output JSON path”)
p_sig.add_argument(“–csv”, default=””, help=”Optional CSV output path”)
args = ap.parse_args()
# Run
if args.cmd == “hash”:
resp = post_api({“query”: “get_info”, “hash”: args.value})
rows = normalize_rows(resp)
write_json(args.out, resp)
if args.csv:
write_csv(args.csv, rows)
# Optional lab-only download: choose sha256 from response if available
if args.download:
if not rows:
raise SystemExit(“No data returned; cannot download.”)
sha256 = rows[0].get(“sha256_hash”)
if not sha256:
raise SystemExit(“No sha256_hash in response; cannot download.”)
path = download_sample(sha256, args.download_dir)
print(f”[CYBERDUDEBIVASH] Downloaded sample ZIP to: {path}”)
print(f”[CYBERDUDEBIVASH] Saved JSON: {args.out}”)
if args.csv:
print(f”[CYBERDUDEBIVASH] Saved CSV: {args.csv}”)
return 0
if args.cmd == “recent”:
resp = post_api({“query”: “get_recent”, “selector”: str(args.limit)})
rows = normalize_rows(resp)
write_json(args.out, resp)
if args.csv:
write_csv(args.csv, rows)
print(f”[CYBERDUDEBIVASH] Saved JSON: {args.out}”)
if args.csv:
print(f”[CYBERDUDEBIVASH] Saved CSV: {args.csv}”)
return 0
if args.cmd == “tag”:
# MalwareBazaar supports: query=get_taginfo, tag=…
resp = post_api({“query”: “get_taginfo”, “tag”: args.value})
rows = normalize_rows(resp)
# best-effort limit client-side
if rows and args.limit:
rows = rows[: args.limit]
resp = {“query_status”: “ok”, “data”: rows, “note”: “client_side_limit_applied”}
write_json(args.out, resp)
if args.csv:
write_csv(args.csv, rows)
print(f”[CYBERDUDEBIVASH] Saved JSON: {args.out}”)
if args.csv:
print(f”[CYBERDUDEBIVASH] Saved CSV: {args.csv}”)
return 0
if args.cmd == “signature”:
# MalwareBazaar supports: query=get_siginfo, signature=…
resp = post_api({“query”: “get_siginfo”, “signature”: args.value})
rows = normalize_rows(resp)
if rows and args.limit:
rows = rows[: args.limit]
resp = {“query_status”: “ok”, “data”: rows, “note”: “client_side_limit_applied”}
write_json(args.out, resp)
if args.csv:
write_csv(args.csv, rows)
print(f”[CYBERDUDEBIVASH] Saved JSON: {args.out}”)
if args.csv:
print(f”[CYBERDUDEBIVASH] Saved CSV: {args.csv}”)
return 0
return 2
if __name__ == “__main__”:
raise SystemExit(main())
Real-time usage examples
1) Hash enrichment (SOC IOC triage)
python malbaz_quickstart.py hash --value <SHA256_OR_MD5_OR_SHA1> --out out/hash.json --csv out/hash.csv
2) Pull latest samples for daily hunting
python malbaz_quickstart.py recent --limit 100 --out out/recent.json --csv out/recent.csv
3) Hunt by tag (e.g., “stealer”, “ransomware”)
python malbaz_quickstart.py tag --value stealer --limit 50 --out out/tag_stealer.json
4) Search by malware family/signature
python malbaz_quickstart.py signature --value "AgentTesla" --limit 50 --out out/agenttesla.json
5) (LAB ONLY) Download sample zip for isolated sandbox
python malbaz_quickstart.py hash --value <SHA256> --download --download-dir out/downloads
SOC best-practice notes (CYBERDUDEBIVASH authority)
- Use this script for metadata enrichment + IOC pipeline, not “auto-detonation.”
- Keep downloads OFF by default and only enable in a sandbox environment.
- Ship JSON/CSV to SIEM, then correlate with:
- endpoint process telemetry
- DNS / proxy logs
- authentication anomalies
- email/phishing events
#cyberdudebivash #CyberDudeBivash #MalwareBazaar #ThreatIntel #MalwareAnalysis
#SOC #ThreatHunting #DFIR #IncidentResponse #DetectionEngineering
#IOC #YARA #ReverseEngineering #SecurityAutomation #PythonSecurity
#SIEM #Splunk #Elastic #CyberDefense #CyberSecurity
Leave a comment