// private_event_sourcing/awaiting_dependencies.rs

use std::collections::BTreeMap;

use hdk::prelude::*;
use private_event_sourcing_integrity::*;

use crate::{
    acknowledgements::query_acknowledgement_entries,
    events_sent_to_recipients::query_events_sent_to_recipients_entries, query_event_histories,
    query_private_event_entries, utils::create_relaxed, validate_private_event_entry, PrivateEvent,
};

/// Attempts to commit everything that is currently parked as "awaiting dependencies".
///
/// The work happens in three passes, in this order:
///
/// 1. Pending private events (from [`query_awaiting_deps_private_event_entries`]) are
///    sorted by ascending timestamp. Each one whose hash is not a key of
///    `private_event_entries` is validated with `validate_private_event_entry::<T>`:
///    valid entries are committed with `create_relaxed`, invalid ones are logged, and
///    ones with unresolved dependencies are left alone.
/// 2. The committed private events are queried again, and every pending
///    `EventSentToRecipients` whose `event_hash` is among them is committed.
/// 3. Every pending `Acknowledgement` whose `private_event_hash` is among those same
///    committed private events is committed.
///
/// # Errors
///
/// Returns the first error produced by any of the queries, by `hash_entry`, by
/// `validate_private_event_entry`, or by `create_relaxed`.
pub fn attempt_commit_awaiting_deps_entries<T: PrivateEvent>(
    private_event_entries: &BTreeMap<EntryHashB64, PrivateEventEntry>,
) -> ExternResult<()> {
    // Pass 1: private events, oldest timestamp first.
    // NOTE(review): presumably so that earlier events are committed before later
    // events that depend on them — confirm.
    let mut pending_events: Vec<PrivateEventEntry> = query_awaiting_deps_private_event_entries()?;
    pending_events.sort_by_key(|pending| pending.0.payload.timestamp);

    for private_event_entry in pending_events {
        let entry_hash = EntryHashB64::from(hash_entry(&private_event_entry)?);
        if private_event_entries.contains_key(&entry_hash) {
            // The caller already knows about this entry.
            continue;
        }

        let validation = validate_private_event_entry::<T>(&private_event_entry)?;
        match validation {
            ValidateCallbackResult::Valid => {
                create_relaxed(EntryTypes::PrivateEvent(private_event_entry))?;
            }
            ValidateCallbackResult::Invalid(reason) => {
                error!("Invalid awaiting dependencies entry: {reason}");
            }
            // Still blocked on something: nothing is committed for it in this call.
            ValidateCallbackResult::UnresolvedDependencies(_) => {}
        }
    }

    // Re-query rather than reuse the parameter, so the lookups below go through
    // a fresh query issued after pass 1.
    let committed_private_events = query_private_event_entries(())?;

    // Pass 2: "events sent to recipients" records whose event is now committed.
    let pending_sent = query_awaiting_deps_events_sent_to_recipients()?;
    for sent in pending_sent {
        let event_hash = EntryHashB64::from(sent.0.payload.content.event_hash.clone());
        if committed_private_events.contains_key(&event_hash) {
            create_relaxed(EntryTypes::EventSentToRecipients(sent))?;
        }
    }

    // Pass 3: acknowledgements whose private event is now committed.
    let pending_acknowledgements = query_awaiting_deps_acknowledgements()?;
    for acknowledgement in pending_acknowledgements {
        let private_event_hash =
            EntryHashB64::from(acknowledgement.0.payload.content.private_event_hash.clone());
        if committed_private_events.contains_key(&private_event_hash) {
            create_relaxed(EntryTypes::Acknowledgement(acknowledgement))?;
        }
    }

    Ok(())
}

/// Returns the private events found in awaiting-dependencies records that are not
/// yet among the entries returned by `query_private_event_entries`.
///
/// Only the `AwaitingDependencies::Event` variant is considered. An event whose
/// hash cannot be computed is silently left out of the result.
///
/// # Errors
///
/// Fails if `query_private_event_entries` or [`query_awaiting_deps`] fails.
pub fn query_awaiting_deps_private_event_entries() -> ExternResult<Vec<PrivateEventEntry>> {
    let already_committed = query_private_event_entries(())?;

    let mut pending: Vec<PrivateEventEntry> = Vec::new();
    for awaiting in query_awaiting_deps()? {
        // Other variants are handled by their own query functions.
        let AwaitingDependencies::Event { event, .. } = awaiting else {
            continue;
        };
        // A hashing failure drops the event instead of failing the whole query.
        let Ok(hash) = hash_entry(&event) else {
            continue;
        };
        if !already_committed.contains_key(&EntryHashB64::from(hash)) {
            pending.push(event);
        }
    }

    Ok(pending)
}

/// Returns the `EventSentToRecipients` values found in awaiting-dependencies records
/// that are not equal to any entry returned by
/// `query_events_sent_to_recipients_entries`.
///
/// Only the `AwaitingDependencies::EventsSentToRecipients` variant is considered.
///
/// # Errors
///
/// Fails if `query_events_sent_to_recipients_entries` or [`query_awaiting_deps`] fails.
pub fn query_awaiting_deps_events_sent_to_recipients() -> ExternResult<Vec<EventSentToRecipients>> {
    let already_committed = query_events_sent_to_recipients_entries(())?;

    let pending: Vec<EventSentToRecipients> = query_awaiting_deps()?
        .into_iter()
        .filter_map(|awaiting| {
            if let AwaitingDependencies::EventsSentToRecipients {
                event_sent_to_recipients,
            } = awaiting
            {
                Some(event_sent_to_recipients)
            } else {
                None
            }
        })
        // Keep a candidate only when every committed entry differs from it.
        .filter(|candidate| already_committed.iter().all(|known| !known.eq(candidate)))
        .collect();

    Ok(pending)
}

/// Returns the `Acknowledgement` values found in awaiting-dependencies records that
/// are not equal to any entry returned by `query_acknowledgement_entries`.
///
/// Only the `AwaitingDependencies::Acknowledgement` variant is considered.
///
/// # Errors
///
/// Fails if `query_acknowledgement_entries` or [`query_awaiting_deps`] fails.
pub fn query_awaiting_deps_acknowledgements() -> ExternResult<Vec<Acknowledgement>> {
    let already_committed = query_acknowledgement_entries(())?;

    let mut pending: Vec<Acknowledgement> = query_awaiting_deps()?
        .into_iter()
        .filter_map(|awaiting| {
            if let AwaitingDependencies::Acknowledgement { acknowledgement } = awaiting {
                Some(acknowledgement)
            } else {
                None
            }
        })
        .collect();

    // Discard every candidate that matches an already committed acknowledgement.
    pending.retain(|candidate| !already_committed.iter().any(|known| known.eq(candidate)));

    Ok(pending)
}

/// Collects every `AwaitingDependencies` value visible to this function.
///
/// Two sources are combined, in this order:
///
/// 1. Records returned by `query` for `Create` actions of the
///    `AwaitingDependencies` entry type. Records with no entry, or whose entry
///    does not convert into `AwaitingDependencies`, are skipped.
/// 2. The `awaiting_deps` lists of every history returned by
///    `query_event_histories`, appended after the records above.
///
/// # Errors
///
/// Fails if the entry type conversion, `query`, or `query_event_histories` fails.
pub fn query_awaiting_deps() -> ExternResult<Vec<AwaitingDependencies>> {
    let chain_filter = ChainQueryFilter::new()
        .entry_type(UnitEntryTypes::AwaitingDependencies.try_into()?)
        .include_entries(true)
        .action_type(ActionType::Create);

    let mut collected: Vec<AwaitingDependencies> = query(chain_filter)?
        .into_iter()
        .filter_map(|record| {
            // Missing or undecodable entries are ignored rather than reported.
            record
                .entry
                .as_option()
                .and_then(|entry| AwaitingDependencies::try_from(entry).ok())
        })
        .collect();

    let mut event_histories = query_event_histories()?;
    for event_history in &mut event_histories {
        // `append` moves the items out, leaving the history's own list empty.
        collected.append(&mut event_history.awaiting_deps);
    }

    Ok(collected)
}