Mercurial > hgrepos > Python2 > PyMuPDF
diff mupdf-source/platform/x11/curl_stream.c @ 2:b50eed0cc0ef upstream
ADD: MuPDF v1.26.7: the MuPDF source as downloaded by a default build of PyMuPDF 1.26.4.
The directory name has changed: no version number in the expanded directory now.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 15 Sep 2025 11:43:07 +0200 |
| parents | |
| children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mupdf-source/platform/x11/curl_stream.c Mon Sep 15 11:43:07 2025 +0200 @@ -0,0 +1,651 @@ +// Copyright (C) 2004-2021 Artifex Software, Inc. +// +// This file is part of MuPDF. +// +// MuPDF is free software: you can redistribute it and/or modify it under the +// terms of the GNU Affero General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// MuPDF is distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +// details. +// +// You should have received a copy of the GNU Affero General Public License +// along with MuPDF. If not, see <https://www.gnu.org/licenses/agpl-3.0.en.html> +// +// Alternative licensing terms are available from the licensor. +// For commercial licensing, see <https://www.artifex.com/> or contact +// Artifex Software, Inc., 39 Mesa Street, Suite 108A, San Francisco, +// CA 94129, USA, for further information. + +#include "mupdf/fitz.h" +#include "curl_stream.h" + +#include <assert.h> +#include <string.h> +#include <ctype.h> + +#include <curl/curl.h> + +#ifdef _WIN32 +#include <windows.h> +#else +#include <pthread.h> +#endif + +#undef DEBUG_BLOCK_FETCHING + +#ifdef DEBUG_BLOCK_FETCHING +#ifdef _WIN32 +#include <varargs.h> +static void +output(const char *fmt, ...) +{ + va_list args; + char text[256]; + + va_start(args, fmt); + vsnprintf(text, sizeof(text), fmt, args); + va_end(args); + + OutputDebugString(text); +} +#else +#define output printf +#endif + +#define DEBUG_MESSAGE(A) do { output A; } while(0) +#else +#define DEBUG_MESSAGE(A) do { } while(0) +#endif + +#define BLOCK_SHIFT 18 +#define BLOCK_SIZE (1<<BLOCK_SHIFT) + +#define HAVE_BLOCK(map, num) (((map)[(num)>>3] & (1<<((num) & 7))) != 0) + +typedef struct curlstate +{ + fz_context *ctx; + CURL *easy; + + /* START: The following entries are protected by the lock */ + CURLcode curl_error; + char error_buffer[CURL_ERROR_SIZE]; + int data_arrived; + int complete; + int kill_thread; + int accept_ranges; + int head; + + /* content buffer */ + size_t content_length; /* 0 => Unknown length */ + unsigned char *buffer; + size_t buffer_fill; + size_t buffer_max; + + /* map of which blocks we have */ + unsigned char *map; + size_t map_length; + + /* outstanding curl request info */ + size_t next_fill_start; /* The next file offset we will fetch to */ + size_t current_fill_start; /* The current file offset we are fetching to */ + size_t current_fill_end; + /* END: The above entries are protected by the lock */ + + void (*more_data)(void *,int); + void *more_data_arg; + + unsigned char public_buffer[4096]; + + /* We assume either Windows threads or pthreads here. */ +#ifdef _WIN32 + void *thread; + DWORD thread_id; + HANDLE mutex; +#else + pthread_t thread; + pthread_mutex_t mutex; +#endif +} curlstate; + +#ifdef _WIN32 +static int locked; + +static void +lock(curlstate *state) +{ + WaitForSingleObject(state->mutex, INFINITE); + assert(locked == 0); + locked = 1; +} + +static void +unlock(curlstate *state) +{ + assert(locked == 1); + locked = 0; + ReleaseMutex(state->mutex); +} +#else +static void +lock(curlstate *state) +{ + pthread_mutex_lock(&state->mutex); +} + +static void +unlock(curlstate *state) +{ + pthread_mutex_unlock(&state->mutex); +} +#endif + +static size_t on_curl_header(void *ptr, size_t size, size_t nmemb, void *state_) +{ + struct curlstate *state = state_; + + lock(state); + if (fz_strncasecmp(ptr, "Accept-Ranges: bytes", 20) == 0) + { + DEBUG_MESSAGE(("header arrived with Accept-Ranges!\n")); + state->accept_ranges = 1; + } + + if (fz_strncasecmp(ptr, "Content-Length:", 15) == 0) + { + char *s = ptr; + state->content_length = fz_atoi(s + 15); + DEBUG_MESSAGE(("header arrived with Content-Length: %zu\n", state->content_length)); + } + unlock(state); + + return nmemb * size; +} + +static size_t on_curl_data(void *ptr, size_t size, size_t nmemb, void *state_) +{ + struct curlstate *state = state_; + size_t old_start; + + size *= nmemb; + + lock(state); + if (state->data_arrived == 0) + { + /* This is the first time data has arrived. + * If the header has Accept-Ranges then we can do byte requests. + * We know the Content-Length from having processed the header already. + */ + if (state->content_length == 0) + { + /* What a crap server. Won't tell us how big the file + * is. We'll have to expand as data as arrives. */ + DEBUG_MESSAGE(("have no length!\n")); + } + else if (state->accept_ranges) + { + /* We got a range header, and the correct http response + * code. We can assume that byte fetches are accepted + * and we'll run without progressive mode. */ + size_t len = state->content_length; + state->map_length = (len+BLOCK_SIZE-1)>>BLOCK_SHIFT; + state->map = fz_malloc_no_throw(state->ctx, (state->map_length+7)>>3); + state->buffer = fz_malloc_no_throw(state->ctx, len); + state->buffer_max = len; + if (state->map == NULL || state->buffer == NULL) + { + unlock(state); + return 0; + } + memset(state->map, 0, (state->map_length+7)>>3); + DEBUG_MESSAGE(("have range header content_length=%zu!\n", state->content_length)); + } + else + { + /* We know the length, and that we can use ByteRanges - + * we can run as a progressive file. */ + state->buffer = fz_malloc_no_throw(state->ctx, state->content_length); + if (state->buffer == NULL) + { + unlock(state); + return 0; + } + state->buffer_max = state->content_length; + } + + state->data_arrived = 1; + } + + if (state->content_length == 0) + { + size_t newsize = (state->current_fill_start + size); + if (newsize > state->buffer_max) + { + /* Expand the buffer */ + size_t new_max = state->buffer_max * 2; + if (new_max == 0) + new_max = 4096; + fz_try(state->ctx) + state->buffer = fz_realloc_array(state->ctx, state->buffer, new_max, unsigned char); + fz_catch(state->ctx) + { + unlock(state); + return 0; + } + state->buffer_max = new_max; + } + } + + DEBUG_MESSAGE(("data arrived: offset=%ld len=%ld\n", state->current_fill_start, size)); + /* Although we always trigger fills starting on block boundaries, + * code this to allow for curl calling us to copy smaller blocks + * as they arrive. */ + old_start = state->current_fill_start; + if (state->current_fill_start + size > state->buffer_max) { + unlock(state); + return 0; + } + memcpy(state->buffer + state->current_fill_start, ptr, size); + state->current_fill_start += size; + /* If we've reached the end, or at least a different block + * mark that we've got that block. */ + if (state->map && (state->current_fill_start == state->content_length || + (((state->current_fill_start ^ old_start) & ~(BLOCK_SIZE-1)) != 0))) + { + old_start >>= BLOCK_SHIFT; + state->map[old_start>>3] |= 1<<(old_start & 7); + } + unlock(state); + + return size; +} + +static void fetch_chunk(struct curlstate *state) +{ + char text[32]; + size_t block, start, end; + CURLcode ret; + + ret = curl_easy_perform(state->easy); + if (ret != CURLE_OK) { + /* If we get an error, store it, and kill the thread. + * The next fetch will return it. */ + lock(state); + state->curl_error = ret; + state->kill_thread = 1; + unlock(state); + return; + } + + /* We finished the header, now request the body. */ + lock(state); + if (state->head) + { + state->head = 0; + curl_easy_setopt(state->easy, CURLOPT_NOBODY, 0); + curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, NULL); + curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, NULL); + if (state->accept_ranges) + { + fz_snprintf(text, 32, "%d-%d", 0, BLOCK_SIZE-1); + curl_easy_setopt(state->easy, CURLOPT_RANGE, text); + state->next_fill_start = BLOCK_SIZE; + } + unlock(state); + return; + } + + /* We finished the current body. If not accepting ranges, that's the end. */ + if (!state->accept_ranges) + { + DEBUG_MESSAGE(("we got it all, in one request.\n")); + state->complete = 1; + state->kill_thread = 1; + unlock(state); + return; + } + + /* Find the next block to fetch */ + assert((state->next_fill_start & (BLOCK_SHIFT-1)) == 0); + block = state->next_fill_start>>BLOCK_SHIFT; + if (state->content_length > 0) + { + /* Find the next block that we haven't got */ + size_t map_length = state->map_length; + unsigned char *map = state->map; + while (block < map_length && HAVE_BLOCK(map, block)) + ++block; + if (block == map_length) + { + block = 0; + while (block < map_length && HAVE_BLOCK(map, block)) + ++block; + if (block == map_length) + { + /* We've got it all! */ + DEBUG_MESSAGE(("we got it all block=%zu map_length=%zu!\n", block, map_length)); + state->complete = 1; + state->kill_thread = 1; + unlock(state); + return; + } + } + } + else + { + state->complete = 1; + state->kill_thread = 1; + unlock(state); + return; + } + + DEBUG_MESSAGE(("block requested was %zu, fetching %zu\n", state->next_fill_start>>BLOCK_SHIFT, block)); + + /* Set up fetch of that block */ + start = block<<BLOCK_SHIFT; + end = start + BLOCK_SIZE-1; + state->current_fill_start = start; + if (state->content_length > 0 && end >= state->content_length) + end = state->content_length-1; + state->current_fill_end = end; + fz_snprintf(text, 32, "%d-%d", start, end); + + /* Unless anyone changes this in the meantime, the + * next block we fetch will follow on from this one. */ + state->next_fill_start = state->current_fill_start+BLOCK_SIZE; + unlock(state); + + /* Request next range! */ + DEBUG_MESSAGE(("requesting range %s\n", text)); + curl_easy_setopt(state->easy, CURLOPT_RANGE, text); +} + +static int cs_next(fz_context *ctx, fz_stream *stream, size_t len) +{ + struct curlstate *state = stream->state; + size_t len_read = 0; + int64_t read_point = stream->pos; + int block = read_point>>BLOCK_SHIFT; + size_t left_over = (-read_point) & (BLOCK_SIZE-1); + unsigned char *buf = state->public_buffer; + int err_type; + + assert(len != 0); + + stream->rp = stream->wp = buf; + lock(state); + err_type = state->complete ? FZ_ERROR_GENERIC : FZ_ERROR_TRYLATER; + + /* If we got an error from the fetching thread, + * throw it here (but just once). */ + if (state->curl_error) + { + CURLcode err = state->curl_error; + char errstr[CURL_ERROR_SIZE]; + memcpy(errstr, state->error_buffer, CURL_ERROR_SIZE); + memset(state->error_buffer, 0, CURL_ERROR_SIZE); + state->curl_error = 0; + unlock(state); + fz_throw(ctx, FZ_ERROR_GENERIC, "cannot fetch data: %s: %s", curl_easy_strerror(err), errstr); + } + + if ((size_t) read_point > state->content_length) + { + unlock(state); + if (state->data_arrived == 0) + fz_throw(ctx, err_type, "read of a block we don't have (A) (offset=%ld)", read_point); + return EOF; + } + + if (len > sizeof(state->public_buffer)) + len = sizeof(state->public_buffer); + + if (state->map == NULL) + { + /* We are doing a simple linear fetch as we don't know the + * content length. */ + if (read_point + len > state->current_fill_start) + { + unlock(state); + fz_throw(ctx, err_type, "read of a block we don't have (B) (offset=%ld)", read_point); + } + memcpy(buf, state->buffer + read_point, len); + unlock(state); + stream->wp = buf + len; + stream->pos += len; + if (len == 0) + return EOF; + return *stream->rp++; + } + + /* We are reading from a "mapped" file */ + if (read_point + len > state->content_length) + len = state->content_length - read_point; + if (left_over > len) + left_over = len; + if (left_over > 0) + { + /* We are starting midway through a block */ + if (!HAVE_BLOCK(state->map, block)) + { + state->next_fill_start = block<<BLOCK_SHIFT; + unlock(state); + fz_throw(ctx, err_type, "read of a block we don't have (C) (offset=%ld)", read_point); + } + block++; + memcpy(buf, state->buffer + read_point, left_over); + buf += left_over; + read_point += left_over; + len -= left_over; + len_read += left_over; + } + + /* Copy any complete blocks */ + while (len > BLOCK_SIZE) + { + if (!HAVE_BLOCK(state->map, block)) + { + /* We don't have enough data to fulfill the request. */ + /* Fetch the next block from here. */ + unlock(state); + state->next_fill_start = block<<BLOCK_SHIFT; + stream->wp += len_read; + stream->pos += len_read; + /* If we haven't fetched anything, throw. */ + if (len_read == 0) + fz_throw(ctx, err_type, "read of a block we don't have (D) (offset=%ld)", read_point); + /* Otherwise, we got at least one byte, so we can safely return that. */ + return *stream->rp++; + } + block++; + memcpy(buf, state->buffer + read_point, BLOCK_SIZE); + buf += BLOCK_SIZE; + read_point += BLOCK_SIZE; + len -= BLOCK_SIZE; + len_read += BLOCK_SIZE; + } + + /* Copy any trailing bytes */ + if (len > 0) + { + if (!HAVE_BLOCK(state->map, block)) + { + /* We don't have enough data to fulfill the request. */ + /* Fetch the next block from here. */ + unlock(state); + state->next_fill_start = block<<BLOCK_SHIFT; + stream->wp += len_read; + stream->pos += len_read; + /* If we haven't fetched anything, throw. */ + if (len_read == 0) + fz_throw(ctx, err_type, "read of a block we don't have (E) (offset=%ld)", read_point); + /* Otherwise, we got at least one byte, so we can safely return that. */ + return *stream->rp++; + } + memcpy(buf, state->buffer + read_point, len); + len_read += len; + } + + unlock(state); + stream->wp += len_read; + stream->pos += len_read; + if (len_read == 0) + return EOF; + return *stream->rp++; +} + +static void cs_close(fz_context *ctx, void *state_) +{ + struct curlstate *state = state_; + + lock(state); + state->kill_thread = 1; + unlock(state); + +#ifdef _WIN32 + WaitForSingleObject(state->thread, INFINITE); + CloseHandle(state->thread); + CloseHandle(state->mutex); +#else + pthread_join(state->thread, NULL); + pthread_mutex_destroy(&state->mutex); +#endif + + curl_easy_cleanup(state->easy); + fz_free(ctx, state->buffer); + fz_free(ctx, state->map); + fz_free(ctx, state); +} + +static void cs_seek(fz_context *ctx, fz_stream *stm, int64_t offset, int whence) +{ + struct curlstate *state = stm->state; + + stm->wp = stm->rp; + if (whence == SEEK_END) + { + size_t clen; + int data_arrived; + int complete; + lock(state); + data_arrived = state->data_arrived; + clen = state->content_length; + complete = state->complete; + unlock(state); + if (!data_arrived && !complete) + fz_throw(ctx, FZ_ERROR_TRYLATER, "still awaiting file length"); + stm->pos = clen + offset; + } + else if (whence == SEEK_CUR) + stm->pos += offset; + else + stm->pos = offset; + if (stm->pos < 0) + stm->pos = 0; +} + +static void +fetcher_thread(curlstate *state) +{ + /* Keep fetching chunks on a background thread until + * either we have to kill the thread, or the fetch + * is complete. */ + while (1) { + int complete; + lock(state); + complete = state->complete || state->kill_thread; + unlock(state); + if (complete) + break; + fetch_chunk(state); + if (state->more_data) + state->more_data(state->more_data_arg, 0); + } + if (state->more_data) + state->more_data(state->more_data_arg, 1); + lock(state); + state->complete = 1; + unlock(state); +} + +#ifdef _WIN32 +static DWORD WINAPI +win_thread(void *lparam) +{ + fetcher_thread((curlstate *)lparam); + + return 0; +} +#else +static void * +pthread_thread(void *arg) +{ + fetcher_thread((curlstate *)arg); + return NULL; +} +#endif + +fz_stream *fz_open_url(fz_context *ctx, const char *url, int kbps, void (*more_data)(void *,int), void *more_data_arg) +{ + struct curlstate *state; + fz_stream *stm; + CURLcode code; + + state = fz_malloc_struct(ctx, struct curlstate); + state->ctx = ctx; + + code = curl_global_init(CURL_GLOBAL_ALL); + if (code != CURLE_OK) + fz_throw(ctx, FZ_ERROR_GENERIC, "curl_global_init failed"); + + state->easy = curl_easy_init(); + if (!state->easy) + fz_throw(ctx, FZ_ERROR_GENERIC, "curl_easy_init failed"); + + curl_easy_setopt(state->easy, CURLOPT_URL, url); + curl_easy_setopt(state->easy, CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt(state->easy, CURLOPT_MAXREDIRS, 12); + curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYPEER, 0); + curl_easy_setopt(state->easy, CURLOPT_SSL_VERIFYHOST, 0); + curl_easy_setopt(state->easy, CURLOPT_MAX_RECV_SPEED_LARGE, kbps * 1024); + curl_easy_setopt(state->easy, CURLOPT_HEADERFUNCTION, on_curl_header); + curl_easy_setopt(state->easy, CURLOPT_WRITEHEADER, state); + curl_easy_setopt(state->easy, CURLOPT_WRITEFUNCTION, on_curl_data); + curl_easy_setopt(state->easy, CURLOPT_WRITEDATA, state); + curl_easy_setopt(state->easy, CURLOPT_FAILONERROR, 1L); + curl_easy_setopt(state->easy, CURLOPT_ERRORBUFFER, &state->error_buffer); +#ifdef DEBUG_BLOCK_FETCHING + curl_easy_setopt(state->easy, CURLOPT_VERBOSE, 1L); +#endif + + /* Get only the HEAD first. */ + state->head = 1; + curl_easy_setopt(state->easy, CURLOPT_NOBODY, 1); + +#ifdef _WIN32 + state->mutex = CreateMutex(NULL, FALSE, NULL); + if (state->mutex == NULL) + fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed"); + + state->thread = CreateThread(NULL, 0, win_thread, state, 0, &state->thread_id); + if (state->thread == NULL) + fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed"); +#else + if (pthread_mutex_init(&state->mutex, NULL)) + fz_throw(ctx, FZ_ERROR_GENERIC, "mutex creation failed"); + + if (pthread_create(&state->thread, NULL, pthread_thread, state)) + fz_throw(ctx, FZ_ERROR_GENERIC, "thread creation failed"); +#endif + state->more_data = more_data; + state->more_data_arg = more_data_arg; + + stm = fz_new_stream(ctx, state, cs_next, cs_close); + stm->progressive = 1; + stm->seek = cs_seek; + return stm; +}
