#include "mupdf/fitz.h" #include "curl_stream.h" #include #include #include #include #ifdef _WIN32 #include #else #include #endif #undef DEBUG_BLOCK_FETCHING #ifdef DEBUG_BLOCK_FETCHING #ifdef _WIN32 #include static void output(const char *fmt, ...) { va_list args; char text[256]; va_start(args, fmt); vsnprintf(text, sizeof(text), fmt, args); va_end(args); OutputDebugString(text); } #else #define output printf #endif #define DEBUG_MESSAGE(A) do { output A; } while(0) #else #define DEBUG_MESSAGE(A) do { } while(0) #endif #define BLOCK_SHIFT 18 #define BLOCK_SIZE (1<>3] & (1<<((num) & 7))) != 0) typedef struct curlstate { fz_context *ctx; CURL *easy; /* START: The following entries are protected by the lock */ CURLcode curl_error; int data_arrived; int complete; int kill_thread; int accept_ranges; int head; /* content buffer */ size_t content_length; /* 0 => Unknown length */ unsigned char *buffer; size_t buffer_fill; size_t buffer_max; /* map of which blocks we have */ unsigned char *map; size_t map_length; /* outstanding curl request info */ size_t next_fill_start; /* The next file offset we will fetch to */ size_t current_fill_start; /* The current file offset we are fetching to */ size_t current_fill_end; /* END: The above entries are protected by the lock */ void (*more_data)(void *,int); void *more_data_arg; unsigned char public_buffer[4096]; /* We assume either Windows threads or pthreads here. */ #ifdef _WIN32 void *thread; DWORD thread_id; HANDLE mutex; #else pthread_t thread; pthread_mutex_t mutex; #endif } curlstate; #ifdef _WIN32 static int locked; static void lock(curlstate *state) { WaitForSingleObject(state->mutex, INFINITE); assert(locked == 0); locked = 1; } static void unlock(curlstate *state) { assert(locked == 1); locked = 0; ReleaseMutex(state->mutex); } #else static void lock(curlstate *state) { pthread_mutex_lock(&state->mutex); } static void unlock(curlstate *state) { pthread_mutex_unlock(&state->mutex); } #endif static size_t on_curl_header(void *ptr, size_t size, size_t nmemb, void *state_) { struct curlstate *state = state_; lock(state); if (strncmp(ptr, "Accept-Ranges: bytes", 20) == 0) { DEBUG_MESSAGE(("header arrived with Accept-Ranges!\n")); state->accept_ranges = 1; } if (strncmp(ptr, "Content-Length:", 15) == 0) { char *s = ptr; state->content_length = fz_atoi(s + 15); DEBUG_MESSAGE(("header arrived with Content-Length: %d\n", state->content_length)); } unlock(state); return nmemb * size; } static size_t on_curl_data(void *ptr, size_t size, size_t nmemb, void *state_) { struct curlstate *state = state_; size_t old_start; size *= nmemb; lock(state); if (state->data_arrived == 0) { /* This is the first time data has arrived. * If the header has Accept-Ranges then we can do byte requests. * We know the Content-Length from having processed the header already. */ if (state->content_length == 0) { /* What a crap server. Won't tell us how big the file * is. We'll have to expand as data as arrives. */ DEBUG_MESSAGE(("have no length!\n")); } else if (state->accept_ranges) { /* We got a range header, and the correct http response * code. We can assume that byte fetches are accepted * and we'll run without progressive mode. */ size_t len = state->content_length; state->map_length = (len+BLOCK_SIZE-1)>>BLOCK_SHIFT; state->map = fz_malloc_no_throw(state->ctx, (state->map_length+7)>>3); state->buffer = fz_malloc_no_throw(state->ctx, len); state->buffer_max = len; if (state->map == NULL || state->buffer == NULL) { unlock(state); return 0; } memset(state->map, 0, (state->map_length+7)>>3); DEBUG_MESSAGE(("have range header content_length=%d!\n", state->content_length)); } else { /* We know the length, and that we can use ByteRanges - * we can run as a progressive file. */ state->buffer = fz_malloc_no_throw(state->ctx, state->content_length); if (state->buffer == NULL) { unlock(state); return 0; } state->buffer_max = state->content_length; } state->data_arrived = 1; } if (state->content_length == 0) { size_t newsize = (state->current_fill_start + size); if (newsize > state->buffer_max) { /* Expand the buffer */ size_t new_max = state->buffer_max * 2; if (new_max == 0) new_max = 4096; fz_try(state->ctx) state->buffer = fz_realloc_array(state->ctx, state->buffer, new_max, unsigned char); fz_catch(state->ctx) { unlock(state); return 0; } state->buffer_max = new_max; } } DEBUG_MESSAGE(("data arrived: offset=%ld len=%ld\n", state->current_fill_start, size)); /* Although we always trigger fills starting on block boundaries, * code this to allow for curl calling us to copy smaller blocks * as they arrive. */ old_start = state->current_fill_start; memcpy(state->buffer + state->current_fill_start, ptr, size); state->current_fill_start += size; /* If we've reached the end, or at least a different block * mark that we've got that block. */ if (state->map && (state->current_fill_start == state->content_length || (((state->current_fill_start ^ old_start) & ~(BLOCK_SIZE-1)) != 0))) { old_start >>= BLOCK_SHIFT; state->map[old_start>>3] |= 1<<(old_start & 7); } unlock(state); return size; } static void fetch_chunk(struct curlstate *state) { char text[32]; size_t block, start, end; CURLcode ret; ret = curl_easy_perform(state->easy); if (ret != CURLE_OK) { /* If we get an error, store it, and kill the thread. * The next fetch will return it. */ lock(state); state->curl_error = ret; state->kill_thread = 1; unlock(state); return; } /* We finished the header, now request the body. */ lock(state); if (state->head) { state->head = 0; curl_easy_setopt(state->easy, CURLOPT_NOBODY, 0); curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, NULL); curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, NULL); if (state->accept_ranges) { fz_snprintf(text, 32, "%d-%d", 0, BLOCK_SIZE-1); curl_easy_setopt(state->easy, CURLOPT_RANGE, text); state->next_fill_start = BLOCK_SIZE; } unlock(state); return; } /* We finished the current body. If not accepting ranges, that's the end. */ if (!state->accept_ranges) { DEBUG_MESSAGE(("we got it all, in one request.\n")); state->complete = 1; state->kill_thread = 1; unlock(state); return; } /* Find the next block to fetch */ assert((state->next_fill_start & (BLOCK_SHIFT-1)) == 0); block = state->next_fill_start>>BLOCK_SHIFT; if (state->content_length > 0) { /* Find the next block that we haven't got */ size_t map_length = state->map_length; unsigned char *map = state->map; while (block < map_length && HAVE_BLOCK(map, block)) ++block; if (block == map_length) { block = 0; while (block < map_length && HAVE_BLOCK(map, block)) ++block; if (block == map_length) { /* We've got it all! */ DEBUG_MESSAGE(("we got it all block=%d map_length=%d!\n", block, map_length)); state->complete = 1; state->kill_thread = 1; unlock(state); return; } } } else { state->complete = 1; state->kill_thread = 1; unlock(state); return; } DEBUG_MESSAGE(("block requested was %d, fetching %d\n", state->next_fill_start>>BLOCK_SHIFT, block)); /* Set up fetch of that block */ start = block<current_fill_start = start; if (state->content_length > 0 && end >= state->content_length) end = state->content_length-1; state->current_fill_end = end; fz_snprintf(text, 32, "%d-%d", start, end); /* Unless anyone changes this in the meantime, the * next block we fetch will follow on from this one. */ state->next_fill_start = state->current_fill_start+BLOCK_SIZE; unlock(state); /* Request next range! */ DEBUG_MESSAGE(("requesting range %s\n", text)); curl_easy_setopt(state->easy, CURLOPT_RANGE, text); } static int cs_next(fz_context *ctx, fz_stream *stream, size_t len) { struct curlstate *state = stream->state; size_t len_read = 0; int64_t read_point = stream->pos; int block = read_point>>BLOCK_SHIFT; size_t left_over = (-read_point) & (BLOCK_SIZE-1); unsigned char *buf = state->public_buffer; assert(len != 0); stream->rp = stream->wp = buf; lock(state); /* If we got an error from the fetching thread, * throw it here (but just once). */ if (state->curl_error) { CURLcode err = state->curl_error; state->curl_error = 0; unlock(state); fz_throw(ctx, FZ_ERROR_GENERIC, "cannot fetch data: %s", curl_easy_strerror(err)); } if ((size_t) read_point > state->content_length) { unlock(state); if (state->data_arrived == 0) fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (A) (offset=%ld)", read_point); return EOF; } if (len > sizeof(state->public_buffer)) len = sizeof(state->public_buffer); if (state->map == NULL) { /* We are doing a simple linear fetch as we don't know the * content length. */ if (read_point + len > state->current_fill_start) { unlock(state); fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (B) (offset=%ld)", read_point); } memcpy(buf, state->buffer + read_point, len); unlock(state); stream->wp = buf + len; stream->pos += len; if (len == 0) return EOF; return *stream->rp++; } /* We are reading from a "mapped" file */ if (read_point + len > state->content_length) len = state->content_length - read_point; if (left_over > len) left_over = len; if (left_over > 0) { /* We are starting midway through a block */ if (!HAVE_BLOCK(state->map, block)) { state->next_fill_start = block<buffer + read_point, left_over); buf += left_over; read_point += left_over; len -= left_over; len_read += left_over; } /* Copy any complete blocks */ while (len > BLOCK_SIZE) { if (!HAVE_BLOCK(state->map, block)) { /* We don't have enough data to fulfill the request. */ /* Fetch the next block from here. */ unlock(state); state->next_fill_start = block<wp += len_read; stream->pos += len_read; /* If we haven't fetched anything, throw. */ if (len_read == 0) fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (D) (offset=%ld)", read_point); /* Otherwise, we got at least one byte, so we can safely return that. */ return *stream->rp++; } block++; memcpy(buf, state->buffer + read_point, BLOCK_SIZE); buf += BLOCK_SIZE; read_point += BLOCK_SIZE; len -= BLOCK_SIZE; len_read += BLOCK_SIZE; } /* Copy any trailing bytes */ if (len > 0) { if (!HAVE_BLOCK(state->map, block)) { /* We don't have enough data to fulfill the request. */ /* Fetch the next block from here. */ unlock(state); state->next_fill_start = block<wp += len_read; stream->pos += len_read; /* If we haven't fetched anything, throw. */ if (len_read == 0) fz_throw(ctx, FZ_ERROR_TRYLATER, "read of a block we don't have (E) (offset=%ld)", read_point); /* Otherwise, we got at least one byte, so we can safely return that. */ return *stream->rp++; } memcpy(buf, state->buffer + read_point, len); len_read += len; } unlock(state); stream->wp += len_read; stream->pos += len_read; if (len_read == 0) return EOF; return *stream->rp++; } static void cs_close(fz_context *ctx, void *state_) { struct curlstate *state = state_; lock(state); state->kill_thread = 1; unlock(state); #ifdef _WIN32 WaitForSingleObject(state->thread, INFINITE); CloseHandle(state->thread); CloseHandle(state->mutex); #else pthread_join(state->thread, NULL); pthread_mutex_destroy(&state->mutex); #endif curl_easy_cleanup(state->easy); fz_free(ctx, state->buffer); fz_free(ctx, state->map); fz_free(ctx, state); } static void cs_seek(fz_context *ctx, fz_stream *stm, int64_t offset, int whence) { struct curlstate *state = stm->state; stm->wp = stm->rp; if (whence == SEEK_END) { size_t clen; int data_arrived; lock(state); data_arrived = state->data_arrived; clen = state->content_length; unlock(state); if (!data_arrived) fz_throw(ctx, FZ_ERROR_TRYLATER, "still awaiting file length"); stm->pos = clen + offset; } else if (whence == SEEK_CUR) stm->pos += offset; else stm->pos = offset; if (stm->pos < 0) stm->pos = 0; } static void fetcher_thread(curlstate *state) { /* Keep fetching chunks on a background thread until * either we have to kill the thread, or the fetch * is complete. */ while (1) { int complete; lock(state); complete = state->complete || state->kill_thread; unlock(state); if (complete) break; fetch_chunk(state); if (state->more_data) state->more_data(state->more_data_arg, 0); } if (state->more_data) state->more_data(state->more_data_arg, 1); } #ifdef _WIN32 static DWORD WINAPI win_thread(void *lparam) { fetcher_thread((curlstate *)lparam); return 0; } #else static void * pthread_thread(void *arg) { fetcher_thread((curlstate *)arg); return NULL; } #endif fz_stream *fz_open_url(fz_context *ctx, const char *url, int kbps, void (*more_data)(void *,int), void *more_data_arg) { struct curlstate *state; fz_stream *stm; CURLcode code; state = fz_malloc_struct(ctx, struct curlstate); state->ctx = ctx; code = curl_global_init(CURL_GLOBAL_ALL); if (code != CURLE_OK) fz_throw(ctx, FZ_ERROR_GENERIC, "curl_global_init failed"); state->easy = curl_easy_init(); if (!state->easy) fz_throw(ctx, FZ_ERROR_GENERIC, "curl_easy_init failed"); curl_easy_setopt(state->easy, CURLOPT_URL, url); curl_easy_setopt(state->easy, CURLOPT_FOLLOWLOCATION, 1); curl_easy_setopt(state->easy, CURLOPT_MAXREDIRS, 12); curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYPEER, 0); curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYHOST, 0); curl_easy_setopt(state->easy, CURLOPT_MAX_RECV_SPEED_LARGE, kbps * 1024); curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, on_curl_header); curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, state); curl_easy_setopt(state->easy, CURLOPT_WRITEFUNCTION, on_curl_data); curl_easy_setopt(state->easy, CURLOPT_WRITEDATA, state); /* Get only the HEAD first. */ state->head = 1; curl_easy_setopt(state->easy, CURLOPT_NOBODY, 1); #ifdef _WIN32 state->mutex = CreateMutex(NULL, FALSE, NULL); if (state->mutex == NULL) fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed"); state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id); if (state->thread == NULL) fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed"); #else if (pthread_mutex_init(&state->mutex, NULL)) fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed"); if (pthread_create(&state->thread, NULL, pthread_thread, state)) fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed"); #endif state->more_data = more_data; state->more_data_arg = more_data_arg; stm = fz_new_stream(ctx, state, cs_next, cs_close); stm->progressive = 1; stm->seek = cs_seek; return stm; }