forked from StoneAtom/StoneDB
1062 lines
27 KiB
C++
1062 lines
27 KiB
C++
/* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software Foundation,
|
|
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
|
|
|
|
#include "my_global.h"
|
|
#include "log.h"
|
|
#include "rpl_channel_service_interface.h"
|
|
#include "rpl_slave.h"
|
|
#include "rpl_info_factory.h"
|
|
#include "rpl_mi.h"
|
|
#include "rpl_msr.h" /* Multisource replication */
|
|
#include "rpl_rli.h"
|
|
#include "rpl_rli_pdb.h"
|
|
#include "mysqld_thd_manager.h" // Global_THD_manager
|
|
#include "sql_parse.h" // Find_thd_with_id
|
|
|
|
/**
|
|
Auxiliary function to stop all the running channel threads according to the
|
|
given mask.
|
|
|
|
@note: The caller shall possess channel_map lock before calling this function,
|
|
and unlock after returning from this function.
|
|
|
|
@param mi The pointer to Master_info instance
|
|
@param threads_to_stop The types of threads to be stopped
|
|
@param timeout The expected time in which the thread should stop
|
|
|
|
@return the operation status
|
|
@retval 0 OK
|
|
@retval !=0 Error
|
|
*/
|
|
int channel_stop(Master_info *mi,
|
|
int threads_to_stop,
|
|
long timeout);
|
|
|
|
|
|
int initialize_channel_service_interface()
|
|
{
|
|
DBUG_ENTER("initialize_channel_service_interface");
|
|
|
|
//master info and relay log repositories must be TABLE
|
|
if (opt_mi_repository_id != INFO_REPOSITORY_TABLE ||
|
|
opt_rli_repository_id != INFO_REPOSITORY_TABLE)
|
|
{
|
|
sql_print_error("For the creation of replication channels the master info"
|
|
" and relay log info repositories must be set to TABLE");
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
//server id must be different from 0
|
|
if (server_id == 0)
|
|
{
|
|
sql_print_error("For the creation of replication channels the server id"
|
|
" must be different from 0");
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
#ifdef HAVE_REPLICATION
|
|
|
|
void set_mi_settings(Master_info *mi, Channel_creation_info* channel_info)
|
|
{
|
|
mysql_mutex_lock(&mi->data_lock);
|
|
|
|
mi->rli->set_thd_tx_priority(channel_info->thd_tx_priority);
|
|
|
|
mi->rli->set_ignore_write_set_memory_limit(
|
|
channel_info->m_ignore_write_set_memory_limit);
|
|
mi->rli->set_allow_drop_write_set(channel_info->m_allow_drop_write_set);
|
|
|
|
mi->rli->replicate_same_server_id=
|
|
(channel_info->replicate_same_server_id == RPL_SERVICE_SERVER_DEFAULT) ?
|
|
replicate_same_server_id : channel_info->replicate_same_server_id;
|
|
|
|
mi->rli->opt_slave_parallel_workers=
|
|
(channel_info->channel_mts_parallel_workers == RPL_SERVICE_SERVER_DEFAULT) ?
|
|
opt_mts_slave_parallel_workers : channel_info->channel_mts_parallel_workers;
|
|
|
|
if (channel_info->channel_mts_parallel_type == RPL_SERVICE_SERVER_DEFAULT)
|
|
{
|
|
if (mts_parallel_option == MTS_PARALLEL_TYPE_DB_NAME)
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
|
|
else
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
|
|
}
|
|
else
|
|
{
|
|
if (channel_info->channel_mts_parallel_type == CHANNEL_MTS_PARALLEL_TYPE_DB_NAME)
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
|
|
else
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
|
|
}
|
|
|
|
mi->rli->checkpoint_group=
|
|
(channel_info->channel_mts_checkpoint_group == RPL_SERVICE_SERVER_DEFAULT) ?
|
|
opt_mts_checkpoint_group : channel_info->channel_mts_checkpoint_group;
|
|
|
|
mi->set_mi_description_event(new Format_description_log_event(BINLOG_VERSION));
|
|
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
}
|
|
|
|
bool init_thread_context()
|
|
{
|
|
return my_thread_init();
|
|
}
|
|
|
|
void clean_thread_context()
|
|
{
|
|
my_thread_end();
|
|
}
|
|
|
|
THD *create_surrogate_thread()
|
|
{
|
|
THD *thd= NULL;
|
|
thd= new THD;
|
|
thd->thread_stack= (char*) &thd;
|
|
thd->store_globals();
|
|
thd->security_context()->skip_grants();
|
|
|
|
return(thd);
|
|
}
|
|
|
|
void delete_surrogate_thread(THD *thd)
|
|
{
|
|
thd->release_resources();
|
|
delete thd;
|
|
my_thread_set_THR_THD(NULL);
|
|
}
|
|
|
|
void
|
|
initialize_channel_creation_info(Channel_creation_info* channel_info)
|
|
{
|
|
channel_info->type= SLAVE_REPLICATION_CHANNEL;
|
|
channel_info->hostname= 0;
|
|
channel_info->port= 0;
|
|
channel_info->user= 0;
|
|
channel_info->password= 0;
|
|
channel_info->ssl_info= 0;
|
|
channel_info->auto_position= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->channel_mts_parallel_type= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->channel_mts_parallel_workers= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->channel_mts_checkpoint_group= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->replicate_same_server_id= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->thd_tx_priority= 0;
|
|
channel_info->sql_delay= RPL_SERVICE_SERVER_DEFAULT;
|
|
channel_info->preserve_relay_logs= false;
|
|
channel_info->retry_count= 0;
|
|
channel_info->connect_retry= 0;
|
|
channel_info->m_ignore_write_set_memory_limit = false;
|
|
channel_info->m_allow_drop_write_set = false;
|
|
}
|
|
|
|
void initialize_channel_ssl_info(Channel_ssl_info* channel_ssl_info)
|
|
{
|
|
channel_ssl_info->use_ssl= 0;
|
|
channel_ssl_info->ssl_ca_file_name= 0;
|
|
channel_ssl_info->ssl_ca_directory= 0;
|
|
channel_ssl_info->ssl_cert_file_name= 0;
|
|
channel_ssl_info->ssl_crl_file_name= 0;
|
|
channel_ssl_info->ssl_crl_directory= 0;
|
|
channel_ssl_info->ssl_key= 0;
|
|
channel_ssl_info->ssl_cipher= 0;
|
|
channel_ssl_info->tls_version= 0;
|
|
channel_ssl_info->ssl_verify_server_cert= 0;
|
|
}
|
|
|
|
void
|
|
initialize_channel_connection_info(Channel_connection_info* channel_info)
|
|
{
|
|
channel_info->until_condition= CHANNEL_NO_UNTIL_CONDITION;
|
|
channel_info->gtid= 0;
|
|
channel_info->view_id= 0;
|
|
}
|
|
|
|
void set_mi_ssl_options(LEX_MASTER_INFO* lex_mi, Channel_ssl_info* channel_ssl_info)
|
|
{
|
|
|
|
lex_mi->ssl= (channel_ssl_info->use_ssl) ?
|
|
LEX_MASTER_INFO::LEX_MI_ENABLE :
|
|
LEX_MASTER_INFO::LEX_MI_DISABLE;
|
|
|
|
if (channel_ssl_info->ssl_ca_file_name != NULL)
|
|
{
|
|
lex_mi->ssl_ca= channel_ssl_info->ssl_ca_file_name;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_ca_directory != NULL)
|
|
{
|
|
lex_mi->ssl_capath= channel_ssl_info->ssl_ca_directory;
|
|
}
|
|
|
|
if (channel_ssl_info->tls_version != NULL)
|
|
{
|
|
lex_mi->tls_version= channel_ssl_info->tls_version;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_cert_file_name != NULL)
|
|
{
|
|
lex_mi->ssl_cert= channel_ssl_info->ssl_cert_file_name;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_crl_file_name != NULL)
|
|
{
|
|
lex_mi->ssl_crl= channel_ssl_info->ssl_crl_file_name;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_crl_directory != NULL)
|
|
{
|
|
lex_mi->ssl_crlpath= channel_ssl_info->ssl_crl_directory;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_key != NULL)
|
|
{
|
|
lex_mi->ssl_key= channel_ssl_info->ssl_key;
|
|
}
|
|
|
|
if (channel_ssl_info->ssl_cipher != NULL)
|
|
{
|
|
lex_mi->ssl_cipher= channel_ssl_info->ssl_cipher;
|
|
}
|
|
|
|
lex_mi->ssl_verify_server_cert= (channel_ssl_info->ssl_verify_server_cert) ?
|
|
LEX_MASTER_INFO::LEX_MI_ENABLE :
|
|
LEX_MASTER_INFO::LEX_MI_DISABLE;
|
|
}
|
|
|
|
int channel_create(const char* channel,
|
|
Channel_creation_info* channel_info)
|
|
{
|
|
DBUG_ENTER("channel_create");
|
|
|
|
Master_info *mi= NULL;
|
|
int error= 0;
|
|
LEX_MASTER_INFO* lex_mi= NULL;
|
|
|
|
bool thd_created= false;
|
|
THD *thd= current_thd;
|
|
|
|
//Don't create default channels
|
|
if (!strcmp(channel_map.get_default_channel(), channel))
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_DEFAULT_CHANNEL_CREATION_ERROR);
|
|
|
|
/* Service channels are not supposed to use sql_slave_skip_counter */
|
|
mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
|
|
if (sql_slave_skip_counter > 0)
|
|
error= RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
|
|
mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
|
|
if (error)
|
|
DBUG_RETURN(error);
|
|
|
|
channel_map.wrlock();
|
|
|
|
/* Get the Master_info of the channel */
|
|
mi= channel_map.get_mi(channel);
|
|
|
|
/* create a new channel if doesn't exist */
|
|
if (!mi)
|
|
{
|
|
if ((error= add_new_channel(&mi, channel)))
|
|
goto err;
|
|
}
|
|
|
|
lex_mi= new st_lex_master_info();
|
|
lex_mi->channel= channel;
|
|
lex_mi->host= channel_info->hostname;
|
|
/*
|
|
'group_replication_recovery' channel (*after recovery is done*)
|
|
or 'group_replication_applier' channel wants to set the port number
|
|
to '0' as there is no actual network usage on these channels.
|
|
*/
|
|
lex_mi->port_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
|
|
lex_mi->port= channel_info->port;
|
|
lex_mi->user= channel_info->user;
|
|
lex_mi->password= channel_info->password;
|
|
lex_mi->sql_delay= channel_info->sql_delay;
|
|
lex_mi->connect_retry= channel_info->connect_retry;
|
|
if (channel_info->retry_count)
|
|
{
|
|
lex_mi->retry_count_opt= LEX_MASTER_INFO::LEX_MI_ENABLE;
|
|
lex_mi->retry_count= channel_info->retry_count;
|
|
}
|
|
|
|
if (channel_info->auto_position)
|
|
{
|
|
lex_mi->auto_position= LEX_MASTER_INFO::LEX_MI_ENABLE;
|
|
if ((mi && mi->is_auto_position()) ||
|
|
channel_info->auto_position == RPL_SERVICE_SERVER_DEFAULT)
|
|
{
|
|
//So change master allows new configurations with a running SQL thread
|
|
lex_mi->auto_position= LEX_MASTER_INFO::LEX_MI_UNCHANGED;
|
|
}
|
|
}
|
|
|
|
if (channel_info->ssl_info != NULL)
|
|
{
|
|
set_mi_ssl_options(lex_mi, channel_info->ssl_info);
|
|
}
|
|
|
|
if (mi)
|
|
{
|
|
if (!thd)
|
|
{
|
|
thd_created= true;
|
|
thd= create_surrogate_thread();
|
|
}
|
|
|
|
if ((error= change_master(thd, mi, lex_mi,
|
|
channel_info->preserve_relay_logs)))
|
|
{
|
|
goto err;
|
|
}
|
|
}
|
|
|
|
set_mi_settings(mi, channel_info);
|
|
|
|
if (channel_map.is_group_replication_channel_name(mi->get_channel()))
|
|
{
|
|
thd->variables.max_allowed_packet= slave_max_allowed_packet;
|
|
thd->get_protocol_classic()->set_max_packet_size(
|
|
slave_max_allowed_packet + MAX_LOG_EVENT_HEADER);
|
|
}
|
|
|
|
|
|
err:
|
|
channel_map.unlock();
|
|
|
|
if (thd_created)
|
|
{
|
|
delete_surrogate_thread(thd);
|
|
}
|
|
|
|
delete lex_mi;
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_start(const char* channel,
|
|
Channel_connection_info* connection_info,
|
|
int threads_to_start,
|
|
int wait_for_connection)
|
|
{
|
|
DBUG_ENTER("channel_start(channel, threads_to_start, wait_for_connection");
|
|
int error= 0;
|
|
int thread_mask= 0;
|
|
LEX_MASTER_INFO lex_mi;
|
|
ulong thread_start_id= 0;
|
|
bool thd_created= false;
|
|
THD* thd= current_thd;
|
|
|
|
/* Service channels are not supposed to use sql_slave_skip_counter */
|
|
mysql_mutex_lock(&LOCK_sql_slave_skip_counter);
|
|
if (sql_slave_skip_counter > 0)
|
|
error= RPL_CHANNEL_SERVICE_SLAVE_SKIP_COUNTER_ACTIVE;
|
|
mysql_mutex_unlock(&LOCK_sql_slave_skip_counter);
|
|
if (error)
|
|
DBUG_RETURN(error);
|
|
|
|
channel_map.wrlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
error= RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
|
|
goto err;
|
|
}
|
|
|
|
if (threads_to_start & CHANNEL_APPLIER_THREAD)
|
|
{
|
|
thread_mask |= SLAVE_SQL;
|
|
}
|
|
if (threads_to_start & CHANNEL_RECEIVER_THREAD)
|
|
{
|
|
thread_mask |= SLAVE_IO;
|
|
}
|
|
|
|
//Nothing to be done here
|
|
if (!thread_mask)
|
|
goto err;
|
|
|
|
LEX_SLAVE_CONNECTION lex_connection;
|
|
lex_connection.reset();
|
|
|
|
if (connection_info->until_condition != CHANNEL_NO_UNTIL_CONDITION)
|
|
{
|
|
switch (connection_info->until_condition)
|
|
{
|
|
case CHANNEL_UNTIL_APPLIER_AFTER_GTIDS:
|
|
lex_mi.gtid_until_condition= LEX_MASTER_INFO::UNTIL_SQL_AFTER_GTIDS;
|
|
lex_mi.gtid= connection_info->gtid;
|
|
break;
|
|
case CHANNEL_UNTIL_APPLIER_BEFORE_GTIDS:
|
|
lex_mi.gtid_until_condition= LEX_MASTER_INFO::UNTIL_SQL_BEFORE_GTIDS;
|
|
lex_mi.gtid= connection_info->gtid;
|
|
break;
|
|
case CHANNEL_UNTIL_APPLIER_AFTER_GAPS:
|
|
lex_mi.until_after_gaps= true;
|
|
break;
|
|
case CHANNEL_UNTIL_VIEW_ID:
|
|
assert((thread_mask & SLAVE_SQL) && connection_info->view_id);
|
|
lex_mi.view_id= connection_info->view_id;
|
|
break;
|
|
default:
|
|
assert(0);
|
|
}
|
|
}
|
|
|
|
if (wait_for_connection && (thread_mask & SLAVE_IO))
|
|
thread_start_id= mi->slave_run_id;
|
|
|
|
if (!thd)
|
|
{
|
|
thd_created= true;
|
|
thd= create_surrogate_thread();
|
|
}
|
|
|
|
error= start_slave(thd, &lex_connection, &lex_mi,
|
|
thread_mask, mi, false);
|
|
|
|
if (wait_for_connection && (thread_mask & SLAVE_IO) && !error)
|
|
{
|
|
mysql_mutex_lock(&mi->run_lock);
|
|
/*
|
|
If the ids are still equal this means the start thread method did not
|
|
wait for the thread to start
|
|
*/
|
|
while (thread_start_id == mi->slave_run_id)
|
|
{
|
|
mysql_cond_wait(&mi->start_cond, &mi->run_lock);
|
|
}
|
|
mysql_mutex_unlock(&mi->run_lock);
|
|
|
|
while (mi->slave_running != MYSQL_SLAVE_RUN_CONNECT)
|
|
{
|
|
//If there is such a state change then there was an error on connection
|
|
if (mi->slave_running == MYSQL_SLAVE_NOT_RUN)
|
|
{
|
|
error= RPL_CHANNEL_SERVICE_RECEIVER_CONNECTION_ERROR;
|
|
break;
|
|
}
|
|
my_sleep(100);
|
|
}
|
|
}
|
|
|
|
err:
|
|
channel_map.unlock();
|
|
|
|
if (thd_created)
|
|
{
|
|
delete_surrogate_thread(thd);
|
|
}
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_stop(Master_info *mi,
|
|
int threads_to_stop,
|
|
long timeout)
|
|
{
|
|
DBUG_ENTER("channel_stop(master_info, stop_receiver, stop_applier, timeout");
|
|
|
|
channel_map.assert_some_lock();
|
|
|
|
if (mi == NULL)
|
|
{
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
int thread_mask= 0;
|
|
int server_thd_mask= 0;
|
|
int error= 0;
|
|
bool thd_init= false;
|
|
|
|
mi->channel_wrlock();
|
|
lock_slave_threads(mi);
|
|
|
|
init_thread_mask(&server_thd_mask, mi, 0 /* not inverse*/);
|
|
|
|
if ((threads_to_stop & CHANNEL_APPLIER_THREAD)
|
|
&& (server_thd_mask & SLAVE_SQL))
|
|
{
|
|
thread_mask |= SLAVE_SQL;
|
|
}
|
|
if ((threads_to_stop & CHANNEL_RECEIVER_THREAD)
|
|
&& (server_thd_mask & SLAVE_IO))
|
|
{
|
|
thread_mask |= SLAVE_IO;
|
|
}
|
|
|
|
if (thread_mask == 0)
|
|
{
|
|
goto end;
|
|
}
|
|
|
|
thd_init= init_thread_context();
|
|
|
|
error= terminate_slave_threads(mi, thread_mask, timeout, false);
|
|
|
|
end:
|
|
unlock_slave_threads(mi);
|
|
mi->channel_unlock();
|
|
|
|
if (thd_init)
|
|
{
|
|
clean_thread_context();
|
|
}
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_stop(const char* channel,
|
|
int threads_to_stop,
|
|
long timeout)
|
|
{
|
|
DBUG_ENTER("channel_stop(channel, stop_receiver, stop_applier, timeout");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
int error= channel_stop(mi, threads_to_stop, timeout);
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_stop_all(int threads_to_stop,
|
|
long timeout)
|
|
{
|
|
DBUG_ENTER("channel_stop_all");
|
|
|
|
int error= 0;
|
|
Master_info *mi= 0;
|
|
|
|
channel_map.rdlock();
|
|
|
|
for (mi_map::iterator it= channel_map.begin(); it != channel_map.end(); it++)
|
|
{
|
|
mi= it->second;
|
|
|
|
if (mi)
|
|
{
|
|
DBUG_PRINT("info", ("stopping channel_name: %s",
|
|
mi->get_channel()));
|
|
|
|
int channel_error= channel_stop(mi, threads_to_stop, timeout);
|
|
|
|
DBUG_EXECUTE_IF("group_replication_stop_all_channels_failure",
|
|
{
|
|
channel_error=1;
|
|
});
|
|
|
|
if (channel_error &&
|
|
channel_error != RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR)
|
|
{
|
|
error= channel_error;
|
|
mi->report(ERROR_LEVEL, error,
|
|
"Error stopping channel: %s. Got error: %d",
|
|
mi->get_channel(), error);
|
|
}
|
|
}
|
|
}
|
|
|
|
channel_map.unlock();
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_purge_queue(const char* channel, bool reset_all)
|
|
{
|
|
DBUG_ENTER("channel_purge_queue(channel, only_purge");
|
|
|
|
channel_map.wrlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
bool thd_init= init_thread_context();
|
|
|
|
int error= reset_slave(current_thd, mi, reset_all);
|
|
|
|
channel_map.unlock();
|
|
|
|
if (thd_init)
|
|
{
|
|
clean_thread_context();
|
|
}
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
bool channel_is_active(const char* channel, enum_channel_thread_types thd_type)
|
|
{
|
|
int thread_mask= 0;
|
|
DBUG_ENTER("channel_is_active(channel, thd_type");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(false);
|
|
}
|
|
|
|
init_thread_mask(&thread_mask, mi, 0 /* not inverse*/);
|
|
|
|
channel_map.unlock();
|
|
|
|
switch(thd_type)
|
|
{
|
|
case CHANNEL_NO_THD:
|
|
DBUG_RETURN(true); //return true as the channel exists
|
|
case CHANNEL_RECEIVER_THREAD:
|
|
DBUG_RETURN(thread_mask & SLAVE_IO);
|
|
case CHANNEL_APPLIER_THREAD:
|
|
DBUG_RETURN(thread_mask & SLAVE_SQL);
|
|
default:
|
|
assert(0);
|
|
}
|
|
DBUG_RETURN(false);
|
|
}
|
|
|
|
int channel_get_thread_id(const char* channel,
|
|
enum_channel_thread_types thd_type,
|
|
unsigned long** thread_id)
|
|
{
|
|
DBUG_ENTER("channel_get_thread_id(channel, thread_type ,*thread_id");
|
|
|
|
int number_threads= -1;
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
switch(thd_type)
|
|
{
|
|
case CHANNEL_RECEIVER_THREAD:
|
|
mysql_mutex_lock(&mi->info_thd_lock);
|
|
if (mi->info_thd != NULL)
|
|
{
|
|
*thread_id= (unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
|
|
sizeof(unsigned long),
|
|
MYF(MY_WME));
|
|
**thread_id= mi->info_thd->thread_id();
|
|
number_threads= 1;
|
|
}
|
|
mysql_mutex_unlock(&mi->info_thd_lock);
|
|
break;
|
|
case CHANNEL_APPLIER_THREAD:
|
|
if (mi->rli != NULL)
|
|
{
|
|
mysql_mutex_lock(&mi->rli->run_lock);
|
|
|
|
if (mi->rli->slave_parallel_workers > 0)
|
|
{
|
|
// Parallel applier.
|
|
size_t num_workers= mi->rli->get_worker_count();
|
|
number_threads= 1 + num_workers;
|
|
*thread_id=
|
|
(unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
|
|
number_threads * sizeof(unsigned long),
|
|
MYF(MY_WME));
|
|
unsigned long *thread_id_pointer= *thread_id;
|
|
|
|
// Set default values on thread_id array.
|
|
for (int i= 0; i < number_threads; i++, thread_id_pointer++)
|
|
*thread_id_pointer= -1;
|
|
thread_id_pointer= *thread_id;
|
|
|
|
// Coordinator thread id.
|
|
if (mi->rli->info_thd != NULL)
|
|
{
|
|
mysql_mutex_lock(&mi->rli->info_thd_lock);
|
|
*thread_id_pointer= mi->rli->info_thd->thread_id();
|
|
mysql_mutex_unlock(&mi->rli->info_thd_lock);
|
|
thread_id_pointer++;
|
|
}
|
|
|
|
// Workers thread id.
|
|
if (mi->rli->workers_array_initialized)
|
|
{
|
|
for (size_t i= 0; i < num_workers; i++, thread_id_pointer++)
|
|
{
|
|
Slave_worker* worker= mi->rli->get_worker(i);
|
|
if (worker != NULL)
|
|
{
|
|
mysql_mutex_lock(&worker->jobs_lock);
|
|
if (worker->info_thd != NULL &&
|
|
worker->running_status != Slave_worker::NOT_RUNNING)
|
|
{
|
|
mysql_mutex_lock(&worker->info_thd_lock);
|
|
*thread_id_pointer= worker->info_thd->thread_id();
|
|
mysql_mutex_unlock(&worker->info_thd_lock);
|
|
}
|
|
mysql_mutex_unlock(&worker->jobs_lock);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Sequential applier.
|
|
if (mi->rli->info_thd != NULL)
|
|
{
|
|
*thread_id= (unsigned long*) my_malloc(PSI_NOT_INSTRUMENTED,
|
|
sizeof(unsigned long),
|
|
MYF(MY_WME));
|
|
mysql_mutex_lock(&mi->rli->info_thd_lock);
|
|
**thread_id= mi->rli->info_thd->thread_id();
|
|
mysql_mutex_unlock(&mi->rli->info_thd_lock);
|
|
number_threads= 1;
|
|
}
|
|
}
|
|
mysql_mutex_unlock(&mi->rli->run_lock);
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(number_threads);
|
|
}
|
|
|
|
long long channel_get_last_delivered_gno(const char* channel, int sidno)
|
|
{
|
|
DBUG_ENTER("channel_get_last_delivered_gno(channel, sidno)");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
rpl_gno last_gno= 0;
|
|
|
|
global_sid_lock->rdlock();
|
|
last_gno= mi->rli->get_gtid_set()->get_last_gno(sidno);
|
|
global_sid_lock->unlock();
|
|
|
|
#if !defined(NDEBUG)
|
|
const Gtid_set *retrieved_gtid_set= mi->rli->get_gtid_set();
|
|
char *retrieved_gtid_set_string= NULL;
|
|
global_sid_lock->wrlock();
|
|
retrieved_gtid_set->to_string(&retrieved_gtid_set_string);
|
|
global_sid_lock->unlock();
|
|
DBUG_PRINT("info", ("get_last_delivered_gno retrieved_set_string: %s",
|
|
retrieved_gtid_set_string));
|
|
my_free(retrieved_gtid_set_string);
|
|
#endif
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(last_gno);
|
|
}
|
|
|
|
int channel_add_executed_gtids_to_received_gtids(const char* channel)
|
|
{
|
|
DBUG_ENTER("channel_add_executed_gtids_to_received_gtids(channel)");
|
|
|
|
channel_map.rdlock();
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
global_sid_lock->wrlock();
|
|
|
|
enum_return_status return_status=
|
|
mi->rli->add_gtid_set(gtid_state->get_executed_gtids());
|
|
|
|
global_sid_lock->unlock();
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(return_status != RETURN_STATUS_OK);
|
|
}
|
|
|
|
int channel_queue_packet(const char* channel,
|
|
const char* buf,
|
|
unsigned long event_len)
|
|
{
|
|
int result;
|
|
DBUG_ENTER("channel_queue_packet(channel, event_buffer, event_len)");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
result= queue_event(mi, buf, event_len);
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(result);
|
|
}
|
|
|
|
int channel_wait_until_apply_queue_applied(const char* channel,
|
|
double timeout)
|
|
{
|
|
DBUG_ENTER("channel_wait_until_apply_queue_applied(channel, timeout)");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
mi->inc_reference();
|
|
channel_map.unlock();
|
|
|
|
int error = mi->rli->wait_for_gtid_set(current_thd, mi->rli->get_gtid_set(),
|
|
timeout);
|
|
mi->dec_reference();
|
|
|
|
if (error == -1)
|
|
DBUG_RETURN(REPLICATION_THREAD_WAIT_TIMEOUT_ERROR);
|
|
if (error == -2)
|
|
DBUG_RETURN(REPLICATION_THREAD_WAIT_NO_INFO_ERROR);
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int channel_is_applier_waiting(const char* channel)
|
|
{
|
|
DBUG_ENTER("channel_is_applier_waiting(channel)");
|
|
int result= RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR;
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(result);
|
|
}
|
|
|
|
unsigned long* thread_ids= NULL;
|
|
int number_appliers= channel_get_thread_id(channel,
|
|
CHANNEL_APPLIER_THREAD,
|
|
&thread_ids);
|
|
|
|
if (number_appliers <= 0)
|
|
{
|
|
goto end;
|
|
}
|
|
|
|
if (number_appliers == 1)
|
|
{
|
|
result= channel_is_applier_thread_waiting(*thread_ids);
|
|
}
|
|
else if (number_appliers > 1)
|
|
{
|
|
int waiting= 0;
|
|
|
|
// Check if coordinator is waiting.
|
|
waiting += channel_is_applier_thread_waiting(thread_ids[0]);
|
|
|
|
// Check if workers are waiting.
|
|
for (int i= 1; i < number_appliers; i++)
|
|
waiting += channel_is_applier_thread_waiting(thread_ids[i], true);
|
|
|
|
// Check if all are waiting.
|
|
if (waiting == number_appliers)
|
|
result= 1;
|
|
else
|
|
result= 0;
|
|
}
|
|
|
|
end:
|
|
channel_map.unlock();
|
|
my_free(thread_ids);
|
|
|
|
DBUG_RETURN(result);
|
|
}
|
|
|
|
int channel_is_applier_thread_waiting(unsigned long thread_id, bool worker)
|
|
{
|
|
DBUG_ENTER("channel_is_applier_thread_waiting(thread_id, worker)");
|
|
bool result= -1;
|
|
|
|
Find_thd_with_id find_thd_with_id(thread_id);
|
|
THD *thd= Global_THD_manager::get_instance()->find_thd(&find_thd_with_id);
|
|
if (thd)
|
|
{
|
|
result= 0;
|
|
|
|
const char *proc_info= thd->get_proc_info();
|
|
if (proc_info)
|
|
{
|
|
const char* stage_name= stage_slave_has_read_all_relay_log.m_name;
|
|
if (worker)
|
|
stage_name= stage_slave_waiting_event_from_coordinator.m_name;
|
|
|
|
if (!strcmp(proc_info, stage_name))
|
|
result= 1;
|
|
}
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
}
|
|
|
|
DBUG_RETURN(result);
|
|
}
|
|
|
|
int channel_flush(const char* channel)
|
|
{
|
|
DBUG_ENTER("channel_flush(channel)");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
bool error= flush_relay_logs(mi);
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(error ? 1 : 0);
|
|
}
|
|
|
|
int channel_get_retrieved_gtid_set(const char* channel,
|
|
char** retrieved_set)
|
|
{
|
|
DBUG_ENTER("channel_get_retrieved_gtid_set(channel,retrieved_set)");
|
|
|
|
channel_map.rdlock();
|
|
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(RPL_CHANNEL_SERVICE_CHANNEL_DOES_NOT_EXISTS_ERROR);
|
|
}
|
|
|
|
mi->inc_reference();
|
|
channel_map.unlock();
|
|
|
|
int error= 0;
|
|
const Gtid_set* receiver_gtid_set= mi->rli->get_gtid_set();
|
|
if (receiver_gtid_set->to_string(retrieved_set,
|
|
true /*need_lock*/) == -1)
|
|
error= ER_OUTOFMEMORY;
|
|
|
|
mi->dec_reference();
|
|
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
bool channel_is_stopping(const char* channel,
|
|
enum_channel_thread_types thd_type)
|
|
{
|
|
bool is_stopping= false;
|
|
DBUG_ENTER("channel_is_stopping(channel, thd_type");
|
|
|
|
channel_map.rdlock();
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(false);
|
|
}
|
|
|
|
switch(thd_type)
|
|
{
|
|
case CHANNEL_NO_THD:
|
|
break;
|
|
case CHANNEL_RECEIVER_THREAD:
|
|
is_stopping= likely(mi->is_stopping.atomic_get());
|
|
break;
|
|
case CHANNEL_APPLIER_THREAD:
|
|
is_stopping= likely(mi->rli->is_stopping.atomic_get());
|
|
break;
|
|
default:
|
|
assert(0);
|
|
}
|
|
|
|
channel_map.unlock();
|
|
|
|
DBUG_RETURN(is_stopping);
|
|
}
|
|
|
|
bool is_partial_transaction_on_channel_relay_log(const char *channel)
|
|
{
|
|
DBUG_ENTER("is_partial_transaction_on_channel_relay_log(channel)");
|
|
channel_map.rdlock();
|
|
Master_info *mi= channel_map.get_mi(channel);
|
|
if (mi == NULL)
|
|
{
|
|
channel_map.unlock();
|
|
DBUG_RETURN(false);
|
|
}
|
|
bool ret= mi->transaction_parser.is_inside_transaction();
|
|
channel_map.unlock();
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
bool is_any_slave_channel_running(int thread_mask)
|
|
{
|
|
bool status;
|
|
DBUG_ENTER("is_any_slave_channel_running");
|
|
channel_map.rdlock();
|
|
status= is_any_slave_channel_running(thread_mask, NULL);
|
|
channel_map.unlock();
|
|
DBUG_RETURN(status);
|
|
}
|
|
#endif /* HAVE_REPLICATION */
|