Files
2026-05-19 14:53:39 +02:00

459 lines
15 KiB
Python

import csv
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
# MIME type that marks a Google Drive folder; compared against each item's
# mimeType to tell folders apart from regular files.
FOLDER_MIME = "application/vnd.google-apps.folder"
@dataclass
class Item:
    """Per-item metadata taken from the first CSV row seen for a Drive item.

    All fields hold CSV text as-is (whitespace-stripped by the parser), so
    numeric-looking values such as ``depth`` and ``size_bytes`` stay strings.
    """

    # Identity and placement.
    item_id: str  # "id" column
    name: str  # "name" column
    depth: str  # "depth" column, kept as text
    mime_type: str  # "mimeType" column
    modified_time: str  # "modifiedTime" column, kept as text
    size_bytes: str  # "size" column, kept as text

    # Ownership.
    owner_email: str  # "owners.0.emailAddress" column
    owner_name: str  # "owners.0.displayName" column

    # Location inside the drive ("path.0" column).
    path: str

    # Actors; each may be empty when the export does not carry the column.
    sharing_user_email: str
    last_modifying_user_email: str
def _norm(value: str) -> str:
return (value or "").strip()
def _to_bool(value: str) -> bool:
    """Interpret a CSV cell as a boolean.

    Only ``true``, ``1`` and ``yes`` (case-insensitive, surrounding whitespace
    ignored) count as ``True``; anything else, including empty cells, is
    ``False``.
    """
    token = _norm(value).lower()
    return token in ("true", "1", "yes")
def file_category(mime_type: str, name: str) -> str:
    """Map a MIME type (and, as a fallback, the file name) to a category label.

    Resolution order: folder, Google-native application types, media type
    prefixes, a handful of exact MIME types, generic ``text/*``, and finally
    the upper-cased file extension truncated to 12 characters. ``"OTHER"`` is
    returned when nothing matches.
    """
    mime = _norm(mime_type)
    filename = _norm(name)

    if mime == FOLDER_MIME:
        return "FOLDER"

    if mime.startswith("application/vnd.google-apps."):
        google_native = {
            "application/vnd.google-apps.spreadsheet": "G_SHEET",
            "application/vnd.google-apps.document": "G_DOC",
            "application/vnd.google-apps.presentation": "G_SLIDES",
        }
        return google_native.get(mime, "G_APP")

    for prefix, label in (("image/", "IMAGE"), ("video/", "VIDEO"), ("audio/", "AUDIO")):
        if mime.startswith(prefix):
            return label

    # Exact matches are checked before the generic "text/" prefix so that
    # "text/vtt" is reported as a subtitle rather than plain text.
    exact_types = {
        "application/x-subrip": "SUBTITLE",
        "text/vtt": "SUBTITLE",
        "application/pdf": "PDF",
        "application/zip": "ARCHIVE",
        "application/x-7z-compressed": "ARCHIVE",
    }
    if mime in exact_types:
        return exact_types[mime]

    if mime.startswith("text/"):
        return "TEXT"

    # Fall back to whatever follows the last dot of the file name.
    _, dot, suffix = filename.rpartition(".")
    if not dot:
        return "OTHER"
    extension = suffix.lower()
    if not extension:
        return "OTHER"
    return extension.upper()[:12]
def _permission_principal(perm: dict) -> str:
ptype = perm.get("type", "")
email = perm.get("email", "")
domain = perm.get("domain", "")
if ptype == "user":
return email or "unknown-user"
if ptype == "group":
return f"group:{email}" if email else "group:unknown"
if ptype == "domain":
return f"domain:{domain}" if domain else "domain:unknown"
if ptype == "anyone":
return "anyone(public)"
return email or domain or ptype or "UNKNOWN"
def classify_access(permissions: List[dict]) -> str:
    """Reduce an item's permission list to a single access scope.

    Args:
        permissions: Permission dicts. The keys ``type``, ``role`` and
            ``deleted`` are consulted; any of them may be absent, in which
            case the entry is treated as having no type / no role / not
            deleted.

    Returns:
        ``"PUBLIC"`` if any active permission has type ``anyone``,
        ``"DOMAIN"`` if any has type ``domain``, ``"PRIVATE"`` when the only
        active permission is the owner's (or nothing grants user/group
        access), otherwise ``"SHARED_USERS"``.
    """
    # Deleted permissions no longer grant access and are ignored throughout.
    active = [p for p in permissions if not p.get("deleted", False)]
    # .get() keeps partially populated entries from raising KeyError, in line
    # with how the "deleted" key is already read above.
    ptypes = {p.get("type") for p in active if p.get("type")}

    if "anyone" in ptypes:
        return "PUBLIC"
    if "domain" in ptypes:
        return "DOMAIN"

    owner_only = len(active) == 1 and active[0].get("role") == "owner"
    if owner_only:
        return "PRIVATE"
    if "user" in ptypes or "group" in ptypes:
        return "SHARED_USERS"
    return "PRIVATE"
def _is_external_email(email: str) -> bool:
e = _norm(email).lower()
return bool(e and "@" in e and not e.endswith("@wolkabout.com"))
def classify_risk(
    *,
    access_scope: str,
    shared_with_count: int,
    external_target_count: int,
    writer_target_count: int,
    writer_without_expiry_count: int,
    owner_differs_from_shared_by: bool,
) -> tuple[str, str, int, str]:
    """Score how risky an item's sharing configuration is.

    Points are accumulated per finding (link/domain exposure, number of
    recipients, external recipients, edit rights, edit rights without expiry,
    sharing by someone other than the owner), clamped to ``0..100`` and mapped
    to a level: ``HIGH`` from 70, ``MEDIUM`` from 35, otherwise ``LOW``.

    Args:
        access_scope: ``"PUBLIC"``, ``"DOMAIN"``, ``"SHARED_USERS"`` or
            ``"PRIVATE"``.
        shared_with_count: Number of distinct non-owner principals.
        external_target_count: Number of permissions pointing outside the
            organisation.
        writer_target_count: Number of permissions with edit-level roles.
        writer_without_expiry_count: Edit-level permissions lacking an
            expiration time.
        owner_differs_from_shared_by: Whether the presumed sharer is someone
            other than the owner.

    Returns:
        ``(level, reason, score, flags)`` where *reason* joins up to the first
        three human-readable findings with ``"; "`` and *flags* joins all
        finding codes with ``"|"``.
    """
    score = 0
    flags: List[str] = []
    reasons: List[str] = []

    def add(points: int, flag: str, reason: str) -> None:
        # Record one finding: bump the score and remember its code and text.
        nonlocal score
        score += points
        flags.append(flag)
        reasons.append(reason)

    if access_scope == "PUBLIC":
        add(75, "PUBLIC_LINK", "Javno dostupno (anyone link)")
    elif access_scope == "DOMAIN":
        add(60, "DOMAIN_WIDE", "Dostupno celom domenu")

    if shared_with_count >= 20:
        add(45, "MASS_SHARING_20", f"Masovno deljenje ({shared_with_count} primalaca)")
    elif shared_with_count >= 10:
        add(30, "MASS_SHARING_10", f"Siroko deljenje ({shared_with_count} primalaca)")
    elif shared_with_count >= 5:
        add(18, "MULTI_SHARING_5", f"Vise primalaca ({shared_with_count})")

    if external_target_count >= 5:
        add(35, "EXTERNAL_5", f"Deljeno van firme ({external_target_count} eksternih)")
    elif external_target_count >= 1:
        add(20, "EXTERNAL_1", "Deljeno van firme")

    if writer_target_count >= 3:
        add(22, "WRITER_3", f"Vise edit prava ({writer_target_count})")
    elif writer_target_count >= 1:
        add(12, "WRITER_1", "Postoje edit prava")

    if writer_without_expiry_count >= 1:
        add(10, "WRITER_NO_EXPIRY", "Edit prava bez isteka")

    unshared_private = access_scope == "PRIVATE" and shared_with_count == 0
    if unshared_private:
        # An item nobody else can reach cannot have been re-shared. Previously
        # the RESHARED points were added and then cancelled by the private
        # discount, which left a misleading flag/reason behind. Skipping both
        # yields the same score without the bogus finding.
        if not owner_differs_from_shared_by:
            # Discount for unshared private items, kept so numeric scores stay
            # identical to earlier reports.
            score = max(0, score - 8)
    elif owner_differs_from_shared_by:
        add(8, "RESHARED", "Deljenje nije pokrenuo originalni owner")

    score = max(0, min(score, 100))
    if score >= 70:
        level = "HIGH"
    elif score >= 35:
        level = "MEDIUM"
    else:
        level = "LOW"

    reason = "; ".join(reasons[:3]) if reasons else "Nema sirokog deljenja"
    return level, reason, score, "|".join(flags)
def _first_nonempty_column(row: dict, *columns: str) -> str:
    """Return the first non-empty normalised value among *columns* of *row*."""
    for column in columns:
        value = _norm(row.get(column))
        if value:
            return value
    return ""


def _permission_from_row(row: dict) -> dict:
    """Extract the ``permission.*`` columns of one CSV row into a dict."""
    return {
        "type": _norm(row.get("permission.type")),
        "role": _norm(row.get("permission.role")),
        "email": _norm(row.get("permission.emailAddress")),
        "domain": _norm(row.get("permission.domain")),
        "allow_discovery": _to_bool(row.get("permission.allowFileDiscovery", "")),
        "id": _norm(row.get("permission.id")),
        "deleted": _to_bool(row.get("permission.deleted", "")),
        "expiration_time": _norm(row.get("permission.expirationTime")),
    }


def _curate_item(base: "Item", raw_permissions: List[dict]) -> dict:
    """Combine one item's metadata and permissions into a curated report row."""
    # Collapse duplicate permission rows. The permission id is deliberately
    # not part of the key, so entries that differ only by id are merged.
    unique = {}
    for p in raw_permissions:
        key = (
            p["type"],
            p["role"],
            p["email"],
            p["domain"],
            p["allow_discovery"],
            p["expiration_time"],
            p["deleted"],
        )
        unique[key] = p
    permissions = list(unique.values())
    access_scope = classify_access(permissions)

    path = base.path
    root_path = path.split("/", 1)[0] if path else "UNKNOWN"
    parent_path = path.rsplit("/", 1)[0] if "/" in path else root_path

    active = [p for p in permissions if not p["deleted"]]
    active_non_owner = [
        p
        for p in active
        if p["role"] != "owner" and (p["type"] or p["email"] or p["domain"])
    ]
    shared_targets = sorted({_permission_principal(p) for p in active_non_owner})

    # Counted per permission, not per distinct principal.
    external_target_count = sum(
        1
        for p in active_non_owner
        if _is_external_email(p["email"])
        or (
            p["type"] == "domain"
            and p["domain"]
            and _norm(p["domain"]).lower() != "wolkabout.com"
        )
    )

    # Roles that allow modifying content; computed once and reused below.
    writers = [
        p
        for p in active_non_owner
        if p["role"].lower() in {"writer", "organizer", "fileorganizer"}
    ]
    writer_target_count = len(writers)
    writer_without_expiry_count = sum(1 for p in writers if not p["expiration_time"])

    permission_entries = sorted(
        {
            f"{p['type']}:{_permission_principal(p)}:{p['role']}"
            for p in active
            if p["type"]
        }
    )
    direct_people = sorted(
        {
            p["email"]
            for p in active_non_owner
            if p["type"] in {"user", "group"} and p["email"]
        }
    )
    direct_domains = sorted(
        {
            p["domain"]
            for p in active_non_owner
            if p["type"] == "domain" and p["domain"]
        }
    )

    # Best effort for "who shared":
    # 1) sharingUser (if the export provides it), 2) last modifying user,
    # 3) owner.
    shared_by = (
        base.sharing_user_email
        or base.last_modifying_user_email
        or base.owner_email
        or "UNKNOWN"
    )
    shared_with_count = len(shared_targets)

    risk_level, risk_reason, risk_score, risk_flags = classify_risk(
        access_scope=access_scope,
        shared_with_count=shared_with_count,
        external_target_count=external_target_count,
        writer_target_count=writer_target_count,
        writer_without_expiry_count=writer_without_expiry_count,
        owner_differs_from_shared_by=bool(
            base.owner_email
            and shared_by
            and base.owner_email.lower() != shared_by.lower()
        ),
    )

    # Key order is kept stable because it may define the column order of
    # whatever writes these rows out.
    return {
        "item_id": base.item_id,
        "name": base.name,
        "item_kind": "FOLDER" if base.mime_type == FOLDER_MIME else "FILE",
        "path": path,
        "parent_path": parent_path,
        "root_path": root_path,
        "depth": base.depth,
        "owner_email": base.owner_email,
        "owner_name": base.owner_name,
        "original_owner_email": base.owner_email,
        "shared_by_email": shared_by,
        "last_modifying_user_email": base.last_modifying_user_email,
        "access_scope": access_scope,
        "access_targets": "|".join(shared_targets),
        "direct_user_group_targets": "|".join(direct_people),
        "direct_domain_targets": "|".join(direct_domains),
        "permission_entries": "|".join(permission_entries),
        "shared_with_count": shared_with_count,
        "risk_level": risk_level,
        "risk_reason": risk_reason,
        "risk_score": risk_score,
        "risk_flags": risk_flags,
        "is_shared": str(access_scope != "PRIVATE"),
        "mime_type": base.mime_type,
        "file_category": file_category(base.mime_type, base.name),
        "size_bytes": base.size_bytes,
        "modified_time": base.modified_time,
        "external_target_count": external_target_count,
        "writer_target_count": writer_target_count,
        "permission_count": str(len(active)),
    }


def parse_gam_raw_csv(raw_csv_path: Path) -> List[dict]:
    """Read a raw Drive file-list CSV and return one curated row per item.

    The CSV is read row by row; rows sharing the same ``id`` are treated as
    the same item carrying different ``permission.*`` values. Item metadata is
    taken from the first row seen for each id, permissions are collected from
    every row.

    Args:
        raw_csv_path: Path to the CSV file. It is decoded as UTF-8; a leading
            byte-order mark is tolerated.

    Returns:
        Curated row dicts (identity, ownership, access scope, sharing targets
        and risk assessment), sorted by root path, path and name.
    """
    by_id: Dict[str, "Item"] = {}
    permissions_by_id: Dict[str, List[dict]] = defaultdict(list)

    # "utf-8-sig" behaves like UTF-8 but drops a leading BOM, which would
    # otherwise become part of the first header name.
    with raw_csv_path.open(newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            item_id = _norm(row.get("id"))
            if not item_id:
                continue

            if item_id not in by_id:
                by_id[item_id] = Item(
                    item_id=item_id,
                    name=_norm(row.get("name")),
                    depth=_norm(row.get("depth")),
                    mime_type=_norm(row.get("mimeType")),
                    modified_time=_norm(row.get("modifiedTime")),
                    size_bytes=_norm(row.get("size")),
                    owner_email=_norm(row.get("owners.0.emailAddress")),
                    owner_name=_norm(row.get("owners.0.displayName")),
                    path=_norm(row.get("path.0")),
                    sharing_user_email=_first_nonempty_column(
                        row,
                        "sharingUser.emailAddress",
                        "sharingUser.0.emailAddress",
                    ),
                    last_modifying_user_email=_first_nonempty_column(
                        row,
                        "lastModifyingUser.emailAddress",
                        "lastModifyingUser.0.emailAddress",
                    ),
                )

            permission = _permission_from_row(row)
            # A row without any permission data must not become a phantom
            # permission: it would inflate permission_count and could make an
            # owner-only item look shared.
            if any(permission[k] for k in ("type", "role", "email", "domain", "id")):
                permissions_by_id[item_id].append(permission)

    curated_rows: List[dict] = [
        _curate_item(base, permissions_by_id.get(item_id, []))
        for item_id, base in by_id.items()
    ]
    curated_rows.sort(key=lambda x: (x["root_path"], x["path"], x["name"]))
    return curated_rows
def summarize(curated_rows: List[dict]) -> List[dict]:
    """Count items per (root path, file category, access scope, owner).

    Returns one dict per distinct combination with an ``item_count`` field,
    ordered by descending count, then root path, then file category.
    Combinations that tie on all three keep first-seen order.
    """
    tally = Counter(
        (
            entry["root_path"],
            entry["file_category"],
            entry["access_scope"],
            entry["owner_email"],
        )
        for entry in curated_rows
    )
    table = [
        {
            "root_path": root,
            "file_category": category,
            "access_scope": scope,
            "owner_email": owner,
            "item_count": count,
        }
        for (root, category, scope, owner), count in tally.items()
    ]
    return sorted(
        table,
        key=lambda rec: (-rec["item_count"], rec["root_path"], rec["file_category"]),
    )
def sharing_matrix(curated_rows: List[dict]) -> List[dict]:
    """Expand curated rows into one record per (item, permission entry).

    Each curated row's ``permission_entries`` field is a ``|``-separated list
    of ``type:principal:role`` strings. Items without any entry produce a
    single placeholder record marked ``owner-only``.

    Returns:
        Records sorted by path, principal and role.
    """

    def record(row: dict, shared_to: str, ptype: str, role: str) -> dict:
        # Columns shared by every output record, in a fixed order.
        return {
            "item_id": row["item_id"],
            "name": row["name"],
            "path": row["path"],
            "original_owner_email": row["original_owner_email"],
            "shared_by_email": row["shared_by_email"],
            "shared_to": shared_to,
            "permission_type": ptype,
            "permission_role": role,
            "access_scope": row["access_scope"],
            "file_category": row["file_category"],
        }

    out: List[dict] = []
    for row in curated_rows:
        entries = [e for e in (row.get("permission_entries") or "").split("|") if e]
        if not entries:
            out.append(record(row, "owner-only", "user", "owner"))
            continue

        for ent in entries:
            # The principal itself may contain a colon ("group:a@x.com",
            # "domain:x.com"), so the type is cut from the left and the role
            # from the right; whatever remains in between is the principal.
            ptype, type_sep, rest = ent.partition(":")
            principal, role_sep, role = rest.rpartition(":")
            if not type_sep or not role_sep:
                # Fewer than three fields: malformed entry, skip it.
                continue
            out.append(record(row, principal, ptype, role))

    out.sort(key=lambda x: (x["path"], x["shared_to"], x["permission_role"]))
    return out
def top_stats(curated_rows: List[dict]) -> dict:
    """Compute headline counters for a set of curated rows.

    Returns a dict with the total number of items, the number of folders, the
    number of items per non-private access scope, and the eight most common
    root paths as ``(root_path, count)`` pairs.
    """
    folder_total = 0
    scope_tally = Counter()
    root_tally = Counter()
    for entry in curated_rows:
        if entry["item_kind"] == "FOLDER":
            folder_total += 1
        scope_tally[entry["access_scope"]] += 1
        root_tally[entry["root_path"]] += 1

    stats = {}
    stats["total_items"] = len(curated_rows)
    stats["total_folders"] = folder_total
    stats["public_items"] = scope_tally["PUBLIC"]
    stats["domain_items"] = scope_tally["DOMAIN"]
    stats["shared_user_items"] = scope_tally["SHARED_USERS"]
    stats["roots"] = root_tally.most_common(8)
    return stats