"""Filesystem MCP server with AAP alignment."""
from mcp.server.fastmcp import FastMCP
from aap import AlignmentCard, APTrace, Action, Decision, Escalation, verify_trace
from datetime import datetime, timezone
import uuid
import json
import os
# --- Alignment Configuration ---
# Alignment card published by this server. It is exposed verbatim through the
# "alignment://card" resource and passed to verify_trace() when traces are
# checked.
SERVER_ALIGNMENT = AlignmentCard(
    aap_version="0.1.0",
    card_id="ac-filesystem-001",
    agent_id="mcp-filesystem",
    issued_at="2026-01-31T12:00:00Z",
    principal={"type": "human", "relationship": "delegated_authority"},
    values={
        "declared": ["user_control", "transparency", "harm_prevention"],
        "conflicts_with": ["data_exfiltration", "unauthorized_access"],
    },
    autonomy_envelope={
        # Tools the server instructions describe as autonomous.
        "bounded_actions": ["read_file", "list_directory", "file_info"],
        # Tools the server instructions describe as needing approval.
        # NOTE(review): how the "condition" string is evaluated is not
        # visible in this file -- confirm against the aap package.
        "escalation_triggers": [
            {
                "condition": "tool in ['write_file', 'append_file']",
                "action": "escalate",
                "reason": "Write operations require approval"
            },
        ],
        # Tools the server instructions describe as blocked.
        "forbidden_actions": ["delete_file", "execute_command"],
    },
    audit_commitment={
        "trace_format": "ap-trace-v1",
        "retention_days": 90,
        "queryable": True,
        "query_endpoint": "mcp://filesystem/alignment/traces",
    },
)
# --- Trace Storage ---
# In-memory, process-local list of serialized traces. store_trace() appends
# to it; alignment_traces() and verify_recent_traces() read its tail.
TRACES: list[dict] = []
def store_trace(trace: APTrace):
    """Serialize a trace and append it to the module-level ``TRACES`` list.

    Args:
        trace: The AP-Trace to record. It is converted with
            ``model_dump(mode="json")`` before being stored.
    """
    serialized = trace.model_dump(mode="json")
    TRACES.append(serialized)
# --- MCP Server ---
# The FastMCP application object. The instructions text embeds the card ID and
# declared values from SERVER_ALIGNMENT; the tool-boundary lines are written
# out literally. The string body stays flush-left so no leading whitespace
# ends up in the instructions.
mcp = FastMCP(
    name="filesystem",
    instructions=f"""Filesystem operations with AAP alignment.
Alignment Card ID: {SERVER_ALIGNMENT.card_id}
Values: {', '.join(SERVER_ALIGNMENT.values['declared'])}
Tool Boundaries:
- Bounded (autonomous): read_file, list_directory, file_info
- Escalate (needs approval): write_file, append_file
- Forbidden (blocked): delete_file, execute_command
""",
)
# --- Alignment Resource ---
@mcp.resource("alignment://card")
def alignment_card() -> str:
    """Return the server's alignment card as JSON.

    Returns:
        ``SERVER_ALIGNMENT`` serialized with ``model_dump_json`` using a
        two-space indent.
    """
    card_json = SERVER_ALIGNMENT.model_dump_json(indent=2)
    return card_json
@mcp.resource("alignment://traces")
def alignment_traces() -> str:
    """Return the most recently stored AP-Traces as JSON.

    Returns:
        A JSON array (two-space indent) holding at most the last 100 entries
        of ``TRACES``, oldest first.
    """
    latest = TRACES[-100:]
    return json.dumps(latest, indent=2)
# --- Tools with Tracing ---
@mcp.tool()
def read_file(path: str) -> str:
    """Read contents of a file (bounded action).

    Each call stores one AP-Trace via ``store_trace``: a success trace that
    records the path and the length of the returned text, or a failure trace
    that records the path and the exception type.

    Args:
        path: Path to the file to read

    Returns:
        File contents

    Raises:
        Exception: Whatever opening or reading the file raised (for example
            ``FileNotFoundError`` or ``UnicodeDecodeError``) is re-raised
            unchanged after the failure trace has been stored.
    """
    trace_id = f"tr-{uuid.uuid4().hex[:12]}"
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    action = Action(type="tool_invocation", name="read_file", category="bounded")

    # Guard only the file access. Building or storing the success trace is
    # deliberately outside the try so that a tracing error is not recorded as
    # a failed read.
    try:
        with open(path) as f:
            content = f.read()
    except Exception as e:
        # Trace the failure, including which path was requested, then let the
        # original exception propagate to the caller.
        store_trace(
            APTrace(
                trace_id=trace_id,
                agent_id="mcp-filesystem",
                card_id=SERVER_ALIGNMENT.card_id,
                timestamp=timestamp,
                action=action,
                decision=Decision(
                    alternatives_considered=[],
                    selected=None,
                    selection_reasoning=f"Operation failed: {e}",
                    values_applied=["transparency"],
                ),
                escalation=Escalation(
                    evaluated=True,
                    triggers_checked=[],
                    required=False,
                    reason="Failed",
                ),
                context={"path": path, "error": type(e).__name__},
            )
        )
        raise

    store_trace(
        APTrace(
            trace_id=trace_id,
            agent_id="mcp-filesystem",
            card_id=SERVER_ALIGNMENT.card_id,
            timestamp=timestamp,
            action=action,
            decision=Decision(
                alternatives_considered=[
                    {"option_id": "read", "description": f"Read {path}", "score": 1.0, "flags": []}
                ],
                selected="read",
                selection_reasoning="Read-only operation within autonomy envelope",
                values_applied=["transparency"],
            ),
            escalation=Escalation(
                evaluated=True,
                triggers_checked=[{"trigger": "tool == 'read_file'", "matched": False}],
                required=False,
                reason="Bounded action",
            ),
            # NOTE: len(content) counts decoded characters, not bytes; the
            # key name is kept so existing trace consumers are unaffected.
            context={"path": path, "bytes_read": len(content)},
        )
    )
    return content
@mcp.tool()
def write_file(path: str, content: str, approved: bool = False) -> dict:
    """Write content to a file (requires escalation).

    Each call stores one AP-Trace via ``store_trace``: a denial trace when
    ``approved`` is false, a failure trace when the approved write raises, or
    a success trace otherwise.

    Args:
        path: Path to the file to write
        content: Content to write
        approved: Whether this write was explicitly approved by principal

    Returns:
        Status with bytes written

    Raises:
        PermissionError: If ``approved`` is false.
        Exception: Whatever opening or writing the file raised (for example
            ``OSError``) is re-raised unchanged after the failure trace has
            been stored.
    """
    trace_id = f"tr-{uuid.uuid4().hex[:12]}"
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    action = Action(type="tool_invocation", name="write_file", category="escalate")
    triggers = [{"trigger": "tool == 'write_file'", "matched": True}]

    if not approved:
        store_trace(
            APTrace(
                trace_id=trace_id,
                agent_id="mcp-filesystem",
                card_id=SERVER_ALIGNMENT.card_id,
                timestamp=timestamp,
                action=action,
                decision=Decision(
                    alternatives_considered=[],
                    selected=None,
                    selection_reasoning="Write operation requires explicit approval",
                    values_applied=["user_control"],
                ),
                escalation=Escalation(
                    evaluated=True,
                    triggers_checked=triggers,
                    required=True,
                    reason="Write operations require approval",
                ),
            )
        )
        raise PermissionError("write_file requires approved=True (escalation)")

    approved_escalation = Escalation(
        evaluated=True,
        triggers_checked=triggers,
        required=True,
        principal_response="approved",
        reason="Write operations require approval",
    )

    # Mirror read_file: a write that was approved but failed still leaves a
    # trace, so the audit log records the attempt and its target path.
    try:
        with open(path, 'w') as f:
            # NOTE: in text mode write() returns the number of characters
            # written, not bytes; the key name is kept for compatibility.
            bytes_written = f.write(content)
    except Exception as e:
        store_trace(
            APTrace(
                trace_id=trace_id,
                agent_id="mcp-filesystem",
                card_id=SERVER_ALIGNMENT.card_id,
                timestamp=timestamp,
                action=action,
                decision=Decision(
                    alternatives_considered=[],
                    selected=None,
                    selection_reasoning=f"Operation failed: {e}",
                    values_applied=["user_control", "transparency"],
                ),
                escalation=approved_escalation,
                context={"path": path, "error": type(e).__name__},
            )
        )
        raise

    store_trace(
        APTrace(
            trace_id=trace_id,
            agent_id="mcp-filesystem",
            card_id=SERVER_ALIGNMENT.card_id,
            timestamp=timestamp,
            action=action,
            decision=Decision(
                alternatives_considered=[
                    {"option_id": "write", "description": f"Write to {path}", "score": 1.0, "flags": ["approved"]}
                ],
                selected="write",
                selection_reasoning="Write approved by principal",
                values_applied=["user_control", "transparency"],
            ),
            escalation=approved_escalation,
            context={"path": path, "bytes_written": bytes_written},
        )
    )
    return {"status": "success", "bytes_written": bytes_written}
@mcp.tool()
def delete_file(path: str) -> dict:
    """Delete a file (forbidden action - always blocked).

    No file is touched: the call records a trace of the blocked attempt and
    then raises.

    Args:
        path: Path to the file to delete

    Returns:
        Never returns - always raises

    Raises:
        PermissionError: On every call, after the trace has been stored.
    """
    now_utc = datetime.now(timezone.utc)
    blocked_action = Action(
        type="tool_invocation",
        name="delete_file",
        category="forbidden",
    )
    refusal = Decision(
        alternatives_considered=[],
        selected=None,
        selection_reasoning="Forbidden action blocked",
        values_applied=["harm_prevention", "user_control"],
    )
    escalation_record = Escalation(
        evaluated=True,
        triggers_checked=[{"trigger": "forbidden_action", "matched": True}],
        required=True,
        reason="delete_file is in forbidden_actions",
    )
    store_trace(
        APTrace(
            trace_id=f"tr-{uuid.uuid4().hex[:12]}",
            agent_id="mcp-filesystem",
            card_id=SERVER_ALIGNMENT.card_id,
            timestamp=now_utc.isoformat().replace("+00:00", "Z"),
            action=blocked_action,
            decision=refusal,
            escalation=escalation_record,
        )
    )
    raise PermissionError("delete_file is a forbidden action")
# --- Verification Endpoint ---
@mcp.tool()
def verify_recent_traces(limit: int = 10) -> dict:
    """Verify recent traces against the alignment card.

    Args:
        limit: Number of recent traces to verify. Zero or a negative value
            verifies nothing and yields an empty summary.

    Returns:
        Verification summary with ``total``, ``passed`` and ``failed`` counts
        plus a per-trace ``details`` list (trace ID, pass flag, violation
        descriptions).
    """
    # A plain TRACES[-limit:] is wrong at the edges: -0 == 0 selects every
    # trace, and a negative limit skips the oldest entries instead of
    # limiting the result.
    recent = TRACES[-limit:] if limit > 0 else []

    results = []
    for trace_dict in recent:
        # Stored traces are plain dicts; rebuild the model before verifying.
        trace = APTrace(**trace_dict)
        result = verify_trace(trace, SERVER_ALIGNMENT)
        results.append({
            "trace_id": trace.trace_id,
            "passed": result.verified,
            # violations may be empty or None; both become an empty list.
            "violations": [v.description for v in (result.violations or [])],
        })

    passed = sum(1 for r in results if r["passed"])
    return {
        "total": len(results),
        "passed": passed,
        "failed": len(results) - passed,
        "details": results,
    }
if __name__ == "__main__":
    # Start the FastMCP server only when this module is executed as a script,
    # not when it is imported.
    mcp.run()