MalwareBazaar API Quick-Start Script (CYBERDUDEBIVASH EDITION)

CYBERDUDEBIVASH

 Daily Threat Intel by CyberDudeBivash
Zero-days, exploit breakdowns, IOCs, detection rules & mitigation playbooks.

Follow on LinkedIn | Apps & Security Tools | WWW.CYBERDUDEBIVASH.COM | CYBERDUDEBIVASH PVT LTD

#!/usr/bin/env python3

"""
MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)

Defensive usage: query metadata for triage, threat intel, IOC enrichment.

Features:

- Query by hash (sha256/md5/sha1)
- Get recent samples
- Search by tag or signature
- Save results to JSON and optional CSV
- Optional download (OFF by default) for controlled lab-only use

Docs: https://bazaar.abuse.ch/api/
"""

from __future__ import annotations

import argparse

import csv

import json

import os

import time

from typing import Any, Dict, List, Optional

import requests

# Endpoint that every query in this script is POSTed to.
API_URL = "https://mb-api.abuse.ch/api/v1/"

def post_api(payload: Dict[str, Any], timeout: int = 20) -> Dict[str, Any]:
    """POST one form-encoded query to the MalwareBazaar API and decode the JSON reply.

    Args:
        payload: Form fields of the query (for example ``{"query": "get_info",
            "hash": "..."}``).
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: When the server answers with an error status
            (raised by ``raise_for_status``).
        ValueError: When the response body is not valid JSON.
    """
    # NOTE(review): abuse.ch documents an ``Auth-Key`` request header for API
    # access. The key is read from the environment so it never lands in shell
    # history or source control; when the variable is unset the request is
    # sent exactly as before. Confirm the header name against the current docs.
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    headers = {"Auth-Key": auth_key} if auth_key else None

    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()
    return r.json()

def write_json(path: str, obj: Any) -> None:
    """Serialize ``obj`` as pretty-printed UTF-8 JSON at ``path``.

    Missing parent directories are created first. Non-ASCII characters are
    written verbatim (``ensure_ascii=False``) rather than as ``\\uXXXX`` escapes.

    Args:
        path: Destination file path; an existing file is overwritten.
        obj: Any JSON-serializable object.
    """
    # A bare filename has an empty dirname; fall back to the current directory
    # so makedirs is never called with "".
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)

def write_csv(path: str, rows: List[Dict[str, Any]], field_order: Optional[List[str]] = None) -> None:
    """Write ``rows`` to a CSV file with a stable column order.

    Nothing is written (and no directory is created) when ``rows`` is empty.

    Args:
        path: Destination CSV path; parent directories are created as needed.
        rows: One dict per CSV line.
        field_order: Exact list of columns to emit. When omitted, the common
            MalwareBazaar keys come first, followed by every other key found
            in ANY row, in first-seen order. The caller's list is never
            mutated.
    """
    if not rows:
        return

    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    if field_order is None:
        # Common MalwareBazaar keys, in a fixed order so reports stay diffable.
        field_order = [
            "sha256_hash", "md5_hash", "sha1_hash", "file_name", "file_type",
            "file_type_mime", "file_size", "first_seen", "last_seen", "reporter",
            "signature", "tags", "intelligence",
        ]
        # Append unknown keys from every row, not just the first one, so a
        # field that only appears on later rows is not silently dropped.
        known = set(field_order)
        for row in rows:
            for key in row:
                if key not in known:
                    known.add(key)
                    field_order.append(key)

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=field_order)
        writer.writeheader()
        for row in rows:
            line = {}
            for key in field_order:
                value = row.get(key, "")
                # Nested structures cannot be a CSV cell; embed them as JSON text.
                if isinstance(value, (list, dict)):
                    value = json.dumps(value, ensure_ascii=False)
                line[key] = value
            writer.writerow(line)

def normalize_rows(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the result rows from a MalwareBazaar response.

    MalwareBazaar usually returns::

        {"query_status": "ok", "data": [{...}, {...}]}

    or a ``query_status`` other than ``"ok"``.

    Args:
        resp: Decoded JSON response.

    Returns:
        The ``data`` list as-is, a one-element list when ``data`` is a single
        dict, and an empty list for everything else (status not ``"ok"``,
        missing or malformed ``data``, or a response that is not a dict).
    """
    # A non-dict payload has no .get(); treat it as "no rows" instead of crashing.
    if not isinstance(resp, dict):
        return []
    if resp.get("query_status") != "ok":
        return []

    data = resp.get("data")
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return []

def download_sample(sha256: str, out_dir: str, timeout: int = 60) -> str:
    """Lab-only: download a sample archive from MalwareBazaar.

    Sends ``query=get_file`` with ``sha256_hash=<sha256>`` and stores the raw
    response body as ``<out_dir>/<sha256>.zip``.

    Args:
        sha256: SHA-256 of the sample as 64 hexadecimal characters.
        out_dir: Directory that receives the archive (created if missing).
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        Path of the file that was written.

    Raises:
        ValueError: ``sha256`` is not a 64-character hex string.
        requests.HTTPError: The server answered with an error status.
        RuntimeError: The API replied with a JSON status object instead of
            file content (for example when the sample is unknown).
    """
    # The hash becomes part of a file name and may originate from an API
    # response, so reject anything that is not plain hex before touching the
    # filesystem or the network (blocks "../" style path traversal).
    hex_digits = "0123456789abcdefABCDEF"
    if (
        not isinstance(sha256, str)
        or len(sha256) != 64
        or any(ch not in hex_digits for ch in sha256)
    ):
        raise ValueError(f"Not a valid SHA-256 hex digest: {sha256!r}")

    os.makedirs(out_dir, exist_ok=True)

    # NOTE(review): abuse.ch documents an ``Auth-Key`` request header; it is
    # sent only when MALWAREBAZAAR_AUTH_KEY is set. Confirm against the docs.
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    headers = {"Auth-Key": auth_key} if auth_key else None

    payload = {"query": "get_file", "sha256_hash": sha256}
    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()

    # On success the body is raw file content. A body that decodes to a JSON
    # object carrying "query_status" is an error report delivered with HTTP
    # 200; saving it as <hash>.zip would leave a bogus "sample" on disk.
    body = r.content
    if body[:1] == b"{":
        try:
            status = json.loads(body)
        except ValueError:
            status = None
        if isinstance(status, dict) and "query_status" in status:
            raise RuntimeError(
                f"MalwareBazaar did not return a file for {sha256}: {status['query_status']}"
            )

    out_path = os.path.join(out_dir, f"{sha256}.zip")
    with open(out_path, "wb") as f:
        f.write(body)
    return out_path

def _add_output_args(parser: argparse.ArgumentParser, default_json: str) -> None:
    """Attach the --out/--csv options shared by every subcommand."""
    parser.add_argument("--out", default=default_json, help="Output JSON path")
    parser.add_argument("--csv", default="", help="Optional CSV output path")


def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the hash/recent/tag/signature subcommands."""
    ap = argparse.ArgumentParser(description="MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    # hash
    p_hash = sub.add_parser("hash", help="Query by hash (sha256/md5/sha1)")
    p_hash.add_argument("--value", required=True, help="Hash value to search")
    _add_output_args(p_hash, "out/mb_hash.json")
    p_hash.add_argument("--download", action="store_true", help="(LAB ONLY) Download sample zip (OFF by default)")
    p_hash.add_argument("--download-dir", default="out/downloads", help="Download directory (when --download)")

    # recent
    p_recent = sub.add_parser("recent", help="Get recent samples")
    p_recent.add_argument("--limit", type=int, default=50, help="Number of recent items (practical limit applies)")
    _add_output_args(p_recent, "out/mb_recent.json")

    # tag
    p_tag = sub.add_parser("tag", help="Search by tag (e.g. 'stealer', 'ransomware')")
    p_tag.add_argument("--value", required=True, help="Tag to search")
    p_tag.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    _add_output_args(p_tag, "out/mb_tag.json")

    # signature
    p_sig = sub.add_parser("signature", help="Search by signature (family)")
    p_sig.add_argument("--value", required=True, help="Signature/family to search")
    p_sig.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    _add_output_args(p_sig, "out/mb_signature.json")

    return ap


def _save_and_report(resp: Any, rows: List[Dict[str, Any]], json_path: str, csv_path: str) -> None:
    """Write the JSON (and optional CSV) output and print where it went."""
    write_json(json_path, resp)
    print(f"[CYBERDUDEBIVASH] Saved JSON: {json_path}")
    if not csv_path:
        return
    if rows:
        write_csv(csv_path, rows)
        print(f"[CYBERDUDEBIVASH] Saved CSV: {csv_path}")
    else:
        # write_csv() writes nothing for an empty result; do not claim otherwise.
        print(f"[CYBERDUDEBIVASH] No rows returned; CSV not written: {csv_path}")


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the original behavior.

    Returns:
        ``0`` after a handled subcommand, ``2`` if the subcommand is unknown.

    Raises:
        SystemExit: On argument errors (raised by argparse with code 2), or
            when ``--download`` is requested but the response has no usable
            row / ``sha256_hash``.
    """
    args = _build_parser().parse_args(argv)

    if args.cmd == "hash":
        resp = post_api({"query": "get_info", "hash": args.value})
        rows = normalize_rows(resp)
        # Persist and report the metadata first so it is not lost if the
        # optional download below aborts.
        _save_and_report(resp, rows, args.out, args.csv)

        # Optional lab-only download: choose sha256 from response if available
        if args.download:
            if not rows:
                raise SystemExit("No data returned; cannot download.")
            sha256 = rows[0].get("sha256_hash")
            if not sha256:
                raise SystemExit("No sha256_hash in response; cannot download.")
            path = download_sample(sha256, args.download_dir)
            print(f"[CYBERDUDEBIVASH] Downloaded sample ZIP to: {path}")
        return 0

    if args.cmd == "recent":
        # NOTE(review): the abuse.ch docs appear to list only "time" and "100"
        # as get_recent selectors - confirm that arbitrary limits are accepted.
        resp = post_api({"query": "get_recent", "selector": str(args.limit)})
        rows = normalize_rows(resp)
        _save_and_report(resp, rows, args.out, args.csv)
        return 0

    # tag / signature differ only in the query name and the form field.
    listing_queries = {
        "tag": ("get_taginfo", "tag"),
        "signature": ("get_siginfo", "signature"),
    }
    if args.cmd in listing_queries:
        query, field = listing_queries[args.cmd]
        resp = post_api({"query": query, field: args.value})
        rows = normalize_rows(resp)
        # Best-effort client-side limit. Only positive limits are applied: a
        # negative value would slice from the end and drop the wrong rows.
        if rows and args.limit and args.limit > 0:
            rows = rows[: args.limit]
            resp = {"query_status": "ok", "data": rows, "note": "client_side_limit_applied"}
        _save_and_report(resp, rows, args.out, args.csv)
        return 0

    return 2

# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

Real-time usage examples 

1) Hash enrichment (SOC IOC triage)

python malbaz_quickstart.py hash --value <SHA256_OR_MD5_OR_SHA1> --out out/hash.json --csv out/hash.csv

2) Pull latest samples for daily hunting

python malbaz_quickstart.py recent --limit 100 --out out/recent.json --csv out/recent.csv

3) Hunt by tag (e.g., “stealer”, “ransomware”)

python malbaz_quickstart.py tag --value stealer --limit 50 --out out/tag_stealer.json

4) Search by malware family/signature

python malbaz_quickstart.py signature --value "AgentTesla" --limit 50 --out out/agenttesla.json

5) (LAB ONLY) Download sample zip for isolated sandbox

python malbaz_quickstart.py hash --value <SHA256> --download --download-dir out/downloads


SOC best-practice notes (CYBERDUDEBIVASH authority)

  • Use this script for metadata enrichment + IOC pipeline, not “auto-detonation.”
  • Keep downloads OFF by default and only enable in a sandbox environment.
  • Ship JSON/CSV to SIEM, then correlate with:
    • endpoint process telemetry
    • DNS / proxy logs
    • authentication anomalies
    • email/phishing events

#cyberdudebivash #CyberDudeBivash #MalwareBazaar #ThreatIntel #MalwareAnalysis
#SOC #ThreatHunting #DFIR #IncidentResponse #DetectionEngineering
#IOC #YARA #ReverseEngineering #SecurityAutomation #PythonSecurity
#SIEM #Splunk #Elastic #CyberDefense #CyberSecurity

Leave a comment

Design a site like this with WordPress.com
Get started