
Multithreaded code and Pandas

kinow @ Jul 22, 2018 13:22:14

Pandas provides high-performance data structures for Python. In Java, I think there are similar data structures in projects like Apache Commons Collections, Google Guava, and Trove.

In Java libraries, thread-safety is often treated as a must-have feature, probably because it is quite common for a Java program to run more than one thread, especially when the code runs inside some sort of web container.

I recently learned that Pandas, on the other hand, does not guarantee any thread-safety. I found this out while reading an issue about a race condition in the IndexEngine, and then preparing a pull request for it. The issue can be reproduced with a snippet like this:

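from concurrent.futures import ThreadPoolExecutor
import pandas as pd

# One DatetimeIndex, shared by two worker threads.
x = pd.date_range('2001', '2020')
with ThreadPoolExecutor(2) as p:
    # Both threads read is_unique on the same index at the same time. With the
    # race in the IndexEngine, one of them could get False and fail the assert.
    assert all(p.map(lambda idx: idx.is_unique, [x] * 2))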

When you create an index like that, it will delegate most of the hard work to the IndexEngine. Inside the IndexEngine, the values passed for the index are stored, and then an empty Hashtable is created (as well as several flags for the state of the object, such as unique, which defines whether the index has unique elements or not).

Once a user accesses a property like is_unique, the Hashtable mapping is populated and the state flags are updated: while populating the mapping, if any element appears more than once, the unique flag is set to false; otherwise it is set to true. If the user never needs that information, the mapping is not populated until it is really needed.
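The lazy pattern described above can be sketched with a toy Python class. ToyIndexEngine and its attribute names are hypothetical: this is not pandas' IndexEngine (which is written in Cython), and the racy ordering inside _do_unique_check is there on purpose, to show how shared lazy state can go wrong, not as a copy of pandas' code.

```python
class ToyIndexEngine:
    """Hypothetical, much-simplified stand-in for a lazily initialised engine.

    It only mimics the pattern described above: keep the values, and defer
    building the hash table until somebody asks a question about them.
    """

    def __init__(self, values):
        self.values = list(values)
        self.mapping = None            # value -> position, built on first use
        self.need_unique_check = True  # state flag shared by every caller
        self.unique = None             # unknown until the check has run

    def _do_unique_check(self):
        # Deliberately racy ordering: the flag is cleared *before* the work is
        # done, so a second thread reading is_unique in between would see
        # need_unique_check == False and return the stale value None.
        self.need_unique_check = False
        mapping = {}
        unique = True
        for position, value in enumerate(self.values):
            if value in mapping:
                unique = False
            mapping[value] = position
        self.mapping = mapping
        self.unique = unique

    @property
    def is_unique(self):
        # Lazy: nothing is hashed until somebody asks.
        if self.need_unique_check:
            self._do_unique_check()
        return self.unique


engine = ToyIndexEngine([3, 1, 2])
print(engine.mapping)    # None: nothing populated yet
print(engine.is_unique)  # True: this access populates the mapping
print(engine.mapping)    # {3: 0, 1: 1, 2: 2}
```

Called from a single thread the class behaves correctly; the problem only shows up when two threads share one instance and interleave between clearing the flag and storing the result.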

I believe it is done that way for performance, at the cost of state being shared among calls. That makes the API harder to use in a multithreaded environment, though it is still possible if you move the synchronization into your caller code.
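Moving the synchronization into the caller can be as simple as guarding the shared index with a lock. This is a minimal sketch that assumes every thread touching the index goes through the same threading.Lock; the helper name check_unique is hypothetical, not a pandas API.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

index = pd.date_range('2001', '2020')
index_lock = threading.Lock()  # one lock for this shared Index object


def check_unique(idx):
    # Only one thread at a time may trigger (or read) the lazily built
    # state behind is_unique; the other threads wait on the lock.
    with index_lock:
        return idx.is_unique


with ThreadPoolExecutor(2) as pool:
    results = list(pool.map(check_unique, [index] * 2))

print(all(results))  # True
```

The lock serialises every access to the index, so it trades some parallelism for safety; it only helps if all callers agree to use it.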

Some Apache projects do the same, asking users to synchronize, serialize, or handle certain corner cases on their side. There is a huge cost associated with maintaining an Open Source project that promises thread-safety.

Dask may provide an alternative for using Pandas with multiple threads, but I have not tried it yet.

♥ Open Source

NB: even though Pandas is not thread-safe, that does not mean you should not use it. Just use it with care when working with multiple threads.

Cylc Scheduler Internals - Part 1

kinow @ Jul 14, 2018 22:42:47

This is the first post in a series of three (maybe four) based on diagrams I collected while debugging the Cylc scheduler. The scheduler is started by the cylc start utility.

NB: this post is mostly a note to remember things. I do not really expect it to give anyone enough information to hack the Cylc Scheduler (though you could, and you would have fun!).

Instead of going on at length about what happens (and quite a bit happens when you run cylc start my.suite), I will use the following diagram, followed by a few paragraphs highlighting certain parts. The code examined was Cylc 7.7.1.

When a Cylc command like cylc start is invoked, it actually gets translated into bin/cylc-$command_name. cylc-start is a shell script that simply calls cylc-run. cylc-run then imports scheduler_cli… but what you need to know is that, in the end, scheduler_cli creates an instance of Scheduler with the right constructor arguments and calls its start method.
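The bin/cylc-$command_name convention can be pictured with a tiny Python helper. build_cylc_argv is hypothetical and only illustrates the naming convention; the real cylc wrapper may do more (help output, aliases, environment set-up).

```python
import os


def build_cylc_argv(bin_dir, argv):
    """Hypothetical helper: turn ['cylc', 'start', ...] into
    ['<bin_dir>/cylc-start', ...], following bin/cylc-$command_name."""
    command_name, rest = argv[1], argv[2:]
    return [os.path.join(bin_dir, "cylc-" + command_name)] + list(rest)


argv = build_cylc_argv("bin", ["cylc", "start", "--non-daemon", "my.suite"])
print(argv)  # on POSIX: ['bin/cylc-start', '--non-daemon', 'my.suite']
```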

After that point, you are at the left-most lifeline, on the Scheduler constructor (i.e. the __init__ method of scheduler.py’s Scheduler class).

If you follow the method calls - which are hopefully easy to understand and follow - you will find that the constructor merely creates a few objects, prepares the suite information, and the suite database.

Then the start method kicks things off, interacting with previously created objects, but also with some singletons for logging and configuration. Oh, the Cylc banner is also printed here (in case you would like to customize it, as in Spring Boot).

After that, if you are running the suite as a daemon, it is daemonized by forking the current process from Python.

One important step here is the initialization of the HTTP server. This server is used to communicate with the Suite Server Program. It starts listening for connections, but the right endpoints only become available after the configure method runs.

Lastly, we have configure and run methods, which are two very important methods to be discussed in the next part of this series, as they are quite extensive, and deserve their own diagrams.
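As a reading aid, the call order described in the last few paragraphs can be written down as a Python skeleton. SchedulerSketch is hypothetical: every method only records its name, nothing here is Cylc's real code, and having start() call configure() and run() is how I read the description above.

```python
class SchedulerSketch:
    """Hypothetical outline of the Scheduler call order (based on Cylc 7.7.1).

    Each step only appends a label to self.calls.
    """

    def __init__(self, suite, daemonize=False):
        self.suite = suite
        self.daemonize_requested = daemonize
        self.calls = []
        # Constructor: create a few objects, suite information and database.
        self.calls.append("init: suite info + suite database")

    def start(self):
        self.calls.append("start: logging/config singletons, banner")
        if self.daemonize_requested:
            # Cylc forks the current process here; the sketch only records it.
            self.calls.append("daemonize")
        # The HTTP server starts listening now, but the useful endpoints
        # only become available once configure() has run.
        self.calls.append("start HTTP server")
        self.configure()
        self.run()

    def configure(self):
        self.calls.append("configure")

    def run(self):
        self.calls.append("run")


scheduler = SchedulerSketch("my.suite", daemonize=True)
scheduler.start()
print(scheduler.calls)
```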

You can download the source file for the diagram used in this post, and edit it with draw.io.

ImportError when debugging cylc in Eclipse

kinow @ Jul 10, 2018 00:47:13

Since I started reading cylc’s source code in Eclipse to create some sequence diagrams, I had not been able to debug it without hitting errors at some point during the program execution.

The error message was “ImportError: cannot import name _remove_dead_weakref”, which was a bit enigmatic, as I had never heard of that function. It seemed to be something internal, or at least not from the project code base, and searching the Internet did not help much.

Here is the complete console output in Eclipse.

pydev debugger: starting (pid: 15124)
timeout 10 ps -opid,args 13640  # return 1

            ._.                                                       
            | |            The Cylc Suite Engine [7.7.1-37-g09c8a]    
._____._. ._| |_____.           Copyright (C) 2008-2018 NIWA          
| .___| | | | | .___|  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
| !___| !_! | | !___.  This program comes with ABSOLUTELY NO WARRANTY;
!_____!___. |_!_____!  see `cylc warranty`.  It is free software, you 
      .___! |           are welcome to redistribute it under certain  
      !_____!                conditions; see `cylc conditions`.       
2018-07-10T01:00:47+12 INFO - Suite starting: server=localhost:44444 pid=15124
2018-07-10T01:00:47+12 INFO - Cylc version: 7.7.1-37-g09c8a
2018-07-10T01:00:47+12 INFO - Run mode: live
2018-07-10T01:00:47+12 INFO - Initial point: 1
2018-07-10T01:00:47+12 INFO - Final point: None
2018-07-10T01:00:47+12 INFO - Cold Start 1
2018-07-10T01:00:47+12 DEBUG - [hello.1] -released to the task pool
2018-07-10T01:00:47+12 DEBUG - BEGIN TASK PROCESSING
2018-07-10T01:00:47+12 DEBUG - [hello.1] -waiting => queued
2018-07-10T01:00:47+12 DEBUG - 1 task(s) de-queued
2018-07-10T01:00:47+12 INFO - [hello.1] -submit-num=1, owner@host=localhost
2018-07-10T01:00:47+12 DEBUG - [hello.1] -queued => ready
2018-07-10T01:00:47+12 DEBUG - END TASK PROCESSING (took 0.023609161377 seconds)
2018-07-10T01:00:48+12 DEBUG - ['cylc', 'jobs-submit', '--debug', '--', '/home/kinow/Development/python/workspace/example-suite/log/job', '1/hello/01']
2018-07-10T01:00:48+12 ERROR - [jobs-submit cmd] cylc jobs-submit --debug -- /home/kinow/Development/python/workspace/example-suite/log/job 1/hello/01
    [jobs-submit ret_code] 1
    [jobs-submit err]
    Traceback (most recent call last):
      File "/home/kinow/Development/python/workspace/cylc/bin/cylc-jobs-submit", line 52, in <module>
        from cylc.batch_sys_manager import BatchSysManager
      File "/home/kinow/Development/python/workspace/cylc/lib/cylc/batch_sys_manager.py", line 114, in <module>
        from cylc.task_message import (
      File "/home/kinow/Development/python/workspace/cylc/lib/cylc/task_message.py", line 26, in <module>
        from logging import getLevelName, WARNING, ERROR, CRITICAL
      File "/home/kinow/Development/python/anaconda2/lib/python2.7/logging/__init__.py", line 26, in <module>
        import sys, os, time, cStringIO, traceback, warnings, weakref, collections
      File "/home/kinow/Development/python/anaconda2/lib/python2.7/weakref.py", line 14, in <module>
        from _weakref import (
    ImportError: cannot import name _remove_dead_weakref
2018-07-10T01:00:48+12 ERROR - [jobs-submit cmd] cylc jobs-submit --debug -- /home/kinow/Development/python/workspace/example-suite/log/job 1/hello/01
    [jobs-submit ret_code] 1
    [jobs-submit out] 2018-07-10T01:00:48+12|1/hello/01|1
2018-07-10T01:00:48+12 INFO - [hello.1] -(current:ready) submission failed at 2018-07-10T01:00:48+12
2018-07-10T01:00:48+12 ERROR - [hello.1] -submission failed
2018-07-10T01:00:48+12 DEBUG - [hello.1] -ready => submit-failed
2018-07-10T01:00:48+12 DEBUG - BEGIN TASK PROCESSING
2018-07-10T01:00:48+12 DEBUG - 0 task(s) de-queued
2018-07-10T01:00:48+12 DEBUG - END TASK PROCESSING (took 0.00175499916077 seconds)
2018-07-10T01:00:49+12 WARNING - suite stalled

As the current diagram I am working on has quite a few ifs and elses, I decided to investigate why this error was occurring. After a process of elimination, I found that it was caused by the Anaconda 2 entry missing from my $PATH environment variable.
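The traceback above shows the jobs-submit subprocess loading the standard library from the Anaconda 2 installation. A mismatch between the python binary that actually runs and the standard library it loads is one plausible way to end up with an ImportError like this, although this post does not confirm it. A quick way to check is to print where the interpreter comes from, once from your shell and once from the Eclipse debug configuration. interpreter_report is a hypothetical helper built only on sys and os.

```python
import os
import sys


def interpreter_report():
    """Where the running interpreter and its standard library come from."""
    return {
        "executable": sys.executable,  # the python binary actually running
        "prefix": sys.prefix,          # installation it considers its own
        "stdlib_os": os.__file__,      # where the standard library was loaded from
        # First few PATH entries: the ones most likely to decide which python wins.
        "path_head": os.environ.get("PATH", "").split(os.pathsep)[:3],
    }


for key, value in interpreter_report().items():
    print(key, "=", value)
```

If the two runs disagree on executable or prefix, the environments are picking up different Python installations.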

I had this variable configured in a custom script that I load whenever I decide to use Anaconda 2, so reproducing the same setup in Eclipse was easy.

A screen shot of Eclipse with source code
Locating the bug

Et voilà! Eclipse was happily debugging again!

So if you have a similar problem, try comparing your environment variables, check whether any entries are missing, and add them to the Eclipse Debug configuration.
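Comparing environments by eye gets tedious, so here is a small Python sketch for the comparison. The function names and the example paths are hypothetical; you would feed it the PATH (or the whole environment) captured from your shell and from the Eclipse debug run.

```python
import os


def missing_path_entries(reference_path, other_path, sep=os.pathsep):
    """Entries of reference_path (e.g. your shell's PATH) absent from other_path."""
    other = set(other_path.split(sep))
    return [entry for entry in reference_path.split(sep)
            if entry and entry not in other]


def missing_variables(reference_env, other_env):
    """Names of variables set in reference_env but not in other_env."""
    return sorted(set(reference_env) - set(other_env))


# Hypothetical values: PATH from a shell with Anaconda 2 loaded vs. from Eclipse.
shell_path = "/home/me/anaconda2/bin:/usr/local/bin:/usr/bin"
eclipse_path = "/usr/local/bin:/usr/bin"
print(missing_path_entries(shell_path, eclipse_path, sep=":"))
# ['/home/me/anaconda2/bin']
```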

Happy cycling!

A simple cylc suite

kinow @ Jul 08, 2018 18:59:13

I have been writing more suites for cylc lately, and found an example that has proved to be useful for debugging certain parts of the code.

It is an extremely simple suite, similar to what is in cylc’s documentation. It sleeps for N seconds, and prints a message.

What makes it even simpler is that it cycles through integers and has a limit of 1 active cycle point.

It is essentially the same as running the command in your shell session, with the difference that it goes through all of cylc’s internals with only one cycle point active at a time. That lets you debug and diagnose the parts not related to cycling and graphs (for those you would probably need a more elaborate example).

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    max active cycle points = 1
    [[dependencies]]
        [[[P1]]]
            graph = "hello"
[runtime]
    [[hello]]
        script = "sleep 10; echo PING"
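To picture what this configuration schedules, here is a hypothetical Python model of integer cycling: P1 starting at initial cycle point 1 yields 1, 2, 3, and so on. With no final cycle point (the log in the previous post shows "Final point: None") the sequence is endless, and max active cycle points = 1 means only one of these points is active at a time. This is an illustration, not Cylc's own cycling code.

```python
from itertools import count, islice


def integer_cycle_points(initial=1, step=1, final=None):
    """Hypothetical model of integer cycling: initial, initial + step, ...

    Without a final point the generator never stops.
    """
    for point in count(initial, step):
        if final is not None and point > final:
            return
        yield point


# With max active cycle points = 1, these are visited one after another.
print(list(islice(integer_cycle_points(1, 1), 3)))  # [1, 2, 3]
```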

I also combine this suite with the following global.rc.

[editors] 
    terminal = vim 
    gui = gvim -f

[communication]
    base port = 44444
    method = http
    maximum number of ports = 1

With “base port” set to 44444 and the maximum number of ports set to 1, I will be able to run only one suite at a time. But that way I can configure Wireshark and other tools to default to 44444/HTTP, for ease of debugging.
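The effect of a port range of size one can be sketched in Python: only one process can hold the single port at a time, and anything else looking for a free port in the range finds none. first_free_port is hypothetical and is not Cylc's actual port-selection code.

```python
import socket


def first_free_port(base_port, max_ports, host="127.0.0.1"):
    """Return the first port in [base_port, base_port + max_ports) that can be bound.

    Hypothetical sketch: the probe socket is closed again before returning,
    so another process could still grab the port afterwards.
    """
    for port in range(base_port, base_port + max_ports):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # already taken, try the next port in the range
            return port
    return None  # the whole range is busy


# base port = 44444, maximum number of ports = 1 (as in the global.rc above)
print(first_free_port(44444, 1))  # 44444 if free, None if something holds it
```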

Then start the suite with something like: cylc start --non-daemon --debug /home/kinow/Development/python/workspace/example-suite/

Happy cycling!

What happens when you create a new dataset in Apache Jena Fuseki

kinow @ May 29, 2018 18:59:04

The last post was about what happens when you upload a Turtle file to Apache Jena Fuseki. Today’s post is about what happens when you create a new dataset in Apache Jena Fuseki.

In theory, that happens before you upload a Turtle file, but this post series won’t follow a logical order; it will be based more on what I find interesting.

Oh, the dataset created is an in-memory dataset. Here’s a simplified sequence diagram. Again, these articles are more brain-dumps, used by myself for later reference.

ActionDatasets#execPostContainer() (Fuseki Core)

ActionDatasets, as the name suggests, handles HTTP requests related to datasets, such as creating a new dataset. It is also an HTTP action. The concept in Jena, as far as I can tell, is similar to Jenkins’ actions, but simpler, without the UI/Jelly/Groovy part.
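From the client side, the request that reaches ActionDatasets is a POST to /$/datasets (the same URL that appears in the logs at the end of this post). The sketch below builds, but does not send, such a request with Python's standard library. The form field names dbName and dbType, and the value "mem" for an in-memory dataset, are assumptions based on Fuseki's admin protocol; check them against the documentation for your version.

```python
from urllib.parse import urlencode
from urllib.request import Request


def create_dataset_request(server, name, db_type="mem"):
    """Build (but do not send) an admin POST that asks Fuseki for a new dataset.

    dbName/dbType are assumed form field names for Fuseki's admin API.
    """
    body = urlencode({"dbName": name, "dbType": db_type}).encode("ascii")
    return Request(
        server.rstrip("/") + "/$/datasets",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


req = create_dataset_request("http://localhost:3030", "aaa")
print(req.get_method(), req.full_url)  # POST http://localhost:3030/$/datasets
# urllib.request.urlopen(req) would send it to a running Fuseki server.
```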

This class also has a few static fields which hold system information; one variable is actually called system. (I wonder how well that works if you try to deploy Jena Fuseki with multiple JVMs 1 2.)

Its first task is to create a UUID, using JenaUUID (from Jena Core). This class looks very interesting; I wonder how it works.
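One small clue is visible from the outside. The system file name quoted in the "Persisting the dataset" section, 902154aa-2bb6-11b2-8053-024232e7b374, can be inspected with Python's uuid module. Its version nibble says this particular identifier is a time-based (version 1) UUID. That only describes this one value, not everything JenaUUID can generate.

```python
import uuid

# File name Fuseki used for the system copy of the template in my environment.
u = uuid.UUID("902154aa-2bb6-11b2-8053-024232e7b374")

print(u.version)                   # 1 -> time-based UUID
print(u.variant == uuid.RFC_4122)  # True
```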

Then it creates a DatasetDescriptionRegistry, a registry that keeps track of the datasets created. There is also some parameter validation and state checking, and then the transaction is started (system.begin()).

Model / (Core)

I have used Model and ModelFactory before, when working with ontologies and Protégé. ActionDatasets will create a Model, which its Javadoc describes as follows:

“An RDF Model. An RDF model is a set of Statements. Methods are provided for creating resources, properties and literals and the Statements which link them, for adding statements to and removing them from a model, for querying a model and set operations for combining models.”

It also gets a StreamRDF built on the model’s graph (model.getGraph()), which will be used later by the RDFParser.

ActionDatasets#assemblerFromForm() (Fuseki)

In #assemblerFromForm(), it creates a template and then uses RDFParser to parse the template and load it into the StreamRDF. The template looks like this:

# Licensed under the terms of http://www.apache.org/licenses/LICENSE-2.0

@prefix :        <#> .
@prefix fuseki:  <http://jena.apache.org/fuseki#> .
@prefix rdf:     <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .

@prefix rdfs:    <http://www.w3.org/2000/01/rdf-schema#> .
@prefix tdb:     <http://jena.hpl.hp.com/2008/tdb#> .
@prefix ja:      <http://jena.hpl.hp.com/2005/11/Assembler#> .

## ---------------------------------------------------------------
## Updatable in-memory dataset.

<#service1> rdf:type fuseki:Service ;
    # URI of the dataset -- http://host:port/aaa
    fuseki:name                        "aaa" ;
    fuseki:serviceQuery                "sparql" ;
    fuseki:serviceQuery                "query" ;
    fuseki:serviceUpdate               "update" ;
    fuseki:serviceUpload               "upload" ;
    fuseki:serviceReadWriteGraphStore  "data" ;     
    fuseki:serviceReadGraphStore       "get" ;
    fuseki:dataset                     <#dataset> ;
    .

# Transactional, in-memory dataset. Initially empty.
<#dataset> rdf:type ja:DatasetTxnMem .
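The name typed in the form ("aaa") ends up as the fuseki:name of the service. One way to picture that step is plain placeholder substitution, sketched below in Python. The cut-down template and the "{NAME}" placeholder are assumptions for illustration, not Fuseki's actual template handling.

```python
# Hypothetical, cut-down template; the "{NAME}" placeholder is an assumption.
TEMPLATE = """\
<#service1> rdf:type fuseki:Service ;
    fuseki:name     "{NAME}" ;
    fuseki:dataset  <#dataset> ;
    .
<#dataset> rdf:type ja:DatasetTxnMem .
"""


def fill_template(template, name):
    # str.replace rather than str.format, so other braces in real Turtle
    # text could not be mistaken for format fields.
    return template.replace("{NAME}", name)


config = fill_template(TEMPLATE, "aaa")
print(config)
```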

Persisting the dataset

Jena Fuseki will create a local copy of the template, using RIOT’s RDFDataMgr. In my environment, running from Eclipse, the file location was /home/kinow/Development/java/jena/jena/jena-fuseki2/jena-fuseki-core/run/system_files/902154aa-2bb6-11b2-8053-024232e7b374.

Fuseki then looks for exactly one service name statement (http://jena.apache.org/fuseki#name). The name is validated for things like blank spaces, an empty value, ‘/’, etc.
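The "exactly one service name" rule and the validation can be sketched over plain (subject, predicate, object) tuples. service_name is hypothetical and implements only my loose reading of the checks listed above; Fuseki's exact rules may differ. Returning the name with a leading '/' matches the "Create database : name = /aaa" log line at the end of this post.

```python
FUSEKI_NAME = "http://jena.apache.org/fuseki#name"


def service_name(triples):
    """Return the service name from (subject, predicate, object) triples.

    Hypothetical re-creation of the checks described above.
    """
    names = [obj for (_subj, pred, obj) in triples if pred == FUSEKI_NAME]
    if len(names) != 1:
        raise ValueError("expected exactly one fuseki:name, got %d" % len(names))
    name = names[0]
    if not name.strip():
        raise ValueError("service name is empty")
    if any(ch.isspace() for ch in name):
        raise ValueError("service name contains white space")
    bare = name[1:] if name.startswith("/") else name
    if not bare or "/" in bare:
        raise ValueError("service name must not be '/' or contain '/'")
    return "/" + bare


print(service_name([("<#service1>", FUSEKI_NAME, "aaa")]))  # /aaa
```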

Once validation passes, it persists the file somewhere like /home/kinow/Development/java/jena/jena/jena-fuseki2/jena-fuseki-core/run/configuration/aaa.ttl, but not without first checking whether the file already exists.

As this is a brand new file, the Model instance is now written to the file.
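Checking whether a file exists and then writing it is two separate steps, so two concurrent requests could in principle both pass the check. The Python sketch below swaps in a different technique, exclusive-creation mode ("x"), which makes the check and the creation a single step. persist_config is a hypothetical helper, not Fuseki's code.

```python
import os
import tempfile


def persist_config(config_dir, name, turtle_text):
    """Write <config_dir>/<name>.ttl, refusing to overwrite an existing file.

    open(..., "x") raises FileExistsError if the file is already there, so the
    existence check and the creation cannot be interleaved by another writer.
    """
    path = os.path.join(config_dir, name + ".ttl")
    with open(path, "x", encoding="utf-8") as handle:
        handle.write(turtle_text)
    return path


with tempfile.TemporaryDirectory() as tmp:
    persist_config(tmp, "aaa", "# first write\n")
    try:
        persist_config(tmp, "aaa", "# second write\n")
    except FileExistsError:
        print("aaa.ttl already exists; left untouched")
```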

DataAccessPoint (ARQ)

Funny, when I wrote it I immediately put this class under Fuseki Core, but it is actually in ARQ. Why not in Fuseki? It looks like ARQ has a web layer too.

Fuseki’s FusekiBuilder#buildDataAccessPoint() creates the DataAccessPoint. The DataAccessPoint’s Javadoc says: “A name in the URL space of the server”.

  • A DataService contains operations and endpoints
  • Services are added to endpoints, such as an endpoint for Quads_RW (the REST_Quads_RW from the previous post)
  • The name and the data service are used to create the DataAccessPoint
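The relationships in the list above can be summarised with Python dataclasses. These classes are a hypothetical analogue for reading purposes: the field names, the add_endpoint helper and the "Query" operation label are mine, not Fuseki's Java API.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Endpoint:
    name: str  # e.g. "data", "query", "update"
    operations: List[str] = field(default_factory=list)


@dataclass
class DataService:
    dataset: str  # stands in for the real dataset object
    endpoints: Dict[str, Endpoint] = field(default_factory=dict)

    def add_endpoint(self, endpoint_name, operation):
        # Create the endpoint on first use, then attach the operation to it.
        endpoint = self.endpoints.setdefault(endpoint_name, Endpoint(endpoint_name))
        endpoint.operations.append(operation)


@dataclass
class DataAccessPoint:
    name: str  # "a name in the URL space of the server"
    data_service: DataService


service = DataService(dataset="in-memory dataset")
service.add_endpoint("data", "REST_Quads_RW")
service.add_endpoint("query", "Query")
dap = DataAccessPoint("/aaa", service)
print(dap.name, sorted(dap.data_service.endpoints))  # /aaa ['data', 'query']
```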

The current HttpAction in the request contains a reference to Fuseki’s DataAccessPointRegistry. Fuseki’s DataAccessPointRegistry extends Atlas’ Registry, which uses a ConcurrentHashMap (again, how does it work with multiple JVMs?).

The new DataAccessPoint is registered with Fuseki’s registry (not the other registry).
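A rough Python analogue of such a registry is a dictionary guarded by a lock. Registry here is hypothetical and simpler than Java's ConcurrentHashMap, which uses finer-grained concurrency than a single lock. It also shows why the multiple-JVM question matters: the map lives in one process's memory, so a second process would have its own, separate copy.

```python
import threading


class Registry:
    """Thread-safe name -> value map, valid only within one process."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}

    def put(self, name, value):
        with self._lock:
            self._entries[name] = value

    def get(self, name):
        with self._lock:
            return self._entries.get(name)

    def names(self):
        with self._lock:
            return sorted(self._entries)


registry = Registry()
registry.put("/aaa", "DataAccessPoint for /aaa")  # placeholder value
print(registry.names())  # ['/aaa']
```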

And then, finally, the transaction is committed and the response is prepared (a 200 OK in text/plain). The method returns null, which means an empty response.

And here are some of the logs produced during this experiment.

[2018-05-28 21:11:43] Config     INFO  Load configuration: file:///home/kinow/Development/java/jena/jena/jena-fuseki2/jena-fuseki-core/run/configuration/ds2.ttl
[2018-05-28 21:11:43] Config     INFO  Load configuration: file:///home/kinow/Development/java/jena/jena/jena-fuseki2/jena-fuseki-core/run/configuration/p1.ttl
[2018-05-28 21:11:43] Config     INFO  Register: /ds2
[2018-05-28 21:11:43] Config     INFO  Register: /p1
[2018-05-28 21:11:43] Server     INFO  Started 2018/05/28 21:11:43 NZST on port 3030
[2018-05-28 21:11:52] Admin      INFO  [1] GET http://localhost:3030/$/server
[2018-05-28 21:11:52] Admin      INFO  [1] 200 OK (11 ms)
[2018-05-28 21:12:41] Admin      INFO  [2] GET http://localhost:3030/$/server
[2018-05-28 21:12:41] Admin      INFO  [2] 200 OK (3 ms)
[2018-05-28 21:22:33] Admin      INFO  [3] POST http://localhost:3030/$/datasets
[2018-05-28 21:46:25] Admin      INFO  [3] Create database : name = /aaa
[2018-05-28 22:16:17] Admin      INFO  [3] 200 OK (3,224.390 s)
[2018-05-28 22:16:18] Admin      INFO  [4] GET http://localhost:3030/$/server
[2018-05-28 22:16:18] Admin      INFO  [4] 200 OK (8 ms)

So that’s that.

Happy hacking!