Building a Poller/Parser/Sender System for Scraping Data

In a previous post I described how I want to implement a Neural Network infrastructure for my algo trading experiments. In order to train my model I need some data.

I need to fetch data from one or more sources, since the information I need may not all be available from a single one. In my case I need fundamental data, which is freely available from the SEC, and historical pricing data, which I will fetch from Yahoo Finance.

I want to be able to specify which ticker symbols to fetch data for and what kind of fundamental data I am interested in (e.g. cash, long-term debt, etc.). Similarly, I want to be able to specify the date range for the data. Since this data comes from different sources, and thus in different structures, my tool must be able to parse it into a single format I can work with. Finally, the data needs to be stored in a relational database (e.g. SQLite) for later and repeated querying.

For the implementation I have used Python. I called it Turbine and it is available on GitHub. This is the landscape I came up with:

[Architecture overview diagram]

The main workers of the framework are the Poller, Parser and the Sender.

DirectoryWatcher: The DirectoryWatcher watches the input directory for any incoming request.json files. These files contain information about what data Turbine should fetch. A sample request.json file is listed below:

{
  "topic": "concept",
  "resources": [
    {
      "tickers": ["AAPL", "MSFT"],
      "concepts": [
        {
          "name": "AccountsPayableCurrent",
          "year": 2014
        }
      ]
    },
    {
      "tickers": ["BIIB"],
      "concepts": [
        {
          "name": "LongTermDebt",
          "year": 2014
        }
      ]
    }
  ]
}
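
The watcher itself can be as simple as a loop that scans the input directory on an interval. The following is only a sketch under my own assumptions (the function names and the hand-off callback are illustrative, not Turbine's actual code):

import json
import time
from pathlib import Path

# Illustrative directory watcher (names and hand-off are assumptions, not Turbine's code):
# scan the input directory periodically and hand new request.json files to a callback.
def watch(input_dir: str, handle_request, interval_seconds: float = 1.0):
    seen = set()
    while True:
        for path in Path(input_dir).glob("*.json"):
            if path not in seen:
                seen.add(path)
                with path.open() as f:
                    handle_request(json.load(f))  # e.g. enqueue the request for the Pollers
        time.sleep(interval_seconds)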

Poller: Poller threads connect to the source and poll for data. The source can be a database, file system, Web API, etc. The data is cached on disk as a JSON or CSV file and then passed on to the Parser for further processing. Pollers check whether the requested data is already cached before fetching it from the original source. If you want to force the Pollers to re-fetch the data from the source, you must erase the cache directory.
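
Conceptually, the cache-first behavior boils down to something like this (a minimal sketch; the cache layout and the fetch callback are assumptions, not Turbine's actual implementation):

from pathlib import Path

# Illustrative cache-first poll (cache layout and fetch callback are assumptions):
# return the cached file if it exists, otherwise fetch from the source and cache it.
def poll(cache_dir: str, cache_key: str, fetch_from_source) -> str:
    cache_file = Path(cache_dir) / cache_key       # e.g. "AAPL_2014_AccountsPayableCurrent.json"
    if cache_file.exists():
        return cache_file.read_text()              # cache hit: skip the remote call
    data = fetch_from_source()                     # cache miss: hit the Web API, database, etc.
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(data)
    return data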

Parser: Parsers get the data files polled by the Pollers. They extract the data and make it available for persistence. For that they dynamically load extractors from disk that handle the different data formats. The results are then passed on to the Senders.

You can easily create custom extractors by subclassing the AbstractExtractor class and placing the file into the extractors folder. Turbine will pick them up automatically on the next restart.
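
The discovery step can be done with Python's standard importlib/pkgutil machinery. The loader below is only a sketch of how that could work (the package and function names are assumptions, not Turbine's actual loader):

import importlib
import pkgutil

# Illustrative extractor discovery (package and function names are assumptions):
# import every module in the extractors package, then instantiate all subclasses
# of the given base class (e.g. AbstractExtractor).
def load_extractors(base_class, package_name: str = "extractors"):
    package = importlib.import_module(package_name)
    for module_info in pkgutil.iter_modules(package.__path__):
        importlib.import_module(f"{package_name}.{module_info.name}")
    return [cls() for cls in base_class.__subclasses__()]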

Here is an example of an extractor that processes CSV files:

import datetime as dt  # used below for parsing the Date column

class PriceExtractor(AbstractExtractor):
    @overrides(AbstractExtractor)
    def supports_input(self, request: DataExtractionRequest):
        return request.file_extension and request.file_extension.upper() == ".CSV"

    ...

    @overrides(AbstractExtractor)
    def extract(self, request: DataExtractionRequest):
        self.log(f"Processing '{request.file}'")
        result = []
        rows = self._read_rows_from_file(file=request.file)
        for row in rows:
            try:
                result.append(
                    Price(
                        ticker=Ticker(symbol=request.ticker),
                        date=dt.datetime.strptime(row['Date'], "%Y-%m-%d").date(),
                        open=float(row['Open']),
                        high=float(row['High']),
                        low=float(row['Low']),
                        close=float(row['Close']),
                        adj_close=float(row['Adj Close']),
                        volume=float(row['Volume'])
                    )
                )
            except Exception as e:
                self.log_exception(exception=e)

        return result

In the above example the method supports_input(...) simply tells the framework that this extractor can only be used for files that have the extension .csv.

The extract(...) method should return a list of model objects that hold the extracted values.

Notice also how the extractor comes with logging capabilities via self.log(...).
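
Putting the two methods together, the dispatch inside a Parser presumably looks roughly like this (a sketch under my own assumptions, not Turbine's actual code):

# Illustrative dispatch (a sketch, not Turbine's actual parser code):
# pick the first extractor that claims the file and collect its model objects.
def run_extraction(request, extractors):
    for extractor in extractors:
        if extractor.supports_input(request):
            return extractor.extract(request)   # a list of model objects, e.g. Price
    raise ValueError(f"No extractor found for '{request.file}'")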

Sender: Sender threads are responsible for persisting data into an SQLite database.
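
A minimal sketch of that persistence step, using Python's built-in sqlite3 module, could look like this (the table layout is my assumption, not Turbine's actual schema):

import sqlite3

# Illustrative persistence of Price objects (table layout is an assumption):
def persist_prices(db_path: str, prices) -> None:
    with sqlite3.connect(db_path) as conn:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS price (
                   ticker TEXT, date TEXT, open REAL, high REAL,
                   low REAL, close REAL, adj_close REAL, volume REAL
               )"""
        )
        conn.executemany(
            "INSERT INTO price VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            [(p.ticker.symbol, p.date.isoformat(), p.open, p.high,
              p.low, p.close, p.adj_close, p.volume) for p in prices],
        )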

Other features are the following:

  • Configurable: Turbine is fully customizable and configurable. Its name and the poller/parser/sender implementations can be configured using a .ini file.
  • Logging: Built-in logging that can be configured using a .ini file. Any exceptions are automatically caught and properly logged.
  • Multithreading: For maximum performance you can define how many pollers, parsers and senders Turbine should spawn. It automatically optimizes based on the available CPUs.
  • Transaction Handler: Want to move successfully processed files to an archive directory and failed-to-process files to an error folder? No problem. Just implement a custom Transaction Handler to get notified when a poll has finished or failed processing, and handle it as you see fit (see the sketch after this list).
  • Monitoring: Turbine can be configured to listen on a specified port. Using the built-in client a user can remotely connect to it and run basic queries or stop Turbine.
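
For illustration, a transaction handler along the lines described above could look like this; the class and hook names are hypothetical and not Turbine's actual interface:

import shutil
from pathlib import Path

# Hypothetical transaction handler (class and hook names are assumptions):
# move finished files to an archive directory and failed files to an error directory.
class ArchivingTransactionHandler:
    def __init__(self, archive_dir: str = "archive", error_dir: str = "error"):
        self.archive_dir = Path(archive_dir)
        self.error_dir = Path(error_dir)

    def on_success(self, file: str) -> None:
        self.archive_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(file, str(self.archive_dir / Path(file).name))

    def on_failure(self, file: str) -> None:
        self.error_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(file, str(self.error_dir / Path(file).name))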

For more information visit the GitHub repo.

Written on December 11, 2021