GNU/Linux >> Tutoriales Linux >  >> Linux

Fifo sin bloqueo de Linux (registro bajo demanda)

Este es un hilo (muy) antiguo, pero me he encontrado con un problema similar últimamente. De hecho, lo que necesitaba es una clonación de stdin a stdout con una copia a una tubería que no bloquee. el ftee propuesto en la primera respuesta realmente ayudó allí, pero era (para mi caso de uso) demasiado volátil. Lo que significa que perdí datos que podría haber procesado si hubiera llegado a tiempo.

El escenario al que me enfrenté es que tengo un proceso (some_process) que agrega algunos datos y escribe sus resultados cada tres segundos en la salida estándar. La configuración (simplificada) se veía así (en la configuración real estoy usando una canalización con nombre):

some_process | ftee >(onlineAnalysis.pl > results) | gzip > raw_data.gz

Ahora, raw_data.gz debe comprimirse y completarse. ftee hace este trabajo muy bien. Pero la tubería que estoy usando en el medio fue demasiado lenta para capturar los datos eliminados, pero fue lo suficientemente rápido como para procesar todo si podía llegar a él, lo cual se probó con una camiseta normal. Sin embargo, un tee normal se bloquea si algo le sucede a la tubería sin nombre, y como quiero poder conectarme a pedido, el tee no es una opción. Volviendo al tema:mejoró cuando puse un búfer en el medio, lo que resultó en:

some_process | ftee >(mbuffer -m 32M| onlineAnalysis.pl > results) | gzip > raw_data.gz

Pero eso seguía perdiendo datos que podría haber procesado. Así que seguí adelante y amplié el ftee propuesto anteriormente a una versión con búfer (bftee). Todavía tiene las mismas propiedades, pero usa un búfer interno (¿ineficiente?) en caso de que falle una escritura. Todavía pierde datos si el búfer se llena, pero funciona maravillosamente en mi caso. Como siempre, hay mucho espacio para mejorar, pero como copié el código de aquí, me gustaría compartirlo con las personas que podrían usarlo.

/* bftee - clone stdin to stdout and to a buffered, non-blocking pipe 
    (c) [email protected]
    (c) [email protected]
    WTFPL Licence */

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <signal.h>
    #include <unistd.h>

    // the number of sBuffers that are being held at a maximum
    #define BUFFER_SIZE 4096
    #define BLOCK_SIZE 2048

    typedef struct {
      char data[BLOCK_SIZE];
      int bytes;
    } sBuffer;

    typedef struct {
      sBuffer *data;  //array of buffers
      int bufferSize; // number of buffer in data
      int start;      // index of the current start buffer
      int end;        // index of the current end buffer
      int active;     // number of active buffer (currently in use)
      int maxUse;     // maximum number of buffers ever used
      int drops;      // number of discarded buffer due to overflow
      int sWrites;    // number of buffer written to stdout
      int pWrites;    // number of buffers written to pipe
    } sQueue;

    void InitQueue(sQueue*, int);              // initialized the Queue
    void PushToQueue(sQueue*, sBuffer*, int);  // pushes a buffer into Queue at the end 
    sBuffer *RetrieveFromQueue(sQueue*);       // returns the first entry of the buffer and removes it or NULL is buffer is empty
    sBuffer *PeakAtQueue(sQueue*);             // returns the first entry of the buffer but does not remove it. Returns NULL on an empty buffer
    void ShrinkInQueue(sQueue *queue, int);    // shrinks the first entry of the buffer by n-bytes. Buffer is removed if it is empty
    void DelFromQueue(sQueue *queue);          // removes the first entry of the queue

    static void sigUSR1(int);                  // signal handled for SUGUSR1 - used for stats output to stderr
    static void sigINT(int);                   // signla handler for SIGKILL/SIGTERM - allows for a graceful stop ?

    sQueue queue;                              // Buffer storing the overflow
    volatile int quit;                         // for quiting the main loop

    int main(int argc, char *argv[])
    {   
        int readfd, writefd;
        struct stat status;
        char *fifonam;
        sBuffer buffer;
        ssize_t bytes;
        int bufferSize = BUFFER_SIZE;

        signal(SIGPIPE, SIG_IGN);
        signal(SIGUSR1, sigUSR1);
        signal(SIGTERM, sigINT);
        signal(SIGINT,  sigINT);

        /** Handle commandline args and open the pipe for non blocking writing **/

        if(argc < 2 || argc > 3)
        {   
            printf("Usage:\n someprog 2>&1 | %s FIFO [BufferSize]\n"
                   "FIFO - path to a named pipe, required argument\n"
                   "BufferSize - temporary Internal buffer size in case write to FIFO fails\n", argv[0]);
            exit(EXIT_FAILURE);
        }

        fifonam = argv[1];
        if (argc == 3) {
          bufferSize = atoi(argv[2]);
          if (bufferSize == 0) bufferSize = BUFFER_SIZE;
        }

        readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
        if(-1==readfd)
        {   
            perror("bftee: readfd: open()");
            exit(EXIT_FAILURE);
        }

        if(-1==fstat(readfd, &status))
        {
            perror("bftee: fstat");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        if(!S_ISFIFO(status.st_mode))
        {
            printf("bftee: %s in not a fifo!\n", fifonam);
            close(readfd);
            exit(EXIT_FAILURE);
        }

        writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
        if(-1==writefd)
        {
            perror("bftee: writefd: open()");
            close(readfd);
            exit(EXIT_FAILURE);
        }

        close(readfd);


        InitQueue(&queue, bufferSize);
        quit = 0;

        while(!quit)
        {
            // read from STDIN
            bytes = read(STDIN_FILENO, buffer.data, sizeof(buffer.data));

            // if read failed due to interrupt, then retry, otherwise STDIN has closed and we should stop reading
            if (bytes < 0 && errno == EINTR) continue;
            if (bytes <= 0) break;

            // save the number if read bytes in the current buffer to be processed
            buffer.bytes = bytes;

            // this is a blocking write. As long as buffer is smaller than 4096 Bytes, the write is atomic to a pipe in Linux
            // thus, this cannot be interrupted. however, to be save this should handle the error cases of partial or interrupted write none the less.
            bytes = write(STDOUT_FILENO, buffer.data, buffer.bytes);
            queue.sWrites++;

            if(-1==bytes) {
                perror("ftee: writing to stdout");
                break;
            }

            sBuffer *tmpBuffer = NULL;

            // if the queue is empty (tmpBuffer gets set to NULL) the this does nothing - otherwise it tries to write
            // the buffered data to the pipe. This continues until the Buffer is empty or the write fails.
            // NOTE: bytes cannot be -1  (that would have failed just before) when the loop is entered. 
            while ((bytes != -1) && (tmpBuffer = PeakAtQueue(&queue)) != NULL) {
               // write the oldest buffer to the pipe
               bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);

               // the  written bytes are equal to the buffer size, the write is successful - remove the buffer and continue
               if (bytes == tmpBuffer->bytes) {
                 DelFromQueue(&queue);
                 queue.pWrites++;
               } else if (bytes > 0) {
                 // on a positive bytes value there was a partial write. we shrink the current buffer
                 //  and handle this as a write failure
                 ShrinkInQueue(&queue, bytes);
                 bytes = -1;
               }
            }
            // There are several cases here:
            // 1.) The Queue is empty -> bytes is still set from the write to STDOUT. in this case, we try to write the read data directly to the pipe
            // 2.) The Queue was not empty but is now -> bytes is set from the last write (which was successful) and is bigger 0. also try to write the data
            // 3.) The Queue was not empty and still is not -> there was a write error before (even partial), and bytes is -1. Thus this line is skipped.
            if (bytes != -1) bytes = write(writefd, buffer.data, buffer.bytes);

            // again, there are several cases what can happen here
            // 1.) the write before was successful -> in this case bytes is equal to buffer.bytes and nothing happens
            // 2.) the write just before is partial or failed all together - bytes is either -1 or smaller than buffer.bytes -> add the remaining data to the queue
            // 3.) the write before did not happen as the buffer flush already had an error. In this case bytes is -1 -> add the remaining data to the queue
            if (bytes != buffer.bytes)
              PushToQueue(&queue, &buffer, bytes);
            else 
              queue.pWrites++;
        }

        // once we are done with STDIN, try to flush the buffer to the named pipe
        if (queue.active > 0) {
           //set output buffer to block - here we wait until we can write everything to the named pipe
           // --> this does not seem to work - just in case there is a busy loop that waits for buffer flush aswell. 
           int saved_flags = fcntl(writefd, F_GETFL);
           int new_flags = saved_flags & ~O_NONBLOCK;
           int res = fcntl(writefd, F_SETFL, new_flags);

           sBuffer *tmpBuffer = NULL;
           //TODO: this does not handle partial writes yet
           while ((tmpBuffer = PeakAtQueue(&queue)) != NULL) {
             int bytes = write(writefd, tmpBuffer->data, tmpBuffer->bytes);
             if (bytes != -1) DelFromQueue(&queue);
           }
        }

        close(writefd);

    }


    /** init a given Queue **/
    void InitQueue (sQueue *queue, int bufferSize) {
      queue->data = calloc(bufferSize, sizeof(sBuffer));
      queue->bufferSize = bufferSize;
      queue->start = 0;
      queue->end = 0;
      queue->active = 0;
      queue->maxUse = 0;
      queue->drops = 0;
      queue->sWrites = 0;
      queue->pWrites = 0;
    }

    /** push a buffer into the Queue**/
    void PushToQueue(sQueue *queue, sBuffer *p, int offset)
    {

        if (offset < 0) offset = 0;      // offset cannot be smaller than 0 - if that is the case, we were given an error code. Set it to 0 instead
        if (offset == p->bytes) return;  // in this case there are 0 bytes to add to the queue. Nothing to write

        // this should never happen - offset cannot be bigger than the buffer itself. Panic action
        if (offset > p->bytes) {perror("got more bytes to buffer than we read\n"); exit(EXIT_FAILURE);}

        // debug output on a partial write. TODO: remove this line
        // if (offset > 0 ) fprintf(stderr, "partial write to buffer\n");

        // copy the data from the buffer into the queue and remember its size
        memcpy(queue->data[queue->end].data, p->data + offset , p->bytes-offset);
        queue->data[queue->end].bytes = p->bytes - offset;

        // move the buffer forward
        queue->end = (queue->end + 1) % queue->bufferSize;

        // there is still space in the buffer
        if (queue->active < queue->bufferSize)
        {
            queue->active++;
            if (queue->active > queue->maxUse) queue->maxUse = queue->active;
        } else {
            // Overwriting the oldest. Move start to next-oldest
            queue->start = (queue->start + 1) % queue->bufferSize;
            queue->drops++;
        }
    }

    /** return the oldest entry in the Queue and remove it or return NULL in case the Queue is empty **/
    sBuffer *RetrieveFromQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }

        queue->start = (queue->start + 1) % queue->bufferSize;
        queue->active--;
        return &(queue->data[queue->start]);
    }

    /** return the oldest entry in the Queue or NULL if the Queue is empty. Does not remove the entry **/
    sBuffer *PeakAtQueue(sQueue *queue)
    {
        if (!queue->active) { return NULL; }
        return &(queue->data[queue->start]);
    }

    /*** Shrinks the oldest entry i the Queue by bytes. Removes the entry if buffer of the oldest entry runs empty*/
    void ShrinkInQueue(sQueue *queue, int bytes) {

      // cannot remove negative amount of bytes - this is an error case. Ignore it
      if (bytes <= 0) return;

      // remove the entry if the offset is equal to the buffer size
      if (queue->data[queue->start].bytes == bytes) {
        DelFromQueue(queue);
        return;
      };

      // this is a partial delete
      if (queue->data[queue->start].bytes > bytes) {
        //shift the memory by the offset
        memmove(queue->data[queue->start].data, queue->data[queue->start].data + bytes, queue->data[queue->start].bytes - bytes);
        queue->data[queue->start].bytes = queue->data[queue->start].bytes - bytes;
        return;
      }

      // panic is the are to remove more than we have the buffer
      if (queue->data[queue->start].bytes < bytes) {
        perror("we wrote more than we had - this should never happen\n");
        exit(EXIT_FAILURE);
        return;
      }
    }

    /** delete the oldest entry from the queue. Do nothing if the Queue is empty **/
    void DelFromQueue(sQueue *queue)
    {
        if (queue->active > 0) {
          queue->start = (queue->start + 1) % queue->bufferSize;
          queue->active--;
        }
    }

    /** Stats output on SIGUSR1 **/
    static void sigUSR1(int signo) {
      fprintf(stderr, "Buffer use: %i (%i/%i), STDOUT: %i PIPE: %i:%i\n", queue.active, queue.maxUse, queue.bufferSize, queue.sWrites, queue.pWrites, queue.drops);
    }

    /** handle signal for terminating **/
    static void sigINT(int signo) {
      quit++;
      if (quit > 1) exit(EXIT_FAILURE);
    }

Esta versión toma un argumento más (opcional) que especifica el número de bloques que se almacenarán en el búfer para la tubería. Mi llamada de muestra ahora se ve así:

some_process | bftee >(onlineAnalysis.pl > results) 16384 | gzip > raw_data.gz

lo que da como resultado 16384 bloques para almacenar en búfer antes de que ocurran los descartes. esto usa unos 32 Mbytes más de memoria, pero... ¿a quién le importa?

Por supuesto, en el entorno real estoy usando una canalización con nombre para poder adjuntar y desconectar según sea necesario. Hay un aspecto como este:

mkfifo named_pipe
some_process | bftee named_pipe 16384 | gzip > raw_data.gz &
cat named_pipe | onlineAnalysis.pl > results

Además, el proceso reacciona a las señales de la siguiente manera:SIGUSR1 -> imprime contadores en STDERRSIGTERM, SIGINT -> primero sale del bucle principal y vacía el búfer a la tubería, el segundo finaliza el programa inmediatamente.

Tal vez esto ayude a alguien en el futuro... Disfrutar


Inspirado por su pregunta, he escrito un programa simple que le permitirá hacer esto:

$ myprogram 2>&1 | ftee /tmp/mylog

Se comporta de manera similar a tee pero clona la entrada estándar a la salida estándar y a una canalización con nombre (un requisito por ahora) sin bloquear. Esto significa que si desea iniciar sesión de esta manera, es posible que pierda sus datos de registro, pero supongo que es aceptable en su escenario.El truco es bloquear SIGPIPE señal e ignorar el error al escribir en un fifo roto. Esta muestra puede optimizarse de varias maneras, por supuesto, pero hasta ahora, creo que hace el trabajo.

/* ftee - clone stdin to stdout and to a named pipe 
(c) [email protected]
WTFPL Licence */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
    int readfd, writefd;
    struct stat status;
    char *fifonam;
    char buffer[BUFSIZ];
    ssize_t bytes;
    
    signal(SIGPIPE, SIG_IGN);

    if(2!=argc)
    {
        printf("Usage:\n someprog 2>&1 | %s FIFO\n FIFO - path to a"
            " named pipe, required argument\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    fifonam = argv[1];

    readfd = open(fifonam, O_RDONLY | O_NONBLOCK);
    if(-1==readfd)
    {
        perror("ftee: readfd: open()");
        exit(EXIT_FAILURE);
    }

    if(-1==fstat(readfd, &status))
    {
        perror("ftee: fstat");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    if(!S_ISFIFO(status.st_mode))
    {
        printf("ftee: %s in not a fifo!\n", fifonam);
        close(readfd);
        exit(EXIT_FAILURE);
    }

    writefd = open(fifonam, O_WRONLY | O_NONBLOCK);
    if(-1==writefd)
    {
        perror("ftee: writefd: open()");
        close(readfd);
        exit(EXIT_FAILURE);
    }

    close(readfd);

    while(1)
    {
        bytes = read(STDIN_FILENO, buffer, sizeof(buffer));
        if (bytes < 0 && errno == EINTR)
            continue;
        if (bytes <= 0)
            break;

        bytes = write(STDOUT_FILENO, buffer, bytes);
        if(-1==bytes)
            perror("ftee: writing to stdout");
        bytes = write(writefd, buffer, bytes);
        if(-1==bytes);//Ignoring the errors
    }
    close(writefd); 
    return(0);
}

Puede compilarlo con este comando estándar:

$ gcc ftee.c -o ftee

Puede verificarlo rápidamente ejecutando, por ejemplo:

$ ping www.google.com | ftee /tmp/mylog

$ cat /tmp/mylog

También tenga en cuenta que esto no es un multiplexor. Solo puede tener un proceso haciendo $ cat /tmp/mylog a la vez.


Parece que bash <> El operador de redirección (3.6.10 Abrir descriptores de archivo para leer y escribir) hace que escribir en un archivo/fifo abierto no bloquee. Esto debería funcionar:

$ mkfifo /tmp/mylog
$ exec 4<>/tmp/mylog
$ myprogram 2>&1 | tee >&4
$ cat /tmp/mylog # on demend

Solución dada por gniourf_gniourf en el canal IRC #bash.


Sin embargo, esto crearía un archivo de registro cada vez mayor incluso si no se usa hasta que la unidad se quede sin espacio.

¿Por qué no rotar periódicamente los registros? Incluso hay un programa para hacerlo por ti logrotate .

También hay un sistema para generar mensajes de registro y hacer diferentes cosas con ellos según el tipo. Se llama syslog .

Incluso podrías combinar los dos. Haga que su programa genere mensajes de syslog, configure syslog para colocarlos en un archivo y use logrotate para asegurarse de que no llenen el disco.

Si resulta que estaba escribiendo para un pequeño sistema integrado y la salida del programa es pesada, hay una variedad de técnicas que podría considerar.

  • Syslog remoto:envíe los mensajes de syslog a un servidor de syslog en la red.
  • Use los niveles de gravedad disponibles en syslog para hacer cosas diferentes con los mensajes. P.ej. descarte "INFO" pero registre y reenvíe "ERR" o mayor. P.ej. consolar
  • Utilice un controlador de señales en su programa para volver a leer la configuración en HUP y variar la generación de registros "a pedido" de esta manera.
  • Haga que su programa escuche en un socket Unix y escriba mensajes cuando esté abierto. Incluso podría implementar una consola interactiva en su programa de esta manera.
  • Usando un archivo de configuración, proporcione un control granular de la salida de registro.

Linux
  1. Comando dmesg de Linux - Imprimir búfer de anillo del kernel

  2. Linux:¿entiende cómo iniciar sesión en Linux?

  3. ¿Cómo se realiza la E/S de la consola sin bloqueo en Linux en C?

  4. ¿La asignación de memoria en Linux no bloquea?

  5. ¿Cómo asignar un búfer del kernel de Linux al espacio del usuario?

Cómo iniciar automáticamente la sesión de pantalla en Linux al iniciar sesión

Guía completa de registro de Linux

Gestión de memoria de Linux:memoria virtual y paginación de demanda

C++ Obtener una cadena del Portapapeles en Linux

Vuelva a cargar las asignaciones de grupo de un usuario de Linux sin cerrar sesión

registro de la marca de agua máxima de la memoria RAM de un proceso de Linux