Skip to content

HTTPFilesystem has a race condition on data size between the open and read calls, if content changes at server between the 2 calls #1541

Open
@masariello

Description

@masariello

The following script reproduces the issue.

The script spins up an HTTP server that makes the JSON content one character longer every second.

The client part then hits the URL with a 1s sleep between the open and read calls. The JSON parsing immediately fails because the terminating } gets chopped.

For good measure the same test url is also hit with a requests.get call that does not seem to have any issues.

import json
from time import sleep
import datetime as dt
import requests
from threading import Thread
from urllib.parse import urlparse
import fsspec
from fsspec.implementations.dirfs import DirFileSystem

# Address of the local dummy server and the resource requested from it.
dummy_base_url = 'http://localhost:8080/'
dummy_uri = 'foo/bar'
dummy_url = f'{dummy_base_url}{dummy_uri}'

# Pick the fsspec filesystem matching the URL scheme, then wrap it in a
# DirFileSystem rooted at the base URL so files are opened by relative path.
protocol = urlparse(dummy_url).scheme
dummy_fs = fsspec.filesystem(protocol)
dummy_dirfs = DirFileSystem(fs=dummy_fs, path=dummy_base_url)


def start_dummy_http_server():
    """Run a dummy HTTP server on localhost:8080 to reproduce the observed issue.

    Every GET request is answered with a JSON object containing the current
    timestamp and an ``extra`` string of ``x`` characters whose length equals
    the current second, so the body size changes from one second to the next.

    This function blocks in ``serve_forever()`` and does not return; run it
    in a background thread.
    """
    from http.server import HTTPServer, BaseHTTPRequestHandler

    class Serv(BaseHTTPRequestHandler):
        def do_GET(self):
            now = dt.datetime.now()
            # YYYY-MM-DD HH:MM:SS plus 'x' repeated once per elapsed second
            payload = {
                'time': now.strftime('%Y-%m-%d %H:%M:%S'),
                'extra': 'x' * now.second,
            }
            body = json.dumps(payload).encode('utf-8')
            self.send_response(200)
            # Content-Length counts bytes, so measure the encoded body
            # rather than the length of the str it was built from.
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    httpd = HTTPServer(('localhost', 8080), Serv)
    httpd.serve_forever()

# Serve the changing JSON document from a background daemon thread.
print("starting http server")
thread = Thread(target=start_dummy_http_server, daemon=True)
thread.start()
print("http server started")

for count in range(1, 501):
    t = dt.datetime.now()
    old_r = requests.get(dummy_url).content  # this works fine
    with dummy_dirfs.open(dummy_uri) as f:
        open_size = f.size
        sleep(1)
        # The message on the server has changed between open and read.
        # read() yields the new message, but cut to the size recorded at
        # open time (compare "open size" and "read size" printed below).
        data = f.read()
    new_r = requests.get(dummy_url).content  # this works fine
    old_requests_size = len(old_r)
    new_requests_size = len(new_r)
    read_size = len(data)
    print(f"#{count} - t = {t}")
    print(f"open size = {open_size}")
    print(f"read size = {read_size}")
    print(f"orig requests size = {old_requests_size}")
    print(f"new requests size = {new_requests_size}")
    print(data)
    json.loads(data)  # this will fail; because the JSON is not valid
    print("success")
    sleep(1)


# No thread.join() here: the server thread runs serve_forever() and would
# never finish, so joining it would hang the script. It is a daemon thread
# and is torn down automatically when the interpreter exits.
print("script ended")

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions