"""Curate a raw GAM Drive listing (one CSV row per item/permission pair).

The module turns the raw rows into one curated row per item, with an access
scope, a heuristic risk score, and flattened sharing targets, and offers a few
aggregations on top of those curated rows.
"""

import csv
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

FOLDER_MIME = "application/vnd.google-apps.folder"

# Domain considered "internal" when deciding whether a share target is external.
INTERNAL_DOMAIN = "wolkabout.com"

# Roles (lower-cased) that grant edit rights for risk-scoring purposes.
_WRITER_ROLES = frozenset({"writer", "organizer", "fileorganizer"})

# Google-native MIME types that get their own category; others become "G_APP".
_GOOGLE_APP_CATEGORIES = {
    "application/vnd.google-apps.spreadsheet": "G_SHEET",
    "application/vnd.google-apps.document": "G_DOC",
    "application/vnd.google-apps.presentation": "G_SLIDES",
}


@dataclass
class Item:
    """Per-item metadata taken from the first CSV row seen for an item id.

    All fields are kept as the (stripped) strings read from the CSV.
    """

    item_id: str
    name: str
    depth: str
    mime_type: str
    modified_time: str
    size_bytes: str
    owner_email: str
    owner_name: str
    path: str
    sharing_user_email: str
    last_modifying_user_email: str


def _norm(value: Optional[str]) -> str:
    """Return *value* stripped of surrounding whitespace; ``None`` becomes ``""``."""
    return (value or "").strip()


def _to_bool(value: Optional[str]) -> bool:
    """Interpret ``"true"``, ``"1"`` or ``"yes"`` (any case) as ``True``."""
    return _norm(value).lower() in {"true", "1", "yes"}


def file_category(mime_type: str, name: str) -> str:
    """Map a MIME type (falling back to the file extension) to a category label.

    Known MIME families are checked first. If none matches and *name* contains
    a dot, the upper-cased extension (truncated to 12 characters) is returned;
    otherwise ``"OTHER"``.
    """
    mime = _norm(mime_type)
    fname = _norm(name)

    if mime == FOLDER_MIME:
        return "FOLDER"
    if mime.startswith("application/vnd.google-apps."):
        return _GOOGLE_APP_CATEGORIES.get(mime, "G_APP")
    if mime.startswith("image/"):
        return "IMAGE"
    if mime.startswith("video/"):
        return "VIDEO"
    if mime.startswith("audio/"):
        return "AUDIO"
    if mime in {"application/x-subrip", "text/vtt"}:
        return "SUBTITLE"
    if mime in {"application/pdf"}:
        return "PDF"
    if mime in {"application/zip", "application/x-7z-compressed"}:
        return "ARCHIVE"
    if mime.startswith("text/"):
        return "TEXT"

    if "." in fname:
        ext = fname.rsplit(".", 1)[-1].lower()
        # A trailing dot yields an empty extension.
        return ext.upper()[:12] if ext else "OTHER"
    return "OTHER"


def _permission_principal(perm: dict) -> str:
    """Return a display label for whoever a permission is granted to.

    Group and domain principals are prefixed (``group:…`` / ``domain:…``), so
    the label itself may contain a colon.
    """
    ptype = perm.get("type", "")
    email = perm.get("email", "")
    domain = perm.get("domain", "")

    if ptype == "user":
        return email or "unknown-user"
    if ptype == "group":
        return f"group:{email}" if email else "group:unknown"
    if ptype == "domain":
        return f"domain:{domain}" if domain else "domain:unknown"
    if ptype == "anyone":
        return "anyone(public)"
    return email or domain or ptype or "UNKNOWN"


def classify_access(permissions: List[dict]) -> str:
    """Classify the widest access granted by the non-deleted permissions.

    Returns one of ``"PUBLIC"``, ``"DOMAIN"``, ``"SHARED_USERS"`` or
    ``"PRIVATE"``. Missing ``type``/``role``/``deleted`` keys are tolerated and
    treated as empty / not deleted.
    """
    active = [p for p in permissions if not p.get("deleted", False)]
    ptypes = {p.get("type") for p in active if p.get("type")}
    owner_only = len(active) == 1 and active[0].get("role") == "owner"

    if "anyone" in ptypes:
        return "PUBLIC"
    if "domain" in ptypes:
        return "DOMAIN"
    if owner_only:
        return "PRIVATE"
    if "user" in ptypes or "group" in ptypes:
        return "SHARED_USERS"
    return "PRIVATE"


def _is_external_email(email: str, internal_domain: str = INTERNAL_DOMAIN) -> bool:
    """Return ``True`` for a non-empty address that is not ``@internal_domain``."""
    e = _norm(email).lower()
    suffix = "@" + _norm(internal_domain).lower()
    return bool(e and "@" in e and not e.endswith(suffix))


def classify_risk(
    *,
    access_scope: str,
    shared_with_count: int,
    external_target_count: int,
    writer_target_count: int,
    writer_without_expiry_count: int,
    owner_differs_from_shared_by: bool,
) -> Tuple[str, str, int, str]:
    """Score the sharing risk of one item.

    Returns ``(level, reason, score, flags)`` where *level* is ``"HIGH"``
    (score >= 70), ``"MEDIUM"`` (>= 35) or ``"LOW"``; *reason* joins at most
    the first three triggered reasons with ``"; "``; *score* is clamped to
    0..100; and *flags* is a ``"|"``-joined list of every triggered flag.
    """
    score = 0
    flags: List[str] = []
    reasons: List[str] = []

    if access_scope == "PUBLIC":
        score += 75
        flags.append("PUBLIC_LINK")
        reasons.append("Javno dostupno (anyone link)")
    elif access_scope == "DOMAIN":
        score += 60
        flags.append("DOMAIN_WIDE")
        reasons.append("Dostupno celom domenu")

    if shared_with_count >= 20:
        score += 45
        flags.append("MASS_SHARING_20")
        reasons.append(f"Masovno deljenje ({shared_with_count} primalaca)")
    elif shared_with_count >= 10:
        score += 30
        flags.append("MASS_SHARING_10")
        reasons.append(f"Siroko deljenje ({shared_with_count} primalaca)")
    elif shared_with_count >= 5:
        score += 18
        flags.append("MULTI_SHARING_5")
        reasons.append(f"Vise primalaca ({shared_with_count})")

    if external_target_count >= 5:
        score += 35
        flags.append("EXTERNAL_5")
        reasons.append(f"Deljeno van firme ({external_target_count} eksternih)")
    elif external_target_count >= 1:
        score += 20
        flags.append("EXTERNAL_1")
        reasons.append("Deljeno van firme")

    if writer_target_count >= 3:
        score += 22
        flags.append("WRITER_3")
        reasons.append(f"Vise edit prava ({writer_target_count})")
    elif writer_target_count >= 1:
        score += 12
        flags.append("WRITER_1")
        reasons.append("Postoje edit prava")

    if writer_without_expiry_count >= 1:
        score += 10
        flags.append("WRITER_NO_EXPIRY")
        reasons.append("Edit prava bez isteka")

    if owner_differs_from_shared_by:
        score += 8
        flags.append("RESHARED")
        reasons.append("Deljenje nije pokrenuo originalni owner")

    # A fully private item gets a small discount (never below zero).
    if access_scope == "PRIVATE" and shared_with_count == 0:
        score = max(0, score - 8)

    score = max(0, min(score, 100))
    if score >= 70:
        level = "HIGH"
    elif score >= 35:
        level = "MEDIUM"
    else:
        level = "LOW"

    reason = "; ".join(reasons[:3]) if reasons else "Nema sirokog deljenja"
    return level, reason, score, "|".join(flags)


def _first_nonempty(row: dict, *columns: str) -> str:
    """Return the first non-empty normalized value among *columns*, else ``""``."""
    for column in columns:
        value = _norm(row.get(column))
        if value:
            return value
    return ""


def _read_raw_rows(raw_csv_path: Path) -> Tuple[Dict[str, Item], Dict[str, List[dict]]]:
    """Read the raw CSV into per-id item metadata and per-id permission lists."""
    by_id: Dict[str, Item] = {}
    permissions_by_id: Dict[str, List[dict]] = defaultdict(list)

    # utf-8-sig also reads plain UTF-8; it only strips a leading BOM, which
    # would otherwise corrupt the first header name.
    with raw_csv_path.open(newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            item_id = _norm(row.get("id"))
            if not item_id:
                continue

            # Item metadata is taken from the first row seen for each id.
            if item_id not in by_id:
                by_id[item_id] = Item(
                    item_id=item_id,
                    name=_norm(row.get("name")),
                    depth=_norm(row.get("depth")),
                    mime_type=_norm(row.get("mimeType")),
                    modified_time=_norm(row.get("modifiedTime")),
                    size_bytes=_norm(row.get("size")),
                    owner_email=_norm(row.get("owners.0.emailAddress")),
                    owner_name=_norm(row.get("owners.0.displayName")),
                    path=_norm(row.get("path.0")),
                    sharing_user_email=_first_nonempty(
                        row,
                        "sharingUser.emailAddress",
                        "sharingUser.0.emailAddress",
                    ),
                    last_modifying_user_email=_first_nonempty(
                        row,
                        "lastModifyingUser.emailAddress",
                        "lastModifyingUser.0.emailAddress",
                    ),
                )

            permissions_by_id[item_id].append(
                {
                    "type": _norm(row.get("permission.type")),
                    "role": _norm(row.get("permission.role")),
                    "email": _norm(row.get("permission.emailAddress")),
                    "domain": _norm(row.get("permission.domain")),
                    "allow_discovery": _to_bool(
                        row.get("permission.allowFileDiscovery", "")
                    ),
                    "id": _norm(row.get("permission.id")),
                    "deleted": _to_bool(row.get("permission.deleted", "")),
                    "expiration_time": _norm(row.get("permission.expirationTime")),
                }
            )

    return by_id, permissions_by_id


def _dedupe_permissions(raw_permissions: List[dict]) -> List[dict]:
    """Drop permissions that repeat the same grant (the permission id is ignored)."""
    unique: Dict[tuple, dict] = {}
    for p in raw_permissions:
        key = (
            p.get("type", ""),
            p.get("role", ""),
            p.get("email", ""),
            p.get("domain", ""),
            p.get("allow_discovery", False),
            p.get("expiration_time", ""),
            p.get("deleted", False),
        )
        unique[key] = p
    return list(unique.values())


def _is_external_permission(perm: dict, internal_domain: str) -> bool:
    """Return ``True`` if *perm* targets an address or domain outside *internal_domain*."""
    if _is_external_email(perm.get("email", ""), internal_domain):
        return True
    domain = _norm(perm.get("domain", "")).lower()
    return (
        perm.get("type") == "domain"
        and bool(domain)
        and domain != _norm(internal_domain).lower()
    )


def _curate_item(base: Item, permissions: List[dict], internal_domain: str) -> dict:
    """Build the curated output row for one item from its de-duplicated permissions."""
    access_scope = classify_access(permissions)

    path = base.path
    root_path = path.split("/", 1)[0] if path else "UNKNOWN"
    parent_path = path.rsplit("/", 1)[0] if "/" in path else root_path

    active = [p for p in permissions if not p["deleted"]]
    # Rows with no permission data at all (every identifying field empty) are
    # not counted as share targets.
    active_non_owner = [
        p
        for p in active
        if p["role"] != "owner" and (p["type"] or p["email"] or p["domain"])
    ]
    writers = [
        p for p in active_non_owner if p.get("role", "").lower() in _WRITER_ROLES
    ]

    shared_targets = sorted({_permission_principal(p) for p in active_non_owner})
    external_target_count = sum(
        1 for p in active_non_owner if _is_external_permission(p, internal_domain)
    )
    writer_target_count = len(writers)
    writer_without_expiry_count = sum(
        1 for p in writers if not p.get("expiration_time")
    )

    permission_entries = sorted(
        {
            f"{p['type']}:{_permission_principal(p)}:{p['role']}"
            for p in active
            if p["type"]
        }
    )
    direct_people = sorted(
        {
            p["email"]
            for p in active_non_owner
            if p["type"] in {"user", "group"} and p["email"]
        }
    )
    direct_domains = sorted(
        {
            p["domain"]
            for p in active_non_owner
            if p["type"] == "domain" and p["domain"]
        }
    )

    # Best effort for "who shared":
    # 1) sharingUser (if GAM provides it), 2) last modifying user, 3) owner.
    shared_by = (
        base.sharing_user_email
        or base.last_modifying_user_email
        or base.owner_email
        or "UNKNOWN"
    )
    shared_with_count = len(shared_targets)

    risk_level, risk_reason, risk_score, risk_flags = classify_risk(
        access_scope=access_scope,
        shared_with_count=shared_with_count,
        external_target_count=external_target_count,
        writer_target_count=writer_target_count,
        writer_without_expiry_count=writer_without_expiry_count,
        owner_differs_from_shared_by=bool(
            base.owner_email
            and shared_by
            and base.owner_email.lower() != shared_by.lower()
        ),
    )

    return {
        "item_id": base.item_id,
        "name": base.name,
        "item_kind": "FOLDER" if base.mime_type == FOLDER_MIME else "FILE",
        "path": path,
        "parent_path": parent_path,
        "root_path": root_path,
        "depth": base.depth,
        "owner_email": base.owner_email,
        "owner_name": base.owner_name,
        "original_owner_email": base.owner_email,
        "shared_by_email": shared_by,
        "last_modifying_user_email": base.last_modifying_user_email,
        "access_scope": access_scope,
        "access_targets": "|".join(shared_targets),
        "direct_user_group_targets": "|".join(direct_people),
        "direct_domain_targets": "|".join(direct_domains),
        "permission_entries": "|".join(permission_entries),
        "shared_with_count": shared_with_count,
        "risk_level": risk_level,
        "risk_reason": risk_reason,
        "risk_score": risk_score,
        "risk_flags": risk_flags,
        "is_shared": str(access_scope != "PRIVATE"),
        "mime_type": base.mime_type,
        "file_category": file_category(base.mime_type, base.name),
        "size_bytes": base.size_bytes,
        "modified_time": base.modified_time,
        "external_target_count": external_target_count,
        "writer_target_count": writer_target_count,
        "permission_count": str(len(active)),
    }


def parse_gam_raw_csv(
    raw_csv_path: Path, *, internal_domain: str = INTERNAL_DOMAIN
) -> List[dict]:
    """Parse a raw GAM CSV into one curated row per item id.

    Args:
        raw_csv_path: CSV file with one row per item/permission pair. Rows
            without an ``id`` value are skipped. A leading UTF-8 BOM is
            tolerated.
        internal_domain: Domain treated as internal when counting external
            share targets.

    Returns:
        Curated rows sorted by ``(root_path, path, name)``.
    """
    by_id, permissions_by_id = _read_raw_rows(raw_csv_path)

    curated_rows = [
        _curate_item(
            base,
            _dedupe_permissions(permissions_by_id[item_id]),
            internal_domain,
        )
        for item_id, base in by_id.items()
    ]
    curated_rows.sort(key=lambda x: (x["root_path"], x["path"], x["name"]))
    return curated_rows


def summarize(curated_rows: List[dict]) -> List[dict]:
    """Count items per (root_path, file_category, access_scope, owner_email).

    The result is sorted by descending count, then root path, then category.
    """
    agg = Counter(
        (
            row["root_path"],
            row["file_category"],
            row["access_scope"],
            row["owner_email"],
        )
        for row in curated_rows
    )

    output = [
        {
            "root_path": root_path,
            "file_category": file_category_,
            "access_scope": access_scope,
            "owner_email": owner_email,
            "item_count": item_count,
        }
        for (root_path, file_category_, access_scope, owner_email), item_count
        in agg.items()
    ]
    output.sort(key=lambda x: (-x["item_count"], x["root_path"], x["file_category"]))
    return output


def _split_permission_entry(entry: str) -> Optional[Tuple[str, str, str]]:
    """Split a ``type:principal:role`` entry; return ``None`` if it is malformed.

    The principal may itself contain colons (``group:…``, ``domain:…``), so the
    type is taken from the first colon and the role from the last one.
    """
    ptype, first_sep, rest = entry.partition(":")
    principal, last_sep, role = rest.rpartition(":")
    if not first_sep or not last_sep:
        return None
    return ptype, principal, role


def sharing_matrix(curated_rows: List[dict]) -> List[dict]:
    """Expand curated rows into one row per (item, permission entry).

    Items without any permission entry yield a single ``owner-only`` row.
    Malformed entries are skipped. The result is sorted by path, target and
    role.
    """
    out: List[dict] = []
    for row in curated_rows:
        common = {
            "item_id": row["item_id"],
            "name": row["name"],
            "path": row["path"],
            "original_owner_email": row["original_owner_email"],
            "shared_by_email": row["shared_by_email"],
        }
        tail = {
            "access_scope": row["access_scope"],
            "file_category": row["file_category"],
        }

        entries = [e for e in row.get("permission_entries", "").split("|") if e]
        if not entries:
            out.append(
                {
                    **common,
                    "shared_to": "owner-only",
                    "permission_type": "user",
                    "permission_role": "owner",
                    **tail,
                }
            )
            continue

        for ent in entries:
            parts = _split_permission_entry(ent)
            if parts is None:
                continue
            ptype, principal, role = parts
            out.append(
                {
                    **common,
                    "shared_to": principal,
                    "permission_type": ptype,
                    "permission_role": role,
                    **tail,
                }
            )

    out.sort(key=lambda x: (x["path"], x["shared_to"], x["permission_role"]))
    return out


def top_stats(curated_rows: List[dict]) -> dict:
    """Return headline counts plus the eight most common root paths."""
    scopes = Counter(x["access_scope"] for x in curated_rows)
    return {
        "total_items": len(curated_rows),
        "total_folders": sum(1 for x in curated_rows if x["item_kind"] == "FOLDER"),
        "public_items": scopes["PUBLIC"],
        "domain_items": scopes["DOMAIN"],
        "shared_user_items": scopes["SHARED_USERS"],
        "roots": Counter(x["root_path"] for x in curated_rows).most_common(8),
    }