VaKeR CYBER ARMY
Logo of a company Server : Apache/2.4.41 (Ubuntu)
System : Linux absol.cf 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.33
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Directory :  /var/lib/gems/2.5.0/gems/eventmachine-1.2.7/ext/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : /var/lib/gems/2.5.0/gems/eventmachine-1.2.7/ext/pipe.cpp
/*****************************************************************************

$Id$

File:     pipe.cpp
Date:     30May07

Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
Gmail: blackhedd

This program is free software; you can redistribute it and/or modify
it under the terms of either: 1) the GNU General Public License
as published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version; or 2) Ruby's License.

See the file COPYING for complete licensing information.

*****************************************************************************/

#include "project.h"


#ifdef OS_UNIX
// THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS.

/******************************
PipeDescriptor::PipeDescriptor
******************************/

PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em):
	EventableDescriptor (fd, parent_em),
	bReadAttemptedAfterClose (false),
	OutboundDataSize (0),
	SubprocessPid (subpid)
{
	#ifdef HAVE_EPOLL
	EpollEvent.events = EPOLLIN;
	#endif
	#ifdef HAVE_KQUEUE
	MyEventMachine->ArmKqueueReader (this);
	#endif
}


/*******************************
PipeDescriptor::~PipeDescriptor
*******************************/

PipeDescriptor::~PipeDescriptor() NO_EXCEPT_FALSE
{
	// Run down any stranded outbound data.
	for (size_t i=0; i < OutboundPages.size(); i++)
		OutboundPages[i].Free();

	/* As a virtual destructor, we come here before the base-class
	 * destructor that closes our file-descriptor.
	 * We have to make sure the subprocess goes down (if it's not
	 * already down) and we have to reap the zombie.
	 *
	 * This implementation is PROVISIONAL and will surely be improved.
	 * The intention here is that we never block, hence the highly
	 * undesirable sleeps. But if we can't reap the subprocess even
	 * after sending it SIGKILL, then something is wrong and we
	 * throw a fatal exception, which is also not something we should
	 * be doing.
	 *
	 * Eventually the right thing to do will be to have the reactor
	 * core respond to SIGCHLD by chaining a handler on top of the
	 * one Ruby may have installed, and dealing with a list of dead
	 * children that are pending cleanup.
	 *
	 * Since we want to have a signal processor integrated into the
	 * client-visible API, let's wait until that is done before cleaning
	 * this up.
	 *
	 * Added a very ugly hack to support passing the subprocess's exit
	 * status to the user. It only makes logical sense for user code to access
	 * the subprocess exit status in the unbind callback. But unbind is called
	 * back during the EventableDescriptor destructor. So by that time there's
	 * no way to call back this object through an object binding, because it's
	 * already been cleaned up. We might have added a parameter to the unbind
	 * callback, but that would probably break a huge amount of existing code.
	 * So the hack-solution is to define an instance variable in the EventMachine
	 * object and stick the exit status in there, where it can easily be accessed
	 * with an accessor visible to user code.
	 * User code should ONLY access the exit status from within the unbind callback.
	 * Otherwise there's no guarantee it'll be valid.
	 * This hack won't make it impossible to run multiple EventMachines in a single
	 * process, but it will make it impossible to reliably nest unbind calls
	 * within other unbind calls. (Not sure if that's even possible.)
	 */

	assert (MyEventMachine);

	/* Another hack to make the SubprocessPid available to get_subprocess_status */
	MyEventMachine->SubprocessPid = SubprocessPid;

	/* 01Mar09: Updated to use a small nanosleep in a loop. When nanosleep is interrupted by SIGCHLD,
	 * it resumes the system call after processing the signal (resulting in unnecessary latency).
	 * Calling nanosleep in a loop avoids this problem.
	 */
	struct timespec req = {0, 50000000}; // 0.05s
	int n;

	// wait 0.5s for the process to die
	for (n=0; n<10; n++) {
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
		nanosleep (&req, NULL);
	}

	// send SIGTERM and wait another 1s
	kill (SubprocessPid, SIGTERM);
	for (n=0; n<20; n++) {
		nanosleep (&req, NULL);
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
	}

	// send SIGKILL and wait another 5s
	kill (SubprocessPid, SIGKILL);
	for (n=0; n<100; n++) {
		nanosleep (&req, NULL);
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
	}

	// still not dead, give up!
	throw std::runtime_error ("unable to reap subprocess");
}



/********************
PipeDescriptor::Read
********************/

void PipeDescriptor::Read()
{
	int sd = GetSocket();
	if (sd == INVALID_SOCKET) {
		assert (!bReadAttemptedAfterClose);
		bReadAttemptedAfterClose = true;
		return;
	}

	LastActivity = MyEventMachine->GetCurrentLoopTime();

	int total_bytes_read = 0;
	char readbuffer [16 * 1024];

	for (int i=0; i < 10; i++) {
		// Don't read just one buffer and then move on. This is faster
		// if there is a lot of incoming.
		// But don't read indefinitely. Give other sockets a chance to run.
		// NOTICE, we're reading one less than the buffer size.
		// That's so we can put a guard byte at the end of what we send
		// to user code.
		// Use read instead of recv, which on Linux gives a "socket operation
		// on nonsocket" error.
		

		int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
		//cerr << "<R:" << r << ">";

		if (r > 0) {
			total_bytes_read += r;

			// Add a null-terminator at the the end of the buffer
			// that we will send to the callback.
			// DO NOT EVER CHANGE THIS. We want to explicitly allow users
			// to be able to depend on this behavior, so they will have
			// the option to do some things faster. Additionally it's
			// a security guard against buffer overflows.
			readbuffer [r] = 0;
			_GenericInboundDispatch(readbuffer, r);
			}
		else if (r == 0) {
			break;
		}
		else {
			// Basically a would-block, meaning we've read everything there is to read.
			break;
		}

	}


	if (total_bytes_read == 0) {
		// If we read no data on a socket that selected readable,
		// it generally means the other end closed the connection gracefully.
		ScheduleClose (false);
		//bCloseNow = true;
	}

}

/*********************
PipeDescriptor::Write
*********************/

void PipeDescriptor::Write()
{
	int sd = GetSocket();
	assert (sd != INVALID_SOCKET);

	LastActivity = MyEventMachine->GetCurrentLoopTime();
	char output_buffer [16 * 1024];
	size_t nbytes = 0;

	while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
		OutboundPage *op = &(OutboundPages[0]);
		if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
			nbytes += (op->Length - op->Offset);
			op->Free();
			OutboundPages.pop_front();
		}
		else {
			int len = sizeof(output_buffer) - nbytes;
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
			op->Offset += len;
			nbytes += len;
		}
	}

	// We should never have gotten here if there were no data to write,
	// so assert that as a sanity check.
	// Don't bother to make sure nbytes is less than output_buffer because
	// if it were we probably would have crashed already.
	assert (nbytes > 0);

	assert (GetSocket() != INVALID_SOCKET);
	int bytes_written = write (GetSocket(), output_buffer, nbytes);
#ifdef OS_WIN32
	int e = WSAGetLastError();
#else
	int e = errno;
#endif

	if (bytes_written > 0) {
		OutboundDataSize -= bytes_written;
		if ((size_t)bytes_written < nbytes) {
			int len = nbytes - bytes_written;
			char *buffer = (char*) malloc (len + 1);
			if (!buffer)
				throw std::runtime_error ("bad alloc throwing back data");
			memcpy (buffer, output_buffer + bytes_written, len);
			buffer [len] = 0;
			OutboundPages.push_front (OutboundPage (buffer, len));
		}
		#ifdef HAVE_EPOLL
		EpollEvent.events = EPOLLIN;
		if (SelectForWrite())
			EpollEvent.events |= EPOLLOUT;
		assert (MyEventMachine);
		MyEventMachine->Modify (this);
		#endif
	}
	else {
		#ifdef OS_UNIX
		if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR))
		#endif
		#ifdef OS_WIN32
		if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK))
		#endif
			Close();
	}
}


/*************************
PipeDescriptor::Heartbeat
*************************/

void PipeDescriptor::Heartbeat()
{
	// If an inactivity timeout is defined, then check for it.
	if (InactivityTimeout && ((MyEventMachine->GetCurrentLoopTime() - LastActivity) >= InactivityTimeout))
		ScheduleClose (false);
		//bCloseNow = true;
}


/*****************************
PipeDescriptor::SelectForRead
*****************************/

bool PipeDescriptor::SelectForRead()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 */
	return bPaused ? false : true;
}

/******************************
PipeDescriptor::SelectForWrite
******************************/

bool PipeDescriptor::SelectForWrite()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 */
	return (GetOutboundDataSize() > 0) && !bPaused ? true : false;
}




/********************************
PipeDescriptor::SendOutboundData
********************************/

int PipeDescriptor::SendOutboundData (const char *data, unsigned long length)
{
	//if (bCloseNow || bCloseAfterWriting)
	if (IsCloseScheduled())
		return 0;

	if (!data && (length > 0))
		throw std::runtime_error ("bad outbound data");
	char *buffer = (char *) malloc (length + 1);
	if (!buffer)
		throw std::runtime_error ("no allocation for outbound data");
	memcpy (buffer, data, length);
	buffer [length] = 0;
	OutboundPages.push_back (OutboundPage (buffer, length));
	OutboundDataSize += length;
	#ifdef HAVE_EPOLL
	EpollEvent.events = (EPOLLIN | EPOLLOUT);
	assert (MyEventMachine);
	MyEventMachine->Modify (this);
	#endif
	return length;
}

/********************************
PipeDescriptor::GetSubprocessPid
********************************/

bool PipeDescriptor::GetSubprocessPid (pid_t *pid)
{
	bool ok = false;
	if (pid && (SubprocessPid > 0)) {
		*pid = SubprocessPid;
		ok = true;
	}
	return ok;
}


#endif // OS_UNIX


VaKeR 2022