Home - GabrielTrev/Trabalho-SO2 GitHub Wiki

1. Introdução

Pipes são um mecanismo de comunicação interprocessos presentes em todas as versões do linux. É um fluxo de dados de mão única entre processos: todo dado escrito por um processo no pipe é encaminhado pelo kernel a outro processo que irá lê-lo.

Um exemplo da utilização do pipe está no shell. A barra vertical | permite a criação de um pipe para a comunicação entre dois processos:

$ ls | more

O output padrão do primeiro processo ls é redirecionado para o pipe e o segundo processo more passa a receber em seu input os dados desse mesmo pipe.

Pipes podem ser considerados arquivos abertos sem uma imagem correspondente no sistema de arquivos montados. Um processo cria um novo pipe através da chamada de sistema pipe(), que retorna um par de file descriptors. O processo, então, compartilha esse descritores aos seus descendentes através de um fork(). O processo realiza a leitura do pipe através da chamada de sistema read() e a escrita no pipe através da write().

POSIX define pipes como half-duplex, ou seja, mesmo que a chamada de sistema retorne dois descritores de arquivo, cada processo deveria fechar um antes de usar o outro. Caso não o faça, pode acontecer de não encontrar EOF ou de esperar por muito tempo pra poder escrever.

2. Chamada de sistema

Um pipe pode ser criado com as chamadas pipe(2) e pipe2(2):

int pipe (int pipefd[2]); 
int pipe2 (int pipefd[2], int flags);

Em include/linux/syscalls.h

asmlinkage long sys_pipe2(int __user *fildes, int flags);

Em /asm/unistd.h

#define __NR_pipe2				311
__SYSCALL(311, sys_pipe2, 2)

As flags que podem ser passadas como parâmetro são as seguintes: O_CLOEXEC, O_DIRECT e O_NONBLOCK. No início da criação do pipe é feita uma verificação se a flag passada como parâmetro é uma dessas:

static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
        ...
	if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))
		return -EINVAL;
        ...
}

O_CLOEXEC

Realiza o fechamento automático do pipe quando a chamada de sistema exec() é executada. Para isso, seta a flag FD_CLOEXEC nos dois novos descritores de arquivos.

Em fs/file.c

int __alloc_fd(struct files_struct *files, unsigned start, unsigned end, unsigned flags)
{
...
	if (flags & O_CLOEXEC)
		__set_close_on_exec(fd, fdt);
	else
		__clear_close_on_exec(fd, fdt);
...
static inline void __set_close_on_exec(unsigned int fd, struct fdtable *fdt)
{
	__set_bit(fd, fdt->close_on_exec);
}
static struct fdtable * alloc_fdtable(unsigned int nr)
{
	struct fdtable *fdt;
	void *data;

	/*
	 * Figure out how many fds we actually want to support in this fdtable.
	 * Allocation steps are keyed to the size of the fdarray, since it
	 * grows far faster than any of the other dynamic data. We try to fit
	 * the fdarray into comfortable page-tuned chunks: starting at 1024B
	 * and growing in powers of two from there on.
	 */
	nr /= (1024 / sizeof(struct file *));
	nr = roundup_pow_of_two(nr + 1);
	nr *= (1024 / sizeof(struct file *));
	/*
	 * Note that this can drive nr *below* what we had passed if sysctl_nr_open
	 * had been set lower between the check in expand_files() and here.  Deal
	 * with that in caller, it's cheaper that way.
	 *
	 * We make sure that nr remains a multiple of BITS_PER_LONG - otherwise
	 * bitmaps handling below becomes unpleasant, to put it mildly...
	 */
	if (unlikely(nr > sysctl_nr_open))
		nr = ((sysctl_nr_open - 1) | (BITS_PER_LONG - 1)) + 1;

	fdt = kmalloc(sizeof(struct fdtable), GFP_KERNEL_ACCOUNT);
	if (!fdt)
		goto out;
	fdt->max_fds = nr;
	data = alloc_fdmem(nr * sizeof(struct file *));
	if (!data)
		goto out_fdt;
	fdt->fd = data;

	data = alloc_fdmem(max_t(size_t,
				 2 * nr / BITS_PER_BYTE + BITBIT_SIZE(nr), L1_CACHE_BYTES));
	if (!data)
		goto out_arr;
	fdt->open_fds = data;
	data += nr / BITS_PER_BYTE;
	fdt->close_on_exec = data;
	data += nr / BITS_PER_BYTE;
	fdt->full_fds_bits = data;

	return fdt;

out_arr:
	kvfree(fdt->fd);
out_fdt:
	kfree(fdt);
out:
	return NULL;
}

O_DIRECT

Cria um pipe que realiza operacoes de entrada e saída em modo de pacotes. Cada escrita, write(), ao pipe é tratada como um pacote separado e cada leitura, read() vai ler um pacote por vez.

É interessante notar que escritas maiores do que PIPE_BUF bytes serão divididas em múltiplos pacotes.

Se uma operação de leitura, read(), especificar um tamanho de buffer menor do que o próximo pacote, então o número de byytes requisitado é lido e os bytes em excesso são descartados. Especificar um tamanho do buffer, PIPE_BUF, é suficiente para ler os maiores pacotes possíveis. (observe o ponto anterior)

Pacotes de tamanho zero não são suportados.

Em include/asm/limits.h

#ifndef __ASM_PIPE_H
#define __ASM_PIPE_H

#ifndef PAGE_SIZE
#include <asm/page.h>
#endif

#define PIPE_BUF	PAGE_SIZE

#endif
static inline int is_packetized(struct file *file)
{
	return (file->f_flags & O_DIRECT) != 0;
}
int create_pipe_files(struct file **res, int flags)
{
        ...
	f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT));
        ...
}

O_NONBLOCK

Essa flag, quando setada, faz com que o comando open() para leitura apenas, retorne sem atraso. Já a mesma operação para escrita, irá retornar erro se não houver outro processo com o arquivo aberto para leitura.

No caso dessa flag não estar setada a tentativa de abrir o arquivo em modo leitura irá bloquear a thread que fez a chamada até que outra thread abra o arquivo para escrita, da mesma forma irá ocorrer na abertura para escrita, bloqueando até que outra abra para leitura.

A chamada de sistema fcntl() poderia realizar a mesma funcao, mas para evitar mais chamadas, a verificação é realizada no código pipe.c nas implementações da leitura pipe_read() e escrita pipe_write().

pipe_read(struct kiocb *iocb, struct iov_iter *to){
                ...
		if (!pipe->waiting_writers) {
			/* syscall merging: Usually we must not sleep
			 * if O_NONBLOCK is set, or if we got some data.
			 * But if a writer sleeps in kernel space, then
			 * we can wait for that data without violating POSIX.
			 */
			if (ret)
				break;
			if (filp->f_flags & O_NONBLOCK) {
				ret = -EAGAIN;
				break;
			}
		}
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		if (do_wakeup) {
			wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
 			kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
		}
		pipe_wait(pipe);
	}
	__pipe_unlock(pipe);
                ...
}
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
...
if (bufs < pipe->buffers)
			continue;
if (filp->f_flags & O_NONBLOCK) {
			if (!ret)
				ret = -EAGAIN;
			break;
		}
                ...
}

No vetor de arquivos abertos do processo, a posição indicada por pfd[0] é aberta apenas para leitura (O_RDONLY) e a posição pfd[1] é aberta apenas para escrita (O_WRONLY). Assim, não é possível escrever em pfd[0] ou ler de pfd[1].

3. Estrutura de Dados

O Pipe possui uma estrutura essencial, pipe_inode_info , que faz parte das estruturas dos arquivos de Pipe, como por exemplo o inode. Abaixo, temos a implementação dessa estrutura:

struct pipe_inode_info {
	struct mutex mutex;
	wait_queue_head_t wait;
	unsigned int nrbufs, curbuf, buffers;
	unsigned int readers;
	unsigned int writers;
	unsigned int files;
	unsigned int waiting_writers;
	unsigned int r_counter;
	unsigned int w_counter;
	struct page *tmp_page;
	struct fasync_struct *fasync_readers;
	struct fasync_struct *fasync_writers;
	struct pipe_buffer *bufs;
	struct user_struct *user;
};

Em destacando as variáveis dessa estrutura, temos em um primeiro caso a mutex, utilizada para proteção do Pipe e assim não se tem o risco de ocorrer uma escrita, enquanto o outro lê por exemplo. Em resumo é utilizada para garantir exclusão mútua tanto na escrita, quanto na leitura do Pipe. Para realizar essas tarefas temos algumas funções, com destaque na void pipe_lock(struct pipe_inode_info *pipe) e void pipe_unlock(struct pipe_inode_info *pipe), que recebe a referência para o Pipe e altera o mutex. É utilizada para proteção do Pipe, assim não se tem o risco de um processo conseguir a permissão de escrever no Pipe, enquanto o outro lê por exemplo.

void pipe_lock(struct pipe_inode_info *pipe)
{
	/*
	 * pipe_lock() nests non-pipe inode locks (for writing to a file)
	 */
	pipe_lock_nested(pipe, I_MUTEX_PARENT);
}
EXPORT_SYMBOL(pipe_lock);

void pipe_unlock(struct pipe_inode_info *pipe)
{
	if (pipe->files)
		mutex_unlock(&pipe->mutex);
}
EXPORT_SYMBOL(pipe_unlock);

Em relação a variável wait, temos uma lista de espera contendo leitores e escritores que aguardam a vez de utilizar o Pipe quando este está vazia ou cheio por exemplo. Um exemplo é a função void pipe_wait(struct pipe_inode_info *pipe), onde um escritor por exemplo, chama esssa rotina e diz que quer esperar uma leitura por exemplo para poder continuar sua escrita. Assim ele unlock e libera o uso do Pipe e assim quando tiver ocorrido a leitura e assim finalizar a espera, ele assume o pipe novamente e continua a escrita.

void pipe_wait(struct pipe_inode_info *pipe)
{
	DEFINE_WAIT(wait);

	/*
	 * Pipes are system-local resources, so sleeping on them
	 * is considered a noninteractive wait:
	 */
	prepare_to_wait(&pipe->wait, &wait, TASK_INTERRUPTIBLE);
	pipe_unlock(pipe);
	schedule();
	finish_wait(&pipe->wait, &wait);
	pipe_lock(pipe);
}

Uma outra variável que iremos dar destaque, é a @tmp_page que possui o intuito de reserva uma página, alocada, disponível para ser utilizada por um buffer. No código abaixo por exemplo, temos que se uma página referenciada pelo buffer não é mais utilizada pelo pipe e se a página reservada para alocação estiver não referenciada. Esta página, tmp_page, recebe a referencia da página não utilizada, e assim, passa a ter uma referência para futuras posições do cache.

static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
				  struct pipe_buffer *buf)
{
	struct page *page = buf->page;

	/*
	 * If nobody else uses this page, and we don't already have a
	 * temporary page, let's keep track of it as a one-deep
	 * allocation cache. (Otherwise just release our reference to it)
	 */
	if (page_count(page) == 1 && !pipe->tmp_page)
		pipe->tmp_page = page;
	else
		put_page(page);
}

Em relação as demais variáveis, temos as seguintes definições:

@waiting_writers: Número de escritores esperando
@r_counter: Contador de leitura
@w_counter: Contador de escrita
@user: Criador do buffer

Lembrando que as posiçoes do Buffer representam as struct buffer, como representado na figura abaixo

struct pipe_buffer {
	struct page *page;
	unsigned int offset, len;
	const struct pipe_buf_operations *ops;
	unsigned int flags;
	unsigned long private;
};

Em uma ultima análise, na imagem abaixo, temos o vetor de buffers, que se baseia em uma lista circular, onde cada posição pode apontar para uma página na memória, onde acontece a escrita e a leitura no Pipe. Como variáveis de destaque, temos a bufs que é a própria lista, a nrbufs que representa o número de buffers com conteúdo escrito, o curbuf que é o buffer atual, onde esta acontecendo leitura ou escrita. A imagem abaixo ilustra a situação

Calculo da próxima posição do Buffer

4. Criação do pipe

A criação de um pipe é feita pelo usuário a partir das chamadas de sistema pipe e pipe2. A diferença entre as duas chamadas é que o parâmetro flags é configurado automaticamente como 0 na chamada pipe, sendo que ele é especificado na chamada pipe2.

// Chamada de sistema pipe - chama pipe2 com flags = 0
SYSCALL_DEFINE1(pipe, int __user *, fildes)
{
	return sys_pipe2(fildes, 0);
}
// Chamada de sistema pipe2 - responsável pela criação de um novo pipe
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
	struct file *files[2]; // Vetor de arquivos do pipe (leitura e escrita)
	int fd[2]; // Vetor de descritores de arquivo
	
	int error;
	
	// Função principal usada para criacao
	// Antigamente, era chamada de do_pipe
	error = __do_pipe_flags(fd, files, flags);
	if (!error) {
		// Passa para fildes os descritores alocados
		// Retorna 0 se nao houver erro
		if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) {
			// Passa por aqui se houver erro	
			// Remove referencia aos arquivos de pipe
			fput(files[0]);
			fput(files[1]);
			
			// Desaloca entradas do vetor de arquivos abertos
			put_unused_fd(fd[0]);
			put_unused_fd(fd[1]);

			error = -EFAULT;
		} else {
			
			// Atribui entradas do vetor de arquivos aos arquivos de pipe
			fd_install(fd[0], files[0]);
			fd_install(fd[1], files[1]);
		}
	}
	return error;
}

A função __do_pipe_flags é responsavel pela criacao dos arquivos do pipe e da alocacao de descritores para entradas nao utilizadas no vetor de arquivos abertos para referenciar esses arquivos.

// Funcao __do_pipe_flags - chamadas pela syscall para criar pipe
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
	// Valor de retorno
	int error;
	
	// Descritores dos arquivos de escrita e leitura
	int fdw, fdr;
	
	// Possiveis flags:
	// 		O_CLOEXEC: arquivos sao fechados, no caso de chamada exec
	//  	O_NONBLOCK: leitura e escrita sao nao bloqueantes
	// 		O_DIRECT: escrita e feita em packet mode
	// Faz verificacao das flags, retorna EINVAL se houver valor invalido
	if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))
		return -EINVAL;

	// Cria os arquivos do pipe - referenciado por files
	error = create_pipe_files(files, flags);
	if (error)
		return error;
	
	// Aloca uma entrada no vetor de arquivos abertos para a leitura
	error = get_unused_fd_flags(flags);
	if (error < 0)
		goto err_read_pipe;
	fdr = error;
	
	// Aloca uma entrada no vetor de arquivos abertos para a escrita
	error = get_unused_fd_flags(flags);
	if (error < 0)
		goto err_fdr;
	fdw = error;

	audit_fd_pair(fdr, fdw);
	
	fd[0] = fdr;
	fd[1] = fdw;
	return 0;

 err_fdr:
	// caso a alocacao de fdw tenha sido mal sucedida, libera a entrada de fdr
	put_unused_fd(fdr);

 err_read_pipe:

	// Libera os arquivos criados
	fput(files[0]);
	fput(files[1]);
	return error;
}

Os pipes são visto pelos processos como se fossem arquivos comuns. Isso é possível devido ao sistema de arquivos virtual do linux. Na inicialização do kernel, um sistema de arquivos especial para pipes, pipefs é montado. A partir desse sistema de arquivos, os pipes são armazenados como inodes em estruturas de diretórios no diretório raiz do sistema de arquivos. Esse sistema de arquivos criado é considerado um pseudo-filesystem, já que não é persistente. Ele também não pode ser acessado pelo usuário, já que não possui ponto de montagem na árvore de diretórios do sistema.

// Estrutura do sistema de arquivos especial para os pipes
static struct file_system_type pipe_fs_type = {
	.name		= "pipefs",
	.mount		= pipefs_mount, // Funcao de mount para o pipefs
	.kill_sb	= kill_anon_super,
};
// Função que monta o sistema de arquivos de pipe na inicializacao do kernel
static int __init init_pipe_fs(void)
{
	int err = register_filesystem(&pipe_fs_type);

	if (!err) {
		// Chama a funcao de mount especifica para o pipefs
		pipe_mnt = kern_mount(&pipe_fs_type);
		if (IS_ERR(pipe_mnt)) {
			err = PTR_ERR(pipe_mnt);
			unregister_filesystem(&pipe_fs_type);
		}
	}
	return err;
}
// Função utilizada para montar o sistema de arquivos do pipe
static struct dentry *pipefs_mount(struct file_system_type *fs_type,
			 int flags, const char *dev_name, void *data)
{
	// Funcao utilizada para montar pseudo-filesystems, como pipes e sockets
	return mount_pseudo(fs_type, "pipe:", &pipefs_ops,
			&pipefs_dentry_operations, PIPEFS_MAGIC);
}

Dessa forma, na criação de um pipe, é necessário adicionar uma entrada de diretório dentro do diretório raiz do sistema de arquivos de pipe (seu mount é armazenado na variável global pipe_mnt). Essa entrada conterá o inode correspondente ao pipe criado. Dentro desse inode, deve-se alocar a estrutura de informações do pipe, assim como inicializar essa estrutura e seus buffers e, ainda, referenciar as operações de pipe (pipefifo_fops) dentro do inode. Esse inode deverá ser referenciado pelos arquivos de leitura e de escrita, que também referenciarão as informações e operações do pipe. A função create_pipe_files é responsável por criar a estrutura do pipe, alocar os arquivos de leitura e escrita e alocar um inode para referenciar esse pipe no sistema de arquivos. Além de alocar essas estruturas, a função também as inicializa, fazendo, por exemplo, com que as operações padrão em um arquivo sejam redirecionadas para as operações padrão do pipe, que são definidas em pipefifo_fops.

// create_pipe_files - aloca e inicializa arquivos de pipe
int create_pipe_files(struct file **res, int flags)
{
	// Valor de retorno
	int err;
	
	// Aloca e inicializa um inode para o pipe
	// A criacao da estrutura do pipe (pipe_inode_info) e feita aqui
	struct inode *inode = get_pipe_inode();
	
	// Arquivo auxiliar (usado para o arquivo de escrita)
	struct file *f;

	// Caminho para o pipe dentro do sistema de arquivos especial
	// struct path {
	// 		struct vfsmount *mnt;
	// 		struct dentry *dentry;
	// }
	struct path path;
	
	// Nome para a dentry do inode do pipe
	static struct qstr name = { .name = "" };

	if (!inode)
		return -ENFILE;

	err = -ENOMEM;
	
	// Aloca uma directory entry no sistema de arquivos especial
	path.dentry = d_alloc_pseudo(pipe_mnt->mnt_sb, &name);
	if (!path.dentry)
		goto err_inode;

	// Referencia o mount utilizado em path
	// struct vfsmount {
	// 		struct dentry *mnt_root; // Entrada do diretorio raiz
	// 		// Superbloco do sistema de arquivos
	// 		struct super_block *mnt_sb;	
	// 		int mnt_flags;
	// };
	path.mnt = mntget(pipe_mnt);

	// Associa o inode a dentry
	d_instantiate(path.dentry, inode);

	// Aloca o arquivo de escrita
	// Observar que as operacoes padrao sao passadas por pipefifo_fops
	f = alloc_file(&path, FMODE_WRITE, &pipefifo_fops);
	if (IS_ERR(f)) {
		err = PTR_ERR(f);
		goto err_dentry;
	}

	// Configura as flags de O_NONBLOCK e O_DIRECT no arquivo de escrita
	f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT));
	f->private_data = inode->i_pipe;
	
	// Aloca o arquivo de leitura
	res[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops);
	if (IS_ERR(res[0])) {
		err = PTR_ERR(res[0]);
		goto err_file;
	}

	// Incrementa o numero de referencias nos membros do path
	path_get(&path);

	// As informacoes do pipe sao armazenadas em private_data no arquivo
	// Configura a flag de O_NONBLOCK no arquivo de leitura
	res[0]->private_data = inode->i_pipe;
	res[0]->f_flags = O_RDONLY | (flags & O_NONBLOCK);
	
	res[1] = f;
	return 0;

err_file:
	// Caso haja falha na alocacao do segundo arquivo, desaloca o primeiro
	put_filp(f);
err_dentry:
	// Desaloca as estruturas de pipe_inode_info e path
	free_pipe_info(inode->i_pipe);
	path_put(&path);
	return err;

err_inode:
	// Desaloca es estruturas de pipe_inode_info e path
	free_pipe_info(inode->i_pipe);
	iput(inode);
	return err;
}

A função get_pipe_inode, além de alocar e inicializar e estrutura de inode que referencia o pipe, aloca e inicializa a estrutura pipe_inode_info, contendo as informações do pipe. As informações do pipe serão passadas depois para os arquivos de leitura e saida, para seus campos private_data, que serão usadas nas operações de leitura e escrita.

// get_pipe_inode - aloca e inicializa um inode para o pipe
// Tambem aloca e inicializa a estrutura pipe_inode_info
static struct inode * get_pipe_inode(void)
{
	struct inode *inode = new_inode_pseudo(pipe_mnt->mnt_sb);
	struct pipe_inode_info *pipe;

	// Verifica alocacao do inode
	if (!inode)
		goto fail_inode;

	// Obtem o numero para o inode
	inode->i_ino = get_next_ino();

	// Aloca a estrutura de informacoes do pipe
	pipe = alloc_pipe_info();
	if (!pipe)
		goto fail_iput;
	
	// Referencia as informacoes do pipe no inode
	inode->i_pipe = pipe;
	
	// Inicializa o numero de arquivos que referenciam o pipe como 2
	pipe->files = 2;

	// Inicializa o numero de leitores e escritores do pipe como 1
	pipe->readers = pipe->writers = 1;
	
	// Referencia as operacoes do pipe no inode
	inode->i_fop = &pipefifo_fops;

	/*
	 * Mark the inode dirty from the very beginning,
	 * that way it will never be moved to the dirty
	 * list because "mark_inode_dirty()" will think
	 * that it already _is_ on the dirty list.
	 */
	inode->i_state = I_DIRTY;
	
	// Configura o modo do inode
	inode->i_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	// Configura o user id e o group id do inode com os valores correntes
	inode->i_uid = current_fsuid();
	inode->i_gid = current_fsgid();

	// Configura as datas de modificacao acesso e mudanca com as datas atuais
	inode->i_atime = inode->i_mtime = inode->i_ctime = current_time(inode);

	return inode;

fail_iput:
	// Desaloca o inode em caso de erro
	iput(inode);

fail_inode:
	return NULL;
}

Função que aloca as informações do pipe

// alloc_pipe_info - aloca e inicializa informacoes do pipe
struct pipe_inode_info *alloc_pipe_info(void)
{
	struct pipe_inode_info *pipe;

	// #define PIPE_DEF_BUFFERS	16
	unsigned long pipe_bufs = PIPE_DEF_BUFFERS;

	struct user_struct *user = get_current_user();
	unsigned long user_bufs;

	// Aloca estrutura de informacoes de pipe
	pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL_ACCOUNT);
	if (pipe == NULL)
		goto out_free_uid;

	// Redefine o numero de buffers se o tamanho maximo de pipe for definido
	// para um valor menor do que 64kB
	if (pipe_bufs * PAGE_SIZE > pipe_max_size && !capable(CAP_SYS_RESOURCE))
		pipe_bufs = pipe_max_size >> PAGE_SHIFT;

	// Soma o numero de buffers de pipe na quantidade contabilizada para
	// o usuario
	user_bufs = account_pipe_buffers(user, 0, pipe_bufs);
        
        /* Maximum allocatable pages per user. Hard limit is unset 
         * by default, soft matches default values.
         */
        //unsigned long pipe_user_pages_hard;
        //unsigned long pipe_user_pages_soft = PIPE_DEF_BUFFERS * INR_OPEN_CUR;
        
	// Se a quantidade de buffers passar do soft limit para o usuario, 
	// aloca apenas um buffer
	if (too_many_pipe_buffers_soft(user_bufs)) {
		user_bufs = account_pipe_buffers(user, pipe_bufs, 1);
		pipe_bufs = 1;
	}
	
	// Verifica se a quantidade passo do hard limit
	if (too_many_pipe_buffers_hard(user_bufs))
		goto out_revert_acct;

	// Aloca o vetor de buffers do pipe
	pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer),
			     GFP_KERNEL_ACCOUNT);
	
	if (pipe->bufs) {
		// Inicializa a fila de espera
		init_waitqueue_head(&pipe->wait);
		
		// Inicializa a quantidade de leitores e escritores como 1
		pipe->r_counter = pipe->w_counter = 1;
		
		// Inicializa a quantidade de buffers do pipe, o usuario corrente
		// e o mutex
		pipe->buffers = pipe_bufs;
		pipe->user = user;
		mutex_init(&pipe->mutex);
		return pipe;
	}

out_revert_acct:
	// Subtrai os buffers de pipe do usuario
	(void) account_pipe_buffers(user, pipe_bufs, 0);
	kfree(pipe);
out_free_uid:
	free_uid(user);
	return NULL;
}

5. Operações com pipes

Para os processos que utilizam um pipe, o mesmo é representado como um arquivo comum, entretanto sabemos que um pipe difere de arquivos comuns, pois um pipe nunca chega a ser gravado na memória secundária, existindo somente na memória primária. É por conta desta distinção que se faz necessário armazenar na estrutura do arquivo uma referência para as funções que devem ser chamadas para realizar as operações relacionadas a um arquivo.

Desta forma, qualquer operação de leitura/escrita deve ser feita utilizando a chamada de sistema padrão para arquivos, que será responsável por chamar a função correta para o caso dos pipes.

//Syscall write
SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf,
		size_t, count)
{
	struct fd f = fdget_pos(fd);
	ssize_t ret = -EBADF;

	if (f.file) {
		loff_t pos = file_pos_read(f.file);
		ret = vfs_write(f.file, buf, count, &pos);
		if (ret >= 0)
			file_pos_write(f.file, pos);
		fdput_pos(f);
	}

	return ret;
}
ssize_t __vfs_write(struct file *file, const char __user *p, size_t count,
		    loff_t *pos)
{
	if (file->f_op->write)
		return file->f_op->write(file, p, count, pos);
	else if (file->f_op->write_iter)
		return new_sync_write(file, p, count, pos);
	else
		return -EINVAL;
}

Uma das estruturas criadas pela função new_sync_write é o iov_iter:

struct iov_iter {
	int type;
	size_t iov_offset;
	size_t count;
	union {
		const struct iovec *iov;
		const struct kvec *kvec;
		const struct bio_vec *bvec;
		struct pipe_inode_info *pipe;
	};
	union {
		unsigned long nr_segs;
		struct {
			int idx;
			int start_idx;
		};
	};
};

Após criar as estruturas necessárias, a função de escrita aponta para a função write_iter, do fops apontado pelas operações do arquivo do pipe:

return file->f_op->write_iter(kio, iter);
//Estrutura que contem as operações do pipe.
const struct file_operations pipefifo_fops = {
	.open		= fifo_open,
	.llseek		= no_llseek,
	.read_iter	= pipe_read,
	.write_iter	= pipe_write,
	.poll		= pipe_poll,
	.unlocked_ioctl	= pipe_ioctl,
	.release	= pipe_release,
	.fasync		= pipe_fasync,
};

Por fim, a função pipe_write é utilizada para fazer a escrita dos dados

static ssize_t
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
        //Recupera o inode do pipe a partir do iocb
	struct file *filp = iocb->ki_filp;
	struct pipe_inode_info *pipe = filp->private_data;

	ssize_t ret = 0;
	int do_wakeup = 0;
        //Recupera o numero de caracteres a serem copiados do iov_iter
	size_t total_len = iov_iter_count(from);
	ssize_t chars;

	/* Null write succeeds. */
        //Escrita nula, sucesso sempre
	if (unlikely(total_len == 0))
		return 0;

        //Trava o mutex do pipe
	__pipe_lock(pipe);

        //Caso o pipe não tenha nenhum leitor, gera o erro broken pipe
	if (!pipe->readers) {
		send_sig(SIGPIPE, current, 0);
		ret = -EPIPE;
		goto out;
	}

        /*
         * Tenta anexar o começo da escrita ao fim da página que representa o
         * Buffer anterior, dessa maneira escritas pequenas não utilizam uma página inteira
         * Ou é possivel alocar somente uma página nova para uma escrita maior que o tamanho
         * da página.
         */
	/* We try to merge small writes */
	chars = total_len & (PAGE_SIZE-1); /* Calcula o número de bytes que tentaremos escrever */
	if (pipe->nrbufs && chars != 0) {
                //Calcula o indice do buffer a ser utilizado e recupera sua referencia
		int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) &
							(pipe->buffers - 1);
		struct pipe_buffer *buf = pipe->bufs + lastbuf;
		int offset = buf->offset + buf->len;

                // Verifica se o número de caracteres a serem escritos
                // Cabem na página anterior.
		if (buf->ops->can_merge && offset + chars <= PAGE_SIZE) {
			ret = pipe_buf_confirm(pipe, buf);
			if (ret)
				goto out;

                        //Faz a escrita para a memória.
			ret = copy_page_from_iter(buf->page, offset, chars, from);
			if (unlikely(ret < chars)) {
				ret = -EFAULT;
				goto out;
			}
			do_wakeup = 1;
			buf->len += ret;
                        // Caso não haja mais bytes a serem escritos, vai para out.
			if (!iov_iter_count(from))
				goto out;
		}
	}

        //loop de escrita
	for (;;) {
		int bufs;
                // sem leitores, broken pipe
		if (!pipe->readers) {
			send_sig(SIGPIPE, current, 0);
			if (!ret)
				ret = -EPIPE;
			break;
		}
		bufs = pipe->nrbufs;
                // Verifica se o pipe ainda não possue o numero maximo de buffers.
		if (bufs < pipe->buffers) {
			int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1);
			struct pipe_buffer *buf = pipe->bufs + newbuf;
			struct page *page = pipe->tmp_page;
			int copied;
                       
			if (!page) {
				page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
				if (unlikely(!page)) {
					ret = ret ? : -ENOMEM;
					break;
				}
				pipe->tmp_page = page;
			}
			/* Always wake up, even if the copy fails. Otherwise
			 * we lock up (O_NONBLOCK-)readers that sleep due to
			 * syscall merging.
			 * FIXME! Is this really true?
			 */
			do_wakeup = 1;
			copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
			if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) {
				if (!ret)
					ret = -EFAULT;
				break;
			}
                        // bytes copiados nessa iteração são somados ao total.
			ret += copied;

			/* Insert it into the buffer array */
			buf->page = page;
			buf->ops = &anon_pipe_buf_ops;
			buf->offset = 0;
			buf->len = copied;
			buf->flags = 0;
			if (is_packetized(filp)) {
				buf->ops = &packet_pipe_buf_ops;
				buf->flags = PIPE_BUF_FLAG_PACKET;
			}
			pipe->nrbufs = ++bufs;
			pipe->tmp_page = NULL;

                        // Verifica se ainda é necessário continuar o loop.
			if (!iov_iter_count(from))
				break;
		}
		if (bufs < pipe->buffers)
			continue;
		if (filp->f_flags & O_NONBLOCK) {
			if (!ret)
                                // Não bloqueante e nenhum dado foi copiado
                                // Tentar mais tarde
				ret = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		if (do_wakeup) {
                        // Acorda os processos na fila de escrita
			wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
			do_wakeup = 0;
		}
                // Entra em espera (libera o mutex em pipe_wait() e quando a espera acaba
                // o bloqueia novamente).
		pipe->waiting_writers++;
		pipe_wait(pipe);
		pipe->waiting_writers--;
	}
out:
	__pipe_unlock(pipe);
	if (do_wakeup) {
		wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
	}
	if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {
		int err = file_update_time(filp);
		if (err)
			ret = err;
		sb_end_write(file_inode(filp)->i_sb);
	}
	return ret;
}

Algoritmo para leitura do pipe

static ssize_t
pipe_read(struct kiocb *iocb, struct iov_iter *to)
{
        // Recupera o numero de dados a ser lido
	size_t total_len = iov_iter_count(to);

        // Recupera unformações do inode.
	struct file *filp = iocb->ki_filp;
	struct pipe_inode_info *pipe = filp->private_data;
	int do_wakeup;
	ssize_t ret;

        // Leitura nula, retorna sucesso, 0 bytes lidos.
	/* Null read succeeds. */
	if (unlikely(total_len == 0))
		return 0;

	do_wakeup = 0;
	ret = 0;
        // Bloqueia o mutex.
	__pipe_lock(pipe);
        // Entra no loop de leitura.
	for (;;) {
		int bufs = pipe->nrbufs;
		if (bufs) {
                        // Recupera referencia para o primeiro buffer a ser lido
			int curbuf = pipe->curbuf;
			struct pipe_buffer *buf = pipe->bufs + curbuf;
			size_t chars = buf->len;
			size_t written;
			int error;

                        // Caso o leitor queira mais bytes que a quantidade disponivel
                        // Copia somente os que estão disponiveis neste buffer
                        // O restante fica para outra iteração
			if (chars > total_len)
				chars = total_len;

			error = pipe_buf_confirm(pipe, buf);
			if (error) {
				if (!ret)
					ret = error;
				break;
			}

			written = copy_page_to_iter(buf->page, buf->offset, chars, to);
			if (unlikely(written < chars)) {
				if (!ret)
					ret = -EFAULT;
				break;
			}
                        // Ajusta os offsets
			ret += chars;
			buf->offset += chars;
			buf->len -= chars;

			/* Was it a packet buffer? Clean up and exit */
			if (buf->flags & PIPE_BUF_FLAG_PACKET) {
				total_len = chars;
				buf->len = 0;
			}

                        // Esvaziou o buffer? é necessário liberar a página
			if (!buf->len) {
				pipe_buf_release(pipe, buf);
				curbuf = (curbuf + 1) & (pipe->buffers - 1);
				pipe->curbuf = curbuf;
				pipe->nrbufs = --bufs;
				do_wakeup = 1;
			}
                        // Subtrai os bytes que já foram lidos
			total_len -= chars;
                        // Leitura concluida, sai do loop.
			if (!total_len)
				break;	/* common path: read succeeded */
		}
		if (bufs)	/* More to do? */
			continue;
		if (!pipe->writers)
			break;
		if (!pipe->waiting_writers) {
			/* syscall merging: Usually we must not sleep
			 * if O_NONBLOCK is set, or if we got some data.
			 * But if a writer sleeps in kernel space, then
			 * we can wait for that data without violating POSIX.
			 */
			if (ret)
				break;
			if (filp->f_flags & O_NONBLOCK) {
                                // Sem escritores? volte mais tarde
				ret = -EAGAIN;
				break;
			}
		}
		if (signal_pending(current)) {
			if (!ret)
				ret = -ERESTARTSYS;
			break;
		}
		if (do_wakeup) {
                        // Acorda o processo em espera pra escrita
			wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
 			kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
		}
		pipe_wait(pipe);
	}
        // Libera o mutex
	__pipe_unlock(pipe);

	/* Signal writers asynchronously that there is more room. */
	if (do_wakeup) {
		wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
		kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
	}
	if (ret > 0)
		file_accessed(filp);
	return ret;
}
⚠️ **GitHub.com Fallback** ⚠️