
Experiment rules

experiment.rules.base

Base

Bases: object

Base class for other rules classes

Source code in experiment/rules/base.py
class Base(object):
    """Base class for other rules classes"""

    contact_email = settings.CONTACT_MAIL
    counted_result_keys = []

    def __init__(self):
        self.question_series = []

    def feedback_info(self):
        feedback_body = render_to_string("feedback/user_feedback.html", {"email": self.contact_email})
        return {
            # Header above the feedback form
            "header": _("Do you have any remarks or questions?"),
            # Button text
            "button": _("Submit"),
            # Body of the feedback form, can be HTML. Shown under the button
            "contact_body": feedback_body,
            # Thank you message after submitting feedback
            "thank_you": _("We appreciate your feedback!"),
            # Show a floating button on the right side of the screen to open the feedback form
            "show_float_button": False,
        }

    def calculate_score(self, result, data):
        """use scoring rule to calculate score
        If not scoring rule is defined, return None
        Override in rules file for other scoring schemes"""
        scoring_rule = SCORING_RULES.get(result.scoring_rule)
        if scoring_rule:
            return scoring_rule(result, data)
        return None

    def get_play_again_url(self, session: Session):
        participant_id_url_param = (
            f"?participant_id={session.participant.participant_id_url}"
            if session.participant.participant_id_url
            else ""
        )
        return f"/{session.block.slug}{participant_id_url_param}"

    def calculate_intermediate_score(self, session, result):
        """process result data during a trial (i.e., between next_round calls)
        return score
        """
        return 0

    def final_score_message(self, session):
        """Create final score message for given session, base on score per result"""

        correct = 0
        total = 0

        for result in session.result_set.all():
            # if a result has score != 0, it was recognized
            if result.score:
                total += 1

                if result.score > 0:
                    # if a result has score > 0, it was identified correctly
                    correct += 1

        score_message = "Well done!" if session.final_score > 0 else "Too bad!"
        message = "You correctly identified {} out of {} recognized songs!".format(correct, total)
        return score_message + " " + message

    def rank(self, session, exclude_unfinished=True):
        """Get rank based on session score"""
        score = session.final_score
        ranks = Final.RANKS

        # No score, or zero or negative points: always return the lowest rank, plastic
        if not score or score <= 0:
            return ranks["PLASTIC"]

        # Buckets for positive scores:
        # rank: the minimum percentile at which the rank starts
        buckets = [
            {"rank": ranks["BRONZE"], "min_percentile": 0.0},     # ~ stanines 1-3
            {"rank": ranks["SILVER"], "min_percentile": 25.0},    # ~ stanines 4-6
            {"rank": ranks["GOLD"], "min_percentile": 75.0},      # ~ stanine 7
            {"rank": ranks["PLATINUM"], "min_percentile": 90.0},  # ~ stanine 8
            {"rank": ranks["DIAMOND"], "min_percentile": 95.0},   # ~ stanine 9
        ]

        percentile = session.percentile_rank(exclude_unfinished)

        # Check the buckets in reverse order and return the rank of the first
        # bucket whose min_percentile the percentile rank meets or exceeds
        for bucket in reversed(buckets):
            if percentile >= bucket["min_percentile"]:
                return bucket["rank"]

        # Fallback, in case the percentile doesn't fall into any bucket
        return ranks["PLASTIC"]

    def get_single_question(self, session, randomize=False):
        """Get a random question from each question list, in priority completion order.

        Participants will not continue to the next question set until they
        have completed their current one.
        """
        questionnaire = unanswered_questions(
            session.participant, get_questions_from_series(session.block.questionseries_set.all()), randomize
        )
        try:
            question = next(questionnaire)
            return Trial(title=_("Questionnaire"), feedback_form=Form([question], is_skippable=question.is_skippable))
        except StopIteration:
            return None

    def get_open_questions(self, session, randomize=False, cutoff_index=None) -> Union[list, None]:
        """Get a list of trials for questions not yet answered by the user"""

        trials = []
        questions = list(
            unanswered_questions(
                session.participant,
                get_questions_from_series(session.block.questionseries_set.all()),
                randomize,
                cutoff_index,
            )
        )
        open_questions = len(questions)
        if not open_questions:
            return None
        for index, question in enumerate(questions):
            trials.append(
                Trial(
                    title=_("Questionnaire %(index)i / %(total)i") % {"index": index + 1, "total": open_questions},
                    feedback_form=Form([question], is_skippable=question.is_skippable),
                )
            )
        return trials

    def validate_playlist(self, playlist):
        errors = []
        # Common validations across blocks
        if not playlist:
            errors.append("The block must have a playlist.")
            return errors

        sections = playlist.section_set.all()

        if not sections:
            errors.append("The block must have at least one section.")

        try:
            playlist.clean_csv()
        except ValidationError as e:
            errors += e.error_list

        return errors
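
Concrete rules classes subclass Base and override these hooks. The sketch below is illustrative only: the subclass name, the adjusted feedback fields, and the ten-section minimum are invented for this example, and whether base.py aliases gettext or gettext_lazy as _ is an assumption.

from django.utils.translation import gettext_lazy as _

from experiment.rules.base import Base


class ExampleRules(Base):
    """Hypothetical rules class showing typical overrides."""

    def feedback_info(self):
        # Reuse Base's defaults and adjust only the fields that differ.
        info = super().feedback_info()
        info["header"] = _("Tell us what you thought of this experiment!")
        info["show_float_button"] = True
        return info

    def validate_playlist(self, playlist):
        # Keep the common checks, then add a block-specific one
        # (the ten-section minimum is an invented example).
        errors = super().validate_playlist(playlist)
        if playlist and playlist.section_set.count() < 10:
            errors.append("This block needs at least 10 sections.")
        return errors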

calculate_intermediate_score(session, result)

Process result data during a trial (i.e., between next_round calls) and return a score.

Source code in experiment/rules/base.py
def calculate_intermediate_score(self, session, result):
    """process result data during a trial (i.e., between next_round calls)
    return score
    """
    return 0
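
Base always returns 0 here, so rules that score during a trial override this hook. A minimal sketch, assuming the Result model exposes given_response and expected_response fields (an assumption; adapt to the actual model):

from experiment.rules.base import Base


class BinaryScoreRules(Base):  # hypothetical subclass for illustration
    def calculate_intermediate_score(self, session, result):
        # One point when the participant's answer matches the expected
        # response; both field names are assumptions about the Result model.
        if result.given_response == result.expected_response:
            return 1
        return 0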

calculate_score(result, data)

Use the scoring rule to calculate the score. If no scoring rule is defined, return None. Override in a rules file for other scoring schemes.

Source code in experiment/rules/base.py
def calculate_score(self, result, data):
    """use scoring rule to calculate score
    If not scoring rule is defined, return None
    Override in rules file for other scoring schemes"""
    scoring_rule = SCORING_RULES.get(result.scoring_rule)
    if scoring_rule:
        return scoring_rule(result, data)
    return None
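
The dispatch goes through the SCORING_RULES mapping, keyed by result.scoring_rule. A block can also layer its own logic on top of the registered rule; in this sketch, the "decision_time" key is an assumption about the client payload, not a documented field:

from experiment.rules.base import Base


class PenalizedScoreRules(Base):  # hypothetical subclass for illustration
    def calculate_score(self, result, data):
        # Start from the registered scoring rule, then subtract a
        # response-time penalty ("decision_time" is an assumed key).
        score = super().calculate_score(result, data)
        if score is not None and data.get("decision_time"):
            score = max(0, score - int(data["decision_time"]))
        return score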

final_score_message(session)

Create the final score message for the given session, based on the score per result.

Source code in experiment/rules/base.py
def final_score_message(self, session):
    """Create final score message for given session, base on score per result"""

    correct = 0
    total = 0

    for result in session.result_set.all():
        # if a result has score != 0, it was recognized
        if result.score:
            total += 1

            if result.score > 0:
                # if a result has score > 0, it was identified correctly
                correct += 1

    score_message = "Well done!" if session.final_score > 0 else "Too bad!"
    message = "You correctly identified {} out of {} recognized songs!".format(correct, total)
    return score_message + " " + message
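
The default wording assumes a recognition task ("recognized songs"), so blocks with a different task typically override this method. A minimal sketch:

from experiment.rules.base import Base


class SimpleMessageRules(Base):  # hypothetical subclass for illustration
    def final_score_message(self, session):
        # Report the raw final score instead of the recognized/identified
        # counts that the Base wording assumes.
        return "Your final score is {}.".format(round(session.final_score))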

get_open_questions(session, randomize=False, cutoff_index=None)

Get a list of trials for questions not yet answered by the user

Source code in experiment/rules/base.py
def get_open_questions(self, session, randomize=False, cutoff_index=None) -> Union[list, None]:
    """Get a list of trials for questions not yet answered by the user"""

    trials = []
    questions = list(
        unanswered_questions(
            session.participant,
            get_questions_from_series(session.block.questionseries_set.all()),
            randomize,
            cutoff_index,
        )
    )
    open_questions = len(questions)
    if not open_questions:
        return None
    for index, question in enumerate(questions):
        trials.append(
            Trial(
                title=_("Questionnaire %(index)i / %(total)i") % {"index": index + 1, "total": open_questions},
                feedback_form=Form([question], is_skippable=question.is_skippable),
            )
        )
    return trials
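
A typical use is to drain the remaining questionnaire before the block's own trials. In this sketch, next_round as the hook name and get_block_trials as a helper are illustrative assumptions:

from experiment.rules.base import Base


class QuestionnaireFirstRules(Base):  # hypothetical subclass for illustration
    def next_round(self, session):
        # Present every unanswered question, each as its own
        # numbered "Questionnaire i / n" trial.
        question_trials = self.get_open_questions(session, randomize=True)
        if question_trials:
            return question_trials
        return self.get_block_trials(session)  # assumed helper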

get_single_question(session, randomize=False)

Get a random question from each question list, in priority completion order.

Participants will not continue to the next question set until they have completed their current one.

Source code in experiment/rules/base.py
def get_single_question(self, session, randomize=False):
    """Get a random question from each question list, in priority completion order.

    Participants will not continue to the next question set until they
    have completed their current one.
    """
    questionnaire = unanswered_questions(
        session.participant, get_questions_from_series(session.block.questionseries_set.all()), randomize
    )
    try:
        question = next(questionnaire)
        return Trial(title=_("Questionnaire"), feedback_form=Form([question], is_skippable=question.is_skippable))
    except StopIteration:
        return None
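
Unlike get_open_questions, this returns at most one Trial per call, which makes it easy to interleave a single question between a block's own trials. A sketch; next_round and get_next_trial are assumed names:

from experiment.rules.base import Base


class InterleavedQuestionRules(Base):  # hypothetical subclass for illustration
    def next_round(self, session):
        # One questionnaire question per round, until the generator of
        # unanswered questions is exhausted and None is returned.
        question = self.get_single_question(session, randomize=True)
        if question:
            return [question]
        return [self.get_next_trial(session)]  # assumed helper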

rank(session, exclude_unfinished=True)

Get rank based on session score

Source code in experiment/rules/base.py
def rank(self, session, exclude_unfinished=True):
    """Get rank based on session score"""
    score = session.final_score
    ranks = Final.RANKS

    # No score, or zero or negative points: always return the lowest rank, plastic
    if not score or score <= 0:
        return ranks["PLASTIC"]

    # Buckets for positive scores:
    # rank: the minimum percentile at which the rank starts
    buckets = [
        {"rank": ranks["BRONZE"], "min_percentile": 0.0},     # ~ stanines 1-3
        {"rank": ranks["SILVER"], "min_percentile": 25.0},    # ~ stanines 4-6
        {"rank": ranks["GOLD"], "min_percentile": 75.0},      # ~ stanine 7
        {"rank": ranks["PLATINUM"], "min_percentile": 90.0},  # ~ stanine 8
        {"rank": ranks["DIAMOND"], "min_percentile": 95.0},   # ~ stanine 9
    ]

    percentile = session.percentile_rank(exclude_unfinished)

    # Check the buckets in reverse order and return the rank of the first
    # bucket whose min_percentile the percentile rank meets or exceeds
    for bucket in reversed(buckets):
        if percentile >= bucket["min_percentile"]:
            return bucket["rank"]

    # Fallback, in case the percentile doesn't fall into any bucket
    return ranks["PLASTIC"]
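
For example, a percentile rank of 80 meets the GOLD threshold (75.0) but not the PLATINUM one (90.0), so GOLD is returned. The rank is typically attached to the closing Final action; in the sketch below, the Final keyword arguments are assumptions about its constructor:

# Hypothetical closing action inside a rules class; the keyword
# names passed to Final are assumptions, not verified API.
final_action = Final(
    session,
    final_text=self.final_score_message(session),
    rank=self.rank(session),
)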

experiment.rules.staircasing

register_turnpoint(session, last_result)

Register a turnpoint:

- set a comment on the previous result to indicate the turnpoint
- increase final_score (used as a counter for turnpoints)

Source code in experiment/rules/util/staircasing.py
def register_turnpoint(session, last_result):
    """ register turnpoint:
        - set comment on previous result to indicate turnpoint
        - increase final_score (used as counter for turnpoints) """
    last_result.comment += ': turnpoint'
    last_result.save()
    session.final_score += 1
    session.save()
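
A staircase rules class calls this after detecting a reversal, then reads session.final_score back as the turnpoint counter. In this sketch, direction_changed and MAX_TURNPOINTS are invented placeholders:

# Hypothetical staircase step inside a rules class:
last_result = session.result_set.order_by('-created_at').first()
if last_result and direction_changed(session, last_result):  # assumed helper
    register_turnpoint(session, last_result)
if session.final_score >= MAX_TURNPOINTS:  # assumed stopping criterion
    pass  # enough turnpoints collected: end the staircase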

experiment.rules.practice

get_practice_views(session, intro_explainer, first_trial_callback, trial_callback, response_callback, check_previous_condition, difficulty)

Present practice views, in blocks of 2. Give feedback on the correctness of the response, and repeat practice if necessary.

- session: session
- intro_explainer: explainer object to introduce the experiment
- first_trial_callback: function to generate the first trial after practice
- trial_callback: function to return the data for a trial
- response_callback: function to generate an explainer object about the correctness of the response
- check_previous_condition: function to determine the condition of the previous practice trial (returns a Boolean)
- difficulty: difficulty of the current practice trial

Source code in experiment/rules/util/practice.py
def get_practice_views(
    session,
    intro_explainer,
    first_trial_callback,
    trial_callback,
    response_callback,
    check_previous_condition,
    difficulty
):
    ''' Present practice views, in blocks of 2
    Give feedback on the correctness of the response,
    and repeat practice if necessary.
    - session: session
    - intro_explainer: explainer object to introduce the experiment
    - first_trial_callback: function to generate the first trial after practice
    - trial_callback: function to return the data for a trial
    - response_callback: function to generate explainer object about correctness of response
    - check_previous_condition: function to determine the condition of previous practice trial (returns Boolean)
    - difficulty: difficulty of the current practice trial
    '''
    results_count = session.result_set.count()
    trial_condition = get_trial_condition_block(session, 2)
    previous_results = session.result_set.order_by('-created_at')
    if not results_count:
        # first practice trial
        return [intro_explainer, get_practice_explainer(), trial_callback(session, trial_condition, difficulty)]
    last_result = previous_results.first()
    if results_count < 4:
        # practice trial
        correct = last_result.score > 0
        previous_condition = check_previous_condition(last_result)
        response_explainer = response_callback(correct, previous_condition)
        trial = trial_callback(
            session, trial_condition, difficulty)
        return [response_explainer, trial]
    else:
        # after last practice trial
        penultimate_score = previous_results.all()[1].score
        # delete results from the practice trials
        session.result_set.all().delete()
        session.save()
        if last_result.score > 0 and penultimate_score > 0:
            # Practice went successfully, start experiment
            previous_condition = check_previous_condition(last_result)
            response_explainer = response_callback(
                True, previous_condition)
            session.final_score = 1
            # remove any data saved for practice purposes
            session.save_json_data({'block': []})
            session.save()
            trial = first_trial_callback(session, trial_callback)
            return [
                response_explainer,
                start_experiment_explainer(),
                trial
            ]
        else:
            # need more practice, start over
            response_explainer = response_callback(False, check_previous_condition(last_result))
            next_trial = trial_callback(
                session, trial_condition, difficulty)
            return [
                response_explainer,
                practice_again_explainer(),
                intro_explainer,
                get_practice_explainer(),
                next_trial
            ]
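
A rules class wires its own explainers and trial builders into this helper. Note that a successful practice sets session.final_score to 1, which the caller can use to detect that practice is over; the hook and helper names below are illustrative assumptions:

def next_round(self, session):
    # Hypothetical hook: final_score stays 0 during practice and is set
    # to 1 by get_practice_views once the last two practice trials were
    # both answered correctly.
    if session.final_score == 0:
        return get_practice_views(
            session,
            self.get_intro_explainer(),     # assumed helper
            self.first_trial_callback,      # assumed helper
            self.get_trial,                 # assumed helper
            self.response_explainer,        # assumed helper
            self.check_previous_condition,  # assumed helper
            self.start_difficulty,          # assumed attribute
        )
    return self.staircasing_blocks(session)  # assumed helper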

get_trial_condition(n_choices)

Get a randomized trial condition: return an integer between 0 and n_choices-1.

Source code in experiment/rules/util/practice.py
def get_trial_condition(n_choices):
    """ get randomized trial condition
    return an integer between 0 and n_choices-1
    """
    options = list(range(n_choices))
    return random.choice(options)
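
Unlike get_trial_condition_block, this draws uniformly at random on every call and keeps no state in the session. For example:

condition = get_trial_condition(2)  # 0 or 1, each with probability 0.5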

get_trial_condition_block(session, n_trials_per_block)

Make a list of n_trials_per_block conditions, of which one is a catch condition (=1). Store updates in the session's json_data field.

Source code in experiment/rules/util/practice.py
def get_trial_condition_block(session, n_trials_per_block):
    """ make a list of n_trials_per_blocks conditions, of which one is catch (=1)
    store updates in the session.json_data field
    """
    json_data = session.json_data
    block = json_data.get('block')
    if not block:
        block = [0] * n_trials_per_block
        catch_index = random.randrange(0, n_trials_per_block)
        block[catch_index] = 1
    condition = block.pop()
    session.save_json_data({'block': block})
    session.save()
    return condition
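
A worked example with n_trials_per_block=2: the first call builds [0, 1] or [1, 0], pops and returns the last entry, and stores the remainder in json_data; the second call pops the leftover entry; a third call would find an empty list and build a fresh block. Across one block, exactly one call returns the catch condition:

# Two successive calls with the same session drain one block:
first = get_trial_condition_block(session, 2)   # pops from a fresh block
second = get_trial_condition_block(session, 2)  # pops the leftover entry
assert {first, second} == {0, 1}  # exactly one catch condition per block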