/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 2001-2004
* Sleepycat Software. All rights reserved.
*
* $Id: rep_record.c,v 1.1.1.1 2005/06/24 22:42:42 ca Exp $
*/
#include "db_config.h"
#ifndef NO_SYSTEM_INCLUDES
#if TIME_WITH_SYS_TIME
#include <sys/time.h>
#include <time.h>
#else
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#else
#include <time.h>
#endif
#endif
#include <stdlib.h>
#include <string.h>
#endif
#include "db_int.h"
#include "dbinc/db_page.h"
#include "dbinc/db_shash.h"
#include "dbinc/db_am.h"
#include "dbinc/lock.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"
static int __rep_apply __P((DB_ENV *, REP_CONTROL *, DBT *, DB_LSN *, int *));
static int __rep_collect_txn __P((DB_ENV *, DB_LSN *, LSN_COLLECTION *));
static int __rep_do_ckp __P((DB_ENV *, DBT *, REP_CONTROL *));
static int __rep_dorecovery __P((DB_ENV *, DB_LSN *, DB_LSN *));
static int __rep_getnext __P((DB_ENV *));
static int __rep_lsn_cmp __P((const void *, const void *));
static int __rep_newfile __P((DB_ENV *, REP_CONTROL *, DB_LSN *));
static int __rep_process_rec __P((DB_ENV *,
REP_CONTROL *, DBT *, u_int32_t *, DB_LSN *));
static int __rep_remfirst __P((DB_ENV *, DBT *, DBT *));
static int __rep_resend_req __P((DB_ENV *, int));
static int __rep_verify_match __P((DB_ENV *, DB_LSN *, time_t));
/* Used to consistently designate which messages ought to be received where. */
#define MASTER_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_MASTER)) { \
RPRINT(dbenv, rep, \
(dbenv, &mb, "Master record received on client")); \
REP_PRINT_MESSAGE(dbenv, \
*eidp, rp, "rep_process_message"); \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#define CLIENT_ONLY(rep, rp) do { \
if (!F_ISSET(rep, REP_F_CLIENT)) { \
RPRINT(dbenv, rep, \
(dbenv, &mb, "Client record received on master")); \
REP_PRINT_MESSAGE(dbenv, \
*eidp, rp, "rep_process_message"); \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0); \
ret = DB_REP_DUPMASTER; \
goto errlock; \
} \
} while (0)
#define MASTER_CHECK(dbenv, eid, rep) do { \
if (rep->master_id == DB_EID_INVALID) { \
RPRINT(dbenv, rep, (dbenv, &mb, \
"Received record from %d, master is INVALID", eid));\
ret = 0; \
(void)__rep_send_message(dbenv, \
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0); \
goto errlock; \
} \
if (eid != rep->master_id) { \
__db_err(dbenv, \
"Received master record from %d, master is %d", \
eid, rep->master_id); \
ret = EINVAL; \
goto errlock; \
} \
} while (0)
#define MASTER_UPDATE(dbenv, renv) do { \
MUTEX_LOCK((dbenv), &(renv)->mutex); \
F_SET((renv), DB_REGENV_REPLOCKED); \
(void)time(&(renv)->op_timestamp); \
MUTEX_UNLOCK((dbenv), &(renv)->mutex); \
} while (0)
#define ANYSITE(rep)
/*
* __rep_process_message --
*
* This routine takes an incoming message and processes it.
*
* control: contains the control fields from the record
* rec: contains the actual record
* eidp: contains the machine id of the sender of the message;
* in the case of a DB_NEWMASTER message, returns the eid
* of the new master.
* ret_lsnp: On DB_REP_ISPERM and DB_REP_NOTPERM returns, contains the
* lsn of the maximum permanent or current not permanent log record
* (respectively).
*
* PUBLIC: int __rep_process_message __P((DB_ENV *, DBT *, DBT *, int *,
* PUBLIC: DB_LSN *));
*/
int
__rep_process_message(dbenv, control, rec, eidp, ret_lsnp)
DB_ENV *dbenv;
DBT *control, *rec;
int *eidp;
DB_LSN *ret_lsnp;
{
DB_LOG *dblp;
DB_LOGC *logc;
DB_LSN endlsn, lsn, oldfilelsn;
DB_REP *db_rep;
DBT *d, data_dbt, mylog;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
REP_CONTROL *rp;
REP_VOTE_INFO *vi;
u_int32_t bytes, egen, flags, gen, gbytes, rectype, type;
int check_limit, cmp, done, do_req, is_dup;
int master, match, old, recovering, ret, t_ret;
time_t savetime;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#endif
PANIC_CHECK(dbenv);
ENV_REQUIRES_CONFIG(dbenv, dbenv->rep_handle, "rep_process_message",
DB_INIT_REP);
/* Control argument must be non-Null. */
if (control == NULL || control->size == 0) {
__db_err(dbenv,
"DB_ENV->rep_process_message: control argument must be specified");
return (EINVAL);
}
if (!IS_REP_MASTER(dbenv) && !IS_REP_CLIENT(dbenv)) {
__db_err(dbenv,
"Environment not configured as replication master or client");
return (EINVAL);
}
ret = 0;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
infop = dbenv->reginfo;
renv = infop->primary;
rp = (REP_CONTROL *)control->data;
if (ret_lsnp != NULL)
ZERO_LSN(*ret_lsnp);
/*
* Acquire the replication lock.
*/
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (rep->start_th != 0) {
/*
* If we're racing with a thread in rep_start, then
* just ignore the message and return.
*/
RPRINT(dbenv, rep, (dbenv, &mb,
"Racing rep_start, ignore message."));
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
goto out;
}
rep->msg_th++;
gen = rep->gen;
recovering = rep->in_recovery || F_ISSET(rep, REP_F_RECOVER_MASK);
savetime = renv->rep_timestamp;
rep->stat.st_msgs_processed++;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
REP_PRINT_MESSAGE(dbenv, *eidp, rp, "rep_process_message");
/* Complain if we see an improper version number. */
if (rp->rep_version != DB_REPVERSION) {
__db_err(dbenv,
"unexpected replication message version %lu, expected %d",
(u_long)rp->rep_version, DB_REPVERSION);
ret = EINVAL;
goto errlock;
}
if (rp->log_version != DB_LOGVERSION) {
__db_err(dbenv,
"unexpected log record version %lu, expected %d",
(u_long)rp->log_version, DB_LOGVERSION);
ret = EINVAL;
goto errlock;
}
/*
* Check for generation number matching. Ignore any old messages
* except requests that are indicative of a new client that needs
* to get in sync.
*/
if (rp->gen < gen && rp->rectype != REP_ALIVE_REQ &&
rp->rectype != REP_NEWCLIENT && rp->rectype != REP_MASTER_REQ &&
rp->rectype != REP_DUPMASTER) {
/*
* We don't hold the rep mutex, and could miscount if we race.
*/
rep->stat.st_msgs_badgen++;
goto errlock;
}
if (rp->gen > gen) {
/*
* If I am a master and am out of date with a lower generation
* number, I am in bad shape and should downgrade.
*/
if (F_ISSET(rep, REP_F_MASTER)) {
rep->stat.st_dupmasters++;
ret = DB_REP_DUPMASTER;
if (rp->rectype != REP_DUPMASTER)
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_DUPMASTER,
NULL, NULL, 0);
goto errlock;
}
/*
* I am a client and am out of date. If this is an election,
* or a response from the first site I contacted, then I can
* accept the generation number and participate in future
* elections and communication. Otherwise, I need to hear about
* a new master and sync up.
*/
if (rp->rectype == REP_ALIVE ||
rp->rectype == REP_VOTE1 || rp->rectype == REP_VOTE2) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
RPRINT(dbenv, rep, (dbenv, &mb,
"Updating gen from %lu to %lu",
(u_long)gen, (u_long)rp->gen));
rep->master_id = DB_EID_INVALID;
gen = rep->gen = rp->gen;
/*
* Updating of egen will happen when we process the
* message below for each message type.
*/
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (rp->rectype == REP_ALIVE)
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL,
NULL, 0);
} else if (rp->rectype != REP_NEWMASTER) {
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_MASTER_REQ, NULL, NULL, 0);
goto errlock;
}
/*
* If you get here, then you're a client and either you're
* in an election or you have a NEWMASTER or an ALIVE message
* whose processing will do the right thing below.
*/
}
/*
* We need to check if we're in recovery and if we are
* then we need to ignore any messages except VERIFY*, VOTE*,
* NEW* and ALIVE_REQ, or backup related messages: UPDATE*,
* PAGE* and FILE*. We need to also accept LOG messages
* if we're copying the log for recovery/backup.
*/
if (recovering) {
switch (rp->rectype) {
case REP_VERIFY:
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
cmp = log_compare(&lp->verify_lsn, &rp->lsn);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (cmp != 0)
goto skip;
break;
case REP_NEWFILE:
case REP_LOG:
case REP_LOG_MORE:
if (!F_ISSET(rep, REP_F_RECOVER_LOG))
goto skip;
/*
* If we're recovering the log we only want
* log records that are in the range we need
* to recover. Otherwise we can end up storing
* a huge number of "new" records, only to
* truncate the temp database later after we
* run recovery.
*/
if (log_compare(&rp->lsn, &rep->last_lsn) > 0)
goto skip;
break;
case REP_ALIVE:
case REP_ALIVE_REQ:
case REP_DUPMASTER:
case REP_FILE_FAIL:
case REP_NEWCLIENT:
case REP_NEWMASTER:
case REP_NEWSITE:
case REP_PAGE:
case REP_PAGE_FAIL:
case REP_PAGE_MORE:
case REP_PAGE_REQ:
case REP_UPDATE:
case REP_UPDATE_REQ:
case REP_VERIFY_FAIL:
case REP_VOTE1:
case REP_VOTE2:
break;
default:
skip:
/* Check for need to retransmit. */
/* Not holding rep_mutex, may miscount */
rep->stat.st_msgs_recover++;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
do_req = __rep_check_doreq(dbenv, rep);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (do_req) {
/*
* Don't respond to a MASTER_REQ with
* a MASTER_REQ.
*/
if (rep->master_id == DB_EID_INVALID &&
rp->rectype != REP_MASTER_REQ)
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST,
REP_MASTER_REQ,
NULL, NULL, 0);
else if (*eidp == rep->master_id)
ret = __rep_resend_req(dbenv, *eidp);
}
goto errlock;
}
}
switch (rp->rectype) {
case REP_ALIVE:
ANYSITE(rep);
egen = *(u_int32_t *)rec->data;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
RPRINT(dbenv, rep, (dbenv, &mb,
"Received ALIVE egen of %lu, mine %lu",
(u_long)egen, (u_long)rep->egen));
if (egen > rep->egen) {
/*
* We're changing egen, need to clear out any old
* election information.
*/
__rep_elect_done(dbenv, rep);
rep->egen = egen;
}
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
break;
case REP_ALIVE_REQ:
ANYSITE(rep);
dblp = dbenv->lg_handle;
R_LOCK(dbenv, &dblp->reginfo);
lsn = ((LOG *)dblp->reginfo.primary)->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
egen = rep->egen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv,
*eidp, REP_ALIVE, &lsn, &data_dbt, 0);
goto errlock;
case REP_DUPMASTER:
if (F_ISSET(rep, REP_F_MASTER))
ret = DB_REP_DUPMASTER;
goto errlock;
case REP_ALL_REQ:
MASTER_ONLY(rep, rp);
gbytes = bytes = 0;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
gbytes = rep->gbytes;
bytes = rep->bytes;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
check_limit = gbytes != 0 || bytes != 0;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&data_dbt, 0, sizeof(data_dbt));
oldfilelsn = lsn = rp->lsn;
type = REP_LOG;
flags = IS_ZERO_LSN(rp->lsn) ||
IS_INIT_LSN(rp->lsn) ? DB_FIRST : DB_SET;
for (ret = __log_c_get(logc, &lsn, &data_dbt, flags);
ret == 0 && type == REP_LOG;
ret = __log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) {
/*
* When a log file changes, we'll have a real log
* record with some lsn [n][m], and we'll also want
* to send a NEWFILE message with lsn [n-1][MAX].
*/
if (lsn.file != oldfilelsn.file)
(void)__rep_send_message(dbenv,
*eidp, REP_NEWFILE, &oldfilelsn, NULL, 0);
if (check_limit) {
/*
* data_dbt.size is only the size of the log
* record; it doesn't count the size of the
* control structure. Factor that in as well
* so we're not off by a lot if our log records
* are small.
*/
while (bytes <
data_dbt.size + sizeof(REP_CONTROL)) {
if (gbytes > 0) {
bytes += GIGABYTE;
--gbytes;
continue;
}
/*
* We don't hold the rep mutex,
* and may miscount.
*/
rep->stat.st_nthrottles++;
type = REP_LOG_MORE;
goto send;
}
bytes -= (data_dbt.size + sizeof(REP_CONTROL));
}
send: if (__rep_send_message(dbenv, *eidp, type,
&lsn, &data_dbt, DB_LOG_RESEND) != 0)
break;
/*
* If we are about to change files, then we'll need the
* last LSN in the previous file. Save it here.
*/
oldfilelsn = lsn;
oldfilelsn.offset += logc->c_len;
}
if (ret == DB_NOTFOUND)
ret = 0;
if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
#ifdef NOTYET
case REP_FILE: /* TODO */
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
break;
case REP_FILE_REQ:
MASTER_ONLY(rep, rp);
ret = __rep_send_file(dbenv, rec, *eidp);
goto errlock;
#endif
case REP_FILE_FAIL:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
/*
* XXX
*/
break;
case REP_LOG:
case REP_LOG_MORE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
is_dup = 0;
ret = __rep_apply(dbenv, rp, rec, ret_lsnp, &is_dup);
switch (ret) {
/*
* We're in an internal backup and we've gotten
* all the log we need to run recovery. Do so now.
*/
case DB_REP_LOGREADY:
if ((ret = __log_flush(dbenv, NULL)) != 0)
goto errlock;
if ((ret = __rep_verify_match(dbenv, &rep->last_lsn,
savetime)) == 0) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
ZERO_LSN(rep->first_lsn);
ZERO_LSN(rep->last_lsn);
F_CLR(rep, REP_F_RECOVER_LOG);
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
break;
/*
* If we get any of the "normal" returns, we only process
* LOG_MORE if this is not a duplicate record. If the
* record is a duplicate we don't want to handle LOG_MORE
* and request a multiple data stream (or trigger internal
* initialization) since this could be a very old record
* that no longer exists on the master.
*/
case DB_REP_ISPERM:
case DB_REP_NOTPERM:
case 0:
if (is_dup)
goto errlock;
else
break;
/*
* Any other return (errors), we're done.
*/
default:
goto errlock;
}
if (rp->rectype == REP_LOG_MORE) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
master = rep->master_id;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
/*
* If the master_id is invalid, this means that since
* the last record was sent, somebody declared an
* election and we may not have a master to request
* things of.
*
* This is not an error; when we find a new master,
* we'll re-negotiate where the end of the log is and
* try to bring ourselves up to date again anyway.
*/
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
if (master == DB_EID_INVALID)
ret = 0;
/*
* If we've asked for a bunch of records, it could
* either be from a LOG_REQ or ALL_REQ. If we're
* waiting for a gap to be filled, call loggap_req,
* otherwise use ALL_REQ again.
*/
else if (IS_ZERO_LSN(lp->waiting_lsn)) {
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (__rep_send_message(dbenv,
master, REP_ALL_REQ, &lsn, NULL, 0) != 0)
break;
} else {
__rep_loggap_req(dbenv, rep, &lsn, 1);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
}
}
goto errlock;
case REP_LOG_REQ:
MASTER_ONLY(rep, rp);
if (rec != NULL && rec->size != 0) {
RPRINT(dbenv, rep, (dbenv, &mb,
"[%lu][%lu]: LOG_REQ max lsn: [%lu][%lu]",
(u_long) rp->lsn.file, (u_long)rp->lsn.offset,
(u_long)((DB_LSN *)rec->data)->file,
(u_long)((DB_LSN *)rec->data)->offset));
}
/*
* There are three different cases here.
* 1. We asked for a particular LSN and got it.
* 2. We asked for an LSN and it's not found because it is
* beyond the end of a log file and we need a NEWFILE msg.
* and then the record that was requested.
* 3. We asked for an LSN and it simply doesn't exist, but
* doesn't meet any of those other criteria, in which case
* it's an error (that should never happen).
* If we have a valid LSN and the request has a data_dbt with
* it, then we need to send all records up to the LSN in the
* data dbt.
*/
oldfilelsn = lsn = rp->lsn;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&data_dbt, 0, sizeof(data_dbt));
ret = __log_c_get(logc, &lsn, &data_dbt, DB_SET);
if (ret == 0) /* Case 1 */
(void)__rep_send_message(dbenv,
*eidp, REP_LOG, &lsn, &data_dbt, DB_LOG_RESEND);
else if (ret == DB_NOTFOUND) {
R_LOCK(dbenv, &dblp->reginfo);
endlsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
if (endlsn.file > lsn.file) {
/*
* Case 2:
* Need to find the LSN of the last record in
* file lsn.file so that we can send it with
* the NEWFILE call. In order to do that, we
* need to try to get {lsn.file + 1, 0} and
* then backup.
*/
endlsn.file = lsn.file + 1;
endlsn.offset = 0;
if ((ret = __log_c_get(logc,
&endlsn, &data_dbt, DB_SET)) != 0 ||
(ret = __log_c_get(logc,
&endlsn, &data_dbt, DB_PREV)) != 0) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Unable to get prev of [%lu][%lu]",
(u_long)lsn.file,
(u_long)lsn.offset));
/*
* We want to push the error back
* to the client so that the client
* does an internal backup. The
* client asked for a log record
* we no longer have and it is
* outdated.
* XXX - This could be optimized by
* having the master perform and
* send a REP_UPDATE message. We
* currently want the client to set
* up its 'update' state prior to
* requesting REP_UPDATE_REQ.
*/
ret = 0;
(void)__rep_send_message(dbenv, *eidp,
REP_VERIFY_FAIL, &rp->lsn, NULL, 0);
} else {
endlsn.offset += logc->c_len;
(void)__rep_send_message(dbenv, *eidp,
REP_NEWFILE, &endlsn, NULL, 0);
}
} else {
/* Case 3 */
__db_err(dbenv,
"Request for LSN [%lu][%lu] fails",
(u_long)lsn.file, (u_long)lsn.offset);
DB_ASSERT(0);
ret = EINVAL;
}
}
/*
* If the user requested a gap, send the whole thing,
* while observing the limits from set_rep_limit.
*/
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
gbytes = rep->gbytes;
bytes = rep->bytes;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
check_limit = gbytes != 0 || bytes != 0;
type = REP_LOG;
while (ret == 0 && rec != NULL && rec->size != 0 &&
type == REP_LOG) {
if ((ret =
__log_c_get(logc, &lsn, &data_dbt, DB_NEXT)) != 0) {
if (ret == DB_NOTFOUND)
ret = 0;
break;
}
if (log_compare(&lsn, (DB_LSN *)rec->data) >= 0)
break;
/*
* When a log file changes, we'll have a real log
* record with some lsn [n][m], and we'll also want
* to send a NEWFILE message with lsn [n-1][MAX].
*/
if (lsn.file != oldfilelsn.file)
(void)__rep_send_message(dbenv,
*eidp, REP_NEWFILE, &oldfilelsn, NULL, 0);
if (check_limit) {
/*
* data_dbt.size is only the size of the log
* record; it doesn't count the size of the
* control structure. Factor that in as well
* so we're not off by a lot if our log records
* are small.
*/
while (bytes <
data_dbt.size + sizeof(REP_CONTROL)) {
if (gbytes > 0) {
bytes += GIGABYTE;
--gbytes;
continue;
}
/*
* We don't hold the rep mutex,
* and may miscount.
*/
rep->stat.st_nthrottles++;
type = REP_LOG_MORE;
goto send1;
}
bytes -= (data_dbt.size + sizeof(REP_CONTROL));
}
send1: if (__rep_send_message(dbenv, *eidp, type,
&lsn, &data_dbt, DB_LOG_RESEND) != 0)
break;
/*
* If we are about to change files, then we'll need the
* last LSN in the previous file. Save it here.
*/
oldfilelsn = lsn;
oldfilelsn.offset += logc->c_len;
}
if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
case REP_NEWSITE:
/* We don't hold the rep mutex, and may miscount. */
rep->stat.st_newsites++;
/* This is a rebroadcast; simply tell the application. */
if (F_ISSET(rep, REP_F_MASTER)) {
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
}
ret = DB_REP_NEWSITE;
goto errlock;
case REP_NEWCLIENT:
/*
* This message was received and should have resulted in the
* application entering the machine ID in its machine table.
* We respond to this with an ALIVE to send relevant information
* to the new client (if we are a master, we'll send a
* NEWMASTER, so we only need to send the ALIVE if we're a
* client). But first, broadcast the new client's record to
* all the clients.
*/
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_NEWSITE, &rp->lsn, rec, 0);
ret = DB_REP_NEWSITE;
if (F_ISSET(rep, REP_F_CLIENT)) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
egen = rep->egen;
if (*eidp == rep->master_id)
rep->master_id = DB_EID_INVALID;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv, DB_EID_BROADCAST,
REP_ALIVE, &rp->lsn, &data_dbt, 0);
goto errlock;
}
/* FALLTHROUGH */
case REP_MASTER_REQ:
if (F_ISSET(rep, REP_F_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_NEWMASTER, &lsn, NULL, 0);
}
/*
* If there is no master, then we could get into a state
* where an old client lost the initial ALIVE message and
* is calling an election under an old gen and can
* never get to the current gen.
*/
if (F_ISSET(rep, REP_F_CLIENT) && rp->gen < gen) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
egen = rep->egen;
if (*eidp == rep->master_id)
rep->master_id = DB_EID_INVALID;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv, *eidp,
REP_ALIVE, &rp->lsn, &data_dbt, 0);
goto errlock;
}
goto errlock;
case REP_NEWFILE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
ret = __rep_apply(dbenv, rp, rec, ret_lsnp, NULL);
goto errlock;
case REP_NEWMASTER:
ANYSITE(rep);
if (F_ISSET(rep, REP_F_MASTER) &&
*eidp != dbenv->rep_eid) {
/* We don't hold the rep mutex, and may miscount. */
rep->stat.st_dupmasters++;
ret = DB_REP_DUPMASTER;
(void)__rep_send_message(dbenv,
DB_EID_BROADCAST, REP_DUPMASTER, NULL, NULL, 0);
goto errlock;
}
ret = __rep_new_master(dbenv, rp, *eidp);
goto errlock;
case REP_PAGE:
case REP_PAGE_MORE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
ret = __rep_page(dbenv, *eidp, rp, rec);
break;
case REP_PAGE_FAIL:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
ret = __rep_page_fail(dbenv, *eidp, rec);
break;
case REP_PAGE_REQ:
MASTER_ONLY(rep, rp);
MASTER_UPDATE(dbenv, renv);
ret = __rep_page_req(dbenv, *eidp, rec);
break;
case REP_UPDATE:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
ret = __rep_update_setup(dbenv, *eidp, rp, rec);
break;
case REP_UPDATE_REQ:
MASTER_ONLY(rep, rp);
infop = dbenv->reginfo;
renv = infop->primary;
MASTER_UPDATE(dbenv, renv);
ret = __rep_update_req(dbenv, *eidp);
break;
case REP_VERIFY:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
if (IS_ZERO_LSN(lp->verify_lsn))
goto errlock;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
memset(&mylog, 0, sizeof(mylog));
if ((ret = __log_c_get(logc, &rp->lsn, &mylog, DB_SET)) != 0)
goto rep_verify_err;
match = 0;
memcpy(&rectype, mylog.data, sizeof(rectype));
if (mylog.size == rec->size &&
memcmp(mylog.data, rec->data, rec->size) == 0)
match = 1;
DB_ASSERT(rectype == DB___txn_ckp);
/*
* If we don't have a match, backup to the previous
* checkpoint and try again.
*/
if (match == 0) {
ZERO_LSN(lsn);
if ((ret = __log_backup(dbenv, logc, &rp->lsn, &lsn,
LASTCKP_CMP)) == 0) {
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp->verify_lsn = lsn;
lp->rcvd_recs = 0;
lp->wait_recs = rep->request_gap;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
(void)__rep_send_message(dbenv,
*eidp, REP_VERIFY_REQ, &lsn, NULL, 0);
} else if (ret == DB_NOTFOUND) {
/*
* We've either run out of records because
* logs have been removed or we've rolled back
* all the way to the beginning. In the latter
* we don't think these sites were ever part of
* the same environment and we'll say so.
* In the former, request internal backup.
*/
if (rp->lsn.file == 1) {
__db_err(dbenv,
"Client was never part of master's environment");
ret = EINVAL;
} else {
rep->stat.st_outdated++;
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
F_CLR(rep, REP_F_RECOVER_VERIFY);
F_SET(rep, REP_F_RECOVER_UPDATE);
ZERO_LSN(rep->first_lsn);
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
(void)__rep_send_message(dbenv,
*eidp, REP_UPDATE_REQ, NULL,
NULL, 0);
}
}
} else
ret = __rep_verify_match(dbenv, &rp->lsn, savetime);
rep_verify_err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
goto errlock;
case REP_VERIFY_FAIL:
CLIENT_ONLY(rep, rp);
MASTER_CHECK(dbenv, *eidp, rep);
/*
* If any recovery flags are set, but not VERIFY,
* then we ignore this message. We are already
* in the middle of updating.
*/
if (F_ISSET(rep, REP_F_RECOVER_MASK) &&
!F_ISSET(rep, REP_F_RECOVER_VERIFY))
goto errlock;
rep->stat.st_outdated++;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
/*
* We don't want an old or delayed VERIFY_FAIL
* message to throw us into internal initialization
* when we shouldn't be.
*
* Only go into internal initialization if:
* We are in RECOVER_VERIFY and this LSN == verify_lsn.
* We are not in any RECOVERY and we are expecting
* an LSN that no longer exists on the master.
* Otherwise, ignore this message.
*/
if (((F_ISSET(rep, REP_F_RECOVER_VERIFY)) &&
log_compare(&rp->lsn, &lp->verify_lsn) == 0) ||
(F_ISSET(rep, REP_F_RECOVER_MASK) == 0 &&
log_compare(&rp->lsn, &lp->ready_lsn) >= 0)) {
F_CLR(rep, REP_F_RECOVER_VERIFY);
F_SET(rep, REP_F_RECOVER_UPDATE);
ZERO_LSN(rep->first_lsn);
lp->wait_recs = rep->request_gap;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
(void)__rep_send_message(dbenv,
*eidp, REP_UPDATE_REQ, NULL, NULL, 0);
} else {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
}
goto errlock;
case REP_VERIFY_REQ:
MASTER_ONLY(rep, rp);
type = REP_VERIFY;
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto errlock;
d = &data_dbt;
memset(d, 0, sizeof(data_dbt));
F_SET(logc, DB_LOG_SILENT_ERR);
ret = __log_c_get(logc, &rp->lsn, d, DB_SET);
/*
* If the LSN was invalid, then we might get a not
* found, we might get an EIO, we could get anything.
* If we get a DB_NOTFOUND, then there is a chance that
* the LSN comes before the first file present in which
* case we need to return a fail so that the client can return
* a DB_OUTDATED.
*/
if (ret == DB_NOTFOUND &&
__log_is_outdated(dbenv, rp->lsn.file, &old) == 0 &&
old != 0)
type = REP_VERIFY_FAIL;
if (ret != 0)
d = NULL;
(void)__rep_send_message(dbenv, *eidp, type, &rp->lsn, d, 0);
ret = __log_c_close(logc);
goto errlock;
case REP_VOTE1:
if (F_ISSET(rep, REP_F_MASTER)) {
RPRINT(dbenv, rep,
(dbenv, &mb, "Master received vote"));
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
goto errlock;
}
vi = (REP_VOTE_INFO *)rec->data;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
/*
* If we get a vote from a later election gen, we
* clear everything from the current one, and we'll
* start over by tallying it. If we get an old vote,
* send an ALIVE to the old participant.
*/
RPRINT(dbenv, rep, (dbenv, &mb,
"Received vote1 egen %lu, egen %lu",
(u_long)vi->egen, (u_long)rep->egen));
if (vi->egen < rep->egen) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Received old vote %lu, egen %lu, ignoring vote1",
(u_long)vi->egen, (u_long)rep->egen));
egen = rep->egen;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
data_dbt.data = &egen;
data_dbt.size = sizeof(egen);
(void)__rep_send_message(dbenv,
*eidp, REP_ALIVE, &rp->lsn, &data_dbt, 0);
goto errlock;
}
if (vi->egen > rep->egen) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Received VOTE1 from egen %lu, my egen %lu; reset",
(u_long)vi->egen, (u_long)rep->egen));
__rep_elect_done(dbenv, rep);
rep->egen = vi->egen;
}
if (!IN_ELECTION(rep))
F_SET(rep, REP_F_TALLY);
/* Check if this site knows about more sites than we do. */
if (vi->nsites > rep->nsites)
rep->nsites = vi->nsites;
/* Check if this site requires more votes than we do. */
if (vi->nvotes > rep->nvotes)
rep->nvotes = vi->nvotes;
/*
* We are keeping the vote, let's see if that changes our
* count of the number of sites.
*/
if (rep->sites + 1 > rep->nsites)
rep->nsites = rep->sites + 1;
if (rep->nsites > rep->asites &&
(ret = __rep_grow_sites(dbenv, rep->nsites)) != 0) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Grow sites returned error %d", ret));
goto errunlock;
}
/*
* Ignore vote1's if we're in phase 2.
*/
if (F_ISSET(rep, REP_F_EPHASE2)) {
RPRINT(dbenv, rep, (dbenv, &mb,
"In phase 2, ignoring vote1"));
goto errunlock;
}
/*
* Record this vote. If we get back non-zero, we
* ignore the vote.
*/
if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->sites,
vi->egen, rep->tally_off)) != 0) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Tally returned %d, sites %d",
ret, rep->sites));
ret = 0;
goto errunlock;
}
RPRINT(dbenv, rep, (dbenv, &mb,
"Incoming vote: (eid)%d (pri)%d (gen)%lu (egen)%lu [%lu,%lu]",
*eidp, vi->priority,
(u_long)rp->gen, (u_long)vi->egen,
(u_long)rp->lsn.file, (u_long)rp->lsn.offset));
#ifdef DIAGNOSTIC
if (rep->sites > 1)
RPRINT(dbenv, rep, (dbenv, &mb,
"Existing vote: (eid)%d (pri)%d (gen)%lu (sites)%d [%lu,%lu]",
rep->winner, rep->w_priority,
(u_long)rep->w_gen, rep->sites,
(u_long)rep->w_lsn.file,
(u_long)rep->w_lsn.offset));
#endif
__rep_cmp_vote(dbenv, rep, eidp, &rp->lsn, vi->priority,
rp->gen, vi->tiebreaker);
/*
* If you get a vote and you're not in an election, we've
* already recorded this vote. But that is all we need
* to do.
*/
if (!IN_ELECTION(rep)) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Not in election, but received vote1 0x%x",
rep->flags));
ret = DB_REP_HOLDELECTION;
goto errunlock;
}
master = rep->winner;
lsn = rep->w_lsn;
/*
* We need to check sites == nsites, not more than half
* like we do in __rep_elect and the VOTE2 code below. The
* reason is that we want to process all the incoming votes
* and not short-circuit once we reach more than half. The
* real winner's vote may be in the last half.
*/
done = rep->sites >= rep->nsites && rep->w_priority != 0;
if (done) {
RPRINT(dbenv, rep,
(dbenv, &mb, "Phase1 election done"));
RPRINT(dbenv, rep, (dbenv, &mb, "Voting for %d%s",
master, master == rep->eid ? "(self)" : ""));
egen = rep->egen;
F_SET(rep, REP_F_EPHASE2);
F_CLR(rep, REP_F_EPHASE1);
if (master == rep->eid) {
(void)__rep_tally(dbenv, rep, rep->eid,
&rep->votes, egen, rep->v2tally_off);
goto errunlock;
}
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
/* Vote for someone else. */
__rep_send_vote(dbenv, NULL, 0, 0, 0, 0, egen,
master, REP_VOTE2);
} else
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
/* Election is still going on. */
break;
case REP_VOTE2:
RPRINT(dbenv, rep, (dbenv, &mb, "We received a vote%s",
F_ISSET(rep, REP_F_MASTER) ? " (master)" : ""));
if (F_ISSET(rep, REP_F_MASTER)) {
R_LOCK(dbenv, &dblp->reginfo);
lsn = lp->lsn;
R_UNLOCK(dbenv, &dblp->reginfo);
rep->stat.st_elections_won++;
(void)__rep_send_message(dbenv,
*eidp, REP_NEWMASTER, &lsn, NULL, 0);
goto errlock;
}
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
/* If we have priority 0, we should never get a vote. */
DB_ASSERT(rep->priority != 0);
/*
* We might be the last to the party and we haven't had
* time to tally all the vote1's, but others have and
* decided we're the winner. So, if we're in the process
* of tallying sites, keep the vote so that when our
* election thread catches up we'll have the votes we
* already received.
*/
vi = (REP_VOTE_INFO *)rec->data;
if (!IN_ELECTION_TALLY(rep) && vi->egen >= rep->egen) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Not in election gen %lu, at %lu, got vote",
(u_long)vi->egen, (u_long)rep->egen));
ret = DB_REP_HOLDELECTION;
goto errunlock;
}
/*
* Record this vote. In a VOTE2, the only valid entry
* in the REP_VOTE_INFO is the election generation.
*
* There are several things which can go wrong that we
* need to account for:
* 1. If we receive a latent VOTE2 from an earlier election,
* we want to ignore it.
* 2. If we receive a VOTE2 from a site from which we never
* received a VOTE1, we want to ignore it.
* 3. If we have received a duplicate VOTE2 from this election
* from the same site we want to ignore it.
* 4. If this is from the current election and someone is
* really voting for us, then we finally get to record it.
*/
/*
* __rep_cmp_vote2 checks for cases 1 and 2.
*/
if ((ret = __rep_cmp_vote2(dbenv, rep, *eidp, vi->egen)) != 0) {
ret = 0;
goto errunlock;
}
/*
* __rep_tally takes care of cases 3 and 4.
*/
if ((ret = __rep_tally(dbenv, rep, *eidp, &rep->votes,
vi->egen, rep->v2tally_off)) != 0) {
ret = 0;
goto errunlock;
}
done = rep->votes >= rep->nvotes;
RPRINT(dbenv, rep, (dbenv, &mb, "Counted vote %d of %d",
rep->votes, rep->nvotes));
if (done) {
__rep_elect_master(dbenv, rep, eidp);
ret = DB_REP_NEWMASTER;
goto errunlock;
} else
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
break;
default:
__db_err(dbenv,
"DB_ENV->rep_process_message: unknown replication message: type %lu",
(u_long)rp->rectype);
ret = EINVAL;
goto errlock;
}
/*
* If we already hold rep_mutexp then we goto 'errunlock'
* Otherwise we goto 'errlock' to acquire it before we
* decrement our message thread count.
*/
errlock:
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
errunlock:
rep->msg_th--;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
out:
if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
return (ret);
}
/*
* __rep_apply --
*
* Handle incoming log records on a client, applying when possible and
* entering into the bookkeeping table otherwise. This routine manages
* the state of the incoming message stream -- processing records, via
* __rep_process_rec, when possible and enqueuing in the __db.rep.db
* when necessary. As gaps in the stream are filled in, this is where
* we try to process as much as possible from __db.rep.db to catch up.
*/
static int
__rep_apply(dbenv, rp, rec, ret_lsnp, is_dupp)
DB_ENV *dbenv;
REP_CONTROL *rp;
DBT *rec;
DB_LSN *ret_lsnp;
int *is_dupp;
{
DB_REP *db_rep;
DBT control_dbt, key_dbt;
DBT rec_dbt;
DB *dbp;
DB_LOG *dblp;
DB_LSN max_lsn;
LOG *lp;
REP *rep;
u_int32_t rectype;
int cmp, ret;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#endif
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dbp = db_rep->rep_db;
rectype = 0;
ret = 0;
memset(&control_dbt, 0, sizeof(control_dbt));
memset(&rec_dbt, 0, sizeof(rec_dbt));
ZERO_LSN(max_lsn);
dblp = dbenv->lg_handle;
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp = dblp->reginfo.primary;
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (F_ISSET(rep, REP_F_RECOVER_LOG) &&
log_compare(&lp->ready_lsn, &rep->first_lsn) < 0)
lp->ready_lsn = rep->first_lsn;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
cmp = log_compare(&rp->lsn, &lp->ready_lsn);
if (cmp == 0) {
if ((ret =
__rep_process_rec(dbenv, rp, rec, &rectype, &max_lsn)) != 0)
goto err;
/*
* If we get the record we are expecting, reset
* the count of records we've received and are applying
* towards the request interval.
*/
lp->rcvd_recs = 0;
while (ret == 0 &&
log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0) {
/*
* We just filled in a gap in the log record stream.
* Write subsequent records to the log.
*/
gap_check:
lp->rcvd_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
if ((ret =
__rep_remfirst(dbenv, &control_dbt, &rec_dbt)) != 0)
goto err;
rp = (REP_CONTROL *)control_dbt.data;
rec = &rec_dbt;
if ((ret = __rep_process_rec(dbenv,
rp, rec, &rectype, &max_lsn)) != 0)
goto err;
/*
* We may miscount, as we don't hold the rep mutex.
*/
--rep->stat.st_log_queued;
if ((ret = __rep_getnext(dbenv)) == DB_NOTFOUND) {
ret = 0;
break;
} else if (ret != 0)
goto err;
}
/*
* Check if we're at a gap in the table and if so, whether we
* need to ask for any records.
*/
if (!IS_ZERO_LSN(lp->waiting_lsn) &&
log_compare(&lp->ready_lsn, &lp->waiting_lsn) != 0) {
/*
* We got a record and processed it, but we may
* still be waiting for more records.
*/
if (__rep_check_doreq(dbenv, rep))
__rep_loggap_req(dbenv, rep, &rp->lsn, 0);
} else {
lp->wait_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
}
} else if (cmp > 0) {
/*
* The LSN is higher than the one we were waiting for.
* This record isn't in sequence; add it to the temporary
* database, update waiting_lsn if necessary, and perform
* calculations to determine if we should issue requests
* for new records.
*/
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
if (lp->wait_recs == 0) {
/*
* This is a new gap. Initialize the number of
* records that we should wait before requesting
* that it be resent. We grab the limits out of
* the rep without the mutex.
*/
lp->wait_recs = rep->request_gap;
lp->rcvd_recs = 0;
ZERO_LSN(lp->max_wait_lsn);
}
if (__rep_check_doreq(dbenv, rep))
__rep_loggap_req(dbenv, rep, &rp->lsn, 0);
ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
rep->stat.st_log_queued++;
rep->stat.st_log_queued_total++;
if (rep->stat.st_log_queued_max < rep->stat.st_log_queued)
rep->stat.st_log_queued_max = rep->stat.st_log_queued;
if (ret == DB_KEYEXIST)
ret = 0;
if (ret != 0)
goto done;
if (IS_ZERO_LSN(lp->waiting_lsn) ||
log_compare(&rp->lsn, &lp->waiting_lsn) < 0)
lp->waiting_lsn = rp->lsn;
/*
* If this is permanent; let the caller know that we have
* not yet written it to disk, but we've accepted it.
*/
if (ret == 0 && F_ISSET(rp, DB_LOG_PERM)) {
max_lsn = rp->lsn;
ret = DB_REP_NOTPERM;
}
goto done;
} else {
/*
* We may miscount if we race, since we
* don't currently hold the rep mutex.
*/
rep->stat.st_log_duplicated++;
if (is_dupp != NULL)
*is_dupp = 1;
if (F_ISSET(rp, DB_LOG_PERM))
max_lsn = lp->max_perm_lsn;
goto done;
}
/* Check if we need to go back into the table. */
if (ret == 0 && log_compare(&lp->ready_lsn, &lp->waiting_lsn) == 0)
goto gap_check;
done:
err: /* Check if we need to go back into the table. */
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (ret == 0 &&
F_ISSET(rep, REP_F_RECOVER_LOG) &&
log_compare(&lp->ready_lsn, &rep->last_lsn) >= 0) {
rep->last_lsn = max_lsn;
ZERO_LSN(max_lsn);
ret = DB_REP_LOGREADY;
}
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (ret == 0 && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
!IS_ZERO_LSN(max_lsn)) {
if (ret_lsnp != NULL)
*ret_lsnp = max_lsn;
ret = DB_REP_ISPERM;
DB_ASSERT(log_compare(&max_lsn, &lp->max_perm_lsn) >= 0);
lp->max_perm_lsn = max_lsn;
}
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
/*
* Startup is complete when we process our first live record. However,
* we want to return DB_REP_STARTUPDONE on the first record we can --
* but other return values trump this one. We know we've processed at
* least one record when rectype is non-zero.
*/
if (ret == 0 && !F_ISSET(rp, DB_LOG_RESEND) &&
rectype != 0 && rep->stat.st_startup_complete == 0) {
rep->stat.st_startup_complete = 1;
ret = DB_REP_STARTUPDONE;
}
if (ret == 0 && rp->rectype == REP_NEWFILE && lp->db_log_autoremove)
__log_autoremove(dbenv);
if (control_dbt.data != NULL)
__os_ufree(dbenv, control_dbt.data);
if (rec_dbt.data != NULL)
__os_ufree(dbenv, rec_dbt.data);
if (ret == DB_REP_NOTPERM && !F_ISSET(rep, REP_F_RECOVER_LOG) &&
!IS_ZERO_LSN(max_lsn) && ret_lsnp != NULL)
*ret_lsnp = max_lsn;
#ifdef DIAGNOSTIC
if (ret == DB_REP_ISPERM)
RPRINT(dbenv, rep, (dbenv, &mb, "Returning ISPERM [%lu][%lu]",
(u_long)max_lsn.file, (u_long)max_lsn.offset));
else if (ret == DB_REP_LOGREADY)
RPRINT(dbenv, rep, (dbenv, &mb,
"Returning LOGREADY up to [%lu][%lu]",
(u_long)rep->last_lsn.file,
(u_long)rep->last_lsn.offset));
else if (ret == DB_REP_NOTPERM)
RPRINT(dbenv, rep, (dbenv, &mb, "Returning NOTPERM [%lu][%lu]",
(u_long)max_lsn.file, (u_long)max_lsn.offset));
else if (ret == DB_REP_STARTUPDONE)
RPRINT(dbenv, rep, (dbenv, &mb,
"Returning STARTUPDONE [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset));
else if (ret != 0)
RPRINT(dbenv, rep, (dbenv, &mb, "Returning %d [%lu][%lu]", ret,
(u_long)max_lsn.file, (u_long)max_lsn.offset));
#endif
return (ret);
}
/*
* __rep_process_txn --
*
* This is the routine that actually gets a transaction ready for
* processing.
*
* PUBLIC: int __rep_process_txn __P((DB_ENV *, DBT *));
*/
int
__rep_process_txn(dbenv, rec)
DB_ENV *dbenv;
DBT *rec;
{
DBT data_dbt, *lock_dbt;
DB_LOCKREQ req, *lvp;
DB_LOGC *logc;
DB_LSN prev_lsn, *lsnp;
DB_REP *db_rep;
LSN_COLLECTION lc;
REP *rep;
__txn_regop_args *txn_args;
__txn_xa_regop_args *prep_args;
u_int32_t lockid, rectype;
u_int i;
int ret, t_ret;
void *txninfo;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
logc = NULL;
txn_args = NULL;
prep_args = NULL;
txninfo = NULL;
memset(&data_dbt, 0, sizeof(data_dbt));
if (F_ISSET(dbenv, DB_ENV_THREAD))
F_SET(&data_dbt, DB_DBT_REALLOC);
/*
* There are two phases: First, we have to traverse backwards through
* the log records gathering the list of all LSNs in the transaction.
* Once we have this information, we can loop through and then apply it.
*
* We may be passed a prepare (if we're restoring a prepare on upgrade)
* instead of a commit (the common case). Check which it is and behave
* appropriately.
*/
memcpy(&rectype, rec->data, sizeof(rectype));
memset(&lc, 0, sizeof(lc));
if (rectype == DB___txn_regop) {
/*
* We're the end of a transaction. Make sure this is
* really a commit and not an abort!
*/
if ((ret = __txn_regop_read(dbenv, rec->data, &txn_args)) != 0)
return (ret);
if (txn_args->opcode != TXN_COMMIT) {
__os_free(dbenv, txn_args);
return (0);
}
prev_lsn = txn_args->prev_lsn;
lock_dbt = &txn_args->locks;
} else {
/* We're a prepare. */
DB_ASSERT(rectype == DB___txn_xa_regop);
if ((ret =
__txn_xa_regop_read(dbenv, rec->data, &prep_args)) != 0)
return (ret);
prev_lsn = prep_args->prev_lsn;
lock_dbt = &prep_args->locks;
}
/* Get locks. */
if ((ret = __lock_id(dbenv, &lockid)) != 0)
goto err1;
if ((ret =
__lock_get_list(dbenv, lockid, 0, DB_LOCK_WRITE, lock_dbt)) != 0)
goto err;
/* Phase 1. Get a list of the LSNs in this transaction, and sort it. */
if ((ret = __rep_collect_txn(dbenv, &prev_lsn, &lc)) != 0)
goto err;
qsort(lc.array, lc.nlsns, sizeof(DB_LSN), __rep_lsn_cmp);
/*
* The set of records for a transaction may include dbreg_register
* records. Create a txnlist so that they can keep track of file
* state between records.
*/
if ((ret = __db_txnlist_init(dbenv, 0, 0, NULL, &txninfo)) != 0)
goto err;
/* Phase 2: Apply updates. */
if ((ret = __log_cursor(dbenv, &logc)) != 0)
goto err;
for (lsnp = &lc.array[0], i = 0; i < lc.nlsns; i++, lsnp++) {
if ((ret = __log_c_get(logc, lsnp, &data_dbt, DB_SET)) != 0) {
__db_err(dbenv, "failed to read the log at [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
if ((ret = __db_dispatch(dbenv, dbenv->recover_dtab,
dbenv->recover_dtab_size, &data_dbt, lsnp,
DB_TXN_APPLY, txninfo)) != 0) {
__db_err(dbenv, "transaction failed at [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
goto err;
}
}
err: memset(&req, 0, sizeof(req));
req.op = DB_LOCK_PUT_ALL;
if ((t_ret =
__lock_vec(dbenv, lockid, 0, &req, 1, &lvp)) != 0 && ret == 0)
ret = t_ret;
if ((t_ret = __lock_id_free(dbenv, lockid)) != 0 && ret == 0)
ret = t_ret;
err1: if (txn_args != NULL)
__os_free(dbenv, txn_args);
if (prep_args != NULL)
__os_free(dbenv, prep_args);
if (lc.array != NULL)
__os_free(dbenv, lc.array);
if (logc != NULL && (t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (txninfo != NULL)
__db_txnlist_end(dbenv, txninfo);
if (F_ISSET(&data_dbt, DB_DBT_REALLOC) && data_dbt.data != NULL)
__os_ufree(dbenv, data_dbt.data);
if (ret == 0)
/*
* We don't hold the rep mutex, and could miscount if we race.
*/
rep->stat.st_txns_applied++;
return (ret);
}
/*
* __rep_collect_txn
* Recursive function that will let us visit every entry in a transaction
* chain including all child transactions so that we can then apply
* the entire transaction family at once.
*/
static int
__rep_collect_txn(dbenv, lsnp, lc)
DB_ENV *dbenv;
DB_LSN *lsnp;
LSN_COLLECTION *lc;
{
__txn_child_args *argp;
DB_LOGC *logc;
DB_LSN c_lsn;
DBT data;
u_int32_t rectype;
u_int nalloc;
int ret, t_ret;
memset(&data, 0, sizeof(data));
F_SET(&data, DB_DBT_REALLOC);
if ((ret = __log_cursor(dbenv, &logc)) != 0)
return (ret);
while (!IS_ZERO_LSN(*lsnp) &&
(ret = __log_c_get(logc, lsnp, &data, DB_SET)) == 0) {
memcpy(&rectype, data.data, sizeof(rectype));
if (rectype == DB___txn_child) {
if ((ret = __txn_child_read(dbenv,
data.data, &argp)) != 0)
goto err;
c_lsn = argp->c_lsn;
*lsnp = argp->prev_lsn;
__os_free(dbenv, argp);
ret = __rep_collect_txn(dbenv, &c_lsn, lc);
} else {
if (lc->nalloc < lc->nlsns + 1) {
nalloc = lc->nalloc == 0 ? 20 : lc->nalloc * 2;
if ((ret = __os_realloc(dbenv,
nalloc * sizeof(DB_LSN), &lc->array)) != 0)
goto err;
lc->nalloc = nalloc;
}
lc->array[lc->nlsns++] = *lsnp;
/*
* Explicitly copy the previous lsn. The record
* starts with a u_int32_t record type, a u_int32_t
* txn id, and then the DB_LSN (prev_lsn) that we
* want. We copy explicitly because we have no idea
* what kind of record this is.
*/
memcpy(lsnp, (u_int8_t *)data.data +
sizeof(u_int32_t) + sizeof(u_int32_t),
sizeof(DB_LSN));
}
if (ret != 0)
goto err;
}
if (ret != 0)
__db_err(dbenv, "collect failed at: [%lu][%lu]",
(u_long)lsnp->file, (u_long)lsnp->offset);
err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
if (data.data != NULL)
__os_ufree(dbenv, data.data);
return (ret);
}
/*
* __rep_lsn_cmp --
* qsort-type-compatible wrapper for log_compare.
*/
static int
__rep_lsn_cmp(lsn1, lsn2)
const void *lsn1, *lsn2;
{
return (log_compare((DB_LSN *)lsn1, (DB_LSN *)lsn2));
}
/*
* __rep_newfile --
* NEWFILE messages have the LSN of the last record in the previous
* log file. When applying a NEWFILE message, make sure we haven't already
* swapped files.
*/
static int
__rep_newfile(dbenv, rc, lsnp)
DB_ENV *dbenv;
REP_CONTROL *rc;
DB_LSN *lsnp;
{
DB_LOG *dblp;
LOG *lp;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
if (rc->lsn.file + 1 > lp->lsn.file)
return (__log_newfile(dblp, lsnp, 0));
else {
/* We've already applied this NEWFILE. Just ignore it. */
*lsnp = lp->lsn;
return (0);
}
}
/*
* __rep_tally --
* PUBLIC: int __rep_tally __P((DB_ENV *, REP *, int, int *,
* PUBLIC: u_int32_t, roff_t));
*
* Handle incoming vote1 message on a client. Called with the db_rep
* mutex held. This function will return 0 if we successfully tally
* the vote and non-zero if the vote is ignored. This will record
* both VOTE1 and VOTE2 records, depending on which region offset the
* caller passed in.
*/
int
__rep_tally(dbenv, rep, eid, countp, egen, vtoff)
DB_ENV *dbenv;
REP *rep;
int eid, *countp;
u_int32_t egen;
roff_t vtoff;
{
REP_VTALLY *tally, *vtp;
int i;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#else
COMPQUIET(rep, NULL);
#endif
tally = R_ADDR((REGINFO *)dbenv->reginfo, vtoff);
i = 0;
vtp = &tally[i];
while (i < *countp) {
/*
* Ignore votes from earlier elections (i.e. we've heard
* from this site in this election, but its vote from an
* earlier election got delayed and we received it now).
* However, if we happened to hear from an earlier vote
* and we recorded it and we're now hearing from a later
* election we want to keep the updated one. Note that
* updating the entry will not increase the count.
* Also ignore votes that are duplicates.
*/
if (vtp->eid == eid) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Tally found[%d] (%d, %lu), this vote (%d, %lu)",
i, vtp->eid, (u_long)vtp->egen,
eid, (u_long)egen));
if (vtp->egen >= egen)
return (1);
else {
vtp->egen = egen;
return (0);
}
}
i++;
vtp = &tally[i];
}
/*
* If we get here, we have a new voter we haven't
* seen before. Tally this vote.
*/
#ifdef DIAGNOSTIC
if (vtoff == rep->tally_off)
RPRINT(dbenv, rep, (dbenv, &mb, "Tallying VOTE1[%d] (%d, %lu)",
i, eid, (u_long)egen));
else
RPRINT(dbenv, rep, (dbenv, &mb, "Tallying VOTE2[%d] (%d, %lu)",
i, eid, (u_long)egen));
#endif
vtp->eid = eid;
vtp->egen = egen;
(*countp)++;
return (0);
}
/*
* __rep_cmp_vote --
* PUBLIC: void __rep_cmp_vote __P((DB_ENV *, REP *, int *, DB_LSN *,
* PUBLIC: int, u_int32_t, u_int32_t));
*
* Compare incoming vote1 message on a client. Called with the db_rep
* mutex held.
*/
void
__rep_cmp_vote(dbenv, rep, eidp, lsnp, priority, gen, tiebreaker)
DB_ENV *dbenv;
REP *rep;
int *eidp;
DB_LSN *lsnp;
int priority;
u_int32_t gen, tiebreaker;
{
int cmp;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#else
COMPQUIET(dbenv, NULL);
#endif
cmp = log_compare(lsnp, &rep->w_lsn);
/*
* If we've seen more than one, compare us to the best so far.
* If we're the first, make ourselves the winner to start.
*/
if (rep->sites > 1 && priority != 0) {
/*
* LSN is primary determinant. Then priority if LSNs
* are equal, then tiebreaker if both are equal.
*/
if (cmp > 0 ||
(cmp == 0 && (priority > rep->w_priority ||
(priority == rep->w_priority &&
(tiebreaker > rep->w_tiebreaker))))) {
RPRINT(dbenv, rep, (dbenv, &mb, "Accepting new vote"));
rep->winner = *eidp;
rep->w_priority = priority;
rep->w_lsn = *lsnp;
rep->w_gen = gen;
rep->w_tiebreaker = tiebreaker;
}
} else if (rep->sites == 1) {
if (priority != 0) {
/* Make ourselves the winner to start. */
rep->winner = *eidp;
rep->w_priority = priority;
rep->w_gen = gen;
rep->w_lsn = *lsnp;
rep->w_tiebreaker = tiebreaker;
} else {
rep->winner = DB_EID_INVALID;
rep->w_priority = 0;
rep->w_gen = 0;
ZERO_LSN(rep->w_lsn);
rep->w_tiebreaker = 0;
}
}
return;
}
/*
* __rep_cmp_vote2 --
* PUBLIC: int __rep_cmp_vote2 __P((DB_ENV *, REP *, int, u_int32_t));
*
* Compare incoming vote2 message with vote1's we've recorded. Called
* with the db_rep mutex held. We return 0 if the VOTE2 is from a
* site we've heard from and it is from this election. Otherwise we return 1.
*/
int
__rep_cmp_vote2(dbenv, rep, eid, egen)
DB_ENV *dbenv;
REP *rep;
int eid;
u_int32_t egen;
{
int i;
REP_VTALLY *tally, *vtp;
#ifdef DIAGNOSTIC
DB_MSGBUF mb;
#endif
tally = R_ADDR((REGINFO *)dbenv->reginfo, rep->tally_off);
i = 0;
vtp = &tally[i];
for (i = 0; i < rep->sites; i++) {
vtp = &tally[i];
if (vtp->eid == eid && vtp->egen == egen) {
RPRINT(dbenv, rep, (dbenv, &mb,
"Found matching vote1 (%d, %lu), at %d of %d",
eid, (u_long)egen, i, rep->sites));
return (0);
}
}
RPRINT(dbenv, rep,
(dbenv, &mb, "Didn't find vote1 for eid %d, egen %lu",
eid, (u_long)egen));
return (1);
}
static int
__rep_dorecovery(dbenv, lsnp, trunclsnp)
DB_ENV *dbenv;
DB_LSN *lsnp, *trunclsnp;
{
DB_LSN lsn;
DB_REP *db_rep;
DBT mylog;
DB_LOGC *logc;
int ret, t_ret, update;
u_int32_t rectype;
__txn_regop_args *txnrec;
db_rep = dbenv->rep_handle;
/* Figure out if we are backing out any committed transactions. */
if ((ret = __log_cursor(dbenv, &logc)) != 0)
return (ret);
memset(&mylog, 0, sizeof(mylog));
update = 0;
while (update == 0 &&
(ret = __log_c_get(logc, &lsn, &mylog, DB_PREV)) == 0 &&
log_compare(&lsn, lsnp) > 0) {
memcpy(&rectype, mylog.data, sizeof(rectype));
if (rectype == DB___txn_regop) {
if ((ret =
__txn_regop_read(dbenv, mylog.data, &txnrec)) != 0)
goto err;
if (txnrec->opcode != TXN_ABORT)
update = 1;
__os_free(dbenv, txnrec);
}
}
/*
* If we successfully run recovery, we've opened all the necessary
* files. We are guaranteed to be single-threaded here, so no mutex
* is necessary.
*/
if ((ret = __db_apprec(dbenv, lsnp, trunclsnp, update, 0)) == 0)
F_SET(db_rep, DBREP_OPENFILES);
err: if ((t_ret = __log_c_close(logc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
* __rep_verify_match --
* We have just received a matching log record during verification.
* Figure out if we're going to need to run recovery. If so, wait until
* everything else has exited the library. If not, set up the world
* correctly and move forward.
*/
static int
__rep_verify_match(dbenv, reclsnp, savetime)
DB_ENV *dbenv;
DB_LSN *reclsnp;
time_t savetime;
{
DB_LOG *dblp;
DB_LSN trunclsn;
DB_REP *db_rep;
LOG *lp;
REGENV *renv;
REGINFO *infop;
REP *rep;
int done, master, ret;
u_int32_t unused;
dblp = dbenv->lg_handle;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
lp = dblp->reginfo.primary;
ret = 0;
infop = dbenv->reginfo;
renv = infop->primary;
/*
* Check if the savetime is different than our current time stamp.
* If it is, then we're racing with another thread trying to recover
* and we lost. We must give up.
*/
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
done = savetime != renv->rep_timestamp;
if (done) {
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
return (0);
}
ZERO_LSN(lp->verify_lsn);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
/*
* Make sure the world hasn't changed while we tried to get
* the lock. If it hasn't then it's time for us to kick all
* operations out of DB and run recovery.
*/
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
if (!F_ISSET(rep, REP_F_RECOVER_LOG) &&
(F_ISSET(rep, REP_F_READY) || rep->in_recovery != 0)) {
rep->stat.st_msgs_recover++;
goto errunlock;
}
__rep_lockout(dbenv, db_rep, rep, 1);
/* OK, everyone is out, we can now run recovery. */
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if ((ret = __rep_dorecovery(dbenv, reclsnp, &trunclsn)) != 0) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
rep->in_recovery = 0;
F_CLR(rep, REP_F_READY);
goto errunlock;
}
/*
* The log has been truncated (either directly by us or by __db_apprec)
* We want to make sure we're waiting for the LSN at the new end-of-log,
* not some later point.
*/
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lp->ready_lsn = trunclsn;
ZERO_LSN(lp->waiting_lsn);
ZERO_LSN(lp->max_wait_lsn);
lp->max_perm_lsn = *reclsnp;
lp->wait_recs = 0;
lp->rcvd_recs = 0;
ZERO_LSN(lp->verify_lsn);
/*
* Discard any log records we have queued; we're about to re-request
* them, and can't trust the ones in the queue. We need to set the
* DB_AM_RECOVER bit in this handle, so that the operation doesn't
* deadlock.
*/
F_SET(db_rep->rep_db, DB_AM_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
ret = __db_truncate(db_rep->rep_db, NULL, &unused);
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
F_CLR(db_rep->rep_db, DB_AM_RECOVER);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
rep->stat.st_log_queued = 0;
rep->in_recovery = 0;
F_CLR(rep, REP_F_NOARCHIVE | REP_F_RECOVER_MASK);
if (ret != 0)
goto errunlock;
/*
* If the master_id is invalid, this means that since
* the last record was sent, somebody declared an
* election and we may not have a master to request
* things of.
*
* This is not an error; when we find a new master,
* we'll re-negotiate where the end of the log is and
* try to bring ourselves up to date again anyway.
*
* !!!
* We cannot assert the election flags though because
* somebody may have declared an election and then
* got an error, thus clearing the election flags
* but we still have an invalid master_id.
*/
master = rep->master_id;
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
if (master == DB_EID_INVALID)
ret = 0;
else
(void)__rep_send_message(dbenv,
master, REP_ALL_REQ, reclsnp, NULL, 0);
if (0) {
errunlock:
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
}
return (ret);
}
/*
* __rep_do_ckp --
* Perform the memp_sync necessary for this checkpoint without holding
* the db_rep->db_mutexp. All callers of this function must hold the
* db_rep->db_mutexp and must not be holding the db_rep->rep_mutexp.
*/
static int
__rep_do_ckp(dbenv, rec, rp)
DB_ENV *dbenv;
DBT *rec;
REP_CONTROL *rp;
{
DB_LSN ckp_lsn;
DB_REP *db_rep;
int ret;
db_rep = dbenv->rep_handle;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
DB_TEST_CHECKPOINT(dbenv, dbenv->test_check);
/* Sync the memory pool. */
memcpy(&ckp_lsn, (u_int8_t *)rec->data +
SSZ(__txn_ckp_args, ckp_lsn), sizeof(DB_LSN));
ret = __memp_sync(dbenv, &ckp_lsn);
/* Update the last_ckp in the txn region. */
if (ret == 0)
__txn_updateckp(dbenv, &rp->lsn);
else {
__db_err(dbenv, "Error syncing ckp [%lu][%lu]",
(u_long)ckp_lsn.file, (u_long)ckp_lsn.offset);
ret = __db_panic(dbenv, ret);
}
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
return (ret);
}
/*
* __rep_remfirst --
* Remove the first entry from the __db.rep.db
*/
static int
__rep_remfirst(dbenv, cntrl, rec)
DB_ENV *dbenv;
DBT *cntrl;
DBT *rec;
{
DB *dbp;
DBC *dbc;
DB_REP *db_rep;
int ret, t_ret;
u_int32_t rectype;
db_rep = dbenv->rep_handle;
dbp = db_rep->rep_db;
if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
return (ret);
/* The DBTs need to persist through another call. */
F_SET(cntrl, DB_DBT_REALLOC);
F_SET(rec, DB_DBT_REALLOC);
if ((ret = __db_c_get(dbc, cntrl, rec, DB_RMW | DB_FIRST)) == 0) {
memcpy(&rectype, rec->data, sizeof(rectype));
ret = __db_c_del(dbc, 0);
}
if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
* __rep_getnext --
* Get the next record out of the __db.rep.db table.
*/
static int
__rep_getnext(dbenv)
DB_ENV *dbenv;
{
DB *dbp;
DB_REP *db_rep;
DB_LOG *dblp;
DBC *dbc;
DBT lsn_dbt, nextrec_dbt;
LOG *lp;
REP_CONTROL *rp;
int ret, t_ret;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
db_rep = dbenv->rep_handle;
dbp = db_rep->rep_db;
if ((ret = __db_cursor(dbp, NULL, &dbc, 0)) != 0)
return (ret);
/*
* Update waiting_lsn. We need to move it
* forward to the LSN of the next record
* in the queue.
*
* If the next item in the database is a log
* record--the common case--we're not
* interested in its contents, just in its LSN.
* Optimize by doing a partial get of the data item.
*/
memset(&nextrec_dbt, 0, sizeof(nextrec_dbt));
F_SET(&nextrec_dbt, DB_DBT_PARTIAL);
nextrec_dbt.ulen = nextrec_dbt.dlen = 0;
memset(&lsn_dbt, 0, sizeof(lsn_dbt));
ret = __db_c_get(dbc, &lsn_dbt, &nextrec_dbt, DB_FIRST);
if (ret != DB_NOTFOUND && ret != 0)
goto err;
if (ret == DB_NOTFOUND) {
ZERO_LSN(lp->waiting_lsn);
/*
* Whether or not the current record is
* simple, there's no next one, and
* therefore we haven't got anything
* else to do right now. Break out.
*/
goto err;
}
rp = (REP_CONTROL *)lsn_dbt.data;
lp->waiting_lsn = rp->lsn;
err: if ((t_ret = __db_c_close(dbc)) != 0 && ret == 0)
ret = t_ret;
return (ret);
}
/*
* __rep_process_rec --
*
* Given a record in 'rp', process it. In the case of a NEWFILE, that means
* potentially switching files. In the case of a checkpoint, it means doing
* the checkpoint, and in other cases, it means simply writing the record into
* the log.
*/
static int
__rep_process_rec(dbenv, rp, rec, typep, ret_lsnp)
DB_ENV *dbenv;
REP_CONTROL *rp;
DBT *rec;
u_int32_t *typep;
DB_LSN *ret_lsnp;
{
DB *dbp;
DB_LOG *dblp;
DB_REP *db_rep;
DBT control_dbt, key_dbt, rec_dbt;
LOG *lp;
REP *rep;
u_int32_t txnid;
int ret, t_ret;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dbp = db_rep->rep_db;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
ret = 0;
if (rp->rectype == REP_NEWFILE) {
ret = __rep_newfile(dbenv, rp, &lp->ready_lsn);
/* Make this evaluate to a simple rectype. */
*typep = 0;
return (0);
}
memcpy(typep, rec->data, sizeof(*typep));
memset(&control_dbt, 0, sizeof(control_dbt));
memset(&rec_dbt, 0, sizeof(rec_dbt));
/*
* We write all records except for checkpoint records here.
* All non-checkpoint records need to appear in the log before
* we take action upon them (i.e., we enforce write-ahead logging).
* However, we can't write the checkpoint record here until the
* data buffers are actually written to disk, else we are creating
* an invalid log -- one that says all data before a certain point
* has been written to disk.
*
* If two threads are both processing the same checkpoint record
* (because, for example, it was resent and the original finally
* arrived), we handle that below by checking for the existence of
* the log record when we add it to the replication database.
*
* Any log records that arrive while we are processing the checkpoint
* are added to the bookkeeping database because ready_lsn is not yet
* updated to point after the checkpoint record.
*/
if (*typep != DB___txn_ckp || F_ISSET(rep, REP_F_RECOVER_LOG)) {
if ((ret = __log_rep_put(dbenv, &rp->lsn, rec)) != 0)
return (ret);
rep->stat.st_log_records++;
if (F_ISSET(rep, REP_F_RECOVER_LOG)) {
*ret_lsnp = rp->lsn;
goto out;
}
}
switch (*typep) {
case DB___dbreg_register:
/*
* DB opens occur in the context of a transaction, so we can
* simply handle them when we process the transaction. Closes,
* however, are not transaction-protected, so we have to
* handle them here.
*
* Note that it should be unsafe for the master to do a close
* of a file that was opened in an active transaction, so we
* should be guaranteed to get the ordering right.
*/
memcpy(&txnid, (u_int8_t *)rec->data +
SSZ(__dbreg_register_args, txnid), sizeof(u_int32_t));
if (txnid == TXN_INVALID)
ret = __db_dispatch(dbenv, dbenv->recover_dtab,
dbenv->recover_dtab_size, rec, &rp->lsn,
DB_TXN_APPLY, NULL);
break;
case DB___txn_regop:
/*
* If an application is doing app-specific recovery
* and acquires locks while applying a transaction,
* it can deadlock. Any other locks held by this
* thread should have been discarded in the
* __rep_process_txn error path, so if we simply
* retry, we should eventually succeed.
*/
do {
ret = 0;
if (!F_ISSET(db_rep, DBREP_OPENFILES)) {
ret = __txn_openfiles(dbenv, NULL, 1);
F_SET(db_rep, DBREP_OPENFILES);
}
if (ret == 0)
ret = __rep_process_txn(dbenv, rec);
} while (ret == DB_LOCK_DEADLOCK);
/* Now flush the log unless we're running TXN_NOSYNC. */
if (ret == 0 && !F_ISSET(dbenv, DB_ENV_TXN_NOSYNC))
ret = __log_flush(dbenv, NULL);
if (ret != 0) {
__db_err(dbenv, "Error processing txn [%lu][%lu]",
(u_long)rp->lsn.file, (u_long)rp->lsn.offset);
ret = __db_panic(dbenv, ret);
}
break;
case DB___txn_xa_regop:
ret = __log_flush(dbenv, NULL);
break;
case DB___txn_ckp:
/*
* We do not want to hold the db_rep->db_mutexp
* mutex while syncing the mpool, so if we get
* a checkpoint record that we are supposed to
* process, we add it to the __db.rep.db, do
* the memp_sync and then go back and process
* it later, when the sync has finished. If
* this record is already in the table, then
* some other thread will process it, so simply
* return REP_NOTPERM;
*/
memset(&key_dbt, 0, sizeof(key_dbt));
key_dbt.data = rp;
key_dbt.size = sizeof(*rp);
/*
* We want to put this record into the tmp DB only if
* it doesn't exist, so use DB_NOOVERWRITE.
*/
ret = __db_put(dbp, NULL, &key_dbt, rec, DB_NOOVERWRITE);
if (ret == DB_KEYEXIST) {
if (ret_lsnp != NULL)
*ret_lsnp = rp->lsn;
ret = DB_REP_NOTPERM;
}
if (ret != 0)
break;
/*
* Now, do the checkpoint. Regardless of
* whether the checkpoint succeeds or not,
* we need to remove the record we just put
* in the temporary database. If the
* checkpoint failed, return an error. We
* will act like we never received the
* checkpoint.
*/
if ((ret = __rep_do_ckp(dbenv, rec, rp)) == 0)
ret = __log_rep_put(dbenv, &rp->lsn, rec);
if ((t_ret = __rep_remfirst(dbenv,
&control_dbt, &rec_dbt)) != 0 && ret == 0)
ret = t_ret;
break;
default:
break;
}
out:
if (ret == 0 && F_ISSET(rp, DB_LOG_PERM))
*ret_lsnp = rp->lsn;
if (control_dbt.data != NULL)
__os_ufree(dbenv, control_dbt.data);
if (rec_dbt.data != NULL)
__os_ufree(dbenv, rec_dbt.data);
return (ret);
}
/*
* __rep_resend_req --
* We might have dropped a message, we need to resend our request.
* The request we send is dependent on what recovery state we're in.
* The caller holds no locks.
*/
static int
__rep_resend_req(dbenv, eid)
DB_ENV *dbenv;
int eid;
{
DB_LOG *dblp;
DB_LSN lsn;
DB_REP *db_rep;
LOG *lp;
REP *rep;
int ret;
u_int32_t repflags;
ret = 0;
db_rep = dbenv->rep_handle;
rep = db_rep->region;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
repflags = rep->flags;
if (FLD_ISSET(repflags, REP_F_RECOVER_VERIFY)) {
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
lsn = lp->verify_lsn;
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
if (!IS_ZERO_LSN(lsn))
(void)__rep_send_message(dbenv, eid,
REP_VERIFY_REQ, &lsn, NULL, 0);
goto out;
} else if (FLD_ISSET(repflags, REP_F_RECOVER_UPDATE)) {
(void)__rep_send_message(dbenv, eid,
REP_UPDATE_REQ, NULL, NULL, 0);
} else if (FLD_ISSET(repflags, REP_F_RECOVER_PAGE)) {
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
ret = __rep_pggap_req(dbenv, rep, NULL, 0);
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
} else if (FLD_ISSET(repflags, REP_F_RECOVER_LOG)) {
MUTEX_LOCK(dbenv, db_rep->db_mutexp);
__rep_loggap_req(dbenv, rep, NULL, 0);
MUTEX_UNLOCK(dbenv, db_rep->db_mutexp);
}
out:
return (ret);
}
/*
* __rep_check_doreq --
* PUBLIC: int __rep_check_doreq __P((DB_ENV *, REP *));
*
* Check if we need to send another request. If so, compare with
* the request limits the user might have set. This assumes the
* caller holds the db_rep->db_mutexp mutex. Returns 1 if a request
* needs to be made, and 0 if it does not.
*/
int
__rep_check_doreq(dbenv, rep)
DB_ENV *dbenv;
REP *rep;
{
DB_LOG *dblp;
LOG *lp;
int req;
dblp = dbenv->lg_handle;
lp = dblp->reginfo.primary;
req = ++lp->rcvd_recs >= lp->wait_recs;
if (req) {
lp->wait_recs *= 2;
if (lp->wait_recs > rep->max_gap)
lp->wait_recs = rep->max_gap;
lp->rcvd_recs = 0;
}
return (req);
}
/*
* __rep_lockout --
* PUBLIC: void __rep_lockout __P((DB_ENV *, DB_REP *, REP *, u_int32_t));
*
* Coordinate with other threads in the library and active txns so
* that we can run single-threaded, for recovery or internal backup.
* Assumes the caller holds rep_mutexp.
*/
void
__rep_lockout(dbenv, db_rep, rep, msg_th)
DB_ENV *dbenv;
DB_REP *db_rep;
REP *rep;
u_int32_t msg_th;
{
int wait_cnt;
/* Phase 1: set REP_F_READY and wait for op_cnt to go to 0. */
F_SET(rep, REP_F_READY);
for (wait_cnt = 0; rep->op_cnt != 0;) {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
__os_sleep(dbenv, 1, 0);
#ifdef DIAGNOSTIC
if (++wait_cnt % 60 == 0)
__db_err(dbenv,
"Waiting for txn_cnt to run replication recovery/backup for %d minutes",
wait_cnt / 60);
#endif
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
}
/*
* Phase 2: set in_recovery and wait for handle count to go
* to 0 and for the number of threads in __rep_process_message
* to go to 1 (us).
*/
rep->in_recovery = 1;
for (wait_cnt = 0; rep->handle_cnt != 0 || rep->msg_th > msg_th;) {
MUTEX_UNLOCK(dbenv, db_rep->rep_mutexp);
__os_sleep(dbenv, 1, 0);
#ifdef DIAGNOSTIC
if (++wait_cnt % 60 == 0)
__db_err(dbenv,
"Waiting for handle count to run replication recovery/backup for %d minutes",
wait_cnt / 60);
#endif
MUTEX_LOCK(dbenv, db_rep->rep_mutexp);
}
}
syntax highlighted by Code2HTML, v. 0.9.1