Wednesday, June 06, 2007

pthread example

Set pthread joinable or detached status?

By default, a pthread is created in the joinable state. A common problem then shows up when several threads need to run asynchronously: for example, the main thread creates thread 1 and then blocks in pthread_join() waiting for it to finish. While it is blocked there, the main thread cannot create any other thread.

It is also worth noting that we cannot simply keep creating joinable pthreads without ever joining them, because joining is the mechanism that reclaims a finished thread's resources.

So what makes joinable threads worthwhile? As I understand it, the thread that created a joinable pthread needs to know the return status of the pthread it created. In that case, the joinable pthread is worth waiting for and joining.

On the other hand, we do not have to wait for every pthread. In that case we can create the threads in the detached state, which means we simply create them and let them go; no status is returned from them. A detached pthread releases its own resources when it finishes, so we can keep creating detached threads without waiting to join them, and in fact we never need to join them at all.

sample code below,


#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Build with -DJOIN_THREADS=1 to join every thread right after creating it.
 * The default (0) never joins, so finished joinable threads are never
 * reclaimed and pthread_create() eventually fails.
 */
#ifndef JOIN_THREADS
#define JOIN_THREADS 0
#endif

/*
 * Thread start routine: does no work and terminates immediately.
 * Returning from a start routine is equivalent to pthread_exit(NULL).
 */
void *run(void *argv)
{
  (void)argv;
  return NULL;
}

/*
 * Creates JOINABLE threads in an endless loop until the pthread library
 * reports a failure, printing progress every 100 threads.
 *
 * Returns -1; the loop can only end because a pthread call failed.
 */
int main(int argc, char *argv[])
{
  pthread_t thd;
  pthread_attr_t attr;
  unsigned long i = 0;   /* unsigned: wrap-around is well defined */
  int rc = 0;

  (void)argc;
  (void)argv;

  /* pthread functions return the error number; they do not set errno,
   * so strerror(rc) is used instead of perror(). */
  if ( (rc=pthread_attr_init(&attr))!=0 ) {
    fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
    return -1;
  }
  if ( (rc=pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE))!=0 ) {
    fprintf(stderr, "pthread_attr_setdetachstate: %s\n", strerror(rc));
    pthread_attr_destroy(&attr);
    return -1;
  }

  for (i=0; ; ++i) {
    /* Go through uintptr_t so the integer-to-pointer cast has a matching width. */
    if ( (rc=pthread_create(&thd, &attr, run, (void *)(uintptr_t)i))!=0 ) {
      printf("After %lu pthread created(rc=%d)\n", i, rc);
      fprintf(stderr, "pthread_create: %s\n", strerror(rc));
      break;
    }

#if JOIN_THREADS
    if ( (rc=pthread_join(thd, NULL))!=0 ) {
      fprintf(stderr, "pthread_join: %s\n", strerror(rc));
      break;
    }
#endif

    if ( i%100==0 ) {
      printf("%luth pthread created\n", i);
    }
  }

  /* Reached on every failure path, so the attribute object is always released. */
  pthread_attr_destroy(&attr);
  return -1;
}

Result:
After 382 pthreads created(errno:12)
pthread_create: Cannot allocate memory


#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Thread start routine: does no work and terminates immediately.
 * Returning from a start routine is equivalent to pthread_exit(NULL).
 */
void *run(void *argv)
{
  (void)argv;
  return NULL;
}

/*
 * Creates DETACHED threads in an endless loop until pthread_create()
 * reports a failure, printing progress every 100 threads. Detached threads
 * are never joined.
 *
 * Returns -1; the loop can only end because a pthread call failed.
 */
int main(int argc, char *argv[])
{
  pthread_t thd;
  pthread_attr_t attr;
  unsigned long i = 0;   /* unsigned: wrap-around is well defined */
  int rc = 0;

  (void)argc;
  (void)argv;

  /* pthread functions return the error number; they do not set errno,
   * so strerror(rc) is used instead of perror(). */
  if ( (rc=pthread_attr_init(&attr))!=0 ) {
    fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
    return -1;
  }
  if ( (rc=pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))!=0 ) {
    fprintf(stderr, "pthread_attr_setdetachstate: %s\n", strerror(rc));
    pthread_attr_destroy(&attr);
    return -1;
  }

  for (i=0; ; ++i) {
    /* Go through uintptr_t so the integer-to-pointer cast has a matching width. */
    if ( (rc=pthread_create(&thd, &attr, run, (void *)(uintptr_t)i))!=0 ) {
      printf("After %lu pthread created(rc=%d)\n", i, rc);
      fprintf(stderr, "pthread_create: %s\n", strerror(rc));
      break;
    }

    if ( i%100==0 ) {
      printf("%luth pthread created\n", i);
    }
  }

  /* Reached on every failure path, so the attribute object is always released. */
  pthread_attr_destroy(&attr);
  return -1;
}

Result:
2700th pthread created
2800th pthread created
2900th pthread created
3000th pthread created
3100th pthread created
After 3183 pthread created(rc=11)

pthread_create: Resource temporarily unavailable

Tuesday, June 05, 2007

Message Queue example

The one use of a message queue that I know of is internal logging, and this is how it is used:

msgq_rcv <- MSG Queue <- msgq_snd

sample code below,
msgq.h

#ifndef MSGQ_H
#define MSGQ_H

#include <time.h> /* time_t, so this header can be included on its own */

/* Arguments passed to ftok() to derive the queue key. */
#define MSGQ_PATH "/tmp"
#define MSGQ_ID 111

/* Maximum length of the text in one message, excluding the terminating NUL. */
#define MSG_LEN 128

/*
 * One message as passed to msgsnd()/msgrcv().
 *
 * The System V message API requires the buffer to start with a long message
 * type; everything after mtype is the payload. The size argument given to
 * msgsnd()/msgrcv() counts only that payload, i.e.
 * sizeof(msgq_t) - sizeof(long).
 */
typedef struct {
  long mtype;            // MUST be the first member
  time_t time;           /* timestamp filled in by the sender */
  char msg[MSG_LEN+1];   /* text filled in by the sender */
} msgq_t ;

#endif /* MSGQ_H */

msgq_rcv.c

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <time.h>
#include "msgq.h"

int main(int argc, char *argv[])
{
key_t key;
int msgqid;
msgq_t msg;

if ( (key=ftok(MSGQ_PATH, MSGQ_ID)) == -1) {
perror("ftok");
return -1;
}

if ( (msgqid = msgget(key, 0666|IPC_CREAT)) == -1) {
perror("msgget");
return -1;
}

while (1) {
if (-1==msgrcv(msgqid, &msg, sizeof(msg), 0, 0)) {
perror("msgrcv");
return -1;
}
printf("%s->%s\n", ctime(&msg.time), msg.msg);
}

return 0;
}

msgq_snd.c

#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <time.h>
#include <string.h>
#include "msgq.h"

/*
 * Sender side of the logging queue: opens (creating if necessary) the queue
 * identified by MSGQ_PATH/MSGQ_ID and sends every line read from stdin as one
 * time-stamped message of type 1.
 *
 * Returns 0 when stdin reaches end of file, -1 on any error.
 */
int main(int argc, char *argv[])
{
  key_t key;
  int msgqid;
  msgq_t msg;
  /* msgsnd()'s size argument counts only the payload that follows mtype.
   * Passing sizeof(msg) would make the kernel read past the end of msg. */
  const size_t payload_len = sizeof(msg) - sizeof(msg.mtype);

  (void)argc;
  (void)argv;

  if ( (key=ftok(MSGQ_PATH, MSGQ_ID)) == (key_t)-1) {
    perror("ftok");
    return -1;
  }

  if ( (msgqid = msgget(key, 0666|IPC_CREAT)) == -1) {
    perror("msgget");
    return -1;
  }

  while (1) {
    memset(&msg, 0x00, sizeof(msg));
    msg.mtype = 1; // MUST be greater than zero

    /* Stop at EOF or on a read error; otherwise the loop would keep
     * sending empty messages forever. fgets() reserves room for the NUL. */
    if (fgets(msg.msg, sizeof(msg.msg), stdin) == NULL) {
      break;
    }

    time(&(msg.time));
    if (-1==msgsnd(msgqid, &msg, payload_len, 0)) {
      perror("msgsnd");
      return -1;
    }
  }

  if (ferror(stdin)) {
    perror("fgets");
    return -1;
  }
  return 0;
}

read/write the binary content to file

An example shown here to present
1) how to write the binary content to a file
2) and then read the content from it

Some steps
1) Prepare chunks of memory
Usually the content is an array of records of some structure type, so that it can be restored with the same structure in step 4.
2) Output to a file
Open a file with open()/fopen(), then write the records to it with write()/fwrite(). The content can be written all at once or record by record; that is a design decision. Remember to close the file at the end.
3) Input the file
Now we want to read the binary content back. First open the file, then read the content with read()/fread(). How many bytes should we ask for? We usually do not know how many records the file holds, but we do know that every record has the size of the original structure type. So we use the structure size as the element size and read chunks of that size one by one. When a read finally returns nothing, we have read all the records.
4) Recognize the content
The way to recognize the binary content is usually to cast the binary record to be of the original structure type.

Pitfall here,
Usually we read back exactly the records we wrote, undamaged. But things do go wrong, so we should take some precautions. The most common problem is damaged data, so ensuring data integrity is the main task. One way to do that is to append a checksum to the real data: before storing the data, we calculate the checksum and append it; after reading the content back, we calculate the checksum again and compare it with the stored one. If they match, the data is not damaged. If they do not, something went wrong.

Another concern is the structure layout. The compiler aligns each member and pads the structure so that its size is a multiple of its strictest member alignment, typically 4 or 8 bytes depending on the processor's word size. If the members do not add up to such a multiple, the compiler fills the hole with padding bytes. So keep the structure as compact as possible.

Exchanging binary data between systems with different byte orders is another problem. It is the responsibility of the upper-layer application to ensure portability.

#include <stdio.h>
#include <time.h>
#include <string.h>

/* Capacity of the user and password fields, excluding the terminating NUL. */
#define USER_LEN 10
#define PASS_LEN 10

/* Number of account records handled by the sample. */
#define TOTAL_ACCOUNTS 10

/* One fixed-size account record. */
typedef struct {
  time_t create_time;       /* creation timestamp */
  char   user[USER_LEN+1];  /* user name plus terminating NUL */
  char   pass[PASS_LEN+1];  /* password plus terminating NUL */
  char   reserved[2];       /* unused; rounds the char fields up to 24 bytes */
} account_t ;

int main(int argc, char *argv[])
{
int i=0;
FILE *fd;
account_t acc[TOTAL_ACCOUNTS];

// writer
memset(acc, 0x00, sizeof(acc[0])*TOTAL_ACCOUNTS);
for (i=0; i< TOTAL_ACCOUNTS; ++i) {
snprintf(acc[i].user, sizeof(acc[0].user)-1, "user%d", i);
snprintf(acc[i].pass, sizeof(acc[0].pass)-1, "pass%d", i);
time(&acc[i].create_time);
}
fd = fopen("accounts.bin", "wb");
fwrite(acc, sizeof(acc[0]), TOTAL_ACCOUNTS, fd);
fclose(fd);

// reader
memset(acc, 0x00, sizeof(acc[0])*TOTAL_ACCOUNTS);
fd = fopen("accounts.bin", "rb");
for (i=0; !feof(fd) ; ++i) {
fread(&acc[i], sizeof(acc[0]), 1, fd);
if ( feof(fd) ) // A pitfall here
break;

printf("%d,%s/%s/%s\n", i, acc[i].user, acc[i].pass
, ctime(&acc[i].create_time) );
}
fclose(fd);
return 0;
}


hexdump is always useful for dumping the binary content.

Monday, June 04, 2007

Note

Characteristics of a broadcast packet
1) The destination MAC address is all 0xff bytes (ff:ff:ff:ff:ff:ff).
2) It is used by, for example, ARP requests and ICMP ping requests sent to a broadcast address.

Windows Socket

The project must be set up to link against wsock32.lib or ws2_32.lib. The difference between them, as far as I know, is that ws2_32.lib supports the select function while wsock32.lib does not.
#include <stdio.h> #include <winsock2.h> #include <time.h> #define SENDBUF_LEN 100 #define RECVBUF_LEN 100 int main(int argc, char *argv[]) { WSADATA wsaData; int cfd; struct sockaddr_in addr; char *ip; short port; char sendbuf[SENDBUF_LEN]; char recvbuf[RECVBUF_LEN]; int recvlen=0; fd_set rfds, wfds; struct timeval timeout; int S=0; timeout.tv_sec=1; timeout.tv_usec=0; if (argc != 3) { printf("Usage:\n\t%s server_ip server_port\n" , argv[0]); return -1; } ip = argv[1]; port = (short)atoi(argv[2]); WSAStartup(0x0101, &wsaData); if ( (cfd = socket(AF_INET, SOCK_STREAM, 0) < 0){ printf("Cannot create socekt\n"); return -1; } memset(&addr, 0x00, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip); if ( connect(cfd, (struct sockaddr *)&addr, sizeof(struct sockaddr)) < 0) { printf("Cannot connect to %s\n", ip); closesocket(cfd); return -1; } FD_ZERO(&wfds); FD_SET((unsigned int)cfd, &wfds); if ( select(cfd+1, NULL, &wfds, NULL, &timeout) <=0) { printf("select fail\n"); return -1; } sprintf(sendbuf, "GET / HTTP/1.0\r\n"); if (send(cfd, sendbuf, strlen(sendbuf), 0) < 0) { printf("Cannot send\n"); closesocket(cfd); return -1; } srand(time(NULL)); while (S < 2000) { int s=0; while (s < sizeof(sendbuf)) { sendbuf[s++] = (char)rand(); } if (send(cfd, sendbuf, sizeof(sendbuf), 0) < 0) { printf("Cannot send\n"); closesocket(cfd); return -1; } S += s; } sprintf(sendbuf, "\r\n\r\n"); if (send(cfd, sendbuf, strlen(sendbuf), 0) < 0) { printf("Cannot send\n"); closesocket(cfd); return -1; } FD_ZERO(&rfds); FD_SET((unsigned int)cfd, &rfds); if ( select(cfd+1, &rfds, NULL, NULL, &timeout) <=0) { printf("select fail\n"); return -1; } if ( (recvlen=recv(cfd, recvbuf, strlen(recvbuf), 0)) < 0) { printf("Cannot recv\n"); closesocket(cfd); return -1; } recvbuf[recvlen]=0x00; printf("%s", recvbuf); return 0; }