summaryrefslogtreecommitdiff
path: root/maximus/system
diff options
context:
space:
mode:
authorlefranc2008-01-04 09:55:19 +0000
committerlefranc2008-01-04 09:55:19 +0000
commita42c5d2e7f1fda6539122928ab42ba4690af86df (patch)
tree492f4577e083a57de9de87d16cf6eced1255c467 /maximus/system
parent20c8ffeb7a0607c6edd9305b67367f1665673898 (diff)
- added socket implementation for maximus to station communication
- you need to define "STATION_SOCK" into host/config.h to enable socket communication git-svn-id: svn+ssh://pessac/svn/cesar/trunk@1211 017c9cb6-072f-447c-8318-d5b54f68fe89
Diffstat (limited to 'maximus/system')
-rw-r--r--maximus/system/inc/Station.h13
-rw-r--r--maximus/system/src/Station.cpp114
2 files changed, 109 insertions, 18 deletions
diff --git a/maximus/system/inc/Station.h b/maximus/system/inc/Station.h
index 0a328dd0d1..154f7b4b8f 100644
--- a/maximus/system/inc/Station.h
+++ b/maximus/system/inc/Station.h
@@ -31,6 +31,7 @@ The original location of this file is /home/buret/eclipse/maximus/system/inc/Sta
#ifndef STATION_H
#define STATION_H
+#include "host/config.h"
#include "system_types.h"
#include "sci_types.h"
@@ -40,8 +41,11 @@ The original location of this file is /home/buret/eclipse/maximus/system/inc/Sta
#define STATION_PIPE_PATH "/tmp"
#define STATION_PIPE_PREFIX "station"
+#define STATION_SOCK_PATH STATION_PIPE_PATH
+#define STATION_SOCK_PREFIX STATION_PIPE_PREFIX
#define STATION_WAIT_LOOP_NB 10
#define STATION_WAIT_TIMEOUT_MS 100
+#define STATION_MAX_SOCK_BUFFER_SIZE (256*1024)
class StationConfiguration;
@@ -63,8 +67,13 @@ private:
// private attributes
//
- File_Descriptor mInputPipe, mOutputPipe, mLogPipe;
- pid_t mPid;
+#ifdef STATION_SOCK
+ File_Descriptor mSocket;
+#else /* STATION_SOCK */
+ File_Descriptor mInputPipe, mOutputPipe;
+#endif /* STATION_SOCK */
+ File_Descriptor mLogPipe;
+ pid_t mPid;
Station_Status mStationStatus;
unsigned short int mStationIdleCounter;
StationConfiguration * mpStationConfiguration;
diff --git a/maximus/system/src/Station.cpp b/maximus/system/src/Station.cpp
index 3d6c9d9e4e..d035f6bc05 100644
--- a/maximus/system/src/Station.cpp
+++ b/maximus/system/src/Station.cpp
@@ -40,6 +40,8 @@ The original location of this file is /home/buret/eclipse/maximus/system/src/Sta
#include <fcntl.h>
#include <unistd.h> // for 'exec()' and 'system()'
#include <sys/signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
#include <errno.h>
#include <iomanip> // for 'setfill()' and 'setw()'
using namespace std;
@@ -53,8 +55,12 @@ extern string stationTest;
Station::Station ( string station_executable ):
+#ifdef STATION_SOCK
+mSocket(-1),
+#else /* STATION_SOCK */
mInputPipe(-1),
mOutputPipe(-1),
+#endif /* STATION_SOCK */
mLogPipe(-1),
mPid(0),
mStationStatus(MAXIMUS_STATION_STATUS_NONE),
@@ -153,7 +159,7 @@ void Station::displayStation ( ) const
{
logFunction();
- clog << logger(LOG_INFO) << "\t[input file descriptor = " << dec << getInputFileDescriptor() << ", output file descriptor = " << getOutputFileDescriptor() << ", log file descriptor = " << getLogFileDescriptor() \
+ clog << logger(LOG_INFO) << "\t[socket descriptor = " << dec << getInputFileDescriptor() << ", log file descriptor = " << getLogFileDescriptor() \
<< ", process id = " << getStationId() << " (0x" << setfill('0') << setw(4) << uppercase << hex << getStationId() << ")]" << dec << endl;
}
@@ -223,16 +229,22 @@ const pid_t Station::getPid ( ) const
bool Station::setInputFileDescriptor ( const File_Descriptor input_file_descriptor )
{
+#ifdef STATION_SOCK
+ mSocket = input_file_descriptor;
+#else /* STATION_SOCK */
mInputPipe = input_file_descriptor;
-
+#endif /* STATION_SOCK */
return true;
}
bool Station::setOutputFileDescriptor ( const File_Descriptor output_file_descriptor )
{
+#ifdef STATION_SOCK
+ mSocket = output_file_descriptor;
+#else /* STATION_SOCK */
mOutputPipe = output_file_descriptor;
-
+#endif /* STATION_SOCK */
return true;
}
@@ -247,13 +259,21 @@ bool Station::setLogFileDescriptor ( const File_Descriptor log_file_descriptor )
const File_Descriptor Station::getInputFileDescriptor ( ) const
{
+#ifdef STATION_SOCK
+ return mSocket;
+#else /* STATION_SOCK */
return mInputPipe;
+#endif /* STATION_SOCK */
}
const File_Descriptor Station::getOutputFileDescriptor ( ) const
{
+#ifdef STATION_SOCK
+ return mSocket;
+#else /* STATION_SOCK */
return mOutputPipe;
+#endif /* STATION_SOCK */
}
@@ -356,6 +376,7 @@ void Station::startProcess ( const string station_executable )
char stationExecutable[station_executable.size()+1];
size_t stringLength = station_executable.copy(stationExecutable, station_executable.size());
stationExecutable[stringLength] = '\0';
+
if (-1 == execlp(stationExecutable, stationExecutable, NULL))
{
throw Error(__PRETTY_FUNCTION__, "Cannot launch station executable", errno);
@@ -366,33 +387,83 @@ void Station::startProcess ( const string station_executable )
// Loop until pipe creation: max = STATION_WAIT_LOOP_NB * STATION_WAIT_TIMEOUT_MS ms
//
int loop;
+ int fd_status;
char nameBuffer[256];
- sprintf(nameBuffer, "%s/%s_in_%d", STATION_PIPE_PATH, STATION_PIPE_PREFIX, getPid());
+
+#ifdef STATION_SOCK
+ struct sockaddr_un sockaddr;
+ int bufsize = STATION_MAX_SOCK_BUFFER_SIZE;
+ sprintf(nameBuffer, "%s/%s_sock_%d", STATION_SOCK_PATH, STATION_SOCK_PREFIX, getPid());
+ // open the socket
+ if((mSocket = socket (PF_UNIX, SOCK_STREAM, 0)) < 0)
+ {
+ throw Error(__FUNCTION__ , "Cannot create socket", errno);
+ }
+
+ // avoid other stations to inherit the socket
+ fd_status = fcntl(mSocket, F_GETFD);
+ fcntl(mSocket, F_SETFD, fd_status | FD_CLOEXEC);
+
+ // extend send buffer size
+ if(setsockopt (mSocket, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)) < 0)
+ {
+ throw Error(__FUNCTION__ , "Cannot increase buffer size", errno);
+ }
+ sockaddr.sun_family = AF_UNIX;
+ strcpy (sockaddr.sun_path, nameBuffer);
+
for(loop = 0; loop < STATION_WAIT_LOOP_NB; loop++)
{
- // Open my output pipe
+ // Connect to server
//
- if((mOutputPipe = open(nameBuffer, O_WRONLY)) >= 0) // my output pipe is the station in pipe
- {
+ if((connect(mSocket, (struct sockaddr *)&sockaddr, sizeof(sockaddr))) >= 0)
break;
- }
usleep(STATION_WAIT_TIMEOUT_MS * 1000);
}
+
if(loop >= STATION_WAIT_LOOP_NB)
{
- stopProcess();
- throw Error(__FUNCTION__ , "Cannot open output pipe", errno);
+ Error e(__FUNCTION__ , "Cannot connect to station", errno);
+ e.display();
+ stopProcess();
+ throw e;
}
- // Open my input pipe
- //
+#else /* STATION_SOCK */
+ // Open input pipe
sprintf(nameBuffer, "%s/%s_out_%d", STATION_PIPE_PATH, STATION_PIPE_PREFIX, getPid());
- if((mInputPipe = open(nameBuffer, O_RDONLY)) < 0) // my input pipe is the station out pipe
+ for(loop = 0; loop < STATION_WAIT_LOOP_NB; loop++)
+ {
+ // Connect to server
+ //
+ if((mInputPipe = open(nameBuffer, O_RDONLY | O_NONBLOCK)) >= 0)
+ break;
+ usleep(STATION_WAIT_TIMEOUT_MS * 1000);
+ }
+ if(loop >= STATION_WAIT_LOOP_NB)
+ {
+ Error e(__FUNCTION__ , "Cannot connect to station", errno);
+ e.display();
+ stopProcess();
+ throw e;
+ }
+
+ // avoid other stations to inherit the pipe
+ fd_status = fcntl(mInputPipe, F_GETFD);
+ fcntl(mInputPipe, F_SETFD, fd_status | FD_CLOEXEC);
+
+ // Open output pipe
+ sprintf(nameBuffer, "%s/%s_in_%d", STATION_PIPE_PATH, STATION_PIPE_PREFIX, getPid());
+ if((mOutputPipe = open(nameBuffer, O_WRONLY | O_NONBLOCK)) < 0)
{
stopProcess();
- throw Error(__PRETTY_FUNCTION__, "Cannot open input pipe", errno);
+ throw Error(__PRETTY_FUNCTION__, "Cannot open output pipe", errno);
}
-
+ // avoid other stations to inherit the pipe
+ fd_status = fcntl(mOutputPipe, F_GETFD);
+ fcntl(mOutputPipe, F_SETFD, fd_status | FD_CLOEXEC);
+#endif /* STATION_SOCK */
+
// Open log pipe
//
sprintf(nameBuffer, "%s/%s_log_%d", STATION_PIPE_PATH, STATION_PIPE_PREFIX, getPid());
@@ -401,13 +472,23 @@ void Station::startProcess ( const string station_executable )
stopProcess();
throw Error(__PRETTY_FUNCTION__, "Cannot open log pipe", errno);
}
+
+ // avoid other stations to inherit the pipe
+ fd_status = fcntl(mLogPipe, F_GETFD);
+ fcntl(mLogPipe, F_SETFD, fd_status | FD_CLOEXEC);
+
}
void Station::stopProcess ( )
{
logFunction();
-
+#ifdef STATION_SOCK
+ if(mSocket >= 0)
+ {
+ close(mSocket);
+ }
+#else /* STATION_SOCK */
if(mInputPipe >= 0)
{
close(mInputPipe);
@@ -416,6 +497,7 @@ void Station::stopProcess ( )
{
close(mOutputPipe);
}
+#endif /* STATION_SOCK */
if(mLogPipe >= 0)
{
close(mLogPipe);