Skip to content

Converter Instruction

twinweaver.instruction.converter_instruction

Classes

ConverterInstruction

Bases: ConverterBase

Orchestrates the conversion of patient data into multi-task instruction-following prompts and target strings, suitable for training language models.

This class acts as a high-level controller, utilizing specialized sub-converters (ConverterForecasting, ConverterEvents, ConverterForecastingQA) to generate prompts and targets for different task types (e.g., value forecasting, event prediction, question answering about forecasting). It then combines these individual task components into a single, structured multi-task instruction prompt and a corresponding multi-task target answer string, using templates defined in the provided Config object. It also handles the reverse conversion of model outputs back into structured data and provides methods for aggregation and comparison.

Customization

Summarized row generation (the compact “latest values” section inserted before the task prompts) can be overridden: * Call set_summarized_row_fn(fn) after instantiation. * fn may have signature: - (self, events_df, combined_meta) OR - (events_df, combined_meta) combined_meta contains (possibly empty) keys: - "dates_per_variable": dict[var_name] -> list/sequence of future (or target) dates - "variable_name_mapping": optional mapping var_name -> descriptive name * If the custom function raises an exception, the converter logs a warning and falls back to the built-in _generate_summarized_row_string.

Source code in twinweaver/instruction/converter_instruction.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
class ConverterInstruction(ConverterBase):
    """
    Orchestrates the conversion of patient data into multi-task instruction-following
    prompts and target strings, suitable for training language models.

    This class acts as a high-level controller, utilizing specialized sub-converters
    (`ConverterForecasting`, `ConverterEvents`, `ConverterForecastingQA`)
    to generate prompts and targets for different task types (e.g., value forecasting,
    event prediction, question answering about forecasting). It then combines these
    individual task components into a single, structured multi-task instruction prompt
    and a corresponding multi-task target answer string, using templates defined in
    the provided `Config` object. It also handles the reverse conversion of model
    outputs back into structured data and provides methods for aggregation and comparison.

    Customization
    -------------
    Summarized row generation (the compact “latest values” section inserted before
    the task prompts) can be overridden:
      * Call set_summarized_row_fn(fn) after instantiation.
      * fn may have signature:
          - (self, events_df, combined_meta)   OR
          - (events_df, combined_meta)
        combined_meta contains (possibly empty) keys:
          - "dates_per_variable": dict[var_name] -> list/sequence of future (or target) dates
          - "variable_name_mapping": optional mapping var_name -> descriptive name
      * If the custom function raises an exception, the converter logs a warning and
        falls back to the built-in _generate_summarized_row_string.

    """

    def __init__(
        self,
        nr_tokens_budget_total: int,
        config: Config,
        dm: DataManager,
        variable_stats: pd.DataFrame = None,
    ) -> None:
        """
        Initializes the ConverterInstruction class.

        Sets up the main converter by initializing base class settings, storing essential
        data descriptions and token budgets, configuring the tokenizer, loading multi-task
        prompt templates from the config, and initializing instances of the specialized
        sub-converters required for different task types.

        Parameters
        ----------
        nr_tokens_budget_total : int
            The total token budget available for the generated context + instruction prompt.
            Used for dynamically selecting history length.
        config : Config
            A configuration object supplying tokenizer details, prompt templates (for tasks
            and context generation), padding values, and other settings.
        dm : DataManager
            The data manager instance providing access to data frames and variable types.
        variable_stats : pd.DataFrame, optional
            DataFrame containing statistics about variables, used by the forecasting QA converter.
        """

        super().__init__(config)

        # Set vars
        self.nr_tokens_budget_total = nr_tokens_budget_total
        self.dm = dm
        self.constant_description = self.dm.data_frames["constant_description"]

        # Set forecasting prompt texts using overrides or config defaults
        self.forecasting_prompt_start = config.forecasting_fval_prompt_start
        self.forecasting_prompt_var_time = config.forecasting_prompt_var_time
        self.forecasting_prompt_summarized_start = config.forecasting_prompt_summarized_start
        self.forecasting_prompt_summarized_genetic = config.forecasting_prompt_summarized_genetic
        self.forecasting_prompt_summarized_lot = config.forecasting_prompt_summarized_lot

        # Initialize tokenizer and related settings
        self.tokenizer = AutoTokenizer.from_pretrained(self.config.tokenizer_to_use)
        self.nr_tokens_budget_padding = self.config.nr_tokens_budget_padding
        self.always_keep_first_visit = self.config.always_keep_first_visit

        # Set multi-task prompt texts using overrides or config defaults
        self.task_prompt_start = config.task_prompt_start
        self.task_prompt_each_task = config.task_prompt_each_task
        self.task_prompt_end = config.task_prompt_end
        self.task_target_start = config.task_target_start
        self.task_target_end = config.task_target_end

        # Initialize converters - can this be done more elegantly?
        self.converter_forecasting = ConverterForecasting(
            self.constant_description, nr_tokens_budget_total, config=config, dm=dm
        )

        self.converter_events = ConverterEvents(
            config,
            self.constant_description,
            nr_tokens_budget_total,
        )

        self.converter_forecasting_qa = ConverterForecastingQA(
            variable_stats=variable_stats,
            constant_description=self.constant_description,
            nr_tokens_budget_total=nr_tokens_budget_total,
            config=config,
            dm=dm,
        )

        # Default summarized row function
        self._generate_summarized_row_str_fn = self._generate_summarized_row_string

    def set_custom_summarized_row_fn(self, fn: Callable[[pd.DataFrame, dict], str]):
        """
        Sets a custom function for generating the summarized row string.
        If this function is not called, the default _generate_summarized_row_string is used.
        Requirements: signature must start with (self, events_df, meta, ...)
        If the function meets these criteria but it is still invalid, it will raise an error later on
        """
        import inspect

        params = list(inspect.signature(fn).parameters)
        if len(params) < 3 or params[0] != "self":  # assumes signature (self, events_df, meta, ...)
            raise TypeError(
                f"Custom summarized row function must accept at least (self, events_df, meta).Got parameters: {params}"
            )
        # Bind it as method
        self._generate_summarized_row_str_fn = fn.__get__(self, type(self))

    def _generate_prompt(self, all_tasks: list) -> tuple:
        """
        Constructs the multi-task instruction prompt string from individual task prompts.

        Takes a list of generated tasks (where each task is a tuple containing its specific
        prompt, target, metadata, and type identifier) and formats them into a single
        instruction prompt using wrapper templates defined in the config (e.g., numbering
        tasks, adding introductory/concluding text).

        Parameters
        ----------
        all_tasks : list[tuple[str, str, dict, str]]
            A list where each element represents a single task. The tuple structure is
            expected to be: (task_prompt_string, task_target_string, task_metadata_dict, task_type_identifier_string).

        Returns
        -------
        str
            The combined multi-task instruction prompt string ready to be prepended
            with context (patient history, etc.).
        """

        ret_str = self.task_prompt_start

        for idx in range(len(all_tasks)):
            task = all_tasks[idx]

            ret_str += self.task_prompt_each_task.format(task_nr=idx + 1)
            ret_str += task[3]
            ret_str += task[0]
            ret_str += "\n\n"

        ret_str += self.task_prompt_end

        return ret_str

    def _generate_target_string(self, all_tasks: list) -> tuple:
        """
        Constructs the multi-task target answer string and aggregates metadata.

        Takes a list of generated tasks and combines their individual target strings
        into a single, formatted multi-task answer string using wrapper templates
        (e.g., numbering answers corresponding to tasks). It also collects the
        metadata dictionaries from each individual task into a list.

        Parameters
        ----------
        all_tasks : list[tuple[str, str, dict, str]]
            A list where each element represents a single task. The tuple structure is
            expected to be: (task_prompt_string, task_target_string, task_metadata_dict, task_type_identifier_string).

        Returns
        -------
        target_str : str
            The combined multi-task target answer string.
        target_meta_list : list[dict]
            A list containing the metadata dictionaries from each
            individual task included in the target string.
        """

        ret_str = ""
        ret_meta = []

        for idx in range(len(all_tasks)):
            task = all_tasks[idx]
            task_type = task[3]
            task_text = task[1]

            ret_str += self.task_target_start.format(task_nr=idx + 1)
            ret_str += task_type
            ret_str += task_text
            ret_str += self.task_target_end
            ret_str += "\n"

            curr_meta = task[2]
            curr_meta["task_type"] = task_type
            ret_meta.append(task[2])

        return ret_str, ret_meta

    def get_nr_tokens(self, curr_string):
        """
        Calculates the number of tokens in a given string using the initialized tokenizer.

        Parameters
        ----------
        curr_string : str
            The string to tokenize.

        Returns
        -------
        int
            The number of tokens generated by the tokenizer for the input string.
        """
        return len(self.tokenizer(curr_string)["input_ids"])

    def forward_conversion(
        self,
        forecasting_splits: list[DataSplitterForecastingGroup],
        event_splits: list[DataSplitterEventsGroup],
        override_mode_to_select_forecasting: str = None,
    ) -> dict:
        """
        Generates a multi-task instruction prompt and target answer from patient data splits.

        This method orchestrates the creation of a training example. It randomly selects
        scenarios (splits) for event prediction and forecasting tasks from the provided lists.
        It then uses the specialized sub-converters to generate prompt/target pairs for these
        tasks (including potentially forecasting QA tasks). These individual tasks are shuffled
        and combined into a multi-task instruction prompt and a multi-task target answer using
        helper methods. Patient history (constant features, time-series events, summarized info)
        is formatted and prepended to the instruction prompt, respecting the total token budget.

        Parameters
        ----------
        forecasting_splits : list[DataSplitterForecastingGroup]
            List of DataSplitterForecastingGroup objects, each representing a potential forecasting task scenario
            (containing patient history up to a split date and future target values).
        event_splits : list[DataSplitterEventsGroup]
            List of DataSplitterEventsGroup objects, each representing a potential event prediction task scenario
            (containing patient history up to a split date and event outcome/censoring info).
        override_mode_to_select_forecasting : str, optional
            If provided, forces the selection mode for forecasting tasks ('forecasting',
            'forecasting_qa', or 'both'). If None, the mode is chosen randomly. Defaults to None.

        Returns
        -------
        dict
            A dictionary containing the final formatted data:
            - 'instruction': The complete input string for the model (context + multi-task prompt).
            - 'answer': The complete target string for the model (multi-task answer).
            - 'meta': A dictionary holding metadata including patient ID, structured constant and
                      history data used, split date, combined metadata from sub-converters, and
                      a list of detailed metadata for each individual task generated ('target_meta_detailed').

        Raises
        ------
        AssertionError
            If input splits have inconsistent Line of Therapy (LOT) dates or split dates,
            or if no tasks are generated.
        """

        #: make assertions that data has same split and lot date
        all_lot_dates_events = [x.lot_date for x in event_splits]
        all_lot_dates_forecasting = [x.lot_date for x in forecasting_splits]
        assert len(set(all_lot_dates_events) | set(all_lot_dates_forecasting)) == 1
        all_split_dates_events = [x.split_date_included_in_input for x in event_splits]
        all_split_dates_forecasting = [x.split_date_included_in_input for x in forecasting_splits]
        assert len(set(all_split_dates_events) | set(all_split_dates_forecasting)) == 1
        # assert that the split date is after or equal to lot date (checking 0, since all same)
        assert len(event_splits) == 0 or all_split_dates_events[0] >= all_lot_dates_events[0]

        #: randomly select the number of events and which splits to use and generate events
        min_nr_events = 1 if len(forecasting_splits) == 0 else 0
        nr_events_to_use = np.random.randint(min_nr_events, len(event_splits) + 1)
        event_splits_to_use = np.random.choice(event_splits, nr_events_to_use, replace=False)

        event_converted = []
        for event_split in event_splits_to_use:
            prompt, target, target_meta = self.converter_events.forward_conversion(event_split)
            event_converted.append((prompt, target, target_meta, self.config.task_prompt_events))

        #: then randomly select the forecasting splits and generate forecasting
        forecasting_tasks = []
        forecasting_qa_tasks = []

        if len(forecasting_splits) > 0:
            if override_mode_to_select_forecasting is None:
                #: randomly select whether to do forecasting, forecasting QA or both
                mode_to_select = np.random.choice(["forecasting", "forecasting_qa", "both"], 1, replace=False)[0]
            else:
                mode_to_select = override_mode_to_select_forecasting

            # Randomly select from the forecasting split
            if mode_to_select == "forecasting" or mode_to_select == "both":
                forecasting_split_to_use = np.random.choice(forecasting_splits, 1, replace=False)[0]
                ret_forecasting = self.converter_forecasting.forward_conversion(forecasting_split_to_use)
                prompt_forecasting, target_forecasting, target_meta_forecasting = ret_forecasting
                forecasting_tasks.append(
                    (
                        prompt_forecasting,
                        target_forecasting,
                        target_meta_forecasting,
                        self.config.task_prompt_forecasting,
                    )
                )

            #: then randomly select the QA splits and generate QA
            if mode_to_select == "forecasting_qa" or mode_to_select == "both":
                forecasting_qa_split_to_use = np.random.choice(forecasting_splits, 1, replace=False)[0]
                ret_forecasting_qa = self.converter_forecasting_qa.forward_conversion(forecasting_qa_split_to_use)
                (
                    prompt_forecasting_qa,
                    target_forecasting_qa,
                    target_meta_forecasting_qa,
                ) = ret_forecasting_qa
                forecasting_qa_tasks.append(
                    (
                        prompt_forecasting_qa,
                        target_forecasting_qa,
                        target_meta_forecasting_qa,
                        self.config.task_prompt_forecasting_qa,
                    )
                )

        #: determine random order of all tasks
        all_tasks = forecasting_tasks + forecasting_qa_tasks + event_converted
        all_tasks = np.random.permutation(all_tasks)
        assert len(all_tasks) > 0, "No tasks generated!"

        #: generate overall prompt
        prompt = self._generate_prompt(all_tasks)

        #: generate overall target
        target_str, target_meta = self._generate_target_string(all_tasks)

        #: aggregate all meta needed and add into target_meta for the summarized row
        combined_meta = {
            "dates_per_variable": {},
            "variable_name_mapping": {},
        }
        for curr_meta in target_meta:
            if "dates_per_variable" in curr_meta:
                combined_meta["dates_per_variable"].update(curr_meta["dates_per_variable"])
            if "variable_name_mapping" in curr_meta:
                combined_meta["variable_name_mapping"].update(curr_meta["variable_name_mapping"])

        # Since the history and constant are the same, we can just use the first
        data_to_use = forecasting_splits[0] if len(forecasting_splits) > 0 else event_splits[0]
        events_until_split = data_to_use.events_until_split
        constant = data_to_use.constant_data
        patientid = data_to_use.constant_data[self.config.patient_id_col]
        split_date_included_in_input = data_to_use.split_date_included_in_input

        #: generate constant string
        constant, constant_description = self._preprocess_constant_date(
            events_until_split, constant, self.constant_description
        )
        constant_string = self._get_constant_string(constant, constant_description)

        #: generate summarized row string
        try:
            summarized_row_string = self._generate_summarized_row_str_fn(events_until_split, combined_meta)
        except Exception as e:
            raise TypeError(f"Custom summarized_row_fn failed: {e}.")

        #: get current budget
        budget_total = self.nr_tokens_budget_total
        budget_total -= self.get_nr_tokens(prompt)
        budget_total -= self.get_nr_tokens(constant_string)
        budget_total -= self.get_nr_tokens(summarized_row_string)
        budget_total -= self.get_nr_tokens(target_str)
        budget_total -= self.nr_tokens_budget_padding

        #: select events within token budget
        patient_history = self._get_all_most_recent_events_within_budget(events_until_split, budget_total)

        #: generate history string
        patient_history_processed = patient_history.copy()
        patient_history_processed = self._preprocess_events(patient_history_processed)
        history_str = self._get_event_string(patient_history_processed, use_accumulative_dates=False)

        #: generate final string
        input_string = self.config.preamble_text + constant_string + history_str + summarized_row_string + prompt

        #: generate return & meta (incl. structured data)
        ret = {
            "instruction": input_string,
            "answer": target_str,
            "meta": {
                self.config.patient_id_col: patientid,
                "constant_data": constant,
                "history_data": patient_history,
                "split_date_included_in_input": split_date_included_in_input,
                "combined_meta": combined_meta,
                "target_meta_detailed": target_meta,
            },
        }
        return ret

    def forward_conversion_inference(
        self,
        forecasting_split: DataSplitterForecastingOption = None,
        forecasting_future_weeks_per_variable: dict = None,
        event_split: DataSplitterEventsOption = None,
        custom_tasks: list = None,
    ) -> dict:
        """
        Generates a multi-task instruction prompt suitable for inference.

        Constructs the input prompt for the model based on provided task scenarios
        (forecasting, event prediction, and/or custom text tasks). It does not generate
        a target 'answer' string. Patient history context (constant, time-series, summarized)
        is generated and prepended, respecting token limits.

        Parameters
        ----------
        forecasting_split : DataSplitterForecastingOption, optional
            Data for a forecasting task scenario. Defaults to None.
        forecasting_future_weeks_per_variable : dict, optional
            Specifies future time points (in weeks) for forecasting specific variables.
            Used by the forecasting sub-converter during inference. Defaults to None.
        event_split : DataSplitterEventsOption, optional
            Data for an event prediction task scenario. Defaults to None.
        custom_tasks : list[str], optional
            A list of strings, each representing a custom task/question to include
            in the prompt. Defaults to None.

        Returns
        -------
        dict
            A dictionary containing:
            - 'instruction': The complete input string for the model (context + multi-task prompt).
            - 'answer': None (as this is for inference).
            - 'meta': Metadata including patient ID, structured data used for context,
                      split date, and metadata associated with the generated tasks.

        Raises
        ------
        AssertionError
            If both `forecasting_split` and `event_split` are None, or if their
            split dates do not match when both are provided.
        """

        #: make assertions that data has same split date
        if forecasting_split is not None and event_split is not None:
            assert forecasting_split.split_date_included_in_input == event_split.split_date_included_in_input, (
                "Split dates do not match!"
            )
        assert forecasting_split is not None or event_split is not None, "No data provided!"

        #: convert events, if needed
        event_prompt_str = event_meta = None
        if event_split is not None:
            event_prompt_str, event_meta = self.converter_events.forward_conversion_inference(event_split)

        #: convert forecasting, if needed
        forecasting_prompt_str = forecasting_meta = None
        if forecasting_split is not None:
            f_ret = self.converter_forecasting.forward_conversion_inference(
                forecasting_split,
                future_weeks_per_variable=forecasting_future_weeks_per_variable,
            )
            forecasting_prompt_str, forecasting_meta = f_ret

        #: generate meta and aggregate
        all_tasks = []
        target_meta = []
        if event_prompt_str is not None:
            all_tasks.append((event_prompt_str, None, event_meta, self.config.task_prompt_events))
            target_meta.append(event_meta)
        if forecasting_prompt_str is not None:
            all_tasks.append(
                (
                    forecasting_prompt_str,
                    None,
                    forecasting_meta,
                    self.config.task_prompt_forecasting,
                )
            )
            target_meta.append(forecasting_meta)

        # Add in custom tasks
        if custom_tasks is not None:
            for custom_task in custom_tasks:
                all_tasks.append((custom_task, None, None, self.config.task_prompt_custom + "\n"))
                target_meta.append({})

        # Generate prompt
        prompt = self._generate_prompt(all_tasks)

        #: generate constant string
        data_to_use = forecasting_split if forecasting_split is not None else event_split
        events_until_split = data_to_use.events_until_split
        constant = data_to_use.constant_data
        patientid = data_to_use.constant_data[self.config.patient_id_col]
        split_date_included_in_input = data_to_use.split_date_included_in_input
        constant, constant_description = self._preprocess_constant_date(
            events_until_split, constant, self.constant_description
        )
        constant_string = self._get_constant_string(constant, constant_description)

        #: generate summarized row string
        combined_meta = {
            "dates_per_variable": {},
            "variable_name_mapping": {},
        }
        for curr_meta in target_meta:
            if "dates_per_variable" in curr_meta:
                combined_meta["dates_per_variable"].update(curr_meta["dates_per_variable"])
            if "variable_name_mapping" in curr_meta:
                combined_meta["variable_name_mapping"].update(curr_meta["variable_name_mapping"])
        try:
            summarized_row_string = self._generate_summarized_row_str_fn(events_until_split, combined_meta)
        except Exception as e:
            raise TypeError(f"Custom summarized_row_fn failed: {e}.")

        #: get current budget
        budget_total = self.nr_tokens_budget_total
        budget_total -= self.get_nr_tokens(prompt)
        budget_total -= self.get_nr_tokens(constant_string)
        budget_total -= self.get_nr_tokens(summarized_row_string)
        budget_total -= self.nr_tokens_budget_padding

        #: select events within token budget
        patient_history = self._get_all_most_recent_events_within_budget(events_until_split, budget_total)

        #: generate history string
        patient_history_processed = patient_history.copy()
        patient_history_processed = self._preprocess_events(patient_history_processed)
        history_str = self._get_event_string(patient_history, use_accumulative_dates=False)

        #: generate final string
        input_string = self.config.preamble_text + constant_string + history_str + summarized_row_string + prompt

        #: generate return & meta (incl. structured data)
        ret = {
            "instruction": input_string,
            "answer": None,
            "meta": {
                self.config.patient_id_col: patientid,
                "constant_data": constant,
                "history_data": patient_history,
                "split_date_included_in_input": split_date_included_in_input,
                "combined_meta": combined_meta,
                "target_meta_detailed": target_meta,
            },
        }
        return ret

    def generate_target_manual(self, reverse_converted, split_date, events_until_split):
        """
        Reconstructs a multi-task target string from previously reverse-converted structured data.

        This method takes a list of dictionaries (where each dictionary represents the
        parsed result of a single task from a model's output) and generates the corresponding
        multi-task target string. It calls the appropriate sub-converter's manual target
        generation method for each task type.

        Parameters
        ----------
        reverse_converted : list[dict]
            A list of dictionaries, each resulting from the `reverse_conversion` method,
            containing keys 'task_type' and 'result' (the structured data).
        split_date : datetime
            The reference split date, needed for reconstructing forecasting targets.
        events_until_split : pd.DataFrame
            Patient's historical event data up to the split date, needed for context
            by some sub-converters' target generation (e.g., forecasting).

        Returns
        -------
        tuple
            - str: The reconstructed multi-task target string.
            - list[dict]: A list containing the metadata generated during the reconstruction
                          of each individual task's target string.
            - list[dict]: The input `reverse_converted` list, potentially updated with
                          'original_text' (the reconstructed target snippet for each task)
                          and 'target_meta'.
        """

        #: go through all reverse converted and generate only the target string
        individual_targets = []

        for idx, reverse_converted_dic in enumerate(reverse_converted):
            if reverse_converted_dic["task_type"] == self.config.task_prompt_events:
                occurred = reverse_converted_dic["result"]["occurred"].iloc[0]
                censored = reverse_converted_dic["result"]["censoring"].iloc[0]
                target_name = reverse_converted_dic["result"]["target_name"].iloc[0]

                ret_prompt, ret_meta = self.converter_events.generate_target_manual(
                    target_name=target_name,
                    event_occurred=occurred,
                    event_censored=censored,
                )
                individual_targets.append((None, ret_prompt, {}, self.config.task_prompt_events))
                reverse_converted[idx]["original_text"] = ret_prompt

            if reverse_converted_dic["task_type"] == self.config.task_prompt_forecasting:
                ret_prompt, ret_meta = self.converter_forecasting.generate_target_manual(
                    split_date_included_in_input=split_date,
                    events_until_split=events_until_split,
                    target_events_after_split=reverse_converted_dic["result"],
                )
                individual_targets.append((None, ret_prompt, {}, self.config.task_prompt_forecasting))
                reverse_converted[idx]["original_text"] = ret_prompt
                reverse_converted[idx]["target_meta"] = ret_meta

            if reverse_converted_dic["task_type"] == self.config.task_prompt_custom:
                individual_targets.append(
                    (
                        None,
                        reverse_converted_dic["result"],
                        {},
                        self.config.task_prompt_custom,
                    )
                )
                reverse_converted[idx]["original_text"] = reverse_converted_dic["result"]

        #: then aggegrate all target strings with own
        target_str, target_meta = self._generate_target_string(individual_targets)

        # Return
        return target_str, target_meta, reverse_converted

    def aggregate_multiple_responses(self, all_responses: list[list[dict]]) -> dict:
        """
        Aggregates multiple model responses for the same multi-task prompt.

        Takes a list of responses, where each response is itself a list of parsed results
        (one dictionary per task from the `reverse_conversion` method). It groups the results
        for each task position across all responses and then calls the appropriate
        sub-converter's aggregation method to determine the most likely result for that task.

        Parameters
        ----------
        all_responses : list[list[dict]]
            A list where each inner list corresponds to one full model response (parsed by
            `reverse_conversion`) to the multi-task prompt. Each dictionary in the inner list
            should contain 'task_type' and 'result'.

        Returns
        -------
        list[dict]
            A list of dictionaries, one for each task position in the original prompt. Each
            dictionary contains:
            - 'task_type': The type of the task at this position.
            - 'result': The aggregated result (e.g., the most common structured data).
            - 'original_text': None (as this aggregates structured results, not text).
            - 'aggregated_meta': Metadata about the aggregation process (e.g., distribution
                               of responses for this task), provided by the sub-converter.

        Raises
        ------
        ValueError
            If `all_responses` is empty or if the inner lists have inconsistent lengths.
        """

        #: first restructure the format to get appropriate responses together
        ret_order = {idx: [] for idx in range(len(all_responses[0]))}

        for response_list in all_responses:
            # This is one response, which is a list since we may have multiple tasks per response
            for idx, response in enumerate(response_list):
                ret_order[idx].append(response)

        #: for each, aggregate by correctly calling the correct converter
        ret_aggregated = []

        for idx in range(max(ret_order.keys()) + 1):
            task_type = ret_order[idx][0]["task_type"]
            results = [x["result"] for x in ret_order[idx]]

            if task_type == self.config.task_prompt_events:
                ret, meta = self.converter_events.aggregate_multiple_responses(results)
            if task_type == self.config.task_prompt_forecasting:
                ret, meta = self.converter_forecasting.aggregate_multiple_responses(results)
            if task_type == self.config.task_prompt_forecasting_qa:
                ret, meta = self.converter_forecasting_qa.aggregate_multiple_responses(results)
            if task_type == self.config.task_prompt_custom:
                # Aggregate by taking the most common answer
                all_responses_count = Counter([x["original_text"] for x in results])
                most_common_response = all_responses_count.most_common(1)[0][0]
                ret = most_common_response
                # Get all responses and their counts as DF
                all_responses_count = pd.DataFrame(all_responses_count.items(), columns=["response", "count"])
                all_responses_count = all_responses_count.sort_values("count", ascending=False).reset_index(drop=True)
                meta = {
                    "all_custom_responses": all_responses_count,
                }

            # Set as appropriate dic
            ret_dic = {
                "task_type": task_type,
                "result": ret,
                "original_text": None,
                "aggregated_meta": meta,
            }
            ret_aggregated.append(ret_dic)

        #: return
        return ret_aggregated

    def reverse_conversion(
        self,
        target_string: str,
        data_manager: DataManager,
        split_date: datetime,
        patientid: str = None,
        inference_override: bool = False,
    ) -> list[dict]:
        """
        Parses a multi-task model output string back into a list of structured results.

        Splits the input string based on task markers (e.g., "Task 1:", "Task 2:").
        For each segment, it identifies the task type using configured prompt identifiers
        and calls the corresponding sub-converter's `reverse_conversion` method to parse
        the text into structured data (e.g., a DataFrame).

        Parameters
        ----------
        target_string : str
            The complete multi-task output string generated by the language model.
        data_manager : DataManager
            The data manager instance, needed by some sub-converters for context
            (e.g., unique event mappings).
        split_date : datetime
            The reference split date, required by some sub-converters (e.g., forecasting)
            to interpret time-related outputs correctly.
        patientid : str, optional
            If provided, this patient ID is added to the resulting structured data
            for each task. Defaults to None.
        inference_override : bool, optional
            If True, allows processing even if a task type cannot be strictly identified
            (e.g., by falling back to heuristics). Primarily for inference robustness
            where model output might be slightly malformed. Defaults to False.

        Returns
        -------
        list[dict]
            A list of dictionaries, one for each task identified and parsed in the
            `target_string`. Each dictionary contains:
            - 'task_type': The identified type of the task.
            - 'original_text': The raw text segment corresponding to this task's answer.
            - 'result': The structured data parsed from 'original_text' by the appropriate
                        sub-converter (e.g., a pandas DataFrame).

        Raises
        ------
        ValueError
            If `inference_override` is False and the type of a task segment cannot be
            determined using the configured prompt identifiers.
        """

        #: split up by task
        all_tasks = target_string.split("Task")
        ret_list = []

        for curr_task in all_tasks:
            # Basic preprocessing
            curr_task = curr_task.strip()
            if len(curr_task) == 0:
                continue

            #: determine which task it is
            task_type = None
            standard_extraction = True

            if self.config.task_prompt_forecasting in curr_task:
                task_type = self.config.task_prompt_forecasting
            elif self.config.task_prompt_forecasting_qa in curr_task:
                task_type = self.config.task_prompt_forecasting_qa
            elif self.config.task_prompt_events in curr_task:
                task_type = self.config.task_prompt_events
            elif "custom" in curr_task:
                task_type = self.config.task_prompt_custom
            else:
                #: Try determining by whether the task contains a "censored"
                if "censored" in curr_task:
                    task_type = self.config.task_prompt_events
                    standard_extraction = False
                elif inference_override is False:
                    raise ValueError("Could not determine task type")

            #: extract the relevant parts
            if standard_extraction:
                curr_task_text = curr_task.split(task_type)[1]
            else:
                curr_task_text = ":".join(curr_task.split(":")[1:])

                # Often we have "[""]" which need to be removed
                curr_task_text = curr_task_text.replace("[", "")
                curr_task_text = curr_task_text.replace("]", "")

            #: pass to correct reverse converter
            if task_type == self.config.task_prompt_forecasting:
                ret = self.converter_forecasting.reverse_conversion(
                    curr_task_text, data_manager.unique_events, split_date
                )
            elif task_type == self.config.task_prompt_forecasting_qa:
                ret = self.converter_forecasting_qa.reverse_conversion(
                    curr_task_text, data_manager.unique_events, split_date
                )
            elif task_type == self.config.task_prompt_events:
                ret = self.converter_events.reverse_conversion(curr_task_text)

            elif task_type == self.config.task_prompt_custom:
                ret = {"original_text": curr_task_text}

            if patientid is not None:
                ret[self.config.patient_id_col] = patientid

            ret_dic = {
                "task_type": task_type,
                "original_text": curr_task_text,
                "result": ret,
            }

            #: make list of results
            ret_list.append(ret_dic)

        #: return list of results
        return ret_list

    def get_difference_in_event_dataframes(
        self,
        list_of_original_conversions_meta: list[dict],
        list_of_reversed_conversions: list[dict],
    ) -> list[pd.DataFrame]:
        """
        Compares original structured data with reverse-converted data for each task in a multi-task scenario.

        Iterates through the tasks, matching the original metadata (which contains the
        ground truth structured data, e.g., in 'target_data_processed') with the result
        of the reverse conversion for that task. It then calls the appropriate sub-converter's
        difference calculation method.

        Parameters
        ----------
        list_of_original_conversions_meta : list[dict]
            The list of metadata dictionaries generated during the *forward* conversion,
            where each dictionary corresponds to a task and should contain the original
            structured target data (e.g., under 'target_data_processed'). Usually obtained
            from the 'target_meta_detailed' key in the forward conversion output meta.
        list_of_reversed_conversions : list[dict]
            The list of dictionaries resulting from the `reverse_conversion` method, where
            each dictionary contains the parsed structured data ('result') for a task.

        Returns
        -------
        list[pd.DataFrame]
            A list of pandas DataFrames. Each DataFrame highlights the differences found
            for the corresponding task between the original data and the reverse-converted data.
            An empty DataFrame indicates no differences were found for that task.

        Raises
        ------
        AssertionError
            If the number of original metadata items does not match the number of reversed
            conversion results.
        ValueError
             If a task type mismatch is detected or if expected data keys are missing.
        """

        # Check and setup data
        # list_of_original_conversions_meta = original_conversion["meta"]["target_meta_detailed"]

        assert len(list_of_original_conversions_meta) == len(list_of_reversed_conversions)
        ret_diffs = []

        #: go through each task and compare using the correct converter
        for idx in range(len(list_of_original_conversions_meta)):
            original = list_of_original_conversions_meta[idx]
            reversed = list_of_reversed_conversions[idx]

            if reversed["task_type"] == self.config.task_prompt_forecasting:
                ret_diff = self.converter_forecasting.get_difference_in_event_dataframes(
                    reversed["result"], original["target_data_processed"]
                )
            elif original["task_type"] == self.config.task_prompt_forecasting_qa:
                ret_diff = self.converter_forecasting_qa.get_difference_in_event_dataframes(
                    reversed["result"], original["target_data_processed"]
                )
            elif original["task_type"] == self.config.task_prompt_events:
                ret_diff = self.converter_events.get_difference_in_event_dataframes(
                    reversed["result"], original["target_data_processed"]
                )

            ret_diffs.append(ret_diff)

        #: return list of differences
        return ret_diffs
Functions
__init__
__init__(
    nr_tokens_budget_total, config, dm, variable_stats=None
)

Initializes the ConverterInstruction class.

Sets up the main converter by initializing base class settings, storing essential data descriptions and token budgets, configuring the tokenizer, loading multi-task prompt templates from the config, and initializing instances of the specialized sub-converters required for different task types.

Parameters:

Name Type Description Default
nr_tokens_budget_total int

The total token budget available for the generated context + instruction prompt. Used for dynamically selecting history length.

required
config Config

A configuration object supplying tokenizer details, prompt templates (for tasks and context generation), padding values, and other settings.

required
dm DataManager

The data manager instance providing access to data frames and variable types.

required
variable_stats DataFrame

DataFrame containing statistics about variables, used by the forecasting QA converter.

None
Source code in twinweaver/instruction/converter_instruction.py
def __init__(
    self,
    nr_tokens_budget_total: int,
    config: Config,
    dm: DataManager,
    variable_stats: pd.DataFrame = None,
) -> None:
    """
    Initializes the ConverterInstruction class.

    Sets up the main converter by initializing base class settings, storing essential
    data descriptions and token budgets, configuring the tokenizer, loading multi-task
    prompt templates from the config, and initializing instances of the specialized
    sub-converters required for different task types.

    Parameters
    ----------
    nr_tokens_budget_total : int
        The total token budget available for the generated context + instruction prompt.
        Used for dynamically selecting history length.
    config : Config
        A configuration object supplying tokenizer details, prompt templates (for tasks
        and context generation), padding values, and other settings.
    dm : DataManager
        The data manager instance providing access to data frames and variable types.
    variable_stats : pd.DataFrame, optional
        DataFrame containing statistics about variables, used by the forecasting QA converter.
    """

    super().__init__(config)

    # Set vars
    self.nr_tokens_budget_total = nr_tokens_budget_total
    self.dm = dm
    self.constant_description = self.dm.data_frames["constant_description"]

    # Set forecasting prompt texts using overrides or config defaults
    self.forecasting_prompt_start = config.forecasting_fval_prompt_start
    self.forecasting_prompt_var_time = config.forecasting_prompt_var_time
    self.forecasting_prompt_summarized_start = config.forecasting_prompt_summarized_start
    self.forecasting_prompt_summarized_genetic = config.forecasting_prompt_summarized_genetic
    self.forecasting_prompt_summarized_lot = config.forecasting_prompt_summarized_lot

    # Initialize tokenizer and related settings
    self.tokenizer = AutoTokenizer.from_pretrained(self.config.tokenizer_to_use)
    self.nr_tokens_budget_padding = self.config.nr_tokens_budget_padding
    self.always_keep_first_visit = self.config.always_keep_first_visit

    # Set multi-task prompt texts using overrides or config defaults
    self.task_prompt_start = config.task_prompt_start
    self.task_prompt_each_task = config.task_prompt_each_task
    self.task_prompt_end = config.task_prompt_end
    self.task_target_start = config.task_target_start
    self.task_target_end = config.task_target_end

    # Initialize converters - can this be done more elegantly?
    self.converter_forecasting = ConverterForecasting(
        self.constant_description, nr_tokens_budget_total, config=config, dm=dm
    )

    self.converter_events = ConverterEvents(
        config,
        self.constant_description,
        nr_tokens_budget_total,
    )

    self.converter_forecasting_qa = ConverterForecastingQA(
        variable_stats=variable_stats,
        constant_description=self.constant_description,
        nr_tokens_budget_total=nr_tokens_budget_total,
        config=config,
        dm=dm,
    )

    # Default summarized row function
    self._generate_summarized_row_str_fn = self._generate_summarized_row_string
aggregate_multiple_responses
aggregate_multiple_responses(all_responses)

Aggregates multiple model responses for the same multi-task prompt.

Takes a list of responses, where each response is itself a list of parsed results (one dictionary per task from the reverse_conversion method). It groups the results for each task position across all responses and then calls the appropriate sub-converter's aggregation method to determine the most likely result for that task.

Parameters:

Name Type Description Default
all_responses list[list[dict]]

A list where each inner list corresponds to one full model response (parsed by reverse_conversion) to the multi-task prompt. Each dictionary in the inner list should contain 'task_type' and 'result'.

required

Returns:

Type Description
list[dict]

A list of dictionaries, one for each task position in the original prompt. Each dictionary contains: - 'task_type': The type of the task at this position. - 'result': The aggregated result (e.g., the most common structured data). - 'original_text': None (as this aggregates structured results, not text). - 'aggregated_meta': Metadata about the aggregation process (e.g., distribution of responses for this task), provided by the sub-converter.

Raises:

Type Description
ValueError

If all_responses is empty or if the inner lists have inconsistent lengths.

Source code in twinweaver/instruction/converter_instruction.py
def aggregate_multiple_responses(self, all_responses: list[list[dict]]) -> dict:
    """
    Aggregates multiple model responses for the same multi-task prompt.

    Takes a list of responses, where each response is itself a list of parsed results
    (one dictionary per task from the `reverse_conversion` method). It groups the results
    for each task position across all responses and then calls the appropriate
    sub-converter's aggregation method to determine the most likely result for that task.

    Parameters
    ----------
    all_responses : list[list[dict]]
        A list where each inner list corresponds to one full model response (parsed by
        `reverse_conversion`) to the multi-task prompt. Each dictionary in the inner list
        should contain 'task_type' and 'result'.

    Returns
    -------
    list[dict]
        A list of dictionaries, one for each task position in the original prompt. Each
        dictionary contains:
        - 'task_type': The type of the task at this position.
        - 'result': The aggregated result (e.g., the most common structured data).
        - 'original_text': None (as this aggregates structured results, not text).
        - 'aggregated_meta': Metadata about the aggregation process (e.g., distribution
                           of responses for this task), provided by the sub-converter.

    Raises
    ------
    ValueError
        If `all_responses` is empty or if the inner lists have inconsistent lengths.
    """

    #: first restructure the format to get appropriate responses together
    ret_order = {idx: [] for idx in range(len(all_responses[0]))}

    for response_list in all_responses:
        # This is one response, which is a list since we may have multiple tasks per response
        for idx, response in enumerate(response_list):
            ret_order[idx].append(response)

    #: for each, aggregate by correctly calling the correct converter
    ret_aggregated = []

    for idx in range(max(ret_order.keys()) + 1):
        task_type = ret_order[idx][0]["task_type"]
        results = [x["result"] for x in ret_order[idx]]

        if task_type == self.config.task_prompt_events:
            ret, meta = self.converter_events.aggregate_multiple_responses(results)
        if task_type == self.config.task_prompt_forecasting:
            ret, meta = self.converter_forecasting.aggregate_multiple_responses(results)
        if task_type == self.config.task_prompt_forecasting_qa:
            ret, meta = self.converter_forecasting_qa.aggregate_multiple_responses(results)
        if task_type == self.config.task_prompt_custom:
            # Aggregate by taking the most common answer
            all_responses_count = Counter([x["original_text"] for x in results])
            most_common_response = all_responses_count.most_common(1)[0][0]
            ret = most_common_response
            # Get all responses and their counts as DF
            all_responses_count = pd.DataFrame(all_responses_count.items(), columns=["response", "count"])
            all_responses_count = all_responses_count.sort_values("count", ascending=False).reset_index(drop=True)
            meta = {
                "all_custom_responses": all_responses_count,
            }

        # Set as appropriate dic
        ret_dic = {
            "task_type": task_type,
            "result": ret,
            "original_text": None,
            "aggregated_meta": meta,
        }
        ret_aggregated.append(ret_dic)

    #: return
    return ret_aggregated
forward_conversion
forward_conversion(
    forecasting_splits,
    event_splits,
    override_mode_to_select_forecasting=None,
)

Generates a multi-task instruction prompt and target answer from patient data splits.

This method orchestrates the creation of a training example. It randomly selects scenarios (splits) for event prediction and forecasting tasks from the provided lists. It then uses the specialized sub-converters to generate prompt/target pairs for these tasks (including potentially forecasting QA tasks). These individual tasks are shuffled and combined into a multi-task instruction prompt and a multi-task target answer using helper methods. Patient history (constant features, time-series events, summarized info) is formatted and prepended to the instruction prompt, respecting the total token budget.

Parameters:

Name Type Description Default
forecasting_splits list[DataSplitterForecastingGroup]

List of DataSplitterForecastingGroup objects, each representing a potential forecasting task scenario (containing patient history up to a split date and future target values).

required
event_splits list[DataSplitterEventsGroup]

List of DataSplitterEventsGroup objects, each representing a potential event prediction task scenario (containing patient history up to a split date and event outcome/censoring info).

required
override_mode_to_select_forecasting str

If provided, forces the selection mode for forecasting tasks ('forecasting', 'forecasting_qa', or 'both'). If None, the mode is chosen randomly. Defaults to None.

None

Returns:

Type Description
dict

A dictionary containing the final formatted data: - 'instruction': The complete input string for the model (context + multi-task prompt). - 'answer': The complete target string for the model (multi-task answer). - 'meta': A dictionary holding metadata including patient ID, structured constant and history data used, split date, combined metadata from sub-converters, and a list of detailed metadata for each individual task generated ('target_meta_detailed').

Raises:

Type Description
AssertionError

If input splits have inconsistent Line of Therapy (LOT) dates or split dates, or if no tasks are generated.

Source code in twinweaver/instruction/converter_instruction.py
def forward_conversion(
    self,
    forecasting_splits: list[DataSplitterForecastingGroup],
    event_splits: list[DataSplitterEventsGroup],
    override_mode_to_select_forecasting: str = None,
) -> dict:
    """
    Generates a multi-task instruction prompt and target answer from patient data splits.

    This method orchestrates the creation of a training example. It randomly selects
    scenarios (splits) for event prediction and forecasting tasks from the provided lists.
    It then uses the specialized sub-converters to generate prompt/target pairs for these
    tasks (including potentially forecasting QA tasks). These individual tasks are shuffled
    and combined into a multi-task instruction prompt and a multi-task target answer using
    helper methods. Patient history (constant features, time-series events, summarized info)
    is formatted and prepended to the instruction prompt, respecting the total token budget.

    Parameters
    ----------
    forecasting_splits : list[DataSplitterForecastingGroup]
        List of DataSplitterForecastingGroup objects, each representing a potential forecasting task scenario
        (containing patient history up to a split date and future target values).
    event_splits : list[DataSplitterEventsGroup]
        List of DataSplitterEventsGroup objects, each representing a potential event prediction task scenario
        (containing patient history up to a split date and event outcome/censoring info).
    override_mode_to_select_forecasting : str, optional
        If provided, forces the selection mode for forecasting tasks ('forecasting',
        'forecasting_qa', or 'both'). If None, the mode is chosen randomly. Defaults to None.

    Returns
    -------
    dict
        A dictionary containing the final formatted data:
        - 'instruction': The complete input string for the model (context + multi-task prompt).
        - 'answer': The complete target string for the model (multi-task answer).
        - 'meta': A dictionary holding metadata including patient ID, structured constant and
                  history data used, split date, combined metadata from sub-converters, and
                  a list of detailed metadata for each individual task generated ('target_meta_detailed').

    Raises
    ------
    AssertionError
        If input splits have inconsistent Line of Therapy (LOT) dates or split dates,
        or if no tasks are generated.
    """

    #: make assertions that data has same split and lot date
    all_lot_dates_events = [x.lot_date for x in event_splits]
    all_lot_dates_forecasting = [x.lot_date for x in forecasting_splits]
    assert len(set(all_lot_dates_events) | set(all_lot_dates_forecasting)) == 1
    all_split_dates_events = [x.split_date_included_in_input for x in event_splits]
    all_split_dates_forecasting = [x.split_date_included_in_input for x in forecasting_splits]
    assert len(set(all_split_dates_events) | set(all_split_dates_forecasting)) == 1
    # assert that the split date is after or equal to lot date (checking 0, since all same)
    assert len(event_splits) == 0 or all_split_dates_events[0] >= all_lot_dates_events[0]

    #: randomly select the number of events and which splits to use and generate events
    min_nr_events = 1 if len(forecasting_splits) == 0 else 0
    nr_events_to_use = np.random.randint(min_nr_events, len(event_splits) + 1)
    event_splits_to_use = np.random.choice(event_splits, nr_events_to_use, replace=False)

    event_converted = []
    for event_split in event_splits_to_use:
        prompt, target, target_meta = self.converter_events.forward_conversion(event_split)
        event_converted.append((prompt, target, target_meta, self.config.task_prompt_events))

    #: then randomly select the forecasting splits and generate forecasting
    forecasting_tasks = []
    forecasting_qa_tasks = []

    if len(forecasting_splits) > 0:
        if override_mode_to_select_forecasting is None:
            #: randomly select whether to do forecasting, forecasting QA or both
            mode_to_select = np.random.choice(["forecasting", "forecasting_qa", "both"], 1, replace=False)[0]
        else:
            mode_to_select = override_mode_to_select_forecasting

        # Randomly select from the forecasting split
        if mode_to_select == "forecasting" or mode_to_select == "both":
            forecasting_split_to_use = np.random.choice(forecasting_splits, 1, replace=False)[0]
            ret_forecasting = self.converter_forecasting.forward_conversion(forecasting_split_to_use)
            prompt_forecasting, target_forecasting, target_meta_forecasting = ret_forecasting
            forecasting_tasks.append(
                (
                    prompt_forecasting,
                    target_forecasting,
                    target_meta_forecasting,
                    self.config.task_prompt_forecasting,
                )
            )

        #: then randomly select the QA splits and generate QA
        if mode_to_select == "forecasting_qa" or mode_to_select == "both":
            forecasting_qa_split_to_use = np.random.choice(forecasting_splits, 1, replace=False)[0]
            ret_forecasting_qa = self.converter_forecasting_qa.forward_conversion(forecasting_qa_split_to_use)
            (
                prompt_forecasting_qa,
                target_forecasting_qa,
                target_meta_forecasting_qa,
            ) = ret_forecasting_qa
            forecasting_qa_tasks.append(
                (
                    prompt_forecasting_qa,
                    target_forecasting_qa,
                    target_meta_forecasting_qa,
                    self.config.task_prompt_forecasting_qa,
                )
            )

    #: determine random order of all tasks
    all_tasks = forecasting_tasks + forecasting_qa_tasks + event_converted
    all_tasks = np.random.permutation(all_tasks)
    assert len(all_tasks) > 0, "No tasks generated!"

    #: generate overall prompt
    prompt = self._generate_prompt(all_tasks)

    #: generate overall target
    target_str, target_meta = self._generate_target_string(all_tasks)

    #: aggregate all meta needed and add into target_meta for the summarized row
    combined_meta = {
        "dates_per_variable": {},
        "variable_name_mapping": {},
    }
    for curr_meta in target_meta:
        if "dates_per_variable" in curr_meta:
            combined_meta["dates_per_variable"].update(curr_meta["dates_per_variable"])
        if "variable_name_mapping" in curr_meta:
            combined_meta["variable_name_mapping"].update(curr_meta["variable_name_mapping"])

    # Since the history and constant are the same, we can just use the first
    data_to_use = forecasting_splits[0] if len(forecasting_splits) > 0 else event_splits[0]
    events_until_split = data_to_use.events_until_split
    constant = data_to_use.constant_data
    patientid = data_to_use.constant_data[self.config.patient_id_col]
    split_date_included_in_input = data_to_use.split_date_included_in_input

    #: generate constant string
    constant, constant_description = self._preprocess_constant_date(
        events_until_split, constant, self.constant_description
    )
    constant_string = self._get_constant_string(constant, constant_description)

    #: generate summarized row string
    try:
        summarized_row_string = self._generate_summarized_row_str_fn(events_until_split, combined_meta)
    except Exception as e:
        raise TypeError(f"Custom summarized_row_fn failed: {e}.")

    #: get current budget
    budget_total = self.nr_tokens_budget_total
    budget_total -= self.get_nr_tokens(prompt)
    budget_total -= self.get_nr_tokens(constant_string)
    budget_total -= self.get_nr_tokens(summarized_row_string)
    budget_total -= self.get_nr_tokens(target_str)
    budget_total -= self.nr_tokens_budget_padding

    #: select events within token budget
    patient_history = self._get_all_most_recent_events_within_budget(events_until_split, budget_total)

    #: generate history string
    patient_history_processed = patient_history.copy()
    patient_history_processed = self._preprocess_events(patient_history_processed)
    history_str = self._get_event_string(patient_history_processed, use_accumulative_dates=False)

    #: generate final string
    input_string = self.config.preamble_text + constant_string + history_str + summarized_row_string + prompt

    #: generate return & meta (incl. structured data)
    ret = {
        "instruction": input_string,
        "answer": target_str,
        "meta": {
            self.config.patient_id_col: patientid,
            "constant_data": constant,
            "history_data": patient_history,
            "split_date_included_in_input": split_date_included_in_input,
            "combined_meta": combined_meta,
            "target_meta_detailed": target_meta,
        },
    }
    return ret
forward_conversion_inference
forward_conversion_inference(
    forecasting_split=None,
    forecasting_future_weeks_per_variable=None,
    event_split=None,
    custom_tasks=None,
)

Generates a multi-task instruction prompt suitable for inference.

Constructs the input prompt for the model based on provided task scenarios (forecasting, event prediction, and/or custom text tasks). It does not generate a target 'answer' string. Patient history context (constant, time-series, summarized) is generated and prepended, respecting token limits.

Parameters:

Name Type Description Default
forecasting_split DataSplitterForecastingOption

Data for a forecasting task scenario. Defaults to None.

None
forecasting_future_weeks_per_variable dict

Specifies future time points (in weeks) for forecasting specific variables. Used by the forecasting sub-converter during inference. Defaults to None.

None
event_split DataSplitterEventsOption

Data for an event prediction task scenario. Defaults to None.

None
custom_tasks list[str]

A list of strings, each representing a custom task/question to include in the prompt. Defaults to None.

None

Returns:

Type Description
dict

A dictionary containing: - 'instruction': The complete input string for the model (context + multi-task prompt). - 'answer': None (as this is for inference). - 'meta': Metadata including patient ID, structured data used for context, split date, and metadata associated with the generated tasks.

Raises:

Type Description
AssertionError

If both forecasting_split and event_split are None, or if their split dates do not match when both are provided.

Source code in twinweaver/instruction/converter_instruction.py
def forward_conversion_inference(
    self,
    forecasting_split: DataSplitterForecastingOption = None,
    forecasting_future_weeks_per_variable: dict = None,
    event_split: DataSplitterEventsOption = None,
    custom_tasks: list = None,
) -> dict:
    """
    Generates a multi-task instruction prompt suitable for inference.

    Constructs the input prompt for the model based on provided task scenarios
    (forecasting, event prediction, and/or custom text tasks). It does not generate
    a target 'answer' string. Patient history context (constant, time-series, summarized)
    is generated and prepended, respecting token limits.

    Parameters
    ----------
    forecasting_split : DataSplitterForecastingOption, optional
        Data for a forecasting task scenario. Defaults to None.
    forecasting_future_weeks_per_variable : dict, optional
        Specifies future time points (in weeks) for forecasting specific variables.
        Used by the forecasting sub-converter during inference. Defaults to None.
    event_split : DataSplitterEventsOption, optional
        Data for an event prediction task scenario. Defaults to None.
    custom_tasks : list[str], optional
        A list of strings, each representing a custom task/question to include
        in the prompt. Defaults to None.

    Returns
    -------
    dict
        A dictionary containing:
        - 'instruction': The complete input string for the model (context + multi-task prompt).
        - 'answer': None (as this is for inference).
        - 'meta': Metadata including patient ID, structured data used for context,
                  split date, and metadata associated with the generated tasks.

    Raises
    ------
    AssertionError
        If both `forecasting_split` and `event_split` are None, or if their
        split dates do not match when both are provided.
    """

    #: make assertions that data has same split date
    if forecasting_split is not None and event_split is not None:
        assert forecasting_split.split_date_included_in_input == event_split.split_date_included_in_input, (
            "Split dates do not match!"
        )
    assert forecasting_split is not None or event_split is not None, "No data provided!"

    #: convert events, if needed
    event_prompt_str = event_meta = None
    if event_split is not None:
        event_prompt_str, event_meta = self.converter_events.forward_conversion_inference(event_split)

    #: convert forecasting, if needed
    forecasting_prompt_str = forecasting_meta = None
    if forecasting_split is not None:
        f_ret = self.converter_forecasting.forward_conversion_inference(
            forecasting_split,
            future_weeks_per_variable=forecasting_future_weeks_per_variable,
        )
        forecasting_prompt_str, forecasting_meta = f_ret

    #: generate meta and aggregate
    all_tasks = []
    target_meta = []
    if event_prompt_str is not None:
        all_tasks.append((event_prompt_str, None, event_meta, self.config.task_prompt_events))
        target_meta.append(event_meta)
    if forecasting_prompt_str is not None:
        all_tasks.append(
            (
                forecasting_prompt_str,
                None,
                forecasting_meta,
                self.config.task_prompt_forecasting,
            )
        )
        target_meta.append(forecasting_meta)

    # Add in custom tasks
    if custom_tasks is not None:
        for custom_task in custom_tasks:
            all_tasks.append((custom_task, None, None, self.config.task_prompt_custom + "\n"))
            target_meta.append({})

    # Generate prompt
    prompt = self._generate_prompt(all_tasks)

    #: generate constant string
    data_to_use = forecasting_split if forecasting_split is not None else event_split
    events_until_split = data_to_use.events_until_split
    constant = data_to_use.constant_data
    patientid = data_to_use.constant_data[self.config.patient_id_col]
    split_date_included_in_input = data_to_use.split_date_included_in_input
    constant, constant_description = self._preprocess_constant_date(
        events_until_split, constant, self.constant_description
    )
    constant_string = self._get_constant_string(constant, constant_description)

    #: generate summarized row string
    combined_meta = {
        "dates_per_variable": {},
        "variable_name_mapping": {},
    }
    for curr_meta in target_meta:
        if "dates_per_variable" in curr_meta:
            combined_meta["dates_per_variable"].update(curr_meta["dates_per_variable"])
        if "variable_name_mapping" in curr_meta:
            combined_meta["variable_name_mapping"].update(curr_meta["variable_name_mapping"])
    try:
        summarized_row_string = self._generate_summarized_row_str_fn(events_until_split, combined_meta)
    except Exception as e:
        raise TypeError(f"Custom summarized_row_fn failed: {e}.")

    #: get current budget
    budget_total = self.nr_tokens_budget_total
    budget_total -= self.get_nr_tokens(prompt)
    budget_total -= self.get_nr_tokens(constant_string)
    budget_total -= self.get_nr_tokens(summarized_row_string)
    budget_total -= self.nr_tokens_budget_padding

    #: select events within token budget
    patient_history = self._get_all_most_recent_events_within_budget(events_until_split, budget_total)

    #: generate history string
    patient_history_processed = patient_history.copy()
    patient_history_processed = self._preprocess_events(patient_history_processed)
    history_str = self._get_event_string(patient_history, use_accumulative_dates=False)

    #: generate final string
    input_string = self.config.preamble_text + constant_string + history_str + summarized_row_string + prompt

    #: generate return & meta (incl. structured data)
    ret = {
        "instruction": input_string,
        "answer": None,
        "meta": {
            self.config.patient_id_col: patientid,
            "constant_data": constant,
            "history_data": patient_history,
            "split_date_included_in_input": split_date_included_in_input,
            "combined_meta": combined_meta,
            "target_meta_detailed": target_meta,
        },
    }
    return ret
generate_target_manual
generate_target_manual(
    reverse_converted, split_date, events_until_split
)

Reconstructs a multi-task target string from previously reverse-converted structured data.

This method takes a list of dictionaries (where each dictionary represents the parsed result of a single task from a model's output) and generates the corresponding multi-task target string. It calls the appropriate sub-converter's manual target generation method for each task type.

Parameters:

Name Type Description Default
reverse_converted list[dict]

A list of dictionaries, each resulting from the reverse_conversion method, containing keys 'task_type' and 'result' (the structured data).

required
split_date datetime

The reference split date, needed for reconstructing forecasting targets.

required
events_until_split DataFrame

Patient's historical event data up to the split date, needed for context by some sub-converters' target generation (e.g., forecasting).

required

Returns:

Type Description
tuple
  • str: The reconstructed multi-task target string.
  • list[dict]: A list containing the metadata generated during the reconstruction of each individual task's target string.
  • list[dict]: The input reverse_converted list, potentially updated with 'original_text' (the reconstructed target snippet for each task) and 'target_meta'.
Source code in twinweaver/instruction/converter_instruction.py
def generate_target_manual(self, reverse_converted, split_date, events_until_split):
    """
    Reconstructs a multi-task target string from previously reverse-converted structured data.

    This method takes a list of dictionaries (where each dictionary represents the
    parsed result of a single task from a model's output) and generates the corresponding
    multi-task target string. It calls the appropriate sub-converter's manual target
    generation method for each task type.

    Parameters
    ----------
    reverse_converted : list[dict]
        A list of dictionaries, each resulting from the `reverse_conversion` method,
        containing keys 'task_type' and 'result' (the structured data).
    split_date : datetime
        The reference split date, needed for reconstructing forecasting targets.
    events_until_split : pd.DataFrame
        Patient's historical event data up to the split date, needed for context
        by some sub-converters' target generation (e.g., forecasting).

    Returns
    -------
    tuple
        - str: The reconstructed multi-task target string.
        - list[dict]: A list containing the metadata generated during the reconstruction
                      of each individual task's target string.
        - list[dict]: The input `reverse_converted` list, potentially updated with
                      'original_text' (the reconstructed target snippet for each task)
                      and 'target_meta'.
    """

    #: go through all reverse converted and generate only the target string
    individual_targets = []

    for idx, reverse_converted_dic in enumerate(reverse_converted):
        if reverse_converted_dic["task_type"] == self.config.task_prompt_events:
            occurred = reverse_converted_dic["result"]["occurred"].iloc[0]
            censored = reverse_converted_dic["result"]["censoring"].iloc[0]
            target_name = reverse_converted_dic["result"]["target_name"].iloc[0]

            ret_prompt, ret_meta = self.converter_events.generate_target_manual(
                target_name=target_name,
                event_occurred=occurred,
                event_censored=censored,
            )
            individual_targets.append((None, ret_prompt, {}, self.config.task_prompt_events))
            reverse_converted[idx]["original_text"] = ret_prompt

        if reverse_converted_dic["task_type"] == self.config.task_prompt_forecasting:
            ret_prompt, ret_meta = self.converter_forecasting.generate_target_manual(
                split_date_included_in_input=split_date,
                events_until_split=events_until_split,
                target_events_after_split=reverse_converted_dic["result"],
            )
            individual_targets.append((None, ret_prompt, {}, self.config.task_prompt_forecasting))
            reverse_converted[idx]["original_text"] = ret_prompt
            reverse_converted[idx]["target_meta"] = ret_meta

        if reverse_converted_dic["task_type"] == self.config.task_prompt_custom:
            individual_targets.append(
                (
                    None,
                    reverse_converted_dic["result"],
                    {},
                    self.config.task_prompt_custom,
                )
            )
            reverse_converted[idx]["original_text"] = reverse_converted_dic["result"]

    #: then aggegrate all target strings with own
    target_str, target_meta = self._generate_target_string(individual_targets)

    # Return
    return target_str, target_meta, reverse_converted
get_difference_in_event_dataframes
get_difference_in_event_dataframes(
    list_of_original_conversions_meta,
    list_of_reversed_conversions,
)

Compares original structured data with reverse-converted data for each task in a multi-task scenario.

Iterates through the tasks, matching the original metadata (which contains the ground truth structured data, e.g., in 'target_data_processed') with the result of the reverse conversion for that task. It then calls the appropriate sub-converter's difference calculation method.

Parameters:

Name Type Description Default
list_of_original_conversions_meta list[dict]

The list of metadata dictionaries generated during the forward conversion, where each dictionary corresponds to a task and should contain the original structured target data (e.g., under 'target_data_processed'). Usually obtained from the 'target_meta_detailed' key in the forward conversion output meta.

required
list_of_reversed_conversions list[dict]

The list of dictionaries resulting from the reverse_conversion method, where each dictionary contains the parsed structured data ('result') for a task.

required

Returns:

Type Description
list[DataFrame]

A list of pandas DataFrames. Each DataFrame highlights the differences found for the corresponding task between the original data and the reverse-converted data. An empty DataFrame indicates no differences were found for that task.

Raises:

Type Description
AssertionError

If the number of original metadata items does not match the number of reversed conversion results.

ValueError

If a task type mismatch is detected or if expected data keys are missing.

Source code in twinweaver/instruction/converter_instruction.py
def get_difference_in_event_dataframes(
    self,
    list_of_original_conversions_meta: list[dict],
    list_of_reversed_conversions: list[dict],
) -> list[pd.DataFrame]:
    """
    Compares original structured data with reverse-converted data for each task in a multi-task scenario.

    Iterates through the tasks, matching the original metadata (which contains the
    ground truth structured data, e.g., in 'target_data_processed') with the result
    of the reverse conversion for that task. It then calls the appropriate sub-converter's
    difference calculation method.

    Parameters
    ----------
    list_of_original_conversions_meta : list[dict]
        The list of metadata dictionaries generated during the *forward* conversion,
        where each dictionary corresponds to a task and should contain the original
        structured target data (e.g., under 'target_data_processed'). Usually obtained
        from the 'target_meta_detailed' key in the forward conversion output meta.
    list_of_reversed_conversions : list[dict]
        The list of dictionaries resulting from the `reverse_conversion` method, where
        each dictionary contains the parsed structured data ('result') for a task.

    Returns
    -------
    list[pd.DataFrame]
        A list of pandas DataFrames. Each DataFrame highlights the differences found
        for the corresponding task between the original data and the reverse-converted data.
        An empty DataFrame indicates no differences were found for that task.

    Raises
    ------
    AssertionError
        If the number of original metadata items does not match the number of reversed
        conversion results.
    ValueError
         If a task type mismatch is detected or if expected data keys are missing.
    """

    # Check and setup data
    # list_of_original_conversions_meta = original_conversion["meta"]["target_meta_detailed"]

    assert len(list_of_original_conversions_meta) == len(list_of_reversed_conversions)
    ret_diffs = []

    #: go through each task and compare using the correct converter
    for idx in range(len(list_of_original_conversions_meta)):
        original = list_of_original_conversions_meta[idx]
        reversed = list_of_reversed_conversions[idx]

        if reversed["task_type"] == self.config.task_prompt_forecasting:
            ret_diff = self.converter_forecasting.get_difference_in_event_dataframes(
                reversed["result"], original["target_data_processed"]
            )
        elif original["task_type"] == self.config.task_prompt_forecasting_qa:
            ret_diff = self.converter_forecasting_qa.get_difference_in_event_dataframes(
                reversed["result"], original["target_data_processed"]
            )
        elif original["task_type"] == self.config.task_prompt_events:
            ret_diff = self.converter_events.get_difference_in_event_dataframes(
                reversed["result"], original["target_data_processed"]
            )

        ret_diffs.append(ret_diff)

    #: return list of differences
    return ret_diffs
get_nr_tokens
get_nr_tokens(curr_string)

Calculates the number of tokens in a given string using the initialized tokenizer.

Parameters:

Name Type Description Default
curr_string str

The string to tokenize.

required

Returns:

Type Description
int

The number of tokens generated by the tokenizer for the input string.

Source code in twinweaver/instruction/converter_instruction.py
def get_nr_tokens(self, curr_string):
    """
    Calculates the number of tokens in a given string using the initialized tokenizer.

    Parameters
    ----------
    curr_string : str
        The string to tokenize.

    Returns
    -------
    int
        The number of tokens generated by the tokenizer for the input string.
    """
    return len(self.tokenizer(curr_string)["input_ids"])
reverse_conversion
reverse_conversion(
    target_string,
    data_manager,
    split_date,
    patientid=None,
    inference_override=False,
)

Parses a multi-task model output string back into a list of structured results.

Splits the input string based on task markers (e.g., "Task 1:", "Task 2:"). For each segment, it identifies the task type using configured prompt identifiers and calls the corresponding sub-converter's reverse_conversion method to parse the text into structured data (e.g., a DataFrame).

Parameters:

Name Type Description Default
target_string str

The complete multi-task output string generated by the language model.

required
data_manager DataManager

The data manager instance, needed by some sub-converters for context (e.g., unique event mappings).

required
split_date datetime

The reference split date, required by some sub-converters (e.g., forecasting) to interpret time-related outputs correctly.

required
patientid str

If provided, this patient ID is added to the resulting structured data for each task. Defaults to None.

None
inference_override bool

If True, allows processing even if a task type cannot be strictly identified (e.g., by falling back to heuristics). Primarily for inference robustness where model output might be slightly malformed. Defaults to False.

False

Returns:

Type Description
list[dict]

A list of dictionaries, one for each task identified and parsed in the target_string. Each dictionary contains: - 'task_type': The identified type of the task. - 'original_text': The raw text segment corresponding to this task's answer. - 'result': The structured data parsed from 'original_text' by the appropriate sub-converter (e.g., a pandas DataFrame).

Raises:

Type Description
ValueError

If inference_override is False and the type of a task segment cannot be determined using the configured prompt identifiers.

Source code in twinweaver/instruction/converter_instruction.py
def reverse_conversion(
    self,
    target_string: str,
    data_manager: DataManager,
    split_date: datetime,
    patientid: str = None,
    inference_override: bool = False,
) -> list[dict]:
    """
    Parses a multi-task model output string back into a list of structured results.

    Splits the input string based on task markers (e.g., "Task 1:", "Task 2:").
    For each segment, it identifies the task type using configured prompt identifiers
    and calls the corresponding sub-converter's `reverse_conversion` method to parse
    the text into structured data (e.g., a DataFrame).

    Parameters
    ----------
    target_string : str
        The complete multi-task output string generated by the language model.
    data_manager : DataManager
        The data manager instance, needed by some sub-converters for context
        (e.g., unique event mappings).
    split_date : datetime
        The reference split date, required by some sub-converters (e.g., forecasting)
        to interpret time-related outputs correctly.
    patientid : str, optional
        If provided, this patient ID is added to the resulting structured data
        for each task. Defaults to None.
    inference_override : bool, optional
        If True, allows processing even if a task type cannot be strictly identified
        (e.g., by falling back to heuristics). Primarily for inference robustness
        where model output might be slightly malformed. Defaults to False.

    Returns
    -------
    list[dict]
        A list of dictionaries, one for each task identified and parsed in the
        `target_string`. Each dictionary contains:
        - 'task_type': The identified type of the task.
        - 'original_text': The raw text segment corresponding to this task's answer.
        - 'result': The structured data parsed from 'original_text' by the appropriate
                    sub-converter (e.g., a pandas DataFrame).

    Raises
    ------
    ValueError
        If `inference_override` is False and the type of a task segment cannot be
        determined using the configured prompt identifiers.
    """

    #: split up by task
    all_tasks = target_string.split("Task")
    ret_list = []

    for curr_task in all_tasks:
        # Basic preprocessing
        curr_task = curr_task.strip()
        if len(curr_task) == 0:
            continue

        #: determine which task it is
        task_type = None
        standard_extraction = True

        if self.config.task_prompt_forecasting in curr_task:
            task_type = self.config.task_prompt_forecasting
        elif self.config.task_prompt_forecasting_qa in curr_task:
            task_type = self.config.task_prompt_forecasting_qa
        elif self.config.task_prompt_events in curr_task:
            task_type = self.config.task_prompt_events
        elif "custom" in curr_task:
            task_type = self.config.task_prompt_custom
        else:
            #: Try determining by whether the task contains a "censored"
            if "censored" in curr_task:
                task_type = self.config.task_prompt_events
                standard_extraction = False
            elif inference_override is False:
                raise ValueError("Could not determine task type")

        #: extract the relevant parts
        if standard_extraction:
            curr_task_text = curr_task.split(task_type)[1]
        else:
            curr_task_text = ":".join(curr_task.split(":")[1:])

            # Often we have "[""]" which need to be removed
            curr_task_text = curr_task_text.replace("[", "")
            curr_task_text = curr_task_text.replace("]", "")

        #: pass to correct reverse converter
        if task_type == self.config.task_prompt_forecasting:
            ret = self.converter_forecasting.reverse_conversion(
                curr_task_text, data_manager.unique_events, split_date
            )
        elif task_type == self.config.task_prompt_forecasting_qa:
            ret = self.converter_forecasting_qa.reverse_conversion(
                curr_task_text, data_manager.unique_events, split_date
            )
        elif task_type == self.config.task_prompt_events:
            ret = self.converter_events.reverse_conversion(curr_task_text)

        elif task_type == self.config.task_prompt_custom:
            ret = {"original_text": curr_task_text}

        if patientid is not None:
            ret[self.config.patient_id_col] = patientid

        ret_dic = {
            "task_type": task_type,
            "original_text": curr_task_text,
            "result": ret,
        }

        #: make list of results
        ret_list.append(ret_dic)

    #: return list of results
    return ret_list
set_custom_summarized_row_fn
set_custom_summarized_row_fn(fn)

Sets a custom function for generating the summarized row string. If this function is not called, the default _generate_summarized_row_string is used. Requirements: signature must start with (self, events_df, meta, ...) If the function meets these criteria but it is still invalid, it will raise an error later on

Source code in twinweaver/instruction/converter_instruction.py
def set_custom_summarized_row_fn(self, fn: Callable[[pd.DataFrame, dict], str]):
    """
    Sets a custom function for generating the summarized row string.
    If this function is not called, the default _generate_summarized_row_string is used.
    Requirements: signature must start with (self, events_df, meta, ...)
    If the function meets these criteria but it is still invalid, it will raise an error later on
    """
    import inspect

    params = list(inspect.signature(fn).parameters)
    if len(params) < 3 or params[0] != "self":  # assumes signature (self, events_df, meta, ...)
        raise TypeError(
            f"Custom summarized row function must accept at least (self, events_df, meta).Got parameters: {params}"
        )
    # Bind it as method
    self._generate_summarized_row_str_fn = fn.__get__(self, type(self))