Is there a sample script for API-based audit log export?
The following sample script demonstrates how to authenticate, retrieve logs using a "last seen" timestamp to avoid duplicates, and append the results to a JSON Lines file.
Python Sample Script
import json
from datetime import datetime, timedelta, timezone

import requests

# --- CONFIGURATION ---
TENANT_URL = "https://your-tenant.spirion.com"  # Replace with your tenant URL
API_TOKEN = "YOUR_BEARER_TOKEN"  # Replace with your API Key/Token
LOG_FILE = "spirion_audit_export.json"
STATE_FILE = "last_timestamp.txt"  # Stores the last exported timestamp


def get_last_timestamp():
    """Reads the last exported timestamp from a local file."""
    try:
        with open(STATE_FILE, 'r') as f:
            return f.read().strip()
    except FileNotFoundError:
        # Default to the last 24 hours if no state file exists
        since = datetime.now(timezone.utc) - timedelta(days=1)
        return since.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def save_last_timestamp(timestamp):
    """Saves the latest timestamp to a local file."""
    with open(STATE_FILE, 'w') as f:
        f.write(timestamp)


def export_audit_logs():
    last_seen = get_last_timestamp()
    endpoint = f"{TENANT_URL}/api/v1/auditlogs"
    headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json"
    }
    # Query parameters for filtering and pagination
    params = {
        "startDate": last_seen,
        "limit": 500,
        "sort": "timestamp_asc"  # Ensure we process in chronological order
    }
    print(f"Fetching logs since: {last_seen}...")
    try:
        # The timeout keeps the script from hanging on a stalled connection
        response = requests.get(endpoint, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        logs = response.json()  # Assumes the response body is a JSON array of events
        if not logs:
            print("No new audit events found.")
            return
        # Append logs to your local file or send to SIEM
        with open(LOG_FILE, 'a') as f:
            for entry in logs:
                f.write(json.dumps(entry) + "\n")
        # Update the state file with the timestamp of the last record
        new_last_timestamp = logs[-1]['timestamp']
        save_last_timestamp(new_last_timestamp)
        print(f"Successfully exported {len(logs)} events. Newest timestamp: {new_last_timestamp}")
    except requests.exceptions.RequestException as e:
        print(f"Error connecting to Spirion API: {e}")


if __name__ == "__main__":
    export_audit_logs()
How to Use This Script
- Install Requirements: Ensure you have the requests library installed (pip install requests).
- Set Credentials: Replace TENANT_URL and API_TOKEN with your actual Spirion environment details.
- Run Manually: Execute the script to verify it pulls data and creates the last_timestamp.txt and spirion_audit_export.json files.
- Automate:
  - Windows: Use Task Scheduler to run python export_script.py every hour.
  - Linux: Add a Cron job (e.g., 0 * * * * /usr/bin/python3 /path/to/script.py).
Key Features of This Script
- Incremental Fetching: By using last_timestamp.txt, the script only pulls events that have occurred since the last successful run, which limits data duplication in your SIEM.
- JSONL Format: It saves logs in "JSON Lines" format (one JSON object per line), which is widely supported for ingestion into tools like Splunk, Logstash, or Azure Monitor.
- Error Handling: It includes basic exception handling that prints an error if the API is unreachable or the request fails (for example, because the token is invalid).
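To make the JSON Lines layout concrete, here is a minimal sketch that reads an export file back, one record per line. The function name read_jsonl is hypothetical, and the fields inside each record depend on what the API actually returns.

```python
import json


def read_jsonl(path):
    """Yield one parsed record per non-empty line of a JSON Lines file."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # skip blank lines rather than failing on them
                yield json.loads(line)
```

For example, events = list(read_jsonl("spirion_audit_export.json")) would load every exported event. Iterating over the generator directly keeps memory use flat for large files.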
Recommendations
- Security: In a production environment, do not hardcode the API_TOKEN. Use an environment variable or a secret manager (like AWS Secrets Manager or Azure Key Vault).
- SIEM Integration: Instead of writing to a local file (LOG_FILE), you can modify the script to POST the data directly to your SIEM's HTTP Event Collector (HEC) endpoint.
- Timezone Awareness: Spirion API timestamps are in UTC. Ensure your script and your SIEM are aligned on the UTC timezone to avoid "time-shifting" gaps in your audit trail.
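As one possible way to follow the security recommendation, the sketch below reads the token from an environment variable instead of a hardcoded constant. The variable name SPIRION_API_TOKEN and both helper names are assumptions made for illustration, not part of any Spirion tooling.

```python
import os


def load_api_token(var_name="SPIRION_API_TOKEN"):
    """Return the API token from the environment, or fail with a clear message."""
    # SPIRION_API_TOKEN is a hypothetical variable name chosen for this sketch.
    token = os.environ.get(var_name, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return token


def build_headers(token):
    """Build the same request headers the sample script uses."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
```

In the sample script, headers = build_headers(load_api_token()) could then replace the dictionary that embeds API_TOKEN. Failing early when the variable is missing avoids sending requests with an empty bearer token.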
Is there a script with pagination for large audit logs?
Yes, the script can be modified to handle pagination. This is essential for large environments where a single time window might contain thousands of events, exceeding the API's maximum limit per request (typically 500 or 1,000 records).
To handle pagination, the script needs to use a while loop that continues to request data as long as the API indicates there are more records available.
Python Sample Script with Pagination Logic
import json
import time
from datetime import datetime, timedelta, timezone

import requests

# --- CONFIGURATION ---
TENANT_URL = "https://your-tenant.spirion.com"
API_TOKEN = "YOUR_BEARER_TOKEN"
LOG_FILE = "spirion_audit_export.json"
STATE_FILE = "last_timestamp.txt"
LIMIT_PER_PAGE = 500  # Standard API limit


def get_last_timestamp():
    """Reads the last exported timestamp, defaulting to 24 hours ago."""
    try:
        with open(STATE_FILE, 'r') as f:
            return f.read().strip()
    except FileNotFoundError:
        since = datetime.now(timezone.utc) - timedelta(days=1)
        return since.strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def save_last_timestamp(timestamp):
    """Saves the latest timestamp to a local file."""
    with open(STATE_FILE, 'w') as f:
        f.write(timestamp)


def export_audit_logs():
    last_seen = get_last_timestamp()
    endpoint = f"{TENANT_URL}/api/v1/auditlogs"
    headers = {"Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json"}
    all_events_processed = 0
    has_more_data = True
    current_start_date = last_seen
    print(f"Starting export from: {current_start_date}")
    while has_more_data:
        params = {
            "startDate": current_start_date,
            "limit": LIMIT_PER_PAGE,
            "sort": "timestamp_asc"
        }
        try:
            response = requests.get(endpoint, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            logs = response.json()  # Assumes the response body is a JSON array of events
            if not logs:
                break
            # Process the batch
            with open(LOG_FILE, 'a') as f:
                for entry in logs:
                    f.write(json.dumps(entry) + "\n")
            all_events_processed += len(logs)
            # Update the pointer for the NEXT page using the timestamp of the
            # last record in this batch. Saving it after every batch means an
            # error or an empty follow-up page does not lose progress.
            last_batch_timestamp = logs[-1]['timestamp']
            save_last_timestamp(last_batch_timestamp)
            # If we received a full page, there might be more
            if len(logs) == LIMIT_PER_PAGE:
                if last_batch_timestamp == current_start_date:
                    # A full page sharing one timestamp would repeat forever
                    print("Start date did not advance; stopping to avoid an infinite loop.")
                    break
                # To avoid re-processing the exact same last record,
                # some APIs require adding a millisecond or using an offset.
                # For Spirion, we update the current_start_date to the last timestamp.
                current_start_date = last_batch_timestamp
                print(f"Batch complete. Processed {all_events_processed} so far. Moving to next page...")
                time.sleep(1)  # Polite rate-limiting
            else:
                # We received less than a full page, so we are caught up
                has_more_data = False
        except requests.exceptions.RequestException as e:
            print(f"Error during pagination: {e}")
            has_more_data = False
    print(f"Export finished. Total events exported: {all_events_processed}")


if __name__ == "__main__":
    export_audit_logs()
How the Pagination Works
- The while Loop: The script keeps running as long as has_more_data is True.
- The limit Check: If the API returns exactly 500 records (the LIMIT_PER_PAGE), the script assumes there is likely another page of data. It updates current_start_date to the timestamp of the 500th record and loops again.
- The Exit Condition: If the API returns fewer than 500 records (e.g., 120), the script has reached the end of the available logs; the final timestamp is saved and the loop exits.
- Rate Limiting: The script calls time.sleep(1) between requests. This is a common precaution so that the script doesn't trigger DDoS protection or a rate-limiting rule on the Spirion SaaS WAF.
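The loop described above can be tried without network access. The sketch below uses an in-memory stand-in for the API; paginate, fake_fetch, and the integer "timestamps" are all hypothetical, and the stand-in assumes the start filter is inclusive, which may not match the real endpoint.

```python
def paginate(fetch_page, start, limit):
    """Yield batches from fetch_page(start, limit) until a short or empty page."""
    while True:
        batch = fetch_page(start, limit)
        if not batch:
            return
        yield batch
        if len(batch) < limit:
            return  # short page: we are caught up
        next_start = batch[-1]["timestamp"]
        if next_start == start:
            return  # pointer did not advance; avoid looping forever
        start = next_start


# In-memory stand-in for the API: seven records with integer "timestamps" 1..7.
RECORDS = [{"timestamp": t} for t in range(1, 8)]


def fake_fetch(start, limit):
    """Return up to `limit` records whose timestamp is >= start (inclusive)."""
    return [r for r in RECORDS if r["timestamp"] >= start][:limit]


batches = list(paginate(fake_fetch, start=1, limit=3))
pages = [[r["timestamp"] for r in b] for b in batches]
# pages == [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
```

With an inclusive start filter, the last record of each full page comes back as the first record of the next one, so seven unique events produce ten written lines. The loop still terminates because the final page is shorter than the limit.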
Recommendations for Large Exports
- Memory Management: Notice that the script writes to the file inside the loop, batch by batch. This is critical. Do not try to store all events in a single Python list before writing, as an export of 100,000 audit events could exhaust the memory available to your script.
- Overlap Handling: Some API versions might return the "last record" of Page 1 as the "first record" of Page 2 if you use the exact same timestamp. If you see duplicates in your JSON file, you may need to add a tiny increment (e.g., .001s) to the current_start_date before the next loop.
- Logging: For production automation, add a try/except block around the file writing process to ensure that if the disk is full, the script stops and alerts you rather than losing data.
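For the overlap case, a small helper along these lines could apply the increment. It is a sketch that assumes timestamps are ISO 8601 strings in UTC with a trailing "Z"; the actual format returned by the API is not confirmed here, and bump_timestamp is a hypothetical name.

```python
from datetime import datetime, timedelta


def bump_timestamp(ts, milliseconds=1):
    """Return an ISO 8601 UTC timestamp shifted forward by a few milliseconds."""
    # fromisoformat() on older Python versions rejects a trailing "Z",
    # so swap it for an explicit UTC offset before parsing.
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    bumped = parsed + timedelta(milliseconds=milliseconds)
    # Emit millisecond precision and restore the "Z" suffix.
    return bumped.isoformat(timespec="milliseconds").replace("+00:00", "Z")
```

In the pagination loop this would be used as current_start_date = bump_timestamp(last_batch_timestamp). If several events can share the same millisecond, an increment may skip some of them, so deduplicating on a unique event identifier, where the API provides one, is the safer option.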
Summary
By using a timestamp-based pagination loop, this script can handle audit logs ranging from a few dozen events to millions while maintaining a low memory footprint and respecting API limits, as long as duplicate records at page boundaries are accounted for.