SqlBigQueryService.java
001 /*
002  * SPDX-License-Identifier: Apache-2.0
003  *
004  * Copyright 2020-2022 Agorapulse.
005  *
006  * Licensed under the Apache License, Version 2.0 (the "License");
007  * you may not use this file except in compliance with the License.
008  * You may obtain a copy of the License at
009  *
010  *     https://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing, software
013  * distributed under the License is distributed on an "AS IS" BASIS,
014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015  * See the License for the specific language governing permissions and
016  * limitations under the License.
017  */
018 package com.agorapulse.micronaut.bigquery.mock;
019 
020 import com.agorapulse.micronaut.bigquery.BigQueryService;
021 import com.agorapulse.micronaut.bigquery.impl.DefaultBigQueryService;
022 import com.agorapulse.micronaut.bigquery.RowResult;
023 import com.axiomalaska.jdbc.NamedParameterPreparedStatement;
024 import io.micronaut.context.annotation.Replaces;
025 import io.reactivex.Flowable;
026 
027 import javax.inject.Singleton;
028 import javax.sql.DataSource;
029 import java.sql.Connection;
030 import java.sql.PreparedStatement;
031 import java.sql.ResultSet;
032 import java.sql.SQLException;
033 import java.sql.Timestamp;
034 import java.time.Instant;
035 import java.util.Map;
036 import java.util.function.Function;
037 
038 @Singleton
039 @Replaces(DefaultBigQueryService.class)
040 public class SqlBigQueryService implements BigQueryService {
041 
042     private static class Database {
043         private final Connection connection;
044         private final PreparedStatement statement;
045         private final ResultSet resultSet;
046 
047         public Database(Connection connection, PreparedStatement statement, ResultSet resultSet) {
048             this.connection = connection;
049             this.statement = statement;
050             this.resultSet = resultSet;
051         }
052     }
053 
054     private final DataSource dataSource;
055 
056     public SqlBigQueryService(DataSource dataSource) {
057         this.dataSource = dataSource;
058     }
059 
060     @Override
061     public <T> Flowable<T> query(Map<String, ?> namedParameters, String sqlString, final Function<RowResult, T> builder) {
062 
063         return Flowable.generate(
064             () -> {
065                 Connection connection = dataSource.getConnection();
066                 String sql = fixPlaceholders(sqlString, namedParameters);
067                 NamedParameterPreparedStatement stmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, sql);
068                 fillNamedParameters(namedParameters, stmt);
069                 return new Database(connection, stmt, stmt.executeQuery());
070             },
071             (database, emitter-> {
072                 try {
073                     SqlRowResult rowResult = new SqlRowResult(database.resultSet);
074                     if (database.resultSet.next()) {
075                         emitter.onNext(builder.apply(rowResult));
076                     else {
077                         emitter.onComplete();
078                     }
079                 catch (Exception e) {
080                     emitter.onError(e);
081                 }
082                 return database;
083             },
084             database -> {
085                 database.resultSet.close();
086                 database.statement.close();
087                 database.connection.close();
088             }
089         );
090     }
091 
092     @Override
093     public void execute(Map<String, ?> namedParameters, String sqlString) {
094         String sql = fixPlaceholders(sqlString, namedParameters);
095         try (
096             Connection connection = dataSource.getConnection();
097             NamedParameterPreparedStatement stmt = NamedParameterPreparedStatement.createNamedParameterPreparedStatement(connection, sql)
098         ) {
099             fillNamedParameters(namedParameters, stmt);
100             stmt.execute();
101         catch (SQLException e) {
102             throw new IllegalArgumentException("Cannot execute " + sqlString, e);
103         }
104     }
105 
106     private void fillNamedParameters(Map<String, ?> namedParameters, NamedParameterPreparedStatement stmt) {
107         namedParameters.forEach((parameter, x-> {
108             try {
109                 stmt.setObject(parameter, convertIfNecessary(x));
110             catch (SQLException throwables) {
111                 throw new IllegalStateException("Cannot set named parameter " + parameter + " with value " + x, throwables);
112             }
113         });
114     }
115 
116     @Override
117     public Object convertIfNecessary(Object object) {
118         if (object instanceof Instant) {
119             return new Timestamp(((Instantobject).toEpochMilli());
120         }
121         return BigQueryService.super.convertIfNecessary(object);
122     }
123 
124     private static String fixPlaceholders(String sqlString, Map<String, ?> namedParameters) {
125         String result = sqlString;
126         for (String key : namedParameters.keySet()) {
127             result = result.replace("@" + key, ":" + key);
128         }
129         return result;
130     }
131 }