Skip to content

views.export_view.export_logic

Export logic module.

This module contains the core logic for data export operations, separated from the UI components in export_window.py.

ExportLogic

Core logic for data export operations.

Responsibilities
  1. Identify ID and value columns for export.
  2. Process column names to remove importer-specific suffixes.
  3. Collect metadata for the dataset, including geo metadata if available.
  4. Export the selected columns and metadata as CSV and YAML inside a ZIP archive.
Source code in src/views/export_view/export_logic.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class ExportLogic:
    """
    Core logic for data export operations.

    Responsibilities:
      1. Identify ID and value columns for export.
      2. Process column names to remove importer-specific suffixes.
      3. Collect metadata for the dataset, including geo metadata if available.
      4. Export the selected columns and metadata as CSV and YAML inside a ZIP archive.
    """

    # Importer-specific suffixes used to tag columns. Kept in one place so
    # column detection (load_data) and display-name stripping
    # (process_column_name) can never drift apart.
    _GEODATA_SUFFIX = "_geodata"
    _STATS_SUFFIX = "_stats"

    def __init__(self):
        """
        Initializes the export logic.

        - Sets up an internal placeholder for the DataFrame to export.
        """
        # Populated by load_data(); export_data() refuses to run while None.
        self._current_df: Optional[pd.DataFrame] = None

    # ==============================================================================
    #  Data Preparation: Load DataFrame and identify columns
    # ==============================================================================
    def load_data(self, df_stats: pd.DataFrame) -> Tuple[List[str], List[str], List[str], List[str]]:
        """
        Processes the DataFrame to prepare column lists for export.

        Steps:
          1. Store the provided DataFrame as _current_df.
          2. If the DataFrame is None or empty, return empty lists.
          3. Identify ID columns by filtering names ending with "_geodata".
          4. Identify value columns by filtering names ending with "_stats".
          5. Build display names by removing the suffixes from the column names.
          6. Return tuples of display and original names for both ID and value columns.

        Returns:
            (id_display, id_cols, val_display, val_cols) — display names and
            original column names for ID and value columns respectively.
        """
        self._current_df = df_stats
        if df_stats is None or df_stats.empty:
            return [], [], [], []

        # Identify ID columns by suffix "_geodata"
        id_cols = [c for c in df_stats.columns if c.endswith(self._GEODATA_SUFFIX)]
        # Identify value columns by suffix "_stats"
        val_cols = [c for c in df_stats.columns if c.endswith(self._STATS_SUFFIX)]

        # Delegate suffix stripping to process_column_name instead of
        # duplicating hard-coded slice lengths (previously c[:-8] / c[:-6]),
        # so both code paths always agree on the display name.
        id_display = [self.process_column_name(c) for c in id_cols]
        val_display = [self.process_column_name(c) for c in val_cols]

        return id_display, id_cols, val_display, val_cols

    # ==============================================================================
    #  Helper: Process column names for final CSV
    # ==============================================================================
    @staticmethod
    def process_column_name(col: str) -> str:
        """
        Removes importer-specific suffixes from a column name.

        - If name ends with "_geodata", strip that suffix.
        - If name ends with "_stats", strip that suffix.
        - Otherwise, return the name unchanged.
        """
        for suffix in (ExportLogic._GEODATA_SUFFIX, ExportLogic._STATS_SUFFIX):
            if col.endswith(suffix):
                # Length-based slice avoids magic numbers tied to the suffix text.
                return col[: -len(suffix)]
        return col

    # ==============================================================================
    #  Metadata Collection
    # ==============================================================================
    def get_metadata(self, id_col: str, val_col: str, name: str, description: str, source: str, year: int, data_type: str) -> Dict[str, Any]:
        """
        Collects all metadata required for export.

        Steps:
          1. Build a dictionary with trimmed values for name, description, source, year, and type.
          2. Add processed ID and value column names (without suffixes).
          3. If geo metadata is available in DataStore, include geo data details.
          4. Return the complete metadata dictionary.
        """
        metadata: Dict[str, Any] = {
            "name": name.strip(),
            "description": description.strip(),
            "source": source.strip(),
            "year": year,
            "type": data_type,
            "id_column": self.process_column_name(id_col),
            "value_column": self.process_column_name(val_col),
        }

        # If geographical metadata is present in the DataStore, add it
        if DataStore.geo_meta is not None:
            metadata["geo_data"] = {"type": DataStore.geo_meta.type, "version": DataStore.geo_meta.version, "level": DataStore.geo_meta.level}

        return metadata

    # ==============================================================================
    #  Export Data: Write CSV, YAML, and package into ZIP
    # ==============================================================================
    def export_data(self, id_col: str, val_col: str, metadata: Dict[str, Any], target_path: str) -> None:
        """
        Exports the data to a ZIP file containing a CSV and a YAML file.

        Steps:
          1. Validate that _current_df is set and non-empty; otherwise raise an error.
          2. Create a temporary directory to hold CSV and YAML files.
          3. Prepare the export DataFrame with only the two selected columns.
             - Rename them to "GEO_ID" and "VALUE" for consistency.
          4. Write the DataFrame to a CSV file named data.csv in the temp directory.
          5. Dump the metadata dictionary to a YAML file named metadata.yaml in the temp directory.
          6. Create a ZIP archive at target_path and add both data.csv and metadata.yaml.

        Raises:
            ValueError: if no DataFrame has been loaded via load_data().
        """
        if self._current_df is None or self._current_df.empty:
            raise ValueError("No data available for export")

        # Use a temporary directory to stage CSV and YAML before zipping
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)

            # ---------------------------
            #  CSV Export
            # ---------------------------
            # Select only ID and value columns from the DataFrame
            df_export = self._current_df[[id_col, val_col]].copy()
            # Rename columns for final output
            df_export.columns = ["GEO_ID", "VALUE"]
            csv_path = tmp_path / "data.csv"
            # Write CSV without row indices
            df_export.to_csv(csv_path, index=False)

            # ---------------------------
            #  YAML Export
            # ---------------------------
            yaml_path = tmp_path / "metadata.yaml"
            # Dump metadata to YAML using PyYAML; keep insertion order (sort_keys=False)
            with open(yaml_path, "w", encoding="utf-8") as fh:
                yaml.dump(metadata, fh, allow_unicode=True, sort_keys=False)

            # ---------------------------
            #  ZIP Packaging
            # ---------------------------
            # Create a ZIP archive and include both files at the archive root
            with zipfile.ZipFile(target_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                # Add data.csv under the name "data.csv"
                zf.write(csv_path, arcname="data.csv")
                # Add metadata.yaml under the name "metadata.yaml"
                zf.write(yaml_path, arcname="metadata.yaml")

__init__()

Initializes the export logic.

  • Sets up an internal placeholder for the DataFrame to export.
Source code in src/views/export_view/export_logic.py
40
41
42
43
44
45
46
def __init__(self):
    """
    Set up the export logic with no data loaded.

    The internal DataFrame slot starts out empty and is filled in later by
    load_data().
    """
    # No DataFrame has been provided yet.
    self._current_df: Optional[pd.DataFrame] = None

export_data(id_col, val_col, metadata, target_path)

Exports the data to a ZIP file containing a CSV and a YAML file.

Steps
  1. Validate that _current_df is set and non-empty; otherwise raise an error.
  2. Create a temporary directory to hold CSV and YAML files.
  3. Prepare the export DataFrame with only the two selected columns.
  4. Rename them to "GEO_ID" and "VALUE" for consistency.
  5. Write the DataFrame to a CSV file named data.csv in the temp directory.
  6. Dump the metadata dictionary to a YAML file named metadata.yaml in the temp directory.
  7. Create a ZIP archive at target_path and add both data.csv and metadata.yaml.
Source code in src/views/export_view/export_logic.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def export_data(self, id_col: str, val_col: str, metadata: Dict[str, Any], target_path: str) -> None:
    """
    Export the current data as a ZIP archive holding a CSV and a YAML file.

    Steps:
      1. Validate that _current_df is set and non-empty; otherwise raise ValueError.
      2. Stage data.csv and metadata.yaml in a temporary directory.
      3. Keep only the selected ID and value columns, renamed to "GEO_ID" and "VALUE".
      4. Serialize the metadata dictionary with PyYAML (insertion order preserved).
      5. Package both staged files into a ZIP archive at target_path.
    """
    frame = self._current_df
    if frame is None or frame.empty:
        raise ValueError("No data available for export")

    # Stage both output files in a throwaway directory before zipping.
    with tempfile.TemporaryDirectory() as workdir:
        staging = Path(workdir)
        csv_file = staging / "data.csv"
        yaml_file = staging / "metadata.yaml"

        # CSV: restrict to the two chosen columns and rename for the consumer.
        selected = frame[[id_col, val_col]].copy()
        selected.columns = ["GEO_ID", "VALUE"]
        # No row index in the exported file.
        selected.to_csv(csv_file, index=False)

        # YAML: dump the metadata dictionary as-is.
        with open(yaml_file, "w", encoding="utf-8") as handle:
            yaml.dump(metadata, handle, allow_unicode=True, sort_keys=False)

        # ZIP: bundle both staged files at the archive root.
        with zipfile.ZipFile(target_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.write(csv_file, arcname="data.csv")
            archive.write(yaml_file, arcname="metadata.yaml")

get_metadata(id_col, val_col, name, description, source, year, data_type)

Collects all metadata required for export.

Steps
  1. Build a dictionary with trimmed values for name, description, source, year, and type.
  2. Add processed ID and value column names (without suffixes).
  3. If geo metadata is available in DataStore, include geo data details.
  4. Return the complete metadata dictionary.
Source code in src/views/export_view/export_logic.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_metadata(self, id_col: str, val_col: str, name: str, description: str, source: str, year: int, data_type: str) -> Dict[str, Any]:
    """
    Build the complete metadata dictionary for an export.

    - Free-text fields (name, description, source) are whitespace-trimmed.
    - Column names are stored without their importer suffixes.
    - Geo details are appended only when DataStore.geo_meta is present.
    """
    metadata: Dict[str, Any] = dict(
        name=name.strip(),
        description=description.strip(),
        source=source.strip(),
        year=year,
        type=data_type,
        id_column=self.process_column_name(id_col),
        value_column=self.process_column_name(val_col),
    )

    # Geo metadata is optional; include it only when the DataStore has it.
    geo_meta = DataStore.geo_meta
    if geo_meta is not None:
        metadata["geo_data"] = {
            "type": geo_meta.type,
            "version": geo_meta.version,
            "level": geo_meta.level,
        }

    return metadata

load_data(df_stats)

Processes the DataFrame to prepare column lists for export.

Steps
  1. Store the provided DataFrame as _current_df.
  2. If the DataFrame is None or empty, return empty lists.
  3. Identify ID columns by filtering names ending with "_geodata".
  4. Identify value columns by filtering names ending with "_stats".
  5. Build display names by removing the suffixes from the column names.
  6. Return tuples of display and original names for both ID and value columns.
Source code in src/views/export_view/export_logic.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_data(self, df_stats: pd.DataFrame) -> Tuple[List[str], List[str], List[str], List[str]]:
    """
    Prepare the column lists needed for export from the given DataFrame.

    Stores the DataFrame on the instance, then splits its columns into ID
    columns (suffix "_geodata") and value columns (suffix "_stats"), and
    derives suffix-free display names for each group.

    Returns:
        (id_display, id_cols, val_display, val_cols); four empty lists when
        the DataFrame is None or empty.
    """
    self._current_df = df_stats
    if df_stats is None or df_stats.empty:
        return [], [], [], []

    # Single pass over the columns, bucketing by suffix.
    id_cols: List[str] = []
    val_cols: List[str] = []
    for column in df_stats.columns:
        if column.endswith("_geodata"):
            id_cols.append(column)
        elif column.endswith("_stats"):
            val_cols.append(column)

    # Display names drop the importer suffix.
    id_display = [column[: -len("_geodata")] for column in id_cols]
    val_display = [column[: -len("_stats")] for column in val_cols]

    return id_display, id_cols, val_display, val_cols

process_column_name(col) staticmethod

Removes importer-specific suffixes from a column name.

  • If name ends with "_geodata", strip that suffix.
  • If name ends with "_stats", strip that suffix.
  • Otherwise, return the name unchanged.
Source code in src/views/export_view/export_logic.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@staticmethod
def process_column_name(col: str) -> str:
    """
    Strip an importer-specific suffix ("_geodata" or "_stats") from a column name.

    Names carrying neither suffix are returned unchanged.
    """
    for suffix in ("_geodata", "_stats"):
        if col.endswith(suffix):
            # Slice by suffix length so the literal stays the single source of truth.
            return col[: -len(suffix)]
    return col