Some checks failed
Build Simulation and Test / Run All Tests (push) Failing after 8m17s
Major rewrite.
83 lines
2.8 KiB
Python
83 lines
2.8 KiB
Python
"""CSV formatter for tabular output data."""
|
|
|
|
import csv
|
|
import io
|
|
from typing import Any, List, Dict
|
|
from .base_formatter import BaseFormatter
|
|
|
|
|
|
class CSVFormatter(BaseFormatter):
    """Formats data as CSV.

    Supported inputs:

    * a list of dictionaries -> a header line plus one CSV row per item;
    * a single dictionary    -> a header line plus one CSV row;
    * anything else          -> ``str(data)``.

    Columns are the union of the keys of all rows, sorted by their string
    form. Rows that lack a column get an empty cell (``csv.DictWriter``'s
    default ``restval``). Lines end with ``"\\r\\n"``, the default of the
    ``csv`` module's "excel" dialect.
    """

    # Column that holds list items which are not dictionaries (e.g. the
    # numbers in ``[1, 2, 3]``), so such input yields CSV instead of crashing.
    _SCALAR_COLUMN = "value"

    def __init__(self, flatten_nested: bool = True):
        """Create a formatter.

        Args:
            flatten_nested: When true, nested dictionaries are expanded into
                ``parent_child`` columns and lists are collapsed into a single
                string cell (see ``_flatten_dict``). When false, rows are
                handed to the CSV writer unchanged.
        """
        self.flatten_nested = flatten_nested

    def format(self, data: Any) -> str:
        """Format data as CSV string.

        Args:
            data: A list of rows, a single dictionary, or any other value.

        Returns:
            CSV text for lists and dictionaries (an empty string for an empty
            list); ``str(data)`` for every other value.
        """
        if isinstance(data, list):
            return self._format_list(data)
        if isinstance(data, dict):
            return self._format_dict(data)
        # Single value
        return str(data)

    def _format_list(self, data: List[Dict[str, Any]]) -> str:
        """Format list of dictionaries as CSV.

        Items that are not dictionary-like are written under the
        ``_SCALAR_COLUMN`` column rather than raising ``AttributeError``.

        Args:
            data: The rows to write.

        Returns:
            Header line plus one line per item, or ``""`` when ``data`` is
            empty.
        """
        if not data:
            return ""
        return self._write_rows([self._prepare_row(item) for item in data])

    def _format_dict(self, data: Dict[str, Any]) -> str:
        """Format single dictionary as CSV (header line plus one row)."""
        return self._write_rows([self._prepare_row(data)])

    def _prepare_row(self, item: Any) -> Dict[str, Any]:
        """Turn one input item into the dictionary handed to the CSV writer."""
        # Duck-typed check: anything exposing keys()/items() worked as a row
        # before and must keep being treated as one.
        is_mapping = hasattr(item, "items") and hasattr(item, "keys")
        row = item if is_mapping else {self._SCALAR_COLUMN: item}
        return self._flatten_dict(row) if self.flatten_nested else row

    @staticmethod
    def _write_rows(rows: List[Dict[str, Any]]) -> str:
        """Write prepared rows, preceded by a header line, and return the text."""
        # key=str leaves the order of all-string keys unchanged while keeping
        # mixed-type keys (e.g. 1 and "b") from raising TypeError in sorted().
        fieldnames = sorted({key for row in rows for key in row.keys()}, key=str)

        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
        return output.getvalue()

    def _flatten_dict(self, data: Dict[str, Any], parent_key: str = '', sep: str = '_') -> Dict[str, Any]:
        """Flatten nested dictionaries.

        Args:
            data: Dictionary to flatten.
            parent_key: Prefix for generated keys; empty at the top level.
            sep: Separator placed between a parent key and a child key.

        Returns:
            A new single-level dictionary. Nested dictionaries contribute
            ``parent<sep>child`` keys. A non-empty list whose first element is
            an ``int``, ``float`` or ``str`` becomes its elements joined with
            ``","``; any other list becomes ``str(list)``. All other values
            are kept as they are. If two paths produce the same key, the
            later one wins.
        """
        flat: Dict[str, Any] = {}
        for key, value in data.items():
            new_key = f"{parent_key}{sep}{key}" if parent_key else key
            if isinstance(value, dict):
                flat.update(self._flatten_dict(value, new_key, sep))
            elif isinstance(value, list):
                # Only the first element is inspected to pick the list style.
                if value and isinstance(value[0], (int, float, str)):
                    flat[new_key] = ','.join(map(str, value))
                else:
                    flat[new_key] = str(value)
            else:
                flat[new_key] = value
        return flat

    def get_file_extension(self) -> str:
        """Get file extension for CSV format."""
        return "csv"