Example of a Trickle Load Java Program

The following Java program tests different batch sizes and concurrent connections. You can modify and run the program to test and tune your workload, or you can contact your Yellowbrick Sales Engineer for assistance.

This example demonstrates several coding best practices that are necessary to optimize performance:
  • Ensure no thread contention exists between concurrent loaders.
  • Use java.util.concurrent.Atomic<Class>, java.util.concurrent.<Class>Adder, and so on, to perform accumulation of statistics among threads and to avoid thread lock contention, which decreases concurrency.
  • Design for batch size and use statistics to find the optimal batch size for a given workload. Do this iteratively and for each table.
  • Do not hand-write INSERT statements; instead use parameterized inserts using a PreparedStatement per thread.
  • Use addBatch/executeBatch to achieve the batching.
package io.yellowbrick;

import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

public class Load {
    private final String mUrl;
    private final Driver mDriver;
    private final Properties mProperties;
    private final Options mOptions;
    private final AtomicInteger mLoader = new AtomicInteger();
    private final LongAdder mInserted = new LongAdder();
    private long mStart;

    public Load(String url, Options options, Properties properties) throws SQLException {
        this.mUrl = url;
        this.mOptions = options;
        this.mProperties = properties;
        this.mDriver = DriverManager.getDriver(mUrl);
    }
    
    private static final class Options {
        int mLoaders = 5;
        int mBatchSize = 500;
        int mTransactionSize = 500;
        int mInsertions = 1 * 1000 * 1000;
        String mDriver = "postgresql";
        String mTable = "load_test";
        boolean mTablePerLoad = false;
        boolean mDropTable = false;
        boolean mUnlogged = false;
    }

    public static void main(String[] args) throws Throwable {

        // Do arguments: required host port database user password
        Options options = new Options();
        if (args.length < 5) {
            System.err.println("Usage: Load host port database user password [option1=value1] [option2=value2]");
            System.err.println("Where: options can be:");
            System.err.printf("       [loaders=NN] # of loaders (default: %d)\n", options.mLoaders);
            System.err.printf("       [batchsize=NN] size of batches (default: %d)\n", options.mBatchSize);
            System.err.printf("       [transactionsize=NN] size of transactions (default: %d)\n", options.mTransactionSize);
            System.err.printf("       [insertions=NN] # of insertions (default: %d rows)\n", options.mInsertions);
            System.err.printf("       [driver=postgresql|pgsql] driver (default: %s)\n", options.mDriver);
            System.err.printf("       [table=tablename] driver (default: %s)\n", options.mTable);
            System.err.printf("       [table-per-load=true|false] unique table per load (default: %s)\n", options.mTablePerLoad);
            System.err.printf("       [drop-table=true|false] drop table before load (default: %s)\n", options.mDropTable);
            System.err.printf("       [unlogged=true|false] create table UNLOGGED (default: %s)\n", options.mUnlogged);
            System.err.println("  or any jdbc property for the jdbc driver");
            System.err.println();
            System.err.println("Example: Load yb11 5432 yellowbrick yellowbrick yellowbrick insertions=100000");
            System.exit(1);
        }
        
        // Specify required and optional properties.
        Properties properties = new Properties();
        properties.put("user", args[3]);
        properties.put("password", args[4]);
        for (int i = 5; i < args.length; ++i) {
            String[] parts = args[i].split("=");

            // Discover our app's internal options.  Others are passed to JDBC connection.
            if ("loaders".equals(parts[0])) {
                options.mLoaders = Integer.parseInt(parts[1]);
            } else if ("batchsize".equals(parts[0])) {
                options.mBatchSize = Integer.parseInt(parts[1]);
            } else if ("transactionsize".equals(parts[0])) {
                options.mTransactionSize = Integer.parseInt(parts[1]);
            } else if ("insertions".equals(parts[0])) {
                options.mInsertions = Integer.parseInt(parts[1]);
            } else if ("driver".equals(parts[0])) {
                options.mDriver = parts[1];
            } else if ("table".equals(parts[0])) {
                options.mTable = parts[1];
            } else if ("drop-table".equals(parts[0])) {
                options.mDropTable = "true".equals(parts[1]);
            } else if ("table-per-load".equals(parts[0])) {
                options.mTablePerLoad = "true".equals(parts[1]);
            } else if ("unlogged".equals(parts[0])) {
                options.mUnlogged = "true".equals(parts[1]);

            // A jdbc option.
            } else {
                properties.put(parts[0], parts[1]);
            }
        }

        // Make the loader, and run it.
        new Load(String.format("jdbc:%s://%s:%s/%s", options.mDriver, args[0], args[1], args[2]), options, properties).run();
    }

    private void run() throws Throwable {
        System.out.printf("Connecting to database %s with parameters %s\n", mUrl, mProperties);
        System.out.printf("Using %d loader threads with batch size %d, transaction size %d, to load %d rows\n", mOptions.mLoaders, mOptions.mBatchSize, mOptions.mTransactionSize, mOptions.mInsertions);

        // Create initial connection.
        try (Connection connection = connect()) {

            // Create N loader threads, and submit each to executor.
            ExecutorService executor = Executors.newFixedThreadPool(mOptions.mLoaders);
            CompletionService<Map<String, Object>> completionService = new ExecutorCompletionService<>(executor);
            try {

                // Unique table for the loader?
                if (!mOptions.mTablePerLoad) {
                    prepareTable(connection, mOptions.mTable);
                }

                // Record our start time.
                mStart = System.nanoTime();

                // Submit N loaders.
                for (int i = mOptions.mLoaders; i > 0; --i) {
                    completionService.submit(this::load);
                    System.out.printf("Launched loader %d\n",(mOptions.mLoaders - i) + 1);
                }

                // Now, poll through the completed work, waiting for each to resolve.
                for (int i = mOptions.mLoaders; i > 0; --i) {
                    try {
                        // Wait 1s, if we are not done, report and keep trying.
                        while (true) {
                            Future<?> task = completionService.poll(1, TimeUnit.SECONDS);
                            if (task == null) {
                                report();
                            } else {
                                Object loader = task.get();
                                System.out.printf("\nReceived completion for loader %s", loader);
                                break;
                            }
                        }

                    } catch (ExecutionException e) {
                        throw e.getCause();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }

                // Final report.
                System.out.println();
                report();
                System.out.println();

                // Do final flush.
                if (!mOptions.mTablePerLoad) {
                    System.out.println("Doing final flush to " + mOptions.mTable);
                    long flushStart = System.nanoTime();
                    try (Statement statement = connection.createStatement()) {
                        statement.execute("YFLUSH " + mOptions.mTable);
                    }
                    System.out.printf("Flush took %d ms\n", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - flushStart));
                }

            } finally {
                executor.shutdown();
            }

            System.out.println("Done");
        }
    }

    private void prepareTable(Connection connection, String tableName) throws SQLException {

        // Drop table if required.
        if (mOptions.mDropTable) {
            try (Statement statement = connection.createStatement()) {
                statement.execute(String.format("DROP TABLE IF EXISTS %s", tableName));
            }
        }

        // Create table.
        try (Statement statement = connection.createStatement()) {
            statement.execute(String.format("CREATE %s TABLE IF NOT EXISTS %s(a VARCHAR(32), b VARCHAR(64), c BIGINT, d NUMERIC(10,2), e TIMESTAMPTZ, f BOOLEAN)", mOptions.mUnlogged ? "UNLOGGED" : "", tableName));
        }
    }

    private Map<String, Object> load() throws SQLException {
        Map<String, Object> result = new HashMap<>();
        int id = mLoader.incrementAndGet();
        result.put("id", id);

        // Initialize our randomizer; doesn't need to be cryptographcially secure.
        Random random = new Random();

        // We use two buffers for a/b columns to randomize
        byte[] a = new byte[16];
        byte[] b = new byte[32];

        // Create initial connection.
        try (Connection connection = connect()) {
            connection.setAutoCommit(false);

            // Construct table name.
            String tableName = mOptions.mTablePerLoad ? String.format("%s__%d", mOptions.mTable, id) : mOptions.mTable;
            result.put("table", tableName);

            // Unique table for the loader?
            if (mOptions.mTablePerLoad) {
                prepareTable(connection, tableName);
            }

            // Create insertion statement.
            try (PreparedStatement preparedStatement = connection.prepareStatement(String.format("INSERT INTO %s VALUES(?, ?, ?, ?, ?, ?)", tableName))) {

                // Loop until we've got the # of insertions we want.
                int inserted = 0, uncommitted = 0, committed = 0;
                while (mInserted.longValue() < mOptions.mInsertions) {

                    // Create a batch.
                    for (int i = mOptions.mBatchSize; i > 0; --i) {
                        preparedStatement.setString(1, randomString(random, a));
                        preparedStatement.setString(2, randomString(random, b));
                        preparedStatement.setLong(3, random.nextLong());
                        preparedStatement.setDouble(4, random.nextDouble());
                        preparedStatement.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
                        preparedStatement.setBoolean(6, random.nextBoolean());
                        preparedStatement.addBatch();
                    }

                    // Execute the batch.
                    preparedStatement.executeBatch();
                    uncommitted += mOptions.mBatchSize;
                    if (uncommitted >= mOptions.mTransactionSize) {
                        connection.commit();
                        committed++;
                        uncommitted = 0;
                    }

                    // Increment what we did.
                    mInserted.add(mOptions.mBatchSize);
                    inserted += mOptions.mBatchSize;
                }
                connection.commit();
                result.put("inserted", inserted);
                result.put("committed", committed);
            }

            // Do final flush.
            if (mOptions.mTablePerLoad) {
                long flushStart = System.nanoTime();
                try (Statement statement = connection.createStatement()) {
                    statement.execute("YFLUSH " + tableName);
                }
                result.put("flush_ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - flushStart));
            }
        }

        return result;
    }

    private String randomString(Random random, byte[] buffer) {
        random.nextBytes(buffer);
        return Base64.getEncoder().encodeToString(buffer);
    }

    private void report() {
        long nanos = System.nanoTime() - mStart;
        long inserted = mInserted.longValue();
        double seconds = TimeUnit.NANOSECONDS.toSeconds(nanos);
        double rate = inserted / seconds;
        System.out.printf("\rInsertion rate: %.2f rows/second, [%d total, %d seconds]", rate, inserted, (long) seconds);
    }

    private Connection connect() throws SQLException {
        return this.mDriver.connect(mUrl, mProperties);
    }
}