-
Notifications
You must be signed in to change notification settings - Fork 8.6k
Expand file tree
/
Copy pathIBridgeHelper.cpp
More file actions
135 lines (110 loc) · 4.5 KB
/
Copy pathIBridgeHelper.cpp
File metadata and controls
135 lines (110 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <BridgeHelper/IBridgeHelper.h>
#include <csignal>
#include <filesystem>
#include <thread>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Common/VectorWithMemoryTracking.h>
namespace fs = std::filesystem;
namespace DB
{
/// Error codes used by this translation unit. They are only declared here
/// (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
/// Thrown when the bridge does not answer the handshake after being started,
/// or when the bridge is configured to be started manually and is not running.
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
/// Thrown when a configured library path contains a colon (`:`).
extern const int BAD_ARGUMENTS;
}
/// Makes sure the bridge answers the handshake, launching it first if needed.
///
/// If the initial bridgeHandShake() succeeds, nothing else is done. Otherwise the
/// bridge is started via startBridge(startBridgeCommand()) and the handshake is
/// retried with exponential backoff: the pause between attempts starts at 10 ms
/// and doubles after every failed attempt (10, 20, ..., 5120 ms).
///
/// A handshake is performed after every pause, including the last (longest) one,
/// so a bridge that comes up during the final pause is still detected.
///
/// Throws Exception(EXTERNAL_SERVER_IS_NOT_RESPONDING) if no attempt succeeds.
void IBridgeHelper::startBridgeSync()
{
    if (bridgeHandShake())
        return;

    LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
    startBridge(startBridgeCommand());

    /// Polling stops once the next pause would reach this bound.
    constexpr uint64_t max_milliseconds_to_wait = 10000;
    uint64_t milliseconds_to_wait = 10; /// Exponential backoff

    for (uint64_t counter = 1;; ++counter)
    {
        LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
        if (bridgeHandShake())
            return;

        /// The backoff budget is exhausted. This check comes after the handshake
        /// on purpose: the last pause must be followed by one more attempt,
        /// otherwise it would be pure wasted time.
        if (milliseconds_to_wait >= max_milliseconds_to_wait)
            break;

        std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
        milliseconds_to_wait *= 2;
    }

    throw Exception(ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING, "BridgeHelper: {} is not responding", serviceAlias());
}
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
{
if (startBridgeManually())
throw Exception(ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING, "{} is not running. Please, start it manually", serviceAlias());
const auto & config = getConfig();
/// Path to executable folder
fs::path path(config.getString("application.dir", "/usr/bin"));
VectorWithMemoryTracking<std::string> cmd_args;
path /= serviceFileName();
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", getDefaultPort())));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(getHTTPTimeout().totalMicroseconds()));
cmd_args.push_back("--http-max-field-value-size");
cmd_args.push_back("99999999999999999"); // something "big" to accept large datasets (issue 47616)
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
std::string allowed_paths;
for (const auto * allowed_path_config : {"dictionaries_lib_path", "catboost_lib_path"})
{
if (config.has(allowed_path_config))
{
std::string allowed_path = config.getString(allowed_path_config);
if (allowed_path.contains(':'))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "`{}` cannot contain the colon (:) symbol: {}", allowed_path_config, allowed_path);
if (!allowed_paths.empty())
allowed_paths += ":";
allowed_paths += allowed_path;
}
}
if (!allowed_paths.empty())
{
cmd_args.push_back("--libraries-path");
cmd_args.push_back(allowed_paths);
}
LOG_TRACE(getLog(), "Starting {}", serviceAlias());
/// We will terminate it with the KILL signal instead of the TERM signal,
/// because it's more reliable for arbitrary third-party ODBC drivers.
/// The drivers can spawn threads, install their own signal handlers... we don't care.
ShellCommand::Config command_config(path.string());
command_config.arguments = cmd_args;
command_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy(true, SIGKILL);
return ShellCommand::executeDirect(command_config);
}
}