Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1)

Unified Diff: src/CoordinatorServerList.h

Issue 311001: Precommit (Closed)
Patch Set: Created 11 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « src/CoordinatorMain.cc ('k') | src/CoordinatorServerList.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: src/CoordinatorServerList.h
diff --git a/src/CoordinatorServerList.h b/src/CoordinatorServerList.h
index 14643c28b5f114d88e4503dd8a37a7774463ae02..441b4c46af743d33dd06a41c310fbfe5ac9339fb 100644
--- a/src/CoordinatorServerList.h
+++ b/src/CoordinatorServerList.h
@@ -18,20 +18,16 @@
#include <condition_variable>
#include <deque>
+#include <list>
#include "MasterRecoveryInfo.pb.h"
#include "ServerList.pb.h"
-#include "EntryType.pb.h"
-#include "ServerCrashInfo.pb.h"
-#include "ServerInformation.pb.h"
-#include "ServerListVersion.pb.h"
-#include "ServerUpdate.pb.h"
-#include "ServerReplicationUpdate.pb.h"
-
#include "AbstractServerList.h"
+#include "CoordinatorUpdateManager.h"
#include "ServiceMask.h"
#include "ServerId.h"
+#include "ServerTracker.h"
#include "Tub.h"
#include "ServerIdRpcWrapper.h"
@@ -87,6 +83,7 @@ class CoordinatorServerList : public AbstractServerList{
Entry(const Entry& other) = default;
Entry& operator=(const Entry& other) = default;
void serialize(ProtoBuf::ServerList_Entry* dest) const;
+ void sync(ExternalStorage* externalStorage);
bool isMaster() const {
return (status == ServerStatus::UP) &&
@@ -101,6 +98,15 @@ class CoordinatorServerList : public AbstractServerList{
// and are not transmitted to members' ServerLists.
/**
+ * A sequence number assigned by CoordinatorUpdateManager whenever
+ * this entry changes in a way that requires notifying the rest of
+ * the cluster (e.g., server crashing, recovery completed, etc.).
+ * It's used to ensure that cluster updates are finished even if
+ * the coordinator crashes in the middle.
+ */
+ uint64_t updateSequenceNumber;
+
+ /**
* Stores information about the server for use during recovery.
* This information is completely opaque to the coordinator during
* normal operation and is only used in master recovery. Basically,
@@ -111,12 +117,11 @@ class CoordinatorServerList : public AbstractServerList{
ProtoBuf::MasterRecoveryInfo masterRecoveryInfo;
/**
- * Indicates whether this server needs to be recovered.
- * Its value is false by default (when the server it UP). It is set
- * to true at the beginning of crash recovery for the server, and
- * reset to false when that recovery completes.
+ * The CoordinatorServerList version number corresponding to the
+ * most recent change to this entry that must be propagated to the
+ * cluster. Set by persistAndPropagate.
*/
- bool needsRecovery;
+ uint64_t version;
/**
* The two fields below, verifiedVersion and updateVersion,
@@ -130,9 +135,7 @@ class CoordinatorServerList : public AbstractServerList{
* list.
*
* \b verifiedVersion stores the latest version of the server
- * list that the server received, applied, and responded to.
- * In a sense, this stores the version of the server list that
- * has been "committed" on the server.
+ * list that the server has acknowledged receiving.
*
* == Semantic meaning ==
* Together, these variables help determine the update state
@@ -146,21 +149,11 @@ class CoordinatorServerList : public AbstractServerList{
* are equal to UNINITIALIZED_VERSION. This means that the
* server has just been added to the server list and has not
* yet have any updates sent to it yet.
- *
- * == Legal modifications ==
- * With these definitions in place, the mapping of a 2-phase
- * commit to these variables is quite natural:
- *
- * Semantic Action -> Literal Action
- * Start a new update RPC -> Set updateVersion = version of update RPC
- * RPC failed (rollback) -> Set updateVersion = verifiedVersion
- * RPC Success (commit) -> Set verfiedVersion = updateVersion
- *
*/
/**
- * The latest version of the ServerList that server received,
- * applied, and ACKed to. See comment block above for more info.
+ * The latest version of the ServerList that the server has
+ * acknowledged receiving.
*/
uint64_t verifiedVersion;
@@ -176,16 +169,19 @@ class CoordinatorServerList : public AbstractServerList{
~CoordinatorServerList();
uint32_t backupCount() const;
- ServerId enlistServer(ServerId replacesId, ServiceMask serviceMask,
- const uint32_t readSpeed, const char* serviceLocator);
+ ServerId enlistServer(ServiceMask serviceMask, const uint32_t readSpeed,
+ const char* serviceLocator);
+ void haltUpdater();
uint32_t masterCount() const;
Entry operator[](ServerId serverId) const;
Entry operator[](size_t index) const;
+ void recover(uint64_t lastCompletedUpdate);
void recoveryCompleted(ServerId serverId);
void serialize(ProtoBuf::ServerList* protobuf, ServiceMask services) const;
void serverCrashed(ServerId serverId);
bool setMasterRecoveryInfo(ServerId serverId,
const ProtoBuf::MasterRecoveryInfo* recoveryInfo);
+ void startUpdater();
PRIVATE:
/**
@@ -234,8 +230,8 @@ class CoordinatorServerList : public AbstractServerList{
};
/**
- * State of partial scans through the server list to find servers
- * that require updates.
+ * This class is used by getWork to record information while scanning all
+ * of the entries in the server list.
*/
struct ScanMetadata {
/**
@@ -255,13 +251,13 @@ class CoordinatorServerList : public AbstractServerList{
/**
* Marks where a scan through the server list to find updates
* would restart. This is set when the search loop exits and
- * during the scan, it serves as both a start and stop
+ * during the scan, it serves as both a start and stop.
*/
size_t searchIndex;
/**
* Minimum version of all the entry server list versions that have
- * been encountered thus far in the scan.
+ * been encountered since searchIndex was last reset to 0.
*/
uint64_t minVersion;
@@ -269,112 +265,51 @@ class CoordinatorServerList : public AbstractServerList{
* The number of complete scans through the server list by getWork().
* Since the server list expands with time, each scan through the
* server list may represent different amounts of work and thus it's
- * not necessarily an interesting performance metric. Instead, it's
- * used for detecting the first iteration through the loop and for
- * debugging.
+ * not necessarily an interesting performance metric. It's used
+ * primarily for debugging.
*/
uint64_t completeScansSinceStart;
ScanMetadata() : noWorkFoundForEpoch(0), searchIndex(0),
- minVersion(MAX64), completeScansSinceStart(0) {}
-
- /**
- * Resets the values within to be the same as a newly
- * constructed ScanMetadata. This is usually invoked
- * when the updater is started/restarted.
- */
- void reset() {
- noWorkFoundForEpoch = 0;
- searchIndex = 0;
- minVersion = MAX64;
- completeScansSinceStart = 0;
- }
+ minVersion(0), completeScansSinceStart(0) {}
};
/**
- * Stores the incremental and full Server List protobufs for a
+ * Stores a ServerList protobuf for the changes that resulted in a
* particular version of the server list. This is used by the
* server list to keep track of past server list updates.
*/
- struct ServerListUpdatePair {
- /// Version of the ServerLists contained
+ struct ServerListUpdate {
+ /// Server list version # that corresponds with this update
uint64_t version;
- /// Incremental ServerList for this version
- ProtoBuf::ServerList incremental;
+ /// Sequence number of the external storage update that corresponds
+ /// to this update; used to mark the external storage update as
+ /// "complete" once this update has been fully propagated.
+ uint64_t sequenceNumber;
- /// Full ServerList for this version
- ProtoBuf::ServerList full;
+ /// Describes changes in the server list from version-1 to version.
+ ProtoBuf::ServerList incremental;
- /**
- * Helps form a singly-linked list through the update deque,
- * ordered both by version # and deque index. This allows for
- * lockless iteration/access concurrent with locked writes.
- * See docs above updates deque for more info.
- */
- ServerListUpdatePair* next;
-
- ServerListUpdatePair(ProtoBuf::ServerList* incremental,
- ProtoBuf::ServerList* full)
- : version(incremental->version_number())
- , incremental(*incremental)
- , full(*full)
- , next(NULL)
+ ServerListUpdate(uint64_t version, uint64_t sequenceNumber)
+ : version(version)
+ , sequenceNumber(sequenceNumber)
+ , incremental()
{}
- DISALLOW_COPY_AND_ASSIGN(ServerListUpdatePair);
- };
-
- /**
- * Describes the basic work unit that can be assigned to the
- * updater thread. It provides the serverId and the RANGE of
- * updates that should be batched up and sent to the server
- * in one shot.
- *
- * There is an implicit contract that comes with every work unit
- * handed out by the coordinator. Once a work unit is handed out,
- * it is expected that a call back to workSuccess or workFailed
- * with the target serverId will eventually occur and until it
- * does, these conditions hold:
- * a) The ServerList will not hand out more UpdateUnit's for
- * the server addressed by targerServer.
- * b) The range of updates described by firstUpdate to
- * updateVersionTail are GUARANTEED to remain valid
- * until a call back to workSuccess/Failed occurs.
- * c) There are no guarantees about the integrity of updates
- * outside this range so don't decrement the iterator and
- * don't iterate past the updateVersionTail.
- *
- * The implications of a dropped WorkUnit would mean that part of
- * the cluster will indefinitely remain out of date and the false
- * report of a workSuccess would result in server/backup suicide.
- * The latter case happens because if a server/backup misses an
- * update, there is no guarantee that the required update protobuf
- * version would still be around on the coordinator when the server
- * realizes that it had missed an update.
- *
- * A false report of a workFailed however, would result in a transient
- * bug whereby duplicate updates are sent to the server. This will not
- * result in suicide so it is safe to invoke workFailed in error cases.
- */
- struct UpdaterWorkUnit {
- /// To whom to send the update
- ServerId targetServer;
-
- /// Whether to send full or partial update
- bool sendFullList;
-
- /// A pointer to the update deque starting at the first
- /// update that should be sent to the server.
- const ServerListUpdatePair* firstUpdate;
-
- /// Signifies the end range to be sent to the server.
- /// Practically, it is used to stop iterating.
- uint64_t updateVersionTail;
-
- UpdaterWorkUnit()
- : targetServer(), sendFullList(), firstUpdate(), updateVersionTail()
+ ServerListUpdate(const ServerListUpdate& source)
+ : version(source.version)
+ , sequenceNumber(source.sequenceNumber)
+ , incremental(source.incremental)
{}
+
+ ServerListUpdate& operator=(const ServerListUpdate& source)
+ {
+ version = source.version;
+ sequenceNumber = source.sequenceNumber;
+ incremental = source.incremental;
+ return *this;
+ }
};
/// Internal Use Only - Does not grab locks
@@ -383,14 +318,10 @@ class CoordinatorServerList : public AbstractServerList{
size_t isize() const;
/// Functions related to modifying the server list
- void add(Lock& lock, ServerId serverId, string serviceLocator,
- ServiceMask serviceMask, uint32_t readSpeed,
- bool enqueueUpdate = true);
- void crashed(const Lock& lock, ServerId serverId);
- uint32_t firstFreeIndex();
- ServerId generateUniqueId(Lock& lock);
+ uint32_t firstFreeIndex(const Lock& lock);
CoordinatorServerList::Entry* getEntry(ServerId id) const;
CoordinatorServerList::Entry* getEntry(size_t index) const;
+ void persistAndPropagate(Lock& lock, Entry* entry, ServerChangeEvent event);
void recoveryCompleted(Lock& lock, ServerId serverId);
void serialize(const Lock& lock, ProtoBuf::ServerList* protoBuf) const;
void serialize(const Lock& lock, ProtoBuf::ServerList* protoBuf,
@@ -399,24 +330,26 @@ class CoordinatorServerList : public AbstractServerList{
/// Functions related to replication groups.
bool assignReplicationGroup(Lock& lock, uint64_t replicationId,
const vector<ServerId>* replicationGroupIds);
- void createReplicationGroup(Lock& lock);
+ void createReplicationGroups(Lock& lock);
void removeReplicationGroup(Lock& lock, uint64_t groupId);
/// Functions related to keeping the cluster up-to-date
- void pushUpdate(const Lock& lock, uint64_t updateVersion);
- void haltUpdater();
- void startUpdater();
+ void pushUpdate(const Lock& lock, Entry* entry);
void updateLoop();
+ void checkUpdates();
void sync();
bool isClusterUpToDate(const Lock& lock);
void pruneUpdates(const Lock& lock);
- bool getWork(UpdaterWorkUnit* wu);
- void workSuccess(ServerId id) ;
+ bool getWork(Tub<UpdateServerListRpc>* rpc);
+ void workSuccess(ServerId id, uint64_t currentVersion);
void workFailed(ServerId id);
void waitForWork();
+ /// Shared information about the server.
+ Context* context;
+
/// Slots in the server list.
std::vector<GenerationNumberEntryPair> serverList;
@@ -433,37 +366,22 @@ class CoordinatorServerList : public AbstractServerList{
*/
bool stopUpdater;
- /// Metadata from previous partial scan through server list to find updates
- ScanMetadata lastScan;
-
/**
- * Stores add/remove/crashed updates to server list until a
- * pushUpdate call which will update the version number, enqueue
- * a copy to the updates list and clear() this entry.
- *
- * \a update can contain remove, crash, and add notifications,
- * but removals/crashes must precede additions in the update to ensure
- * ordering guarantees about notifications related to servers which
- * re-enlist. For now, this means calls to remove() and crashed() must
- * proceed call to add() if they have a common \a update.
+ * True means the updater is sleeping because there is no work for
+ * it to do; false means the updater is actively sending RPCs and
+ * polling for completion. This variable is intended only for testing.
*/
- ProtoBuf::ServerList update;
+ bool updaterSleeping;
+
+ /// Metadata from previous partial scan through server list to find updates
+ ScanMetadata lastScan;
/**
* Past updates that lead up to the \a version. This does not contain
- * all the updates created, only the ones needed by the servers
- * currently in the server list. Older updates are pruned.
- *
- * WARNING: Modifications MUST ONLY occur at the ends, but random access
- * is otherwise allowed. Reason being that ServerListUpdatePairs form an
- * in order singly-linked list within the deque and according to the C++0x
- * specs, references are preserved ONLY IF modifications occur at the ends
- * of the deque. Following this convention allows for lockless traversal
- * in the middle of the deque concurrent with locked writes at the ends.
- * Regular iterators are not used because they are invalidated on
- * every deque modification.
+ * all the updates created, only the ones that haven't yet been propagated
+ * to the rest of the cluster. Older updates are pruned.
*/
- std::deque<ServerListUpdatePair> updates;
+ std::deque<ServerListUpdate> updates;
/**
* Triggered when the server list is detected to be out of date or
@@ -483,13 +401,29 @@ class CoordinatorServerList : public AbstractServerList{
Tub<std::thread> updaterThread;
/**
+ * RPC objects for updates that are currently outstanding. These are
+ * dynamically allocated and must be freed manually.
+ */
+ std::list<Tub<UpdateServerListRpc>*> activeRpcs;
+
+ /**
+ * RPC objects not currently in use (and not constructed); we save them
+ * here to avoid having to reallocate them later. These are
+ * dynamically allocated and must be freed manually.
+ */
+ std::list<Tub<UpdateServerListRpc>*> spareRpcs;
+
+ /**
+ * The highest ServerList version that is known to have been
+ * received by all servers in the cluster (i.e. updates for this
+ * and earlier versions can now be discarded).
* Indicates the the oldest ServerList version amongst servers
* that have received updates from us. This value is updated
* lazily but does guarantee that whatever the value is, no
* server has a version younger than that. Hence, it is safe
* to use for pruning updates.
*/
- uint64_t minConfirmedVersion;
+ uint64_t maxConfirmedVersion;
/**
* Number of servers currently being sent updates. This is used
@@ -498,12 +432,17 @@ class CoordinatorServerList : public AbstractServerList{
uint32_t numUpdatingServers;
/**
- * The id of the next replication group to be created. The replication
- * group is a set of backups that store all of the replicas of a segment.
- * NextReplicationId starts at 1 and is never reused.
- * Id 0 is reserved for nodes that do not belong to a replication group.
+ * The number of backups in a replication group. Currently there is
+ * no way to set this value based on cluster configuration information
+ * (RAM-555).
+ */
+ uint32_t replicationGroupSize;
+
+ /**
+ * The highest id of a replication group that has been assigned so far.
+ * Note: id 0 is never used.
*/
- uint64_t nextReplicationId;
+ uint64_t maxReplicationId;
DISALLOW_COPY_AND_ASSIGN(CoordinatorServerList);
};
} // namespace RAMCloud
« no previous file with comments | « src/CoordinatorMain.cc ('k') | src/CoordinatorServerList.cc » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld aab5469