- 浏览: 1951 次
最新评论
近期在熟悉开源代码memcached,学习下其中的多线程网络编程,写了个简单的实例:
关键思路:
1. 多线程分别是一个主线程和n个workers线程,每个线程都是一个单独的实例。
2. 主线程event_dispatch_loop负责处理监听fd,监听客户端的建立连接请求,以及accept连接,将已建立的连接round robin到各个worker。
3. 每个workers线程含有一个CONNECTION QUEUE,被主线程round robin写入,对应workers线程读出并处理,主线程与worker线程间的通信通过一对pipe。
流程经典图示:
代码列表:
server.c:服务端
client.c:客户端
编译:
gcc -lpthread server.c -o server
gcc -lpthread client.c -o client
运行结果:
./server
46912496232624 :pthread_self.
======waiting for client's request======
THREAD_ID 1084229952 :recv msg from client: tt01
THREAD_ID 1094719808 :recv msg from client: eeee
THREAD_ID 1105209664 :recv msg from client: ssss
./client 127.0.0.1
发送出数据:tt01
接收到数据:[tt01]THREAD_ID 1084229952 ..server reply!
发送出数据:eeee
接收到数据:[eeee]THREAD_ID 1094719808 ..server reply!
发送出数据:ssss
接收到数据:[ssss]THREAD_ID 1105209664 ..server reply!
上代码:
#include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <netinet/in.h> #include <netdb.h> #include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #define MAXLINE 4096 #define DATA_BUFFER_SIZE 4096 /* EVLIST_X_ Private space: 0x1000-0xf000 */ #define EVLIST_ALL (0xf000 | 0x9f) #define MEMCACHED_CONN_DISPATCH(arg0, arg1) #define EV_TIMEOUT 0x01 #define EV_READ 0x02 #define EV_WRITE 0x04 #define EV_SIGNAL 0x08 #define EV_PERSIST 0x10 /* Persistant event */ #ifdef WIN32 #define evutil_socket_t intptr_t #else #define evutil_socket_t int #endif #define ITEMS_PER_ALLOC 64 /******************************* TYPE DECLARATION ******************************/ struct event_base { int event_no; int event_count; }; enum protocol { ascii_prot = 3, /* arbitrary value. */ binary_prot, negotiating_prot /* Discovering the protocol */ }; enum network_transport { local_transport, /* Unix sockets*/ tcp_transport, udp_transport }; /** * Possible states of a connection. */ enum conn_states { conn_listening, /**< the socket which listens for connections */ conn_new_cmd /**< Prepare connection for next command */ }; /* An item in the connection queue. */ typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; enum conn_states init_state; int event_flags; int read_buffer_size; enum network_transport transport; CQ_ITEM *next; }; /* A connection queue. */ typedef struct conn_queue CQ; struct conn_queue { CQ_ITEM *head; CQ_ITEM *tail; pthread_mutex_t lock; pthread_cond_t cond; }; /** * Stats stored per-thread. */ struct thread_stats { pthread_mutex_t mutex; }; //worker 线程结构体 typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ } LIBEVENT_THREAD; //主线程结构体 typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ } LIBEVENT_DISPATCHER_THREAD; /** * Global stats. */ struct stats { pthread_mutex_t mutex; unsigned int curr_conns; unsigned int total_conns; uint64_t rejected_conns; unsigned int conn_structs; }; /* When adding a setting, be sure to update process_stat_settings */ /** * Globally accessible settings as derived from the commandline. */ struct settings { size_t maxbytes; int maxconns; int port; char *inter; int verbose; char *socketpath; /* path to unix socket if using local socket */ int num_threads; /* number of worker (without dispatcher) libevent threads to run */ enum protocol binding_protocol; }; /** file scope variables **/ /******************************* FILE SCOPE VARIABLES ******************************/ /* Which thread we assigned a connection to most recently. */ static int last_thread = -1; /* Lock for global stats */ static pthread_mutex_t stats_lock; /* * Number of worker threads that have finished setting themselves up. */ static int init_count = 0; static pthread_mutex_t init_lock; static pthread_cond_t init_cond; static struct stats stats; static struct settings settings; static struct event_base *main_base; static struct event_base *current_base = NULL; /* * Each libevent instance has a wakeup pipe, which other threads * can use to signal that they've put a new connection on its queue. */ static LIBEVENT_THREAD *threads; static LIBEVENT_DISPATCHER_THREAD dispatcher_thread; /* Free list of CQ_ITEM structs */ static CQ_ITEM *cqi_freelist; static pthread_mutex_t cqi_freelist_lock; /******************************* EVENT_BASE ******************************/ struct event_base * event_base_new() { struct event_base *base = NULL; if ((base = (struct event_base *)calloc(1, sizeof(struct event_base))) == NULL) { printf("%s: calloc", __LINE__); return NULL; } return base; } void event_base_free(struct event_base *base) { if (base == NULL && current_base) base = current_base; /* If we're freeing current_base, there won't be a current_base. */ if (base == current_base) current_base = NULL; /* Don't actually free NULL. */ if (base == NULL) { printf("%s: no base to free", __LINE__); return; } free(base); } struct event_base * event_init(void) { struct event_base *base = event_base_new(); if (base == NULL) { printf("%s: Unable to construct event_base", __LINE__); return NULL; } current_base = base; return (base); } //The related function of conn_queue /******************************* CONNECTION QUEUE ******************************/ /* * Initializes a connection queue. */ static void cq_init(CQ *cq) { pthread_mutex_init(&cq->lock, NULL); pthread_cond_init(&cq->cond, NULL); cq->head = NULL; cq->tail = NULL; } /* * Looks for an item on a connection queue, but doesn't block if there isn't * one. * Returns the item, or NULL if no item is available */ static CQ_ITEM *cq_pop(CQ *cq) { CQ_ITEM *item; pthread_mutex_lock(&cq->lock); item = cq->head; if (NULL != item) { cq->head = item->next; if (NULL == cq->head) cq->tail = NULL; } pthread_mutex_unlock(&cq->lock); return item; } /* * Adds an item to a connection queue. */ static void cq_push(CQ *cq, CQ_ITEM *item) { item->next = NULL; pthread_mutex_lock(&cq->lock); if (NULL == cq->tail) cq->head = item; else cq->tail->next = item; cq->tail = item; pthread_cond_signal(&cq->cond); pthread_mutex_unlock(&cq->lock); } /* * Returns a fresh connection queue item. */ static CQ_ITEM *cqi_new(void) { CQ_ITEM *item = NULL; pthread_mutex_lock(&cqi_freelist_lock); if (cqi_freelist) { item = cqi_freelist; cqi_freelist = item->next; } pthread_mutex_unlock(&cqi_freelist_lock); if (NULL == item) { int i; /* Allocate a bunch of items at once to reduce fragmentation */ item = (CQ_ITEM*)malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC); if (NULL == item) return NULL; /* * Link together all the new items except the first one * (which we'll return to the caller) for placement on * the freelist. */ for (i = 2; i < ITEMS_PER_ALLOC; i++) item[i - 1].next = &item[i]; pthread_mutex_lock(&cqi_freelist_lock); item[ITEMS_PER_ALLOC - 1].next = cqi_freelist; cqi_freelist = &item[1]; pthread_mutex_unlock(&cqi_freelist_lock); } return item; } /* * Frees a connection queue item (adds it to the freelist.) */ static void cqi_free(CQ_ITEM *item) { pthread_mutex_lock(&cqi_freelist_lock); item->next = cqi_freelist; cqi_freelist = item; pthread_mutex_unlock(&cqi_freelist_lock); } /******************************* GLOBAL STATS ******************************/ void STATS_LOCK() { pthread_mutex_lock(&stats_lock); } void STATS_UNLOCK() { pthread_mutex_unlock(&stats_lock); } /******************************* LIBEVENT_THREAD ******************************/ void conn_process(CQ_ITEM *item, LIBEVENT_THREAD *me) { int connfd,n; char buff[4096]; //printf("======conn_process======\n"); while(1){ connfd = item->sfd; //while(TRUE) { // 从客户端接收数据 n = recv(connfd, buff, MAXLINE, 0); buff[n-1] = '\0'; printf("THREAD_ID %ld :recv msg from client: %s\n",me->thread_id, buff); //close(connfd); // 向客户端发送数据 char szText[44] = " TCP Server Demo! \r\n"; sprintf(szText,"[%s]THREAD_ID %ld ..server reply!\n",buff,pthread_self()); send(connfd, szText, strlen(szText), 0); //close(connfd); } close(connfd); break; } } /* * Processes an incoming "handle a new connection" item. This is called when * input arrives on the libevent wakeup pipe. */ static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg; CQ_ITEM *item; char buf[1]; if (read(fd, buf, 1) != 1) if (settings.verbose > 0) fprintf(stderr, "Can't read from libevent pipe\n"); switch (buf[0]) { case 'c': item = cq_pop(me->new_conn_queue); if (NULL != item) { conn_process(item, me); } cqi_free(item); break; } /* we were told to flip the lock type and report in */ } static void wait_for_thread_registration(int nthreads) { while (init_count < nthreads) { pthread_cond_wait(&init_cond, &init_lock); } } static void register_thread_initialized(void) { pthread_mutex_lock(&init_lock); init_count++; pthread_cond_signal(&init_cond); pthread_mutex_unlock(&init_lock); } /* * Worker thread: main event loop */ static void *worker_libevent(void *arg) { LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg; /* Any per-thread setup can happen here; thread_init() will block until * all threads have finished initializing. */ /* set an indexable thread-specific memory item for the lock type. * this could be unnecessary if we pass the conn *c struct through * all item_lock calls... */ //me->item_lock_type = ITEM_LOCK_GRANULAR; //pthread_setspecific(item_lock_type_key, &me->item_lock_type); register_thread_initialized(); //event_base_loop(me->base, 0); while(1) { thread_libevent_process(me->notify_receive_fd, 0, me); } return NULL; } /* * Creates a worker thread. */ static void create_worker(void *(*func)(void *), void *arg) { //pthread_t thread; pthread_attr_t attr; int ret; pthread_attr_init(&attr); if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) { perror("pthread_create"); exit(1); } } /* * Set up a thread's information. */ static void setup_thread(LIBEVENT_THREAD *me) { me->base = event_init(); if (! me->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } me->new_conn_queue = (struct conn_queue *)malloc(sizeof(struct conn_queue)); if (me->new_conn_queue == NULL) { perror("Failed to allocate memory for connection queue"); exit(EXIT_FAILURE); } cq_init(me->new_conn_queue); if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) { perror("Failed to initialize mutex"); exit(EXIT_FAILURE); } } void methread_init(int nthreads, struct event_base *main_base) { int i; int power; pthread_mutex_init(&init_lock, NULL); pthread_cond_init(&init_cond, NULL); pthread_mutex_init(&cqi_freelist_lock, NULL); cqi_freelist = NULL; threads = (LIBEVENT_THREAD *)calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can't allocate thread descriptors"); exit(1); } dispatcher_thread.base = main_base; dispatcher_thread.thread_id = pthread_self(); for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; ////比如libevent句柄,连接队列等的初始化 setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ //stats.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); } /******************************* LIBEVENT_DISPATCHER_THREAD ******************************/ /* * Dispatches a new connection to another thread. This is only ever called * from the main thread, either during initialization (for UDP) or because * of an incoming connection. */ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); char buf[1]; int tid = (last_thread + 1) % settings.num_threads; LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } } /* * Wait continously for events. We exit only if no events are left. */ void event_dispatch_loop(int listenfd) { int connfd; printf("======waiting for client's request======\n"); while(1){ if( (connfd = accept(listenfd, (struct sockaddr*)NULL, NULL)) == -1){ perror("accept"); continue; } dispatch_conn_new(connfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); //close(connfd); } } int listening_socket(int listenfd) { int connfd; struct sockaddr_in servaddr; char buff[4096]; int n; if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){ perror("socket"); exit(0); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(6666); if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){ perror("bind"); exit(0); } if( listen(listenfd, 10) == -1){ perror("listen"); exit(0); } return listenfd; } /******************************* MAIN ******************************/ int main() { int listenfd; settings.num_threads=3; /* initialize main thread libevent instance */ main_base = event_init(); /* start up worker threads if MT mode */ methread_init(settings.num_threads, main_base); /* create the listening socket, bind it, and init */ listenfd = listening_socket(listenfd); printf("%ld :pthread_self.\n",pthread_self()); /* Give the sockets a moment to open. I know this is dumb, but the error * is only an advisory. */ usleep(1000); /* enter the event loop */ event_dispatch_loop(listenfd); close(listenfd); }
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<sys/types.h> #include<sys/socket.h> #include<netinet/in.h> #include <arpa/inet.h> #define MAXLINE 4096 int main(int argc, char** argv) { int sockfd, n,nRecv; char recvline[4096], sendline[4096]; struct sockaddr_in servaddr; if( argc != 2){ printf("usage: ./client <ipaddress>\n"); exit(0); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(6666); if( inet_pton(AF_INET, argv[1], &servaddr.sin_addr) <= 0){ printf("inet_pton error for %s\n",argv[1]); exit(0); } //printf("send msg to server: \n"); while(1) { printf("发送出数据:"); // 向服务器端发送数据 fgets(sendline, 4096, stdin); if( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ printf("create socket error: %s(errno: %d)\n", strerror(errno),errno); exit(0); } if( connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0){ printf("connect error: %s(errno: %d)\n",strerror(errno),errno); exit(0); } if( send(sockfd, sendline, strlen(sendline), 0) < 0) { printf("send msg error: %s(errno: %d)\n", strerror(errno), errno); exit(0); } //从服务器端接收数据 int nRecv = recv(sockfd, recvline, 4096, 0); if(nRecv > 0) { recvline[nRecv] = '\0'; printf("接收到数据:%s\n", recvline); } //close(sockfd); } exit(0); }
相关推荐
Unix多线程和socket编程技术培训教材
本文介绍了在Windows 操作系统下基于TCP/IP 协议Socket 套接口的通信机制以及多线程编程知识与技巧,并给出多线程方式实现多用户与服务端(C/S)并发通信模型的详细算法,最后展现了用C++编写的多用户与服务器通信的...
项目名称 基于TCP协议模型的聊天室 项目功能 支持最多100人同时在线聊天,要求每个客户端登陆时需要输入昵称,然后发送任意想说的内容 项目的架构和分析 采用C/S架构进行设计
本书范例丰富,且具有代表性,如Socket编程、客户/服务端编程、多线程开发、CGI编程、X Windows下的Motif编程等。读者直接或只需稍作修改就可以将它们应用到自己的应用程序开发中。这些范例的源代码可以从配套光盘的...
Socket编程特别是多线程编程是一个很大的课题,本文针对在不同环境(Windows、Unix)两个软件来讲解socket和多线程的可移植编程技术,所涉及的是其中较基础的部分
socket通信, 多线程编程, 信号量 线程锁
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口)...
全书从对套接口API的综合讨论开始,论述了基本编程内容后,即转入高级套接口编程的相关主题,包括IPv4与IPv6的互操作性、UNIX域协议、非阻塞I/O、路由套接口、广播、多播、线程、原始套接口、数据链路访问等,对于...
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口)...
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口)...
21socket编程(十六) UNIX域协议特点 UNIX域地址结构 UNIX域字节流回射客户/服务 UNIX域套接字编程注意点 22socket编程(十七) socketpair sendmsg/recvmsg UNIX域套接字传递描述符字 Linux网络编程之进程间通信篇...
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口) 02...
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口)...
Linux网络编程之socket编程篇 Linux网络编程之进程间通信篇 Linux网络编程之线程篇 Linux网络编程之TCP/IP基础篇 01TCPIP基础(一) ISO/OSI参考模型 TCP/IP四层模型 基本概念(对等通信、封装、分用、端口)...
Linux系统是一个免费使用和自由传播的类Unix操作系统,基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。它继承了Unix以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统,Linux是许多企业...
全书由13章组成,内容涉及到Lindx系统编程基础、TCP/UDP协议、套接字编程概念及I/O模型、高级编程中需要用到的进程问通信同步、多路复用、多线程编程和一些高级套接字控制方法、IPv6介绍以及网络安全等。...
4.4.1 多线程编程实例 127 4.4.2 Linux下线程创建函数pthread_create() 129 4.4.3 线程的结束函数pthread_join()和pthread_exit() 129 4.4.4 线程的属性 130 4.4.5 线程间的互斥 132 4.4.6 线程中使用信号...
4.4.1 多线程编程实例 127 4.4.2 Linux下线程创建函数pthread_create() 129 4.4.3 线程的结束函数pthread_join()和pthread_exit() 129 4.4.4 线程的属性 130 4.4.5 线程间的互斥 132 4.4.6 线程中使用信号...
4.4.1 多线程编程实例 127 4.4.2 Linux下线程创建函数pthread_create() 129 4.4.3 线程的结束函数pthread_join()和pthread_exit() 129 4.4.4 线程的属性 130 4.4.5 线程间的互斥 132 4.4.6 线程中使用信号...
第十五章 多线程编程 15.1 线程的概念 15.1.1 线程的概念 15.1.2 线程的分类 15.1.3 线程的创建和等待函数 15.1.4 线程的属性函数 15.2 线程间的同步 15.2.1 无名信号量 ...