libzypp 17.35.12
networkrequestdispatcher.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9#include <zypp-core/Globals.h>
14#include <zypp-core/zyppng/base/Timer>
15#include <zypp-core/zyppng/base/SocketNotifier>
16#include <zypp-core/zyppng/base/EventDispatcher>
18#include <assert.h>
19
20#include <zypp/base/Logger.h>
21#include <zypp/base/String.h>
22#include <zypp-core/base/DtorReset>
23
24using namespace boost;
25
26L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_CURL_DEBUG)
27
28
29namespace zyppng {
30
31static const std::string & defaultAgentString()
32{
33 // we need to add the release and identifier to the
34 // agent string.
35 // The target could be not initialized, and then this information
36 // is guessed.
37 static const std::string _value(
39 "ZYpp " LIBZYPP_VERSION_STRING " (curl %s)"
40 , curl_version_info(CURLVERSION_NOW)->version
41 )
42 );
43 return _value;
44}
45
46
48 : BasePrivate( p )
49 , _timer( Timer::create() )
50 , _multi ( curl_multi_init() )
51 , _userAgent( defaultAgentString() )
52{
54
55 curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
56 curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
57 curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
58 curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
59
60 // disabled explicit pipelining since it breaks our tests on releases < 15.2
61 // we could consider enabling it starting with a specific CURL version
62 // curl_multi_setopt( _multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX|CURLPIPE_HTTP1 );
63
64 _timer->setSingleShot( true );
66}
67
73
74//called by curl to setup a timer
75int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
76{
77 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
78 assert( that != nullptr );
79
80 if ( timeout_ms >= 0 ) {
81 that->_timer->start( static_cast<uint64_t>(timeout_ms) );
82 } else {
83 //cancel the timer
84 that->_timer->stop();
85 }
86 return 0;
87}
88
90{
91 handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
92}
93
94int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
95{
96 NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
97 assert( that != nullptr );
98 return that->socketCallback( easy, s, what, socketp );
99}
100
101int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
102{
103 std::shared_ptr<SocketNotifier> socketp;
104
105 if ( _socketHandler.count( s ) == 0 ) {
106 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
107 return 0;
108
109 socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
110 _socketHandler.insert( std::make_pair( s, socketp ) );
111
113 } else {
114 socketp = _socketHandler[s];
115 }
116
117 //should never happen
118 if ( !socketp ) {
119 if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
120 return 0;
121
122 if ( _socketHandler.count( s ) > 0 )
123 _socketHandler.erase( s );
124
125 void *privatePtr = nullptr;
126 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127 privatePtr = nullptr; //make sure this was not filled with bad info
128 }
129
130 if ( privatePtr ) {
131 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
132 //we stop the download, if we can not listen for socket changes we can not correctly do anything
133 setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
134 return 0;
135 } else {
136 //a broken handle without anything assigned, also should never happen but make sure and clean it up
137 WAR << "Cleaning up unassigned easy handle" << std::endl;
138 curl_multi_remove_handle( _multi, easy );
139 curl_easy_cleanup( easy );
140 return 0;
141 }
142 }
143
144 //remove the socket
145 if ( what == CURL_POLL_REMOVE ) {
146 socketp->setEnabled( false );
147 _socketHandler.erase( s );
148 return 0;
149 }
150
151 if ( what == CURL_POLL_IN ) {
152 socketp->setMode( SocketNotifier::Read );
153 } else if ( what == CURL_POLL_OUT ) {
154 socketp->setMode( SocketNotifier::Write );
155 } else if ( what == CURL_POLL_INOUT ) {
156 socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
157 }
158
159 socketp->setEnabled();
160 return 0;
161}
162
164{
165 int evBitmask = 0;
166 if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167 evBitmask |= CURL_CSELECT_IN;
169 evBitmask |= CURL_CSELECT_OUT;
171 evBitmask |= CURL_CSELECT_ERR;
172
173 handleMultiSocketAction( listener.socket(), evBitmask );
174}
175
176void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
177{
178 int running = 0;
179
180 // when inside a curl callback we can not call another multi curl API,
181 // for now just lock the thing, but we should consider rewriting this
182 // to post events instead of doing direct calls simply to decouple from
183 // that limitation
184 CURLMcode rc = CURLM_OK;
185 {
186 zypp::DtorReset lockSet( _locked );
187 _locked = true;
188 rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
189 }
190 if (rc != 0) {
191 //we can not recover from a error like that, cancel all and stop
193 cancelAll( err );
194 //emit error
195 _lastError = err;
196 _sigError.emit( *z_func() );
197 return;
198 }
199
200 // make sure we dequeue pending requests ( in case a call to dequeue was blocked during the API call )
201 zypp::OnScopeExit scopeFinally([this](){
202 this->dequeuePending();
203 });
204
205 int msgs_left = 0;
206 CURLMsg *msg = nullptr;
207 while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
208 if(msg->msg == CURLMSG_DONE) {
209 CURL *easy = msg->easy_handle;
210 CURLcode res = msg->data.result;
211
212 void *privatePtr = nullptr;
213 if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214 WAR << "Unable to get CURLINFO_PRIVATE" << std::endl;
215 continue;
216 }
217
218 if ( !privatePtr ) {
219 //broken easy handle not associated, should never happen but clean it up
220 WAR << "Cleaning up unassigned easy handle" << std::endl;
221 curl_multi_remove_handle( _multi, easy );
222 curl_easy_cleanup( easy );
223 continue;
224 }
225
226 NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
227 request->dequeueNotify();
228
229 if ( request->hasMoreWork() && ( res == CURLE_OK || request->canRecover() ) ) {
230 std::string errBuf = "Broken easy handle in request";
231 if ( !request->_easyHandle ) {
233 setFinished( *request->z_func(), e );
234 continue;
235 }
236
237 // remove the handle from multi to change options
238 curl_multi_remove_handle( _multi, request->_easyHandle );
239
240 errBuf = "Failed to reinitialize the request";
241 if ( !request->prepareToContinue ( errBuf ) ) {
243 setFinished( *request->z_func(), e );
244 } else {
245 // add the request back to the multi handle, it is not done
246 if ( !addRequestToMultiHandle( *request->z_func() ) )
247 continue;
248
249 request->aboutToStart( );
250 }
251 } else {
252 //trigger notification about file downloaded
253 NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->errorMessage() );
254 setFinished( *request->z_func(), e );
255 }
256 //attention request could be deleted from here on
257 }
258 }
259}
260
262{
263 //prevent dequeuePending from filling up the runningDownloads again
264 zypp::DtorReset lockReset( _locked );
265 _locked = true;
266
267 while ( _runningDownloads.size() ) {
268 std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
269 setFinished(*req, result );
270 }
271 while ( _pendingDownloads.size() ) {
272 std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
273 setFinished(*req, result );
274 }
275}
276
278{
279 auto delReq = []( auto &list, NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
280 auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
281 return req.d_func() == r->d_func();
282 } );
283 if ( it != list.end() ) {
284 auto ptr = *it;
285 list.erase( it );
286 return ptr;
287 }
288 return nullptr;
289 };
290
291 // We have a tricky situation if a network request is called when inside a callback. In those cases, it is
292 // not allowed to call curl_multi_remove_handle. We need to tell the callback to fail, so the download
293 // is cancelled by curl itself. We also need to store the current result for later
294 auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
295 if ( rmode ) {
296 if ( rmode->_isInCallback ) {
297 // the first cached result wins)
298 if ( !rmode->_cachedResult )
299 rmode->_cachedResult = result;
300 return;
301 } else if ( rmode->_cachedResult ) {
302 result = rmode->_cachedResult.value();
303 }
304 }
305
306 auto rLocked = delReq( _runningDownloads, req );
307 if ( !rLocked )
308 rLocked = delReq( _pendingDownloads, req );
309
310 void *easyHandle = req.d_func()->_easyHandle;
311 if ( easyHandle )
312 curl_multi_remove_handle( _multi, easyHandle );
313
314 req.d_func()->_dispatcher = nullptr;
315
316 //first set the result, the Request might have a checksum to check as well so a currently
317 //successful request could fail later on
318 req.d_func()->setResult( std::move(result) );
319 _sigDownloadFinished.emit( *z_func(), req );
320
321 //we got a open slot, try to dequeue or send the finished signals if all queues are empty
323}
324
326{
327 CURLMcode rc = curl_multi_add_handle( _multi, req.d_func()->_easyHandle );
328 if ( rc != 0 ) {
330 return false;
331 }
332
333 // make sure to wake up once to register what we have now
334 _timer->start(0);
335 return true;
336}
337
339{
340 if ( !_isRunning || _locked )
341 return;
342
343 while ( _maxConnections == -1 || ( (std::size_t)_maxConnections > _runningDownloads.size() ) ) {
344 if ( !_pendingDownloads.size() )
345 break;
346
347 std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
348 _pendingDownloads.pop_front();
349
350 std::string errBuf = "Failed to initialize easy handle";
351 if ( !req->d_func()->initialize( errBuf ) ) {
352 //@TODO store the CURL error in the errors extra info
354 continue;
355 }
356
357 if ( !addRequestToMultiHandle( *req ) )
358 continue;
359
360 req->d_func()->aboutToStart();
361 _sigDownloadStarted.emit( *z_func(), *req );
362
363 _runningDownloads.push_back( std::move(req) );
364 }
365
366 //check for empty queues
367 if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
368 //once we finished all requests, cancel the timer too, so curl is not called without requests
369 _timer->stop();
370 _sigQueueFinished.emit( *z_func() );
371 }
372}
373
374ZYPP_IMPL_PRIVATE(NetworkRequestDispatcher)
375
376NetworkRequestDispatcher::NetworkRequestDispatcher( )
377 : Base( * new NetworkRequestDispatcherPrivate ( *this ) )
378{
379
380}
381
382bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
383{
384 curl_version_info_data *curl_info = nullptr;
385 curl_info = curl_version_info(CURLVERSION_NOW);
386 // curl_info does not need any free (is static)
387 if (curl_info->protocols)
388 {
389 const char * const *proto = nullptr;
390 std::string scheme( url.getScheme() );
391 bool found = false;
392 for(proto=curl_info->protocols; !found && *proto; ++proto) {
393 if( scheme == std::string((const char *)*proto))
394 found = true;
395 }
396 return found;
397 }
398 return true;
399}
400
401void NetworkRequestDispatcher::setMaximumConcurrentConnections( const int maxConn )
402{
403 d_func()->_maxConnections = maxConn;
404}
405
406int NetworkRequestDispatcher::maximumConcurrentConnections () const
407{
408 return d_func()->_maxConnections;
409}
410
411void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
412{
413 if ( !req )
414 return;
415 Z_D();
416
417 if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
418 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
419 return;
420 }
421
422 if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
423 WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
424 return;
425 }
426
427 req->d_func()->_dispatcher = this;
428 if ( req->priority() == NetworkRequest::Normal )
429 d->_pendingDownloads.push_back( req );
430 else {
431 auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ]( const auto &pendingReq ){
432 return pendingReq->priority() < prio;
433 });
434
435 //if we have a valid iterator, decrement we found a pending download request with lower prio, insert before that
436 if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
437 it--;
438 d->_pendingDownloads.insert( it, req );
439 }
440
441 //dequeue if running and we have capacity
442 d->dequeuePending();
443}
444
445void NetworkRequestDispatcher::setAgentString( const std::string &agent )
446{
447 Z_D();
448 if ( agent.empty() )
449 d->_userAgent = defaultAgentString();
450 else
451 d->_userAgent = agent;
452}
453
454const std::string &NetworkRequestDispatcher::agentString() const
455{
456 return d_func()->_userAgent;
457}
458
459void NetworkRequestDispatcher::setHostSpecificHeader( const std::string &host, const std::string &headerName, const std::string &value )
460{
461 Z_D();
462 if ( value.empty() ) {
463 if ( auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
464 if ( auto v = i->second.find( headerName ); v != i->second.end() ) {
465 i->second.erase (v);
466 }
467 if ( i->second.empty() )
468 d->_customHeaders.erase(i);
469 }
470 return;
471 }
472 d->_customHeaders[host][headerName] = value;
473}
474
475const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders() const
476{
477 return d_func()->_customHeaders;
478}
479
480void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
481{
482 cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
483}
484
485void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
486{
487 Z_D();
488
489 if ( req.d_func()->_dispatcher != this ) {
490 //TODO throw exception
491 return;
492 }
493
494 d->setFinished( req, err );
495}
496
497void NetworkRequestDispatcher::run()
498{
499 Z_D();
500 d->_isRunning = true;
501
502 if ( d->_pendingDownloads.size() )
503 d->dequeuePending();
504}
505
506void NetworkRequestDispatcher::reschedule()
507{
508 Z_D();
509 if ( !d->_pendingDownloads.size() )
510 return;
511
512 std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &a, const auto &b ){
513 return a->priority() < b->priority();
514 });
515
516 d->dequeuePending();
517}
518
519size_t NetworkRequestDispatcher::count()
520{
521 Z_D();
522 return d->_pendingDownloads.size() + d->_runningDownloads.size();
523}
524
525const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
526{
527 return d_func()->_lastError;
528}
529
530SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
531{
532 return d_func()->_sigDownloadStarted;
533}
534
535SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
536{
537 return d_func()->_sigDownloadFinished;
538}
539
540SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
541{
542 return d_func()->_sigQueueFinished;
543}
544
545SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
546{
547 return d_func()->_sigError;
548}
549
550}
Edition * _value
Assign a variable a certain value when going out of scope.
Definition dtorreset.h:50
NetworkRequestDispatcherPrivate(NetworkRequestDispatcher &p)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
Signal< void(NetworkRequestDispatcher &)> _sigError
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
void setFinished(NetworkRequest &req, NetworkRequestError result)
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
void onSocketActivated(const SocketNotifier &listener, int events)
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
void cancelAll(const NetworkRequestError &result)
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
The NetworkRequestError class represents an error that occurred in a network request.
std::string errorMessage() const
Definition request.cc:565
bool prepareToContinue(std::string &errBuf)
Definition request.cc:411
static Ptr create(int socket, int evTypes, bool enable=true)
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
unsigned short a
unsigned short b
Definition String.h:29
void globalInitCurlOnce()
Definition curlhelper.cc:64
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Definition String.cc:37
zypp::Url Url
Definition url.h:15
static const std::string & defaultAgentString()
Provides API related macros.
#define WAR
Definition Logger.h:99
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition Logger.h:115
#define ZYPP_IMPL_PRIVATE(Class)
Definition zyppglobal.h:92
#define Z_D()
Definition zyppglobal.h:105