c语言环形队列

这篇具有很好参考价值的文章主要介绍了c语言环形队列。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一位数组队列

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 256 * 1024 * 1024 // 256MB
#define PACKET_SIZE 8192
#define HEADER_SIZE 16

typedef struct {
    char* buffer;
    int read_idx;
    int write_idx;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

CircularBuffer* circular_buffer_init();
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
void circular_buffer_destroy(CircularBuffer* buffer);
void* read_thread(void* arg);
void* send_thread(void* arg);

int main() {
    CircularBuffer* buffer = circular_buffer_init();

    pthread_t thread1, thread2;
    pthread_create(&thread1, NULL, read_thread, buffer);
    pthread_create(&thread2, NULL, send_thread, buffer);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = (CircularBuffer*)malloc(sizeof(CircularBuffer));
    buffer->buffer = (char*)malloc(BUFFER_SIZE);
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == BUFFER_SIZE) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    int remaining = PACKET_SIZE - HEADER_SIZE - length;
    if (remaining > 0) {
        memset(buffer->buffer + buffer->write_idx + HEADER_SIZE + length, 0, remaining);
    }

    memcpy(buffer->buffer + buffer->write_idx, data, length + HEADER_SIZE);
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);
    free(buffer->buffer);
    free(buffer);
}

void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }

    char* data = (char*)malloc(PACKET_SIZE);
    memset(data, 0, PACKET_SIZE);
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    ssize_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        circular_buffer_write(buffer, data, PACKET_SIZE);
    }

    free(data);
    fclose(file);
    pthread_exit(NULL);
}

void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = (char*)malloc(PACKET_SIZE);

    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            pthread_exit(NULL);
        }
    }

    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}

全部打印

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 256 * 1024 * 1024 // 256MB
#define PACKET_SIZE 8192
#define HEADER_SIZE 16

typedef struct {
    char* buffer;
    int read_idx;
    int write_idx;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

CircularBuffer* circular_buffer_init();
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
void circular_buffer_destroy(CircularBuffer* buffer);
void print_hex(const char* data, int length);
void* read_thread(void* arg);
void* send_thread(void* arg);

int main() {
    CircularBuffer* buffer = circular_buffer_init();

    pthread_t thread1, thread2;
    pthread_create(&thread1, NULL, read_thread, buffer);
    pthread_create(&thread2, NULL, send_thread, buffer);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = (CircularBuffer*)malloc(sizeof(CircularBuffer));
    buffer->buffer = (char*)malloc(BUFFER_SIZE);
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == BUFFER_SIZE) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    int remaining = PACKET_SIZE - HEADER_SIZE - length;
    if (remaining > 0) {
        memset(buffer->buffer + buffer->write_idx + HEADER_SIZE + length, 0, remaining);
    }

    memcpy(buffer->buffer + buffer->write_idx, data, length + HEADER_SIZE);
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);

    // Print the written data in hexadecimal format
    print_hex(data, length);
}

void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);

    // Print the read data in hexadecimal format
    print_hex(data, length);
}

void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);
    free(buffer->buffer);
    free(buffer);
}

void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        printf("%02X ", (unsigned char)data[i]);
    }
    printf("\n");
}

void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("./123.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }

    char* data = (char*)malloc(PACKET_SIZE);
    memset(data, 0, PACKET_SIZE);
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    ssize_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        circular_buffer_write(buffer, data, PACKET_SIZE);
    }

    free(data);
    fclose(file);
    pthread_exit(NULL);
}

void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = (char*)malloc(PACKET_SIZE);

    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            pthread_exit(NULL);
        }

        // Print the UDP sent data in hexadecimal format
        print_hex(data, PACKET_SIZE);
    }

    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}

只打印发送

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 256 * 1024 * 1024 // 256MB
#define PACKET_SIZE 8192
#define HEADER_SIZE 16

typedef struct {
    char* buffer;
    int read_idx;
    int write_idx;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

CircularBuffer* circular_buffer_init();
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
void circular_buffer_destroy(CircularBuffer* buffer);
void print_hex(const char* data, int length);
void* read_thread(void* arg);
void* send_thread(void* arg);

int main() {
    CircularBuffer* buffer = circular_buffer_init();

    pthread_t thread1, thread2;
    pthread_create(&thread1, NULL, read_thread, buffer);
    pthread_create(&thread2, NULL, send_thread, buffer);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = (CircularBuffer*)malloc(sizeof(CircularBuffer));
    buffer->buffer = (char*)malloc(BUFFER_SIZE);
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == BUFFER_SIZE) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    int remaining = PACKET_SIZE - HEADER_SIZE - length;
    if (remaining > 0) {
        memset(buffer->buffer + buffer->write_idx + HEADER_SIZE + length, 0, remaining);
    }

    memcpy(buffer->buffer + buffer->write_idx, data, length + HEADER_SIZE);
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % BUFFER_SIZE;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);
    free(buffer->buffer);
    free(buffer);
}

void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        printf("%02X ", (unsigned char)data[i]);
    }
    printf("\n");
}

void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }

    char* data = (char*)malloc(PACKET_SIZE);
    memset(data, 0, PACKET_SIZE);
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    ssize_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        circular_buffer_write(buffer, data, HEADER_SIZE + bytesRead);
    }

    free(data);
    fclose(file);
    pthread_exit(NULL);
}

void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = (char*)malloc(PACKET_SIZE);

    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        int valid_data_length = PACKET_SIZE - HEADER_SIZE;
        int padded_data_length = valid_data_length;
        for (int i = valid_data_length - 1; i >= 0; i--) {
            if (data[HEADER_SIZE + i] != 0) {
                break;
            }
            padded_data_length--;
        }

        // Print the UDP sent data in hexadecimal format
        print_hex(data + HEADER_SIZE, padded_data_length);

        ssize_t bytesSent = sendto(udp_socket, data + HEADER_SIZE, padded_data_length, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            pthread_exit(NULL);
        }
    }

    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}

inux c 建立一个256M大小的一位数组环形队列,建立两个线程一个用于读取指定文件内容存入环形队列,一个用UDP每次读取环形队列中8192字节发送到指定地址,不足8192仅发送剩余数据。读取数据时最大以1024字节为一包数据读取,读取时每包数据自定义16字节的数据包头组委有效数据存入环形队列,如果环形队列数据满了,等待UDP发送数据有空闲空间读取线程再继续读。环形队列读写、初始化环形缓冲区、销毁环形缓冲区分别写成独立函数,UDP将在fread读取读取的数据发送完成后结束所有线程。分别用16进制printf写入缓冲区的数据和udp发送的数据

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define BUFFER_SIZE 256 * 1024 * 1024  // 256M字节大小的环形队列
#define PACKET_SIZE 8192
#define MAX_PACKET_SIZE 1024
#define HEADER_SIZE 16

typedef struct {
    char buffer[BUFFER_SIZE];
    int read_index;
    int write_index;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t full;
    pthread_cond_t empty;
} CircularBuffer;

typedef struct {
    CircularBuffer* buffer;
    FILE* file;
} ReaderArgs;

typedef struct {
    CircularBuffer* buffer;
    struct sockaddr_in* addr;
} UdpArgs;

void initCircularBuffer(CircularBuffer* buffer) {
    buffer->read_index = 0;
    buffer->write_index = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->full, NULL);
    pthread_cond_init(&buffer->empty, NULL);
}

void destroyCircularBuffer(CircularBuffer* buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->full);
    pthread_cond_destroy(&buffer->empty);
}

void writeToCircularBuffer(CircularBuffer* buffer, const char* data, int size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == BUFFER_SIZE) {
        pthread_cond_wait(&buffer->empty, &buffer->lock);
    }

    int remainingBytes = size;
    while (remainingBytes > 0) {
        int writeSize = (remainingBytes > BUFFER_SIZE - buffer->count) ? BUFFER_SIZE - buffer->count : remainingBytes;
        memcpy(buffer->buffer + buffer->write_index, data + size - remainingBytes, writeSize);
        buffer->write_index = (buffer->write_index + writeSize) % BUFFER_SIZE;
        buffer->count += writeSize;
        remainingBytes -= writeSize;
    }

    pthread_cond_signal(&buffer->full);
    pthread_mutex_unlock(&buffer->lock);
}

void readFromCircularBuffer(CircularBuffer* buffer, char* dest, int size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->full, &buffer->lock);
    }

    int remainingBytes = size;
    while (remainingBytes > 0 && buffer->count > 0) {
        int readSize = (remainingBytes > buffer->count) ? buffer->count : remainingBytes;
        memcpy(dest + size - remainingBytes, buffer->buffer + buffer->read_index, readSize);
        buffer->read_index = (buffer->read_index + readSize) % BUFFER_SIZE;
        buffer->count -= readSize;
        remainingBytes -= readSize;
    }

    pthread_cond_signal(&buffer->empty);
    pthread_mutex_unlock(&buffer->lock);
}

void* readerThread(void* arg) {
    ReaderArgs* args = (ReaderArgs*)arg;
    CircularBuffer* buffer = args->buffer;
    FILE* file = args->file;
    char packet[MAX_PACKET_SIZE];
    int bytesRead;

    while ((bytesRead = fread(packet + HEADER_SIZE, sizeof(char), MAX_PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        // Write header
        memcpy(packet, &bytesRead, sizeof(int));

        // Write to circular buffer
        writeToCircularBuffer(buffer, packet, bytesRead + HEADER_SIZE);
    }

    fclose(file);
    pthread_exit(NULL);
}

void* udpThread(void* arg) {
    UdpArgs* args = (UdpArgs*)arg;
    CircularBuffer* buffer = args->buffer;
    struct sockaddr_in* addr = args->addr;
    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    char packet[PACKET_SIZE];
    int bytesSent, dataLength;

    while (1) {
        // Read from the circular buffer
        readFromCircularBuffer(buffer, packet, PACKET_SIZE);

        // Extract data length from header
        memcpy(&dataLength, packet, sizeof(int));

        // Send UDP packet
        bytesSent = sendto(sockfd, packet + HEADER_SIZE, dataLength, 0, (struct sockaddr*)addr, sizeof(struct sockaddr));
        if (bytesSent < 0) {
            perror("Failed to send UDP packet");
            break;
        }
    }

    close(sockfd);
    pthread_exit(NULL);
}

void printBufferAsHex(const char* buffer, int size) {
    int i;
    for (i = 0; i < size; i++) {
        printf("%02X ", (unsigned char)buffer[i]);
        if ((i + 1) % 16 == 0)
            printf("\n");
    }
    printf("\n");
}

int main() {
    CircularBuffer buffer;
    initCircularBuffer(&buffer);

    FILE* file = fopen("path/to/file", "rb");
    if (!file) {
        perror("Failed to open file");
        return EXIT_FAILURE;
    }

    pthread_t readerThreadID, udpThreadID;

    ReaderArgs readerArgs = { .buffer = &buffer, .file = file };
    if (pthread_create(&readerThreadID, NULL, readerThread, (void*)&readerArgs) != 0) {
        perror("Failed to create reader thread");
        return EXIT_FAILURE;
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(UDP_PORT);  // 替换为所需的UDP端口
    inet_aton("127.0.0.1", &addr.sin_addr);  // 替换为所需的目标IP地址

    UdpArgs udpArgs = { .buffer = &buffer, .addr = &addr };
    if (pthread_create(&udpThreadID, NULL, udpThread, (void*)&udpArgs) != 0) {
        perror("Failed to create UDP thread");
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadID, NULL);
    pthread_cond_signal(&buffer.full);  // 通知UDP线程结束
    pthread_join(udpThreadID, NULL);

    destroyCircularBuffer(&buffer);

    // 打印缓冲区内容(以16进制形式)
    printf("缓冲区内容:\n");
    printBufferAsHex(buffer.buffer, BUFFER_SIZE);

    return EXIT_SUCCESS;
}

二维数字队列

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 256 * 1024
#define PACKET_SIZE 8192
#define HEADER_SIZE 16

typedef struct {
    char **buffer;
    int *length;
    int size;
    int read_idx;
    int write_idx;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

typedef struct {
    CircularBuffer *buffer;
    const char *filename;
} ReaderParams;

typedef struct {
    CircularBuffer *buffer;
    const char *dest_ip;
    int dest_port;
} SenderParams;

void circularBufferInit(CircularBuffer *buffer, int size) {
    buffer->buffer = (char**)malloc(sizeof(char*) * size);
    buffer->length = (int*)malloc(sizeof(int) * size);
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = (char*)malloc(PACKET_SIZE);
        buffer->length[i] = 0;
    }
    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}

void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    char *packet = buffer->buffer[buffer->write_idx];
    int padding_size = PACKET_SIZE - HEADER_SIZE - size;

    // 添加包头
    memcpy(packet, "Custom Header", HEADER_SIZE);

    // 添加数据
    memcpy(packet + HEADER_SIZE, data, size);

    // 补充0
    memset(packet + HEADER_SIZE + size, 0, padding_size);

    // 记录数据包长度
    buffer->length[buffer->write_idx] = size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    char *packet = buffer->buffer[buffer->read_idx];
    int packet_size = buffer->length[buffer->read_idx];
    int padding_size = PACKET_SIZE - HEADER_SIZE - packet_size;

    // 读取数据
    memcpy(data, packet + HEADER_SIZE, packet_size);

    // 填补的0不需要被读取,直接跳过

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    for (int i = 0; i < buffer->size; i++) {
        free(buffer->buffer[i]);
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->size = 0;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
}

void *readerThread(void *params) {
    ReaderParams *readerParams = (ReaderParams*)params;

    FILE *file = fopen(readerParams->filename, "rb");
    if (file == NULL) {
        perror("Failed to open file");
        return NULL;
    }

    char data[PACKET_SIZE];
    size_t bytesRead = 0;

    while ((bytesRead = fread(data, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        circularBufferWrite(readerParams->buffer, data, bytesRead);
        usleep(1000);
    }

    fclose(file);

    return NULL;
}

int createUdpSocket(const char *dest_ip, int dest_port) {
    int sock = socket(AF_INET, SOCK_DGRAM, 0);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(dest_ip);
    server_addr.sin_port = htons(dest_port);

    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }

    return sock;
}

void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;

    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);

    char data[PACKET_SIZE];
    while (1) {
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);
        send(sock, data, PACKET_SIZE - HEADER_SIZE, 0);
        usleep(1000);
    }

    close(sock);

    return NULL;
}

int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  // 目标地址
    senderParams.dest_port = 12345;  // 目标端口

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    pthread_create(&senderThreadId, NULL, senderThread, &senderParams);

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);

    circularBufferDestroy(&buffer);

    return 0;
}

备注文章来源地址https://www.toymoban.com/news/detail-727339.html

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUFFER_SIZE 256 * 1024
#define PACKET_SIZE 8192
#define HEADER_SIZE 16

typedef struct {
    char **buffer;
    int *length;
    int size;
    int read_idx;
    int write_idx;
    int count;
    pthread_mutex_t lock;
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
} CircularBuffer;

typedef struct {
    CircularBuffer *buffer;
    const char *filename;
} ReaderParams;

typedef struct {
    CircularBuffer *buffer;
    const char *dest_ip;
    int dest_port;
} SenderParams;

void circularBufferInit(CircularBuffer *buffer, int size) {
    buffer->buffer = (char**)malloc(sizeof(char*) * size);
    buffer->length = (int*)malloc(sizeof(int) * size);
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = (char*)malloc(PACKET_SIZE);
        buffer->length[i] = 0;
    }
    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}

void circularBufferWrite(CircularBuffer *writbuffer, const void *data, size_t size) {
    pthread_mutex_lock(&writbuffer->lock);

    while (writbuffer->count == writbuffer->size) {
        pthread_cond_wait(&writbuffer->not_full, &writbuffer->lock);
    }

    char *packet = writbuffer->buffer[writbuffer->write_idx];
    int padding_size = PACKET_SIZE - HEADER_SIZE - size;

    // 添加包头
    memcpy(packet, "Custom Header", HEADER_SIZE);

    // 添加数据
    memcpy(packet + HEADER_SIZE, data, size);

    // 补充0
    memset(packet + HEADER_SIZE + size, 0, padding_size);

    // 记录数据包长度
    writbuffer->length[writbuffer->write_idx] = size;

    writbuffer->write_idx = (writbuffer->write_idx + 1) % writbuffer->size;
    writbuffer->count++;

    pthread_cond_signal(&writbuffer->not_empty);
    pthread_mutex_unlock(&writbuffer->lock);
}

void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    /*
	用于判断环形队列是否已满的逻辑,并在队列已满时进行阻塞等待的操作。
buffer->count表示当前环形队列中已经存储的数据包数量。
buffer->size表示环形队列的最大容量,即可以存储的最大数据包数量。
在以上代码中,通过条件判断 buffer->count == buffer->size 来判断环形队列是否已满。如果队列已满,即存储的数据包数量等于最大容量,进入 while 循环。
	*/
    char *packet = buffer->buffer[buffer->write_idx];/*将环形缓冲区中写入位置(buffer->write_idx)索引所指向的字符指针(buffer->buffer[buffer->write_idx])赋值给 packet。

这行代码假设 buffer->buffer 是一个 char** 类型的指针,指向存储数据的字符指针数组。通过访问 buffer->buffer[buffer->write_idx],可以获得当前待写入的位置在环形缓冲区中指向的字符数组的指针。*/
    int padding_size = PACKET_SIZE - HEADER_SIZE - size;/*运算得出填充大小,大于零表示本包数据不足*/

    // 添加包头
    memcpy(packet, "Custom Header", HEADER_SIZE);

    // 添加数据
    memcpy(packet + HEADER_SIZE, data, size);

    // 补充0
    memset(packet + HEADER_SIZE + size, 0, padding_size);

    // 记录数据包长度
    buffer->length[buffer->write_idx] = size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);//发送信号通知其他等待中的线程,表示缓冲区不再为空,这可能是用于唤醒一个等待从缓冲区中读取数据的消费者线程。
    pthread_mutex_unlock(&buffer->lock);//解锁缓冲区的互斥锁,表示写入操作已完成,
}

void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    char *packet = buffer->buffer[buffer->read_idx];
    int packet_size = buffer->length[buffer->read_idx];
    int padding_size = PACKET_SIZE - HEADER_SIZE - packet_size;

    // 读取数据
    memcpy(data, packet + HEADER_SIZE, packet_size);

    // 填补的0不需要被读取,直接跳过

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    for (int i = 0; i < buffer->size; i++) {
        free(buffer->buffer[i]);
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->size = 0;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
}

void *readerThread(void *params) {
    ReaderParams *readerParams = (ReaderParams*)params;

    FILE *file = fopen(readerParams->filename, "rb");
    if (file == NULL) {
        perror("Failed to open file");
        return NULL;
    }

    char data[PACKET_SIZE];
    size_t bytesRead = 0;

    while ((bytesRead = fread(data, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        circularBufferWrite(readerParams->buffer, data, bytesRead);
        usleep(1000);
    }

    fclose(file);

    return NULL;
}

int createUdpSocket(const char *dest_ip, int dest_port) {
    int sock = socket(AF_INET, SOCK_DGRAM, 0);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(dest_ip);
    server_addr.sin_port = htons(dest_port);

    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }

    return sock;
}

void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;

    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);

    char data[PACKET_SIZE];
    while (1) {
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);
        send(sock, data, PACKET_SIZE - HEADER_SIZE, 0);
        usleep(1000);
    }

    close(sock);

    return NULL;
}

int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  // 目标地址
    senderParams.dest_port = 12345;  // 目标端口

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    pthread_create(&senderThreadId, NULL, senderThread, &senderParams);

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);

    circularBufferDestroy(&buffer);

    return 0;
}

到了这里,关于c语言环形队列的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【数据结构】设计环形队列

    环形队列是一种线性数据结构,其操作表现基于 FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。 环形队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普通队列里,一旦一个队列满了,我们就不能插入下一个元

    2024年02月09日
    浏览(95)
  • 环形队列的实现 [详解在代码中]

     

    2024年02月03日
    浏览(28)
  • 环形队列+DMA空闲中断+接收串口数据

    本次实验利用环形队列+DMA空闲中断+串口。。通过这个实验可以非常深入的理解队列,DMA,串口的知识。如果你能自己实现掌握这个实验,那么你应该基本掌握了队列,DMA,串口的知识。 本次使用的是用环形队列当缓冲器区接收串口数据。我们可以先区了解DMA的空闲中断。本次

    2024年02月13日
    浏览(39)
  • 实现环形队列的各种基本运算的算法

    目的: 领会环形队列的存储结构和掌握环形队列中各种基本运算算法的设计。 内容: 编写一个乘成sqqueue.cpp,实现环形队列(假设栈中的元素类型ElemType为char)的各种基本运算,并在此基础上设计一个程序3-3.cpp完成以下功能。 初始化队列q。 判断队列q是否非空。 依次进队

    2024年02月06日
    浏览(33)
  • 【并发编程】无锁环形队列Disruptor并发框架使用

    Disruptor 是苹国外厂本易公司LMAX开发的一个高件能列,研发的初夷是解决内存队列的延识问顾在性能测试中发现竟然与10操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCn演讲后,获得了业界关注,201年,企业应用软件专家Martin Fower专门撰

    2024年02月14日
    浏览(38)
  • 【Linux】生产者消费者模型(阻塞队列与环形队列)和POSIX信号量

    我们这里举一个例子,来解释生产者消费者模型,我们学生–消费者,供应商–生产者,超市–交易场所,我们买东西只需要关系售货架子上是否有商品即可,没有了商品,超市从供应商进行供货。供应商和供应商不能同时向一个货架进行供货,所以生产者之间是互斥的关系

    2024年02月03日
    浏览(37)
  • 【Linux】生产者消费者模型:基于阻塞队列和环形队列 | 单例模式线程池

    死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。 当多线程并发执行并都需要访问临界资源时,因为每个线程都是不同的执行流,这就有可能 导致数据不一致问题 ,为了避免此问题的发生

    2024年01月24日
    浏览(47)
  • 【linux】POSIX信号量+基于环形队列的生产消费模型

    喜欢的点赞,收藏,关注一下把! 上篇文章最后我们基于BlockQueue生产者消费者模型写了代码,测试什么的都通过了。最后我们说代码还有一些不足的地方,由这些不足从而引入了我们接下来要学的信号量! 我们在看一看不足的地方 1.一个线程,在操作临界资源的时候,必须

    2024年02月01日
    浏览(40)
  • 【Linux】基于环形队列的生产者消费者模型的实现

    文章目录 前言 一、基于环形队列的生产者消费者模型的实现 上一篇文章我们讲了信号量的几个接口和基于环形队列的生产者消费者模型,下面我们就快速来实现。 首先我们创建三个文件,分别是makefile,RingQueue.hpp,以及main.cc。我们先简单搭建一下环形队列的框架: 首先我们

    2024年02月11日
    浏览(42)
  • 【Linux系统编程二十九】基于信号量的环形队列生产消费模型

    当共享资源被当成整体使用时,则共享资源的数量要么是1,要么是0。 当被访问使用时,共享资源的数量就为0,当没有被使用时,数量就为1。 共享资源是可以被分成多份来使用的,只不过不同的线程访问的是该共享资源的不同的区域,它是允许多个线程并发访问的,只不过

    2024年01月22日
    浏览(48)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包