VaKeR CYBER ARMY
Logo of a company Server : Apache/2.4.41 (Ubuntu)
System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.33
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Directory :  /proc/self/root/usr/include/boost/graph/distributed/detail/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/usr/include/boost/graph/distributed/detail/queue.ipp
// Copyright (C) 2004-2006 The Trustees of Indiana University.

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Andrew Lumsdaine
#include <boost/optional.hpp>
#include <cassert>
#include <boost/graph/parallel/algorithm.hpp>
#include <boost/graph/parallel/process_group.hpp>
#include <functional>
#include <algorithm>
#include <boost/graph/parallel/simple_trigger.hpp>

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

namespace boost { namespace graph { namespace distributed {

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                  const Buffer& buffer, bool polling)
  : process_group(process_group, attach_distributed_object()),
    owner(owner),
    buffer(buffer),
    polling(polling)
{
  if (!polling)
    outgoing_buffers.reset(
      new outgoing_buffers_t(num_processes(process_group)));

  setup_triggers();
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                  const Buffer& buffer, const UnaryPredicate& pred,
                  bool polling)
  : process_group(process_group, attach_distributed_object()),
    owner(owner),
    buffer(buffer),
    pred(pred),
    polling(polling)
{
  if (!polling)
    outgoing_buffers.reset(
      new outgoing_buffers_t(num_processes(process_group)));

  setup_triggers();
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
BOOST_DISTRIBUTED_QUEUE_TYPE::
distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
                  const UnaryPredicate& pred, bool polling)
  : process_group(process_group, attach_distributed_object()),
    owner(owner),
    pred(pred),
    polling(polling)
{
  if (!polling)
    outgoing_buffers.reset(
      new outgoing_buffers_t(num_processes(process_group)));

  setup_triggers();
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void
BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
{
  typename ProcessGroup::process_id_type dest = get(owner, x);
  if (outgoing_buffers)
    outgoing_buffers->at(dest).push_back(x);
  else if (dest == process_id(process_group))
    buffer.push(x);
  else
    send(process_group, get(owner, x), msg_push, x);
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
{
  /* Processes will stay here until the buffer is nonempty or
     synchronization with the other processes indicates that all local
     buffers are empty (and no messages are in transit).
   */
  while (buffer.empty() && !do_synchronize()) ;

  return buffer.empty();
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
{
  empty();
  return buffer.size();
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
{
  using boost::graph::parallel::simple_trigger;

  simple_trigger(process_group, msg_push, this, 
                 &distributed_queue::handle_push);
  simple_trigger(process_group, msg_multipush, this, 
                 &distributed_queue::handle_multipush);
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void 
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_push(int /*source*/, int /*tag*/, const value_type& value, 
            trigger_receive_context)
{
  if (pred(value)) buffer.push(value);
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
void 
BOOST_DISTRIBUTED_QUEUE_TYPE::
handle_multipush(int /*source*/, int /*tag*/, 
                 const std::vector<value_type>& values, 
                 trigger_receive_context)
{
  for (std::size_t i = 0; i < values.size(); ++i)
    if (pred(values[i])) buffer.push(values[i]);
}

template<BOOST_DISTRIBUTED_QUEUE_PARMS>
bool
BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
{
#ifdef PBGL_ACCOUNTING
  ++num_synchronizations;
#endif

  using boost::parallel::all_reduce;
  using std::swap;

  typedef typename ProcessGroup::process_id_type process_id_type;

  if (outgoing_buffers) {
    // Transfer all of the push requests
    process_id_type id = process_id(process_group);
    process_id_type np = num_processes(process_group);
    for (process_id_type dest = 0; dest < np; ++dest) {
      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
      std::size_t size = outgoing.size();
      if (size != 0) {
        if (dest != id) {
          send(process_group, dest, msg_multipush, outgoing);
        } else {
          for (std::size_t i = 0; i < size; ++i)
            buffer.push(outgoing[i]);
        }
        outgoing.clear();
      }
    }
  }
  synchronize(process_group);

  unsigned local_size = buffer.size();
  unsigned global_size =
    all_reduce(process_group, local_size, std::plus<unsigned>());
  return global_size == 0;
}

} } } // end namespace boost::graph::distributed

VaKeR 2022