| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| /* Copyright (c) 2001 by David Chiang. All rights reserved.*/ |
| |
| #include <string.h> |
| #include <stdlib.h> |
| #include <unistd.h> |
| #include <fcntl.h> |
| #include <stdio.h> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <sys/time.h> |
| #include <netinet/in.h> |
| #include <sched.h> |
| #include <pthread.h> |
| #include <errno.h> |
| |
| #include "sentserver.h" |
| |
| #define MAX_CLIENTS 64 |
| |
| struct clientinfo { |
| int s; |
| struct sockaddr_in sin; |
| }; |
| |
| struct line { |
| int id; |
| char *s; |
| int status; |
| struct line *next; |
| } *head, **ptail; |
| |
| int n_sent = 0, n_received=0, n_flushed=0; |
| |
| #define STATUS_RUNNING 0 |
| #define STATUS_ABORTED 1 |
| #define STATUS_FINISHED 2 |
| |
| pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; |
| pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; |
| pthread_mutex_t input_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| int n_clients = 0; |
| int s; |
| int expect_multiline_output = 0; |
| int log_mutex = 0; |
| int stay_alive = 0; /* dont panic and die with zero clients */ |
| int quiet = 0; |
| |
| void queue_finish(struct line *node, char *s, int fid); |
| char * read_line(int fd, int multiline); |
| void done (int code); |
| |
| struct line * queue_get(int fid) { |
| struct line *cur; |
| char *s, *synch; |
| |
| if (log_mutex) fprintf(stderr, "Getting for data for fid %d\n", fid); |
| if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); |
| pthread_mutex_lock(&queue_mutex); |
| |
| /* First, check for aborted sentences. */ |
| |
| if (log_mutex) fprintf(stderr, " Checking queue for aborted jobs (fid %d)\n", fid); |
| for (cur = head; cur != NULL; cur = cur->next) { |
| if (cur->status == STATUS_ABORTED) { |
| cur->status = STATUS_RUNNING; |
| |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| |
| return cur; |
| } |
| } |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| |
| /* Otherwise, read a new one. */ |
| if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid); |
| if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid); |
| pthread_mutex_lock(&input_mutex); |
| s = read_line(0,0); |
| |
| while (s) { |
| if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); |
| pthread_mutex_lock(&queue_mutex); |
| if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid); |
| pthread_mutex_unlock(&input_mutex); |
| |
| cur = malloc(sizeof (struct line)); |
| cur->id = n_sent; |
| cur->s = s; |
| cur->next = NULL; |
| |
| *ptail = cur; |
| ptail = &cur->next; |
| |
| n_sent++; |
| |
| if (strcmp(s,"===SYNCH===\n")==0){ |
| fprintf(stderr, "Received ===SYNCH=== signal (fid %d)\n", fid); |
| // Note: queue_finish calls free(cur->s). |
| // Therefore we need to create a new string here. |
| synch = malloc((strlen("===SYNCH===\n")+2) * sizeof (char)); |
| synch = strcpy(synch, s); |
| |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| queue_finish(cur, synch, fid); /* handles its own lock */ |
| |
| if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid); |
| if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid); |
| pthread_mutex_lock(&input_mutex); |
| |
| s = read_line(0,0); |
| } else { |
| if (log_mutex) fprintf(stderr, " Received new data %d (fid %d)\n", cur->id, fid); |
| cur->status = STATUS_RUNNING; |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| return cur; |
| } |
| } |
| |
| if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid); |
| pthread_mutex_unlock(&input_mutex); |
| /* Only way to reach this point: no more output */ |
| |
| if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); |
| pthread_mutex_lock(&queue_mutex); |
| if (head == NULL) { |
| fprintf(stderr, "Reached end of file. Exiting.\n"); |
| done(0); |
| } else |
| ptail = NULL; /* This serves as a signal that there is no more input */ |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| |
| return NULL; |
| } |
| |
| void queue_panic() { |
| struct line *next; |
| while (head && head->status == STATUS_FINISHED) { |
| /* Write out finished sentences */ |
| if (head->status == STATUS_FINISHED) { |
| fputs(head->s, stdout); |
| fflush(stdout); |
| } |
| /* Write out blank line for unfinished sentences */ |
| if (head->status == STATUS_ABORTED) { |
| fputs("\n", stdout); |
| fflush(stdout); |
| } |
| /* By defition, there cannot be any RUNNING sentences, since |
| function is only called when n_clients == 0 */ |
| free(head->s); |
| next = head->next; |
| free(head); |
| head = next; |
| n_flushed++; |
| } |
| fclose(stdout); |
| fprintf(stderr, "All clients died. Panicking, flushing completed sentences and exiting.\n"); |
| done(1); |
| } |
| |
| void queue_abort(struct line *node, int fid) { |
| if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); |
| pthread_mutex_lock(&queue_mutex); |
| node->status = STATUS_ABORTED; |
| if (n_clients == 0) { |
| if (stay_alive) { |
| fprintf(stderr, "Warning! No live clients detected! Staying alive, will retry soon.\n"); |
| } else { |
| queue_panic(); |
| } |
| } |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| } |
| |
| |
| void queue_print() { |
| struct line *cur; |
| |
| fprintf(stderr, " Queue\n"); |
| |
| for (cur = head; cur != NULL; cur = cur->next) { |
| switch(cur->status) { |
| case STATUS_RUNNING: |
| fprintf(stderr, " %d running ", cur->id); break; |
| case STATUS_ABORTED: |
| fprintf(stderr, " %d aborted ", cur->id); break; |
| case STATUS_FINISHED: |
| fprintf(stderr, " %d finished ", cur->id); break; |
| |
| } |
| fprintf(stderr, "\n"); |
| //fprintf(stderr, cur->s); |
| } |
| } |
| |
| void queue_finish(struct line *node, char *s, int fid) { |
| struct line *next; |
| if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); |
| pthread_mutex_lock(&queue_mutex); |
| |
| free(node->s); |
| node->s = s; |
| node->status = STATUS_FINISHED; |
| n_received++; |
| |
| /* Flush out finished nodes */ |
| while (head && head->status == STATUS_FINISHED) { |
| |
| if (log_mutex) fprintf(stderr, " Flushing finished node %d\n", head->id); |
| |
| fputs(head->s, stdout); |
| fflush(stdout); |
| if (log_mutex) fprintf(stderr, " Flushed node %d\n", head->id); |
| free(head->s); |
| |
| next = head->next; |
| free(head); |
| |
| head = next; |
| |
| n_flushed++; |
| |
| if (head == NULL) { /* empty queue */ |
| if (ptail == NULL) { /* This can only happen if set in queue_get as signal that there is no more input. */ |
| fprintf(stderr, "All sentences finished. Exiting.\n"); |
| done(0); |
| } else /* ptail pointed at something which was just popped off the stack -- reset to head*/ |
| ptail = &head; |
| } |
| } |
| |
| if (log_mutex) fprintf(stderr, " Flushing output %d\n", head->id); |
| fflush(stdout); |
| if (!quiet) |
| fprintf(stderr, "%d sentences sent, %d sentences finished, %d sentences flushed\n", n_sent, n_received, n_flushed); |
| |
| if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); |
| pthread_mutex_unlock(&queue_mutex); |
| |
| } |
| |
| char * read_line(int fd, int multiline) { |
| int size = 80; |
| char errorbuf[100]; |
| char *s = malloc(size+2); |
| int result, errors=0; |
| int i = 0; |
| |
| result = read(fd, s+i, 1); |
| |
| while (1) { |
| if (result < 0) { |
| perror("read()"); |
| sprintf(errorbuf, "Error code: %d\n", errno); |
| fprintf(stderr, "%s", errorbuf); |
| errors++; |
| if (errors > 5) { |
| free(s); |
| return NULL; |
| } else { |
| sleep(1); /* retry after delay */ |
| } |
| } else if (result == 0) { |
| break; |
| } else if (multiline==0 && s[i] == '\n') { |
| break; |
| } else { |
| if (s[i] == '\n'){ |
| /* if we've reached this point, |
| then multiline must be 1, and we're |
| going to poll the fd for an additional |
| line of data. The basic design is to |
| run a select on the filedescriptor fd. |
| Select will return under two conditions: |
| if there is data on the fd, or if a |
| timeout is reached. We'll select on this |
| fd. If select returns because there's data |
| ready, keep going; else assume there's no |
| more and return the data we already have. |
| */ |
| |
| fd_set set; |
| FD_ZERO(&set); |
| FD_SET(fd, &set); |
| |
| struct timeval timeout; |
| timeout.tv_sec = 3; // number of seconds for timeout |
| timeout.tv_usec = 0; |
| |
| int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout); |
| if (ready<1){ |
| break; // no more data, stop looping |
| } |
| } |
| i++; |
| |
| if (i == size) { |
| size = size*2; |
| s = realloc(s, size+2); |
| } |
| } |
| |
| result = read(fd, s+i, 1); |
| } |
| |
| if (result == 0 && i == 0) { /* end of file */ |
| free(s); |
| return NULL; |
| } |
| |
| s[i] = '\n'; |
| s[i+1] = '\0'; |
| |
| return s; |
| } |
| |
| void * new_client(void *arg) { |
| struct clientinfo *client = (struct clientinfo *)arg; |
| struct line *cur; |
| int result; |
| char *s; |
| char errorbuf[100]; |
| |
| pthread_mutex_lock(&clients_mutex); |
| n_clients++; |
| pthread_mutex_unlock(&clients_mutex); |
| |
| fprintf(stderr, "Client connected (%d connected)\n", n_clients); |
| |
| for (;;) { |
| |
| cur = queue_get(client->s); |
| |
| if (cur) { |
| /* fprintf(stderr, "Sending to client: %s", cur->s); */ |
| if (!quiet) |
| fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s); |
| result = write(client->s, cur->s, strlen(cur->s)); |
| if (result < strlen(cur->s)){ |
| perror("write()"); |
| sprintf(errorbuf, "Error code: %d\n", errno); |
| fprintf(stderr, "%s", errorbuf); |
| |
| pthread_mutex_lock(&clients_mutex); |
| n_clients--; |
| pthread_mutex_unlock(&clients_mutex); |
| |
| fprintf(stderr, "Client died (%d connected)\n", n_clients); |
| queue_abort(cur, client->s); |
| |
| close(client->s); |
| free(client); |
| |
| pthread_exit(NULL); |
| } |
| } else { |
| close(client->s); |
| pthread_mutex_lock(&clients_mutex); |
| n_clients--; |
| pthread_mutex_unlock(&clients_mutex); |
| fprintf(stderr, "Client dismissed (%d connected)\n", n_clients); |
| pthread_exit(NULL); |
| } |
| |
| s = read_line(client->s,expect_multiline_output); |
| if (s) { |
| /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */ |
| if (!quiet) |
| fprintf(stderr, "Client (fid %d) returned data %d\n", client->s, cur->id); |
| // queue_print(); |
| queue_finish(cur, s, client->s); |
| } else { |
| pthread_mutex_lock(&clients_mutex); |
| n_clients--; |
| pthread_mutex_unlock(&clients_mutex); |
| |
| fprintf(stderr, "Client died (%d connected)\n", n_clients); |
| queue_abort(cur, client->s); |
| |
| close(client->s); |
| free(client); |
| |
| pthread_exit(NULL); |
| } |
| |
| } |
| return 0; |
| } |
| |
| void done (int code) { |
| close(s); |
| exit(code); |
| } |
| |
| |
| |
| int main (int argc, char *argv[]) { |
| struct sockaddr_in sin, from; |
| int g; |
| socklen_t len; |
| struct clientinfo *client; |
| int port; |
| int opt; |
| int errors = 0; |
| int argi; |
| char *key = NULL, *client_key; |
| int use_key = 0; |
| /* the key stuff here doesn't provide any |
| real measure of security, it's mainly to keep |
| jobs from bumping into each other. */ |
| |
| pthread_t tid; |
| port = DEFAULT_PORT; |
| |
| for (argi=1; argi < argc; argi++){ |
| if (strcmp(argv[argi], "-m")==0){ |
| expect_multiline_output = 1; |
| } else if (strcmp(argv[argi], "-k")==0){ |
| argi++; |
| if (argi == argc){ |
| fprintf(stderr, "Key must be specified after -k\n"); |
| exit(1); |
| } |
| key = argv[argi]; |
| use_key = 1; |
| } else if (strcmp(argv[argi], "--stay-alive")==0){ |
| stay_alive = 1; /* dont panic and die with zero clients */ |
| } else if (strcmp(argv[argi], "-q")==0) { |
| quiet = 1; |
| } else { |
| port = atoi(argv[argi]); |
| } |
| } |
| |
| /* Initialize data structures */ |
| head = NULL; |
| ptail = &head; |
| |
| /* Set up listener */ |
| s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
| opt = 1; |
| setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); |
| |
| sin.sin_family = AF_INET; |
| sin.sin_addr.s_addr = htonl(INADDR_ANY); |
| sin.sin_port = htons(port); |
| while (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) { |
| perror("bind()"); |
| sleep(1); |
| errors++; |
| if (errors > 100) |
| exit(1); |
| } |
| |
| len = sizeof(sin); |
| getsockname(s, (struct sockaddr *) &sin, &len); |
| |
| fprintf(stderr, "Listening on port %hu\n", ntohs(sin.sin_port)); |
| |
| while (listen(s, MAX_CLIENTS) < 0) { |
| perror("listen()"); |
| sleep(1); |
| errors++; |
| if (errors > 100) |
| exit(1); |
| } |
| |
| for (;;) { |
| len = sizeof(from); |
| g = accept(s, (struct sockaddr *)&from, &len); |
| if (g < 0) { |
| perror("accept()"); |
| sleep(1); |
| continue; |
| } |
| client = malloc(sizeof(struct clientinfo)); |
| client->s = g; |
| bcopy(&from, &client->sin, len); |
| |
| if (use_key){ |
| fd_set set; |
| FD_ZERO(&set); |
| FD_SET(client->s, &set); |
| |
| struct timeval timeout; |
| timeout.tv_sec = 3; // number of seconds for timeout |
| timeout.tv_usec = 0; |
| |
| int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout); |
| if (ready<1){ |
| fprintf(stderr, "Prospective client failed to respond with correct key.\n"); |
| close(client->s); |
| free(client); |
| } else { |
| client_key = read_line(client->s,0); |
| client_key[strlen(client_key)-1]='\0'; /* chop trailing newline */ |
| if (strcmp(key, client_key)==0){ |
| pthread_create(&tid, NULL, new_client, client); |
| } else { |
| fprintf(stderr, "Prospective client failed to respond with correct key.\n"); |
| close(client->s); |
| free(client); |
| } |
| free(client_key); |
| } |
| } else { |
| pthread_create(&tid, NULL, new_client, client); |
| } |
| } |
| |
| } |
| |
| |
| |