Index: src/CoordinatorServerList.h |
diff --git a/src/CoordinatorServerList.h b/src/CoordinatorServerList.h |
index 14643c28b5f114d88e4503dd8a37a7774463ae02..441b4c46af743d33dd06a41c310fbfe5ac9339fb 100644 |
--- a/src/CoordinatorServerList.h |
+++ b/src/CoordinatorServerList.h |
@@ -18,20 +18,16 @@ |
#include <condition_variable> |
#include <deque> |
+#include <list> |
#include "MasterRecoveryInfo.pb.h" |
#include "ServerList.pb.h" |
-#include "EntryType.pb.h" |
-#include "ServerCrashInfo.pb.h" |
-#include "ServerInformation.pb.h" |
-#include "ServerListVersion.pb.h" |
-#include "ServerUpdate.pb.h" |
-#include "ServerReplicationUpdate.pb.h" |
- |
#include "AbstractServerList.h" |
+#include "CoordinatorUpdateManager.h" |
#include "ServiceMask.h" |
#include "ServerId.h" |
+#include "ServerTracker.h" |
#include "Tub.h" |
#include "ServerIdRpcWrapper.h" |
@@ -87,6 +83,7 @@ class CoordinatorServerList : public AbstractServerList{ |
Entry(const Entry& other) = default; |
Entry& operator=(const Entry& other) = default; |
void serialize(ProtoBuf::ServerList_Entry* dest) const; |
+ void sync(ExternalStorage* externalStorage); |
bool isMaster() const { |
return (status == ServerStatus::UP) && |
@@ -101,6 +98,15 @@ class CoordinatorServerList : public AbstractServerList{ |
// and are not transmitted to members' ServerLists. |
/** |
+ * A sequence number assigned by CoordinatorUpdateManager whenever |
+ * this entry changes in a way that requires notifying the rest of |
+ * the cluster (e.g., server crashing, recovery completed, etc.). |
+ * It's used to ensure that cluster updates are finished even if |
+ * the coordinator crashes in the middle. |
+ */ |
+ uint64_t updateSequenceNumber; |
+ |
+ /** |
* Stores information about the server for use during recovery. |
* This information is completely opaque to the coordinator during |
* normal operation and is only used in master recovery. Basically, |
@@ -111,12 +117,11 @@ class CoordinatorServerList : public AbstractServerList{ |
ProtoBuf::MasterRecoveryInfo masterRecoveryInfo; |
/** |
- * Indicates whether this server needs to be recovered. |
- * Its value is false by default (when the server it UP). It is set |
- * to true at the beginning of crash recovery for the server, and |
- * reset to false when that recovery completes. |
+ * The CoordinatorServerList version number corresponding to the |
+ * most recent change to this entry that must be propagated to the |
+ * cluster. Set by persistAndPropagate. |
*/ |
- bool needsRecovery; |
+ uint64_t version; |
/** |
* The two fields below, verifiedVersion and updateVersion, |
@@ -130,9 +135,7 @@ class CoordinatorServerList : public AbstractServerList{ |
* list. |
* |
* \b verifiedVersion stores the latest version of the server |
- * list that the server received, applied, and responded to. |
- * In a sense, this stores the version of the server list that |
- * has been "committed" on the server. |
+ * list that the server has acknowledged receiving. |
* |
* == Semantic meaning == |
* Together, these variables help determine the update state |
@@ -146,21 +149,11 @@ class CoordinatorServerList : public AbstractServerList{ |
* are equal to UNINITIALIZED_VERSION. This means that the |
* server has just been added to the server list and has not |
* yet have any updates sent to it yet. |
- * |
- * == Legal modifications == |
- * With these definitions in place, the mapping of a 2-phase |
- * commit to these variables is quite natural: |
- * |
- * Semantic Action -> Literal Action |
- * Start a new update RPC -> Set updateVersion = version of update RPC |
- * RPC failed (rollback) -> Set updateVersion = verifiedVersion |
- * RPC Success (commit) -> Set verfiedVersion = updateVersion |
- * |
*/ |
/** |
- * The latest version of the ServerList that server received, |
- * applied, and ACKed to. See comment block above for more info. |
+ * The latest version of the ServerList that server has acknowledged |
+ * receiving. |
*/ |
uint64_t verifiedVersion; |
@@ -176,16 +169,19 @@ class CoordinatorServerList : public AbstractServerList{ |
~CoordinatorServerList(); |
uint32_t backupCount() const; |
- ServerId enlistServer(ServerId replacesId, ServiceMask serviceMask, |
- const uint32_t readSpeed, const char* serviceLocator); |
+ ServerId enlistServer(ServiceMask serviceMask, const uint32_t readSpeed, |
+ const char* serviceLocator); |
+ void haltUpdater(); |
uint32_t masterCount() const; |
Entry operator[](ServerId serverId) const; |
Entry operator[](size_t index) const; |
+ void recover(uint64_t lastCompletedUpdate); |
void recoveryCompleted(ServerId serverId); |
void serialize(ProtoBuf::ServerList* protobuf, ServiceMask services) const; |
void serverCrashed(ServerId serverId); |
bool setMasterRecoveryInfo(ServerId serverId, |
const ProtoBuf::MasterRecoveryInfo* recoveryInfo); |
+ void startUpdater(); |
PRIVATE: |
/** |
@@ -234,8 +230,8 @@ class CoordinatorServerList : public AbstractServerList{ |
}; |
/** |
- * State of partial scans through the server list to find servers |
- * that require updates. |
+ * This class is used by getWork to record information while scanning all |
+ * of the entries in the server list. |
*/ |
struct ScanMetadata { |
/** |
@@ -255,13 +251,13 @@ class CoordinatorServerList : public AbstractServerList{ |
/** |
* Marks where a scan through the server list to find updates |
* would restart. This is set when the search loop exits and |
- * during the scan, it serves as both a start and stop |
+ * during the scan, it serves as both a start and stop. |
*/ |
size_t searchIndex; |
/** |
* Minimum version of all the entry server list versions that have |
- * been encountered thus far in the scan. |
+ * been encountered since searchIndex was last reset to 0. |
*/ |
uint64_t minVersion; |
@@ -269,112 +265,51 @@ class CoordinatorServerList : public AbstractServerList{ |
* The number of complete scans through the server list by getWork(). |
* Since the server list expands with time, each scan through the |
* server list may represent different amounts of work and thus it's |
- * not necessarily an interesting performance metric. Instead, it's |
- * used for detecting the first iteration through the loop and for |
- * debugging. |
+ * not necessarily an interesting performance metric. It's used |
+ * primarily for debugging. |
*/ |
uint64_t completeScansSinceStart; |
ScanMetadata() : noWorkFoundForEpoch(0), searchIndex(0), |
- minVersion(MAX64), completeScansSinceStart(0) {} |
- |
- /** |
- * Resets the values within to be the same as a newly |
- * constructed ScanMetadata. This is usually invoked |
- * when the updater is started/restarted. |
- */ |
- void reset() { |
- noWorkFoundForEpoch = 0; |
- searchIndex = 0; |
- minVersion = MAX64; |
- completeScansSinceStart = 0; |
- } |
+ minVersion(0), completeScansSinceStart(0) {} |
}; |
/** |
- * Stores the incremental and full Server List protobufs for a |
+ * Stores a ServerList protobuf for the changes that resulted in a |
* particular version of the server list. This is used by the |
* server list to keep track of past server list updates. |
*/ |
- struct ServerListUpdatePair { |
- /// Version of the ServerLists contained |
+ struct ServerListUpdate { |
+ /// Server list version # that corresponds with this update |
uint64_t version; |
- /// Incremental ServerList for this version |
- ProtoBuf::ServerList incremental; |
+ /// Sequence number of the external storage update that corresponds |
+ /// to this update; used to mark the external storage update as |
+ /// "complete" once this update has been fully propagated. |
+ uint64_t sequenceNumber; |
- /// Full ServerList for this version |
- ProtoBuf::ServerList full; |
+ /// Describes changes in the server list from version-1 to version. |
+ ProtoBuf::ServerList incremental; |
- /** |
- * Helps form a singly-linked list through the update deque, |
- * ordered both by version # and deque index. This allows for |
- * lockless iteration/access concurrent with locked writes. |
- * See docs above updates deque for more info. |
- */ |
- ServerListUpdatePair* next; |
- |
- ServerListUpdatePair(ProtoBuf::ServerList* incremental, |
- ProtoBuf::ServerList* full) |
- : version(incremental->version_number()) |
- , incremental(*incremental) |
- , full(*full) |
- , next(NULL) |
+ ServerListUpdate(uint64_t version, uint64_t sequenceNumber) |
+ : version(version) |
+ , sequenceNumber(sequenceNumber) |
+ , incremental() |
{} |
- DISALLOW_COPY_AND_ASSIGN(ServerListUpdatePair); |
- }; |
- |
- /** |
- * Describes the basic work unit that can be assigned to the |
- * updater thread. It provides the serverId and the RANGE of |
- * updates that should be batched up and sent to the server |
- * in one shot. |
- * |
- * There is an implicit contract that comes with every work unit |
- * handed out by the coordinator. Once a work unit is handed out, |
- * it is expected that a call back to workSuccess or workFailed |
- * with the target serverId will eventually occur and until it |
- * does, these conditions hold: |
- * a) The ServerList will not hand out more UpdateUnit's for |
- * the server addressed by targerServer. |
- * b) The range of updates described by firstUpdate to |
- * updateVersionTail are GUARANTEED to remain valid |
- * until a call back to workSuccess/Failed occurs. |
- * c) There are no guarantees about the integrity of updates |
- * outside this range so don't decrement the iterator and |
- * don't iterate past the updateVersionTail. |
- * |
- * The implications of a dropped WorkUnit would mean that part of |
- * the cluster will indefinitely remain out of date and the false |
- * report of a workSuccess would result in server/backup suicide. |
- * The latter case happens because if a server/backup misses an |
- * update, there is no guarantee that the required update protobuf |
- * version would still be around on the coordinator when the server |
- * realizes that it had missed an update. |
- * |
- * A false report of a workFailed however, would result in a transient |
- * bug whereby duplicate updates are sent to the server. This will not |
- * result in suicide so it is safe to invoke workFailed in error cases. |
- */ |
- struct UpdaterWorkUnit { |
- /// To whom to send the update |
- ServerId targetServer; |
- |
- /// Whether to send full or partial update |
- bool sendFullList; |
- |
- /// A pointer to the update deque starting at the first |
- /// update that should be sent to the server. |
- const ServerListUpdatePair* firstUpdate; |
- |
- /// Signifies the end range to be sent to the server. |
- /// Practically, it is used to stop iterating. |
- uint64_t updateVersionTail; |
- |
- UpdaterWorkUnit() |
- : targetServer(), sendFullList(), firstUpdate(), updateVersionTail() |
+ ServerListUpdate(const ServerListUpdate& source) |
+ : version(source.version) |
+ , sequenceNumber(source.sequenceNumber) |
+ , incremental(source.incremental) |
{} |
+ |
+ ServerListUpdate& operator=(const ServerListUpdate& source) |
+ { |
+ version = source.version; |
+ sequenceNumber = source.sequenceNumber; |
+ incremental = source.incremental; |
+ return *this; |
+ } |
}; |
/// Internal Use Only - Does not grab locks |
@@ -383,14 +318,10 @@ class CoordinatorServerList : public AbstractServerList{ |
size_t isize() const; |
/// Functions related to modifying the server list |
- void add(Lock& lock, ServerId serverId, string serviceLocator, |
- ServiceMask serviceMask, uint32_t readSpeed, |
- bool enqueueUpdate = true); |
- void crashed(const Lock& lock, ServerId serverId); |
- uint32_t firstFreeIndex(); |
- ServerId generateUniqueId(Lock& lock); |
+ uint32_t firstFreeIndex(const Lock& lock); |
CoordinatorServerList::Entry* getEntry(ServerId id) const; |
CoordinatorServerList::Entry* getEntry(size_t index) const; |
+ void persistAndPropagate(Lock& lock, Entry* entry, ServerChangeEvent event); |
void recoveryCompleted(Lock& lock, ServerId serverId); |
void serialize(const Lock& lock, ProtoBuf::ServerList* protoBuf) const; |
void serialize(const Lock& lock, ProtoBuf::ServerList* protoBuf, |
@@ -399,24 +330,26 @@ class CoordinatorServerList : public AbstractServerList{ |
/// Functions related to replication groups. |
bool assignReplicationGroup(Lock& lock, uint64_t replicationId, |
const vector<ServerId>* replicationGroupIds); |
- void createReplicationGroup(Lock& lock); |
+ void createReplicationGroups(Lock& lock); |
void removeReplicationGroup(Lock& lock, uint64_t groupId); |
/// Functions related to keeping the cluster up-to-date |
- void pushUpdate(const Lock& lock, uint64_t updateVersion); |
- void haltUpdater(); |
- void startUpdater(); |
+ void pushUpdate(const Lock& lock, Entry* entry); |
void updateLoop(); |
+ void checkUpdates(); |
void sync(); |
bool isClusterUpToDate(const Lock& lock); |
void pruneUpdates(const Lock& lock); |
- bool getWork(UpdaterWorkUnit* wu); |
- void workSuccess(ServerId id) ; |
+ bool getWork(Tub<UpdateServerListRpc>* rpc); |
+ void workSuccess(ServerId id, uint64_t currentVersion); |
void workFailed(ServerId id); |
void waitForWork(); |
+ /// Shared information about the server. |
+ Context* context; |
+ |
/// Slots in the server list. |
std::vector<GenerationNumberEntryPair> serverList; |
@@ -433,37 +366,22 @@ class CoordinatorServerList : public AbstractServerList{ |
*/ |
bool stopUpdater; |
- /// Metadata from previous partial scan through server list to find updates |
- ScanMetadata lastScan; |
- |
/** |
- * Stores add/remove/crashed updates to server list until a |
- * pushUpdate call which will update the version number, enqueue |
- * a copy to the updates list and clear() this entry. |
- * |
- * \a update can contain remove, crash, and add notifications, |
- * but removals/crashes must precede additions in the update to ensure |
- * ordering guarantees about notifications related to servers which |
- * re-enlist. For now, this means calls to remove() and crashed() must |
- * proceed call to add() if they have a common \a update. |
+ * True means the updater is sleeping because there is no work for |
+ * it to do; false means the updater is actively sending RPCs and |
+ * polling for completion. This variable is intended only for testing. |
*/ |
- ProtoBuf::ServerList update; |
+ bool updaterSleeping; |
+ |
+ /// Metadata from previous partial scan through server list to find updates |
+ ScanMetadata lastScan; |
/** |
* Past updates that lead up to the \a version. This does not contain |
- * all the updates created, only the ones needed by the servers |
- * currently in the server list. Older updates are pruned. |
- * |
- * WARNING: Modifications MUST ONLY occur at the ends, but random access |
- * is otherwise allowed. Reason being that ServerListUpdatePairs form an |
- * in order singly-linked list within the deque and according to the C++0x |
- * specs, references are preserved ONLY IF modifications occur at the ends |
- * of the deque. Following this convention allows for lockless traversal |
- * in the middle of the deque concurrent with locked writes at the ends. |
- * Regular iterators are not used because they are invalidated on |
- * every deque modification. |
+ * all the updates created, only the ones that haven't yet been propagated |
+ * to the rest of the cluster. Older updates are pruned. |
*/ |
- std::deque<ServerListUpdatePair> updates; |
+ std::deque<ServerListUpdate> updates; |
/** |
* Triggered when the server list is detected to be out of date or |
@@ -483,13 +401,29 @@ class CoordinatorServerList : public AbstractServerList{ |
Tub<std::thread> updaterThread; |
/** |
+ * RPC objects for updates that are currently outstanding. These are |
+ * dynamically allocated and must be freed manually. |
+ */ |
+ std::list<Tub<UpdateServerListRpc>*> activeRpcs; |
+ |
+ /** |
+ * RPC objects not currently in use (and not constructed); we save them |
+ * here to avoid having to reallocate them later. These are |
+ * dynamically allocated and must be freed manually. |
+ */ |
+ std::list<Tub<UpdateServerListRpc>*> spareRpcs; |
+ |
+ /** |
+ * The highest ServerLister version that is known to have been |
+ * received by all servers in the cluster (i.e. updates for this |
+ * and earlier versions can now be discarded). |
* Indicates the the oldest ServerList version amongst servers |
* that have received updates from us. This value is updated |
* lazily but does guarantee that whatever the value is, no |
* server has a version younger than that. Hence, it is safe |
* to use for pruning updates. |
*/ |
- uint64_t minConfirmedVersion; |
+ uint64_t maxConfirmedVersion; |
/** |
* Number of servers currently being sent updates. This is used |
@@ -498,12 +432,17 @@ class CoordinatorServerList : public AbstractServerList{ |
uint32_t numUpdatingServers; |
/** |
- * The id of the next replication group to be created. The replication |
- * group is a set of backups that store all of the replicas of a segment. |
- * NextReplicationId starts at 1 and is never reused. |
- * Id 0 is reserved for nodes that do not belong to a replication group. |
+ * The number of backups in a replication group. Currently there is |
+ * no way to set this value based on cluster configuration information |
+ * (RAM-555). |
+ */ |
+ uint32_t replicationGroupSize; |
+ |
+ /** |
+ * The highest id of a replication group that has been assigned so far. |
+ * Note: id 0 is never used. |
*/ |
- uint64_t nextReplicationId; |
+ uint64_t maxReplicationId; |
DISALLOW_COPY_AND_ASSIGN(CoordinatorServerList); |
}; |
} // namespace RAMCloud |