-
Notifications
You must be signed in to change notification settings - Fork 245
/
Copy pathecho_state_machine.hxx
153 lines (124 loc) · 4.58 KB
/
echo_state_machine.hxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/************************************************************************
Copyright 2017-2019 eBay Inc.
Author/Developer(s): Jung-Sang Ahn
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/
#pragma once
#include "nuraft.hxx"
#include <atomic>
#include <cassert>
#include <iostream>
#include <mutex>
using namespace nuraft;
class echo_state_machine : public state_machine {
public:
echo_state_machine()
: last_committed_idx_(0)
{}
~echo_state_machine() {}
ptr<buffer> pre_commit(const ulong log_idx, buffer& data) {
// Extract string from `data.
buffer_serializer bs(data);
std::string str = bs.get_str();
// Just print.
std::cout << "pre_commit " << log_idx << ": "
<< str << std::endl;
return nullptr;
}
ptr<buffer> commit(const ulong log_idx, buffer& data) {
// Extract string from `data.
buffer_serializer bs(data);
std::string str = bs.get_str();
// Just print.
std::cout << "commit " << log_idx << ": "
<< str << std::endl;
// Update last committed index number.
last_committed_idx_ = log_idx;
return nullptr;
}
void commit_config(const ulong log_idx, ptr<cluster_config>& new_conf) {
// Nothing to do with configuration change. Just update committed index.
last_committed_idx_ = log_idx;
}
void rollback(const ulong log_idx, buffer& data) {
// Extract string from `data.
buffer_serializer bs(data);
std::string str = bs.get_str();
// Just print.
std::cout << "rollback " << log_idx << ": "
<< str << std::endl;
}
int read_logical_snp_obj(snapshot& s,
void*& user_snp_ctx,
ulong obj_id,
ptr<buffer>& data_out,
bool& is_last_obj)
{
// Put dummy data.
data_out = buffer::alloc( sizeof(int32) );
buffer_serializer bs(data_out);
bs.put_i32(0);
is_last_obj = true;
return 0;
}
void save_logical_snp_obj(snapshot& s,
ulong& obj_id,
buffer& data,
bool is_first_obj,
bool is_last_obj)
{
std::cout << "save snapshot " << s.get_last_log_idx()
<< " term " << s.get_last_log_term()
<< " object ID " << obj_id << std::endl;
// Request next object.
obj_id++;
}
bool apply_snapshot(snapshot& s) {
std::cout << "apply snapshot " << s.get_last_log_idx()
<< " term " << s.get_last_log_term() << std::endl;
// Clone snapshot from `s`.
{ std::lock_guard<std::mutex> l(last_snapshot_lock_);
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
return true;
}
void free_user_snp_ctx(void*& user_snp_ctx) { }
ptr<snapshot> last_snapshot() {
// Just return the latest snapshot.
std::lock_guard<std::mutex> l(last_snapshot_lock_);
return last_snapshot_;
}
ulong last_commit_index() {
return last_committed_idx_;
}
void create_snapshot(snapshot& s,
async_result<bool>::handler_type& when_done)
{
std::cout << "create snapshot " << s.get_last_log_idx()
<< " term " << s.get_last_log_term() << std::endl;
// Clone snapshot from `s`.
{ std::lock_guard<std::mutex> l(last_snapshot_lock_);
ptr<buffer> snp_buf = s.serialize();
last_snapshot_ = snapshot::deserialize(*snp_buf);
}
ptr<std::exception> except(nullptr);
bool ret = true;
when_done(ret, except);
}
private:
// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx_;
// Last snapshot.
ptr<snapshot> last_snapshot_;
// Mutex for last snapshot.
std::mutex last_snapshot_lock_;
};